rocksdb offers two transactional modes, PessimisticTransactionDB and OptimisticTransactionDB. Both are essentially wrappers around the DB object that perform concurrency control outside the storage engine, giving applications transactional KV reads and writes through BEGIN/COMMIT/ROLLBACK-style APIs.

rocksdb can already write a WriteBatch atomically, and transactions build on top of WriteBatch: writes inside a transaction are buffered in the transaction's own WriteBatch, and reads inside the transaction consult that WriteBatch first before falling back to the MemTable, L0, L1, and so on. Until the transaction commits, other transactions cannot see the contents of this WriteBatch; at commit time the WriteBatch is written into the MemTable and WAL, which is when it becomes visible to everyone else.
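
This read-your-own-writes behavior comes from WriteBatchWithIndex. As a minimal sketch of the same idea used directly, outside any transaction (assuming the usual rocksdb headers):

#include <cassert>
#include <string>

#include "rocksdb/db.h"
#include "rocksdb/utilities/write_batch_with_index.h"

// Sketch: a WriteBatchWithIndex buffers writes locally; reads consult the
// batch first and fall back to the DB, which is what transaction reads do.
void ReadYourOwnWrites(rocksdb::DB* db) {
  rocksdb::WriteBatchWithIndex batch;
  batch.Put("k1", "v1");  // buffered only, not visible to other readers yet

  std::string value;
  // Looks up "k1" in the batch first, then in MemTable/SSTs if absent.
  rocksdb::Status s =
      batch.GetFromBatchAndDB(db, rocksdb::ReadOptions(), "k1", &value);
  assert(s.ok() && value == "v1");

  // Publishing the batch makes the writes visible to everyone else.
  s = db->Write(rocksdb::WriteOptions(), batch.GetWriteBatch());
  assert(s.ok());
}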

Since the LSM tree already provides atomic writes, most of rocksdb's work happens at the concurrency-control layer outside the LSM tree, where it supports both optimistic and pessimistic concurrency control.

  • In an optimistic transaction, the keys read and written inside the transaction are tracked, and conflict detection runs at Commit() to check whether any of these keys were modified by other transactions in the meantime; if so, the WriteBatch is discarded and the transaction is as if it never happened.
  • In a pessimistic transaction, the keys written inside the transaction are locked as they are written; when the transaction commits, the WriteBatch is applied and the locks are released, with no further conflict detection.

The transaction API is used, roughly, as follows.

TransactionDB* txn_db;
Status s = TransactionDB::Open(options, path, &txn_db);

Transaction* txn = txn_db->BeginTransaction(write_options, txn_options);
s = txn->Put(key, value);
s = txn->Delete(key2);
s = txn->Merge(key3, value);
s = txn->Commit();
delete txn;
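
For the optimistic flavor the usage is almost the same, except that the DB handle comes from OptimisticTransactionDB and a conflicting Commit() comes back as a non-OK status (typically Status::Busy) that the application is expected to retry. A rough sketch:

OptimisticTransactionDB* otxn_db;
Status s = OptimisticTransactionDB::Open(options, path, &otxn_db);

Transaction* txn = otxn_db->BeginTransaction(write_options);
s = txn->Put(key, value);
s = txn->Commit();  // conflict detection happens here
if (s.IsBusy()) {
  // another transaction wrote one of our keys after we started; retry
}
delete txn;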

Optimistic Concurrency Control: Snapshots and Conflict Detection

Let’s look at optimistic concurrency control first. As mentioned above, its core is the conflict detection performed at Commit(), but it also relies on the Snapshot mechanism for transaction isolation. rocksdb’s Snapshot works the same way as leveldb’s: every WriteBatch write increments the sequence number, a Snapshot is essentially just a sequence number, and a read through a Snapshot returns, for each key, the latest value whose sequence number is less than or equal to it.
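
At the API level this shows up as set_snapshot plus GetForUpdate(): the transaction pins a sequence number when it begins, and GetForUpdate() tracks the keys it reads so that Commit() validates them as well. A rough sketch, continuing with the otxn_db handle from the sketch above:

OptimisticTransactionOptions txn_options;
txn_options.set_snapshot = true;  // pin a sequence number at BeginTransaction

Transaction* txn = otxn_db->BeginTransaction(write_options, txn_options);

// GetForUpdate() tracks the read key, so Commit() will also detect
// conflicting writes against keys this transaction only read.
std::string value;
s = txn->GetForUpdate(ReadOptions(), "key1", &value);

s = txn->Put("key1", "new_value");
s = txn->Commit();  // Status::Busy if someone else wrote "key1" meanwhile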

Key tracking starts from the TrackKey() method, which is called by OptimisticTransactionImpl::TryLock().

void TransactionBaseImpl::TrackKey(uint32_t cfh_id, const std::string& key,
                                   SequenceNumber seq, bool read_only,
                                   bool exclusive) {
  PointLockRequest r;
  r.column_family_id = cfh_id;
  r.key = key;
  r.seq = seq;
  r.read_only = read_only;
  r.exclusive = exclusive;

  // Update map of all tracked keys for this transaction
  tracked_locks_->Track(r);

  if (save_points_ != nullptr && !save_points_->empty()) {
    // Update map of tracked keys in this SavePoint
    save_points_->top().new_locks_->Track(r);
  }
}

Here tracked_locks_ is a unique_ptr to a LockTracker object.

// Tracks the lock requests.
// In PessimisticTransaction, it tracks the locks acquired through LockMgr;
// In OptimisticTransaction, since there is no LockMgr, it tracks the lock
// intention. Not thread-safe.
class LockTracker {

As the comment on LockTracker says, the difference between the two modes is that pessimistic concurrency control acquires and releases locks through LockMgr, while optimistic concurrency control has no LockMgr and only records lock intentions without actually locking anything. LockTracker itself is little more than a convenience wrapper around a collection, so let's skip its implementation and look at how Commit() performs conflict detection based on the information it records.

The main entry point for conflict detection is OptimisticTransaction::CheckTransactionForConflicts(), which in turn calls TransactionUtil::CheckKeysForConflicts() to do the actual work.

Status TransactionUtil::CheckKeysForConflicts(DBImpl* db_impl,
                                              const LockTracker& tracker,
                                              bool cache_only) {
  Status result;

  std::unique_ptr<LockTracker::ColumnFamilyIterator> cf_it(
      tracker.GetColumnFamilyIterator());
  assert(cf_it != nullptr);
  while (cf_it->HasNext()) {
    ColumnFamilyId cf = cf_it->Next();

    SuperVersion* sv = db_impl->GetAndRefSuperVersion(cf);
    if (sv == nullptr) {
      result = Status::InvalidArgument("Could not access column family " +
                                       ToString(cf));
      break;
    }

    SequenceNumber earliest_seq =
        db_impl->GetEarliestMemTableSequenceNumber(sv, true);

    // For each of the keys in this transaction, check to see if someone has
    // written to this key since the start of the transaction.
    std::unique_ptr<LockTracker::KeyIterator> key_it(
        tracker.GetKeyIterator(cf));
    assert(key_it != nullptr);
    while (key_it->HasNext()) {
      const std::string& key = key_it->Next();
      PointLockStatus status = tracker.GetPointLockStatus(cf, key);
      const SequenceNumber key_seq = status.seq;

      result = CheckKey(db_impl, sv, earliest_seq, key_seq, key, cache_only);
      if (!result.ok()) {
        break;
      }
    }

    db_impl->ReturnAndCleanupSuperVersion(cf, sv);

    if (!result.ok()) {
      break;
    }
  }

  return result;
}

rocksdb does not implement SSI; it only tracks conflicts for write operations, so the process is simpler than badger's SSI. Only one thing needs to be checked: at commit time, none of the transaction's keys may have been updated in the db after the sequence number at which the transaction started.

The naive way to do this is to read the db once for each key, fetch its most recent sequence number, and compare it with the transaction's sequence number. If the one in the db is larger, sorry, that's a transaction conflict.

Of course rocksdb is not going to issue N reads of IO, one per key in the transaction. The idea is this: transactions do not run for long, and for conflict checking you do not need a key's full history, only its most recent writes. The most recent data lives in the MemTables, so reading the MemTables is enough for recent-conflict detection and no disk IO is needed.
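
Ignoring all the error handling for a moment, the per-key decision is roughly this (a sketch with simplified names, not the actual CheckKey code; latest_seq_in_memtables is a stand-in parameter for the value the real code obtains from the MemTables, not a rocksdb API):

#include "rocksdb/status.h"
#include "rocksdb/types.h"

// Illustrative only: the core decision of the per-key conflict check.
rocksdb::Status CheckKeySketch(rocksdb::SequenceNumber snap_seq,
                               rocksdb::SequenceNumber latest_seq_in_memtables) {
  if (latest_seq_in_memtables > snap_seq) {
    // Someone wrote this key after the transaction tracked it: conflict.
    return rocksdb::Status::Busy("write conflict");
  }
  return rocksdb::Status::OK();
}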

There is one more constraint, though: if the transaction started earlier than the oldest entry in the MemTables, the check cannot be decided from the MemTables alone. rocksdb handles this rather bluntly by reporting that the transaction should simply try again. The relevant logic lives in TransactionUtil::CheckKey:

  // Since it would be too slow to check the SST files, we will only use
  // the memtables to check whether there have been any recent writes
  // to this key after it was accessed in this transaction.  But if the
  // Memtables do not contain a long enough history, we must fail the
  // transaction.
  if (earliest_seq == kMaxSequenceNumber) {
    // The age of this memtable is unknown.  Cannot rely on it to check
    // for recent writes.  This error shouldn't happen often in practice as
    // the Memtable should have a valid earliest sequence number except in some
    // corner cases (such as error cases during recovery).
    need_to_read_sst = true;

    if (cache_only) {
      result = Status::TryAgain(
          "Transaction could not check for conflicts as the MemTable does not "
          "contain a long enough history to check write at SequenceNumber: ",
          ToString(snap_seq));
    }
  } else if (snap_seq < earliest_seq || min_uncommitted <= earliest_seq) {
    // Use <= for min_uncommitted since earliest_seq is actually the largest seq
    // before this memtable was created
    need_to_read_sst = true;

    if (cache_only) {
      // The age of this memtable is too new to use to check for recent
      // writes.
      char msg[300];
      snprintf(msg, sizeof(msg),
               "Transaction could not check for conflicts for operation at "
               "SequenceNumber %" PRIu64
               " as the MemTable only contains changes newer than "
               "SequenceNumber %" PRIu64
               ".  Increasing the value of the "
               "max_write_buffer_size_to_maintain option could reduce the "
               "frequency "
               "of this error.",
               snap_seq, earliest_seq);
      result = Status::TryAgain(msg);
    }
  }

Using the MemTable for conflict detection is quite clever, but it only works for SI, where only writes need to be tracked. If SSI were built on the same mechanism, every read would have to be turned into a MemTable write, which would be far worse than tracking key sets the way badger does.

Pessimistic Concurrency Control: Lock Management

Pessimistic concurrency control works by locking the keys a transaction reads and writes, so that other transactions trying to lock the same keys have to wait. The "lock" here is not literally one mutex per key, though; rocksdb implements its own row-lock semantics.

The main objects are LockMap, LockMapStripe, and LockInfo.

// Map of #num_stripes LockMapStripes
struct LockMap {
  // Number of sepearate LockMapStripes to create, each with their own Mutex
  const size_t num_stripes_;

  // Count of keys that are currently locked in this column family.
  // (Only maintained if PointLockManager::max_num_locks_ is positive.)
  std::atomic<int64_t> lock_cnt{0};

  std::vector<LockMapStripe*> lock_map_stripes_;
};
struct LockMapStripe {
  // Mutex must be held before modifying keys map
  std::shared_ptr<TransactionDBMutex> stripe_mutex;

  // Condition Variable per stripe for waiting on a lock
  std::shared_ptr<TransactionDBCondVar> stripe_cv;

  // Locked keys mapped to the info about the transactions that locked them.
  // TODO(agiardullo): Explore performance of other data structures.
  std::unordered_map<std::string, LockInfo> keys;
};
struct LockInfo {
  bool exclusive;
  autovector<TransactionID> txn_ids;

  // Transaction locks are not valid after this time in us
  uint64_t expiration_time;
}

LockMap is the entry point for all locks in a column family. Each LockMap is divided into 16 LockMapStripes by default, and each LockMapStripe holds a map from keys to LockInfo.

In short, LockMap is a key-to-LockInfo mapping table, internally sharded into LockMapStripes to improve concurrency.
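
Which stripe a key belongs to is decided by hashing the key; conceptually it is nothing more than the following (the real LockMap::GetStripe uses rocksdb's internal hash function, the modulo version below is an illustrative stand-in):

#include <cstddef>
#include <functional>
#include <string>

// Illustrative stripe selection: hash the key and map it onto one of the
// num_stripes buckets, so unrelated keys rarely contend on the same mutex.
size_t GetStripeSketch(const std::string& key, size_t num_stripes) {
  return std::hash<std::string>{}(key) % num_stripes;
}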

LockInfo carries read/write-lock semantics: exclusive marks whether the lock is exclusive; if it is not, multiple readers may hold the lock at the same time, and txn_ids maintains the list of transactions currently holding it.

If a lock cannot be acquired immediately, the caller waits on stripe_cv. In other words, these locks are user-space read/write locks built on system primitives such as condition variables and mutexes.
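
As an analogy only (plain std::mutex and std::condition_variable rather than rocksdb's TransactionDBMutex/TransactionDBCondVar wrappers), the pattern looks like this:

#include <condition_variable>
#include <mutex>
#include <string>
#include <unordered_set>

// Analogy: a striped key lock built on standard primitives.
struct StripeSketch {
  std::mutex mu;
  std::condition_variable cv;
  std::unordered_set<std::string> locked_keys;

  void Lock(const std::string& key) {
    std::unique_lock<std::mutex> guard(mu);
    // Wait until no one holds this key, then take it.
    cv.wait(guard, [&] { return locked_keys.count(key) == 0; });
    locked_keys.insert(key);
  }

  void Unlock(const std::string& key) {
    {
      std::lock_guard<std::mutex> guard(mu);
      locked_keys.erase(key);
    }
    // Same broadcast-style wakeup as stripe_cv->NotifyAll().
    cv.notify_all();
  }
};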

Let's look at the implementation of TryLock().

Status PointLockManager::TryLock(PessimisticTransaction* txn,
                                 ColumnFamilyId column_family_id,
                                 const std::string& key, Env* env,
                                 bool exclusive) {
  // Lookup lock map for this column family id
  std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id);
  LockMap* lock_map = lock_map_ptr.get();
  if (lock_map == nullptr) {
    char msg[255];
    snprintf(msg, sizeof(msg), "Column family id not found: %" PRIu32,
             column_family_id);

    return Status::InvalidArgument(msg);
  }

  // Need to lock the mutex for the stripe that this key hashes to
  size_t stripe_num = lock_map->GetStripe(key);
  assert(lock_map->lock_map_stripes_.size() > stripe_num);
  LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);

  LockInfo lock_info(txn->GetID(), txn->GetExpirationTime(), exclusive);
  int64_t timeout = txn->GetLockTimeout();

  return AcquireWithTimeout(txn, lock_map, stripe, column_family_id, key, env,
                            timeout, std::move(lock_info));
}

This part is easy to read: look up the LockMapStripe, build a LockInfo, and then call AcquireWithTimeout() to actually acquire the lock.

// Helper function for TryLock().
Status PointLockManager::AcquireWithTimeout(
    PessimisticTransaction* txn, LockMap* lock_map, LockMapStripe* stripe,
    ColumnFamilyId column_family_id, const std::string& key, Env* env,
    int64_t timeout, LockInfo&& lock_info) {
  // ... acquire the stripe's mutex

  // Try to acquire the lock
  uint64_t expire_time_hint = 0;
  autovector<TransactionID> wait_ids;
  result = AcquireLocked(lock_map, stripe, key, env, std::move(lock_info),
                         &expire_time_hint, &wait_ids);

  if (!result.ok() && timeout != 0) {
    bool timed_out = false;
    do {
      // ... compute cv_end_time (how long to wait) from the expire_time_hint returned by AcquireLocked()
      assert(result.IsBusy() || wait_ids.size() != 0);

      // ... the wait_ids returned by AcquireLocked() tell us which transactions hold the locks we depend on
      // ... kick off deadlock detection

      // Wait
      if (cv_end_time < 0) {
        // Wait indefinitely
        result = stripe->stripe_cv->Wait(stripe->stripe_mutex);
      } else {
        uint64_t now = env->NowMicros();
        if (static_cast<uint64_t>(cv_end_time) > now) {
          result = stripe->stripe_cv->WaitFor(stripe->stripe_mutex,
                                              cv_end_time - now);
        }
      }

      // ... clean up the deadlock-detection context

      if (result.IsTimedOut()) {
          timed_out = true;
          // Even though we timed out, we will still make one more attempt to
          // acquire lock below (it is possible the lock expired and we
          // were never signaled).
      }

      // Retry acquiring the lock
      if (result.ok() || result.IsTimedOut()) {
        result = AcquireLocked(lock_map, stripe, key, env, std::move(lock_info),
                               &expire_time_hint, &wait_ids);
      }
    } while (!result.ok() && !timed_out);
  }

  stripe->stripe_mutex->UnLock();

  return result;
}

Leaving aside deadlock detection, the logic here is fairly simple:

  1. Try to take the lock; AcquireLocked() is non-blocking and simply returns failure if the lock cannot be taken.
  2. If the acquisition fails, back off and wait for a notification on stripe_cv.
  3. Loop back and retry the acquisition.

AcquireLocked() is a textbook implementation of read/write locking.

  • If the key's lock is not currently held, then barring surprises the lock info is stored in the stripe and txn_ids is set to the current transaction's ID.
    • The surprise here is that rocksdb has a max_num_locks_ option limiting the total number of locks; if the limit would be exceeded, locking fails with Status::Busy.
  • If the key's lock is already held:
    • If neither the held lock nor the requested lock is exclusive, multiple readers may share the lock: the current transaction ID is appended to the txn_ids of the lock_info in the stripe, and the shared lock is acquired successfully.
    • If either the held lock or the requested lock is exclusive, the acquisition normally fails with Status::TimedOut, and txn_ids is returned so the caller knows which transactions hold the lock (to assist deadlock detection). There are two special cases:
      • Re-entrant lock: if the lock is held by the current transaction alone, its exclusive flag is simply overwritten and the acquisition succeeds.
      • Lock expiration: if the held lock has expired, it can be stolen.

The code of AcquireLocked() is as follows.

// Try to lock this key after we have acquired the mutex.
// Sets *expire_time to the expiration time in microseconds
//  or 0 if no expiration.
// REQUIRED:  Stripe mutex must be held.
Status PointLockManager::AcquireLocked(LockMap* lock_map, LockMapStripe* stripe,
                                       const std::string& key, Env* env,
                                       LockInfo&& txn_lock_info,
                                       uint64_t* expire_time,
                                       autovector<TransactionID>* txn_ids) {
  assert(txn_lock_info.txn_ids.size() == 1);

  Status result;
  // Check if this key is already locked
  auto stripe_iter = stripe->keys.find(key);
  if (stripe_iter != stripe->keys.end()) {
    // Lock already held
    LockInfo& lock_info = stripe_iter->second;
    assert(lock_info.txn_ids.size() == 1 || !lock_info.exclusive);

    if (lock_info.exclusive || txn_lock_info.exclusive) {
      if (lock_info.txn_ids.size() == 1 &&
          lock_info.txn_ids[0] == txn_lock_info.txn_ids[0]) {
        // The list contains one txn and we're it, so just take it.
        lock_info.exclusive = txn_lock_info.exclusive;
        lock_info.expiration_time = txn_lock_info.expiration_time;
      } else {
        // Check if it's expired. Skips over txn_lock_info.txn_ids[0] in case
        // it's there for a shared lock with multiple holders which was not
        // caught in the first case.
        if (IsLockExpired(txn_lock_info.txn_ids[0], lock_info, env,
                          expire_time)) {
          // lock is expired, can steal it
          lock_info.txn_ids = txn_lock_info.txn_ids;
          lock_info.exclusive = txn_lock_info.exclusive;
          lock_info.expiration_time = txn_lock_info.expiration_time;
          // lock_cnt does not change
        } else {
          result = Status::TimedOut(Status::SubCode::kLockTimeout);
          *txn_ids = lock_info.txn_ids;
        }
      }
    } else {
      // We are requesting shared access to a shared lock, so just grant it.
      lock_info.txn_ids.push_back(txn_lock_info.txn_ids[0]);
      // Using std::max means that expiration time never goes down even when
      // a transaction is removed from the list. The correct solution would be
      // to track expiry for every transaction, but this would also work for
      // now.
      lock_info.expiration_time =
          std::max(lock_info.expiration_time, txn_lock_info.expiration_time);
    }
  } else {  // Lock not held.
    // Check lock limit
    if (max_num_locks_ > 0 &&
        lock_map->lock_cnt.load(std::memory_order_acquire) >= max_num_locks_) {
      result = Status::Busy(Status::SubCode::kLockLimit);
    } else {
      // acquire lock
      stripe->keys.emplace(key, std::move(txn_lock_info));

      // Maintain lock count if there is a limit on the number of locks
      if (max_num_locks_) {
        lock_map->lock_cnt++;
      }
    }
  }

  return result;
}

A final look at the unlocking section.

void PointLockManager::UnLock(PessimisticTransaction* txn,
                              ColumnFamilyId column_family_id,
                              const std::string& key, Env* env) {
  std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id);
  LockMap* lock_map = lock_map_ptr.get();
  if (lock_map == nullptr) {
    // Column Family must have been dropped.
    return;
  }

  // Lock the mutex for the stripe that this key hashes to
  size_t stripe_num = lock_map->GetStripe(key);
  assert(lock_map->lock_map_stripes_.size() > stripe_num);
  LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);

  stripe->stripe_mutex->Lock().PermitUncheckedError();
  UnLockKey(txn, key, stripe, lock_map, env);
  stripe->stripe_mutex->UnLock();

  // Signal waiting threads to retry locking
  stripe->stripe_cv->NotifyAll();
}

Here UnLockKey() is a small helper that removes the current transaction's ID from the LockInfo in the stripe, or deletes the LockInfo entirely once no holders remain.
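
UnLockKey() itself is not shown here; conceptually it amounts to the following sketch (reusing the struct names from above, not the actual implementation):

#include <algorithm>
#include <string>

// Sketch only: drop this transaction from the lock's holder list and free
// the row lock once no holders remain.
void UnLockKeySketch(TransactionID txn_id, const std::string& key,
                     LockMapStripe* stripe, LockMap* lock_map) {
  auto stripe_iter = stripe->keys.find(key);
  if (stripe_iter == stripe->keys.end()) {
    return;  // nothing to unlock; the lock may have expired and been stolen
  }
  auto& txns = stripe_iter->second.txn_ids;
  auto txn_it = std::find(txns.begin(), txns.end(), txn_id);
  if (txn_it == txns.end()) {
    return;
  }
  if (txns.size() == 1) {
    // Last holder: remove the whole LockInfo entry.
    stripe->keys.erase(stripe_iter);
    lock_map->lock_cnt--;  // only meaningful when max_num_locks_ is set
  } else {
    // Shared lock with other holders: just remove ourselves from the list.
    std::swap(*txn_it, txns.back());
    txns.pop_back();
  }
}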

Waking up waiters is done rather bluntly with stripe_cv->NotifyAll(): every waiter on the stripe is woken in a thundering herd, but only one of them will win stripe_mutex and make progress.

Pessimistic Concurrency Control: Deadlock Detection

Deadlock detection tracks lock dependencies between transactions, runs a BFS over the dependency graph to see whether it contains a cycle, and rejects a lock request up front if granting it would create one. deadlock_detect is off by default in rocksdb; with active detection disabled, deadlocks can still be recovered from via the lock timeout mechanism.
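
Per transaction, this is controlled through TransactionOptions; the field names below are the real ones, the values are just illustrative:

TransactionOptions txn_options;
txn_options.deadlock_detect = true;      // off by default
txn_options.deadlock_detect_depth = 50;  // BFS depth limit (default 50)
txn_options.lock_timeout = 1000;         // ms; the fallback when detection is off

Transaction* txn = txn_db->BeginTransaction(write_options, txn_options);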

Deadlock detection occurs in the AcquireWithTimeout function.

      // We are dependent on a transaction to finish, so perform deadlock
      // detection.
      if (wait_ids.size() != 0) {
        if (txn->IsDeadlockDetect()) {
          if (IncrementWaiters(txn, wait_ids, key, column_family_id,
                               lock_info.exclusive, env)) {
            result = Status::Busy(Status::SubCode::kDeadlock);
            stripe->stripe_mutex->UnLock();
            return result;
          }
        }
        txn->SetWaitingTxn(wait_ids, column_family_id, &key);
      }

When a call to AcquireLocked() fails, it returns wait_ids, the list of transactions holding the lock we want. This information is the basis for building the lock dependency graph.

The fields related to deadlock detection are:

  // Must be held when modifying wait_txn_map_ and rev_wait_txn_map_.
  std::mutex wait_txn_map_mutex_;

  // Maps from waitee -> number of waiters.
  HashMap<TransactionID, int> rev_wait_txn_map_;
  // Maps from waiter -> waitee.
  HashMap<TransactionID, TrackedTrxInfo> wait_txn_map_;
  DeadlockInfoBuffer dlock_buffer_;

rev_wait_txn_map_ appears to be used for pruning: it tracks, for each transaction ID, how many transactions are waiting on it. If the count is zero, no deadlock is possible and the graph does not need to be traversed at all; a positive count does not necessarily mean a deadlock, though, so the graph still has to be traversed to find out.

wait_txn_map_ is what the BFS actually traverses. In IncrementWaiters(), the traversal starts from the current transaction's wait_ids, and if it ever reaches the current transaction's own ID, a cycle exists.

The logic of the BFS part is as follows.

 for (int tail = 0, head = 0; head < txn->GetDeadlockDetectDepth(); head++) {
    int i = 0;
    if (next_ids) {
      for (; i < static_cast<int>(next_ids->size()) &&
             tail + i < txn->GetDeadlockDetectDepth();
           i++) {
        queue_values[tail + i] = (*next_ids)[i];
        queue_parents[tail + i] = parent;
      }
      tail += i;
    }

    // No more items in the list, meaning no deadlock.
    if (tail == head) {
      return false;
    }

    auto next = queue_values[head];
    if (next == id) {
      // Deadlock found: record the path (via queue_parents) into dlock_buffer_
      return true;
    } else if (!wait_txn_map_.Contains(next)) {
      next_ids = nullptr;
      continue;
    } else {
      parent = head;
      next_ids = &(wait_txn_map_.Get(next).m_neighbors);
    }
  }

The BFS queue is implemented with two arrays of length deadlock_detect_depth, queue_values[] and queue_parents[], plus two indices head and tail: tail marks the end of the queue, head the front, and the traversal ends when head catches up with tail. When a deadlock is found, the path leading to it is reconstructed from queue_parents[] and recorded in dlock_buffer_ to aid diagnosis.
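
Rewritten with an explicit queue just to expose the structure (illustrative only; the real code additionally records queue_parents so it can report the deadlock path):

#include <cstdint>
#include <queue>
#include <unordered_map>
#include <vector>

using TransactionID = uint64_t;

// Illustrative BFS over a waiter -> waitees graph: does `start` (the
// transaction that is about to wait) eventually reach itself?
bool WouldDeadlock(
    TransactionID start, const std::vector<TransactionID>& wait_ids,
    const std::unordered_map<TransactionID, std::vector<TransactionID>>&
        wait_for,
    int depth_limit) {
  std::queue<TransactionID> q;
  for (TransactionID id : wait_ids) q.push(id);
  int visited = 0;
  while (!q.empty() && visited++ < depth_limit) {
    TransactionID next = q.front();
    q.pop();
    if (next == start) return true;      // cycle back to ourselves: deadlock
    auto it = wait_for.find(next);
    if (it == wait_for.end()) continue;  // this txn is not waiting on anyone
    for (TransactionID n : it->second) q.push(n);
  }
  return false;  // no cycle found within the depth limit
}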

Summary

  • In OptimisticTransaction, rocksdb reads the latest sequence number of each key directly from the MemTables and uses it for conflict detection at Commit(), to determine whether the key was written by another transaction while this one was running.
  • In PessimisticTransactionDB, the LockManager implements row-lock semantics on top of condition variables and mutexes; each key corresponds to a row lock, and a transaction that hits a lock conflict waits for a CondVar notification.
  • Deadlock detection runs a BFS over the lock dependency graph to check for cycles; if a cycle would form, the lock request is rejected up front. rocksdb does not enable deadlock detection by default, and deadlocks can still be recovered from via lock timeouts.