容器中的文件在磁盘上是临时存放的,当容器重建时,容器中的文件将会丢失,另外当在一个Pod中同时运行多个容器时,常常需要在这些容器之间共享文件,Kubernetes抽象出了Volume来解决这两个问题,也就是存储卷,Kubernetes的Volume是Pod的一部分,Volume不是单独的对象,不能独立创建,只能在Pod中定义。

Pod中的所有容器都可以访问Volume,但必须要挂载,且可以挂载到容器中任何目录。

Volume的生命周期与挂载它的Pod相同,但是Volume里面的文件可能在Volume消失后仍然存在,这取决于Volume的类型。

Volume的类型

  • emptyDir:一种简单的空目录,主要用于临时存储。
  • hostPath:将主机某个目录挂载到容器中。
  • ConfigMap、Secret:特殊类型,将Kubernetes特定的对象类型挂载到Pod,
  • PV/PVC

EmptyDir

EmptyDir是最简单的一种Volume类型,根据名字就能看出,这个Volume挂载后就是一个空目录,应用程序可以在里面读写文件,emptyDir Volume的生命周期与Pod相同,Pod删除后Volume的数据也同时删除掉。

emptyDir的一些用途: 缓存空间,例如基于磁盘的归并排序。 为耗时较长的计算任务提供检查点,emptyDir配置示例如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
apiVersion: v1
kind: Pod
metadata:
name: nginx
spec:
containers:
- image: nginx:alpine
name: test-container
volumeMounts:
- mountPath: /cache
name: cache-volume
volumes:
- name: cache-volume
emptyDir: {}

emptyDir实际是将Volume的内容写在Pod所在节点的磁盘上,另外emptyDir也可以设置存储介质为内存,如下所示,medium设置为Memory。

1
2
3
4
volumes:
- name: html
emptyDir:
medium: Memory

HostPath

HostPath是一种持久化存储,emptyDir里面的内容会随着Pod的删除而消失,但HostPath不会,如果对应的Pod删除,HostPath Volume里面的内容依然存在于节点的目录中,如果后续重新创建Pod并调度到同一个节点,挂载后依然可以读取到之前Pod写的内容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
apiVersion: v1
kind: Pod
metadata:
name: test-hostpath
spec:
containers:
- image: nginx:alpine
name: hostpath-container
volumeMounts:
- mountPath: /test-pd
name: test-volume
volumes:
- name: test-volume
hostPath:
path: /data

PV

如果要求Pod重新调度后仍然能使用之前读写过的数据,就只能使用网络存储了,网络存储种类非常多且有不同的使用方法,通常一个云服务提供商至少有块存储、文件存储、对象存储三种,Kubernetes解决这个问题的方式是抽象了 PV(PersistentVolume)和PVC(PersistentVolumeClaim)来解耦这个问题。从而让使用者不用关心具体的基础设施,当需要存储资源的时候,只要像CPU和内存一样,声明要多少即可。

  • PV:PV 描述的是持久化存储卷,主要定义的是一个持久化存储在宿主机上的目录,比如一个NFS的挂载目录。
  • PVC:PVC 描述的是Pod所希望使用的持久化存储的属性,比如,Volume存储的大小、可读写权限等等。

管理员设置好网络存储的类型,提供对应的PV描述符配置到Kubernetes,使用者需要存储的时候只需要创建PVC,然后在Pod中使用Volume关联PVC,即可让Pod使用到存储资源,RECLAIM POLICY 是指 PV 的回收策略,Retain表示PVC被释放后PV继续保留。STATUS值为Available,表示PV处于可用的状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
apiVersion: v1
kind: PersistentVolume
metadata:
name: pv-example
spec:
accessModes:
- ReadWriteMany # 读写模式
capacity:
storage: 10Gi # 定义PV的大小
csi:
driver: nas.csi.everest.io # 声明使用的驱动
fsType: nfs # 存储类型
volumeAttributes:
everest.io/share-export-location: sfs-nas01.cn-north-4b.myhuaweicloud.com:/share-96314776 # 挂载地址
volumeHandle: 68e4a4fd-d759-444b-8265-20dc66c8c502 # 存储ID

创建 PV

1
2
3
4
5
6
$ kubectl create -f pv.yaml
persistentvolume/pv-example created

