HTTP/2 has two concepts, stream and frame, where frame is the smallest transmission unit of communication in HTTP/2, usually a request or response is divided into one or more frames for transmission, and stream represents a virtual channel with established connection, which can transmit multiple requests or responses. Each frame contains a Stream Identifier that identifies the stream to which it belongs. HTTP/2 achieves multiplexing through streams and frames, and requests for the same domain name can be identified by a Stream Identifier in the same stream, thus reducing connection overhead. And gRPC is based on HTTP/2 protocol transmission, which naturally also implements streaming, of which there are three types of streams in gRPC as follows

  1. server streaming
  2. client streaming
  3. bidirectional streaming

This article focuses on how to implement the three types of gRPC streaming.

Proto

By adding the keyword stream before the request or response body, the message body can be defined as a stream transport, base.proto as shown below.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
syntax = "proto3";

package proto;

option go_package = "base;base";


service BaseService {
    // ServerStream
    rpc ServerStream (StreamRequest) returns (stream StreamResponse){}
    // ClientStream
    rpc ClientStream (stream StreamRequest) returns (StreamResponse){}
    // bidirectional streaming
    rpc Streaming (stream StreamRequest) returns (stream StreamResponse){}
}

message StreamRequest{
  string input = 1;
}

message StreamResponse{
  string output = 1;
}

Execute protoc --go_out=. --go-grpc_out=. stream.prto to generate the corresponding Go Stub.

Basic Code

server.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package main

import (
    "context"
    "fmt"
    "net"
    "time"

    pb "rpc/base"

    "google.golang.org/grpc"
    "google.golang.org/grpc/reflection"
)

type service struct {
    pb.UnimplementedBaseServiceServer
}

func main() {
    listen, err := net.Listen("tcp", ":50051")
    if err != nil {
        fmt.Println(err)
    }
    s := grpc.NewServer()
    reflection.Register(s)
    pb.RegisterBaseServiceServer(s, &service{})
    s.Serve(listen)
}

func (s *service) ClientStream(stream pb.BaseService_ClientStreamServer) error {
    return nil 
}

func (s *service) ServerStream(in *pb.StreamRequest, stream pb.BaseService_ServerStreamServer) error {
    return nil
}

func (s *service) Streaming(stream pb.BaseService_StreamingServer) error {
    return nil
}

client.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package main

import (
    "context"
    "fmt"
    "io"
    "strconv"

    pb "rpc/base"

    "google.golang.org/grpc"
)

func main() {
    conn, err := grpc.Dial(":50051", grpc.WithInsecure(), grpc.WithBlock())
    if err != nil {
        fmt.Println(err)
    }
    defer conn.Close()

    c := pb.NewBaseServiceClient(conn)
}

func clientStream(client pb.BaseServiceClient, input string) error {
    return nil
}

func serverStream(client pb.BaseServiceClient, r *pb.StreamRequest) error {
    return nil
}

func stream(client pb.BaseServiceClient) error {
    return nil
}

Server Stream

Normal gRPC will only return one response per request. Server Stream can send multiple StreamResponse by calling stream.Send() multiple times.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// Server
func (s *service) ServerStream(in *pb.StreamRequest, stream pb.BaseService_ServerStreamServer) error {
    input := in.Input
    var output string
    for i := 0; i < len(input); i++ {
        output = fmt.Sprintf("index: %d, result: %s", i, string(input[i]))
            stream.Send(&pb.StreamResponse{Output: output})
    }
    return nil
}

The client code is as follows, calling stream.Recv() through a for loop, waiting to receive a response from the server and blocking until an error or the end of the stream.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// 客户端
func serverStream(client pb.BaseServiceClient, r *pb.StreamRequest) error {
    fmt.Println("Server Stream Send:", r.Input)
    stream, _ := client.ServerStream(context.Background(), r)
    for {
        res, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            return err
        }
        fmt.Println("Server Stream Recv:", res.Output)
    }
    return nil
}

