Go gRPC流式操作

服务端流式

完整代码可参考: https://github.com/rexyan/Go-Microservice/tree/main/server_stream

客户端发出一个RPC请求,服务端与客户端之间建立一个单向的流,服务端可以向流中写入多个响应消息,最后主动关闭流;而客户端需要监听这个流,不断获取响应直到流关闭。

创建 proto 文件,返回值是 stream 类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
syntax = "proto3";

option go_package="server/server/pb";

package server;

service Greeter {
// 这里返回是 stream 类型的
rpc SayHello(HelloRequest) returns (stream HelloResponse){}
}

message HelloRequest{
string name=1;
}

message HelloResponse{
string reply =1;
}

// server 目录下执行:
// protoc --proto_path=pb --go_out=pb --go_opt=paths=source_relative hello.proto
// protoc --proto_path=pb --go-grpc_out=pb --go-grpc_opt=paths=source_relative hello.proto

服务端函数的实现中,第一个参数是接收到的请求值,第二个是接受到的 stream。使用 stream.sent 方法不断的往里面发送数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package main

import (
"google.golang.org/grpc"
"net"
"server/pb"
)

type server struct {
pb.UnimplementedGreeterServer
}

func (s *server) SayHello(in *pb.HelloRequest, stream pb.Greeter_SayHelloServer) error {
words := []string{
"你好",
"hello",
"こんにちは",
"안녕하세요",
}
reply := "hello" + in.GetName()

// 使用 stream.Send 返回流式数据
for _, word := range words {
err := stream.Send(&pb.HelloResponse{Reply: word + reply})
if err != nil {
return err
}
}
return nil
}

func main() {
// 创建 tcp
listen, err := net.Listen("tcp", ":8888")
if err != nil {
return
}
// 创建 grpc 服务
s := grpc.NewServer()
// 注册服务
pb.RegisterGreeterServer(s, &server{})
// 启动服务
err = s.Serve(listen)
if err != nil {
return
}
}

客户端调用方法后,获取到的返回值是 stream 类型。客户端需要一直轮询从 stream 中获取数据,直到 err 为 io.EOF 表示流已经关闭。这时结束轮询即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package main

import (
"client/pb"
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"io"
"time"
)

func main() {

// 连接 server 端
conn, err := grpc.Dial("127.0.0.1:8888", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return
}
defer conn.Close()

// 创建客户端
c := pb.NewGreeterClient(conn)

// 创建一个带超时时间的 ctx
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()

// 调用方法,返回值为 stream
stream, err := c.SayHello(ctx, &pb.HelloRequest{Name: "张三"})
if err != nil {
return
}

// 循环从 stream 中获取返回值
for {
data, err := stream.Recv()
if err == io.EOF {
fmt.Println("stream 已关闭!")
break
}
if err != nil {
return
}
fmt.Println(data.GetReply())
}
}

客户端流式

完整代码可参考:https://github.com/rexyan/Go-Microservice/tree/main/client_stream

客户端传入多个请求对象,服务端返回一个响应结果。典型的应用场景举例:物联网终端向服务器上报数据、大数据流式计算等。

创建 proto 文件,请求参数是 stream 类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
syntax = "proto3";

option go_package="client/client/pb";

package server;

service Greeter {
// 这里参数 stream 类型的
rpc SayHello(stream HelloRequest) returns (HelloResponse){}
}

message HelloRequest{
string name=1;
}

message HelloResponse{
string reply =1;
}

// client 目录下执行:
// protoc --proto_path=pb --go_out=pb --go_opt=paths=source_relative hello.proto
// protoc --proto_path=pb --go-grpc_out=pb --go-grpc_opt=paths=source_relative hello.proto

客户端调用对应的方法时候,返回值为 stream 类型的,我们需要将要发送的数据通过 stream.send 发往服务端。数据发完后,然后通过 stream.CloseAndRecv 关闭流,并且接收服务端的返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package main

import (
"client/pb"
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"time"
)

func main() {
// 连接 server 端
conn, err := grpc.Dial("127.0.0.1:8888", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return
}
defer conn.Close()

// 创建客户端
c := pb.NewGreeterClient(conn)

// 创建一个带超时时间的 ctx
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()

// 调用方法,返回值为 stream
stream, err := c.SayHello(ctx)
if err != nil {
return
}

// 循环调用 stream.Send 发送数据
names := []string{"张三", "王五", "李四"}
for _, name := range names {
err := stream.Send(&pb.HelloRequest{Name: name})
if err != nil {
return
}
}
// 关闭流并获取返回
recv, err := stream.CloseAndRecv()
if err != nil {
return
}
fmt.Println(recv.GetReply())
// 欢迎: 张三,王五,李四
}