$ kubectl get pv
NAME CAPACITY ACCESS MODES RECLAIM POLICY STATUS CLAIM STORAGECLASS REASON AGE
pv-example 10Gi RWX Retain Available 4s

PVC

PVC可以绑定一个PV,示例如下。

1
2
3
4
5
6
7
8
9
10
11
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: pvc-example
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 10Gi # 声明存储的大小
volumeName: pv-example # PV的名称

创建 PVC 并查看。

1
2
3
4
5
6
$ kubectl create -f pvc.yaml
persistentvolumeclaim/pvc-example created

$ kubectl get pvc
NAME STATUS VOLUME CAPACITY ACCESS MODES STORAGECLASS AGE
pvc-example Bound pv-example 10Gi RWX 9s

这里可以看到状态是 Bound,VOLUME 是 pv-example,表示PVC已经绑定了PV。

1
2
3
$ kubectl get pv
NAME CAPACITY ACCESS MODES RECLAIM POLICY STATUS CLAIM STORAGECLASS REASON AGE
pv-example 10Gi RWX Retain Bound default/pvc-example 50s

可以看到状态也变成了Bound,CLAIM是default/pvc-example,表示这个PV绑定了default命名空间下的pvc-example这个PVC。 PV是集群级别的资源,并不属于某个命名空间,而PVC是命名空间级别的资源,PV可以与任何命名空间的PVC资源绑定。

StorageClass

管理员可以部署PV配置器(provisioner),然后定义对应的StorageClass,这样开发者在创建PVC的时候就可以选择需要创建存储的类型,PVC会把StorageClass传递给PV provisioner,由provisioner自动创建PV。如CCE就提供csi-disk、csi-nas、csi-obs等StorageClass,在声明PVC时加上StorageClassName,就可以自动创建PV,并自动创建底层的存储资源

1
2
3
4
5
6
7
8
9
10
11
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: pvc-sfs-auto-example
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 10Gi
storageClassName: csi-nas # StorageClass

创建PVC并查看PVC和PV。

1
2
3
4
5
6
7
8
9
10
$ kubectl create -f pvc2.yaml
persistentvolumeclaim/pvc-sfs-auto-example created

$ kubectl get pvc
NAME STATUS VOLUME CAPACITY ACCESS MODES STORAGECLASS AGE
pvc-sfs-auto-example Bound pvc-1f1c1812-f85f-41a6-a3b4-785d21063ff3 10Gi RWX csi-nas 29s

$ kubectl get pv
NAME CAPACITY ACCESS MODES RECLAIM POLICY STATUS CLAIM STORAGECLASS REASON AGE
pvc-1f1c1812-f85f-41a6-a3b4-785d21063ff3 10Gi RWO Delete Bound default/pvc-sfs-auto-example csi-nas 20s

这可以看到使用StorageClass后,不仅创建了PVC,而且创建了PV,并且将二者绑定了。
定义了StorageClass后,就可以减少创建并维护PV的工作,PV变成了自动创建,作为使用者,只需要在声明PVC时指定StorageClassName即可,这就大大减少工作量。
再次说明,StorageClassName的类型在不同厂商的产品上各不相同, 在Pod中使用PVC, 有了PVC后,在Pod中使用持久化存储就非常方便了,在Pod Template中的Volume直接关联PVC的名称,然后挂载到容器之中即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
apiVersion: apps/v1
kind: Deployment
metadata:
name: nginx-deployment
spec:
selector:
matchLabels:
app: nginx
replicas: 2
template:
metadata:
labels:
app: nginx
spec:
containers:
- image: nginx:alpine
name: container-0
volumeMounts:
- mountPath: /tmp # 挂载路径
name: pvc-sfs-example
restartPolicy: Always
volumes:
- name: pvc-sfs-example
persistentVolumeClaim:
claimName: pvc-example # PVC的名称

一个思考

电商网站有一个场景是:用户购买一个商品,库存会做扣减。如果用 MySQL 做商品存储,会遇到什么坑,该怎么避免。

简化的商品表如下。

