服务端流式 完整代码可参考: https://github.com/rexyan/Go-Microservice/tree/main/server_stream
客户端发出一个RPC请求,服务端与客户端之间建立一个单向的流,服务端可以向流中写入多个响应消息,最后主动关闭流;而客户端需要监听这个流,不断获取响应直到流关闭。
创建 proto 文件,返回值是 stream 类型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 syntax = "proto3"; option go_package="server/server/pb"; package server; service Greeter { // 这里返回是 stream 类型的 rpc SayHello(HelloRequest) returns (stream HelloResponse){} } message HelloRequest{ string name=1; } message HelloResponse{ string reply =1; } // server 目录下执行: // protoc --proto_path=pb --go_out=pb --go_opt=paths=source_relative hello.proto // protoc --proto_path=pb --go-grpc_out=pb --go-grpc_opt=paths=source_relative hello.proto
服务端函数的实现中,第一个参数是接收到的请求值,第二个是接受到的 stream。使用 stream.sent
方法不断的往里面发送数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 package mainimport ( "google.golang.org/grpc" "net" "server/pb" ) type server struct { pb.UnimplementedGreeterServer } func (s *server) SayHello (in *pb.HelloRequest, stream pb.Greeter_SayHelloServer) error { words := []string { "你好" , "hello" , "こんにちは" , "안녕하세요" , } reply := "hello" + in.GetName() for _, word := range words { err := stream.Send(&pb.HelloResponse{Reply: word + reply}) if err != nil { return err } } return nil } func main () { listen, err := net.Listen("tcp" , ":8888" ) if err != nil { return } s := grpc.NewServer() pb.RegisterGreeterServer(s, &server{}) err = s.Serve(listen) if err != nil { return } }
客户端调用方法后,获取到的返回值是 stream 类型。客户端需要一直轮询从 stream 中获取数据,直到 err 为 io.EOF
表示流已经关闭。这时结束轮询即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 package mainimport ( "client/pb" "context" "fmt" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "io" "time" ) func main () { conn, err := grpc.Dial("127.0.0.1:8888" , grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return } defer conn.Close() c := pb.NewGreeterClient(conn) ctx, cancel := context.WithTimeout(context.Background(), time.Second*1 ) defer cancel() stream, err := c.SayHello(ctx, &pb.HelloRequest{Name: "张三" }) if err != nil { return } for { data, err := stream.Recv() if err == io.EOF { fmt.Println("stream 已关闭!" ) break } if err != nil { return } fmt.Println(data.GetReply()) } }
客户端流式 完整代码可参考:https://github.com/rexyan/Go-Microservice/tree/main/client_stream
客户端传入多个请求对象,服务端返回一个响应结果。典型的应用场景举例:物联网终端向服务器上报数据、大数据流式计算等。
创建 proto 文件,请求参数是 stream 类型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 syntax = "proto3"; option go_package="client/client/pb"; package server; service Greeter { // 这里参数 stream 类型的 rpc SayHello(stream HelloRequest) returns (HelloResponse){} } message HelloRequest{ string name=1; } message HelloResponse{ string reply =1; } // client 目录下执行: // protoc --proto_path=pb --go_out=pb --go_opt=paths=source_relative hello.proto // protoc --proto_path=pb --go-grpc_out=pb --go-grpc_opt=paths=source_relative hello.proto
客户端调用对应的方法时候,返回值为 stream 类型的,我们需要将要发送的数据通过 stream.send
发往服务端。数据发完后,然后通过 stream.CloseAndRecv
关闭流,并且接收服务端的返回。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 package mainimport ( "client/pb" "context" "fmt" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "time" ) func main () { conn, err := grpc.Dial("127.0.0.1:8888" , grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return } defer conn.Close() c := pb.NewGreeterClient(conn) ctx, cancel := context.WithTimeout(context.Background(), time.Second*1 ) defer cancel() stream, err := c.SayHello(ctx) if err != nil { return } names := []string {"张三" , "王五" , "李四" } for _, name := range names { err := stream.Send(&pb.HelloRequest{Name: name}) if err != nil { return } } recv, err := stream.CloseAndRecv() if err != nil { return } fmt.Println(recv.GetReply()) }
当服务端接受到 stream 数据的时候,需要在循环使用 stream.Recv()
从里面获取数据,直到返回的 err 为 io.EOF
为止,这时调用 stream.SendAndClose
关闭 stream,并返回数据给客户端。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 package mainimport ( "google.golang.org/grpc" "io" "net" "server/pb" "strings" ) type server struct { pb.UnimplementedGreeterServer } func (s *server) SayHello (stream pb.Greeter_SayHelloServer) error { replyData := "欢迎: " var recvData []string for { recv, err := stream.Recv() if err == io.EOF { return stream.SendAndClose(&pb.HelloResponse{ Reply: replyData + strings.Join(recvData, "," ), }) } if err != nil { return err } recvData = append (recvData, recv.GetName()) } } func main () { listen, err := net.Listen("tcp" , ":8888" ) if err != nil { return } s := grpc.NewServer() pb.RegisterGreeterServer(s, &server{}) err = s.Serve(listen) if err != nil { return } }
双向流式 完整代码可参考: https://github.com/rexyan/Go-Microservice/tree/main/two_way_stream
客户端主协程一直接收用户的输入,子协程一直接收服务端返回的数据。服务端一直处理客户端的数据。
proto 文件示例如下,参数和返回值都为 stream 类型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 syntax = "proto3" ; option go_package="server/server/pb" ; package server;service Greeter { rpc SayHello(stream HelloRequest) returns (stream HelloResponse){} } message HelloRequest{ string name=1 ; } message HelloResponse{ string reply =1 ; }
客户端代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 package mainimport ( "bufio" "client/pb" "context" "fmt" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "io" "log" "os" "strings" "time" ) func main () { conn, err := grpc.Dial("127.0.0.1:8888" , grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return } defer conn.Close() c := pb.NewGreeterClient(conn) ctx, cancel := context.WithTimeout(context.Background(), time.Second*100 ) defer cancel() stream, err := c.SayHello(ctx) if err != nil { return } waitCh := make (chan struct {}) go receiveMessage(stream, waitCh) reader := bufio.NewReader(os.Stdin) for { cmd, _ := reader.ReadString('\n' ) cmd = strings.TrimSpace(cmd) if len (cmd) == 0 { continue } if strings.ToUpper(cmd) == "QUIT" { break } if err := stream.Send(&pb.HelloRequest{Name: cmd}); err != nil { log.Fatalf("发送数据 (%v) 失败: %v" , cmd, err) } } stream.CloseSend() <-waitCh } func receiveMessage (stream pb.Greeter_SayHelloClient, waitCh chan struct {}) { for { receive, err := stream.Recv() if err == io.EOF { close (waitCh) return } if err != nil { return } fmt.Printf("接收到消息: %v\n" , receive.GetReply()) } }
服务端处理流数据如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 package mainimport ( "google.golang.org/grpc" "io" "net" "server/pb" ) type server struct { pb.UnimplementedGreeterServer } func (s *server) SayHello (stream pb.Greeter_SayHelloServer) error { for { recv, err := stream.Recv() if err == io.EOF { return nil } if err != nil { return err } err = stream.Send(&pb.HelloResponse{Reply: "你是:" + recv.GetName()}) if err != nil { return err } } } func main () { listen, err := net.Listen("tcp" , ":8888" ) if err != nil { return } s := grpc.NewServer() pb.RegisterGreeterServer(s, &server{}) err = s.Serve(listen) if err != nil { return } }