0


grpc流式使用和注意事项

stream server client

流式grpc

  • Server-side streaming RPC:服务器端流式 RPC
  • Client-side streaming RPC:客户端流式 RPC
  • Bidirectional streaming RPC:双向流式 RPC

1、proto

syntax ="proto3";package stream;

service StreamService {
  rpc Eat(EatRequest) returns (stream EatResponse){}//服务端流式
  rpc Work(stream EatRequest) returns (EatResponse){}//客户端流式
  rpc Sleep(stream EatRequest) returns (stream EatResponse){}//双向流}

message Item{string value =1;string value2 =2;}

message EatRequest{
  Item req =1;}

message EatResponse{
    Item resp =1;}

2、服务端流式

客户端代码

/**
 * @Author: zhangsan
 * @Description:
 * @File:  main
 * @Version: 1.0.0
 * @Date: 2021/5/26 下午5:48
 */package main

import("context""fmt""google.golang.org/grpc"
    pb "grpc/test/src/proto""io""log")const(
    PORT ="9002")funcmain(){
    conn, err := grpc.Dial(":"+PORT, grpc.WithInsecure())if err !=nil{
        log.Fatalf("grpc.Dial err: %v", err)}defer conn.Close()
    client := pb.NewStreamServiceClient(conn)

    err =printEats(client,&pb.PublicRequest{
        Req:&pb.Item{
            Value:"value",
            Value2:"value1",},})if err !=nil{
        log.Fatalf("printEats.err: %v", err)}}//服务端流式funcprintEats(client pb.StreamServiceClient, r *pb.PublicRequest)error{var c context.Context

    c = context.WithValue(context.TODO(),"a","b")

    stream,err := client.Eat(c,r)if err !=nil{return err
    }//接收server的header信息
    fmt.Println(stream.Header())//map[cc:[dd] content-type:[application/grpc]]for{
        resp ,err := stream.Recv()if err == io.EOF{break}if err !=nil{return err
        }
        log.Printf("resp: value1 %s, value1 %s",resp.Resp.Value,resp.Resp.Value2)}//在一元rpc中header和trailer是一起到达的,在流式中是在接受消息后到达的
    fmt.Println(stream.Trailer())//map[cc1:[dd1]]returnnil}

服务端代码

/**
 * @Author: zhangsan
 * @Description:
 * @File:  main
 * @Version: 1.0.0
 * @Date: 2021/5/26 下午5:32
 */package main

import("fmt""google.golang.org/grpc""google.golang.org/grpc/metadata"
    pb "grpc/test/src/proto""log""net""time")type StreamService struct{}const(
    PORT ="9002")funcmain(){
    server := grpc.NewServer()
    pb.RegisterStreamServiceServer(server,&StreamService{})
    lis, err := net.Listen("tcp",":"+PORT)if err !=nil{
        log.Fatalf("net.Listen err: %v", err)}
    server.Serve(lis)}//服务端流式func(s *StreamService)Eat(r *pb.PublicRequest, stream pb.StreamService_EatServer)error{//设置header信息 sendHeader不可同时用,否则SendHeader会覆盖前一个if err := stream.SetHeader(metadata.MD{"cc2":[]string{"dd2"}});nil!= err{return err
    }//设置header信息//if err := stream.SendHeader(metadata.MD{"cc":[]string{"dd"}});err != nil{//    return err//}//设置metadata,注意一元和流式的区别
    stream.SetTrailer(metadata.MD{"cc1":[]string{"dd1"}})

    a := stream.Context().Value("a")
    fmt.Println(a)for i :=0; i <10;i++{
        time.Sleep(1*time.Second)
        err := stream.Send(&pb.PublicResponse{
            Resp:&pb.Item{
                Value:"eat",
                Value2:"服务端流式",},})if err !=nil{return err
        }}returnnil}func(s *StreamService)Work(stream pb.StreamService_WorkServer)error{returnnil}func(s *StreamService)Sleep(stream pb.StreamService_SleepServer)error{returnnil}

验证

 go run src/client/stream_client/main.go