1
2
3
4
5
6
CREATE TABLE `goods` (
`id` int(10) NOT NULL AUTO_INCREMENT COMMENT '自增id',
`name` varchar(256) NOT NULL DEFAULT '' COMMENT '商品名称',
`available` int(11) NOT NULL DEFAULT '0' COMMENT '库存剩余量',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

在阅读网上的解决方法之前,脑海会迸发一个解决方法是:

1
UPDATE goods SET available = available - 1 WHERE id = xxx 

如果是这么简单的话,那就没有必要讨论了。实际上在并发请求的业务场景下,有可能出现 available 变量变成负数。

虽然我们可以将 available 变量设计成 int unsigned 字段来避免,但是在不同的 MySQL 版本下,会出现 available 溢出成为一个非常大的数。

解决方法

  • 悲观锁的方案:SELECT FOR UPDATE

当我们查询 goods 信息后就把当前的数据锁定,直到我们修改完毕后再解锁。不过 FOR UPDATE 仅适用于 InnoDB,且必须在事务区块中才能生效。

1
2
3
4
5
6
7
8
9
set autocommit = 0; // 设置MySQL为非autocommit模式:
begin trans;// 开始事务
select avaliable from goods where id = xxx for update; //1.查询出商品信息
if (avaliable >= 0) {
affectNum = udpate goods set available = available - 1 where id = xx ;
commit ; // 4.提交事务
} else {
rollback ;
}

弊端:
悲观锁是排他锁,会阻塞其他写请求,如果上述代码有异常,导致 Commit/Rollback 没有执行,就会造成所有请求都在等待锁。

  • 乐观锁的方案

乐观锁假设数据一般情况下不会冲突,在数据提交更新的时候,才会对检测数据的冲突与,如果发现冲突了,则让返回用户错误的信息,让用户决定。
通常做法是记录加一个 Version 字段。读取记录时,连同 Version 一同读出。数据每次更新时候,Version + 1, 更新时判断记录的当前版本与之前取出来的 Version 值比对,
如果当前版本号与第一次取出来的 Version 值相等,则予以更新,否则认为是过期数据。

1
2
3
4
select avaiable ,version from goods where id=xxx limit 1; // 查询version
if (available > 0) {
update goods set avaiable=avaiable-1 where id =xx and version= <刚查出来的 Version>;
}

弊端: 这样虽然保证安全,但是需要执行2次 SQL

  • Update 时增加 available 查询条件
1
udpate goods set available = available - 1 where id = xx and available - 1 >= 0 ;

Innodb 的 Update 语句,对于 id 是主键索引的情况下会执行行锁。该语句在 MySQL中是先读后更新,串行且原子的。单条语句,实际上也是一个事务。这个方法效率更快,且安全。

接触 grpc,从它的 example-GreetHelloWorld 开始, client 指定了 Server 监听的IP和端口,然后直接访问.就完成了一次 RPC 请求。

那时候我就在想,这里是否能指定代理转发,做负载均衡呢。比如 http访问下,我们会用 proxy_pass + upstream 来代理服务端,这样来实现服务端的负载均衡,以及灰度发布。

今日偶然发现 nginx 1.13 已经 支持 grpc 协议, 实操上我们可以像使用 proxy_pass 一样使用 grpc_pass

1
2
3
4
5
6
7
8
server {
listen 80 http2;
access_log logs/access.log main;
location / {
// Your Server Address
grpc_pass grpc://localhost:50051;
}
}

C++11 可以通过 atomic_flag 来实现自旋锁。

test_and_set()函数检查 atomic_flag 标志, 如果之前没被设置过则设置 atomic_flag 的标志, 并返回 false;
如果之前已被设置则返回 true。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class Spinlock
{
public:
Spinlock(): flag(ATOMIC_FLAG_INIT) {}
void lock()
{
while( flag.test_and_set() );
}

void unlock()
{
flag.clear();
}

public:
std::atomic_flag flag;
}

相信使用过 ProtoBuffer 的同学都不陌生以下的定义, 一个 Kid ProtoBuffer 对象,有一个 age 字段,且默认值是7。

1
2
3
4
5
message Kid
{
optional uint32 age = 1 [default=7];
optional uint32 score = 2 ;
}

运用 PB 的 默认 option 扩展,我们可以指定一些行为,比如上面的默认初始值。

我们想实现自定义的选项,比如,我想指定一种上报的 option, 对于此类 option, 我就将这种字段上报给老师。那么定义的 ProtoBuffer 大概会长这样:

1
2
3
4
5
message Kid
{
optional uint32 age = 1 [default=7];
optional uint32 score = 2 [action = Report];
}

是否可行呢?幸运的是 PB 有自定义 option 的机制,使得我们的设想变成可能。

如何实现自定义扩展

1: 注册 Fieldoption , action.proto 就是定义好了 option 的 Proto 文件

1
2
3
4
5
6
7
8
9
package action;
import "google/protobuf/descriptor.proto";

enum ActionType {
REPORT = 1;
}
extend google.protobuf.Fieldoptions {
optional ActionType rule = 12345 [default = REPORT];
}

2: 应用 option
import 定义的 Proto 文件, 就可以轻易用上了。

1
2
3
4
5
6
7
package Kid                                                                          
import "action.proto";
message person
{
optional uint32 age = 1;
optional int32 age = 2 [(action.rule) = REPORT];
}

怎么通过反射获取 option 的值呢

通过 descriptor && reflection 我们可以得到 option 的值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#include <iostream>
#include <google/protobuf/message.h>
#include <google/protobuf/descriptor.h>
#include "kid.pb.h"
using namespace std;

int main()
{
Kid obj;
obj.set_age(12);

auto descriptor = obj.GetDescriptor();
auto reflection = obj.GetReflection();
// 通过 FieldName 获取字段
auto idField = descriptor->FindFieldByName("age");
reflection->SetInt32(&obj, idField, 18);
for (unsigned int i = 0; i < descriptor->field_count(); ++i)
{
auto field = descriptor->field(i);

cout << "field_is_repeated:" << field->is_repeated() << endl;
// 获取 field_num
cout << "field_num:" << field->number() << endl;
// 获取 option cout << "optiona value:" << field->options().GetExtension(action::rule) << endl;

switch (field->type())
{
case ::google::protobuf::FieldDescriptor::TYPE_INT32:
cout << "int type, value:" << reflection->GetInt32(obj, field);
break;
case ::google::protobuf::FieldDescriptor::TYPE_FLOAT:
cout << "float type, value:" << reflection->GetFloat(obj, field);
break;
default:
cout << "not found" << endl;
}
}
return 0;
}

以上的例子,通过反射我们也可以在不知道字段的名字情况下,得到它的值和类型等信息。

ElasticSearch 有 两种备份方法

  • ES 快照备份
  • ElasticDump 备份

ES 快照备份

ElasticSearch 自带备份的方法,数据量大的时候,通常推荐这种方法。
集群的环境下,需要依赖 共享文件系统 NFS,AWS S3, HDFS 文件系统等,以下用HDFS 为例。

备份到 HDFS 需要 repository-hdfs 插件的支持。下载下来放在了 /tmp 并安装,重启 ES,变可享用了。

1
./elasticsearch-plugin install file:///tmp/repository-hdfs-6.2.1.zip 
  1. 构建仓库,以下创建一个名为 hdfs_backup 的仓库, 我们计划将数据备份到 hdfs://<hadoop-domain.com>:9000/data/es_backup 下面,如无意外会返回 {"acknowlege":"true"}
1
2
3
4
5
6
7
8
9
curl -XPUT http://<Port:IP>/_snapshot/hdfs_backup -H 'Content-Type:application/json' -d'
{
"type": "hdfs",
"settings": {
"uri": "hdfs://<hadoop-domain.com>:9000",
"path": "/data/es_backup"
}
}'

  1. 选定索引备份,例如选择了 student 索引,如无意外会返回 {"acknowlege":"true"},如果你希望命令阻塞直到备份完成。可以在 URL 尾部加上 wait_for_completion=true
1
2
3
4
curl -XPUT http://<Port:IP>/_snapshot/hdfs_backup/snapshot_1 -H 'ContentType:application/json' -d'
{
"indices": "student"
}'
  1. 可使用以下命令查看备份的进度:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
curl -XGET "http://<Port:IP>/_snapshot/hdfs_backup/*" '
{
"snapshots" : [
{
"snapshot" : "snapshot_1",
"uuid" : "kTdQRNs6RbO9LZ0LCfXLYg",
"version_id" : 6060199,
"version" : "6.6.1",
"indices" : [
"student"
],
"include_global_state" : true,
"state" : "IN_PROGRESS",
"start_time" : "2019-06-13T07:41:47.556Z",
"start_time_in_millis" : 1560411707556,
"end_time" : "1970-01-01T00:00:00.000Z",
"end_time_in_millis" : 0,
"duration_in_millis" : -1560411707556,
"failures" : [ ],
"shards" : {
"total" : 0,
"failed" : 0,
"successful" : 0
}
}
]
}
  1. 如何使用快照来恢复
1
2
3
4
curl -XPOST "http://<Port:IP>/_snapshot/hdfs_backup/snapshot_1/_restore?wait_for_completion=true" -H 'Content-Type: application/json' -d'
{
"indices": "student","rename_replacement": "student"
}'

使用 ElasticDump 来备份

elasticdump 需要 nodeJs 的环境,适用于小数量的备份。其原理是用了 ES 的 Scroll 接口

  • 将 mapping 和 数据备份,并 gzip 到本地盘
1
2
elasticdump --input=http://<Port:IP>/student  --type=data --limit=10000  --output=$ | gzip > student.json.gz
elasticdump --input=http://<Port:IP>/student --output=stduent_mapping.json --type=mapping
  • 创建好mapping 之后,恢复数据, 如果是 gzip 格式的,需要先解压
1
elasticdump --input=./student.json --output=http://<Port:IP>/student

自从写了 C++,这一年多时间是没有再碰到 Go 代码,五一期间想通过阅读 Go 源码来找回熟悉的味道。选择了 GroupCache 仔细研读,收获颇丰,记下笔记以作备忘。

groupCache 简介

groupcache是一个缓存库,需自己实现 main 函数,groupcache 既是服务器,也是客户端。作者称在某些方面替代 memcache,groupcache 只能 get,不能设置过期时间,只能通过lru淘汰最近最少访问的数据。

当在本地 groupcache 缓存中没有查找的数据时,通过一致性哈希,查找到该 key 所对应的 peer 服务器,再通过HTTP 协议,从 peer 服务器上获取数据。

过程中也做了一个优化:当多个客户端同时访问 cache 中不存在的 Key 时,会导致多个客户端从后端获取数据并同时插入 cache 中,相同 Key 情况下,groupcache 只会有一个客户端从后端获取数据,其他客户端阻塞,直到第一个客户端获取到数据之后,再返回给多个客户端。

GroupCache 源码组成

groupCache 源码巧妙利用 Go 的Interface,将核心组件做了抽离:

让我们先来探索各个组件,最后再做一次高屋建瓴的俯瞰。

内存 LRU

Least Recently Used 的缩写,即最近最少使用的页面置换算法。借助内建容器 container/list 及 哈希表实现 Cache 结构,不保证并发安全。list 的元素 Entry 是一个结构体,也是实际承载键值对的载体,当列表容器元素个数达到最大数量,触发用户自定义的 OnEvicted 函数。

那 cache 是怎么做到 LRU?

  • Add: 新增一个元素,先查找该元素是否在哈希表? 若不在,则将元素写入哈希表,并移动元素到 list 头部;如果列表长度超过 MaxEntries,则删除列表尾部元素。
  • Get: 查找元素,如果找到则移动元素到 list 头部,始终保证最近使用的元素放在头部。
  • Remove: 将元素从列表和哈希表一同删除, 并触发用户自定义 OnEvicted 函数。

一致性哈希

目标是解决互联网中的server 的扩展和收缩,不至于导致大量的 Key 失效。核心数据结构是一个哈希表,键为Int,值为对应的 Key。

通过哈希函数把元素映射到整型索引值,对索引值升序排列,返回的节点是与这个索引值最近的顺时针节点。
默认选择的哈希算法是 crc32.CheckSumIEEE,为了使得环形列表更平衡,加入了虚拟节点,其中replicas

执行虚拟节点个数 。

我们来一窥其工作流程:

  • Add: 添加节点到哈希表,对 Key 取 replicas 个 hash 值,将得到哈希值都映射到同个 Key,与此同时哈希值升序排列。
  • Get:查找 Key 对应的节点,保证一致性,计算 Key 的 hash 值,然后在有序列表中二分查找,找到离他最近的顺时针的索引,在哈希表中返回此索引对应的 Key。

SingleFlight

call 存来调用的返回值和返回码。利用 sync.Wait 会阻塞的特性,控制并发性,直到所有任务完成才继续执行。结合 Group 结构来看就更明了,Group 有一个哈希表,其值是 call 结构, 对于给定的 Key,保证同一时间只存在一个后端调用,这么设计的好处是在缓存穿透的时候只有一个请求访问后端,避免雪崩。

ByteView & Sink

ByteView 是 groupCache 内部 value 的数据结构,结构体支持两种类型的元素,[]byte 或者 string 方式,并提供了 Copy 的方法。

Sink 是一个接口类型,包含了 SetString, SetBytes SetProto view 四种方法,每个实现该接口的 Struct 也额外实现了 SetView ,通过他们,value 会被保存成 byteview 的格式。

Peer & HTTPPool

peers.go 要与 http.go 结合起来看,不得不说抽象的很高级,或者说很绕。

HTTPPool 是实现了 http.Handler 和 PeerPicker 接口的节点池。他的组成如下:

  • peers : 一致性哈希生成器,返回 Key 对应的服务器节点
  • httpGetters map[string]*httpGetter 对于每个服务器,均定义了 httpclient,用于获取远程服务器的数据
    其中 httpGetter 就是一个ProtoGetter, 它实现了Get 方法
  • BasePath 指定外部服务器节点的请求的地址 默认是 "/_groupcache/". 通过 http.Handle 在NewHTTPPool 中注册
  • Replicas :一致性哈希虚拟节点的数量

groupcache 获取数据流程

当客户端连上 groupcache 获取数据

  • 如果本地有缓存,包括 mainCache (本机节点的缓存数据),hotCache(属于其他节点的数据缓存的热数据)有所需要的数据,则直接返回。
  • 如果没有,则通过一致性哈希函数判断这个 key 所对应的节点( PickPeer(key) ),通过 http 从这个节点上获取数据( getFromPeer ),如果这个节点上有需要的数据,则通过 http 回复给之前的那个 groupcache。
  • 如果节点上也没有所需要的数据,则 groupcache 从后端数据源(数据库或者文件)获取数据,并将数据保存在本地 mainCache,并返回给客户端;其中 getLocallly 函数是利用 NewGroup 时传进去的 Getter,在调用这个 Getter 的 Get 函数从数据源获取数据。
  • hotCache 存的是本节点没有的,但是访问频繁的数据, 避免多次网络请求, 热点数据采用10%随机概率,效果可能不理想。
  • loadGroup 是singleFlight的实现,确保同一时间每个 Key 只有一次访问后端。

AC自动机算法, 全称 Aho–Corasick 算法, 是由 Alfred V. Aho 和 Margaret J.Corasick 发明的字符串搜索算法。
该算法主要依靠构造一个有限状态机,在一个 trie 树中添加失配指针来实现。这些额外的失配指针允许在查找字符串失败时进行回退,转向某前缀的其他分支,免于重复匹配前缀,提高算法效率。 当一个字典串集合是已知的情况下,算法的时间复杂度为输入字符串长度和匹配数量之和。

现实中用到AC自动机的场景有如下:

  • 评论反垃圾:通过维护一个屏蔽词库,建立AC自动机,输入评论,如果命中了屏蔽词,就可以把它定性为反垃圾
  • 病毒检测:通过维护一个病毒码库,简历AC自动机,输入一串码,判断是否命中病毒

算法关键是

构建失败指针:

  • 将根结点的所有孩子结点的 failPtr 指向根结点,然后将根结点的所有孩子结点依次入列。
  • 若队列不为空:
    • A: 出列,出列结点记为 curr, failTo 表示 curr 的 fail 指向的结点,即 failTo = curr.fail
    • B: 判断 curr.child[i] == failTo.child[i]是否成立,
      • 成立:curr.child[i].fail = failTo.child[i],
      • 不成立:判断 failTo == null是否成立
        • 成立: curr.child[i].fail == root
        • 不成立:执行failTo = failTo.fail,继续执行 B
      • curr.child[i]入列,再次执行再次执行步骤 A
  • 若队列为空:结束

查找

遍历字符串的每个字符进行查找,从根结点开始,查看其孩子结点,看是否匹配,

  • 如果不匹配则沿着他的失败指针查找,直到失败指针是根节点就重启自动机,再次查找。
  • 如果经过的节点是模式字符串的结尾。意味着匹配到该模式串,并将该字符下标记录下来。

写了一个库

实现了AC自动机的 Go 语言版本。goAcAutoMachine, 支持中文搜索。

其实在几年前就接触 Eleasticsearch 了,当时作为一个SA,并没有好好了解它的用法。最近在做搜索的功能,才发觉曾经对力量一无所知。

Mapping 创建 Schema

ES 使用前需要定义 mapping,相当于 MySQL 的 TableSchema,在创建 mapping 的时候,配置 type 的dynamic=strict,禁止未定义字段自动被添加到索引。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
curl -XPUT "$URL/mall_goods_v1/?pretty" -H 'Content-Type:application/json' -d'
{
"mappings" : {
"goods": {
"dynamic": "strict",
"properties": {
"item_name": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
},
"item_code": {
"type": "keyword"
},
"status": {
"type": "integer"
},
"start_time":{
"type" : "date"
},
"group_ids": {
"type": "text",
"analyzer": "whitespace"
},
}
}
}
}';

