diff --git a/skills/grpc/client/main.go b/skills/grpc/client/main.go index 1143a38..913c4f9 100644 --- a/skills/grpc/client/main.go +++ b/skills/grpc/client/main.go @@ -3,6 +3,8 @@ package main import ( "context" "fmt" + "io" + "time" "gitlab.com/go-course-project/go17/skills/grpc/service" "google.golang.org/grpc" @@ -18,11 +20,41 @@ func main() { } client := service.NewHelloServiceClient(conn) - resp, err := client.Hello(context.Background(), &service.HelloRequest{ - MyName: "bob", - }) + // resp, err := client.Hello(context.Background(), &service.HelloRequest{ + // MyName: "bob", + // }) + // if err != nil { + // panic(err) + // } + // fmt.Println(resp) + + stream, err := client.Chat(context.Background()) if err != nil { panic(err) } - fmt.Println(resp) + + // 1. 处理返回结果的在后台 + go func() { + for { + resp, err := stream.Recv() + if err != nil { + // 如果遇到io.EOF表示客户端流被关闭 + if err == io.EOF { + break + } + fmt.Println(err) + return + } + fmt.Println(resp) + } + }() + + // 2. 发送请求的在主Goroutine + for i := range 10 { + stream.Send(&service.ChatRequest{ + Id: uint64(i + 1), + Message: "test", + }) + time.Sleep(1 * time.Second) + } } diff --git a/skills/grpc/server/main.go b/skills/grpc/server/main.go index 460af26..20477f9 100644 --- a/skills/grpc/server/main.go +++ b/skills/grpc/server/main.go @@ -2,26 +2,50 @@ package main import ( "context" + "fmt" + "io" "net" "gitlab.com/go-course-project/go17/skills/grpc/service" "google.golang.org/grpc" ) -type UnimplementedHelloServiceServer struct { +type HelloServiceServer struct { service.UnimplementedHelloServiceServer } -func (UnimplementedHelloServiceServer) Hello(ctx context.Context, req *service.HelloRequest) (*service.HelloResponse, error) { +func (HelloServiceServer) Hello(ctx context.Context, req *service.HelloRequest) (*service.HelloResponse, error) { return &service.HelloResponse{ Message: "hello, " + req.MyName, }, nil } +func (HelloServiceServer) Chat(stream grpc.BidiStreamingServer[service.ChatRequest, service.ChatResponse]) error { + // 1. 获取用户的请求,然后处理, 处理完成后,通过stream 返回给客户端 + for { + // 接收一个请求 + req, err := stream.Recv() + if err != nil { + // 如果遇到io.EOF表示客户端流被关闭 + if err == io.EOF { + return nil + } + return err + } + + // 2. 处理请求 + fmt.Println(req.Message) + stream.Send(&service.ChatResponse{ + Id: req.Id, + IsSuccess: true, + }) + } +} + func main() { // 1. 示例化一个grpc server对象 server := grpc.NewServer() - service.RegisterHelloServiceServer(server, &UnimplementedHelloServiceServer{}) + service.RegisterHelloServiceServer(server, &HelloServiceServer{}) // 2. 启动grpc服务 lis, err := net.Listen("tcp", ":1234") diff --git a/skills/grpc/service/hello.pb.go b/skills/grpc/service/hello.pb.go index d9105fd..f86d63a 100644 --- a/skills/grpc/service/hello.pb.go +++ b/skills/grpc/service/hello.pb.go @@ -133,6 +133,118 @@ func (x *HelloResponse) GetMessage() string { return "" } +type ChatRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + Id uint64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` + Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ChatRequest) Reset() { + *x = ChatRequest{} + mi := &file_skills_grpc_service_hello_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ChatRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ChatRequest) ProtoMessage() {} + +func (x *ChatRequest) ProtoReflect() protoreflect.Message { + mi := &file_skills_grpc_service_hello_proto_msgTypes[2] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ChatRequest.ProtoReflect.Descriptor instead. +func (*ChatRequest) Descriptor() ([]byte, []int) { + return file_skills_grpc_service_hello_proto_rawDescGZIP(), []int{2} +} + +func (x *ChatRequest) GetId() uint64 { + if x != nil { + return x.Id + } + return 0 +} + +func (x *ChatRequest) GetMessage() string { + if x != nil { + return x.Message + } + return "" +} + +type ChatResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Id uint64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` + IsSuccess bool `protobuf:"varint,2,opt,name=is_success,json=isSuccess,proto3" json:"is_success,omitempty"` + Message string `protobuf:"bytes,3,opt,name=message,proto3" json:"message,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ChatResponse) Reset() { + *x = ChatResponse{} + mi := &file_skills_grpc_service_hello_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ChatResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ChatResponse) ProtoMessage() {} + +func (x *ChatResponse) ProtoReflect() protoreflect.Message { + mi := &file_skills_grpc_service_hello_proto_msgTypes[3] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ChatResponse.ProtoReflect.Descriptor instead. +func (*ChatResponse) Descriptor() ([]byte, []int) { + return file_skills_grpc_service_hello_proto_rawDescGZIP(), []int{3} +} + +func (x *ChatResponse) GetId() uint64 { + if x != nil { + return x.Id + } + return 0 +} + +func (x *ChatResponse) GetIsSuccess() bool { + if x != nil { + return x.IsSuccess + } + return false +} + +func (x *ChatResponse) GetMessage() string { + if x != nil { + return x.Message + } + return "" +} + var File_skills_grpc_service_hello_proto protoreflect.FileDescriptor var file_skills_grpc_service_hello_proto_rawDesc = string([]byte{ @@ -154,15 +266,28 @@ var file_skills_grpc_service_hello_proto_rawDesc = string([]byte{ 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x29, 0x0a, 0x0d, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, - 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x32, 0x42, 0x0a, 0x0c, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x53, - 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x32, 0x0a, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x12, - 0x13, 0x2e, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x2e, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x2e, 0x48, 0x65, 0x6c, - 0x6c, 0x6f, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x37, 0x5a, 0x35, 0x67, 0x69, - 0x74, 0x6c, 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x6f, 0x2d, 0x63, 0x6f, 0x75, 0x72, - 0x73, 0x65, 0x2d, 0x70, 0x72, 0x6f, 0x6a, 0x65, 0x63, 0x74, 0x2f, 0x67, 0x6f, 0x31, 0x37, 0x2f, - 0x73, 0x6b, 0x69, 0x6c, 0x6c, 0x73, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x73, 0x65, 0x72, 0x76, - 0x69, 0x63, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x37, 0x0a, 0x0b, 0x43, 0x68, 0x61, 0x74, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x04, 0x52, 0x02, 0x69, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, + 0x57, 0x0a, 0x0c, 0x43, 0x68, 0x61, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x02, 0x69, 0x64, 0x12, + 0x1d, 0x0a, 0x0a, 0x69, 0x73, 0x5f, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x08, 0x52, 0x09, 0x69, 0x73, 0x53, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x12, 0x18, + 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x32, 0x77, 0x0a, 0x0c, 0x48, 0x65, 0x6c, 0x6c, + 0x6f, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x32, 0x0a, 0x05, 0x48, 0x65, 0x6c, 0x6c, + 0x6f, 0x12, 0x13, 0x2e, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x2e, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x2e, 0x48, + 0x65, 0x6c, 0x6c, 0x6f, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x33, 0x0a, 0x04, + 0x43, 0x68, 0x61, 0x74, 0x12, 0x12, 0x2e, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x2e, 0x43, 0x68, 0x61, + 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x68, 0x65, 0x6c, 0x6c, 0x6f, + 0x2e, 0x43, 0x68, 0x61, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, + 0x01, 0x42, 0x37, 0x5a, 0x35, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, + 0x67, 0x6f, 0x2d, 0x63, 0x6f, 0x75, 0x72, 0x73, 0x65, 0x2d, 0x70, 0x72, 0x6f, 0x6a, 0x65, 0x63, + 0x74, 0x2f, 0x67, 0x6f, 0x31, 0x37, 0x2f, 0x73, 0x6b, 0x69, 0x6c, 0x6c, 0x73, 0x2f, 0x67, 0x72, + 0x70, 0x63, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, }) var ( @@ -177,18 +302,22 @@ func file_skills_grpc_service_hello_proto_rawDescGZIP() []byte { return file_skills_grpc_service_hello_proto_rawDescData } -var file_skills_grpc_service_hello_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_skills_grpc_service_hello_proto_msgTypes = make([]protoimpl.MessageInfo, 5) var file_skills_grpc_service_hello_proto_goTypes = []any{ (*HelloRequest)(nil), // 0: hello.HelloRequest (*HelloResponse)(nil), // 1: hello.HelloResponse - nil, // 2: hello.HelloRequest.ExtrasEntry + (*ChatRequest)(nil), // 2: hello.ChatRequest + (*ChatResponse)(nil), // 3: hello.ChatResponse + nil, // 4: hello.HelloRequest.ExtrasEntry } var file_skills_grpc_service_hello_proto_depIdxs = []int32{ - 2, // 0: hello.HelloRequest.extras:type_name -> hello.HelloRequest.ExtrasEntry + 4, // 0: hello.HelloRequest.extras:type_name -> hello.HelloRequest.ExtrasEntry 0, // 1: hello.HelloService.Hello:input_type -> hello.HelloRequest - 1, // 2: hello.HelloService.Hello:output_type -> hello.HelloResponse - 2, // [2:3] is the sub-list for method output_type - 1, // [1:2] is the sub-list for method input_type + 2, // 2: hello.HelloService.Chat:input_type -> hello.ChatRequest + 1, // 3: hello.HelloService.Hello:output_type -> hello.HelloResponse + 3, // 4: hello.HelloService.Chat:output_type -> hello.ChatResponse + 3, // [3:5] is the sub-list for method output_type + 1, // [1:3] is the sub-list for method input_type 1, // [1:1] is the sub-list for extension type_name 1, // [1:1] is the sub-list for extension extendee 0, // [0:1] is the sub-list for field type_name @@ -205,7 +334,7 @@ func file_skills_grpc_service_hello_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_skills_grpc_service_hello_proto_rawDesc), len(file_skills_grpc_service_hello_proto_rawDesc)), NumEnums: 0, - NumMessages: 3, + NumMessages: 5, NumExtensions: 0, NumServices: 1, }, diff --git a/skills/grpc/service/hello.proto b/skills/grpc/service/hello.proto index 2a2d1ac..bbbe1e3 100644 --- a/skills/grpc/service/hello.proto +++ b/skills/grpc/service/hello.proto @@ -14,7 +14,20 @@ message HelloResponse { string message = 1; } +message ChatRequest { + uint64 id = 1; + string message = 2; +} + +message ChatResponse { + uint64 id = 1; + bool is_success = 2; + string message = 3; +} + // grpc 接口声明 service HelloService { rpc Hello(HelloRequest) returns(HelloResponse); + // 双向流接口 + rpc Chat(stream ChatRequest) returns(stream ChatResponse); } diff --git a/skills/grpc/service/hello_grpc.pb.go b/skills/grpc/service/hello_grpc.pb.go index fefd28f..edc881c 100644 --- a/skills/grpc/service/hello_grpc.pb.go +++ b/skills/grpc/service/hello_grpc.pb.go @@ -20,6 +20,7 @@ const _ = grpc.SupportPackageIsVersion9 const ( HelloService_Hello_FullMethodName = "/hello.HelloService/Hello" + HelloService_Chat_FullMethodName = "/hello.HelloService/Chat" ) // HelloServiceClient is the client API for HelloService service. @@ -29,6 +30,8 @@ const ( // grpc 接口声明 type HelloServiceClient interface { Hello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloResponse, error) + // 双向流接口 + Chat(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[ChatRequest, ChatResponse], error) } type helloServiceClient struct { @@ -49,6 +52,19 @@ func (c *helloServiceClient) Hello(ctx context.Context, in *HelloRequest, opts . return out, nil } +func (c *helloServiceClient) Chat(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[ChatRequest, ChatResponse], error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + stream, err := c.cc.NewStream(ctx, &HelloService_ServiceDesc.Streams[0], HelloService_Chat_FullMethodName, cOpts...) + if err != nil { + return nil, err + } + x := &grpc.GenericClientStream[ChatRequest, ChatResponse]{ClientStream: stream} + return x, nil +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type HelloService_ChatClient = grpc.BidiStreamingClient[ChatRequest, ChatResponse] + // HelloServiceServer is the server API for HelloService service. // All implementations must embed UnimplementedHelloServiceServer // for forward compatibility. @@ -56,6 +72,8 @@ func (c *helloServiceClient) Hello(ctx context.Context, in *HelloRequest, opts . // grpc 接口声明 type HelloServiceServer interface { Hello(context.Context, *HelloRequest) (*HelloResponse, error) + // 双向流接口 + Chat(grpc.BidiStreamingServer[ChatRequest, ChatResponse]) error mustEmbedUnimplementedHelloServiceServer() } @@ -69,6 +87,9 @@ type UnimplementedHelloServiceServer struct{} func (UnimplementedHelloServiceServer) Hello(context.Context, *HelloRequest) (*HelloResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Hello not implemented") } +func (UnimplementedHelloServiceServer) Chat(grpc.BidiStreamingServer[ChatRequest, ChatResponse]) error { + return status.Errorf(codes.Unimplemented, "method Chat not implemented") +} func (UnimplementedHelloServiceServer) mustEmbedUnimplementedHelloServiceServer() {} func (UnimplementedHelloServiceServer) testEmbeddedByValue() {} @@ -108,6 +129,13 @@ func _HelloService_Hello_Handler(srv interface{}, ctx context.Context, dec func( return interceptor(ctx, in, info, handler) } +func _HelloService_Chat_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(HelloServiceServer).Chat(&grpc.GenericServerStream[ChatRequest, ChatResponse]{ServerStream: stream}) +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type HelloService_ChatServer = grpc.BidiStreamingServer[ChatRequest, ChatResponse] + // HelloService_ServiceDesc is the grpc.ServiceDesc for HelloService service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -120,6 +148,13 @@ var HelloService_ServiceDesc = grpc.ServiceDesc{ Handler: _HelloService_Hello_Handler, }, }, - Streams: []grpc.StreamDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "Chat", + Handler: _HelloService_Chat_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, Metadata: "skills/grpc/service/hello.proto", }