求二叉树的最大深度

  • 递归
1
2
3
4
5
6
7
8
9
10
int find_max_depth_V1(TreeNode* root)
{
int depth = 0;
if (!root)
{
return 0;
}
depth = 1 + max(find_max_depth_V1(root->left), find_max_depth_V1(root->right));
return depth;
}
  • 层遍历
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
int find_max_depth_V2(TreeNode* root)
{
if (!root) { return 0; }
int depth = 0;
queue<TreeNode*> Q;
Q.push(root);
while (!Q.empty())
{
depth++;
for (int i = 0; i < Q.size(); i++)
{
TreeNode* t = Q.front();
Q.pop();
if (t->left)
{
Q.push(t->left);
}
if (t->right)
{
Q.push(t->right);
}
}
}
return depth;
}

求二叉树的最小深度

  • 递归
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
int find_min_depth_V1(TreeNode* root)
{
int depth = 0;
if (!root)
{
return 0;
}
if (!root->left)
{
return 1 + find_min_depth_V1(root->right);
}
if (!root->right)
{
return 1 + find_min_depth_V1(root->left);
}
depth = 1 + min(find_max_depth_V1(root->left), find_max_depth_V1(root->right));
return depth;
}

求二叉树中节点的个数

1
2
3
4
5
6
7
int count_node(TreeNode* root)
{
if (!root) { return 0; }
int left = count_node(root->left);
int right = count_node(root->right);
return left + right + 1;
}

求二叉树中叶子节点的个数

1
2
3
4
5
6
int count_leaf_node(TreeNode* root)
{
if (!root) return 0;
if (!root->left && !root->right) return 1;
return count_leaf_node(root->left) + count_leaf_node(root->right);
}

求二叉树中第 k 层节点的个数

1
2
3
4
5
6
7
8
int num_of_k_node(TreeNode* root, int k)
{
if (!root || k<1) { return 0; }
if (k == 1) { return 1; }
int num_left = num_of_k_node(root->left, k - 1);
int num_right = num_of_k_node(root->right, k - 1);
return num_left + num_right;
}

判断二叉树是否是平衡二叉树

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
int max_depth(TreeNode* root)
{
if (!root)
{
return 0;
}
int left = max_depth(root->left);
int right = max_depth(root->right);
if (left==-1 || right == -1 || fabs(left-right)>1) { return -1; }
return max(left,right) + 1;
}

bool isBanlanced(TreeNode* root)
{
return max_depth(root) != -1;
}

两个二叉树是否互为镜像

1
2
3
4
5
6
7
bool isMirror(TreeNode* root1, TreeNode* root2)
{
if (!root1 && !root2) { return true; }
if (!root1 || !root2) { return false; }
if (root1->val != root2->val) { return false; }
return isMirror(root1->left, root2->right) && isMirror(root1->right, root2->left);
}

翻转二叉树

1
2
3
4
5
6
7
8
9
10
11
12
TreeNode* mirrorTreeNode(TreeNode* root)
{
if (!root)
{
return NULL;
}
TreeNode* left = mirrorTreeNode(root->left);
TreeNode* right = mirrorTreeNode(root->right);
root->left = right;
root->right = left;
return root;
}

求两个二叉树的最低公共祖先节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
bool findNode(TreeNode* root, TreeNode* node)
{
if (!root || !node) {
return false;
}
if (root->val==node->val && root->left == node->left && root->right == node->right) {
return true;
}
bool found = findNode(root->left, node);
if (!found) {
found = findNode(root->right, node);
}
return found;
}

TreeNode* getLastCommonParent(TreeNode* root, TreeNode* node1, TreeNode* node2)
{
if (findNode(root->left,node1))
{
if (findNode(root->right, node2)) {
return root;
}
else {
return getLastCommonParent(root->left, node1, node2);
}
}
else
{
if (findNode(root->left,node2))
{
return root;
}
else
{
return getLastCommonParent(root->right, node1, node2);
}
}
}

二叉树的前序遍历

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void preOrder2(TreeNode* root, vector<int> &result)
{
if (!root) { return; }
result.push_back(root->val);
preOrder2(root->left, result);
preOrder2(root->right, result);
}

vector<int> preOrderReverse(TreeNode* root)
{
vector<int> result;
preOrder2(root, result);
return result;
}

二叉树的中序遍历

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void midOrder2(TreeNode* root, vector<int> &result)
{
if (!root)
{
return;
}
midOrder2(root->left, result);
result.push_back(root->val);
midOrder2(root->right, result);
}