map[cc2:[dd2] content-type:[application/grpc]] <nil>
2021/05/27 10:05:08 resp: value1 eat, value1 服务端流式
2021/05/27 10:05:09 resp: value1 eat, value1 服务端流式
2021/05/27 10:05:10 resp: value1 eat, value1 服务端流式
2021/05/27 10:05:11 resp: value1 eat, value1 服务端流式
2021/05/27 10:05:12 resp: value1 eat, value1 服务端流式
2021/05/27 10:05:13 resp: value1 eat, value1 服务端流式
2021/05/27 10:05:14 resp: value1 eat, value1 服务端流式
2021/05/27 10:05:15 resp: value1 eat, value1 服务端流式
2021/05/27 10:05:16 resp: value1 eat, value1 服务端流式
go run src/server/stream_server/main.go

分析

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-eRmLdnpt-1622633151060)(readme.assets/image-20210527111442672.png)]

server

Stream.Send 追后也是调用的SendMsg方法

  • 消息体(对象)序列化
  • 压缩序列化后的消息体
  • 对正在传输的消息体增加5个字节的header
  • 判断压缩+序列化后的消息体总字节是否大雨预设的maxSendMessageSize(math。MaxInt32),超出报错
  • 写入给流的数据集
//设置header信息 sendHeader不可同时用,否则SendHeader会覆盖前一个if err := stream.SetHeader(metadata.MD{"cc2":[]string{"dd2"}});nil!= err{return err
    }//设置header信息//if err := stream.SendHeader(metadata.MD{"cc":[]string{"dd"}});err != nil{//    return err//}//设置metadata,注意一元和流式的区别
    stream.SetTrailer(metadata.MD{"cc1":[]string{"dd1"}})
  1. SetHeader 和 SendHeader ,SendHeader会覆盖之前的setheader信息,尽量只使用一个
  2. SetTrailer 和 SetHeader 区别,SetTrailer在一元的时候会和SetHeader一起返回,在流式rpc的时候会在send 和rev之后才会被发送和接收setTrailer和setheader区别

Client

RecvMsg 会从流中读取完整的 gRPC 消息体,另外通过阅读源码可得知:

(1)RecvMsg 是阻塞等待的

(2)RecvMsg 当流成功/结束(调用了 Close)时,会返回

io.EOF

(3)RecvMsg 当流出现任何错误时,流会被中止,错误信息会包含 RPC 错误码。而在 RecvMsg 中可能出现如下错误:

  • io.EOF
  • io.ErrUnexpectedEOF
  • transport.ConnectionError
  • google.golang.org/grpc/codes

同时需要注意,默认的 MaxReceiveMessageSize 值为 1024 * 1024 * 4,建议不要超出 4M

  1. 在ServerBuilder中,通过SetMaxReceiveMessageSize(int)设置这个最大允许字节长度,因为这里的参数为Int型,所以其最大的字节允许长度也就是INT_MAX=2147483647 (2G)。
  2. 流式的grpc传输4M的大小是可以的,流式传输哈

Server

SendMsg

方法,该方法涉及以下过程:

  • 消息体(对象)序列化
  • 压缩序列化后的消息体
  • 对正在传输的消息体增加 5 个字节的 header
  • 判断压缩+序列化后的消息体总字节长度是否大于预设的 maxSendMessageSize(预设值为 math.MaxInt32),若超出则提示错误
  • 写入给流的数据集

3、客户端流式

客户端代码

package main

