From afa7743e82cc9e48cd53f9110d53fdccf7a0db4b Mon Sep 17 00:00:00 2001 From: yumaojun03 <719118794@qq.com> Date: Sun, 24 Aug 2025 14:34:22 +0800 Subject: [PATCH] update stream req --- .../protobuf/hello_service/interface.proto | 2 + .../hello_service/interface_grpc.pb.go | 38 +++++++++++++++++-- skills/rpc/protobuf/service_a/main.go | 22 +++++++++++ skills/rpc/protobuf/service_b/main.go | 31 +++++++++++++++ 4 files changed, 90 insertions(+), 3 deletions(-) diff --git a/skills/rpc/protobuf/hello_service/interface.proto b/skills/rpc/protobuf/hello_service/interface.proto index 808fe97..e8bab9e 100644 --- a/skills/rpc/protobuf/hello_service/interface.proto +++ b/skills/rpc/protobuf/hello_service/interface.proto @@ -7,6 +7,8 @@ option go_package="122.51.31.227/go-course/go18/skills/rpc/protobuf/hello_servic service HelloService { // rpc 声明接口 rpc Hello (Request) returns (Response); + + rpc Channel (stream Request) returns (stream Response) {} } message Request { diff --git a/skills/rpc/protobuf/hello_service/interface_grpc.pb.go b/skills/rpc/protobuf/hello_service/interface_grpc.pb.go index 46494c4..3603830 100644 --- a/skills/rpc/protobuf/hello_service/interface_grpc.pb.go +++ b/skills/rpc/protobuf/hello_service/interface_grpc.pb.go @@ -8,7 +8,6 @@ package hello_service import ( context "context" - grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" @@ -20,7 +19,8 @@ import ( const _ = grpc.SupportPackageIsVersion9 const ( - HelloService_Hello_FullMethodName = "/hello.HelloService/Hello" + HelloService_Hello_FullMethodName = "/hello.HelloService/Hello" + HelloService_Channel_FullMethodName = "/hello.HelloService/Channel" ) // HelloServiceClient is the client API for HelloService service. @@ -31,6 +31,7 @@ const ( type HelloServiceClient interface { // rpc 声明接口 Hello(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error) + Channel(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[Request, Response], error) } type helloServiceClient struct { @@ -51,6 +52,19 @@ func (c *helloServiceClient) Hello(ctx context.Context, in *Request, opts ...grp return out, nil } +func (c *helloServiceClient) Channel(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[Request, Response], error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + stream, err := c.cc.NewStream(ctx, &HelloService_ServiceDesc.Streams[0], HelloService_Channel_FullMethodName, cOpts...) + if err != nil { + return nil, err + } + x := &grpc.GenericClientStream[Request, Response]{ClientStream: stream} + return x, nil +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type HelloService_ChannelClient = grpc.BidiStreamingClient[Request, Response] + // HelloServiceServer is the server API for HelloService service. // All implementations must embed UnimplementedHelloServiceServer // for forward compatibility. @@ -59,6 +73,7 @@ func (c *helloServiceClient) Hello(ctx context.Context, in *Request, opts ...grp type HelloServiceServer interface { // rpc 声明接口 Hello(context.Context, *Request) (*Response, error) + Channel(grpc.BidiStreamingServer[Request, Response]) error mustEmbedUnimplementedHelloServiceServer() } @@ -72,6 +87,9 @@ type UnimplementedHelloServiceServer struct{} func (UnimplementedHelloServiceServer) Hello(context.Context, *Request) (*Response, error) { return nil, status.Errorf(codes.Unimplemented, "method Hello not implemented") } +func (UnimplementedHelloServiceServer) Channel(grpc.BidiStreamingServer[Request, Response]) error { + return status.Errorf(codes.Unimplemented, "method Channel not implemented") +} func (UnimplementedHelloServiceServer) mustEmbedUnimplementedHelloServiceServer() {} func (UnimplementedHelloServiceServer) testEmbeddedByValue() {} @@ -111,6 +129,13 @@ func _HelloService_Hello_Handler(srv interface{}, ctx context.Context, dec func( return interceptor(ctx, in, info, handler) } +func _HelloService_Channel_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(HelloServiceServer).Channel(&grpc.GenericServerStream[Request, Response]{ServerStream: stream}) +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type HelloService_ChannelServer = grpc.BidiStreamingServer[Request, Response] + // HelloService_ServiceDesc is the grpc.ServiceDesc for HelloService service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -123,6 +148,13 @@ var HelloService_ServiceDesc = grpc.ServiceDesc{ Handler: _HelloService_Hello_Handler, }, }, - Streams: []grpc.StreamDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "Channel", + Handler: _HelloService_Channel_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, Metadata: "skills/rpc/protobuf/hello_service/interface.proto", } diff --git a/skills/rpc/protobuf/service_a/main.go b/skills/rpc/protobuf/service_a/main.go index d190c7a..6ac3dff 100644 --- a/skills/rpc/protobuf/service_a/main.go +++ b/skills/rpc/protobuf/service_a/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "io" "log" "net" @@ -38,3 +39,24 @@ func (h *HelloService) Hello(ctx context.Context, req *hello_service.Request) (* Value: "Hello " + req.Value, }, nil } + +// 接收来自客户端的流式请求,然后不断返回 +func (h *HelloService) Channel(stream grpc.BidiStreamingServer[hello_service.Request, hello_service.Response]) error { + for { + // 读取客户端发送过来的数据 + msg, err := stream.Recv() + if err != nil { + // 如果遇到io.EOF表示客户端流被关闭 + if err == io.EOF { + return nil + } + return err + } + // 处理消息并返回响应 + if err := stream.Send(&hello_service.Response{ + Value: "Hello " + msg.Value, + }); err != nil { + return err + } + } +} diff --git a/skills/rpc/protobuf/service_b/main.go b/skills/rpc/protobuf/service_b/main.go index 93e7e07..6c7cb49 100644 --- a/skills/rpc/protobuf/service_b/main.go +++ b/skills/rpc/protobuf/service_b/main.go @@ -3,7 +3,9 @@ package main import ( "context" "fmt" + "io" "log" + "time" "122.51.31.227/go-course/go18/skills/rpc/protobuf/hello_service" "google.golang.org/grpc" @@ -27,4 +29,33 @@ func main() { panic(err) } fmt.Println(resp.Value) + + stream, err := helloServiceClient.Channel(context.Background()) + if err != nil { + panic(err) + } + + // 首先是向服务端发送数据 + go func() { + count := 1 + for { + if err := stream.Send(&hello_service.Request{Value: fmt.Sprintf("[%d] hi", count)}); err != nil { + log.Fatal(err) + } + count++ + time.Sleep(time.Second) + } + }() + + // 然后在循环中接收服务端返回的数据 + for { + reply, err := stream.Recv() + if err != nil { + if err == io.EOF { + break + } + log.Fatal(err) + } + fmt.Println(reply.GetValue()) + } }