In a distributed database, replication keeps copies of the same data on multiple nodes. It serves three main purposes:

  1. Scalability: the amount of data a single machine can hold and serve is limited.
  2. Fault tolerance and high availability: in a distributed system, single-machine failures are the norm; with redundant copies, other machines can take over promptly when one machine fails.
  3. Performance: if users access the system from different regions, replicas can be deployed in several locations close to them to reduce latency.

So how is replication done? Databases use several replication algorithms: single-leader replication, multi-leader replication, and leaderless replication.

But for large volumes of data and high concurrency, replication alone is not enough, so partitioning is introduced. With partitioning, each partition is a small database of its own, and each piece of data belongs to exactly one partition. Because multiple partitions can be operated on in parallel, the database can support very high throughput and data volumes.

Single Leader Replication

Synchronous, asynchronous, or semi-synchronous?

Synchronous versus asynchronous describes whether the master waits for a slave before acknowledging a write, and therefore whether the slave's data is guaranteed to match the master's. With synchronous replication, the master waits for an acknowledgement from a slave before returning to the client; with asynchronous replication, it returns without waiting for that slave's response.

The advantage of synchronous replication is that the slave is guaranteed to have an up-to-date copy of the data that matches that of the master. If the master suddenly fails, we can be sure that this data will still be available on the slave. The disadvantage is that if the synchronous slave is not responding (for example, it has crashed, or there is a network failure, or any other reason), the master cannot process write operations. The master must block all writes and wait for the synchronised copy to become available again.

Asynchronous replication does not block writes, but replicas may lag behind the master by different amounts. As a result, different users reading from different replicas at the same time may see inconsistent data.

To avoid this, one option is to have clients read only from the master, so that under normal circumstances all clients see the same data. Another is semi-synchronous replication: for example, make one slave synchronous with the master and keep the others asynchronous. If the synchronous slave becomes unavailable or slow, one of the asynchronous slaves is switched to synchronous mode. This guarantees an up-to-date copy of the data on at least two nodes: the master and the synchronous slave.
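
The semi-synchronous scheme can be sketched as a toy in-memory model, assuming a single master and a list of named followers. The class and method names (`SemiSyncMaster`, `write`, `replicate_async`) are hypothetical, and network calls are modelled as immediate assignments; the point is only to show which follower the master waits for and how an asynchronous follower is promoted when the synchronous one drops out.

```python
class SemiSyncMaster:
    """Toy model: one synchronous follower, the others asynchronous (hypothetical API)."""

    def __init__(self, followers):
        self.followers = list(followers)                   # follower names, in promotion order
        self.available = set(self.followers)               # followers currently reachable
        self.sync = self.followers[0]                      # the single synchronous follower
        self.log = []                                      # the master's replication log
        self.replicated = {f: 0 for f in self.followers}   # log entries held by each follower

    def _ensure_sync_follower(self):
        # If the synchronous follower is unavailable, promote a reachable asynchronous one.
        if self.sync not in self.available:
            candidates = [f for f in self.followers if f in self.available]
            if not candidates:
                raise RuntimeError("no follower reachable: writes must block")
            self.sync = candidates[0]
            # The promoted follower first catches up on everything it missed.
            self.replicated[self.sync] = len(self.log)

    def write(self, entry):
        self._ensure_sync_follower()
        self.log.append(entry)
        # Wait for the synchronous follower's acknowledgement (modelled as an instant copy).
        self.replicated[self.sync] = len(self.log)
        return "ok"

    def replicate_async(self):
        # Later, reachable asynchronous followers pull whatever they are missing.
        for f in self.available:
            self.replicated[f] = len(self.log)

    def nodes_with_latest(self):
        # The master plus every follower that holds the full log.
        return 1 + sum(n == len(self.log) for n in self.replicated.values())
```

After every acknowledged write, `nodes_with_latest()` is at least 2 (the master and the synchronous follower), which is the guarantee described above; asynchronous followers may still be behind until `replicate_async()` runs.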

Take Kafka as an example. In Kafka, the partition leader maintains an ISR (In-Sync Replicas) list. A follower may lag behind the leader when fetching data; the maximum allowed lag is set by a broker parameter (replica.lag.time.max.ms), and a follower that exceeds it is removed from the ISR.

A message is considered committed only once it has been replicated to every replica in the ISR. This balances performance and data availability: the leader does not wait for acknowledgements from all nodes, only from those that are keeping up.
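
The ISR bookkeeping can be illustrated with a simplified model of one partition leader. This is a sketch, not Kafka's code: offsets count messages, time is passed in explicitly, `max_lag_s` stands in for replica.lag.time.max.ms, and all names are hypothetical. The committed offset (Kafka calls it the high watermark) is the lowest offset reached by any ISR member.

```python
class PartitionLeader:
    """Simplified ISR bookkeeping for one partition (hypothetical, not Kafka's code)."""

    def __init__(self, followers, max_lag_s):
        self.max_lag_s = max_lag_s                        # role of replica.lag.time.max.ms
        self.log_end = 0                                  # leader's log end offset
        self.fetched = {f: 0 for f in followers}          # offset each follower has reached
        self.caught_up_at = {f: 0.0 for f in followers}   # last time it had the whole log
        self.isr = set(followers)                         # followers currently in sync

    def append(self):
        self.log_end += 1

    def on_fetch(self, follower, offset, now):
        self.fetched[follower] = offset
        if offset >= self.log_end:
            self.caught_up_at[follower] = now
            self.isr.add(follower)                        # a caught-up follower (re)joins

    def shrink_isr(self, now):
        # Drop followers that have not been fully caught up for longer than the threshold.
        for f in list(self.isr):
            if now - self.caught_up_at[f] > self.max_lag_s:
                self.isr.discard(f)

    def high_watermark(self):
        # Messages below this offset are on every ISR member, i.e. committed.
        return min([self.log_end] + [self.fetched[f] for f in self.isr])
```

A slow follower holds back the committed offset only until it is dropped from the ISR; after that, commits depend only on the replicas that are keeping up.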


The goal of high availability is to keep the entire system running even if individual nodes fail, and to contain the impact of node downtime as much as possible.

Slave node failure - quick recovery

If a slave crashes and restarts, or the network between the master and the slave is temporarily interrupted, recovery is relatively easy: from its log, the slave knows the last transaction it processed before the fault, so it can request from the master all changes made after that point and continue from there.

Let’s look at how Redis master-slave synchronisation recovers. The master records the commands that modify its state in a local in-memory buffer and asynchronously sends the buffered commands to the slave. The slave executes this command stream to reach the same state as the master, and reports back to the master how far it has synchronised (its offset).

The buffer has a fixed size and is reused circularly. If the slave cannot synchronise with the master for a while, then by the time the network recovers, the commands it has not yet received may already have been overwritten in the master's buffer by newer ones. In that case, snapshot synchronisation is required.
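
The choice between incremental and snapshot synchronisation can be modelled with a bounded buffer. This is a simplified sketch: real Redis measures offsets in bytes of the replication stream, while here each command advances the offset by one, and the class and method names are hypothetical.

```python
from collections import deque

class ReplicationBacklog:
    """Fixed-size command backlog on the master (simplified; real Redis counts bytes)."""

    def __init__(self, capacity):
        self.buf = deque(maxlen=capacity)   # oldest entries are overwritten when full
        self.master_offset = 0              # total number of commands ever propagated

    def feed(self, command):
        self.buf.append(command)
        self.master_offset += 1

    def resync(self, slave_offset):
        """Decide how a reconnecting slave catches up from the offset it reports."""
        oldest = self.master_offset - len(self.buf)   # offset of the oldest buffered command
        if oldest <= slave_offset <= self.master_offset:
            # Everything the slave missed is still buffered: send just the missing commands.
            return "partial", list(self.buf)[slave_offset - oldest:]
        # The missing commands were overwritten: fall back to snapshot synchronisation.
        return "full", None
```

A larger buffer lets slaves survive longer disconnections with only an incremental resync, at the cost of master memory.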

Redis Master-Slave Sync

Snapshot synchronisation first runs bgsave on the master to snapshot all the data currently in memory to a disk file, then transfers the entire snapshot file to the slave. When the slave receives the snapshot, it clears the data currently in its memory and immediately performs a full load of the file. Once loading is complete, it notifies the master so that incremental synchronisation can continue.

Master-Slave Switchover

Handling a master failure is trickier: one of the slaves must be promoted to be the new master, clients must be reconfigured to send their writes to the new master, and the other slaves must start consuming data changes from the new master. This process is called failover.

Failover of a master usually consists of these steps: 1. confirm that the master has failed; 2. choose a new master; 3. reconfigure the system to use the new master. The steps look simple, but many things can go wrong:

  1. If asynchronous replication is used, the new master may not have received the last writes the old master accepted before it went down. If the old master rejoins the cluster after a new master has been elected, what should happen to those writes, which may conflict with writes the new master has accepted in the meantime? The most common solution is simply to discard the old master's unreplicated writes, which may break clients' expectations of durability; and if the database must be coordinated with other external storage systems, discarding writes is extremely dangerous;
  2. Two nodes may both believe they are the master, a situation known as split brain. If both masters accept writes and there is no conflict resolution mechanism, data can be lost or corrupted. As a safety catch, some systems shut down one node when two masters are detected, but if this mechanism is poorly designed, both nodes may end up shut down;
  3. How should the timeout for declaring the master dead be configured? A longer timeout means a longer recovery time, while a timeout that is too short may trigger unnecessary failovers. A temporary load spike can make a node exceed the timeout, and if the system is already struggling with high load or network problems, an unnecessary failover makes the situation worse.

Let’s look at how a Redis Sentinel cluster addresses these problems. Redis Sentinel continuously monitors the health of the master and slave nodes; when the master goes down, it automatically selects the most suitable slave and promotes it to master. A client first connects to Sentinel, asks it for the master's address, and then connects to the master to read and write data. When the master fails, the client asks Sentinel for the address again, and Sentinel returns the address of the current master.

How Redis Sentinel clusters work

Because the master is elected by Redis Sentinel through voting, we can deploy an odd number of sentinels, three or more, and set the election quorum to n/2+1 (integer division). Every election then needs the agreement of more than half of the sentinels, and the minority defers to the majority, so two leaders can never be elected at the same time.
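
The majority rule can be checked exhaustively with a small generic model. It assumes each sentinel casts exactly one vote per election round; it models only the voting arithmetic described above, not Sentinel's actual protocol, and the function names are hypothetical.

```python
from collections import Counter
from itertools import product

def quorum(n):
    # Strict majority for n voters: n/2 + 1 with integer division.
    return n // 2 + 1

def winners(votes, n):
    # votes holds one candidate name per sentinel; return every candidate reaching quorum.
    tally = Counter(votes)
    return [c for c, k in tally.items() if k >= quorum(n)]

# Exhaustive check for 5 sentinels and 3 candidates: no vote split yields two winners.
assert all(len(winners(v, 5)) <= 1 for v in product("ABC", repeat=5))
```

A split such as `winners(list("AABBC"), 5)` returns an empty list: no candidate wins, and the election has to be retried rather than producing two masters.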

Redis replicates asynchronously from master to slaves, so when the master goes down, a slave may not have received all writes. Sentinel cannot guarantee that no data is lost, but it can keep the loss as small as possible. There are two configuration options for this:

min-slaves-to-write 1
min-slaves-max-lag 10

min-slaves-to-write 1 means the master must have at least one slave that is replicating normally; otherwise it stops accepting writes, giving up availability;

min-slaves-max-lag 10 defines what "normally" means: if the master has received no feedback from a slave for more than 10 seconds, that slave is not counted, and if too few slaves qualify, the master refuses write requests. Lowering min-slaves-max-lag reduces the amount of data that can be lost in a failure.
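
The combined effect of the two options can be written as a single predicate. This is a model of the check, not Redis source code: it assumes each slave's lag is the time since its last acknowledgement, and `master_accepts_writes` is a hypothetical name. In Redis 5 and later the options are spelled min-replicas-to-write and min-replicas-max-lag, with the min-slaves-* names still accepted as aliases.

```python
def master_accepts_writes(last_ack, now, min_slaves_to_write, min_slaves_max_lag):
    """Return True if enough slaves have acknowledged recently for the master to take writes.

    last_ack maps a slave id to the time (in seconds) of its most recent feedback.
    """
    # A slave counts as "good" if its lag is within min-slaves-max-lag seconds.
    good = sum(1 for t in last_ack.values() if now - t <= min_slaves_max_lag)
    return good >= min_slaves_to_write
```

With the configuration above (1 and 10), a master whose only slave last responded 15 seconds ago rejects writes, bounding the window of writes that could be lost if the master then fails.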

How should replication logging be implemented?

The simplest idea is probably statement-based replication: every INSERT, UPDATE or DELETE statement is forwarded to each slave, which executes it as if it had been received directly from a client. But this has problems:

  1. What if a statement calls a nondeterministic function such as NOW() or RAND()? Each replica would compute a different value. User-defined functions, triggers and stored procedures raise similar issues;
  2. If statements use auto-incrementing IDs, or depend on existing data in the database (e.g., UPDATE ... WHERE <some condition>), they must be executed in exactly the same order on every replica, or they may produce different results. This becomes a limitation when multiple transactions execute concurrently;

Statement-based replication was used in MySQL before version 5.1. Today, by default, MySQL switches to row-based replication if there is any nondeterminism in a statement.
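
The RAND() problem can be reproduced in a few lines. In this toy model, Python's `random.Random` stands in for RAND() evaluated independently on each server, lists stand in for tables, and the function names are hypothetical.

```python
import random

def execute_insert_rand(table, rng):
    # Statement-based: each node evaluates "INSERT INTO t VALUES (RAND())" itself.
    table.append(rng.random())

def apply_row_event(table, row):
    # Row-based: the replica receives the exact row the master wrote.
    table.append(row)

# Each node has its own random state, just as RAND() would on separate servers.
master_rng, replica_rng = random.Random(1), random.Random(2)
master, replica_statement, replica_row = [], [], []

execute_insert_rand(master, master_rng)
execute_insert_rand(replica_statement, replica_rng)  # re-executes the statement
apply_row_event(replica_row, master[-1])             # copies the resulting row
```

`replica_statement` ends up with a different value from `master`, while `replica_row` matches it exactly, which is why shipping rows sidesteps nondeterminism.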

Write-ahead log (WAL) shipping: with a WAL, every change (update, deletion, etc.) is written to the log before it is applied to the data, so a slave that replays the log can build an exact copy of the master's data structures. The disadvantage is tight coupling to the storage engine: if a new database version changes the log format, replication between versions becomes impossible.

This makes WAL shipping hard on operations: if the replication protocol does not allow version mismatches, such upgrades require downtime.

There is also row-based logical log replication, which is what MySQL's binlog uses in row-based mode: the log is a sequence of records describing writes to database tables at the granularity of rows. Because the logical log is decoupled from the storage engine's internals, it is easier to keep backward compatible, allowing the master and slave to run different versions of the database software, or even different storage engines.

The problem of read-after-write consistency

Reading Your Own Writes

Many applications let users submit some data and then view what they have submitted. It might be a record in the user’s database, a comment on a discussion thread, or something else along those lines. When submitting new data, it must be sent to the master node, but when the user views the data, it can be read through the slave node. This is great if the data is viewed frequently, but only written to occasionally.

However, with asynchronous replication, if a user views the data right after a write, the new data may not have reached the replica yet. To the user, it looks as if the data that was just committed was lost.

read-after-write consistency

In this case, we need read-after-write consistency, also called read-your-writes consistency. This is a guarantee that if a user reloads the page, they will always see any updates they submitted themselves.

Ways to implement read-after-write consistency in a leader-based replication system:

  • Always read from the master anything the user may have modified; this requires a way to know whether the user has modified something without actually querying it. For example, a user's own profile can always be read from the master, and other users' profiles from slaves.
  • If most things in the application can be edited by the user, this approach would send most reads to the master, so other criteria are needed to decide whether to read from the master. For example, track the time of the user's last update and read from the master for one minute afterwards.

The client can remember the timestamp of the most recent write, and the system needs to make sure that any changes prior to that timestamp have been propagated to this slave by the time the slave processes that user’s read request. If the current slave is not new enough, it can either read from another slave or wait for the slave to catch up. The timestamp here can be a logical timestamp (something that indicates the order of writes, such as a log sequence number) or the actual system clock.
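
The logical-timestamp variant can be sketched as a routing function. It assumes each node exposes the highest log sequence number (LSN) it has applied and that the client remembers the LSN of its own last write; `Node` and `pick_read_node` are hypothetical names.

```python
from dataclasses import dataclass

@dataclass
class Node:
    name: str
    applied_lsn: int   # highest log sequence number this node has applied

def pick_read_node(followers, leader, last_write_lsn):
    """Serve a read from a follower that has applied the client's last write, else the leader."""
    for node in followers:
        if node.applied_lsn >= last_write_lsn:
            return node
    # No follower is fresh enough: read from the leader (or wait for a follower to catch up).
    return leader
```

Falling back to the leader keeps the sketch simple; waiting for a follower to catch up trades latency for less load on the leader.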

There is additional complexity if your replicas are spread across multiple data centres (to be geographically close to users or for availability purposes). Any requests that need to be serviced by the master node must be routed to the data centre containing that master node.

Another complication arises when the same user accesses the service from multiple devices (e.g., a desktop browser and a mobile app). In this case it may be necessary to provide cross-device read-after-write consistency: if a user enters some information on one device and then views it on another, they should see the information they just entered.

In this context, there are a number of other issues that need to be considered:

  • Remembering the timestamp of the user's last update becomes harder, because code running on one device doesn't know what updates have happened on another device. This metadata needs to be stored centrally.
  • If replicas are distributed across different data centres, there is no guarantee that connections from different devices are routed to the same data centre. If your approach requires reading from the master, you may first need to route requests from all of a user's devices to the same data centre.

Monotonic Reads

Moving backward in time can occur if a user does multiple reads from different slaves. For example, suppose user 2345 performs the same query twice, first on a slave with little lag and then on a slave with greater lag. The first query returns a comment recently added by user 1234, but the second returns nothing because the lagging slave hasn't received that write yet. User 2345 sees user 1234's comment appear and then disappear, which is very confusing.

Turning Back Time in Monotonic Reads

Monotonic reads are a weaker guarantee than strong consistency, but stronger than eventual consistency. When reading data, you may see an old value; monotonic reads simply mean that if a user does multiple reads sequentially, they won’t see a time regression, i.e., if newer data has been read, subsequent reads won’t get older data.

One way to achieve monotonic reads is to ensure that each user always reads from the same replica, for example by selecting a replica based on a hash of the user’s ID.
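
Replica selection by user ID can be sketched as follows, assuming a fixed list of replica names; SHA-256 is an arbitrary choice of stable hash and `replica_for_user` is a hypothetical name.

```python
import hashlib

def replica_for_user(user_id, replicas):
    """Map a user to one fixed replica so that the user's reads never go back in time."""
    # Use a stable hash: Python's built-in hash() of a str is randomised per process.
    digest = hashlib.sha256(user_id.encode()).digest()
    return replicas[int.from_bytes(digest[:8], "big") % len(replicas)]
```

If the chosen replica fails, the user's reads must be rerouted to another replica, and monotonicity is only preserved if that replica is at least as up to date.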

Multi-Master Replication

Say you have a database with replicas spread across several different data centres (possibly to tolerate failures in a single data centre, or to be geographically closer to users). It is possible to have master nodes in each data centre in a multi-master configuration.

Multi-Master Replication

When operating multiple data centres, multi-master has several advantages: each write can be processed in the local data centre, which gives better performance; and each data centre can keep running independently of the others, so a failure in one data centre does not bring down the whole system.

Multi-master configurations also have a major disadvantage: two different data centres may modify the same data at the same time, and write conflicts must be resolved.

Handling write conflicts

Avoiding Conflicts

The simplest strategy for dealing with conflicts is to avoid them: if an application can ensure that all writes for a particular record go through the same master, conflicts cannot occur. For example, in an application where users edit their own data, requests from a particular user can always be routed to the same data centre, and that data centre's master is used for the user's reads and writes. Different users may have different "home" data centres.

But it is also possible that you need to re-route traffic to another data centre because of a data centre failure, in which case conflict avoidance will fail and you will have to deal with the possibility of simultaneous writes from different master nodes.

Convergence to a Consistent State

In a multi-master configuration there is no defined order of writes, so if each replica simply applies writes in the order it sees them, the database ends up inconsistent. There must therefore be a way for all replicas to converge to the same final value once all changes have been replicated.

For example, each replica could be assigned a unique ID, and writes originating from a higher-numbered replica always take precedence over writes from a lower-numbered one; but this approach also implies data loss.

Users take matters into their own hands

It’s not uncommon to hand conflict resolution directly to the user: the conflicting versions are kept and the user resolves them when reading or writing. GitHub uses this approach.

Masterless replication

In some implementations of masterless replication, the client sends writes directly to several replicas, while in other cases a coordinator node does the writing on behalf of the client.

In a masterless configuration, there is no failover. Suppose the client (user 1234) sends a write to all three replicas in parallel, and the two available replicas accept it while the unavailable replica misses it. If two acknowledgements out of three are considered sufficient, then once user 1234 has received two OK responses, the write is considered successful.

Masterless replication

Because a node may have missed a write, a client reading from the database sends its request not to just one replica but to several replicas in parallel. It may get different responses from different replicas, and it uses version numbers to determine which value is the most recent.

So how does an unavailable node catch up on the writes it missed once it’s back online? Generally speaking, there are two ways:

  1. Read repair, when a client reads multiple nodes in parallel and finds that some replicas have old values, it writes the new values back to that replica.
  2. Anti-entropy process, which uses a background process to constantly look for data differences between replicas and copy any missing data from one replica to another.
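
Read repair can be sketched with replicas modelled as plain dictionaries mapping a key to a `(version, value)` pair. The storage model and the function name are hypothetical; a real client would issue the reads and repairs over the network.

```python
def read_with_repair(replicas, key):
    """Read key from all replicas, return the newest value, and fix stale replicas."""
    responses = [r.get(key) for r in replicas]
    found = [resp for resp in responses if resp is not None]
    if not found:
        return None
    latest = max(found, key=lambda vv: vv[0])        # highest version number wins
    for replica, resp in zip(replicas, responses):
        if resp is None or resp[0] < latest[0]:
            replica[key] = latest                    # write the newer value back
    return latest[1]
```

Read repair only fixes keys that are actually read, which is why a background anti-entropy process is still useful for rarely read data.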

In the above example, the write is successful if it is processed on two of the three replicas. So how many replicas in a cluster with multiple replicas does it take for a write to be successful?

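In general, if there are n replicas, every write must be acknowledged by w nodes to be considered successful, and every read must query at least r nodes. As long as w + r > n, we can expect to read an up-to-date value, because at least one of the r nodes read must be among the w nodes that accepted the latest write. Reads and writes that obey these r and w values are called quorum reads and writes: r and w are the minimum numbers of votes required for a read or write to be valid.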
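
The overlap argument behind w + r > n can be verified by brute force for small clusters. This sketch enumerates every possible write set and read set; the function name is hypothetical.

```python
from itertools import combinations

def quorums_always_overlap(n, w, r):
    """True if every possible set of w write nodes shares a node with every set of r read nodes."""
    nodes = range(n)
    return all(set(ws) & set(rs)
               for ws in combinations(nodes, w)
               for rs in combinations(nodes, r))

# For every small cluster, overlap is guaranteed exactly when w + r > n.
for n in range(1, 6):
    for w in range(1, n + 1):
        for r in range(1, n + 1):
            assert quorums_always_overlap(n, w, r) == (w + r > n)
```

For the three-replica example above, w = 2 and r = 2 always overlap, while w = 2 and r = 1 do not: a single read can hit the replica that missed the write.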

However, quorums (as described so far) are not as fault-tolerant as they might seem. A network interruption can easily cut a client off from a large number of database nodes. Although those nodes are alive, and other clients may still be able to reach them, to the client that is cut off they might as well be dead. In this case, fewer than w or r reachable nodes may remain, so the client can no longer reach a quorum.

Detecting Concurrent Writes

Because of variable network delays and partial failures, events may arrive at different nodes in different orders, which leads to inconsistencies. For example, the following figure shows two clients, A and B, writing to key X at the same time in a three-node datastore:

  • Node 1 receives the write from A, but never receives the write from B because of a temporary outage.
  • Node 2 receives the write from A first, then the write from B.
  • Node 3 receives the write from B first, then the write from A.

If each node simply overwrites a key value whenever it receives a write request from a client, the nodes will be permanently inconsistent.

Detecting Concurrent Writes

One solution is last write wins (LWW), which discards concurrent writes: only the "most recent" value is stored, and "older" values may be overwritten and discarded. In practice, a timestamp is attached to each write, the write with the largest timestamp is taken as the "most recent", and any write with an earlier timestamp is discarded.
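
Last write wins can be sketched as a register that every node applies writes to. The sketch assumes timestamps are comparable across nodes; the writer ID tie-break is an added assumption so that equal timestamps still resolve identically everywhere, and the class name is hypothetical.

```python
class LWWRegister:
    """A single key under last-write-wins (a sketch; timestamps are assumed comparable)."""

    def __init__(self):
        self.stamp = None   # (timestamp, writer_id) of the value currently kept
        self.value = None

    def apply(self, timestamp, writer_id, value):
        stamp = (timestamp, writer_id)   # writer_id breaks ties so every node picks the same winner
        if self.stamp is None or stamp > self.stamp:
            self.stamp, self.value = stamp, value
        # Otherwise the write counts as "older" and is silently discarded.

write_a = (5, "A", "from A")
write_b = (7, "B", "from B")

node1, node2 = LWWRegister(), LWWRegister()
for w in (write_a, write_b):
    node1.apply(*w)
for w in (write_b, write_a):   # same writes, opposite order
    node2.apply(*w)
```

Both nodes converge on `"from B"` regardless of arrival order, but client A's write is lost even though it may have been acknowledged: convergence is bought with durability.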


Partitioning is often combined with replication, so that copies of each partition are stored on multiple nodes, and a single node may store multiple partitions. Each partition's leader (master) is assigned to one node and its followers (slaves) to other nodes, so each node may be the master for some partitions and a slave for others.

I’ll use TiKV, TiDB’s storage layer, as an example. TiKV stores data in Regions, and a Region is a partition. When a Region grows beyond a certain limit (144MB by default), TiKV splits it into two or more Regions of roughly equal size. Similarly, when a Region shrinks because of a large number of deletions, TiKV merges two small neighbouring Regions into one.

After dividing the data into Regions, TiKV tries to keep the number of Regions served by each node roughly equal, and performs Raft replication and membership management per Region. That is, as you can see in the figure below, copies of the same Region are stored on different nodes.

TiKV Regions

Division of partitions

Assuming you have a large amount of data and want to partition it, how do you decide which records to store on which nodes? The goal of partitioning is to spread data and query load evenly across nodes. If partitioning is unfair, so that some partitions hold more data or receive more queries than others, we call it skewed. Skew makes load uneven across nodes, and a partition with disproportionately high load is called a hot spot. Skew defeats the load-balancing purpose of partitioning and should be avoided.

The easiest approach is to assign records to nodes at random. This distributes the data evenly across all nodes, but has one big drawback: when you try to read a particular value, you have no way of knowing which node holds it, so you have to query all nodes in parallel, which is clearly impractical.

In practice there are two typical schemes:

  • Range: keys are divided into contiguous ranges, and each range of consecutive keys is stored on one storage node.
  • Hash: each key is hashed, and the storage node is chosen according to the hash value.

Partitioning by Range of Keys

One way to partition without randomisation, then, is to assign each partition a contiguous range of keys (from some minimum to some maximum). The ranges need not be of equal size, because the data itself may not be evenly distributed; to spread the data evenly, the partition boundaries must adapt to the data.

Partition boundaries can be chosen manually by an administrator or automatically by the database, and within each partition the keys can be kept in sorted order.

TiDB also partitions by key range. Each partition is called a Region; because keys are ordered, a Region can be described by a left-closed, right-open interval [StartKey, EndKey). Boundaries are adjusted according to Region size: by default, once a Region reaches 96 MiB, it is split and a new Region is created.
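
Finding the Region that owns a key under left-closed, right-open ranges amounts to a binary search over the sorted StartKeys. The three Regions and their boundaries below are hypothetical, and this is not TiKV's or PD's API; it only illustrates the lookup rule.

```python
import bisect

# Hypothetical layout: three Regions covering the whole key space.
# Region i holds keys in [starts[i], starts[i + 1]); the last Region is unbounded above.
starts = ["", "g", "p"]          # sorted StartKeys; "" is the smallest possible key
region_names = ["region-1", "region-2", "region-3"]

def locate(key):
    # The owning Region is the last one whose StartKey <= key.
    return region_names[bisect.bisect_right(starts, key) - 1]
```

Because the interval is closed on the left, a key equal to a boundary (for example `"g"`) belongs to the Region that starts there, not the one before it.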

TiDB node

Another problem with range partitioning is that certain access patterns create hot spots. If the key is a timestamp and each partition covers one day, all of today's writes go to the same partition, which may be overloaded while the others sit idle. To avoid this, use something other than the timestamp as the first element of the key.

Partitioning by Hash of Keys

Partitioning by hash uses a hash function that, for any input string, returns a seemingly random number, for example between 0 and 2^32-1. Even if the input strings are very similar, their hashes are spread evenly over this range. Each partition is then assigned a range of hash values, and every key whose hash falls within a partition's range is stored in that partition.
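
Hash-range partitioning can be sketched with a 32-bit hash and equal-width ranges. Using the first four bytes of MD5 is an arbitrary illustrative choice, and the four-partition layout and function names are hypothetical; real systems choose their own hash functions and boundaries.

```python
import bisect
import hashlib

def hash32(key):
    # First 4 bytes of MD5 as an unsigned integer in [0, 2**32 - 1].
    return int.from_bytes(hashlib.md5(key.encode()).digest()[:4], "big")

# Hypothetical layout: 4 partitions, each owning an equal slice of the hash space.
NUM_PARTITIONS = 4
upper_bounds = [(i + 1) * 2**32 // NUM_PARTITIONS for i in range(NUM_PARTITIONS)]  # exclusive

def partition_for(key):
    # Partition i owns hashes in [upper_bounds[i - 1], upper_bounds[i]).
    return bisect.bisect_right(upper_bounds, hash32(key))
```

Adjacent keys such as `user1` and `user2` usually land in different partitions, which spreads load evenly but is exactly why range queries become expensive.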

The drawback is that hashing loses the ability to execute range queries efficiently. Relational databases, which frequently need table scans or index scans, therefore mostly use range-based sharding, whereas NoSQL databases such as Redis Cluster use hash partitioning.

Partition rebalancing

Fixed number of partitions

We’ve talked about partitioning by the hash of a key, but how are hash values mapped to partitions? A simple idea is to make the number of partitions equal to the number of nodes and take the hash modulo the number of partitions, i.e. hash(key) % number_of_partitions. But what if you want to add a node, and therefore a partition? Most keys would then map to a different partition and have to be migrated, which is very expensive.

There is another way: create many more partitions than nodes and assign several partitions to each node. For example, a database running on a 10-node cluster might be split into 1,000 partitions from the start, so about 100 partitions are assigned to each node. When a node is added to the cluster, it steals a few partitions from every existing node until partitions are fairly distributed again.

Only whole partitions move between nodes. The number of partitions does not change, and neither does the assignment of keys to partitions; the only thing that changes is which node each partition lives on.
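
The cost difference between the two schemes can be measured with a small simulation. Everything here is illustrative: 10,000 synthetic keys, MD5 as the hash, a hypothetical `add_node` that always steals from the most loaded node, and a cluster growing from 10 to 11 nodes.

```python
import hashlib

def h(key):
    # Stable 64-bit hash of a key.
    return int.from_bytes(hashlib.md5(key.encode()).digest()[:8], "big")

keys = [f"key{i}" for i in range(10_000)]

# Naive scheme: node = hash(key) % number_of_nodes. Going from 10 to 11 nodes
# moves every key whose two remainders differ.
moved_naive = sum(h(k) % 10 != h(k) % 11 for k in keys)

# Fixed scheme: 1,000 partitions; partition = hash(key) % 1000 never changes.
NUM_PARTITIONS = 1000
assignment = {p: p % 10 for p in range(NUM_PARTITIONS)}   # partition -> node, 100 each

def add_node(assignment, new_node):
    """The new node steals partitions from the most loaded nodes until it has a fair share."""
    by_node = {}
    for p, node in sorted(assignment.items()):
        by_node.setdefault(node, []).append(p)
    fair_share = len(assignment) // (len(by_node) + 1)
    stolen = set()
    while len(stolen) < fair_share:
        donor = max(by_node, key=lambda n: len(by_node[n]))
        p = by_node[donor].pop()
        assignment[p] = new_node          # only the partition's location changes
        stolen.add(p)
    return stolen

stolen = add_node(assignment, new_node=10)
moved_fixed = sum(h(k) % NUM_PARTITIONS in stolen for k in keys)
```

With uniform hashing, the naive scheme moves roughly 10/11 of all keys, while the fixed scheme moves only the keys in the 90 stolen partitions, roughly 9%.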

Redis Cluster uses this approach. It defines a total of 16384 slots for the whole cluster; the slots are the partitions, and each node is responsible for a subset of them. A key is mapped to a slot by computing CRC16 of the key modulo 16384, and the slot determines the node that serves it.
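
The key-to-slot mapping can be sketched as below. The CRC16 variant used by Redis Cluster is CRC-16/XMODEM (polynomial 0x1021, initial value 0); the hash-tag rule (only the substring between the first `{` and the next `}` is hashed, if non-empty) is part of the Redis Cluster specification but is not mentioned above. The function names are hypothetical; a running cluster's `CLUSTER KEYSLOT` command can be used to cross-check results.

```python
def crc16_xmodem(data):
    """Bitwise CRC-16/XMODEM: polynomial 0x1021, initial value 0."""
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            if crc & 0x8000:
                crc = ((crc << 1) ^ 0x1021) & 0xFFFF
            else:
                crc = (crc << 1) & 0xFFFF
    return crc

def key_slot(key):
    """Slot in [0, 16383] for a key, honouring {hash tags}."""
    raw = key.encode()
    start = raw.find(b"{")
    if start != -1:
        end = raw.find(b"}", start + 1)
        if end > start + 1:                 # non-empty tag: hash only its content
            raw = raw[start + 1:end]
    return crc16_xmodem(raw) % 16384
```

For example, `key_slot("foo")` returns 12182, and `{user1000}.following` and `{user1000}.followers` share a slot because only `user1000` is hashed, which lets multi-key operations on them stay on one node.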

Fixed number of partitions

Dynamic Partitioning

For databases that partition by key range, a fixed number of partitions with fixed boundaries would be very inconvenient, so such databases usually split a partition automatically once it grows beyond a configured size. As mentioned earlier, TiDB adjusts partition boundaries according to partition size: by default, a new partition is created when a partition reaches 96MB.

Besides splitting, partitions can also be merged. TiDB decides based on partition size and key count: by default, a partition smaller than 20MB with fewer than 200,000 keys triggers a merge.
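
The split and merge rules can be sketched with the thresholds quoted above. This is a simplified model, not TiKV's implementation: the `Region` fields are hypothetical, the caller supplies a split key assumed to divide the data roughly in half, and two adjacent Regions merge only when both are below the merge thresholds.

```python
from dataclasses import dataclass

SPLIT_SIZE_MB = 96        # split threshold quoted in the text
MERGE_SIZE_MB = 20        # merge only if smaller than this ...
MERGE_KEYS = 200_000      # ... and with fewer keys than this

@dataclass
class Region:
    start: str            # inclusive StartKey ("" = smallest key)
    end: str              # exclusive EndKey ("" = unbounded)
    size_mb: float
    keys: int

def needs_split(r):
    return r.size_mb >= SPLIT_SIZE_MB

def can_merge(r):
    return r.size_mb < MERGE_SIZE_MB and r.keys < MERGE_KEYS

def split(r, split_key):
    # Assumes (hypothetically) that split_key divides the data roughly in half.
    left = Region(r.start, split_key, r.size_mb / 2, r.keys // 2)
    right = Region(split_key, r.end, r.size_mb - left.size_mb, r.keys - left.keys)
    return [left, right]

def merge_pass(regions):
    """Merge runs of small adjacent Regions, left to right."""
    out = []
    for r in regions:
        if out and out[-1].end == r.start and can_merge(out[-1]) and can_merge(r):
            prev = out[-1]
            out[-1] = Region(prev.start, r.end, prev.size_mb + r.size_mb, prev.keys + r.keys)
        else:
            out.append(r)
    return out
```

Splitting keeps boundaries contiguous (the left half's EndKey is the right half's StartKey), so lookups by StartKey remain valid after either operation.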


The last topic is request routing: how does the client know which partition holds the data it wants to query? Generally, there are three approaches:

  1. Let the client connect to any node. If the data happens to be on that node, it is queried directly; otherwise the request is directed to the node that holds the data. Such a cluster is decentralised. Redis Cluster uses this architecture, and its nodes exchange cluster information with each other via a gossip protocol (strictly speaking, a Redis Cluster node does not forward the request itself but replies with a MOVED redirection telling the client which node to contact);
  2. The client first connects to a routing layer of the cluster, which knows which partition, and which node, holds the requested data. TiDB uses this architecture: PD (Placement Driver) manages the routing information of all partitions, and when TiDB wants to access the data of a partition, it queries PD for that partition's status and location;
  3. The last approach is for the client to keep the partition and node information itself, so that it can send each query directly to the node that holds the data.



This article has summarised the main technical points of replication and partitioning, discussed the similarities and differences between the various implementations and their possible problems, and used today's mainstream databases to illustrate how these techniques are applied in practice.

Both replication and partitioning revolve around availability, performance and scalability. Replication comes in three main forms: single-master replication, multi-master replication, and masterless replication.

Single-master replication is currently the most popular. Many distributed databases combine single-master replication with partitioning to support high throughput; single-master replication needs no conflict resolution, so it is simpler to implement. However, with asynchronous replication, a leader crash can still lose data, so many systems replicate semi-synchronously. Kafka, for example, adds the ISR mechanism to prevent message loss: the ISR is a set of replicas that are keeping up, and a message is considered committed only when every replica in the ISR has replicated it.

It then covered the anomalies caused by replication lag: data written to the master but not yet replicated to a slave cannot be read from that slave; and reading first from a slave with little lag and then from one with greater lag can return different results for the same query.

Finally, partitioning is used in many distributed databases, mainly for scalability: different partitions can be placed on different nodes of a shared-nothing cluster so that concurrent load is spread across many processors; new partitions can be added as load increases, and partitions can be merged when load decreases.