stream server client
流式grpc
- Server-side streaming RPC:服务器端流式 RPC
- Client-side streaming RPC:客户端流式 RPC
- Bidirectional streaming RPC:双向流式 RPC
1、proto
// Streaming RPC demo service: one method for each streaming mode.
syntax = "proto3";

package stream;

service StreamService {
  // Server-side streaming: one request in, a stream of responses out.
  rpc Eat(EatRequest) returns (stream EatResponse) {}
  // Client-side streaming: a stream of requests in, one response out.
  rpc Work(stream EatRequest) returns (EatResponse) {}
  // Bidirectional streaming: both sides stream independently.
  rpc Sleep(stream EatRequest) returns (stream EatResponse) {}
}

// Item is the shared payload carried by both requests and responses.
message Item {
  string value = 1;
  string value2 = 2;
}

message EatRequest {
  Item req = 1;
}

message EatResponse {
  Item resp = 1;
}
2、服务端流式
客户端代码
/**
* @Author: zhangsan
* @Description:
* @File: main
* @Version: 1.0.0
* @Date: 2021/5/26 下午5:48
*/package main
import("context""fmt""google.golang.org/grpc"
pb "grpc/test/src/proto""io""log")const(
PORT ="9002")funcmain(){
conn, err := grpc.Dial(":"+PORT, grpc.WithInsecure())if err !=nil{
log.Fatalf("grpc.Dial err: %v", err)}defer conn.Close()
client := pb.NewStreamServiceClient(conn)
err =printEats(client,&pb.PublicRequest{
Req:&pb.Item{
Value:"value",
Value2:"value1",},})if err !=nil{
log.Fatalf("printEats.err: %v", err)}}//服务端流式funcprintEats(client pb.StreamServiceClient, r *pb.PublicRequest)error{var c context.Context
c = context.WithValue(context.TODO(),"a","b")
stream,err := client.Eat(c,r)if err !=nil{return err
}//接收server的header信息
fmt.Println(stream.Header())//map[cc:[dd] content-type:[application/grpc]]for{
resp ,err := stream.Recv()if err == io.EOF{break}if err !=nil{return err
}
log.Printf("resp: value1 %s, value1 %s",resp.Resp.Value,resp.Resp.Value2)}//在一元rpc中header和trailer是一起到达的,在流式中是在接受消息后到达的
fmt.Println(stream.Trailer())//map[cc1:[dd1]]returnnil}
服务端代码
/**
* @Author: zhangsan
* @Description:
* @File: main
* @Version: 1.0.0
* @Date: 2021/5/26 下午5:32
*/package main
import("fmt""google.golang.org/grpc""google.golang.org/grpc/metadata"
pb "grpc/test/src/proto""log""net""time")type StreamService struct{}const(
PORT ="9002")funcmain(){
server := grpc.NewServer()
pb.RegisterStreamServiceServer(server,&StreamService{})
lis, err := net.Listen("tcp",":"+PORT)if err !=nil{
log.Fatalf("net.Listen err: %v", err)}
server.Serve(lis)}//服务端流式func(s *StreamService)Eat(r *pb.PublicRequest, stream pb.StreamService_EatServer)error{//设置header信息 sendHeader不可同时用,否则SendHeader会覆盖前一个if err := stream.SetHeader(metadata.MD{"cc2":[]string{"dd2"}});nil!= err{return err
}//设置header信息//if err := stream.SendHeader(metadata.MD{"cc":[]string{"dd"}});err != nil{// return err//}//设置metadata,注意一元和流式的区别
stream.SetTrailer(metadata.MD{"cc1":[]string{"dd1"}})
a := stream.Context().Value("a")
fmt.Println(a)for i :=0; i <10;i++{
time.Sleep(1*time.Second)
err := stream.Send(&pb.PublicResponse{
Resp:&pb.Item{
Value:"eat",
Value2:"服务端流式",},})if err !=nil{return err
}}returnnil}func(s *StreamService)Work(stream pb.StreamService_WorkServer)error{returnnil}func(s *StreamService)Sleep(stream pb.StreamService_SleepServer)error{returnnil}
验证
go run src/client/stream_client/main.go
map[cc2:[dd2] content-type:[application/grpc]] <nil>
2021/05/27 10:05:08 resp: value1 eat, value1 服务端流式
2021/05/27 10:05:09 resp: value1 eat, value1 服务端流式
2021/05/27 10:05:10 resp: value1 eat, value1 服务端流式
2021/05/27 10:05:11 resp: value1 eat, value1 服务端流式
2021/05/27 10:05:12 resp: value1 eat, value1 服务端流式
2021/05/27 10:05:13 resp: value1 eat, value1 服务端流式
2021/05/27 10:05:14 resp: value1 eat, value1 服务端流式
2021/05/27 10:05:15 resp: value1 eat, value1 服务端流式
2021/05/27 10:05:16 resp: value1 eat, value1 服务端流式
go run src/server/stream_server/main.go
分析
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-eRmLdnpt-1622633151060)(readme.assets/image-20210527111442672.png)]
server
Stream.Send 最后也是调用的 SendMsg 方法
- 消息体(对象)序列化
- 压缩序列化后的消息体
- 对正在传输的消息体增加5个字节的header
- 判断压缩+序列化后的消息体总字节是否大于预设的 maxSendMessageSize(math.MaxInt32),超出报错
- 写入给流的数据集
//设置header信息 sendHeader不可同时用,否则SendHeader会覆盖前一个if err := stream.SetHeader(metadata.MD{"cc2":[]string{"dd2"}});nil!= err{return err
}//设置header信息//if err := stream.SendHeader(metadata.MD{"cc":[]string{"dd"}});err != nil{// return err//}//设置metadata,注意一元和流式的区别
stream.SetTrailer(metadata.MD{"cc1":[]string{"dd1"}})
- SetHeader 和 SendHeader ,SendHeader会覆盖之前的setheader信息,尽量只使用一个
- SetTrailer 和 SetHeader 的区别:SetTrailer 在一元 RPC 时会和 SetHeader 一起返回,在流式 RPC 时则要等到 Send 和 Recv 之后才会被发送和接收
Client
RecvMsg 会从流中读取完整的 gRPC 消息体,另外通过阅读源码可得知:
(1)RecvMsg 是阻塞等待的
(2)RecvMsg 当流成功/结束(调用了 Close)时,会返回
io.EOF
(3)RecvMsg 当流出现任何错误时,流会被中止,错误信息会包含 RPC 错误码。而在 RecvMsg 中可能出现如下错误:
- io.EOF
- io.ErrUnexpectedEOF
- transport.ConnectionError
- google.golang.org/grpc/codes
同时需要注意,默认的 MaxReceiveMessageSize 值为 1024 * 1024 * 4,建议不要超出 4M
- 在ServerBuilder中,通过SetMaxReceiveMessageSize(int)设置这个最大允许字节长度,因为这里的参数为Int型,所以其最大的字节允许长度也就是INT_MAX=2147483647 (2G)。
- 流式的 gRPC 传输 4M 大小的数据是可以的(指的是流式传输场景)
Server
SendMsg
方法,该方法涉及以下过程:
- 消息体(对象)序列化
- 压缩序列化后的消息体
- 对正在传输的消息体增加 5 个字节的 header
- 判断压缩+序列化后的消息体总字节长度是否大于预设的 maxSendMessageSize(预设值为
math.MaxInt32
),若超出则提示错误 - 写入给流的数据集
3、客户端流式
客户端代码
package main
import("context""fmt""google.golang.org/grpc"
pb "grpc/test/src/proto""io""log")const(
PORT ="9002")funcmain(){
conn, err := grpc.Dial(":"+PORT, grpc.WithInsecure())if err !=nil{
log.Fatalf("grpc.Dial err: %v", err)}defer conn.Close()
client := pb.NewStreamServiceClient(conn)
err =printWork(client,&pb.PublicRequest{
Req:&pb.Item{
Value:"valueWork",
Value2:"value1Work",},})if err !=nil{
log.Fatalf("printWork.err: %v", err)}}funcprintWork(client pb.StreamServiceClient, r *pb.PublicRequest)error{
stream,err := client.Work(context.Background())if err !=nil{return err
}for i :=0;i <6;i++{
fmt.Println(r)
err := stream.Send(r)if err == io.EOF{break}if err !=nil{return err
}}//注意这个header是设置不了的//fmt.Println(stream.Header())
resp ,err := stream.CloseAndRecv()if err !=nil{return err
}
log.Printf("resp: value1 %s, value1 %s",resp.Resp.Value,resp.Resp.Value2)//在一元rpc中header和trailer是一起到达的,在流式中是在接受消息后到达的
fmt.Println(stream.Trailer())//map[cc1:[dd1]]returnnil}
服务端代码
package main
import("fmt""google.golang.org/grpc""google.golang.org/grpc/metadata"
pb "grpc/test/src/proto""io""log""net")type StreamService struct{}const(
PORT ="9002")funcmain(){//设置客户端最大接收值//opt := grpc.MaxRecvMsgSize()
server := grpc.NewServer()
pb.RegisterStreamServiceServer(server,&StreamService{})
lis, err := net.Listen("tcp",":"+PORT)if err !=nil{
log.Fatalf("net.Listen err: %v", err)}
server.Serve(lis)}//客户端流事rpcfunc(s *StreamService)Work(stream pb.StreamService_WorkServer)error{//设置header信息 sendHeader不可同时用,否则SendHeader会覆盖前一个if err := stream.SetHeader(metadata.MD{"cc2":[]string{"dd2"}});nil!= err{return err
}//设置header信息//if err := stream.SendHeader(metadata.MD{"cc":[]string{"dd"}});err != nil{// return err//}//设置metadata,注意一元和流式的区别
stream.SetTrailer(metadata.MD{"cc1":[]string{"dd1"}})
a := stream.Context().Value("a")
fmt.Println(a)for{
r ,err := stream.Recv()if err == io.EOF{return stream.SendAndClose(&pb.PublicResponse{
Resp:&pb.Item{
Value:"client-stream-server",
Value2:"client-stream-server-v2",},})}if err !=nil{return err
}
log.Printf("stream.Recv value: %s,value2: %s", r.Req.Value, r.Req.Value2)}}//服务端流式func(s *StreamService)Eat(r *pb.PublicRequest, stream pb.StreamService_EatServer)error{returnnil}func(s *StreamService)Sleep(stream pb.StreamService_SleepServer)error{returnnil}
验证
->%go run src/client/client-stream-client/main.go
req:<value:"valueWork" value2:"value1Work">
req:<value:"valueWork" value2:"value1Work">
req:<value:"valueWork" value2:"value1Work">
req:<value:"valueWork" value2:"value1Work">
req:<value:"valueWork" value2:"value1Work">
req:<value:"valueWork" value2:"value1Work">2021/06/0213:51:54 resp: value1 client-stream-server, value1 client-stream-server-v2
map[cc1:[dd1]]->%go run src/server/client-stream_server/main.go<nil>2021/06/0213:51:54 stream.Recv value: valueWork,value2: value1Work
2021/06/0213:51:54 stream.Recv value: valueWork,value2: value1Work
2021/06/0213:51:54 stream.Recv value: valueWork,value2: value1Work
2021/06/0213:51:54 stream.Recv value: valueWork,value2: value1Work
2021/06/0213:51:54 stream.Recv value: valueWork,value2: value1Work
2021/06/0213:51:54 stream.Recv value: valueWork,value2: value1Work
分析
客户端流式 RPC,单向流,客户端通过流式发起多次 RPC 请求给服务端,服务端发起一次响应给客户端
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PyN9Izz2-1622633151074)(readme.assets/image-20210602135900704.png)]
- 服务端是可以设置 grpc.MaxRecvMsgSize() 的接收大小的,默认是 1024*1024*4 = 4M 大小;还可以设置 grpc.MaxSendMsgSize() 发送的大小,默认是 math.MaxInt32,超出会报错
- stream.SendAndClose 当发现client的流关闭之后,需要将最终的结果响应给客户端,同时关闭在另一侧的recv
- stream.CloseAndRecv 就是和上面的一起使用的
4、客户端、服务端流式
客户端
package main
import("context""google.golang.org/grpc"
pb "grpc/test/src/proto""io""log")const(
PORT ="9002")funcmain(){
conn, err := grpc.Dial(":"+PORT, grpc.WithInsecure())if err !=nil{
log.Fatalf("grpc.Dial err: %v", err)}defer conn.Close()
client := pb.NewStreamServiceClient(conn)
err =printSleep(client,&pb.PublicRequest{
Req:&pb.Item{
Value:"valueSleep",
Value2:"value1Sleep",},})if err !=nil{
log.Fatalf("printSleep.err: %v", err)}}//双向流funcprintSleep(client pb.StreamServiceClient, r *pb.PublicRequest)error{
stream, err := client.Sleep(context.Background())if err !=nil{return err
}for n :=0; n <=6; n++{
err = stream.Send(r)if err !=nil{return err
}
resp, err := stream.Recv()if err == io.EOF {break}if err !=nil{return err
}
log.Printf("resp: value1: %s, value2: %s", resp.Resp.Value, resp.Resp.Value2)}if err = stream.CloseSend();nil!= err{
log.Println(err)}returnnil}
服务端
package main
import("google.golang.org/grpc"
pb "grpc/test/src/proto""io""log""net")type StreamService struct{}const(
PORT ="9002")funcmain(){//设置客户端最大接收值//opt := grpc.MaxRecvMsgSize()//grpc.MaxSendMsgSize()
server := grpc.NewServer()
pb.RegisterStreamServiceServer(server,&StreamService{})
lis, err := net.Listen("tcp",":"+PORT)if err !=nil{
log.Fatalf("net.Listen err: %v", err)}
server.Serve(lis)}//客户端流事rpcfunc(s *StreamService)Work(stream pb.StreamService_WorkServer)error{returnnil}//服务端流式func(s *StreamService)Eat(r *pb.PublicRequest, stream pb.StreamService_EatServer)error{returnnil}//双向流func(s *StreamService)Sleep(stream pb.StreamService_SleepServer)error{
n :=0for{
err := stream.Send(&pb.PublicResponse{
Resp:&pb.Item{
Value:"gPRC Stream Client: Sleep",
Value2:"双向stream-value2",},})if err !=nil{return err
}
r, err := stream.Recv()if err == io.EOF {returnnil}if err !=nil{return err
}
n++
log.Printf("stream.Recv req.value: %s, pt.value2: %s", r.Req.Value, r.Req.Value2)}}
验证
->%go run src/client/clientServer-stream_client/main.go2021/06/0214:11:55 resp: value1: gPRC Stream Client: Sleep, value2: 双向stream-value2
2021/06/0214:11:55 resp: value1: gPRC Stream Client: Sleep, value2: 双向stream-value2
2021/06/0214:11:55 resp: value1: gPRC Stream Client: Sleep, value2: 双向stream-value2
2021/06/0214:11:55 resp: value1: gPRC Stream Client: Sleep, value2: 双向stream-value2
2021/06/0214:11:55 resp: value1: gPRC Stream Client: Sleep, value2: 双向stream-value2
2021/06/0214:11:55 resp: value1: gPRC Stream Client: Sleep, value2: 双向stream-value2
2021/06/0214:11:55 resp: value1: gPRC Stream Client: Sleep, value2: 双向stream-value2
->%go run src/server/clientServer-stream_server/main.go2021/06/0214:11:39 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/0214:11:39 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/0214:11:39 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/0214:11:39 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/0214:11:39 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/0214:11:39 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/0214:11:39 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/0214:11:55 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/0214:11:55 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/0214:11:55 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/0214:11:55 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/0214:11:55 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/0214:11:55 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/0214:11:55 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
简单的介绍了几种rpc的流的使用,大家根据需求,合理使用
版权归原作者 a...Z 所有, 如有侵权,请联系我们删除。