中文分词

ES提供了IK分词器应对中文分词,它有两种分词模式:

  • ik_max_word:会将文本做最细粒度的拆分
  • ik_smart:会做最粗粒度的拆分
    两种分词器使用的最佳实践是:索引时用ik_max_word,在搜索时用 ik_smart。
    索引时最大化的将文章内容分词,搜索时更精确的搜索到想要的结果。

别名

给 ES 的 Index 起一个别名(alias) 能优雅解决两个索引无缝切换的问题,此功能在某些场景下非常使用。
举个例子:
如果发现 student_v1 的 Index 有问题,需重建一个 student_v2 的 Index, 我们可以给 student_v1 赋予一个别名 student,外界使用别名访问这个索引, 建好 student_v2 之后,将这个 student 别名指向 student_v2, 这样就可在外部无感知的情况下完成切换。

1
2
3
curl -XPOST "$URL/_aliases?pretty" -H 'Content-Type:application/json'  -d' 
{ "actions" : [ { "add" : { "index" : "student_v2", "alias" : "student" } } ] } '

ES 查询技巧

  • 只要不是 text 类型的字段,尽量使用 filter 过滤,因为它支持结果缓存,性能最佳。全文检索必须使用 must 而不是 filter,否则将导致 filter 缓存失效。

  • 自然语言检索应该用 “match” 查询,”term” 是不分词当成整体的。

  • 通过 termvector 查看一个字段的切分状况,用来辅助我们的查询语句是否正确