Start the server, the client executes serverStream(c, &pb.StreamRequest{Input: "something"}) , and the output is as follows.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
Server Stream Send: something
Server Stream Recv: index: 0, result: s
Server Stream Recv: index: 1, result: o
Server Stream Recv: index: 2, result: m
Server Stream Recv: index: 3, result: e
Server Stream Recv: index: 4, result: t
Server Stream Recv: index: 5, result: h
Server Stream Recv: index: 6, result: i
Server Stream Recv: index: 7, result: n
Server Stream Recv: index: 8, result: g

It is easy to see that the client requests once and the server returns the data multiple times, thus implementing Server Stream.

Client Stream

Similar to Server Stream, except it is the server’s turn to call stream.Rece() in a for loop, receive the client message and block until the client calls stream.CloseAndRecv() to close the stream and then enter blocking listening. The server calls stream.SendAndClose(), returns the response body and closes the stream. This way the client is only responsible for sending the end of the stream, the server can end the whole stream processing in the middle.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
//Server
func (s *service) ClientStream(stream pb.BaseService_ClientStreamServer) error {
    output := ""
    for {
        r, err := stream.Recv()
        if err == io.EOF {
          	return stream.SendAndClose(&pb.StreamResponse{Output: output})
        }
        if err != nil {
          	fmt.Println(err)
        }
        output += r.Input
    }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// Client
func clientStream(client pb.BaseServiceClient, input string) error {
    stream, _ := client.ClientStream(context.Background())
    for _, s := range input {
        fmt.Println("Client Stream Send:", string(s))
        err := stream.Send(&pb.StreamRequest{Input: string(s)})
        if err != nil {
       	    return err
        }
    }
    res, err := stream.CloseAndRecv()
    if err != nil {
     	  fmt.Println(err)
    }
    fmt.Println("Client Stream Recv:", res.Output)
    return nil
}

The client executes clientStream(c, "something") and the output is as follows.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
Client Stream Send: s
Client Stream Send: o
Client Stream Send: m
Client Stream Send: e
Client Stream Send: t
Client Stream Send: h
Client Stream Send: i
Client Stream Send: n
Client Stream Send: g
Client Stream Recv: something

Bidirectional Streaming

The client sends a streaming request and the server responds by streaming, but the specific interaction varies depending on the logic written, similar to a chat room. After opening the chat room, how to reply, how much to reply, when to close and who to close, depending on the usage scenario. In the following example code, the client sends 0-10 to the server, and the server does a cumulative return.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
// client
func streaming(client pb.BaseServiceClient) error {
    stream, _ := client.Streaming(context.Background())
    for n := 0; n < 10; n++ {
        fmt.Println("Streaming Send:", n)
        err := stream.Send(&pb.StreamRequest{Input: strconv.Itoa(n)})
        if err != nil {
            return err
        }
        res, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            return err
        }
        fmt.Println("Streaming Recv:", res.Output)
    }
    stream.CloseSend()
    return nil
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// Server
func (s *service) Streaming(stream pb.BaseService_StreamingServer) error {
    for n := 0; ; {
      res, err := stream.Recv()
      if err == io.EOF {
          return nil
      }
      if err != nil {
          return err
      }
      v, _ := strconv.Atoi(res.Input)
      n += v
      stream.Send(&pb.StreamResponse{Output: strconv.Itoa(n)})
	  }
}

The client executes streaming(c) and the output is as follows.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
Streaming Send: 0
Streaming Recv: 0
Streaming Send: 1
Streaming Recv: 1
Streaming Send: 2
Streaming Recv: 3
Streaming Send: 3
Streaming Recv: 6
Streaming Send: 4
Streaming Recv: 10
.....

Summary

This article just wrote a simple demo of the three types of stream processing for gRPC. In practice, the choice of which stream to use is based on the business context. For example, a bidirectional stream is similar to a chat room, or keeping a long connection type, while a unidirectional stream can be chosen when the number of transfers is large enough for the receiver to process in bulk.