badger is Dgraph's open-source LSM-tree KV engine. Compared to leveldb it adds key-value separation, transactions, concurrent compaction, and other enhancements, making it one of the more production-ready storage engines in the Go ecosystem. Here is a look at its transaction implementation.

badger implements the Serializable Snapshot Isolation (SSI) level with optimistic concurrency control for its transactions. Compared to Snapshot Isolation (SI), SSI tracks a transaction's read operations in addition to its writes for conflict detection. Conflict checking happens at commit time: if data read by the current transaction was modified by another transaction while it was executing, the commit fails.


Lifecycle of a transaction

The lifecycle of an optimistic concurrency control transaction roughly divides into four stages: starting, tracking reads and writes, committing, and cleaning up.

  • Transaction start: acquire a read timestamp for the transaction
  • Transaction execution: track the keys touched by the transaction's reads and writes; reads see the snapshot as of the start timestamp, and writes are buffered in memory
  • Transaction commit: detect conflicts based on the keys tracked by the transaction, and acquire a commit timestamp to make the writes take effect
  • Transaction cleanup: once old transactions are no longer active, transaction metadata that is no longer needed, such as snapshot data and conflict detection data, can be released

To manage the lifecycle of a transaction, metadata needs to be recorded at two levels: per transaction and global.

  • At the transaction level, each transaction records the list of keys it reads and writes, along with its start and commit timestamps. These are maintained in the Txn structure.
  • At the global level, badger manages the global timestamp, the list of recently committed transactions (used when a new transaction commits to conflict-check it against the transactions that committed between its start and commit timestamps), and the minimum start timestamp among currently active transactions (used to clean up stale transaction information). These are maintained in the oracle structure.

The timestamp here is not physical time but a logical one: all data changes take effect at the moment a transaction commits, so the timestamp is incremented only on transaction commit.

[Figure: timeline of transactions 1–4, showing each transaction's start and commit timestamps]

Take the diagram above as an example: at commit time, transaction 4 needs to run conflict detection against transactions 1 and 3, because their commit timestamps fall between transaction 4's start and commit timestamps. If the keys written by transactions 1 and 3 overlap with the keys read by transaction 4, the transactions are considered to conflict.
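This conflict is visible directly from badger's public API. The following sketch (assuming badger v4; error handling trimmed) has two transactions read the same key from the same snapshot and then both write it; the one that commits second fails with ErrConflict:

package main

import (
	"errors"
	"fmt"

	badger "github.com/dgraph-io/badger/v4"
)

func main() {
	db, _ := badger.Open(badger.DefaultOptions("/tmp/badger-ssi-demo"))
	defer db.Close()

	// Two read-write transactions start from the same snapshot.
	t1 := db.NewTransaction(true)
	t2 := db.NewTransaction(true)
	defer t1.Discard()
	defer t2.Discard()

	// Both read key "k", so "k" enters each transaction's read set
	// (a read of a missing key is tracked too).
	_, _ = t1.Get([]byte("k"))
	_, _ = t2.Get([]byte("k"))

	// Both write "k"; t1 commits first.
	_ = t1.Set([]byte("k"), []byte("v1"))
	_ = t2.Set([]byte("k"), []byte("v2"))
	_ = t1.Commit()

	// t2 read a key that was modified after t2 started, so its commit fails.
	fmt.Println(errors.Is(t2.Commit(), badger.ErrConflict)) // true
}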

Next, let's walk through these four lifecycle stages and the corresponding code in badger.

Transaction start

The entry point for starting a new transaction is the db.newTransaction() function. This function is relatively simple: apart from initializing a few fields, the only part with behavioral semantics is the line txn.readTs = db.orc.readTs(), where the read timestamp is requested.

Take a look at the implementation of the readTs() function.

func (o *oracle) readTs() uint64 {
	// isManaged-related logic omitted here

	var readTs uint64
	o.Lock()
	readTs = o.nextTxnTs - 1
	o.readMark.Begin(readTs)
	o.Unlock()

	// Wait for all txns which have no conflicts, have been assigned a commit
	// timestamp and are going through the write to value log and LSM tree
	// process. Not waiting here could mean that some txns which have been
	// committed would not be read.
	y.Check(o.txnMark.WaitForMark(context.Background(), readTs))
	return readTs
}

The timestamp allocation logic is simple: just take the current timestamp recorded in the oracle's nextTxnTs field (minus one, since nextTxnTs points at the next commit timestamp to be assigned).

There is a detail here. As mentioned earlier, the timestamp is incremented at transaction commit, so there is a time window in which the timestamp has been incremented but the corresponding writes have not yet been persisted. If a transaction starts within this window, it would read stale data rather than the snapshot at its timestamp. The solution is to wait for the transactions at the current timestamp to finish writing before the new transaction proceeds.


The txnMark field is of type WaterMark, a structure that internally maintains a heap and can be used to track and wait for progress notifications on transaction timestamp ranges.

In addition to waiting on txnMark for the transactions at the current timestamp to finish writing, the readTs function contains the line o.readMark.Begin(readTs). readMark is a WaterMark like txnMark, but it does not use the WaterMark's ability to wait on a point; it only uses the heap inside it to track the timestamp range of currently active transactions, which is used to determine which expired transactions can be recycled.
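To make the coordination concrete, here is a condensed sketch of how the two sides use the watermark; the method names (Begin, Done, WaitForMark) are those of badger's y.WaterMark, and the oracle plumbing around them is simplified from the snippets in this post:

// Commit side (newCommitTs / doneCommit, condensed):
commitTs := o.nextTxnTs
o.nextTxnTs++
o.txnMark.Begin(commitTs) // writes at commitTs are now in flight
// ... entries for commitTs are written to the value log and LSM tree ...
o.txnMark.Done(commitTs) // doneCommit: commitTs is fully persisted

// Start side (readTs, condensed): block until every commit at or below
// readTs has finished writing, so the snapshot at readTs is complete.
readTs := o.nextTxnTs - 1
o.readMark.Begin(readTs) // register this reader as active
y.Check(o.txnMark.WaitForMark(context.Background(), readTs))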

Transaction Execution

During transaction execution, writes are buffered in memory in pendingWrites. In managed mode, if the same key is written multiple times within a transaction, the older versions inserted by the transaction are kept in the duplicateWrites buffer; we ignore the duplicateWrites field for now.

A read during the transaction first consults the pendingWrites buffer, then falls back to the data in the LSM tree. badger inherits the iterator-composition idea from leveldb: the read path over pendingWrites is wrapped as an Iterator, which is combined with MemTableIterator, TableIterator, and the other iterators into the final Iterator by MergeIterator.
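For point reads, Txn.Get follows the same order. A condensed sketch of its core, based on badger's Txn.Get with error handling and item plumbing trimmed:

// Inside a read-write transaction, a read is served from the
// transaction's own write buffer first.
if e, ok := txn.pendingWrites[string(key)]; ok {
	return e.Value, nil // the txn sees its own uncommitted writes
}
// Track the key for conflict detection at commit time.
txn.addReadKey(key)
// Otherwise, perform a snapshot read of the LSM tree as of readTs.
vs, err := txn.db.get(y.KeyWithTs(key, txn.readTs))

For range reads, NewIterator composes the same sources into one merged iterator: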

// NewIterator returns a new iterator. Depending upon the options, either only keys, or both
// key-value pairs would be fetched. The keys are returned in lexicographically sorted order.
// Using prefetch is recommended if you're doing a long running iteration, for performance.
//
// Multiple Iterators:
// For a read-only txn, multiple iterators can be running simultaneously.  However, for a read-write
// txn, iterators have the nuance of being a snapshot of the writes for the transaction at the time
// iterator was created. If writes are performed after an iterator is created, then that iterator
// will not be able to see those writes. Only writes performed before an iterator was created can be
// viewed.
func (txn *Txn) NewIterator(opt IteratorOptions) *Iterator {
	if txn.discarded {
		panic("Transaction has already been discarded")
	}
	if txn.db.IsClosed() {
		panic(ErrDBClosed.Error())
	}

	// Keep track of the number of active iterators.
	atomic.AddInt32(&txn.numIterators, 1)

	// TODO: If Prefix is set, only pick those memtables which have keys with
	// the prefix.
	tables, decr := txn.db.getMemTables()
	defer decr()
	txn.db.vlog.incrIteratorCount()
	var iters []y.Iterator
	if itr := txn.newPendingWritesIterator(opt.Reverse); itr != nil {
		iters = append(iters, itr)
	}
	for i := 0; i < len(tables); i++ {
		iters = append(iters, tables[i].sl.NewUniIterator(opt.Reverse))
	}
	iters = append(iters, txn.db.lc.iterators(&opt)...) // This will increment references.
	res := &Iterator{
		txn:    txn,
		iitr:   table.NewMergeIterator(iters, opt.Reverse),
		opt:    opt,
		readTs: txn.readTs,
	}
	return res
}

badger stores commitTs as a suffix of the key in the LSM tree, and the iterator is timestamp-aware, iterating over the snapshot of the data as of readTs. This is consistent with leveldb's sequence numbers and the iteration behavior of its Snapshot.
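The suffix encoding can be sketched as follows; keyWithTs here is a local re-implementation mirroring badger's y.KeyWithTs helper:

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"math"
)

// keyWithTs appends the timestamp, inverted and big-endian, as an 8-byte
// suffix, so that for the same user key newer versions sort first.
func keyWithTs(key []byte, ts uint64) []byte {
	out := make([]byte, len(key)+8)
	copy(out, key)
	binary.BigEndian.PutUint64(out[len(key):], math.MaxUint64-ts)
	return out
}

func main() {
	newer := keyWithTs([]byte("foo"), 5) // commitTs = 5
	older := keyWithTs([]byte("foo"), 3) // commitTs = 3
	// The newer version compares lower, so an iterator seeking to
	// keyWithTs("foo", readTs) lands on the latest version visible at readTs.
	fmt.Println(bytes.Compare(newer, older) < 0) // true
}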

Transaction Commit

The commit entry point for the transaction is the Commit() function, which delegates the bulk of the logic to commitAndSend(). The general process is:

  1. perform conflict detection via orc.newCommitTs(txn); if there is no conflict, a commitTs is allocated
  2. loop over the Entry objects in pendingWrites and duplicateWrites, binding commitTs as their version and appending commitTs to the stored keys
  3. call txn.db.sendToWriteCh(entries) to send the write buffer off to be persisted
  4. asynchronously wait for the writes to be persisted, then call orc.doneCommit(commitTs) to advance the txnMark watermark
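Put together, the skeleton of commitAndSend looks roughly like this; a condensed sketch in which names follow the snippets in this post and error handling is omitted:

// Conflict check plus commit timestamp allocation; 0 means a concurrent
// commit touched a key this transaction read.
commitTs := orc.newCommitTs(txn)
if commitTs == 0 {
	return ErrConflict
}
entries := make([]*Entry, 0, len(txn.pendingWrites))
for _, e := range txn.pendingWrites {
	e.Key = y.KeyWithTs(e.Key, commitTs) // bind commitTs as the key's version
	entries = append(entries, e)
}
req, _ := txn.db.sendToWriteCh(entries) // hand off for asynchronous persistence
// Once the value log and LSM tree writes complete:
_ = req.Wait()
orc.doneCommit(commitTs) // advance txnMark so readers at commitTs may proceed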

newCommitTs internally performs conflict detection, triggers cleanup of expired transactions, and records the committing transaction in committedTxns.

func (o *oracle) newCommitTs(txn *Txn) uint64 {
	o.Lock()
	defer o.Unlock()

	if o.hasConflict(txn) {
		return 0
	}

	var ts uint64
	o.doneRead(txn)
	o.cleanupCommittedTransactions()

	// This is the general case, when user doesn't specify the read and commit ts.
	ts = o.nextTxnTs
	o.nextTxnTs++
	o.txnMark.Begin(ts)

	y.AssertTrue(ts >= o.lastCleanupTs)

	if o.detectConflicts {
		// We should ensure that txns are not added to o.committedTxns slice when
		// conflict detection is disabled otherwise this slice would keep growing.
		o.committedTxns = append(o.committedTxns, committedTxn{
			ts:           ts,
			conflictKeys: txn.conflictKeys,
		})
	}

	return ts
}

The logic of conflict detection is simple: iterate through committedTxns, find the transactions that committed after the current transaction started, and check whether any key read by the current transaction appears in the write lists of those transactions.

// hasConflict must be called while having a lock.
func (o *oracle) hasConflict(txn *Txn) bool {
	if len(txn.reads) == 0 {
		return false
	}
	for _, committedTxn := range o.committedTxns {
		// If the committedTxn.ts is less than txn.readTs that implies that the
		// committedTxn finished before the current transaction started.
		// We don't need to check for conflict in that case.
		// This change assumes linearizability. Lack of linearizability could
		// cause the read ts of a new txn to be lower than the commit ts of
		// a txn before it (@mrjn).
		if committedTxn.ts <= txn.readTs {
			continue
		}

		for _, ro := range txn.reads {
			if _, has := committedTxn.conflictKeys[ro]; has {
				return true
			}
		}
	}

	return false
}

Transaction cleanup

The committedTxns array, which records information about recently committed transactions, obviously cannot grow indefinitely. So when can entries in committedTxns be cleaned up? The criterion is the start timestamp of the earliest active transaction: a historical transaction whose commit timestamp is earlier than that start timestamp can never be consulted for conflict checking again, so it can be recycled from committedTxns. For example, if the earliest active transaction started at ts 7, every committed transaction with a commit timestamp at or below 7 can be dropped.


func (o *oracle) cleanupCommittedTransactions() { // Must be called under o.Lock
	if !o.detectConflicts {
		// When detectConflicts is set to false, we do not store any
		// committedTxns and so there's nothing to clean up.
		return
	}
	// Same logic as discardAtOrBelow but unlocked
	var maxReadTs uint64
	if o.isManaged {
		maxReadTs = o.discardTs
	} else {
		maxReadTs = o.readMark.DoneUntil() // get the earliest readTs among currently active transactions from the readMark heap
	}

	y.AssertTrue(maxReadTs >= o.lastCleanupTs)

	// do not run clean up if the maxReadTs (read timestamp of the
	// oldest transaction that is still in flight) has not increased
	if maxReadTs == o.lastCleanupTs {
		return
	}
	o.lastCleanupTs = maxReadTs

	tmp := o.committedTxns[:0]
	for _, txn := range o.committedTxns {
		if txn.ts <= maxReadTs {
			continue
		}
		tmp = append(tmp, txn)
	}
	o.committedTxns = tmp
}

oracle records the timestamp of the last cleanup in lastCleanupTs to avoid redundant cleanup operations.

Summary

  • The transaction-related structures in badger are Txn and oracle. Txn mainly holds the start timestamp, the commit timestamp, and the lists of keys read and written. oracle acts as the transaction manager, maintaining the list of recently committed transactions, the global timestamp, and the earliest timestamp among currently active transactions.
  • Transaction timestamps are logical timestamps that increment by 1 each time a transaction commits.
  • The conflict detection logic for SSI transactions is to find the transactions that committed during the execution of the current transaction, then check whether the list of keys read by the current transaction overlaps with the lists of keys written by those transactions.
  • The WaterMark structure is internally a heap; it is used to manage and query the begin and end marks of transaction timestamp ranges. oracle's txnMark is mainly used to wait out the window between commit timestamp allocation and durable writes, while readMark tracks the earliest timestamp among currently active transactions, which is used to clean up expired entries in committedTxns.