1
2
3
4
5
6
7
8
curl -XGET -H 'Content-Type:application/json' "http://$URL/mall_goods/goods/12954895599938508532/_termvectors?pretty" -d ' {
"fields" : [ "item_name"],
"offsets" : true,
"payloads" : true,
"positions" : true,
"term_statistics" : true,
"field_statistics" : true
} '
  • 版本号更新
    Elasticsearch 是分布式的。当文档创建、更新或删除时, 新版本的文档必须复制到集群中的其他节点。Elasticsearch 也是异步和并发的,这意味着这些复制请求被并行发送,并且到达目的地时也许顺序是乱的 。 Elasticsearch 需要一种方法确保文档的旧版本不会覆盖新的版本。 检查当前 _version 是否 小于 指定的版本号。 如果请求成功,外部的版本号作为文档的新 _version 进行存储.
    常见的方法是通过增加 version_type = external , 然后使用时间戳做版本号更新来保证旧文档不会覆盖新文档。
1
2
3
4
5
PUT /website/blog/2?version=5&version_type=external
{
"title": "My first external blog entry",
"text": "Starting to get the hang of this..."
}
  • 提供排序权重
    ES默认是按照 BM25 来计算分数的。比如以下例子,
    用户输入qwe 我们需要搜索出名字中包含这些字符的记录