当服务端接受到 stream 数据的时候,需要在循环使用 stream.Recv() 从里面获取数据,直到返回的 err 为 io.EOF 为止,这时调用 stream.SendAndClose 关闭 stream,并返回数据给客户端。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package main

import (
"google.golang.org/grpc"
"io"
"net"
"server/pb"
"strings"
)

type server struct {
pb.UnimplementedGreeterServer
}

func (s *server) SayHello(stream pb.Greeter_SayHelloServer) error {
replyData := "欢迎: "
var recvData []string
// 接收 stream 中的值
for {
recv, err := stream.Recv()
// 客户端传完了数据,服务端可以停止接收,关闭 stream 并返回
// 这里并不一定要等到 客户端 发完数据才返回客户端。也可以边接受,边发送,只是等到客户端传完数据才关闭 stream 即可。
if err == io.EOF {
return stream.SendAndClose(&pb.HelloResponse{
Reply: replyData + strings.Join(recvData, ","),
})
}
if err != nil {
return err
}
recvData = append(recvData, recv.GetName())
}
}

func main() {
// 创建 tcp
listen, err := net.Listen("tcp", ":8888")
if err != nil {
return
}
// 创建 grpc 服务
s := grpc.NewServer()
// 注册服务
pb.RegisterGreeterServer(s, &server{})
// 启动服务
err = s.Serve(listen)
if err != nil {
return
}
}

双向流式

完整代码可参考: https://github.com/rexyan/Go-Microservice/tree/main/two_way_stream

客户端主协程一直接收用户的输入,子协程一直接收服务端返回的数据。服务端一直处理客户端的数据。

proto 文件示例如下,参数和返回值都为 stream 类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
syntax = "proto3";

option go_package="server/server/pb";

package server;

service Greeter {
// 这里参数 stream 类型的
rpc SayHello(stream HelloRequest) returns (stream HelloResponse){}
}

message HelloRequest{
string name=1;
}

message HelloResponse{
string reply =1;
}

// client 目录下执行:
// protoc --proto_path=pb --go_out=pb --go_opt=paths=source_relative hello.proto
// protoc --proto_path=pb --go-grpc_out=pb --go-grpc_opt=paths=source_relative hello.proto

客户端代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package main

import (
"bufio"
"client/pb"
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"io"
"log"
"os"
"strings"
"time"
)

func main() {
// 连接 server 端
conn, err := grpc.Dial("127.0.0.1:8888", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return
}
defer conn.Close()

// 创建客户端
c := pb.NewGreeterClient(conn)

// 创建一个带超时时间的 ctx
ctx, cancel := context.WithTimeout(context.Background(), time.Second*100)
defer cancel()

// 调用方法,返回值为 stream
stream, err := c.SayHello(ctx)
if err != nil {
return
}

waitCh := make(chan struct{})
// 子协程一直接收数据
go receiveMessage(stream, waitCh)

// 主协程一直发送数据
// 从标准输入生成读对象
reader := bufio.NewReader(os.Stdin)
for {
// 读到换行
cmd, _ := reader.ReadString('\n')
cmd = strings.TrimSpace(cmd)
if len(cmd) == 0 {
continue
}
if strings.ToUpper(cmd) == "QUIT" {
break
}
// 将获取到的数据发送至服务端
if err := stream.Send(&pb.HelloRequest{Name: cmd}); err != nil {
log.Fatalf("发送数据 (%v) 失败: %v", cmd, err)
}
}
stream.CloseSend()

<-waitCh
}

func receiveMessage(stream pb.Greeter_SayHelloClient, waitCh chan struct{}) {
for {
receive, err := stream.Recv()
if err == io.EOF {
close(waitCh)
return
}
if err != nil {
return
}
fmt.Printf("接收到消息: %v\n", receive.GetReply())
}
}

服务端处理流数据如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package main

import (
"google.golang.org/grpc"
"io"
"net"
"server/pb"
)

type server struct {
pb.UnimplementedGreeterServer
}

func (s *server) SayHello(stream pb.Greeter_SayHelloServer) error {
// 接收 stream 中的值
for {
recv, err := stream.Recv()
// 客户端传完了数据,服务端可以停止接收,关闭 stream 并返回
if err == io.EOF {
return nil
}
if err != nil {
return err
}
err = stream.Send(&pb.HelloResponse{Reply: "你是:" + recv.GetName()})
if err != nil {
return err
}
}
}

func main() {
// 创建 tcp
listen, err := net.Listen("tcp", ":8888")
if err != nil {
return
}
// 创建 grpc 服务
s := grpc.NewServer()
// 注册服务
pb.RegisterGreeterServer(s, &server{})
// 启动服务
err = s.Serve(listen)
if err != nil {
return
}
}