Envoy

Development environment build

The development environment described here is based on Ubuntu 18.04.

Basic Tools Download

Install Bazel

sudo wget -O /usr/local/bin/bazel https://github.com/bazelbuild/bazelisk/releases/latest/download/bazelisk-linux-amd64
sudo chmod +x /usr/local/bin/bazel

Install base dependencies

sudo apt-get install libtool cmake automake autoconf make ninja-build curl unzip virtualenv

Clang build environment (optional)

Download and install Clang/LLVM from the LLVM releases page; version 9.0 works well with Envoy.

bazel/setup_clang.sh <PATH_TO_EXTRACTED_CLANG_LLVM>
echo "build --config=clang" >> user.bazelrc

Building a debug version with Bazel

bazel build -c dbg --spawn_strategy=standalone  //source/exe:envoy-static

Integrating CLion

Converting Bazel to CMake

git clone https://github.com/lizan/bazel-cmakelists.git <PATH>
<bazel-cmakelists dir>/bazel-cmakelists --targets //source/exe:envoy-static //test/...
# If you don't need to build immediately, you can skip the build step:
<bazel-cmakelists dir>/bazel-cmakelists --targets //source/exe:envoy-static //test/... --skip_build
  • Open CLion and import the CMake project; CLion’s Bazel plugin does not work well with Envoy.
  • Set the run configuration to launch in Debug mode.

Development environment build reference

Envoy Foundation

Libevent

Envoy is an L7 proxy and communication bus designed for large modern service oriented architectures. The project was born out of the belief that the network should be transparent to applications.

As described on the official website, Envoy is a high-performance proxy with both L4 and L7 capabilities. Envoy does not reinvent every wheel, though: the layer that interacts with the operating system's network stack is built on libevent.

libevent is a lightweight network event library that provides the ability to handle callbacks for events such as reading and writing to the underlying socket.

A typical usage example (libevent's classic hello-world server):

#include <string.h>
#include <errno.h>
#include <stdio.h>
#include <signal.h>
#ifndef _WIN32
#include <netinet/in.h>
#include <sys/socket.h>
#else
#include <winsock2.h>
#endif

#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <event2/listener.h>
#include <event2/util.h>
#include <event2/event.h>

static const char MESSAGE[] = "Hello, World!\n";

static const int PORT = 9995;

static void listener_cb(struct evconnlistener *, evutil_socket_t,
    struct sockaddr *, int socklen, void *);
static void conn_writecb(struct bufferevent *, void *);
static void conn_eventcb(struct bufferevent *, short, void *);
static void signal_cb(evutil_socket_t, short, void *);

int
main(int argc, char **argv)
{
	struct event_base *base;
	struct evconnlistener *listener;
	struct event *signal_event;

	struct sockaddr_in sin = {0};

#ifdef _WIN32
	WSADATA wsa_data;
	WSAStartup(0x0201, &wsa_data);
#endif


	base = event_base_new();
	if (!base) {
		fprintf(stderr, "Could not initialize libevent!\n");
		return 1;
	}

	sin.sin_family = AF_INET;
	sin.sin_port = htons(PORT);

	listener = evconnlistener_new_bind(base, listener_cb, (void *)base,
	    LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE, -1,
	    (struct sockaddr*)&sin,
	    sizeof(sin));

	if (!listener) {
		fprintf(stderr, "Could not create a listener!\n");
		return 1;
	}

	signal_event = evsignal_new(base, SIGINT, signal_cb, (void *)base);

	if (!signal_event || event_add(signal_event, NULL)<0) {
		fprintf(stderr, "Could not create/add a signal event!\n");
		return 1;
	}

	event_base_dispatch(base);

	evconnlistener_free(listener);
	event_free(signal_event);
	event_base_free(base);

	printf("done\n");
	return 0;
}

static void
listener_cb(struct evconnlistener *listener, evutil_socket_t fd,
    struct sockaddr *sa, int socklen, void *user_data)
{
	struct event_base *base = user_data;
	struct bufferevent *bev;

	bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
	if (!bev) {
		fprintf(stderr, "Error constructing bufferevent!");
		event_base_loopbreak(base);
		return;
	}
	bufferevent_setcb(bev, NULL, conn_writecb, conn_eventcb, NULL);
	bufferevent_enable(bev, EV_WRITE);
	bufferevent_disable(bev, EV_READ);

	bufferevent_write(bev, MESSAGE, strlen(MESSAGE));
}

static void
conn_writecb(struct bufferevent *bev, void *user_data)
{
	struct evbuffer *output = bufferevent_get_output(bev);
	if (evbuffer_get_length(output) == 0) {
		printf("flushed answer\n");
		bufferevent_free(bev);
	}
}

static void
conn_eventcb(struct bufferevent *bev, short events, void *user_data)
{
	if (events & BEV_EVENT_EOF) {
		printf("Connection closed.\n");
	} else if (events & BEV_EVENT_ERROR) {
		printf("Got an error on the connection: %s\n",
		    strerror(errno));/*XXX win32*/
	}
	/* None of the other events can happen here, since we haven't enabled
	 * timeouts */
	bufferevent_free(bev);
}

static void
signal_cb(evutil_socket_t sig, short events, void *user_data)
{
	struct event_base *base = user_data;
	struct timeval delay = { 2, 0 };

	printf("Caught an interrupt signal; exiting cleanly in two seconds.\n");

	event_base_loopexit(base, &delay);
}

From this example we can see the basic pattern: we register callbacks for the four kinds of events we care about (listen/accept, read, write, and signal), and libevent's event loop dispatches them.
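
Stripped of the server details, the core API usage looks like this. A minimal standalone sketch, not taken from Envoy; the fd and the callback body are placeholders:

#include <event2/event.h>
#include <stdio.h>
#include <unistd.h>

// Hypothetical callback: libevent invokes it whenever `fd` becomes readable.
static void on_readable(evutil_socket_t fd, short what, void* arg) {
  char buf[1024];
  ssize_t n = read(fd, buf, sizeof(buf));
  printf("read %ld bytes (events 0x%x)\n", (long)n, (unsigned)what);
  (void)arg;
}

// Register a persistent read event for an existing socket fd and run the loop.
int run_read_loop(evutil_socket_t fd) {
  struct event_base* base = event_base_new();
  if (!base) {
    return 1;
  }

  // EV_PERSIST keeps the event registered after every callback invocation.
  struct event* ev = event_new(base, fd, EV_READ | EV_PERSIST, on_readable, NULL);
  event_add(ev, NULL);

  event_base_dispatch(base); // blocks here, dispatching callbacks

  event_free(ev);
  event_base_free(base);
  return 0;
}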

Libevent in Envoy

Continuing from the above, libevent sits at the bottom of Envoy as well: the data surfaced by the various events is handed over to Envoy's internal machinery for processing. Let's take the common case of a read. Before that, though, let's look at Envoy's entry point.

int main(int argc, char** argv) {
  std::unique_ptr<Envoy::MainCommon> main_common;
  try {
    main_common = std::make_unique<Envoy::MainCommon>(argc, argv);
  } catch (const Envoy::NoServingException& e) {
    return EXIT_SUCCESS;
  } catch (const Envoy::MalformedArgvException& e) {
    std::cerr << e.what() << std::endl;
    return EXIT_FAILURE;
  } catch (const Envoy::EnvoyException& e) {
    std::cerr << e.what() << std::endl;
    return EXIT_FAILURE;
  }
  return main_common->run() ? EXIT_SUCCESS : EXIT_FAILURE;
}

Setting aside the platform-compatibility code, we can see that, as in most system designs, there is a top-level object called MainCommon; following a chain of calls from it leads us to the real startup location.

void InstanceImpl::run() {
  // RunHelper exists primarily to facilitate how we respond to early shutdown during
  // startup (see RunHelperTest in server_test.cc).
  const auto run_helper = RunHelper(*this, options_, *dispatcher_, clusterManager(),
                                    access_log_manager_, init_manager_, overloadManager(), [this] {
                                      notifyCallbacksForStage(Stage::PostInit);
                                      startWorkers(); // ➁
                                    });

  // Run the main dispatch loop waiting to exit.
  ENVOY_LOG(info, "starting main dispatch loop");
  auto watchdog =
      guard_dog_->createWatchDog(api_->threadFactory().currentThreadId(), "main_thread");
  watchdog->startWatchdog(*dispatcher_);
  dispatcher_->post([this] { notifyCallbacksForStage(Stage::Startup); });
  dispatcher_->run(Event::Dispatcher::RunType::Block); // ➀
  ENVOY_LOG(info, "main dispatch loop exited");
  guard_dog_->stopWatching(watchdog);
  watchdog.reset();

  terminate();
}

Leaving the surrounding setup aside, the main dispatch loop is at ➀: dispatcher_->run(...). Peeling back a few more layers of calls finally brings us to the libevent part. The worker startup at ➁ is relevant to the threading model discussed later.

void LibeventScheduler::run(Dispatcher::RunType mode) {
  int flag = 0;
  switch (mode) {
  case Dispatcher::RunType::NonBlock:
    flag = EVLOOP_NONBLOCK;
    break;
  case Dispatcher::RunType::Block:
    // The default flags have 'block' behavior. See
    // http://www.wangafu.net/~nickm/libevent-book/Ref3_eventloop.html
    break;
  case Dispatcher::RunType::RunUntilExit:
    flag = EVLOOP_NO_EXIT_ON_EMPTY;
    break;
  }
  event_base_loop(libevent_.get(), flag); 
}

So the processing at ➀ turns out, in the end, to be the libevent loop we have been looking for: event_base_loop.

int event_base_loop(struct event_base *, int);

By default, event_base_loop returns once there are no more registered events. So the next question is: when and where are the various events registered on this event_base?
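
Before chasing that question, the loop-exit behavior itself is easy to verify in isolation (a standalone sketch, not Envoy code):

#include <event2/event.h>
#include <stdio.h>

int main() {
  struct event_base* base = event_base_new();

  // Nothing is registered, so with the default flags the loop returns at once
  // (a return value of 1 means "no events were pending or active").
  int rc = event_base_loop(base, 0);
  printf("empty loop returned %d\n", rc);

  // With EVLOOP_NO_EXIT_ON_EMPTY (the RunUntilExit case above) the loop would
  // keep running until event_base_loopexit()/loopbreak() is called:
  // event_base_loop(base, EVLOOP_NO_EXIT_ON_EMPTY);

  event_base_free(base);
  return 0;
}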

Not surprisingly, after some careful investigation, we can find the logic for listening to the port in ListenerImpl::setupServerSocket.

void ListenerImpl::setupServerSocket(Event::DispatcherImpl& dispatcher, Socket& socket) {
  listener_.reset(
      evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, socket.ioHandle().fd()));
}

This is where Envoy differs from frameworks such as Netty: libevent is a process-wide event library, and a single event_base can watch listening sockets on any number of ports. Recall also that at the OS level a socket is not intrinsically a "server socket" or a "client socket"; what happens here is simply that a listening socket is created and listenCallback is registered as the callback for new connections.
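
To illustrate that a single event_base can drive several listening sockets, here is a standalone sketch with made-up ports (not Envoy code):

#include <event2/event.h>
#include <event2/listener.h>
#include <netinet/in.h>
#include <string.h>

// Hypothetical accept callback shared by both listeners.
static void on_accept(struct evconnlistener* l, evutil_socket_t fd,
                      struct sockaddr* sa, int socklen, void* arg) {
  // hand the accepted fd off to the rest of the system ...
  (void)l; (void)fd; (void)sa; (void)socklen; (void)arg;
}

static struct evconnlistener* listen_on(struct event_base* base, int port) {
  struct sockaddr_in sin;
  memset(&sin, 0, sizeof(sin));
  sin.sin_family = AF_INET;
  sin.sin_port = htons(port);
  return evconnlistener_new_bind(base, on_accept, NULL,
                                 LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, -1,
                                 (struct sockaddr*)&sin, sizeof(sin));
}

int main() {
  struct event_base* base = event_base_new();
  // One event_base can watch any number of listening sockets.
  struct evconnlistener* a = listen_on(base, 10000);
  struct evconnlistener* b = listen_on(base, 10001);
  event_base_dispatch(base);
  evconnlistener_free(a);
  evconnlistener_free(b);
  event_base_free(base);
  return 0;
}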

Inside listenCallback:

void ListenerImpl::listenCallback(evconnlistener*, evutil_socket_t fd, sockaddr* remote_addr,
                                  int remote_addr_len, void* arg) {
  ListenerImpl* listener = static_cast<ListenerImpl*>(arg);

  // Create the IoSocketHandleImpl for the fd here.
  IoHandlePtr io_handle = std::make_unique<IoSocketHandleImpl>(fd);

  // Get the local address from the new socket if the listener is listening on IP ANY
  // (e.g., 0.0.0.0 for IPv4) (local_address_ is nullptr in this case).
  const Address::InstanceConstSharedPtr& local_address =
      listener->local_address_ ? listener->local_address_
                               : listener->getLocalAddress(io_handle->fd());
  listener->cb_.onAccept(
      std::make_unique<AcceptedSocketImpl>(std::move(io_handle), local_address, remote_address));
}

The last line makes it clear that when the server socket accepts a new client connection, the file descriptor fd is wrapped and passed on.

Editor's note: the idea here is that everything is a file. libevent's read callback is not used at this level; the accepted fd is handed over to Envoy's own machinery as soon as it is created here.

Following the chain further, the logic that runs when a new connection is actually handed to a worker looks like this.

void ConnectionHandlerImpl::ActiveTcpListener::onAcceptWorker(
    Network::ConnectionSocketPtr&& socket, bool hand_off_restored_destination_connections, bool rebalanced) {
  auto active_socket = std::make_unique<ActiveTcpSocket>(*this, std::move(socket),
                                                         hand_off_restored_destination_connections);

  // Create and run the filters
  config_.filterChainFactory().createListenerFilterChain(*active_socket); // ➀
  active_socket->continueFilterChain(true);

  // Move active_socket to the sockets_ list if filter iteration needs to continue later.
  // Otherwise we let active_socket be destructed when it goes out of scope.
  if (active_socket->iter_ != active_socket->accept_filters_.end()) {
    active_socket->startTimer();
    active_socket->moveIntoListBack(std::move(active_socket), sockets_); // ➁
  }
}

➀ The listener filter chain is created for the socket. ➁ The socket is moved onto a pending list. From this we can venture to assume that, for sockets that have completed the connection, Envoy uses a threading arrangement similar to Netty's reactor pattern: an accepting side plus workers that later pick up the sockets from this list and do the real work.
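
The moveIntoListBack call at ➁ is an ownership-transfer idiom worth pausing on: the object moves the unique_ptr that owns it into a list, so the list keeps it alive until someone removes it later. A stripped-down sketch of the idea (illustrative names only; Envoy's real LinkedObject also tracks its position in the list):

#include <list>
#include <memory>

// Simplified version of the "move yourself into an owning list" idiom.
template <class T> struct LinkedObject {
  using ListType = std::list<std::unique_ptr<T>>;

  // `self` must be the unique_ptr that currently owns `this`; after the call
  // the list owns the object and keeps it alive.
  void moveIntoListBack(std::unique_ptr<T>&& self, ListType& list) {
    list.push_back(std::move(self));
  }
};

struct ActiveSocket : public LinkedObject<ActiveSocket> {
  int fd{-1};
};

int main() {
  LinkedObject<ActiveSocket>::ListType pending_sockets;
  auto socket = std::make_unique<ActiveSocket>();
  socket->fd = 42;
  // Ownership moves from the local unique_ptr into the pending list, mirroring
  // how onAcceptWorker parks sockets whose listener filters are not done yet.
  ActiveSocket* raw = socket.get();
  raw->moveIntoListBack(std::move(socket), pending_sockets);
  return pending_sockets.size() == 1 ? 0 : 1;
}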

Additional information:

After another series of calls, we come to the place where a new connection is created (or, more precisely, assigned): ConnectionHandlerImpl::ActiveTcpListener::newConnection.

But we still cannot find the read logic here, so we start debugging and set a breakpoint where data is read, in ConnectionImpl::onRead.

(debugger screenshot: call stack at the ConnectionImpl::onRead breakpoint)

The call stack shows that the starting point is still event_base_loop, so the question becomes: where exactly is the event registered? We know registration happens on the event_base object, so searching for its usages turns up a suspicious spot: FileEventImpl::assignEvents.

void FileEventImpl::assignEvents(uint32_t events, event_base* base) {
  ASSERT(base != nullptr);
  event_assign(
      &raw_event_, base, fd_,
      EV_PERSIST | (trigger_ == FileTriggerType::Level ? 0 : EV_ET) |
          (events & FileReadyType::Read ? EV_READ : 0) |
          (events & FileReadyType::Write ? EV_WRITE : 0) |
          (events & FileReadyType::Closed ? EV_CLOSED : 0),
      [](evutil_socket_t, short what, void* arg) -> void {
        auto* event = static_cast<FileEventImpl*>(arg);
        uint32_t events = 0;
        // ... (ready-flag handling elided) ...
      },
      this);
}

It looks like the events are assigned here, so we set a breakpoint and run again. The call stack shows it clearly:

(debugger screenshot: call stack at FileEventImpl::assignEvents)

The chain is onAccept -> newConnection -> assignEvents; the overall call stack is simply so deep that it is hard to spot. In short, the events are registered when the client's socket is created. The rest of the registration logic lives inside libevent itself, so we will not dig deeper there.

Threading model

As background, it is worth first reading the official post on the Envoy threading model.

In Envoy, libevent is a low-level library dependency, much as Tomcat is when we write Spring Web applications: socket events such as reads and writes are delegated to libevent. The first step is to create a listen event on our event_base; afterwards, when a connection is established, Envoy adds read/write event listeners for that socket.

In fact, if you set a breakpoint at assignEvents and restart, the first event registered is the read event for the configuration file (for hot-reloading configuration), followed by DNS event registration; after that the Envoy service starts and the various listeners from the configuration file are initialized. It is worth noting that LibeventScheduler::run runs in every worker.

(diagram: Envoy threading model)

Unlike Netty's arrangement, the listener here is also present on every worker thread. In effect, when a new connection arrives, one of the workers is picked to handle it.
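
A minimal sketch of this one-event-loop-per-worker shape (plain std::thread plus libevent; these are not Envoy's actual Worker classes, just an illustration of the structure):

#include <event2/event.h>
#include <thread>
#include <vector>

// Each worker owns its own event_base and runs its own dispatch loop, so a
// connection assigned to a worker is handled entirely on that thread.
void workerMain(event_base* base) {
  // Corresponds to LibeventScheduler::run(RunType::RunUntilExit) in Envoy.
  event_base_loop(base, EVLOOP_NO_EXIT_ON_EMPTY);
}

int main() {
  const int concurrency = 4; // Envoy's --concurrency controls the worker count
  std::vector<event_base*> bases;
  std::vector<std::thread> workers;

  for (int i = 0; i < concurrency; ++i) {
    bases.push_back(event_base_new());
    workers.emplace_back(workerMain, bases.back());
  }

  // ... listeners get registered on every worker's base; an accepted
  // connection then lives on exactly one worker for its whole lifetime ...

  // The loops run until event_base_loopexit()/loopbreak() is called at
  // shutdown, so in this sketch join() would block indefinitely.
  for (auto& t : workers) {
    t.join();
  }
  for (auto* b : bases) {
    event_base_free(b);
  }
  return 0;
}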

Chain in Envoy

From the above we already know that when a client socket is created, its read/write events are registered on the event_base; starting from those registered events we easily locate ConnectionImpl::onRead, which is where the read logic lives.

Request handling chain

void ConnectionImpl::onRead(uint64_t read_buffer_size) {
  if (!read_enabled_ || inDelayedClose()) {
    return;
  }
  ASSERT(ioHandle().isOpen());

  if (read_buffer_size == 0 && !read_end_stream_) {
    return;
  }

  filter_manager_.onRead();
}

The last line is where the whole processing pipeline begins: the FilterManager takes over. A few jumps later we find the core of the processing logic.

void FilterManagerImpl::onContinueReading(ActiveReadFilter* filter,
                                          ReadBufferSource& buffer_source) {
  std::list<ActiveReadFilterPtr>::iterator entry;
  if (!filter) {
    connection_.streamInfo().addBytesReceived(buffer_source.getReadBuffer().buffer.length());
    entry = upstream_filters_.begin();
  } else {
    entry = std::next(filter->entry());
  }

  for (; entry != upstream_filters_.end(); entry++) {
    if (!(*entry)->initialized_) {
      (*entry)->initialized_ = true;
      FilterStatus status = (*entry)->filter_->onNewConnection(); // ➀
      if (status == FilterStatus::StopIteration || connection_.state() != Connection::State::Open) {
        return;
      }
    }

    StreamBuffer read_buffer = buffer_source.getReadBuffer();
    if (read_buffer.buffer.length() > 0 || read_buffer.end_stream) {
      FilterStatus status = (*entry)->filter_->onData(read_buffer.buffer, read_buffer.end_stream); // ➁
      if (status == FilterStatus::StopIteration || connection_.state() != Connection::State::Open) {
        return;
      }
    }
  }
}

➀ Envoy defines a set of abstractions for filters. Network::ReadFilter, for example, declares the ReadFilter virtual functions, and we can see that the first time a filter runs we call onNewConnection, which is clearly a lifecycle callback; what it actually does depends on the concrete implementation class.

➁ Here we find the real data-reading logic. Since this article focuses on the HTTP protocol, the ReadFilter implementation we will examine is Http::ConnectionManagerImpl.

Network::FilterStatus ConnectionManagerImpl::onData(Buffer::Instance& data, bool) {
  

  bool redispatch;
  do {
    redispatch = false;

    try {
      codec_->dispatch(data); // hand the buffered bytes to the codec (HTTP/1 or HTTP/2)
    } catch (...) { /* codec exception handling elided from this excerpt */ }
    if (codec_->protocol() < Protocol::Http2) {
      if (read_callbacks_->connection().state() == Network::Connection::State::Open &&
          data.length() > 0 && streams_.empty()) {
        redispatch = true;
      }

      if (!streams_.empty() && streams_.front()->state_.remote_complete_) {
        read_callbacks_->connection().readDisable(true);
      }
    }
  } while (redispatch);

  if (!read_callbacks_->connection().streamInfo().protocol()) {
    read_callbacks_->connection().streamInfo().protocol(codec_->protocol());
  }

  return Network::FilterStatus::StopIteration; // always stop network-level filter iteration here
}

From this implementation we can at least see that Envoy's filters are not invoked layer by layer the way Java servlet filters are, with each layer doing only its own small piece: in the normal flow the data would pass through all the filters. On that reasoning, this step should merely decode the HTTP request. In practice, however, the codec takes a shortcut: once the HTTP request has been parsed, processing continues directly from inside the parser callbacks, jumping to the following.

void ServerConnectionImpl::onMessageComplete() {
  if (active_request_.has_value()) {
    auto& active_request = active_request_.value();
    active_request.remote_complete_ = true;
    if (deferred_end_stream_headers_) {
      active_request.request_decoder_->decodeHeaders(
          std::move(absl::get<RequestHeaderMapPtr>(headers_or_trailers_)), true); // ➀
      deferred_end_stream_headers_ = false;
    } else if (processing_trailers_) {
      active_request.request_decoder_->decodeTrailers(
          std::move(absl::get<RequestTrailerMapPtr>(headers_or_trailers_)));
    } else {
      Buffer::OwnedImpl buffer;
      active_request.request_decoder_->decodeData(buffer, true);
    }

    // Reset to ensure no information from one requests persists to the next.
    headers_or_trailers_.emplace<RequestHeaderMapPtr>(nullptr);
  }

  http_parser_pause(&parser_, 1);
}
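
For reference, the HTTP/1 codec in this version of Envoy is built on the C http_parser library, and the onMessageComplete above is driven from its parser callbacks. A standalone sketch of that callback style (not Envoy code):

#include <stdio.h>
#include <string.h>
#include "http_parser.h" // the nodejs/http-parser library used by the legacy HTTP/1 codec

static int on_message_complete(http_parser* parser) {
  // In Envoy, this is roughly the point where ServerConnectionImpl::onMessageComplete
  // runs and the decoded request is pushed into the HTTP filter chain.
  printf("parsed a complete %s request\n",
         http_method_str((enum http_method)parser->method));
  return 0;
}

int main() {
  const char request[] = "GET /hello HTTP/1.1\r\nHost: example.com\r\n\r\n";

  http_parser_settings settings;
  memset(&settings, 0, sizeof(settings));
  settings.on_message_complete = on_message_complete;

  http_parser parser;
  http_parser_init(&parser, HTTP_REQUEST);

  size_t consumed = http_parser_execute(&parser, &settings, request, strlen(request));
  printf("consumed %zu of %zu bytes\n", consumed, strlen(request));
  return 0;
}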

At ➀ we enter the routing stage (honestly, the design here is not very transparent, which is why the debugger keeps being needed while reading the code), and we eventually land in the ConfigImpl::route logic.

RouteConstSharedPtr RouteMatcher::route(const Http::RequestHeaderMap& headers,
                                        const StreamInfo::StreamInfo& stream_info,
                                        uint64_t random_value) const {
  const VirtualHostImpl* virtual_host = findVirtualHost(headers);
  if (virtual_host) {
    return virtual_host->getRouteFromEntries(headers, stream_info, random_value); 
  } else {
    return nullptr;
  }
}

What happens next depends on whether a matching route is returned. Following the breakpoint further, we arrive at:

void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapPtr&& headers, bool end_stream) {
  
  // omit....

  decodeHeaders(nullptr, *request_headers_, end_stream);

  // Reset it here for both global and overridden cases.
  resetIdleTimer();
}

ConnectionManagerImpl::ActiveStream::decodeHeaders contains a lot of HTTP-header handling that we will not expand on here. Instead we step into the most important call, the inner decodeHeaders, to continue the exploration; the logic there is fairly involved, so let's take it slowly.

Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers, bool end_stream) {
  
  // Resolve the route for this request; if there is no match, reply 404 and stop
  route_ = callbacks_->route();
  if (!route_) {
    config_.stats_.no_route_.inc();
    ENVOY_STREAM_LOG(debug, "no cluster match for URL '{}'", *callbacks_,
                     headers.Path()->value().getStringView());

    callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::NoRouteFound);
    callbacks_->sendLocalReply(Http::Code::NotFound, "", modify_headers, absl::nullopt,
                               StreamInfo::ResponseCodeDetails::get().RouteNotFound);
    return Http::FilterHeadersStatus::StopIteration;
  }

  
  // Find the routing object for this request
  route_entry_ = route_->routeEntry();
  
  // Get the configured cluster, i.e. the target upstream
  cluster_ = cluster->info();

  

  // Get a connection pool towards the upstream so we can obtain a host
  const auto& upstream_http_protocol_options = cluster_->upstreamHttpProtocolOptions();
  Http::ConnectionPool::Instance* http_pool = getHttpConnPool();
  Upstream::HostDescriptionConstSharedPtr host;

  ENVOY_STREAM_LOG(debug, "router decoding headers:\n{}", *callbacks_, headers);

  // Save the header-modification callback for later use
  modify_headers_ = modify_headers;

  UpstreamRequestPtr upstream_request =
      std::make_unique<UpstreamRequest>(*this, std::make_unique<HttpConnPool>(*http_pool));
  upstream_request->moveIntoList(std::move(upstream_request), upstream_requests_);
  upstream_requests_.front()->encodeHeaders(end_stream);
  if (end_stream) {
    onRequestComplete();
  }

  return Http::FilterHeadersStatus::StopIteration;
}

Inside upstream_requests_.front()->encodeHeaders(end_stream) there is a further step:

void UpstreamRequest::encodeHeaders(bool end_stream) {
  ASSERT(!encode_complete_);
  encode_complete_ = end_stream;

  conn_pool_->newStream(this); // ➀
}

➀ Here we ask the connection pool for a stream towards the backend (the upstream, in Envoy's terminology). Digging deeper into the code:

ConnPoolImplBase::ActiveClient::ActiveClient(ConnPoolImplBase& parent,
                                             uint64_t lifetime_request_limit,
                                             uint64_t concurrent_request_limit)
    : parent_(parent), remaining_requests_(translateZeroToUnlimited(lifetime_request_limit)),
      concurrent_request_limit_(translateZeroToUnlimited(concurrent_request_limit)),
      connect_timer_(parent_.dispatcher_.createTimer([this]() -> void { onConnectTimeout(); })) {
  Upstream::Host::CreateConnectionData data = parent_.host_->createConnection(
      parent_.dispatcher_, parent_.socket_options_, parent_.transport_socket_options_);
  real_host_description_ = data.host_description_;
  codec_client_ = parent_.createCodecClient(data);
  codec_client_->addConnectionCallbacks(*this);

  conn_connect_ms_ = std::make_unique<Stats::HistogramCompletableTimespanImpl>(
      parent_.host_->cluster().stats().upstream_cx_connect_ms_, parent_.dispatcher_.timeSource());
  conn_length_ = std::make_unique<Stats::HistogramCompletableTimespanImpl>(
      parent_.host_->cluster().stats().upstream_cx_length_ms_, parent_.dispatcher_.timeSource());
  connect_timer_->enableTimer(parent_.host_->cluster().connectTimeout());
}

We can see that this is where the connection to the backend is actually created. But for a high-performance proxy we cannot block while calling the backend; the usual approach is to park the request to the backend once it has been issued and only continue once the reply comes back.
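
The shape of this API is the important part: newStream() does not block, it only records the request, and the result is delivered later from the worker's event loop. A rough sketch of that asynchronous pattern (hypothetical names, loosely modeled on the connection-pool callbacks, not Envoy's exact API):

#include <functional>
#include <iostream>
#include <string>

// Minimal stand-in for an async connection pool interface: the caller never
// blocks; it registers callbacks and returns to the event loop.
struct PoolCallbacks {
  std::function<void(const std::string& host)> on_ready;     // connection is usable
  std::function<void(const std::string& reason)> on_failure; // connect failed / timed out
};

class ConnPool {
public:
  // Only records the request; the outcome is reported later from the loop.
  void newStream(PoolCallbacks callbacks) { pending_ = std::move(callbacks); }

  // Called by the event loop when the upstream socket becomes writable.
  void onConnected(const std::string& host) {
    if (pending_.on_ready) pending_.on_ready(host);
  }
  // Called by the event loop when the connect timer fires.
  void onConnectTimeout() {
    if (pending_.on_failure) pending_.on_failure("connect timeout");
  }

private:
  PoolCallbacks pending_;
};

int main() {
  ConnPool pool;
  pool.newStream({[](const std::string& host) { std::cout << "encode headers to " << host << "\n"; },
                  [](const std::string& reason) { std::cout << "failed: " << reason << "\n"; }});
  // ... control returns to the event loop here; later the loop fires:
  pool.onConnected("10.0.0.1:8080");
  return 0;
}

Indeed, a little further on we find the place where the new upstream connection registers its read/write events with the worker's dispatcher: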

ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPtr&& socket,
                               TransportSocketPtr&& transport_socket,
                               StreamInfo::StreamInfo& stream_info, bool connected)
    : ConnectionImplBase(dispatcher, next_global_id_++),
      /* ... other member initializers elided ... */ {
  // We never ask for both early close and read at the same time. If we are reading, we want to
  // consume all available data.
  file_event_ = dispatcher_.createFileEvent(
      ioHandle().fd(), [this](uint32_t events) -> void { onFileEvent(events); }, trigger,
      Event::FileReadyType::Read | Event::FileReadyType::Write);

  transport_socket_->setTransportSocketCallbacks(*this);
  // ...
}
When the upstream connection is created, the read/write events for the backend's reply are registered on the event_base of the current worker thread. That covers the first half of the flow:

accept (listenCallback) -> onAcceptWorker / listener filter chain -> connection created on a worker
  -> read event fires -> ConnectionImpl::onRead -> network filter chain (FilterManagerImpl)
  -> HTTP codec (ConnectionManagerImpl::onData) -> route match -> router Filter::decodeHeaders
  -> upstream connection pool -> UpstreamRequest::encodeHeaders towards the backend

Up to this point we have walked through most of the path from receiving a request to issuing the corresponding request to the backend; what remains are housekeeping concerns such as response timeouts. Without going deeper into those, we move on to the next link: handling the response data.

For the response we already know that the read/write events of the backend stream were loaded into the event_base, so let's see how that data is handled. The entry point is again onRead(), and the logic after it mirrors what we saw before, up to FilterManagerImpl::onRead; the concrete implementation this time is the CodecClient, which is comparatively simple, because a response body needs no further routing decisions. Once reading finishes we enter ConnectionImpl::onMessageCompleteBase and head towards the end of the flow. Let's see how the original caller gets its reply once the whole response stream has been read: we arrive at ClientConnectionImpl::onMessageComplete.

void ClientConnectionImpl::onMessageComplete() {
  ENVOY_CONN_LOG(trace, "message complete", connection_);
  if (pending_response_.has_value()) { // only process if there is a pending response
    if (connection_.state() == Network::Connection::State::Open) {
      while (!connection_.readEnabled()) {
        connection_.readDisable(false); // re-enable reads on the connection (undo earlier readDisable calls)
      }
    }

    if (deferred_end_stream_headers_) {
      response.decoder_->decodeHeaders(
          std::move(absl::get<ResponseHeaderMapPtr>(headers_or_trailers_)), true);
      deferred_end_stream_headers_ = false;
    } else if (processing_trailers_) {
      response.decoder_->decodeTrailers(
          std::move(absl::get<ResponseTrailerMapPtr>(headers_or_trailers_)));
    } else {
      Buffer::OwnedImpl buffer;
      response.decoder_->decodeData(buffer, true); // decode the (empty) final body chunk and complete the response
    }

    // Reset to ensure no information from one requests persists to the next.
    pending_response_.reset();
    headers_or_trailers_.emplace<ResponseHeaderMapPtr>(nullptr);
  }
}

During the response, decodeData is called and control eventually returns to ConnectionManagerImpl; after a further series of operations we reach the place where the reply is finally written.

void ConnectionImpl::write(Buffer::Instance& data, bool end_stream, bool through_filter_chain) {
  ASSERT(!end_stream || enable_half_close_);

  if (write_end_stream_) {
    ASSERT(data.length() == 0 && end_stream);

    return;
  }

  if (through_filter_chain) {
    current_write_buffer_ = &data;
    current_write_end_stream_ = end_stream;
    FilterStatus status = filter_manager_.onWrite(); // ➀ enter the write-path filter chain
    current_write_buffer_ = nullptr;

    if (FilterStatus::StopIteration == status) { // note that a filter can terminate the write early here
      return;
    }
  }

  write_end_stream_ = end_stream;
  if (data.length() > 0 || end_stream) {
    ENVOY_CONN_LOG(trace, "writing {} bytes, end_stream {}", *this, data.length(), end_stream);
    write_buffer_->move(data); // ➁ move the data into the write buffer
    if (!connecting_) {
      ASSERT(file_event_ != nullptr, "ConnectionImpl file event was unexpectedly reset");
      file_event_->activate(Event::FileReadyType::Write);
    }
  }
}

After all our data has been written, we enter the final Cleanup process.

void ConnectionImpl::closeSocket(ConnectionEvent close_type) {
  
  ENVOY_CONN_LOG(debug, "closing socket: {}", *this, static_cast<uint32_t>(close_type));
  transport_socket_->closeSocket(close_type);

  // Drain input and output buffers.
  updateReadBufferStats(0, 0);
  updateWriteBufferStats(0, 0);

  write_buffer_->drain(write_buffer_->length());

  connection_stats_.reset();

  file_event_.reset();

  socket_->close();

  raiseEvent(close_type);
}

That concludes this stage; let's lay out the whole flow:

request:  accept -> read event -> network filter chain -> HTTP codec -> route match -> upstream pool -> request sent to the backend
response: backend read event -> ClientConnectionImpl::onMessageComplete -> decodeHeaders/decodeData
          -> ConnectionManagerImpl encodes the response -> ConnectionImpl::write -> write filter chain -> flush to the client -> closeSocket / cleanup

Extending Envoy

From the analysis above we have more or less read through the main implementation; what remains is to look at the extension points Envoy leaves for us so that it can be extended for custom use cases. The configuration file already gives us some hints.

static_resources:
  listeners:
    - name: listener_0
      address:
        socket_address: { address: 127.0.0.1, port_value: 10000 }
      filter_chains:
        - filters:
            - name: envoy.filters.network.http_connection_manager
              typed_config:
                "@type": type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager
                stat_prefix: ingress_http
                codec_type: AUTO
                route_config:
                  name: local_route
                  virtual_hosts:
                    - name: local_service
                      domains: ["*"]
                      routes:
                        - match: { prefix: "/" }
                          route: { cluster: some_service }
                http_filters:
                  - name: envoy.filters.http.router

We can see that the crucial part for Envoy is filter_chains. Even without having written much C++, the idea carries over: we should be able to implement our own Filter and register it into this filter chain. Fortunately the project provides a fairly detailed official DEMO. From it we can see that the core is to inherit one of the existing filter interfaces and implement it.

namespace Envoy {
namespace Filter {

/**
 * Implementation of a basic echo filter.
 */
class Echo2 : public Network::ReadFilter, Logger::Loggable<Logger::Id::filter> {
public:
  // Network::ReadFilter
  Network::FilterStatus onData(Buffer::Instance& data, bool end_stream) override;
  Network::FilterStatus onNewConnection() override { return Network::FilterStatus::Continue; }
  void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override {
    read_callbacks_ = &callbacks;
  }

private:
  Network::ReadFilterCallbacks* read_callbacks_{};
};

} // namespace Filter
} // namespace Envoy

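For completeness, the other half of the official example, paraphrased here as a sketch rather than copied verbatim (exact base-class names and method signatures differ between Envoy versions), is the onData implementation plus the config factory that makes the filter loadable by name:

// echo2.cc (sketch): echo everything straight back to the downstream connection.
Network::FilterStatus Echo2::onData(Buffer::Instance& data, bool) {
  read_callbacks_->connection().write(data, false);
  return Network::FilterStatus::StopIteration;
}

// echo2_config.cc (sketch): the factory the registry hands back for `name: echo2`.
class Echo2ConfigFactory : public Server::Configuration::NamedNetworkFilterConfigFactory {
public:
  Network::FilterFactoryCb
  createFilterFactoryFromProto(const Protobuf::Message&,
                               Server::Configuration::FactoryContext&) override {
    return [](Network::FilterManager& filter_manager) -> void {
      filter_manager.addReadFilter(std::make_shared<Echo2>());
    };
  }

  ProtobufTypes::MessagePtr createEmptyConfigProto() override {
    return std::make_unique<ProtobufWkt::Empty>();
  }

  std::string name() const override { return "echo2"; }
};

// Static self-registration: this is what lets the name in the YAML resolve to
// this factory at startup.
static Registry::RegisterFactory<Echo2ConfigFactory,
                                 Server::Configuration::NamedNetworkFilterConfigFactory>
    registered_;
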
Wiring it into the configuration is also relatively straightforward; here is part of the example configuration:

static_resources:
  clusters:
    name: cluster_0
    connect_timeout: 0.25s
    load_assignment:
      cluster_name: cluster_0
      endpoints:
        - lb_endpoints:
            - endpoint:
                address:
                  socket_address:
                    address: 127.0.0.1
                    port_value: 0

But how does this work behind the scenes? For that we need to go back to the source. The existing extension implementations can be found here: extensions.

From the code above, we can easily locate the logic to initialize the FilterChain at ListenerImpl::createNetworkFilterChain.

bool ListenerImpl::createNetworkFilterChain(
    Network::Connection& connection,
    const std::vector<Network::FilterFactoryCb>& filter_factories) {
  return Configuration::FilterChainUtility::buildFilterChain(connection, filter_factories);
}
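
FilterChainUtility::buildFilterChain itself is essentially a loop over the configured factory callbacks; roughly (paraphrased as a sketch, not a verbatim copy of the Envoy source):

// Each configured filter contributes a Network::FilterFactoryCb; building the
// chain just invokes every callback against the connection's filter manager.
bool buildFilterChain(Network::FilterManager& filter_manager,
                      const std::vector<Network::FilterFactoryCb>& factories) {
  for (const Network::FilterFactoryCb& factory : factories) {
    factory(filter_manager); // the callback calls addReadFilter()/addWriteFilter()
  }
  return filter_manager.initializeReadFilters();
}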

As you can see, this is a very standard factory pattern: each factory creates its filter instance and adds it to the connection. A ReadFilter ends up at the back of the upstream_filters_ list, and a WriteFilter at the back of the downstream_filters_ list. The code is also relatively simple, as shown below.

void FilterManagerImpl::addReadFilter(ReadFilterSharedPtr filter) {
  ASSERT(connection_.state() == Connection::State::Open);
  ActiveReadFilterPtr new_filter(new ActiveReadFilter{*this, filter});
  filter->initializeReadFilterCallbacks(*new_filter);
  new_filter->moveIntoListBack(std::move(new_filter), upstream_filters_);
}

The question then becomes where the filter factory comes from. In the virtual table we find two implementations, Admin and Impl; if you have read the official documentation you can guess that the Admin one serves dynamically driven requests. Restarting and tracing from the beginning, the factory initialization can be traced back to ListenerManagerImpl::addOrUpdateListener, whose structure mirrors the configuration hierarchy: first the Listener is created, and in the process of creating the Listener the FilterChain is created.

filter_chain_manager_.addFilterChain(config.filter_chains(), builder, filter_chain_manager_); // call site in ListenerImpl

void FilterChainManagerImpl::addFilterChain(
    absl::Span<const envoy::config::listener::v3::FilterChain* const> filter_chain_span,
    FilterChainFactoryBuilder& filter_chain_factory_builder,
    FilterChainFactoryContextCreator& context_creator) {
  Cleanup cleanup([this]() { origin_ = absl::nullopt; });
  std::unordered_set<envoy::config::listener::v3::FilterChainMatch, MessageUtil, MessageUtil>
      filter_chains;
  uint32_t new_filter_chain_size = 0;
  for (const auto& filter_chain : filter_chain_span) {
    const auto& filter_chain_match = filter_chain->filter_chain_match();
    if (!filter_chain_match.address_suffix().empty() || filter_chain_match.has_suffix_len()) {
      throw EnvoyException(fmt::format("error adding listener '{}': contains filter chains with "
                                       "unimplemented fields",
                                       address_->asString()));
    }
    if (filter_chains.find(filter_chain_match) != filter_chains.end()) {
      throw EnvoyException(fmt::format("error adding listener '{}': multiple filter chains with "
                                       "the same matching rules are defined",
                                       address_->asString()));
    }
    filter_chains.insert(filter_chain_match);

    // Validate IP addresses.
    std::vector<std::string> destination_ips;
    destination_ips.reserve(filter_chain_match.prefix_ranges().size());
    for (const auto& destination_ip : filter_chain_match.prefix_ranges()) {
      const auto& cidr_range = Network::Address::CidrRange::create(destination_ip);
      destination_ips.push_back(cidr_range.asString());
    }

    std::vector<std::string> source_ips;
    source_ips.reserve(filter_chain_match.source_prefix_ranges().size());
    for (const auto& source_ip : filter_chain_match.source_prefix_ranges()) {
      const auto& cidr_range = Network::Address::CidrRange::create(source_ip);
      source_ips.push_back(cidr_range.asString());
    }

    // Reject partial wildcards, we don't match on them.
    for (const auto& server_name : filter_chain_match.server_names()) {
      if (server_name.find('*') != std::string::npos &&
          !FilterChainManagerImpl::isWildcardServerName(server_name)) {
        throw EnvoyException(
            fmt::format("error adding listener '{}': partial wildcards are not supported in "
                        "\"server_names\"",
                        address_->asString()));
      }
    }

    // Reuse created filter chain if possible.
    // FilterChainManager maintains the lifetime of FilterChainFactoryContext
    // ListenerImpl maintains the dependencies of FilterChainFactoryContext
    auto filter_chain_impl = findExistingFilterChain(*filter_chain);
    if (filter_chain_impl == nullptr) {
      filter_chain_impl =
          filter_chain_factory_builder.buildFilterChain(*filter_chain, context_creator);
      ++new_filter_chain_size;
    }

    addFilterChainForDestinationPorts(
        destination_ports_map_,
        PROTOBUF_GET_WRAPPED_OR_DEFAULT(filter_chain_match, destination_port, 0), destination_ips,
        filter_chain_match.server_names(), filter_chain_match.transport_protocol(),
        filter_chain_match.application_protocols(), filter_chain_match.source_type(), source_ips,
        filter_chain_match.source_ports(), filter_chain_impl);
    fc_contexts_[*filter_chain] = filter_chain_impl;
  }
  convertIPsToTries();
  ENVOY_LOG(debug, "new fc_contexts has {} filter chains, including {} newly built",
            fc_contexts_.size(), new_filter_chain_size);
}

Envoy Admin

When we built the example configuration, there was a section like this:

admin:
  access_log_path: /tmp/admin_access.log
  address:
    socket_address: { address: 127.0.0.1, port_value: 9901 }

This is Envoy's administration interface. It mainly exposes statistics, configuration dumps, and similar operational data; see Envoy-Admin for details.

Envoy Dynamic Manager

We analyzed the simple case of reading configuration from a file, but for a full-fledged gateway, dynamically loaded configuration is indispensable, so Envoy also provides several ways to obtain configuration dynamically: LDS and the other xDS APIs.

LDS

The listener discovery service (LDS) is an optional API that Envoy will call to dynamically fetch listeners. Envoy will reconcile the API response and add, modify, or remove known listeners depending on what is required.