1
2
3
4
5
6
7
8
9
GET group_member_v1/_search
{
"query": {
"bool": {
"must": {"match": {"split_nickname": {"query": "q w e","operator": "and"}}}
}
},
"size": 100
}

实际上我们希望,qwe 连在一起的 nickname 权重可以更高,排序更前,于是引用进了 const_scorematch_phrase

1
2
3
4
5
6
7
8
9
10
11
12
13
GET group_member_v1/_search
{
"query": {
"bool": {
"minimum_should_match": 1,
"should": [
{"match": {"split_nickname": {"query": "q w e","operator": "and"}}},
{ "constant_score": {"boost": 1000,"filter": {"match_phrase": {"split_nickname": "q w e"}}}}
]
}
},
"size": 100
}

Suggester

completion suggest,常叫做自动完成(auto completion) 也叫搜索推荐。
比如说我们在搜索歌曲,搜索”月亮”,百度自动给你提示 “月亮代表我的心”, “月亮之上”, 搜索引擎会自动提示补全,借助 ES 的 Suggester 也有这个功能。看如下的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
PUT /songs_index
{
"mappings": {
"songs" : {
"properties" : {
"title" : {
"type": "text",
"analyzer": "ik_max_word",
"fields": {
"suggest" : {
"type" : "completion", // 这个特性就是我们想要的
"analyzer": "ik_max_word"
}
}
},
"content": {
"type": "text",
"analyzer": "ik_max_word"
}
}
}
}
}