vector<int> midOrderReverse(TreeNode* root)
{
vector<int> result;
midOrder2(root, result);
return result;
}

二叉树的后序遍历(递归)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void posOrder2(TreeNode* root, vector<int> &result)
{
if (!root)
{
return;
}
posOrder2(root->left, result);
posOrder2(root->right, result);
result.push_back(root->val);
}

vector<int> midOrderReverse(TreeNode* root)
{
vector<int> result;
posOrder2(root, result);
return result;
}

输入二叉树和整数,打印出二叉树节点值等于输入整数的所有路径

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
void helper(TreeNode* root, int target, vector<int> s, int currentSum)
{
currentSum += root->val;
s.push_back(root->val);
if (!root->left && !root->right)
{
if (currentSum == target) {
for (auto i:s)
{
cout << i <<" ";
}
cout << endl;
}
}
if (root->left)
{
helper(root->left, target, s, currentSum);
}
if (root->right)
{
helper(root->right, target, s, currentSum);
}
s.pop_back();
}

void findPath(TreeNode* root, int target)
{
if (!root)
{
return;
}
vector<int> s;
int currentSum = 0;
helper(root, target, s, currentSum);
}

[toc]

Channel 是我认为 Go 最灵活的部分,而我应用的方法不多,此文是我阅读《Go并发编程实战》总结下来,当作备忘。

Channel 是什么

可在多个 goroutine 从/往 一个Channel 中 receive/send 数据, 不必考虑额外的同步措施。Channel可以作为一个先入先出的队列,接收的数据和发送的数据的顺序是一致的。

  • chanel 类型

buffered chann 满了,就会阻塞, 使用 make 分配结构空间及其附属空间,并完成其间的指针初始化, make 返回这个结构空间,不另外分配一个指针

1
2
3
4
5
//带缓冲的Channel make 
ch := make(chan Task, 3)
chan T // 可以接收和发送类型为 T 的数据
chan<- float64 // 只可以用来发送 float64 类型的数据
<-chan int // 只可以用来接收 int 类型的数据
  • 关闭 Channel

以下代码检查是否关闭, 它可以用来检查Channel是否已经被关闭了。从Channel接收一个值,如果Channel关闭了或没有数据,那么ok将被置为false

1
2
close(chan)
x, ok = <- c
  • 几种情况下的读写

在一个已经 close 的 unbuffered Channel上执行读操作,会返回Channel对应类型的零值,比如 bool 型 Channel 返回 false,int 型 Channel 返回0。

  • 向 close的Channel写则会触发panic。读不会导致阻塞。
  • 往 nil Channel 中发送数据会一直被阻塞着。
  • 对一个没有初始化的Channel进行读写操作都将发生阻塞,例子如下:
操作 空值(nil) 已关闭
关闭 panic panic
阻塞 panic
阻塞 不阻塞

Example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package main

func main() {
var c chan int
<-c
}

$ go run testnilChannel.go
fatal error: all goroutines are asleep – deadlock!

func main() {
var c chan int
c <- 1
}

$ go run testnilChannel.go
fatal error: all goroutines are asleep – deadlock!

代码技巧

  • 与 select 的配合使用

select 语句和 switch 语句一样,它不是循环,它只会选择一个 case 来处理,如果想一直处理 Channel,你可以在外面加一个无限的for循环

  • range

range c 产生的迭代值为Channel中发送的值,它会一直迭代直到 Channel 被关闭。上面的例子中如果把close(c)注释掉,程序会一直阻塞在 for 那一行。

1
2
3
for i := range c {
fmt.Println(i)
}

业务使用场景

  • 超时控制,心跳 HeartBeat
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 利用 time.After 实现
func worker(start chan bool) {
timeout := time.After(30 * time.Second)
for {
select {
// … do some stuff
case <- timeout:
return
}
}
}

// 与 timeout实现类似,下面是一个简单的心跳select实现:

func worker(start chan bool) {
heartbeat := time.Tick(30 * time.Second)
for {
select {
// … do some stuff
case <- heartbeat:
//… do heartbeat stuff
}
}
}
  • 取最快的结果
1
2
3
4
5
6
7
8
9
10
11
12
13
main() {
ret := make(chan string, 3)
for i := 0; i < cap(ret); i++ {
go call(ret)
}
fmt.Println(<-ret)
}

func call(ret chan<- string) {
// do something
// ...
ret <- "result"
}
  • 限制并发
