Background

The product component JobCenter uses Celery to implement an asynchronous task center, running both job-center-worker (celery worker) and job-center-scheduler (celery beat) processes, with MongoDB as the backend storing messages and related state (Celery has officially stated that it no longer maintains MongoDB support). MongoDB is deployed as a ReplicaSet for high availability.

Recently, we encountered the "no free channel ids" problem in Celery/Kombu. After troubleshooting, we found the issue had already been fixed upstream in this PR. Weighing the workload and maintainability of cherry-picking the fix, we ultimately upgraded the celery and kombu components in the product from 3.x to 4.x.

During recent reliability tests, a test colleague found that running ifdown on the storage network of a MongoDB node causes JobCenter to hang.

Investigation

Celery

First, try to reproduce the problem: running ifdown on the Primary node's storage network reproduces the hang, while ifdown on a Secondary node's storage network does not. Stopping the MongoDB service instead of running ifdown does not reproduce it on either the Primary or a Secondary. This suggests the problem is related to how MongoDB connections are handled.

Observing the logs around recovery: while the storage network is down, nothing is logged at all. Once the storage network recovers, Celery logs an exception for the connection to the Broker (MongoDB) and tries to reconnect.

[2022-05-07 10:13:01,362: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
  File "/usr/lib/python2.7/site-packages/celery/worker/consumer/consumer.py", line 318, in start
    blueprint.start(self)
  File "/usr/lib/python2.7/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/usr/lib/python2.7/site-packages/celery/worker/consumer/consumer.py", line 596, in start
    c.loop(*c.loop_args())
  File "/usr/lib/python2.7/site-packages/celery/worker/loops.py", line 121, in synloop
    connection.drain_events(timeout=2.0)
  File "/usr/lib/python2.7/site-packages/kombu/connection.py", line 315, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
  File "/usr/lib/python2.7/site-packages/kombu/transport/virtual/base.py", line 963, in drain_events
    get(self._deliver, timeout=timeout)
  File "/usr/lib/python2.7/site-packages/kombu/utils/scheduling.py", line 56, in get
    return self.fun(resource, callback, **kwargs)
  File "/usr/lib/python2.7/site-packages/kombu/transport/virtual/base.py", line 1001, in _drain_channel
    return channel.drain_events(callback=callback, timeout=timeout)
  File "/usr/lib/python2.7/site-packages/kombu/transport/virtual/base.py", line 745, in drain_events
    return self._poll(self.cycle, callback, timeout=timeout)
  File "/usr/lib/python2.7/site-packages/kombu/transport/virtual/base.py", line 402, in _poll
    return cycle.get(callback)
  File "/usr/lib/python2.7/site-packages/kombu/utils/scheduling.py", line 56, in get
    return self.fun(resource, callback, **kwargs)
  File "/usr/lib/python2.7/site-packages/kombu/transport/virtual/base.py", line 405, in _get_and_deliver
    message = self._get(queue)
  File "/usr/lib/python2.7/site-packages/kombu/transport/mongodb.py", line 141, in _get
    remove=True,
  File "/usr/lib64/python2.7/site-packages/pymongo/collection.py", line 2315, in find_and_modify
    allowable_errors=[_NO_OBJ_ERROR])
  File "/usr/lib64/python2.7/site-packages/pymongo/collection.py", line 205, in _command
    read_concern=read_concern)
  File "/usr/lib64/python2.7/site-packages/pymongo/pool.py", line 218, in command
    self._raise_connection_failure(error)
  File "/usr/lib64/python2.7/site-packages/pymongo/pool.py", line 346, in _raise_connection_failure
    raise error
AutoReconnect: connection closed
[2022-05-07 10:13:01,363: WARNING/MainProcess] Restoring 1 unacknowledged message(s)

The corresponding Celery code is in worker/consumer/consumer.py; Blueprint is Celery's startup entry point. You can see that exception handling wraps the blueprint.start(self) stage, triggering a reconnect for exceptions listed in self.connection_errors.

CONNECTION_RETRY = """\
consumer: Connection to broker lost. \
Trying to re-establish the connection...\
"""
...
def start(self):
    blueprint = self.blueprint
    while blueprint.state != CLOSE:
        maybe_shutdown()
        if self.restart_count:
            try:
                self._restart_state.step()
            except RestartFreqExceeded as exc:
                crit('Frequent restarts detected: %r', exc, exc_info=1)
                sleep(1)
        self.restart_count += 1
        try:
            blueprint.start(self)
        except self.connection_errors as exc:
            # If we're not retrying connections, no need to catch
            # connection errors
            if not self.app.conf.broker_connection_retry:
                raise
            if isinstance(exc, OSError) and exc.errno == errno.EMFILE:
                raise  # Too many open files
            maybe_shutdown()
            if blueprint.state != CLOSE:
                if self.connection:
                    self.on_connection_error_after_connected(exc)
                else:
                    self.on_connection_error_before_connected(exc)
                self.on_close()
                blueprint.restart(self)

def on_connection_error_before_connected(self, exc):
    error(CONNECTION_ERROR, self.conninfo.as_uri(), exc,
          'Trying to reconnect...')

def on_connection_error_after_connected(self, exc):
    warn(CONNECTION_RETRY, exc_info=True)
    try:
        self.connection.collect()
    except Exception:
        pass

self.connection_errors corresponds to whatever the Transport in Kombu actually defines; see kombu/transport/mongodb.py. In the current version it includes pymongo.errors.ConnectionFailure, and the common network exceptions in pymongo, AutoReconnect and NetworkTimeout, both inherit from ConnectionFailure.

class Transport(virtual.Transport):
    Channel = Channel

    can_parse_url = True
    polling_interval = 1
    default_port = DEFAULT_PORT
    connection_errors = (
        virtual.Transport.connection_errors + (errors.ConnectionFailure, )
    )

So far, Celery handles exceptions raised by Kombu correctly, but when the storage network fails, Kombu never raises an exception in the first place. The investigation therefore shifts from Celery to Kombu.
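The exception hierarchy described above can be sketched in isolation. The class names below match pymongo.errors, but the bodies are simplified stand-ins for illustration, not the real implementations:

```python
# Simplified stand-ins mirroring pymongo's exception hierarchy
# (pymongo.errors): NetworkTimeout -> AutoReconnect -> ConnectionFailure.
class PyMongoError(Exception):
    pass

class ConnectionFailure(PyMongoError):
    pass

class AutoReconnect(ConnectionFailure):
    pass

class NetworkTimeout(AutoReconnect):
    pass

# Kombu's MongoDB Transport registers ConnectionFailure in
# connection_errors, so Celery's `except self.connection_errors`
# also catches the subclasses AutoReconnect and NetworkTimeout.
connection_errors = (ConnectionFailure,)

def is_retryable(exc):
    """True if Celery's reconnect loop would handle this exception."""
    return isinstance(exc, connection_errors)
```

This is why, in the traceback above, the AutoReconnect raised after the network recovers is enough to trigger Celery's reconnect path, but only once something actually raises.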

Kombu

Look at how the MongoDB Transport creates connections. The call path is client -> _create_client -> _open -> _parse_uri: _open performs the actual connection setup, and _parse_uri returns the parameters used for the connection, eventually calling pymongo.uri_parser.parse_uri.

def _open(self, scheme='mongodb://'):
    hostname, dbname, options = self._parse_uri(scheme=scheme)

    conf = self._prepare_client_options(options)
    conf['host'] = hostname

    env = _detect_environment()
    if env == 'gevent':
        from gevent import monkey
        monkey.patch_all()
    elif env == 'eventlet':
        from eventlet import monkey_patch
        monkey_patch()

    mongoconn = MongoClient(**conf)
    database = mongoconn[dbname]

    version_str = mongoconn.server_info()['version']
    version = tuple(map(int, version_str.split('.')))

    if version < (1, 3):
        raise VersionMismatch(E_SERVER_VERSION.format(version_str))
    elif self.ttl and version < (2, 2):
        raise VersionMismatch(E_NO_TTL_INDEXES.format(version_str))

    return database

def _parse_uri(self, scheme='mongodb://'):
    # See mongodb uri documentation:
    # http://docs.mongodb.org/manual/reference/connection-string/
    client = self.connection.client
    hostname = client.hostname

    if not hostname.startswith(scheme):
        hostname = scheme + hostname

    if not hostname[len(scheme):]:
        hostname += self.default_hostname

    if client.userid and '@' not in hostname:
        head, tail = hostname.split('://')
        credentials = client.userid
        if client.password:
            credentials += ':' + client.password
        hostname = head + '://' + credentials + '@' + tail

    port = client.port if client.port else self.default_port
    parsed = uri_parser.parse_uri(hostname, port)
    dbname = parsed['database'] or client.virtual_host
    if dbname in ('/', None):
        dbname = self.default_database
    options = {
        'auto_start_request': True,
        'ssl': self.ssl,
        'connectTimeoutMS': (int(self.connect_timeout * 1000)
                             if self.connect_timeout else None),
    }
    options.update(parsed['options'])

    return hostname, dbname, options

Suppose our connection string is mongodb://192.168.1.1:27017,192.168.1.2:27017/yiran; pymongo.uri_parser.parse_uri then returns: {'username': None, 'nodelist': [('192.168.1.1', 27017), ('192.168.1.2', 27017)], 'database': 'yiran', 'collection': None, 'password': None, 'options': {}}.
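The scheme and credential handling at the top of _parse_uri can also be illustrated as a standalone sketch (simplified from the Kombu code above; the function name is mine):

```python
def normalize_hostname(hostname, userid=None, password=None,
                       scheme='mongodb://', default_hostname='localhost'):
    """Sketch of the hostname normalization in Kombu's mongodb
    Channel._parse_uri: prefix the scheme, fall back to a default
    host, and splice in credentials when they are not in the URI."""
    if not hostname.startswith(scheme):
        hostname = scheme + hostname
    if not hostname[len(scheme):]:
        hostname += default_hostname
    if userid and '@' not in hostname:
        head, tail = hostname.split('://')
        credentials = userid
        if password:
            credentials += ':' + password
        hostname = head + '://' + credentials + '@' + tail
    return hostname
```

For example, normalize_hostname('192.168.1.1:27017,192.168.1.2:27017/yiran') yields the fully qualified URI that is then handed to pymongo.uri_parser.parse_uri.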

socketTimeoutMS: In our environment no options are specified in the MongoDB URI, so the options returned by pymongo.uri_parser.parse_uri are empty, and the options used for the final connection are only those defined in _parse_uri. The class variable connect_timeout defaults to None on Channel, so Kombu never sets socketTimeoutMS on the final MongoDB connection. When socketTimeoutMS is unset, pymongo defaults it to None, meaning socket reads wait forever. When the network fails, the intuitive symptom is therefore a hang.

class Channel(virtual.Channel):
    """MongoDB Channel."""

    supports_fanout = True

    # Mutable container. Shared by all class instances
    _fanout_queues = {}

    # Options
    ssl = False
    ttl = False
    connect_timeout = None
    capped_queue_size = 100000
    calc_queue_size = True
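Putting the two pieces together, the options dict that 4.x hands to MongoClient can be sketched as follows (simplified; the function name is mine):

```python
def build_connection_options(parsed_options, connect_timeout=None, ssl=False):
    """Sketch of the options dict built by Kombu 4.x
    Channel._parse_uri. connect_timeout defaults to None on the
    Channel class, and transport_options are no longer merged in,
    so socketTimeoutMS is absent unless it appears in the URI."""
    options = {
        'auto_start_request': True,
        'ssl': ssl,
        'connectTimeoutMS': (int(connect_timeout * 1000)
                             if connect_timeout else None),
    }
    options.update(parsed_options)
    return options

# With an option-less URI, pymongo falls back to its own defaults:
# socketTimeoutMS=None, i.e. socket reads block indefinitely.
opts = build_connection_options({})
```

With our option-less URI, opts contains no socketTimeoutMS at all and connectTimeoutMS is None, which matches the observed hang.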

Parameter passing between Celery and Kombu

Now that we have observed that the connection parameters are not what we expected, why was the previous 3.x version fine? Switching to the 3.x branch, the overall logic is similar, but the handling of options has one extra line in 3.x: options.update(client.transport_options). Here client is assigned at the top of the function as self.connection.client, and self.connection is the parameter passed into the Transport constructor.

def _parse_uri(self, scheme='mongodb://'):
    # See mongodb uri documentation:
    # http://docs.mongodb.org/manual/reference/connection-string/
    client = self.connection.client
    hostname = client.hostname

    if not hostname.startswith(scheme):
        hostname = scheme + hostname

    if not hostname[len(scheme):]:
        hostname += DEFAULT_HOST

    if client.userid and '@' not in hostname:
        head, tail = hostname.split('://')

        credentials = client.userid
        if client.password:
            credentials += ':' + client.password

        hostname = head + '://' + credentials + '@' + tail

    port = client.port if client.port is not None else DEFAULT_PORT

    parsed = uri_parser.parse_uri(hostname, port)

    dbname = parsed['database'] or client.virtual_host

    if dbname in ('/', None):
        dbname = 'kombu_default'

    options = {
        'auto_start_request': True,
        'ssl': client.ssl,
        'connectTimeoutMS': (int(client.connect_timeout * 1000)
                             if client.connect_timeout else None),
    }
    options.update(client.transport_options)
    options.update(parsed['options'])

    return hostname, dbname, options

connection corresponds to Kombu's Connection, which hides the Transport from callers. Celery establishes the connection during initialization along the path celery/app/base.py:Celery._connection -> celery/app/amqp.py:AMQP.Connection -> kombu/connection.py:Connection. The transport_options passed along are whatever is configured in the Celery app declaration; the configurable parameters are documented at https://docs.celeryq.dev/en/stable/userguide/configuration.html.

In our scenario, the following parameters are declared.

BROKER_TRANSPORT_OPTIONS = {
    "connect": False,
    "maxPoolSize": 5 if "worker" in process_cmdline else 2,
    "socketTimeoutMS": 5000,
    "connectTimeoutMS": 5000,
    "serverSelectionTimeoutMS": 5000,
    "w": 0,
}

When the 3.x Kombu MongoDB Transport eventually creates MongoDB connections, these parameters are applied, so the hang described here does not occur.
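The difference between the two versions comes down to merge order when assembling the final options. A minimal sketch of the contrast (simplified; only the merge order matters here):

```python
# Sketch contrasting how the final MongoDB options are assembled in
# Kombu 3.x vs 4.x. TRANSPORT_OPTIONS mirrors the timeouts from our
# BROKER_TRANSPORT_OPTIONS; our broker URI carries no query options.
TRANSPORT_OPTIONS = {'socketTimeoutMS': 5000, 'connectTimeoutMS': 5000}
URI_OPTIONS = {}

def options_3x(transport_options, uri_options):
    options = {'auto_start_request': True, 'connectTimeoutMS': None}
    options.update(transport_options)  # 3.x merges transport_options in
    options.update(uri_options)
    return options

def options_4x(uri_options):
    options = {'auto_start_request': True, 'connectTimeoutMS': None}
    options.update(uri_options)        # 4.x: transport_options dropped
    return options
```

In 3.x the socketTimeoutMS from transport_options survives into the MongoClient call; in 4.x it silently disappears.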

Celery change background

@rmihael reported the issue "Celery events are not removed from MongoDB broker #1047": after deploying Celery Flower (Celery's monitoring component), events in messages were never removed, consuming a large amount of MongoDB storage. The issue discussion eventually settled on using MongoDB TTL to solve the problem.

During the Kombu 4.x development cycle, @daevaorn submitted "MongoDB TTL support and refactorings #537" to add MongoDB TTL support. The PR contains a number of TTL-unrelated commits and some refactoring, as follows.

  • Complete unit tests suit for MongoDB transport
  • Optional TTL support for MongoDB transport. AMQP TTL headers: x-messa…
  • Rearrange methods at MongoDB channel class
  • Another MongoDB transport clean up and refactor. Use of transport opt…
  • Opt-out for queue size calculation
  • Use natural sort for more FIFO semantic
  • Fix docstrings

"Optional TTL support for MongoDB transport." is the most critical commit. Ignoring the TTL changes and focusing on how MongoDB connections are established: new class variables were added to Channel to hold the current configuration, and in _parse_uri, ssl and connectTimeoutMS are now read from self instead of client, while options.update(client.transport_options) was removed. The removal of options.update(client.transport_options) is the key to this problem.

 class Channel(virtual.Channel):
     _client = None
     supports_fanout = True
+
+    # Mutable containers. Shared by all class instances
     _fanout_queues = {}

+    # Options
+    connect_timeout = None
+    ssl = False
+    capped_queue_size = 100000
+    ttl = False
+
+    from_transport_options = (
+        virtual.Channel.from_transport_options
+        + ('connect_timeout', 'ssl', 'ttl', 'capped_queue_size'))
+
+
     def __init__(self, *vargs, **kwargs):
         super(Channel, self).__init__(*vargs, **kwargs)
         ...

         ...

    def _parse_uri(self, scheme='mongodb://'):
        ...
         options = {
             'auto_start_request': True,
-            'ssl': client.ssl,
-            'connectTimeoutMS': (int(client.connect_timeout * 1000)
-                                 if client.connect_timeout else None),
+            'ssl': self.ssl,
+            'connectTimeoutMS': (int(self.connect_timeout * 1000)
+                                 if self.connect_timeout else None),
         }
-        options.update(client.transport_options)
         options.update(parsed['options'])
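Note that the diff above still keeps options.update(parsed['options']), i.e. options parsed out of the URI itself are still applied in 4.x. One possible mitigation, an assumption on my part rather than an official fix, is therefore to move the timeout settings from BROKER_TRANSPORT_OPTIONS into the broker URL's query string:

```python
# Hypothetical workaround sketch: carry the timeouts in the broker URL
# so they survive the 4.x removal of options.update(client.transport_options).
# Hosts and database name are placeholders from the earlier example.
BROKER_URL = (
    'mongodb://192.168.1.1:27017,192.168.1.2:27017/yiran'
    '?socketTimeoutMS=5000&connectTimeoutMS=5000'
    '&serverSelectionTimeoutMS=5000'
)
```

Whether this covers every option we previously passed (e.g. "connect": False) would still need to be verified against pymongo's URI option parsing.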

Summary

Celery/Kombu code management feels somewhat opaque, and comparing behavior across multiple branches is difficult. Full regression testing is necessary when upgrading major versions of critical components.