查询语句

1
2
3
4
5
6
7
8
9
10
11
GET /song_indexs/songs/_search
{
"suggest": {
"my-suggest" : {
"text" : "华南",
"completion" : {
"field" : "title.suggest"
}
}
}
}

删除指定记录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
POST school_rank_v1/rank/_delete_by_query
{
"query": {
"constant_score" : {
"filter" : {
"bool" : {
"must" : [
{"term" : { "qs_rank" : 0 } }
]
}
}
}
}
}

STL queue 的实现

STL queue 的 API

1
2
3
4
5
6
7
8
template <typename T, typename Container = std::deque<T>>
class queue
{
T & front();
void push(const T &elem);
void pop();
bool empty() const;
};

API 看起来平淡自然,他的背后有设计的考量:

  • 用于存储元素的底层容器,容器须满足顺序容器,如 std::vector, std::list, std::dequeue
  • 在调用 front() 之前,需保证队列不为空, 即 !empty() , 否则会出现未定义错误

实际应用中,我们希望队列作为一个基础组件,可以像 Redis Queue 那样

  • 元素出列的操作将会阻塞,直到队列不为空, 在多线程环境下,我们通常不需要 front() 操作,而只是调用 pop() 将元素返回,操作是阻塞的
  • 允许并发访问队列

