GRPC 通讯的四种方式

GRPC 通讯的四种方式有

1
2
3
4
5
6
7
8
9
10
service Echo {
// UnaryAPI
rpc UnaryEcho(EchoRequest) returns (EchoResponse) {}
// SServerStreaming
rpc ServerStreamingEcho(EchoRequest) returns (stream EchoResponse) {}
// ClientStreamingE
rpc ClientStreamingEcho(stream EchoRequest) returns (EchoResponse) {}
// BidirectionalStreaming
rpc BidirectionalStreamingEcho(stream EchoRequest) returns (stream EchoResponse) {}
}
  • 服务端与客户端普通的 Protobuf Message 通讯

  • 客户端发起普通的 ProtoBuf Message, 服务端使用Stream 回应

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/*
Server 代码
1. 获取stream
2 接受客户端消息,然后不断发送
3 返回 nil 表示结束
*/
func (e *Echo) ServerStreamingEcho(req *pb.EchoRequest, stream pb.Echo_ServerStreamingEchoServer) error {
log.Printf("Recved %v", req.GetMessage())
// 具体返回多少个response根据业务逻辑调整
for i := 0; i < 2; i++ {
// 通过 send 方法不断推送数据
err := stream.Send(&pb.EchoResponse{Message: req.GetMessage()})
if err != nil {
log.Fatalf("Send error:%v", err)
return err
}
}
// 返回nil表示已经完成响应
return nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/*
Client 代码
1. 获取stream, 并发送 HelloWorld
2. for 循环获取服务端推送的消息, err == io.EOF 则表示服务端关闭stream了 退出
*/
func serverStream(client pb.EchoClient) {
// 获取stream
stream, err := client.ServerStreamingEcho(context.Background(), &pb.EchoRequest{Message: "Hello World"})
if err != nil {
log.Fatalf("could not echo: %v", err)
}
// for 循环获取服务端推送的消息
for {
// 通过 Recv() 不断获取服务端send()推送的消息
resp, err := stream.Recv()
// err==io.EOF 则表示服务端关闭stream了 退出
if err == io.EOF {
log.Println("server closed")
break
}
if err != nil {
log.Printf("Recv error:%v", err)
continue
}
log.Printf("Recv data:%v", resp.GetMessage())
}
}
  • 客户端端发起 Stream 流式请求, 服务端使用 Protobuf Message 回应
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/*
Server代码
for 循环中通过 stream.Recv() 不断接收client传来的数据
err == io.EOF表示客户端已经发送完毕关闭连接了,此时在等待服务端处理完并返回消息
stream.SendAndClose() 发送消息并关闭连接
*/
func (e *Echo) ClientStreamingEcho(stream pb.Echo_ClientStreamingEchoServer) error {
for {
req, err := stream.Recv()
if err == io.EOF {
log.Println("client closed")
// 4.SendAndClose 返回并关闭连接, 在客户端发送完毕后服务端即可返回响应
return stream.SendAndClose(&pb.EchoResponse{Message: "ok"})
}
if err != nil {
return err
}
log.Printf("Recved %v", req.GetMessage())
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/*
Client 代码
1. 建立连接并获取client
2. 获取 stream 并通过 Send 方法不断推送数据到服务端
3. 发送完成后通过 stream.CloseAndRecv() 关闭steam并接收服务端返回结果
*/
func clientStream(client pb.EchoClient) {
stream, err := client.ClientStreamingEcho(context.Background())
if err != nil {
log.Fatalf("Sum() error: %v", err)
}
for i := int64(0); i < 2; i++ {
err := stream.Send(&pb.EchoRequest{Message: "hello world"})
if err != nil {
log.Printf("send error: %v", err)
continue
}
}

resp, err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("CloseAndRecv() error: %v", err)
}
log.Printf("sum: %v", resp.GetMessage())
}
  • 客户端服务端 双向流式通讯。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/*
Server 代码
1. 建立连接 获取client
2. 通过client调用方法获取stream
3. 开两个goroutine(使用 chan 传递数据) 分别用于Recv()和Send()
一直Recv()到err==io.EOF(即客户端关闭stream), Send()则自己控制什么时候Close, 服务端stream没有Close 只要跳出循环就算close了
*/
func (e *Echo) BidirectionalStreamingEcho(stream pb.Echo_BidirectionalStreamingEchoServer) error {
var (
waitGroup sync.WaitGroup
msgCh = make(chan string)
)
waitGroup.Add(1)
go func() {
defer waitGroup.Done()

for v := range msgCh {
err := stream.Send(&pb.EchoResponse{Message: v})
if err != nil {
fmt.Println("Send error:", err)
continue
}
}
}()

waitGroup.Add(1)
go func() {
defer waitGroup.Done()
for {
req, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("recv error:%v", err)
}
fmt.Printf("Recved :%v \n", req.GetMessage())
msgCh <- req.GetMessage()
}
close(msgCh)
}()
waitGroup.Wait()

// 返回nil表示已经完成响应
return nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/*
Client 代码
1. 建立连接 获取client
2. 通过client获取stream
3. 开两个goroutine 分别用于Recv()和Send()
3.1 一直Recv()到err==io.EOF(即服务端关闭stream)
3.2 Send()则由自己控制
4. 发送完毕调用 stream.CloseSend()关闭stream 必须调用关闭 否则Server会一直尝试接收数据
*/
func bidirectionalStream(client pb.EchoClient) {
var wg sync.WaitGroup
stream, err := client.BidirectionalStreamingEcho(context.Background())
if err != nil {
panic(err)
}
wg.Add(1)
go func() {
defer wg.Done()
for {
req, err := stream.Recv()
if err == io.EOF {
fmt.Println("Server Closed")
break
}
if err != nil {
continue
}
fmt.Printf("Recved:%v \n", req.GetMessage())
}
}()

wg.Add(1)
go func() {
defer wg.Done()

for i := 0; i < 2; i++ {
err := stream.Send(&pb.EchoRequest{Message: "hello world"})
if err != nil {
log.Printf("send error:%v\n", err)
}
time.Sleep(time.Second)
}
// 4. 发送完毕关闭stream
err := stream.CloseSend()
if err != nil {
log.Printf("Send error:%v\n", err)
return
}
}()
wg.Wait()
}

上述完整代码 Link