golang grpc

In the era of cloud-native, microservices-dominated architectures, internal service-to-service communication essentially comes down to two kinds of protocols: HTTP API (RESTful API) and RPC. With today's hardware and network conditions, modern RPC implementations generally outperform HTTP APIs. Comparing JSON over HTTP with gRPC (insecure), using ghz and hey to stress test the gRPC and JSON-over-HTTP implementations respectively, gRPC (Requests/sec: 59924.34) comes out roughly 20% ahead of the HTTP API (Requests/sec: 49969.9234). The codec performance of protobuf, which gRPC uses, measures at 2-3 times that of the fastest JSON codecs and more than 10 times that of the Go standard library's encoding/json package.

For performance-sensitive systems whose internal communication protocols rarely change, using RPC between internal services is probably the choice most people make. Although gRPC is not the best-performing RPC implementation, it is the most widely adopted, backed by Google and the only RPC project in the CNCF.

In this article, we will also talk about gRPC, but we will focus on the gRPC client and take a look at the things to consider when using it (all code in this article is based on gRPC v1.40.0 and Go 1.17).

1. The default gRPC client

gRPC supports four communication modes (the corresponding diagrams are taken from the book "gRPC: Up and Running"):

  • Simple RPC (Unary RPC): the simplest and most commonly used gRPC communication mode; one request, one response
  • Server-streaming RPC: one request, many responses
  • Client-streaming RPC: many requests, one response
  • Bidirectional-streaming RPC: many requests, many responses

Let’s take the most commonly used Simple RPC (also known as Unary RPC) as an example to see how to implement a gRPC version of helloworld.

We don’t need to write the helloworld.proto from scratch and generate the corresponding gRPC code ourselves. There is an official example of helloworld provided by gRPC, and we only need to modify it a little.

The IDL file helloworld.proto of the helloworld example is as follows.

// https://github.com/grpc/grpc-go/tree/master/examples/helloworld/helloworld/helloworld.proto

syntax = "proto3";

option go_package = "google.golang.org/grpc/examples/helloworld/helloworld";
option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";

package helloworld;

// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}

You can refer to the official gRPC documentation for an explanation of the .proto file specification, so I won't go over it here. The IDL above is about as simple as it gets: it defines a single service, Greeter, containing only one method, SayHello, whose request and response are each a message with a single string field.
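
For reference, the client-side stub that protoc (with the protoc-gen-go-grpc plugin) generates from this service definition looks roughly like the excerpt below; it is trimmed for illustration, and the full generated file lives under grpc-go/examples/helloworld/helloworld. NewGreeterClient(conn) returns an implementation of this interface bound to a *grpc.ClientConn.

// excerpt (trimmed) of the generated client stub for the Greeter service
type GreeterClient interface {
    // SayHello is the Simple (Unary) RPC: one HelloRequest in, one HelloReply out.
    SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error)
}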

We don't need to run the protoc command ourselves to generate the Greeter service implementation and the HelloRequest/HelloReply protobuf codec code from the .proto file, because gRPC already ships the generated Go source under examples, and we can import it directly. Note that the latest grpc-go repository adopts a multi-module layout: examples is an independent Go module, so it has to be imported into the consuming project as a separate module. Taking the gRPC client greeter_client as an example, its go.mod should look like this:

// https://github.com/bigwhite/experiments/tree/master/grpc-client/demo1/greeter_client/go.mod
module github.com/bigwhite/grpc-client/demo1

go 1.17

require (
    google.golang.org/grpc v1.40.0
    google.golang.org/grpc/examples v1.40.0
)

require (
    github.com/golang/protobuf v1.4.3 // indirect
    golang.org/x/net v0.0.0-20201021035429-f5854403a974 // indirect
    golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f // indirect
    golang.org/x/text v0.3.3 // indirect
    google.golang.org/genproto v0.0.0-20200806141610-86f49bd18e98 // indirect
    google.golang.org/protobuf v1.25.0 // indirect
)

replace google.golang.org/grpc v1.40.0 => /Users/tonybai/Go/src/github.com/grpc/grpc-go

replace google.golang.org/grpc/examples v1.40.0 => /Users/tonybai/Go/src/github.com/grpc/grpc-go/examples

Note: The tags of the grpc-go project seem to have a problem: there is no grpc/examples/v1.40.0 tag, so the go command cannot find the examples module at the v1.40.0 tag of grpc-go. The go.mod above therefore uses a replace trick (the examples module's v1.40.0 version is a fake one) to point the examples module at a local copy of the code.

We also modified both ends of the gRPC communication slightly. The original greeter_client sends only one request and then exits; here we change it to send a request every 2 seconds (to make it easier to observe), as shown in the following code.

// https://github.com/bigwhite/experiments/tree/master/grpc-client/demo1/greeter_client/main.go
... ...
func main() {
    // Set up a connection to the server.
    ctx, cf1 := context.WithTimeout(context.Background(), time.Second*3)
    defer cf1()
    conn, err := grpc.DialContext(ctx, address, grpc.WithInsecure(), grpc.WithBlock())
    if err != nil {
        log.Fatalf("did not connect: %v", err)
    }
    defer conn.Close()
    c := pb.NewGreeterClient(conn)

    // Contact the server and print out its response.
    name := defaultName
    if len(os.Args) > 1 {
        name = os.Args[1]
    }

    for i := 0; ; i++ {
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        r, err := c.SayHello(ctx, &pb.HelloRequest{Name: fmt.Sprintf("%s-%d", name, i+1)})
        cancel() // release the per-call context as soon as the RPC returns
        if err != nil {
            log.Fatalf("could not greet: %v", err)
        }
        log.Printf("Greeting: %s", r.GetMessage())
        time.Sleep(2 * time.Second)
    }
}

greeter_server adds a -port command line option and shuts the gRPC server down when it receives an exit signal.

// https://github.com/bigwhite/experiments/tree/master/grpc-client/demo1/greeter_server/main.go
... ...

var port int

func init() {
    flag.IntVar(&port, "port", 50051, "listen port")
}

func main() {
    flag.Parse()
    lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port))
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }
    s := grpc.NewServer()
    pb.RegisterGreeterServer(s, &server{})

    go func() {
        if err := s.Serve(lis); err != nil {
            log.Fatalf("failed to serve: %v", err)
        }
    }()

    var c = make(chan os.Signal, 1) // signal.Notify requires a buffered channel
    signal.Notify(c, os.Interrupt, os.Kill)
    <-c
    s.Stop()
    fmt.Println("exit")
}

With go.mod fixed and the client and server modified, we can build and run greeter_client and greeter_server.

Build and start the server:

$cd grpc-client/demo1/greeter_server
$make
$./demo1-server -port 50051
2021/09/11 12:10:33 Received: world-1
2021/09/11 12:10:35 Received: world-2
2021/09/11 12:10:37 Received: world-3
... ...

Build and start the client:
$cd grpc-client/demo1/greeter_client
$make
$./demo1-client
2021/09/11 12:10:33 Greeting: Hello world-1
2021/09/11 12:10:35 Greeting: Hello world-2
2021/09/11 12:10:37 Greeting: Hello world-3
... ...

We see that greeter_client and greeter_server start up and communicate normally. Now let's focus on greeter_client.

The target parameter that greeter_client passes to DialContext when dialing the server is a static service address.

const (
      address     = "localhost:50051"
)

This form of target is parsed by google.golang.org/grpc/internal/grpcutil.ParseTarget, which returns a resolver.Target whose Scheme field is empty. gRPC therefore falls back to the default scheme, passthrough (see github.com/grpc/grpc-go/resolver/resolver.go), and with the passthrough scheme gRPC uses the built-in passthrough resolver (google.golang.org/grpc/internal/resolver/passthrough). How does this passthrough resolver set the address of the service to connect to? The following is an excerpt from the passthrough resolver code.

// github.com/grpc/grpc-go/internal/resolver/passthrough/passthrough.go

func (r *passthroughResolver) start() {
    r.cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: r.target.Endpoint}}})
}

We see that it passes target.Endpoint, i.e. localhost:50051, directly to the ClientConn (r.cc in the code above), which then establishes a tcp connection to that address. This matches the resolver's name: passthrough.
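
In other words, writing the scheme out explicitly should behave the same as the bare address; a small sketch (not code from the demo repo):

conn, err := grpc.DialContext(ctx, "passthrough:///localhost:50051",
    grpc.WithInsecure(), grpc.WithBlock())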

The greeter_client above connects to only one instance of the service. What if we start three instances of the service at the same time, for example by using goreman to start multiple service instances from a Procfile script?

// https://github.com/bigwhite/experiments/tree/master/grpc-client/demo1/greeter_server/Procfile

# Run with goreman (install it with: go get github.com/mattn/goreman)
demo1-server1: ./demo1-server -port 50051
demo1-server2: ./demo1-server -port 50052
demo1-server3: ./demo1-server -port 50053

Start multiple instances at the same time:

$goreman start
15:22:12 demo1-server3 | Starting demo1-server3 on port 5200
15:22:12 demo1-server2 | Starting demo1-server2 on port 5100
15:22:12 demo1-server1 | Starting demo1-server1 on port 5000

So how should we tell greeter_client to connect to these three instances? Is it possible to simply change the address to something like the following?

const (
    address     = "localhost:50051,localhost:50052,localhost:50053"
    defaultName = "world"
)

Let's make this change, recompile greeter_client, and run it; we see the following result.

$./demo1-client
2021/09/11 15:26:32 did not connect: context deadline exceeded

greeter_client times out connecting to the server! Simply passing in multiple addresses like this won't work. So how do we get greeter_client to connect to multiple instances of a service? Let's keep going.

2. Connecting to multiple instances of a service

The target of grpc.Dial/grpc.DialContext is not simply a service instance's address; the argument passed to it determines which resolver the gRPC client uses to determine the set of service instance addresses.

Let's take a StaticResolver that returns a static set of service instances (i.e., a fixed number of instances with fixed addresses) as an example to see how the gRPC client can connect to multiple instances of a service.

1) StaticResolver

Let's first design the form of the target to pass to grpc.DialContext. gRPC has dedicated documentation on its naming resolution. Here we create a new scheme, static, where the addresses of the service instances are passed in as a comma-separated string, as in the following code.

// https://github.com/bigwhite/experiments/tree/master/grpc-client/demo2/greeter_client/main.go

const (
      address = "static:///localhost:50051,localhost:50052,localhost:50053"
)

When address is passed to grpc.DialContext as the target argument, it is parsed by grpcutil.ParseTarget into a resolver.Target. This structure contains three fields.

// github.com/grpc/grpc-go/resolver/resolver.go
type Target struct {
    Scheme    string
    Authority string
    Endpoint  string
}

Scheme is "static", Authority is empty, and Endpoint is "localhost:50051,localhost:50052,localhost:50053".

Next, gRPC looks up the corresponding resolver Builder instance in the resolver package's builder map based on Target.Scheme. None of gRPC's built-in resolver builders match this scheme, so it's time to write a custom Builder for it: StaticBuilder!

The grpc resolver package defines the interface that a Builder needs to implement.

// github.com/grpc/grpc-go/resolver/resolver.go 

// Builder creates a resolver that will be used to watch name resolution updates.
type Builder interface {
    // Build creates a new resolver for the given target.
    //
    // gRPC dial calls Build synchronously, and fails if the returned error is
    // not nil.
    Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
    // Scheme returns the scheme supported by this resolver.
    // Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.
    Scheme() string
}

The Scheme method returns the scheme this Builder handles, while the Build method is the one that actually builds the Resolver instance. Let's look at the implementation of StaticBuilder.

// https://github.com/bigwhite/experiments/tree/master/grpc-client/demo2/greeter_client/builder.go

func init() {
    resolver.Register(&StaticBuilder{}) // register the StaticBuilder instance into the resolver package's builder map at package init time
}

type StaticBuilder struct{}

func (sb *StaticBuilder) Build(target resolver.Target, cc resolver.ClientConn,
    opts resolver.BuildOptions) (resolver.Resolver, error) {

    // parse target.Endpoint (e.g. localhost:50051,localhost:50052,localhost:50053)
    endpoints := strings.Split(target.Endpoint, ",")

    r := &StaticResolver{
        endpoints: endpoints,
        cc:        cc,
    }
    r.ResolveNow(resolver.ResolveNowOptions{})
    return r, nil
}

func (sb *StaticBuilder) Scheme() string {
    return "static" // 返回StaticBuilder对应的scheme字符串
}

In this StaticBuilder implementation, the init function registers a StaticBuilder instance into the resolver package's builder map upon package initialization. The Build method is the key to StaticBuilder: it first parses the incoming target's Endpoint into individual addresses, then creates a StaticResolver instance from them and calls that instance's ResolveNow method to determine the set of service instances to connect to.

Like Builder, grpc’s resolver package also defines the Resolver interface that each resolver needs to implement.

// github.com/grpc/grpc-go/resolver/resolver.go 

// Resolver watches for the updates on the specified target.
// Updates include address updates and service config updates.
type Resolver interface {
    // ResolveNow will be called by gRPC to try to resolve the target name
    // again. It's just a hint, resolver can ignore this if it's not necessary.
    //
    // It could be called multiple times concurrently.
    ResolveNow(ResolveNowOptions)
    // Close closes the resolver.
    Close()
}

We can also see from the interface comments that a Resolver implementation is responsible for watching the specified target for address and configuration changes and pushing those updates to gRPC's ClientConn. Let's take a look at our StaticResolver implementation.

// https://github.com/bigwhite/experiments/tree/master/grpc-client/demo2/greeter_client/resolver.go

type StaticResolver struct {
    endpoints []string
    cc        resolver.ClientConn
    sync.Mutex
}

func (r *StaticResolver) ResolveNow(opts resolver.ResolveNowOptions) {
    r.Lock()
    r.doResolve()
    r.Unlock()
}

func (r *StaticResolver) Close() {
}

func (r *StaticResolver) doResolve() {
    var addrs []resolver.Address
    for i, addr := range r.endpoints {
        addrs = append(addrs, resolver.Address{
            Addr:       addr,
            ServerName: fmt.Sprintf("instance-%d", i+1),
        })
    }

    newState := resolver.State{
        Addresses: addrs,
    }

    r.cc.UpdateState(newState)
}

Note: The comments on the resolver.Resolver interface state that ResolveNow may be called concurrently, so here we synchronize with a sync.Mutex.

Since the number and addresses of the service instances never change, there is no watch-and-update process here, only the ResolveNow implementation (which is also called from the Builder's Build method); ResolveNow pushes the set of service instance addresses to the ClientConn (r.cc).

Next, let’s compile and run the client and server of demo2.

$cd grpc-client/demo2/greeter_server
$make
$goreman start
22:58:21 demo2-server1 | Starting demo2-server1 on port 5000
22:58:21 demo2-server2 | Starting demo2-server2 on port 5100
22:58:21 demo2-server3 | Starting demo2-server3 on port 5200

$cd grpc-client/demo2/greeter_client
$make
$./demo2-client

After running for a while, you will notice a problem in the server-side logs, as shown below.

22:57:16 demo2-server1 | 2021/09/11 22:57:16 Received: world-1
22:57:18 demo2-server1 | 2021/09/11 22:57:18 Received: world-2
22:57:20 demo2-server1 | 2021/09/11 22:57:20 Received: world-3
22:57:22 demo2-server1 | 2021/09/11 22:57:22 Received: world-4
22:57:24 demo2-server1 | 2021/09/11 22:57:24 Received: world-5
22:57:26 demo2-server1 | 2021/09/11 22:57:26 Received: world-6
22:57:28 demo2-server1 | 2021/09/11 22:57:28 Received: world-7
22:57:30 demo2-server1 | 2021/09/11 22:57:30 Received: world-8
22:57:32 demo2-server1 | 2021/09/11 22:57:32 Received: world-9

There are clearly three addresses in our service instance set, so why does only server1 receive rpc requests while the other two servers sit idle? This is the client's load balancing policy at work! By default, grpc uses the built-in pick_first load balancing policy on the client, i.e., it picks the first instance in the service instance set and sends all requests to it. In this example, with the pick_first policy, grpc will always choose demo2-server1 for its rpc requests.

If we want requests spread across the individual servers, we can switch to another built-in policy, round_robin, as in the following code.

// https://github.com/bigwhite/experiments/tree/master/grpc-client/demo2/greeter_client/main.go

conn, err := grpc.DialContext(ctx, address, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithBalancerName("round_robin"))

After recompiling and running greeter_client, the server logs show that rpc requests are now distributed to each server instance in turn.
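
Note: grpc.WithBalancerName is already marked deprecated in grpc-go v1.40.0; on newer versions where it may be removed, the same effect can be achieved by selecting the policy through the default service config. A sketch (not the code used in the demos):

conn, err := grpc.DialContext(ctx, address, grpc.WithInsecure(), grpc.WithBlock(),
    grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`))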

2) Resolver Principle

Let’s use a diagram to sort out how the Builder and Resolver work.


The SchemeResolver in the diagram refers to a resolver that implements a particular scheme. As shown in the diagram, the service instance set resolution process goes roughly as follows.

  • The SchemeBuilder registers an instance of itself into the resolver package's builder map.
  • grpc.Dial/DialContext is called with a target in that scheme's specific form.
  • After parsing the target, gRPC finds the Builder corresponding to the scheme in the resolver package's builder map according to target.Scheme.
  • The Builder's Build method is called.
  • The Build method builds a SchemeResolver instance.
  • Subsequently, the SchemeResolver instance watches for service instance changes and updates the ClientConn when they occur (a minimal end-to-end sketch follows this list).
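
To tie these steps together, here is a minimal sketch from the client's point of view, reusing the StaticBuilder and target string from demo2 (dialStatic is a hypothetical helper name, not code from the repo):

// step 1: register the builder for the "static" scheme (as in demo2/greeter_client/builder.go)
func init() {
    resolver.Register(&StaticBuilder{})
}

// steps 2-6 happen inside DialContext: the target is parsed, the "static" builder is
// looked up by scheme, its Build method creates the resolver, and the resolver pushes
// the address set into the ClientConn.
func dialStatic() (*grpc.ClientConn, error) {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()
    return grpc.DialContext(ctx,
        "static:///localhost:50051,localhost:50052,localhost:50053",
        grpc.WithInsecure(), grpc.WithBlock())
}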

3) NacosResolver

In production, for high availability and scalability, we rarely use a fixed set of service instances with fixed addresses; more often the set of instances is updated automatically through a service registration and discovery mechanism. Here we implement a NacosResolver based on nacos, so that the grpc client adjusts automatically when the service instances change (Note: see the appendix for a local single-node nacos installation), which gives the example some real-world relevance ^_^.

With the Resolver principle described above in mind, the description here is kept brief.

First, as with StaticResolver, let's design the form of the target. nacos has the concepts of namespace and group, so we design the target as follows.

nacos://[authority]/host:port/namespace/group/serviceName

Specifically in our greeter_client, its address is

// https://github.com/bigwhite/experiments/tree/master/grpc-client/demo3/greeter_client/main.go

const (
      address = "nacos:///localhost:8848/public/group-a/demo3-service" //no authority
)

Next we look at NacosBuilder.

// https://github.com/bigwhite/experiments/tree/master/grpc-client/demo3/greeter_client/builder.go

func (nb *NacosBuilder) Build(target resolver.Target,
    cc resolver.ClientConn,
    opts resolver.BuildOptions) (resolver.Resolver, error) {

    // use info in target to access naming service
    // parse the target.endpoint
    // target.Endpoint - localhost:8848/public/DEFAULT_GROUP/serviceName, the addr of naming service :nacos endpoint
    sl := strings.Split(target.Endpoint, "/")
    nacosAddr := sl[0]
    namespace := sl[1]
    group := sl[2]
    serviceName := sl[3]
    sl1 := strings.Split(nacosAddr, ":")
    host := sl1[0]
    port := sl1[1]
    namingClient, err := initNamingClient(host, port, namespace, group)
    if err != nil {
        return nil, err
    }

    r := &NacosResolver{
        namingClient: namingClient,
        cc:           cc,
        namespace:    namespace,
        group:        group,
        serviceName:  serviceName,
    }

    // initialize the cc's states
    r.ResolveNow(resolver.ResolveNowOptions{})

    // subscribe and watch
    r.watch()
    return r, nil
}

func (nb *NacosBuilder) Scheme() string {
    return "nacos"
}

The flow of NacosBuilder's Build method is the same as StaticBuilder's: we first parse the incoming target's Endpoint, i.e. "localhost:8848/public/group-a/demo3-service", and store the parsed segments in the newly created NacosResolver instance. The NacosResolver needs one more piece of information: a connection to nacos. Here we use initNamingClient to create a nacos naming client instance (using the Go SDK provided by nacos).
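
The initNamingClient helper is not listed in this article. Assuming the nacos-sdk-go v1 API, a minimal sketch might look like the following (the TimeoutMs and NotLoadCacheAtStart values are illustrative; the actual implementation in the repo may differ):

import (
    "strconv"

    "github.com/nacos-group/nacos-sdk-go/clients"
    "github.com/nacos-group/nacos-sdk-go/clients/naming_client"
    "github.com/nacos-group/nacos-sdk-go/common/constant"
)

func initNamingClient(host, port, namespace, group string) (naming_client.INamingClient, error) {
    p, err := strconv.ParseUint(port, 10, 64)
    if err != nil {
        return nil, err
    }
    sc := []constant.ServerConfig{
        {IpAddr: host, Port: p}, // the nacos server address parsed from the target
    }
    cc := constant.ClientConfig{
        NamespaceId:         namespace, // namespace segment of the target ("public" in demo3)
        TimeoutMs:           5000,
        NotLoadCacheAtStart: true,
    }
    // group is not needed here; it is passed per call (SelectAllInstances/Subscribe)
    return clients.CreateNamingClient(map[string]interface{}{
        constant.KEY_SERVER_CONFIGS: sc,
        constant.KEY_CLIENT_CONFIG:  cc,
    })
}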

Next we call NacosResolver's ResolveNow to fetch the list of demo3-service instances from nacos and initialize the ClientConn's state, and finally we call NacosResolver's watch method to subscribe to and monitor demo3-service for instance changes. The following is a partial implementation of NacosResolver.

// https://github.com/bigwhite/experiments/tree/master/grpc-client/demo3/greeter_client/resolver.go

func (r *NacosResolver) doResolve(opts resolver.ResolveNowOptions) {
    instances, err := r.namingClient.SelectAllInstances(vo.SelectAllInstancesParam{
        ServiceName: r.serviceName,
        GroupName:   r.group,
    })
    if err != nil {
        fmt.Println(err)
        return
    }

    if len(instances) == 0 {
        fmt.Printf("service %s has zero instance\n", r.serviceName)
        return
    }

    // update cc.States
    var addrs []resolver.Address
    for i, inst := range instances {
        if (!inst.Enable) || (inst.Weight == 0) {
            continue
        }

        addrs = append(addrs, resolver.Address{
            Addr:       fmt.Sprintf("%s:%d", inst.Ip, inst.Port),
            ServerName: fmt.Sprintf("instance-%d", i+1),
        })
    }

    if len(addrs) == 0 {
        fmt.Printf("service %s has zero valid instance\n", r.serviceName)
    }

    newState := resolver.State{
        Addresses: addrs,
    }

    r.Lock()
    r.cc.UpdateState(newState)
    r.Unlock()
}

func (r *NacosResolver) ResolveNow(opts resolver.ResolveNowOptions) {
    r.doResolve(opts)
}

func (r *NacosResolver) Close() {
    r.namingClient.Unsubscribe(&vo.SubscribeParam{
        ServiceName: r.serviceName,
        GroupName:   r.group,
    })
}

func (r *NacosResolver) watch() {
    r.namingClient.Subscribe(&vo.SubscribeParam{
        ServiceName: r.serviceName,
        GroupName:   r.group,
        SubscribeCallback: func(services []model.SubscribeService, err error) {
            fmt.Printf("subcallback: %#v\n", services)
            r.doResolve(resolver.ResolveNowOptions{})
        },
    })
}

The important piece here is the doResolve method, called by both ResolveNow and watch. It fetches all instances of demo3-service via SelectAllInstances from the nacos-go sdk, filters them down to the valid instances (those with Enable set to true and a non-zero Weight), and updates that set to the ClientConn via r.cc.UpdateState.

In NacosResolver's watch method, we subscribe to demo3-service through the Subscribe method of the nacos-go sdk and provide a callback function. This callback is invoked whenever the instances of demo3-service change. In the callback we could update the ClientConn from the latest set of service instances passed back (services []model.SubscribeService), but here we simply reuse the doResolve method, i.e. we query nacos again for the instances of demo3-service.

Compile and run greeter_server under demo3.

$cd grpc-client/demo3/greeter_server
$make
$goreman start
06:06:02 demo3-server3 | Starting demo3-server3 on port 5200
06:06:02 demo3-server1 | Starting demo3-server1 on port 5000
06:06:02 demo3-server2 | Starting demo3-server2 on port 5100
06:06:02 demo3-server3 | 2021-09-12T06:06:02.913+0800   INFO    nacos_client/nacos_client.go:87 logDir:</tmp/nacos/log/50053>   cacheDir:</tmp/nacos/cache/50053>
06:06:02 demo3-server2 | 2021-09-12T06:06:02.913+0800   INFO    nacos_client/nacos_client.go:87 logDir:</tmp/nacos/log/50052>   cacheDir:</tmp/nacos/cache/50052>
06:06:02 demo3-server1 | 2021-09-12T06:06:02.913+0800   INFO    nacos_client/nacos_client.go:87 logDir:</tmp/nacos/log/50051>   cacheDir:</tmp/nacos/cache/50051>

After running greeter_server, we can see all instances of demo3-service on the nacos dashboard.

nacos dashboard

Compile and run greeter_client under demo3.

$cd grpc-client/demo3/greeter_client
$make
$./demo3-client
2021-09-12T06:08:25.551+0800    INFO    nacos_client/nacos_client.go:87 logDir:</Users/tonybai/go/src/github.com/bigwhite/experiments/grpc-client/demo3/greeter_client/log>   cacheDir:</Users/tonybai/go/src/github.com/bigwhite/experiments/grpc-client/demo3/greeter_client/cache>
2021/09/12 06:08:25 Greeting: Hello world-1
2021/09/12 06:08:27 Greeting: Hello world-2
2021/09/12 06:08:29 Greeting: Hello world-3
2021/09/12 06:08:31 Greeting: Hello world-4
2021/09/12 06:08:33 Greeting: Hello world-5
2021/09/12 06:08:35 Greeting: Hello world-6
... ...

Because of the round_robin load balancing policy, each greeter_server instance (each with a weight of 1) receives rpc requests equally.

06:06:36 demo3-server1 | 2021/09/12 06:06:36 Received: world-1
06:06:38 demo3-server3 | 2021/09/12 06:06:38 Received: world-2
06:06:40 demo3-server2 | 2021/09/12 06:06:40 Received: world-3
06:06:42 demo3-server1 | 2021/09/12 06:06:42 Received: world-4
06:06:44 demo3-server3 | 2021/09/12 06:06:44 Received: world-5
06:06:46 demo3-server2 | 2021/09/12 06:06:46 Received: world-6
... ...

At this point we can adjust the instance weights of demo3-service or take an instance offline through the nacos dashboard. For example, if we take instance-2 (port 50052) offline, we will see greeter_client's subscription callback execute, and afterwards only instance-1 and instance-3 on the greeter_server side receive rpc requests. After bringing instance-2 back online, everything returns to normal.

3. Customizing the client balancer

In reality, server-side instances are deployed on different hosts (VMs/containers) with different computing power, so giving every instance the same weight of 1 is clearly unreasonable and wastes capacity. The built-in balancers of grpc-go are limited and cannot meet this need, so we have to build a custom balancer.

Here we take a custom Weighted Round Robin (wrr) balancer as an example and walk through the steps of customizing a balancer (we refer to grpc-go's built-in round_robin implementation under balancer/roundrobin).

Similar to the resolver package, a balancer is created by a Builder (the same creational pattern), and the balancer package's Builder interface resembles resolver.Builder.

// github.com/grpc/grpc-go/balancer/balancer.go 

// Builder creates a balancer.
type Builder interface {
    // Build creates a new balancer with the ClientConn.
    Build(cc ClientConn, opts BuildOptions) Balancer
    // Name returns the name of balancers built by this builder.
    // It will be used to pick balancers (for example in service config).
    Name() string
}

In the Build method we construct an implementation of the Balancer interface, which is defined as follows.

// github.com/grpc/grpc-go/balancer/balancer.go 

type Balancer interface {
    // UpdateClientConnState is called by gRPC when the state of the ClientConn
    // changes.  If the error returned is ErrBadResolverState, the ClientConn
    // will begin calling ResolveNow on the active name resolver with
    // exponential backoff until a subsequent call to UpdateClientConnState
    // returns a nil error.  Any other errors are currently ignored.
    UpdateClientConnState(ClientConnState) error
    // ResolverError is called by gRPC when the name resolver reports an error.
    ResolverError(error)
    // UpdateSubConnState is called by gRPC when the state of a SubConn
    // changes.
    UpdateSubConnState(SubConn, SubConnState)
    // Close closes the balancer. The balancer is not required to call
    // ClientConn.RemoveSubConn for its existing SubConns.
    Close()
}

As you can see, Balancer is much more complex than Resolver. gRPC's core developers saw this and provided a package that simplifies the creation of custom Balancers: google.golang.org/grpc/balancer/base. gRPC's built-in round_robin balancer is also based on the base package.

The base package provides NewBalancerBuilder, which quickly returns an implementation of balancer.Builder.

// github.com/grpc/grpc-go/balancer/base/base.go 

// NewBalancerBuilder returns a base balancer builder configured by the provided config.
func NewBalancerBuilder(name string, pb PickerBuilder, config Config) balancer.Builder {
    return &baseBuilder{
        name:          name,
        pickerBuilder: pb,
        config:        config,
    }
}

We see that this function takes a parameter pb of type PickerBuilder, an interface type that is relatively simple.

// github.com/grpc/grpc-go/balancer/base/base.go 

// PickerBuilder creates balancer.Picker.
type PickerBuilder interface {
    // Build returns a picker that will be used by gRPC to pick a SubConn.
    Build(info PickerBuildInfo) balancer.Picker
}

We only need to provide an implementation of PickerBuilder and an implementation of balancer.Picker, which is an interface type with only one method.

// github.com/grpc/grpc-go/balancer/balancer.go 

type Picker interface {
    Pick(info PickInfo) (PickResult, error)
}

There is a fair amount of nesting here, so let's use the following diagram to look at the creation and use of a balancer directly.


To recap, the general process is as follows.

  • First, register a balancer Builder named "my_weighted_round_robin": wrrBuilder, which is built with the base package's NewBalancerBuilder (a registration sketch follows this list).
  • NewBalancerBuilder requires a PickerBuilder implementation, so we customize a PickerBuilder whose Build method returns a Picker interface implementation.
  • grpc.Dial is called with WithBalancerName("my_weighted_round_robin"); grpc selects our builder from the registered balancer builders by name, and wrrBuilder creates the Picker: wrrPicker.
  • When grpc makes an rpc call such as SayHello, wrrPicker's Pick method is called, a connection is selected, and the rpc request is sent over that connection.
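
The registration step does not appear in the balancer.go excerpt later in this section; assuming it sits in an init function next to wrrPickerBuilder, a sketch could look like this (wrrName is a hypothetical constant; the string just has to match what is passed to WithBalancerName):

// register the custom weighted-round-robin balancer built on the base package
const wrrName = "my_weighted_round_robin"

func init() {
    balancer.Register(base.NewBalancerBuilder(wrrName, &wrrPickerBuilder{}, base.Config{HealthCheck: true}))
}

On the dialing side the client would then select it with:

conn, err := grpc.DialContext(ctx, address, grpc.WithInsecure(), grpc.WithBlock(),
    grpc.WithBalancerName(wrrName))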

Because weights are now involved, our resolver implementation needs a small change: in the doResolve method it attaches each service instance's weight to its address via Attributes before updating the ClientConn.

// https://github.com/bigwhite/experiments/tree/master/grpc-client/demo4/greeter_client/resolver.go

func (r *NacosResolver) doResolve(opts resolver.ResolveNowOptions) {
    instances, err := r.namingClient.SelectAllInstances(vo.SelectAllInstancesParam{
        ServiceName: r.serviceName,
        GroupName:   r.group,
    })
    if err != nil {
        fmt.Println(err)
        return
    }

    if len(instances) == 0 {
        fmt.Printf("service %s has zero instance\n", r.serviceName)
        return
    }

    // update cc.States
    var addrs []resolver.Address
    for i, inst := range instances {
        if (!inst.Enable) || (inst.Weight == 0) {
            continue
        }

        addr := resolver.Address{
            Addr:       fmt.Sprintf("%s:%d", inst.Ip, inst.Port),
            ServerName: fmt.Sprintf("instance-%d", i+1),
        }
        addr.Attributes = addr.Attributes.WithValues("weight", int(inst.Weight)) // attach the instance weight so it becomes part of the cc's state
        addrs = append(addrs, addr)
    }

    if len(addrs) == 0 {
        fmt.Printf("service %s has zero valid instance\n", r.serviceName)
    }

    newState := resolver.State{
        Addresses: addrs,
    }

    r.Lock()
    r.cc.UpdateState(newState)
    r.Unlock()
}

Next we focus on the implementation of wrrPickerBuilder and wrrPicker in greeter_client.

// https://github.com/bigwhite/experiments/tree/master/grpc-client/demo4/greeter_client/balancer.go

type wrrPickerBuilder struct{}

func (*wrrPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
    if len(info.ReadySCs) == 0 {
        return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
    }

    var scs []balancer.SubConn
    // extract the weight of each ready connection and use it as input to the Picker instance
    for subConn, addr := range info.ReadySCs {
        weight := addr.Address.Attributes.Value("weight").(int)
        if weight <= 0 {
            weight = 1
        }
        for i := 0; i < weight; i++ {
            scs = append(scs, subConn)
        }
    }

    return &wrrPicker{
        subConns: scs,
        // Start at a random index, as the same RR balancer rebuilds a new
        // picker when SubConn states change, and we don't want to apply excess
        // load to the first server in the list.
        next: rand.Intn(len(scs)),
    }
}

type wrrPicker struct {
    // subConns is the snapshot of the roundrobin balancer when this picker was
    // created. The slice is immutable. Each Get() will do a round robin
    // selection from it and return the selected SubConn.
    subConns []balancer.SubConn

    mu   sync.Mutex
    next int
}

// pick one connection from the weighted set
func (p *wrrPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
    p.mu.Lock()
    sc := p.subConns[p.next]
    p.next = (p.next + 1) % len(p.subConns)
    p.mu.Unlock()
    return balancer.PickResult{SubConn: sc}, nil
}

This is a simple Weighted Round Robin implementation. The weighting algorithm is straightforward: if a conn has weight n, it is added to the weighted result set n times, so the subsequent Pick does not need to consider weights at all and can pick connections one by one just like an ordinary Round Robin. For example, with three conns of weights 5, 1 and 1, the slice holds 7 entries, and the first conn is picked 5 times out of every 7 requests.

After running demo4's greeter_server, we change the weight of instance-1 to 5 in nacos, and subsequently see the following output.

$goreman start
09:20:18 demo4-server3 | Starting demo4-server3 on port 5200
09:20:18 demo4-server2 | Starting demo4-server2 on port 5100
09:20:18 demo4-server1 | Starting demo4-server1 on port 5000
09:20:18 demo4-server2 | 2021-09-12T09:20:18.633+0800   INFO    nacos_client/nacos_client.go:87 logDir:</tmp/nacos/log/50052>   cacheDir:</tmp/nacos/cache/50052>
09:20:18 demo4-server1 | 2021-09-12T09:20:18.633+0800   INFO    nacos_client/nacos_client.go:87 logDir:</tmp/nacos/log/50051>   cacheDir:</tmp/nacos/cache/50051>
09:20:18 demo4-server3 | 2021-09-12T09:20:18.633+0800   INFO    nacos_client/nacos_client.go:87 logDir:</tmp/nacos/log/50053>   cacheDir:</tmp/nacos/cache/50053>
09:20:23 demo4-server2 | 2021/09/12 09:20:23 Received: world-1
09:20:25 demo4-server3 | 2021/09/12 09:20:25 Received: world-2
09:20:27 demo4-server1 | 2021/09/12 09:20:27 Received: world-3
09:20:29 demo4-server2 | 2021/09/12 09:20:29 Received: world-4
09:20:31 demo4-server3 | 2021/09/12 09:20:31 Received: world-5
09:20:33 demo4-server1 | 2021/09/12 09:20:33 Received: world-6
09:20:35 demo4-server2 | 2021/09/12 09:20:35 Received: world-7
09:20:37 demo4-server3 | 2021/09/12 09:20:37 Received: world-8
09:20:39 demo4-server1 | 2021/09/12 09:20:39 Received: world-9
09:20:41 demo4-server2 | 2021/09/12 09:20:41 Received: world-10
09:20:43 demo4-server1 | 2021/09/12 09:20:43 Received: world-11
09:20:45 demo4-server2 | 2021/09/12 09:20:45 Received: world-12
09:20:47 demo4-server3 | 2021/09/12 09:20:47 Received: world-13
// after the weight of instance-1 is changed to 5 here
09:20:49 demo4-server1 | 2021/09/12 09:20:49 Received: world-14
09:20:51 demo4-server1 | 2021/09/12 09:20:51 Received: world-15
09:20:53 demo4-server1 | 2021/09/12 09:20:53 Received: world-16
09:20:55 demo4-server1 | 2021/09/12 09:20:55 Received: world-17
09:20:57 demo4-server1 | 2021/09/12 09:20:57 Received: world-18
09:20:59 demo4-server2 | 2021/09/12 09:20:59 Received: world-19
09:21:01 demo4-server3 | 2021/09/12 09:21:01 Received: world-20
09:21:03 demo4-server1 | 2021/09/12 09:21:03 Received: world-21

Note: Each time the service instances in nacos change, the balancer rebuilds a new Picker instance and subsequently uses it to pick a conn from its connection set.

4. Summary

In this article we have learned about the four communication modes of gRPC and focused on what to consider on the gRPC client side in the most common mode, Simple RPC (Unary RPC), including:

  • How to implement a helloworld for one-to-one communication
  • How to implement a custom Resolver to enable communication from a client to a collection of static service instances
  • How to implement a custom Resolver to enable communication from a client to a collection of dynamic service instances
  • How to customize the client Balancer

The code in this article is for example use only and does not consider much exception handling.

All code involved in this article can be downloaded from here.