Overview

Because gRPC’s asynchronous call code is rather convoluted, this article documents how asynchronous calls work in gRPC’s C++ API.

Note that gRPC implements asynchronous calls by binding each RPC to a CompletionQueue, which can feel strange when writing the actual code. Because the response arrives asynchronously, you call CompletionQueue::Next to wait for it. Keep this in mind for now; it will become clearer when we walk through the process below.
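To make the Next semantics concrete, here is a toy, thread-safe queue written only with the C++ standard library. `ToyCompletionQueue` is a hypothetical stand-in for grpc::CompletionQueue, not gRPC’s implementation: it only models the behaviour described above, where completed operations are delivered as (tag, ok) pairs and Next blocks until one is available, returning false once the queue has been shut down and drained.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical stand-in for grpc::CompletionQueue, for illustration only.
// Each event is a (tag, ok) pair, mirroring what Next() hands back in gRPC.
class ToyCompletionQueue {
 public:
  // Called by the "transport" side when an operation finishes.
  void Push(void* tag, bool ok) {
    {
      std::lock_guard<std::mutex> lock(mu_);
      events_.emplace_back(tag, ok);
    }
    cv_.notify_one();
  }

  // Blocks until an event is available. Returns false only once the queue
  // has been shut down and fully drained.
  bool Next(void** tag, bool* ok) {
    std::unique_lock<std::mutex> lock(mu_);
    cv_.wait(lock, [this] { return !events_.empty() || shutdown_; });
    if (events_.empty()) return false;  // shut down and nothing left
    *tag = events_.front().first;
    *ok = events_.front().second;
    events_.pop_front();
    return true;
  }

  // After this, Next() keeps returning queued events, then returns false.
  void Shutdown() {
    {
      std::lock_guard<std::mutex> lock(mu_);
      shutdown_ = true;
    }
    cv_.notify_all();
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<std::pair<void*, bool>> events_;
  bool shutdown_ = false;
};
```

A producer would call Push as operations finish, while a consumer loops on `while (cq.Next(&tag, &ok))`, the same shape as the loops later in this article.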

Compile and install

If you don’t have gRPC installed yet, you can build and install it with the following commands to follow along with the demo.

$ export MY_INSTALL_DIR=$HOME/.local
$ mkdir -p $MY_INSTALL_DIR
$ export PATH="$MY_INSTALL_DIR/bin:$PATH"
# Install build tools
$ sudo apt install -y cmake  # skip if cmake is already installed
$ sudo apt install -y build-essential autoconf libtool pkg-config
# Clone the repository
$ git clone --recurse-submodules -b v1.45.0 --depth 1 --shallow-submodules https://github.com/grpc/grpc
# Build and install
$ cd grpc
$ mkdir -p cmake/build
$ pushd cmake/build
$ cmake -DgRPC_INSTALL=ON -DgRPC_BUILD_TESTS=OFF -DCMAKE_INSTALL_PREFIX=$MY_INSTALL_DIR ../..
$ make -j 4
$ make install
$ popd

All of the examples below are taken from the official helloworld example: https://github.com/grpc/grpc/blob/master/examples/cpp/helloworld/.

Asynchronous Client

With a synchronous client, calling a remote method blocks the current thread until the response arrives; an asynchronous client can send multiple requests concurrently without blocking.

So when we use gRPC’s asynchronous API, we need to do the following:

  1. Since requests are asynchronous and non-blocking, we must not wait for the response after sending a request.
  2. All responses are processed asynchronously by another thread, so the main thread is not blocked.
  3. The response data must be handed back through some medium; gRPC uses a CompletionQueue for this.

[Figure: Asynchronous Client]

As the diagram above shows, the overall process is:

  1. Start the client and spawn a background thread that loops over the CompletionQueue.
  2. Send asynchronous calls to the server.
  3. When the server’s response arrives, gRPC puts the result into the CompletionQueue.
  4. The background thread takes the result out of the CompletionQueue and handles it.
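The four steps above can be simulated without gRPC. The sketch below is hypothetical: `TagQueue`, `FakeCall`, and `RunClientDemo` are illustrative names, and a plain thread plays the server. Unlike the real example, which blocks forever in join(), it shuts the queue down once every reply has been posted so the background thread can exit.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical minimal blocking queue of tags (stand-in for CompletionQueue).
struct TagQueue {
  std::mutex mu;
  std::condition_variable cv;
  std::deque<void*> tags;
  bool shutdown = false;

  void Push(void* tag) {
    { std::lock_guard<std::mutex> lock(mu); tags.push_back(tag); }
    cv.notify_one();
  }
  bool Next(void** tag) {  // false once shut down and drained
    std::unique_lock<std::mutex> lock(mu);
    cv.wait(lock, [this] { return !tags.empty() || shutdown; });
    if (tags.empty()) return false;
    *tag = tags.front();
    tags.pop_front();
    return true;
  }
  void Shutdown() {
    { std::lock_guard<std::mutex> lock(mu); shutdown = true; }
    cv.notify_all();
  }
};

// Hypothetical per-call state, playing the role of AsyncClientCall.
struct FakeCall {
  std::string request;
  std::string reply;
};

// Returns the replies in the order the background thread received them.
std::vector<std::string> RunClientDemo(int n) {
  TagQueue cq;
  std::vector<std::string> received;

  // Step 1: background thread draining the queue (like AsyncCompleteRpc).
  std::thread drainer([&] {
    void* tag = nullptr;
    while (cq.Next(&tag)) {
      FakeCall* call = static_cast<FakeCall*>(tag);
      received.push_back(call->reply);  // Step 4: handle the result
      delete call;
    }
  });

  // Step 2: create the calls without waiting for any result.
  std::vector<FakeCall*> in_flight;
  for (int i = 0; i < n; ++i) {
    FakeCall* call = new FakeCall;
    call->request = "world " + std::to_string(i);
    in_flight.push_back(call);
  }

  // Step 3: a simulated server fills in each reply and posts the call's tag.
  std::thread server([&] {
    for (FakeCall* call : in_flight) {
      call->reply = "Hello " + call->request;
      cq.Push(call);  // the drainer owns the call from here on
    }
  });

  server.join();
  cq.Shutdown();   // no more events: lets Next() return false after draining
  drainer.join();
  return received;
}
```

Calling `RunClientDemo(3)` returns `{"Hello world 0", "Hello world 1", "Hello world 2"}`: the single background thread drains the FIFO queue in the order the simulated server posted the replies.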

Let’s look at the following example.

Create

The first step is to create the client and then start a separate thread to handle the server’s responses, because waiting for and processing responses blocks.

int main(int argc, char** argv) {
  // Create the client
  GreeterClient greeter(grpc::CreateChannel(
      "localhost:50051", grpc::InsecureChannelCredentials()));

  // Spawn a thread that takes results off the completion queue and handles them
  std::thread thread_ = std::thread(&GreeterClient::AsyncCompleteRpc, &greeter);

  for (int i = 0; i < 100; i++) {
    std::string user("world " + std::to_string(i));
    // Send the RPC request without waiting for the reply
    greeter.SayHello(user);  // The actual RPC call!
  }

  std::cout << "Press control-c to quit" << std::endl << std::endl;
  // Block the main thread
  thread_.join();  // blocks forever
  return 0;
}

Asynchronous requests

Here the request will be sent by calling SayHello.

class GreeterClient {
 public:
  explicit GreeterClient(std::shared_ptr<Channel> channel)
      : stub_(Greeter::NewStub(channel)) {}
  ...
  void SayHello(const std::string& user) {
    HelloRequest request;
    request.set_name(user);

    // Holds the state of this RPC (context, reply, status)
    AsyncClientCall* call = new AsyncClientCall;

    // PrepareAsyncSayHello creates the RPC object but does not start the call yet
    call->response_reader =
        stub_->PrepareAsyncSayHello(&call->context, request, &cq_);

    // Start the RPC
    call->response_reader->StartCall();

    // Request the reply and final status, passing the call object as the tag;
    // the response data will be written into call->reply.
    // This does not block waiting for the response.
    call->response_reader->Finish(&call->reply, &call->status, (void*) call);
  }

 private:
  ...
  CompletionQueue cq_;
};

Since response handling does not happen here, SayHello returns immediately after calling Finish, without waiting. When the server’s response arrives, gRPC writes it into call->reply and places the call’s tag into cq_.
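The reason for passing `(void*) call` to Finish is that the tag is the only thing Next hands back, so the heap-allocated call object doubles as its own identifier. A hypothetical, single-threaded sketch of that round trip follows; `PendingCall` and `CompleteOutOfOrder` are illustrative names, and a vector of `void*` stands in for cq_. Responses are completed in reverse order to show that each tag still leads back to the right request.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical per-call state; the object's address is used as its tag.
struct PendingCall {
  std::string name;   // request data
  std::string reply;  // filled in when the "response" arrives
};

// Issues one call per name, then completes them in reverse order, casting
// each void* tag back to its PendingCall. Returns replies in completion order.
std::vector<std::string> CompleteOutOfOrder(const std::vector<std::string>& names) {
  std::vector<void*> completion_queue;  // stand-in for cq_: holds tags only
  for (const std::string& name : names) {
    PendingCall* call = new PendingCall{name, ""};
    completion_queue.push_back(static_cast<void*>(call));  // like (void*) call
  }

  std::vector<std::string> replies;
  // Pretend the server answered the last request first.
  for (auto it = completion_queue.rbegin(); it != completion_queue.rend(); ++it) {
    PendingCall* call = static_cast<PendingCall*>(*it);  // tag -> call object
    call->reply = "Hello " + call->name;
    replies.push_back(call->reply);
    delete call;  // the handler owns the call once its tag comes back
  }
  return replies;
}
```

For example, `CompleteOutOfOrder({"a", "b", "c"})` returns `{"Hello c", "Hello b", "Hello a"}`.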

Asynchronous processing of packet returns

Responses are processed in a loop running on the thread we created at the beginning.

  void AsyncCompleteRpc() {
    void* got_tag;
    bool ok = false;

    // Take completed events off the queue; blocks if no response has arrived yet
    while (cq_.Next(&got_tag, &ok)) {

      AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag);

      GPR_ASSERT(ok);

      if (call->status.ok())
        // Got the response data
        std::cout << "Greeter received: " << call->reply.message() << std::endl;
      else
        std::cout << "RPC failed" << std::endl;

      // Free the object allocated with new in SayHello
      delete call;
    }
  }

This loop keeps calling Next to process the server’s responses. Next blocks while no response is available, which is why the loop runs on its own thread instead of blocking the main thread.

Asynchronous Server

When writing a synchronous server, each request is processed as soon as it is received and the response is then returned to the client. Returning the response has to wait until the whole RPC has been handled, so the handling thread blocks.

The asynchronous API avoids this blocking, but its example code can feel quite awkward to understand at first.

First, gRPC has you prepare a CallData object that acts as a container for a single request, like one stage of a pipeline. gRPC then delivers events for that object through the ServerCompletionQueue, and the object handles each event according to its current state.

Then, while handling a request, you must manually create another CallData object to be ready for the next client request, so the whole process works like a pipeline.

[Figure: Asynchronous Server]

The asynchronous process above contains a small state machine, and every state transition is driven by the CallData object.

CallData object

  • When the CallData object is first created, it moves from the CREATE state to the PROCESS state, meaning it is waiting to receive a request.
  • When a request arrives, the object first creates a new CallData object for the next request, processes the current one, and then moves to the FINISH state while the response is sent to the client.
  • Once the response has been sent, the CallData object deletes itself.
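These three states can be modelled without gRPC to check the lifetime question. In the hypothetical sketch below, `ToyCallData` replaces the real CallData, a vector of waiting instances replaces RequestSayHello, and events are delivered by calling Proceed() directly rather than through a ServerCompletionQueue. A static counter tracks how many instances are alive.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical, gRPC-free model of the CallData state machine.
class ToyCallData {
 public:
  static int live_count;  // number of instances currently alive

  explicit ToyCallData(std::vector<ToyCallData*>* waiting)
      : waiting_(waiting), status_(CREATE) {
    ++live_count;
    Proceed();  // CREATE -> PROCESS right away, as in the example
  }

  void Proceed() {
    if (status_ == CREATE) {
      status_ = PROCESS;
      waiting_->push_back(this);  // stands in for RequestSayHello(..., this)
    } else if (status_ == PROCESS) {
      new ToyCallData(waiting_);  // arm an instance for the next request
      reply_ = "Hello " + request_;
      status_ = FINISH;           // stands in for responder_.Finish(..., this)
    } else {
      assert(status_ == FINISH);
      delete this;                // response sent: release ourselves
    }
  }

  std::string request_;
  std::string reply_;

 private:
  ~ToyCallData() { --live_count; }  // only reachable via delete this

  enum CallStatus { CREATE, PROCESS, FINISH };
  std::vector<ToyCallData*>* waiting_;
  CallStatus status_;
};

int ToyCallData::live_count = 0;
```

Driving one instance through all three states leaves the live count at 1: the instance that served the request has deleted itself, and the one it created in PROCESS is waiting for the next request. The destructor is private here (a choice of this sketch, not of the gRPC example) so that only the `delete this` in the FINISH branch can free an instance.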

With the role of CallData clear, the entire server process looks like this:

  1. The server starts, registers the service, and creates a CallData object to be ready for the first client request.
  2. The CallData object is handed over to gRPC. When a request arrives for it, gRPC fills in the object’s context and request and notifies us through the ServerCompletionQueue, using the CallData object as the tag.
  3. Wait for a client request to arrive…
  4. When an event arrives, take its tag from the ServerCompletionQueue, cast it back to a CallData object, and call its Proceed method, which creates a new CallData object for the next client request and then runs the business logic.
  5. Wait for the response to the client to finish sending.
  6. Keep processing the events returned by the ServerCompletionQueue; when the send completes, the CallData object deletes itself.

Take a look at the code below.

Start registration

  void Run() {
    // Register the service and start listening
    std::string server_address("0.0.0.0:50051");
    ServerBuilder builder;
    builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
    builder.RegisterService(&service_);
    cq_ = builder.AddCompletionQueue();
    server_ = builder.BuildAndStart();
    std::cout << "Server listening on " << server_address << std::endl;
    // Handle RPCs
    HandleRpcs();
  }

Start the main process

  std::unique_ptr<ServerCompletionQueue> cq_;

  void HandleRpcs() {
    // This is where the asynchronous logic actually runs.
    // Spawn a new CallData instance to serve new clients.
    new CallData(&service_, cq_.get());
    void* tag;  // uniquely identifies a request.
    bool ok;
    while (true) {
      // Next blocks until the next event arrives.
      // Its return value must be checked: it tells us whether an event
      // arrived or cq_ is shutting down.
      GPR_ASSERT(cq_->Next(&tag, &ok));
      GPR_ASSERT(ok);
      // Cast the tag back to CallData and call Proceed
      static_cast<CallData*>(tag)->Proceed();
    }
  }

The main loop first creates a CallData object and then keeps reading events from cq_, which blocks while no event is available. When an event arrives, its tag is taken from cq_, cast back to a CallData pointer, and its Proceed method is called.

Create CallData & Logical Processing & Completion

class ServerImpl final {
  ...
 private:
  class CallData {
   public:
    CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq)
        : service_(service), cq_(cq), responder_(&ctx_), status_(CREATE) {
      Proceed();
    }

    void Proceed() {
      if (status_ == CREATE) {
        // First call: move to the PROCESS state
        status_ = PROCESS;
        // Note that `this` is passed as the tag that uniquely identifies this request
        service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_,
                                  this);
      } else if (status_ == PROCESS) {
        // Before processing this request, spawn a new CallData instance to
        // serve future requests. Each instance frees itself in its FINISH state.
        new CallData(service_, cq_);
        // The actual business logic
        std::string prefix("Hello ");
        // Read the client's data from request_ and set the reply
        reply_.set_message(prefix + request_.name());
        // Processing is done: tell the gRPC runtime we are finished, using this
        // instance's memory address as the tag that uniquely identifies the event
        status_ = FINISH;
        responder_.Finish(reply_, Status::OK, this);
      } else {
        GPR_ASSERT(status_ == FINISH);
        // Reached FINISH: free this CallData instance
        delete this;
      }
    }

   private:
    Greeter::AsyncService* service_;
    // The producer-consumer queue used for asynchronous server notifications
    ServerCompletionQueue* cq_;
    // Context for the RPC, used e.g. for compression, authentication,
    // and sending metadata back to the client
    ServerContext ctx_;
    // What we get from the client
    HelloRequest request_;
    // What we send back to the client
    HelloReply reply_;
    // The means to reply to the client
    ServerAsyncResponseWriter<HelloReply> responder_;
    // A tiny state machine with the following states
    enum CallStatus { CREATE, PROCESS, FINISH };
    CallStatus status_;  // The current serving state.
  };
  ...
};

Now we can connect this to the HandleRpcs method above:

  1. First, `new CallData` calls Proceed directly from the constructor. This takes the first if branch: the state switches to PROCESS and RequestSayHello is called with this as the tag, registering the object on cq_ for the next incoming request.
  2. Control then returns to the while loop in HandleRpcs, which waits for events.
  3. When a client sends a request, Proceed takes the second branch and runs the business logic:
    1. it first creates a new CallData for the next request;
    2. it then reads the client’s request parameters from request_ and processes them;
    3. it writes the response into reply_ and finally calls Finish.
  4. Control returns to the while loop in HandleRpcs, which now waits for the response to the client to finish sending.
  5. When that completion event arrives, Proceed takes its third branch and deletes the current object.
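The five steps can be replayed in a single thread to see the exact order of transitions. This is a hypothetical sketch: `Sim`, `TraceCallData`, and `ReplayOneRequest` are illustrative names, a `std::deque<void*>` stands in for cq_, and both the incoming request and the completion of Finish are simulated by pushing tags by hand.

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <vector>

// Hypothetical simulation state; no gRPC API is used.
struct Sim {
  std::deque<void*> cq;            // completed events, identified by tag
  std::vector<std::string> trace;  // log of state transitions
  std::vector<void*> armed;        // instances waiting for a request
  int next_id = 0;
};

class TraceCallData {
 public:
  explicit TraceCallData(Sim* sim) : sim_(sim), id_(sim->next_id++) { Proceed(); }

  void Proceed() {
    std::string who = "#" + std::to_string(id_);
    if (status_ == CREATE) {
      status_ = PROCESS;
      sim_->armed.push_back(this);  // wait for a request (like RequestSayHello)
      sim_->trace.push_back(who + " CREATE->PROCESS");
    } else if (status_ == PROCESS) {
      new TraceCallData(sim_);      // arm the next instance before working
      status_ = FINISH;
      sim_->cq.push_back(this);     // pretend Finish completed immediately
      sim_->trace.push_back(who + " PROCESS->FINISH");
    } else {
      sim_->trace.push_back(who + " deleted");
      delete this;
    }
  }

 private:
  enum CallStatus { CREATE, PROCESS, FINISH };
  Sim* sim_;
  int id_;
  CallStatus status_ = CREATE;
};

// Simulates one incoming request and runs a HandleRpcs-style loop until the
// queue is empty. Returns the recorded trace.
std::vector<std::string> ReplayOneRequest() {
  Sim sim;
  new TraceCallData(&sim);              // HandleRpcs primes the pipeline
  sim.cq.push_back(sim.armed.front());  // a client request arrives for #0
  while (!sim.cq.empty()) {             // stand-in for while (cq_->Next(...))
    void* tag = sim.cq.front();
    sim.cq.pop_front();
    static_cast<TraceCallData*>(tag)->Proceed();
  }
  return sim.trace;
}
```

`ReplayOneRequest()` returns the trace `#0 CREATE->PROCESS`, `#1 CREATE->PROCESS`, `#0 PROCESS->FINISH`, `#0 deleted`. Instance #1 is created before #0 finishes, matching the order of step 3, and it is still waiting when the loop ends, just as a real server always keeps one CallData armed.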

Summary

Compared to gRPC in Go, I have to say the C++ asynchronous API is poorly designed. At first glance it is unclear what `new CallData` is for: nothing seems to happen after the allocation and there is no matching delete, so won’t memory leak? Only after digging into the constructor do you find that the logic starts there, a style of code I have only ever seen here.

Many people have complained about the design of this API: https://github.com/grpc/grpc/issues/7352 . The gRPC team promised to change it, but six years later there is still no sign of improvement.


Reference: https://www.luozhiyun.com/archives/671