1
2
3
4
5
6
7
8
9
10
// 最大并发数为 2
limits := make(chan struct{}, 2)
for i := 0; i < 10; i++ {
go func() {
// 缓冲区满了就会阻塞在这
limits <- struct{}{}
do()
<-limits
}()
}
  • 广播, 多个 goroutine 同步响应
1
2
3
4
5
6
7
8
9
10
11
12
13
func main() {
c := make(chan struct{})
for i := 0; i < 5; i++ {
go do(c)
}
close(c)
}

func do(c <-chan struct{}) {
// 会阻塞直到收到 close
<-c
fmt.Println("hello")
}
  • 等待一个事件

main goroutine 通过”<-c”来等待 sub goroutine中的完成事件,sub goroutine 通过close Channel触发这一事件。当然也可以通过向 Channel 写入一个 bool 值的方式来作为事件通知。main goroutine 在 Channel c上没有任何数据可读的情况下会阻塞等待。

1
2
3
4
5
6
7
8
9
10
11
12
import "fmt"

func main() {
fmt.Println("Begin doing something!")
c := make(chan bool)
go func() {
fmt.Println("Doing something…")
close(c)
}()
<-c
fmt.Println("Done!")
}

忘记关闭的陷阱

事实上除了超时场景,其他使用协程(goroutine)的场景,也很容易因为实现不当,导致协程无法退出,随着时间的积累,造成内存耗尽,程序崩溃。

造成泄露的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func do(taskCh chan int) {
for {
select {
case t := <-taskCh:
time.Sleep(time.Millisecond)
fmt.Printf("task %d is done\n", t)
}
}
}

func sendTasks() {
taskCh := make(chan int, 10)
go do(taskCh)
for i := 0; i < 1000; i++ {
taskCh <- i
}
}

func TestDo(t *testing.T) {
t.Log(runtime.NumGoroutine())
sendTasks()
time.Sleep(time.Second)
t.Log(runtime.NumGoroutine())
}

正确的样子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func doCheckClose(taskCh chan int) {
for {
select {
case t, beforeClosed := <-taskCh:
if !beforeClosed {
fmt.Println("taskCh has been closed")
return
}
time.Sleep(time.Millisecond)
fmt.Printf("task %d is done\n", t)
}
}
}

func sendTasksCheckClose() {
taskCh := make(chan int, 10)
go doCheckClose(taskCh)
for i := 0; i < 1000; i++ {
taskCh <- i
}
close(taskCh)
}

func TestDoCheckClose(t *testing.T) {
t.Log(runtime.NumGoroutine())
sendTasksCheckClose()
time.Sleep(time.Second)
runtime.GC()
t.Log(runtime.NumGoroutine())
}

link: http://colobu.com/2016/04/14/Golang-Channels/
link:https://studygolang.com/articles/11320

Page Cache

为优化读写性能,Kafka 利用操作系统本身的 Page Cache,就是利用操作系统自身的内存而不是 JVM 空间内存。

  1. 避免 Object 消耗:如果是使用 Java 堆,Java对象的内存消耗比较大,通常是所存储数据的两倍甚至更多。
  2. 避免 GC问题:随着 JVM 中数据不断增多,垃圾回收将会变得复杂与缓慢,使用系统缓存就不会存在 GC 问题

相比于使用 JVM 等数据结构,利用操作系统的 Page Cache 更加简单可靠。

  • 操作系统层面的缓存利用率会更高,因为存储的都是紧凑的字节结构而不是独立的对象。
  • 即使服务进程重启,系统缓存依然不会消失,避免了重建缓存的过程。

零拷贝

如果没有零拷贝技术 Kafka 读数据的过程如下图:

  1. 先看数据在不在 os cache里,如果不在的话就从磁盘文件里读取数据后放入 os cache。
  2. 从操作系统的 os cache 里拷贝数据到应用程序进程的缓存里,
  3. 从应用程序进程的缓存里拷贝数据到操作系统层面的 Socket 缓存里,
  4. 从Socket缓存里提取数据后发送到网卡,最后发送出去给下游消费。

通过零拷贝技术,就不需把 os cache里的数据拷贝到应用缓存,再从应用缓存拷贝到 Socket 缓存了,两次拷贝都省略了。

  • 对 Socket 缓存仅仅就是拷贝数据的描述符过去,然后数据就直接从 os cache 中发送到网卡上去了,大大的提升数据消费时读取文件数据的性能。
  • 在从磁盘读数据的时候,会先看看 os cache 内存中是否有,如果有的话,其实读数据都是直接读内存的。
  • 大量的数据都是直接写入 os cache 中,然后读数据的时候也是从 os cache 中读。 相当于是 Kafka 完全基于内存提供数据的写和读了,所以这个整体性能会极其的高。

