In web services, beyond the business code itself, you often need cross-cutting features such as unified request logging, permission management, and exception handling. In web frameworks like Gin or Django these are implemented with middleware; in gRPC the equivalent is the interceptor, which intercepts rpc requests and responses for processing.

Both the gRPC server and the client can implement their own interceptors. Matching the two types of rpc request, interceptors come in two kinds.

  • Unary Interceptor
  • Stream Interceptor

Unary Interceptor

For a Unary Server Interceptor, just define a function that matches the UnaryServerInterceptor type; calling handler(ctx, req) inside it invokes the rpc method.

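// An implementation must match the grpc.UnaryServerInterceptor signature.
func UnaryServerInterceptor(
    ctx context.Context,        // request context
    req interface{},            // rpc request parameters
    info *grpc.UnaryServerInfo, // rpc method information
    handler grpc.UnaryHandler,  // the rpc method itself
) (interface{}, error) {
    return handler(ctx, req)
}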
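
As a hedged illustration of what can happen around handler(ctx, req), the sketch below rejects a call before the handler ever runs, which is one way the rights management mentioned earlier could be approached. To stay runnable without the grpc module, UnaryServerInfo and UnaryHandler here are local stand-ins that mirror the shape of the grpc-go types. The tokenKey context key, the "secret" token, and the names authInterceptor and callGetTime are assumptions made up for this example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Local stand-ins mirroring the shape of grpc.UnaryServerInfo and
// grpc.UnaryHandler, so the sketch runs without the grpc module.
type UnaryServerInfo struct {
	FullMethod string
}

type UnaryHandler func(ctx context.Context, req interface{}) (interface{}, error)

// tokenKey is a hypothetical context key used only in this example.
type tokenKey struct{}

var errDenied = errors.New("permission denied")

// authInterceptor refuses the call before the handler runs when the
// protected method is requested without the expected token.
func authInterceptor(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
	if info.FullMethod == "/proto.BaseService/GetTime" {
		if token, _ := ctx.Value(tokenKey{}).(string); token != "secret" {
			return nil, errDenied // the handler is never called
		}
	}
	return handler(ctx, req)
}

// callGetTime drives the interceptor with a fake handler and reports the outcome.
func callGetTime(token string) string {
	ctx := context.WithValue(context.Background(), tokenKey{}, token)
	info := &UnaryServerInfo{FullMethod: "/proto.BaseService/GetTime"}
	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
		return "12:00:00", nil
	}
	resp, err := authInterceptor(ctx, nil, info, handler)
	if err != nil {
		return "error: " + err.Error()
	}
	return fmt.Sprint(resp)
}

func main() {
	fmt.Println(callGetTime("secret")) // 12:00:00
	fmt.Println(callGetTime(""))       // error: permission denied
}
```

In a real service the credential would more likely be read from the incoming request metadata than from a context value set by hand.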

For a Unary Client Interceptor, define a function that matches the UnaryClientInterceptor type; calling invoker() inside it actually sends the rpc request.

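// An implementation must match the grpc.UnaryClientInterceptor signature.
func UnaryClientInterceptor(
    ctx context.Context,       // request context
    method string,             // full name of the rpc method being called
    req, reply interface{},    // request and response objects of the rpc
    cc *grpc.ClientConn,       // client connection
    invoker grpc.UnaryInvoker, // the rpc call itself
    opts ...grpc.CallOption,   // call options
) error {
    return invoker(ctx, method, req, reply, cc, opts...)
}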

The implementation of a Unary Interceptor falls into three parts, split around the call to handler or invoker: pre-processing before the call, the call to the rpc method itself, and post-processing after the call.
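
The three parts can be made visible with a small sketch. It is not tied to the demo later in this article: UnaryServerInfo and UnaryHandler are local stand-ins with the same shape as the grpc-go types so the code runs on its own, and timingInterceptor, events, and run are hypothetical names chosen for illustration.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Local stand-ins mirroring the shape of the grpc-go types.
type UnaryServerInfo struct{ FullMethod string }

type UnaryHandler func(ctx context.Context, req interface{}) (interface{}, error)

// events records what happened, in order, so the three phases are visible.
var events []string

func timingInterceptor(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
	// 1. Pre-processing: runs before the rpc method.
	start := time.Now()
	events = append(events, "pre "+info.FullMethod)

	// 2. The call to the rpc method itself.
	resp, err := handler(ctx, req)

	// 3. Post-processing: runs after the rpc method has returned.
	events = append(events, fmt.Sprintf("post err=%v", err))
	fmt.Printf("%s took %s\n", info.FullMethod, time.Since(start))
	return resp, err
}

// run invokes the interceptor once with a fake handler and returns the events.
func run() []string {
	events = nil
	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
		events = append(events, "handler")
		return "ok", nil
	}
	info := &UnaryServerInfo{FullMethod: "/proto.BaseService/GetTime"}
	timingInterceptor(context.Background(), nil, info, handler)
	return events
}

func main() {
	for _, e := range run() {
		fmt.Println(e)
	}
}
```

A client-side interceptor follows the same pattern, with invoker taking the place of handler.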

Stream Interceptor

A stream interceptor is implemented the same way as a Unary Interceptor: define a function that matches the provided type. The parameters have the following meanings.

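// An implementation must match the grpc.StreamServerInterceptor signature.
func StreamServerInterceptor(
    srv interface{},             // service implementation handling the rpc
    ss grpc.ServerStream,        // server-side stream object
    info *grpc.StreamServerInfo, // rpc method information
    handler grpc.StreamHandler,  // the rpc method itself
) error {
    return handler(srv, ss)
}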
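
On the client side, the function must match the StreamClientInterceptor type.

// An implementation must match the grpc.StreamClientInterceptor signature.
func StreamClientInterceptor(
    ctx context.Context,     // request context
    desc *grpc.StreamDesc,   // stream description
    cc *grpc.ClientConn,     // client connection
    method string,           // full name of the rpc method being called
    streamer grpc.Streamer,  // creates the stream object
    opts ...grpc.CallOption, // call options
) (grpc.ClientStream, error) {
    // Stream operation pre-processing
    clientStream, err := streamer(ctx, desc, cc, method, opts...)
    // Intercept stream operations via clientStream based on certain conditions
    return clientStream, err
}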

Unlike the other interceptors, a client-side stream interceptor has only two parts: stream operation pre-processing and stream operation interception. Because streamer() only creates the stream and returns before any messages are exchanged, the interceptor cannot post-process after the rpc method finishes. It can only intercept stream operations through the ClientStream object, for example by calling ClientStream.CloseSend() to terminate the stream based on specific metadata.
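
One common way to intercept stream operations is to return a wrapper around the stream that the streamer created. The sketch below shows that idea under stated assumptions: ClientStream is a reduced local stand-in for grpc.ClientStream (the real interface also has Header, Trailer and Context), Streamer is a simplified stand-in for grpc.Streamer, and loggingStream, wrapInterceptor, fakeStream and demo are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// ClientStream is a reduced stand-in for grpc.ClientStream.
type ClientStream interface {
	SendMsg(m interface{}) error
	RecvMsg(m interface{}) error
	CloseSend() error
}

// Streamer is a simplified stand-in for grpc.Streamer.
type Streamer func(ctx context.Context, method string) (ClientStream, error)

// loggingStream embeds the original stream and overrides only the
// operations it wants to observe; CloseSend is forwarded unchanged.
type loggingStream struct {
	ClientStream
	trace *[]string
}

func (s *loggingStream) SendMsg(m interface{}) error {
	*s.trace = append(*s.trace, fmt.Sprintf("send %v", m))
	return s.ClientStream.SendMsg(m)
}

func (s *loggingStream) RecvMsg(m interface{}) error {
	err := s.ClientStream.RecvMsg(m)
	*s.trace = append(*s.trace, fmt.Sprintf("recv err=%v", err))
	return err
}

// wrapInterceptor pre-processes, creates the stream, then returns the wrapper.
func wrapInterceptor(ctx context.Context, method string, streamer Streamer, trace *[]string) (ClientStream, error) {
	*trace = append(*trace, "open "+method) // stream operation pre-processing
	cs, err := streamer(ctx, method)
	if err != nil {
		return nil, err
	}
	return &loggingStream{ClientStream: cs, trace: trace}, nil
}

// fakeStream is a do-nothing stream used only to drive the example.
type fakeStream struct{}

func (fakeStream) SendMsg(interface{}) error { return nil }
func (fakeStream) RecvMsg(interface{}) error { return nil }
func (fakeStream) CloseSend() error          { return nil }

func demo() []string {
	var trace []string
	streamer := func(ctx context.Context, method string) (ClientStream, error) {
		return fakeStream{}, nil
	}
	cs, err := wrapInterceptor(context.Background(), "/proto.BaseService/Streaming", streamer, &trace)
	if err != nil {
		return trace
	}
	cs.SendMsg("1")
	cs.RecvMsg(nil)
	cs.CloseSend()
	return trace
}

func main() {
	for _, line := range demo() {
		fmt.Println(line)
	}
}
```

The interceptor itself has already returned by the time SendMsg and RecvMsg run, so the wrapper is the only place where per-message logic can live.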

Example

Here we write a demo in which each of the four interceptors above simply logs the request, to see the actual effect.

The demo directory structure is as follows.

rpc
├── base.proto
├── base
│   ├── base.pb.go
│   └── base_grpc.pb.go
├── client
│   └── main.go
├── server
│   └── main.go
├── go.mod
└── go.sum

The base.proto file is as follows.

syntax = "proto3";

package proto;

option go_package = "base;base";


service BaseService {
  rpc GetTime (TimeRequest) returns (TimeResponse){}
  rpc Streaming (stream StreamRequest) returns (stream StreamResponse){}
}

message TimeRequest {}

message TimeResponse {
  string time = 1;
}

message StreamRequest{
  string input = 1;
}

message StreamResponse{
  string output = 1;
}

Execute the command protoc --go_out=. --go-grpc_out=. base.proto to generate the corresponding pb files.

server/main.go

package main

import (
    "context"
    "io"
    "log"
    "net"
    "strconv"
    "time"

    pb "rpc/base"

    "google.golang.org/grpc"
    "google.golang.org/grpc/reflection"
)

type service struct {
    pb.UnimplementedBaseServiceServer
}

func main() {
    listen, err := net.Listen("tcp", ":50051")
    if err != nil {
        log.Fatal(err)
    }
    s := grpc.NewServer(grpc.UnaryInterceptor(UnaryServerInterceptor), grpc.StreamInterceptor(StreamServerInterceptor))
    reflection.Register(s)
    pb.RegisterBaseServiceServer(s, &service{})
    if err := s.Serve(listen); err != nil {
        log.Fatal(err)
    }
}

func UnaryServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
    log.Println("start unary")
    resp, err = handler(ctx, req)
    log.Printf("end unary %v\n", resp)
    return resp, err
}

func StreamServerInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
    log.Println("before stream")
    err := handler(srv, ss)
    log.Println("after stream")
    return err
}

// Specific implementations
func (s *service) GetTime(ctx context.Context, in *pb.TimeRequest) (*pb.TimeResponse, error) {
    now := time.Now().Format("2006-01-02 15:04:05")
    return &pb.TimeResponse{Time: now}, nil
}

func (s *service) Streaming(stream pb.BaseService_StreamingServer) error {
    // n accumulates the running sum of the integers received so far.
    for n := 0; ; {
        res, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return err
        }
        v, err := strconv.Atoi(res.Input)
        if err != nil {
            return err
        }
        log.Println(v)
        n += v
        if err := stream.Send(&pb.StreamResponse{Output: strconv.Itoa(n)}); err != nil {
            return err
        }
    }
}

client/main.go

package main

import (
    "context"
    "io"
    "log"
    "strconv"

    pb "rpc/base"

    "google.golang.org/grpc"
)

func main() {
    // Newer grpc-go releases deprecate grpc.WithInsecure in favor of
    // grpc.WithTransportCredentials(insecure.NewCredentials()).
    conn, err := grpc.Dial(":50051", grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor(UnaryClientInterceptor), grpc.WithStreamInterceptor(StreamClientInterceptor))
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    c := pb.NewBaseServiceClient(conn)

    // Execute unary, stream in sequence
    _, err = c.GetTime(context.Background(), &pb.TimeRequest{})
    if err != nil {
        log.Fatal(err)
    }
    print("===============\n")
    if err := streaming(c); err != nil {
        log.Fatal(err)
    }
}

func UnaryClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
    log.Println("before unary")
    err := invoker(ctx, method, req, reply, cc, opts...)
    log.Printf("end unary %v\n", reply)
    return err
}

func StreamClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
    log.Println("before stream")
    clientStream, err := streamer(ctx, desc, cc, method, opts...)
    log.Println("check metadata")
    return clientStream, err
}

func streaming(client pb.BaseServiceClient) error {
    stream, err := client.Streaming(context.Background())
    if err != nil {
        return err
    }
    for n := 0; n < 10; n++ {
        log.Println("Streaming Send:", n)
        err := stream.Send(&pb.StreamRequest{Input: strconv.Itoa(n)})
        if err != nil {
            return err
        }
        res, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            return err
        }
        log.Println("Streaming Recv:", res.Output)
    }
    return stream.CloseSend()
}

Run go run server/main.go and then go run client/main.go from the rpc directory; the output is as follows.

[Image: grpc log]

It can be clearly seen that both of StreamClientInterceptor's log lines appear at the start of stream processing, whereas each of the other three interceptors logs once before and once after the request.

Summary

If multiple interceptors are needed, grpc-go provides four corresponding chaining functions that combine them.

  • grpc.ChainUnaryInterceptor(i ...UnaryServerInterceptor)
  • grpc.ChainStreamInterceptor(i ...StreamServerInterceptor)
  • grpc.WithChainUnaryInterceptor(i ...UnaryClientInterceptor)
  • grpc.WithChainStreamInterceptor(i ...StreamClientInterceptor)

If the grpc version is too old to provide the chain API, you can use the third-party library grpc-ecosystem/go-grpc-middleware. In addition to chaining, the library provides many common interceptors, such as grpc_zap and grpc_recovery. Special requirements can of course be met by writing a custom interceptor that implements the corresponding function type.
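
To show what chaining amounts to, here is a minimal sketch of composing unary server interceptors by hand. It is an illustration of the idea only, not the implementation used by grpc-go or go-grpc-middleware. The three types are local stand-ins with the same shape as their grpc-go counterparts, and chainUnary, tag and demo are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// Local stand-ins mirroring the shape of the grpc-go types.
type UnaryServerInfo struct{ FullMethod string }

type UnaryHandler func(ctx context.Context, req interface{}) (interface{}, error)

type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error)

// chainUnary composes interceptors so that the first one is outermost.
func chainUnary(interceptors ...UnaryServerInterceptor) UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
		// Build the handler chain from the inside out.
		next := handler
		for i := len(interceptors) - 1; i >= 0; i-- {
			ic, inner := interceptors[i], next
			next = func(ctx context.Context, req interface{}) (interface{}, error) {
				return ic(ctx, req, info, inner)
			}
		}
		return next(ctx, req)
	}
}

// tag returns an interceptor that records when it is entered and left.
func tag(name string, trace *[]string) UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
		*trace = append(*trace, name+" in")
		resp, err := handler(ctx, req)
		*trace = append(*trace, name+" out")
		return resp, err
	}
}

func demo() []string {
	var trace []string
	chained := chainUnary(tag("A", &trace), tag("B", &trace))
	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
		trace = append(trace, "handler")
		return nil, nil
	}
	chained(context.Background(), nil, &UnaryServerInfo{}, handler)
	return trace
}

func main() {
	fmt.Println(demo()) // [A in B in handler B out A out]
}
```

The nesting order matters: an interceptor placed earlier in the list sees the request first and the response last.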