update stream req

This commit is contained in:
yumaojun03 2025-08-24 14:34:22 +08:00
parent a6725de634
commit afa7743e82
4 changed files with 90 additions and 3 deletions

View File

@ -7,6 +7,8 @@ option go_package="122.51.31.227/go-course/go18/skills/rpc/protobuf/hello_servic
service HelloService { service HelloService {
// rpc // rpc
rpc Hello (Request) returns (Response); rpc Hello (Request) returns (Response);
rpc Channel (stream Request) returns (stream Response) {}
} }
message Request { message Request {

View File

@ -8,7 +8,6 @@ package hello_service
import ( import (
context "context" context "context"
grpc "google.golang.org/grpc" grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes" codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status" status "google.golang.org/grpc/status"
@ -21,6 +20,7 @@ const _ = grpc.SupportPackageIsVersion9
const ( const (
HelloService_Hello_FullMethodName = "/hello.HelloService/Hello" HelloService_Hello_FullMethodName = "/hello.HelloService/Hello"
HelloService_Channel_FullMethodName = "/hello.HelloService/Channel"
) )
// HelloServiceClient is the client API for HelloService service. // HelloServiceClient is the client API for HelloService service.
@ -31,6 +31,7 @@ const (
type HelloServiceClient interface { type HelloServiceClient interface {
// rpc 声明接口 // rpc 声明接口
Hello(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error) Hello(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error)
Channel(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[Request, Response], error)
} }
type helloServiceClient struct { type helloServiceClient struct {
@ -51,6 +52,19 @@ func (c *helloServiceClient) Hello(ctx context.Context, in *Request, opts ...grp
return out, nil return out, nil
} }
func (c *helloServiceClient) Channel(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[Request, Response], error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
stream, err := c.cc.NewStream(ctx, &HelloService_ServiceDesc.Streams[0], HelloService_Channel_FullMethodName, cOpts...)
if err != nil {
return nil, err
}
x := &grpc.GenericClientStream[Request, Response]{ClientStream: stream}
return x, nil
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type HelloService_ChannelClient = grpc.BidiStreamingClient[Request, Response]
// HelloServiceServer is the server API for HelloService service. // HelloServiceServer is the server API for HelloService service.
// All implementations must embed UnimplementedHelloServiceServer // All implementations must embed UnimplementedHelloServiceServer
// for forward compatibility. // for forward compatibility.
@ -59,6 +73,7 @@ func (c *helloServiceClient) Hello(ctx context.Context, in *Request, opts ...grp
type HelloServiceServer interface { type HelloServiceServer interface {
// rpc 声明接口 // rpc 声明接口
Hello(context.Context, *Request) (*Response, error) Hello(context.Context, *Request) (*Response, error)
Channel(grpc.BidiStreamingServer[Request, Response]) error
mustEmbedUnimplementedHelloServiceServer() mustEmbedUnimplementedHelloServiceServer()
} }
@ -72,6 +87,9 @@ type UnimplementedHelloServiceServer struct{}
func (UnimplementedHelloServiceServer) Hello(context.Context, *Request) (*Response, error) { func (UnimplementedHelloServiceServer) Hello(context.Context, *Request) (*Response, error) {
return nil, status.Errorf(codes.Unimplemented, "method Hello not implemented") return nil, status.Errorf(codes.Unimplemented, "method Hello not implemented")
} }
func (UnimplementedHelloServiceServer) Channel(grpc.BidiStreamingServer[Request, Response]) error {
return status.Errorf(codes.Unimplemented, "method Channel not implemented")
}
func (UnimplementedHelloServiceServer) mustEmbedUnimplementedHelloServiceServer() {} func (UnimplementedHelloServiceServer) mustEmbedUnimplementedHelloServiceServer() {}
func (UnimplementedHelloServiceServer) testEmbeddedByValue() {} func (UnimplementedHelloServiceServer) testEmbeddedByValue() {}
@ -111,6 +129,13 @@ func _HelloService_Hello_Handler(srv interface{}, ctx context.Context, dec func(
return interceptor(ctx, in, info, handler) return interceptor(ctx, in, info, handler)
} }
func _HelloService_Channel_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(HelloServiceServer).Channel(&grpc.GenericServerStream[Request, Response]{ServerStream: stream})
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type HelloService_ChannelServer = grpc.BidiStreamingServer[Request, Response]
// HelloService_ServiceDesc is the grpc.ServiceDesc for HelloService service. // HelloService_ServiceDesc is the grpc.ServiceDesc for HelloService service.
// It's only intended for direct use with grpc.RegisterService, // It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy) // and not to be introspected or modified (even as a copy)
@ -123,6 +148,13 @@ var HelloService_ServiceDesc = grpc.ServiceDesc{
Handler: _HelloService_Hello_Handler, Handler: _HelloService_Hello_Handler,
}, },
}, },
Streams: []grpc.StreamDesc{}, Streams: []grpc.StreamDesc{
{
StreamName: "Channel",
Handler: _HelloService_Channel_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "skills/rpc/protobuf/hello_service/interface.proto", Metadata: "skills/rpc/protobuf/hello_service/interface.proto",
} }

View File

@ -2,6 +2,7 @@ package main
import ( import (
"context" "context"
"io"
"log" "log"
"net" "net"
@ -38,3 +39,24 @@ func (h *HelloService) Hello(ctx context.Context, req *hello_service.Request) (*
Value: "Hello " + req.Value, Value: "Hello " + req.Value,
}, nil }, nil
} }
// 接收来自客户端的流式请求,然后不断返回
func (h *HelloService) Channel(stream grpc.BidiStreamingServer[hello_service.Request, hello_service.Response]) error {
for {
// 读取客户端发送过来的数据
msg, err := stream.Recv()
if err != nil {
// 如果遇到io.EOF表示客户端流被关闭
if err == io.EOF {
return nil
}
return err
}
// 处理消息并返回响应
if err := stream.Send(&hello_service.Response{
Value: "Hello " + msg.Value,
}); err != nil {
return err
}
}
}

View File

@ -3,7 +3,9 @@ package main
import ( import (
"context" "context"
"fmt" "fmt"
"io"
"log" "log"
"time"
"122.51.31.227/go-course/go18/skills/rpc/protobuf/hello_service" "122.51.31.227/go-course/go18/skills/rpc/protobuf/hello_service"
"google.golang.org/grpc" "google.golang.org/grpc"
@ -27,4 +29,33 @@ func main() {
panic(err) panic(err)
} }
fmt.Println(resp.Value) fmt.Println(resp.Value)
stream, err := helloServiceClient.Channel(context.Background())
if err != nil {
panic(err)
}
// 首先是向服务端发送数据
go func() {
count := 1
for {
if err := stream.Send(&hello_service.Request{Value: fmt.Sprintf("[%d] hi", count)}); err != nil {
log.Fatal(err)
}
count++
time.Sleep(time.Second)
}
}()
// 然后在循环中接收服务端返回的数据
for {
reply, err := stream.Recv()
if err != nil {
if err == io.EOF {
break
}
log.Fatal(err)
}
fmt.Println(reply.GetValue())
}
} }