分区分段+索引

  • Kafka 的 message 是按 topic 分类存储的,
  • topic 中的数据按照一个一个的 partition 即分区存储到不同broker节点。
  • 每个 partition 对应了操作系统上的一个文件夹,partition 实际上又是按照 segment 分段存储的。Kafka 的 message 消息实际上是分布式存储在一个一个小的 segment 中的,每次文件操作也是直接操作的 segment。
  • Kafka 又默认为分段后的数据文件建立了索引文件,就是文件系统上的.index文件。这种分区分段+索引的设计,不仅提升了数据读取的效率,同时也提高了数据操作的并行度。

磁盘顺序写, 批量读写, 批量压缩

  • 消息是顺序写磁盘的
  • Kafka 数据读写也是批量的而不是单条的。 除了利用底层的技术外,Kafka 还在应用程序层面提供了一些手段来提升性能。最明显的就是使用批次。在向Kafka写入数据时,可以启用批次写入,这样可以避免在网络上频繁传输单个消息带来的延迟和带宽开销

pipline 是什么

pipeline 就是把所有的命令一次发过去,避免频繁的发送、接收带来的网络开销,redis在打包接收到一堆命令后,依次执行,然后把结果再打包返回给客户端。

1
2
3
4
5
6
7
8
9
10
import redis
from concurrent.futures import ProcessPoolExecutor

r = redis.Redis(host='127.0.0.1', port=6380)

if __name__ == "__main__":
with r.pipeline(transaction=False) as p:
p.sadd('seta', 1)
p.sadd('seta', 2)
p.execute()

这容易让人想起他和MULTI有什么区别呢?

  • pipeline 选择客户端缓冲,multi 选择服务端队列缓冲;
  • 请求次数的不一致,multi需要每个命令都发送一次给服务端,pipeline最后一次性发送给服务端,请求次数相对于multi减少
  • multi/exec 可以保证原子性,而 pipeline 不保证原子性

watch 是什么

提到 watch 就离不开 Redis 事务,Redis 事务可以一次执行多个命令,它先以 MULTI 开始一个事务,然后将多个命令入队到事务中, 最后由 EXEC 命令触发事务, 一并执行事务中的所有命令。
在 Redis 中使用 watch 命令可以决定事务是执行还是回滚。在 multi 命令之前使用 watch 命令监控某些键值对,然后使用 multi 命令开启事务,执行各类对数据结构进行操作的命令,这个时候命令会进入队列。 当 Redis 使用 exec 命令执行事务的时候

  1. 它首先去比对被 watch 命令所监控的键值对,如果没有发生变化,那么它会执行事务队列中的命令,提交事务;
  2. 如果发生变化,那么它不会执行任何事务中的命令,而去事务回滚。无论事务是否回滚,Redis 都会去取消执行事务前的 watch 命令

Pipeline 加上 watch, 实现事务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import redis
from concurrent.futures import ProcessPoolExecutor

r = redis.Redis(host='127.0.0.1', port=6379)
r.set("stock:count", 100)

if __name__ == "__main__":
with r.pipeline() as pipe:
while True:
try:
# watch库存键, multi后如果该key被其他客户端改变, 事务操作会抛出WatchError异常
pipe.watch('stock:count')
count = int(pipe.get('stock:count'))
if count > 0: # 有库存
# 事务开始
pipe.multi() #
pipe.decr('stock:count')
# 把命令推送过去
return True
else:
return False
except redis.WatchError:
# 打印WatchError异常, 观察被watch锁住的情况
pipe.unwatch()

RocketMQ 是阿里巴巴消息中间件团队研发并大规模应用于生产系统的中间件,满足线上海量消息堆积的需求,早期阿里曾经基于ActiveMQ研发消息系统,随着业务消息的规模增大,瓶颈逐渐显现,后来也考虑过Kafka,但因为在低延迟和高可靠性方面没有选择,最后才自主研发了RocketMQ, 各方面的性能都比目前已有的消息队列要好,RocketMQ和Kafka在概念和原理上都非常相似,所以也经常被拿来对比;RocketMQ默认采用长轮询的拉模式, 单机支持千万级别的消息堆积,可以非常好的应用在海量消息系统中。

主要组成包括 NameServer, Broker, Producer, Consumer

NameServer