import("context""fmt""google.golang.org/grpc"
    pb "grpc/test/src/proto""io""log")const(
    PORT ="9002")funcmain(){
    conn, err := grpc.Dial(":"+PORT, grpc.WithInsecure())if err !=nil{
        log.Fatalf("grpc.Dial err: %v", err)}defer conn.Close()
    client := pb.NewStreamServiceClient(conn)

    err =printWork(client,&pb.PublicRequest{
        Req:&pb.Item{
            Value:"valueWork",
            Value2:"value1Work",},})if err !=nil{
        log.Fatalf("printWork.err: %v", err)}}funcprintWork(client pb.StreamServiceClient, r *pb.PublicRequest)error{
    stream,err := client.Work(context.Background())if err !=nil{return err
    }for i :=0;i <6;i++{
        fmt.Println(r)
        err := stream.Send(r)if err == io.EOF{break}if err !=nil{return err
        }}//注意这个header是设置不了的//fmt.Println(stream.Header())

    resp ,err := stream.CloseAndRecv()if err !=nil{return err
    }

    log.Printf("resp: value1 %s, value1 %s",resp.Resp.Value,resp.Resp.Value2)//在一元rpc中header和trailer是一起到达的,在流式中是在接受消息后到达的
    fmt.Println(stream.Trailer())//map[cc1:[dd1]]returnnil}

服务端代码

package main

import("fmt""google.golang.org/grpc""google.golang.org/grpc/metadata"
    pb "grpc/test/src/proto""io""log""net")type StreamService struct{}const(
    PORT ="9002")funcmain(){//设置客户端最大接收值//opt := grpc.MaxRecvMsgSize()

    server := grpc.NewServer()
    pb.RegisterStreamServiceServer(server,&StreamService{})
    lis, err := net.Listen("tcp",":"+PORT)if err !=nil{
        log.Fatalf("net.Listen err: %v", err)}
    server.Serve(lis)}//客户端流事rpcfunc(s *StreamService)Work(stream pb.StreamService_WorkServer)error{//设置header信息 sendHeader不可同时用,否则SendHeader会覆盖前一个if err := stream.SetHeader(metadata.MD{"cc2":[]string{"dd2"}});nil!= err{return err
    }//设置header信息//if err := stream.SendHeader(metadata.MD{"cc":[]string{"dd"}});err != nil{//    return err//}//设置metadata,注意一元和流式的区别
    stream.SetTrailer(metadata.MD{"cc1":[]string{"dd1"}})

    a := stream.Context().Value("a")
    fmt.Println(a)for{
        r ,err := stream.Recv()if err == io.EOF{return stream.SendAndClose(&pb.PublicResponse{
                Resp:&pb.Item{
                    Value:"client-stream-server",
                    Value2:"client-stream-server-v2",},})}if err !=nil{return err
        }
        log.Printf("stream.Recv value: %s,value2: %s", r.Req.Value, r.Req.Value2)}}//服务端流式func(s *StreamService)Eat(r *pb.PublicRequest, stream pb.StreamService_EatServer)error{returnnil}func(s *StreamService)Sleep(stream pb.StreamService_SleepServer)error{returnnil}

验证

->%go run src/client/client-stream-client/main.go
req:<value:"valueWork" value2:"value1Work"> 
req:<value:"valueWork" value2:"value1Work"> 
req:<value:"valueWork" value2:"value1Work"> 
req:<value:"valueWork" value2:"value1Work"> 
req:<value:"valueWork" value2:"value1Work"> 
req:<value:"valueWork" value2:"value1Work">2021/06/0213:51:54 resp: value1 client-stream-server, value1 client-stream-server-v2
map[cc1:[dd1]]->%go run src/server/client-stream_server/mian.go<nil>2021/06/0213:51:54 stream.Recv value: valueWork,value2: value1Work
2021/06/0213:51:54 stream.Recv value: valueWork,value2: value1Work
2021/06/0213:51:54 stream.Recv value: valueWork,value2: value1Work
2021/06/0213:51:54 stream.Recv value: valueWork,value2: value1Work
2021/06/0213:51:54 stream.Recv value: valueWork,value2: value1Work
2021/06/0213:51:54 stream.Recv value: valueWork,value2: value1Work

分析

客户端流式 RPC,单向流,客户端通过流式发起多次 RPC 请求给服务端,服务端发起一次响应给客户端

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PyN9Izz2-1622633151074)(readme.assets/image-20210602135900704.png)]

  1. 服务端是可以设置grpc.MaxRecvMsgSize()的接收大小的 默认是102410244= 4m大小 还可以设置grpc.MaxSendMsgSize() 发送的大小,默认是int32,超出会报错
  2. stream.SendAndClose 当发现client的流关闭之后,需要将最终的结果响应给客户端,同时关闭在另一侧的recv
  3. stream.CloseAndRecv 就是和上面的一起使用的

4、客户端、服务端流式

客户端

package main