阻塞队列实现

  • 使用条件变量实现阻塞,要是队列是空的,则 pop() 操作将会阻塞队列
  • 提供了一个 try_pop() 操作,要是队列是空的, try_pop() 将会立即返回而不会阻塞
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
template <typename T, typename Container = std::queue<T>>
class ThreadQueue
{
public:
using container_type = Container;
using value_type = typename Container::value_type;
using reference = typename Container::reference;
using const_reference = typename Container::const_reference;
using size_type = typename Container::size_type;
using mutex_type = std::mutex;
using condition_variable_type = std::condition_variable;
private:
Container queue_;
mutable mutex_type mutex_;
condition_variable_type cond_;

public:
ThreadQueue() = default;
ThreadQueue(const ThreadQueue &) = delete;
ThreadQueue &operator= (const ThreadQueue &) = delete;
void pop(reference elem)
{
std::unique_lock<mutex_type> lock( mutex_ );
cond_.wait( lock, [this]() { return !queue_.empty(); } );
elem = std::move( queue_.front() );
queue_.pop();
}
bool try_pop( reference elem )
{
std::unique_lock<mutex_type> lock( mutex_ );
if ( queue_.empty() ) {
return false;
}
elem = std::move( queue_.front() );
queue_.pop();
return true;
}

bool empty() const
{
std::lock_guard<mutex_type> lock( mutex_ );
return queue_.empty();
}

size_type size() const
{
std::lock_guard<mutex_type> lock( mutex_ );
return queue_.size();
}

void push( const value_type &elem )
{
std::lock_guard<mutex_type> lock( mutex_ );
queue_.push( elem );
cond_.notify_one();
}

void push( value_type &&elem )
{
std::lock_guard<mutex_type> lock( mutex_ );
queue_.push( std::move( elem ) );
cond_.notify_one();
}
};
0%