RocketMQ 自研了类似于 zookeeper 的一个软件, 我觉得是因为功能简单,就没有引入zookeeper了, 其主要功能:

  • 管理brokers:broker 启动时会注册到 NameServer,两者之间保持心跳监测机制,来保证 NameServer 知道 broker 的存活状态
  • 路由信息管理:每一台 NameServer 存有全部的 broker 集群信息和生产者/消费者客户端的请求信息

为帮助理解,先上概念图

Broker

Broker上存Topic信息,Topic由多个队列组成,队列会平均分散在多个Broker上。Producer的发送机制保证消息尽量平均分布到所有队列中,最终效果就是所有消息都平均落在每个 Broker 上。

CommitLog:是消息主体以及元数据的存储主体,对 CommitLog 建立一个ConsumeQueue,每个 ConsumeQueue 对应一个(概念模型中的)MessageQueue,所以只要有 CommitLog 在,ConsumeQueue即使数据丢失,仍然可以恢复出来。

ConsumeQueue:是一个消息的逻辑队列,存储了这个 Queue 在 CommitLog 中的起始 offset,log 大小和 MessageTag的hashCode。每个Topic下的每个 Queue 都有一个对应的 ConsumeQueue 文件,例如Topic中有三个队列,每个队列中的消息索引都会有一个编号,编号从0开始,往上递增。并由此一个位点offset的概念。

RocketMQ的高性能在于顺序写盘(CommitLog)、零拷贝和跳跃读, 尽量命中PageCache, 所以内存越大越好。同时由于缓存的局部性原理,可以很快的在内存上读取到消息。

高可靠性高性能

高可靠性在于刷盘和 Master/Slave,即使 NameServer 全部挂掉不影响已经运行的 Broker,Producer,Consumer。
发送消息可负载均衡,且发送消息线程安全,集群消费模式下消费者端负载均衡,这些特性加上上述的高性能读写,共同造就了 RocketMQ 的高并发读写能力。
刷盘和主从同步均为异步(默认)时,broker进程挂掉(例如重启),消息依然不会丢失,因为 broker 关机时会执行持久化。 当物理机器宕机时,才有消息丢失的风险。另外,master挂掉后,消费者从slave消费消息,但slave不能写消息。

动态伸缩能力

RocketMQ具有很好动态伸缩能力(非顺序消息),伸缩性体现在Topic和Broker两个维度。

  • Topic维度:假如一个Topic的消息量特别大,但集群水位压力还是很低,就可以扩大该Topic的队列数,Topic的队列数跟发送、消费速度成正比。

  • Broker维度:如果集群水位很高了,需要扩容,直接加机器部署Broker就可以。Broker 起来后向 NameServer 注册,Producer、Consumer 通过 NameServer 发现新 Broker,立即跟该Broker直连,收发消息。

事务消息机制

RocketMQ事务消息

  • Half(Prepare) Message

指的是暂不能投递的消息,发送方已经将消息成功发送到了 MQ 服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成 “暂不能投递”状态,处于该种状态下的消息即半消息。

  • 消息回查
    由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,MQ 服务端通过扫描发现某条消息长期处于“半消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该过程即消息回查。

  1. 发送方向 MQ 服务端发送消息。
  2. MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。
  3. 发送方开始执行本地事务逻辑。
  4. 发送方根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到 Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;MQ Server 收到 Rollback 状态则删除半消息,订阅方将不会接受该消息。
  5. 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达 MQ Server,经过固定时间后 MQ Server 将对该消息发起消息回查。
  6. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  7. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,MQ Server 仍按照步骤4对半消息进行操作。

关于 Consumer

采用的是长轮询方式

  • CLUSTERING 模式下,一条消息只会被 ConsumerGroup 里的一个实例消费,但可以被多个不同的 ConsumerGroup 消费,
  • BROADCASTING 模式下,一条消息会被ConsumerGroup里的所有实例消费。

Kafka 与 RocketMQ 对比:

当业务需要系统间调用解耦时,MQ 是一个很好的方案,目前选择最多的当属Kafka和阿里的RocketMQ, 两种中间件的对比屡屡被提及。

  • 适用场景
    Kafka适合日志处理; RocketMQ适合业务处理。

  • 性能
    Kafka单机写入 TPS 号称在百万条/秒; RocketMQ 大约在10万条/秒。 追求性能的话,Kafka单机性能更高。

  • 可靠性
    RocketMQ 支持异步/同步刷盘; 异步/同步 Replication; Kafka使用异步刷盘方式,异步Replication。 RocketMQ所支持的同步方式提升了数据的可靠性。

  • 支持的队列数
    Kafka单机超过64个队列/分区,消息发送性能降低严重; RocketMQ 单机支持最高5万个队列,性能稳定, 这也是适合业务处理的原因之一

  • 消费失败重试机制
    Kafka消费失败不支持重试, RocketMQ消费失败支持定时重试,每次重试间隔时间顺延。

  • 定时消息
    Kafka不支持定时消息, RocketMQ支持定时消息

  • 分布式事务消息
    Kafka不支持分布式事务消息, RocketMQ支持分布式事务消息

  • 消息查询机制
    Kafka不支持消息查询, RocketMQ支持根据Message Id查询消息,也支持根据消息内容查询消息

  • 消息回溯
    Kafka 理论上可以按照 Offset 来回溯消息, RocketMQ 支持按照时间来回溯消息,精度毫秒,例如从一天之前的某时某分某秒开始重新消费消息

GTID 是基于 MySQL 服务器生成的已经被成功执行的全局事务ID,由服务器ID以及事务ID组合而成。这个全局ID在所有存在主从关系的数据库服务器上是唯一的。这样特性使 MySQL 的主从复制变得更加简单,以及数据库一致性更可靠。

GTID 是什么

MySQL-5.6.5开始支持GTID。 global transaction identifiers。全局唯一ID。
一个GTID在一个服务器上只执行一次,避免重复执行导致数据混乱或者主从不一致。
GTID用来代替传统复制方法,不再使用 MASTER_LOG_FILEMASTER_LOG_POS 开启复制。而是使用 MASTER_AUTO_POSTION=1 的方式开始复制。

为什么用 GTID 复制

1、更简单搭建主从, 不用以前那样在需要找log_file和log_pos。
2、比传统的复制更加安全,保证数据的一致性,零丢失。

GTID的工作原理

1 当一个事务在主库端执行并提交时,产生GTID,记录到 binlog 。
2 binlog 传输到 slave, 存储到 slave 的 relaylog 后,设置gtid_next变量,告诉 slave,下一个要执行的 GTID 值。
3 SQL 线程从 relay log中获取GTID,然后对比 slave的 binlog 是否有该GTID。
4 如果有记录,说明该 GTID 的事务已经执行,slave 会忽略。
5 如果没有记录,slave 就会执行该 GTID 事务,并记录该 GTID 到自身的 binlog, 在执行事务前会先检查其他 session 持有该GTID,确保不被重复执行。
6 在解析过程中会判断是否有主键,如果没有就用二级索引,如果没有就用全部扫描。

配置GTID

对于 GTID 的配置(使用mysql-5.6.5以上版本),如下:

  • 主:
1
2
3
4
5
6
7
8
9
10
11
12
[mysqld]
server_id=1 #服务器id
gtid_mode=on #开启gtid模式
enforce_gtid_consistency=on #强制gtid一致性,开启后对于特定create table不被支持

#binlog
log_bin=master-binlog
log-slave-updates=1
binlog_format=row #其他格式可能造成数据不一致

#relay log
skip_slave_start=1
  • 从:
1
2
3
4
5
6
7
8
9
10
11
12
[mysqld]
gtid_mode=on
enforce_gtid_consistency=on
server_id=2

#binlog
log-bin=slave-binlog
log-slave-updates=1
binlog_format=row #其他格式可能造成数据不一致

#relay log
skip_slave_start=1

开始同步

  • 新数据库服务器, 在slave端执行以下操作
1
2
3
4
5
6
7
8
9
[master]> CHANGE MASTER TO  
-> MASTER_HOST='$IP',
-> MASTER_USER='repl',
-> MASTER_PASSWORD='xxx',
-> MASTER_PORT=3306,
-> MASTER_AUTO_POSITION = 1;

[master]> start slave;
Query OK, 0 rows affected (0.01 sec)
  • 传统复制的转向 GTID复制
  1. 按上文修改配置参数文件;

  2. 所有服务器设置global.read_only参数,等待主从服务器同步完毕;

1
mysql> SET @@global.read_only = ON; 
  1. 依次重启主从服务器;

  2. 使用change master 更新主从配置;

1
2
3
4
5
6
mysql> CHANGE MASTER TO
> MASTER_HOST = host,
> MASTER_PORT = port,
> MASTER_USER = user,
> MASTER_PASSWORD = password,
> MASTER_AUTO_POSITION = 1;
  1. START SLAVE;
0%