import("context""google.golang.org/grpc"
    pb "grpc/test/src/proto""io""log")const(
    PORT ="9002")funcmain(){
    conn, err := grpc.Dial(":"+PORT, grpc.WithInsecure())if err !=nil{
        log.Fatalf("grpc.Dial err: %v", err)}defer conn.Close()
    client := pb.NewStreamServiceClient(conn)

    err =printSleep(client,&pb.PublicRequest{
        Req:&pb.Item{
            Value:"valueSleep",
            Value2:"value1Sleep",},})if err !=nil{
        log.Fatalf("printSleep.err: %v", err)}}//双向流funcprintSleep(client pb.StreamServiceClient, r *pb.PublicRequest)error{
    stream, err := client.Sleep(context.Background())if err !=nil{return err
    }for n :=0; n <=6; n++{
        err = stream.Send(r)if err !=nil{return err
        }
        resp, err := stream.Recv()if err == io.EOF {break}if err !=nil{return err
        }
        log.Printf("resp: value1: %s, value2: %s", resp.Resp.Value, resp.Resp.Value2)}if err = stream.CloseSend();nil!= err{
        log.Println(err)}returnnil}

服务端

package main

import("google.golang.org/grpc"
    pb "grpc/test/src/proto""io""log""net")type StreamService struct{}const(
    PORT ="9002")funcmain(){//设置客户端最大接收值//opt := grpc.MaxRecvMsgSize()//grpc.MaxSendMsgSize()

    server := grpc.NewServer()
    pb.RegisterStreamServiceServer(server,&StreamService{})
    lis, err := net.Listen("tcp",":"+PORT)if err !=nil{
        log.Fatalf("net.Listen err: %v", err)}
    server.Serve(lis)}//客户端流事rpcfunc(s *StreamService)Work(stream pb.StreamService_WorkServer)error{returnnil}//服务端流式func(s *StreamService)Eat(r *pb.PublicRequest, stream pb.StreamService_EatServer)error{returnnil}//双向流func(s *StreamService)Sleep(stream pb.StreamService_SleepServer)error{
    n :=0for{
        err := stream.Send(&pb.PublicResponse{
            Resp:&pb.Item{
                Value:"gPRC Stream Client: Sleep",
                Value2:"双向stream-value2",},})if err !=nil{return err
        }
        r, err := stream.Recv()if err == io.EOF {returnnil}if err !=nil{return err
        }
        n++
        log.Printf("stream.Recv req.value: %s, pt.value2: %s", r.Req.Value, r.Req.Value2)}}

验证

->%go run src/client/clientServer-stream_client/main.go2021/06/0214:11:55 resp: value1: gPRC Stream Client: Sleep, value2: 双向stream-value2
2021/06/0214:11:55 resp: value1: gPRC Stream Client: Sleep, value2: 双向stream-value2
2021/06/0214:11:55 resp: value1: gPRC Stream Client: Sleep, value2: 双向stream-value2
2021/06/0214:11:55 resp: value1: gPRC Stream Client: Sleep, value2: 双向stream-value2
2021/06/0214:11:55 resp: value1: gPRC Stream Client: Sleep, value2: 双向stream-value2
2021/06/0214:11:55 resp: value1: gPRC Stream Client: Sleep, value2: 双向stream-value2
2021/06/0214:11:55 resp: value1: gPRC Stream Client: Sleep, value2: 双向stream-value2

->%go run src/server/clientServer-stream_server/main.go2021/06/0214:11:39 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/0214:11:39 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/0214:11:39 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/0214:11:39 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/0214:11:39 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/0214:11:39 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/0214:11:39 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/0214:11:55 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/0214:11:55 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/0214:11:55 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/0214:11:55 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/0214:11:55 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/0214:11:55 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/0214:11:55 stream.Recv req.value: valueSleep, pt.value2: value1Sleep

简单的介绍了几种rpc的流的使用,大家根据需求,合理使用

标签:

本文转载自: https://blog.csdn.net/qq_39787367/article/details/117480171
版权归原作者 a...Z 所有, 如有侵权,请联系我们删除。

“grpc流式使用和注意事项”的评论:

还没有评论