Why consensus via etcdraft?

I think there are the following reasons:

  1. solo is not suitable for most scenarios, e.g. when organization A and organization B each want to run consensus nodes of their own
  2. kafka can meet that requirement, but Kafka plus ZooKeeper means extra components to operate and is too heavy to deploy

So along comes consensus based on etcdraft, which solves both of these pain points.

Say it three times!

Don’t miss the source code sections of this article, there are lots and lots of comments in them!!! Don’t miss the source code sections, there are lots and lots of comments!!! Don’t miss the source code sections, there are lots and lots of comments!!!

Core Interfaces

Here are the interfaces that I believe form the core of the etcdraft consensus implementation.

// ClusterServer is the cluster server interface
type ClusterServer interface {
    Step(Cluster_StepServer) error
}

// ClusterClient is the cluster client interface
type ClusterClient interface {
    Step(ctx context.Context, opts ...grpc.CallOption) (orderer.Cluster_StepClient, error)
}

// Handler holds the two methods used for consensus messages
type Handler interface {
    OnConsensus(channel string, sender uint64, req *orderer.ConsensusRequest) error
    OnSubmit(channel string, sender uint64, req *orderer.SubmitRequest) error
}

// Consenter is the consensus/ordering interface
type Consenter interface {
    HandleChain(support ConsenterSupport, metadata *cb.Metadata) (Chain, error)
}

// Chain is the core consensus interface
type Chain interface {
    // Order accepts ordinary transaction messages
    Order(env *cb.Envelope, configSeq uint64) error
    // Configure accepts configuration messages
    Configure(config *cb.Envelope, configSeq uint64) error
    WaitReady() error
    Errored() <-chan struct{}
    Start()
    Halt()
}
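
To see how these pieces fit together, here is a minimal, hedged sketch (not Fabric code; the consenter, support and envelope values are assumed to already exist, and the import paths are the Fabric 1.4-era ones) of the lifecycle: a Consenter builds a Chain for a channel, the Chain is started, and transactions are then fed to it through Order.

package sketch

import (
	"github.com/hyperledger/fabric/orderer/consensus"
	cb "github.com/hyperledger/fabric/protos/common"
)

// runChain shows the order in which the core interfaces are exercised.
func runChain(consenter consensus.Consenter, support consensus.ConsenterSupport,
	metadata *cb.Metadata, env *cb.Envelope) error {
	// The Consenter turns per-channel resources into a running Chain.
	chain, err := consenter.HandleChain(support, metadata)
	if err != nil {
		return err
	}

	chain.Start()      // spin up the consensus loop (for etcdraft: the raft node)
	defer chain.Halt() // tear it down when we are done

	// Block until the chain is ready to accept messages.
	if err := chain.WaitReady(); err != nil {
		return err
	}

	// Ordinary transactions go through Order; config updates go through Configure.
	return chain.Order(env, 0)
}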

Orderer initialization

To start an etcd node you normally need to set ETCD_NAME, which acts as the node’s ID, but we don’t do anything like that when starting the Orderer. How Fabric derives the ID instead is, I think, a clever design.

If you have started fabric-samples/first-network, you will know about configtx.yaml, which is used to generate genesis.block.

In configtx.yaml you can set the OrdererType and the EtcdRaft related configuration, which end up inside genesis.block:

...
Profiles:
    SampleMultiNodeEtcdRaft:
        ...
        Orderer:
            <<: *OrdererDefaults
            OrdererType: etcdraft
            EtcdRaft:
                Consenters:
                - Host: orderer.example.com
                  Port: 7050
                  ClientTLSCert: crypto-config/ordererOrganizations/example.com/orderers/orderer.example.com/tls/server.crt
                  ServerTLSCert: crypto-config/ordererOrganizations/example.com/orderers/orderer.example.com/tls/server.crt
                - Host: orderer2.example.com
                  Port: 7050
                ...
            Addresses:
                - orderer.example.com:7050
                - orderer2.example.com:7050
                ...
...

If you have ever added an organization, or decoded genesis.block (for example with configtxlator), you will have seen the JSON data inside this block.

The data below has been trimmed down; only the parts that need explaining are kept, as follows:

{
    "data":{
        "data":[
            {
                "payload":{
                    "config":{
                        "channel_group":{
                            "groups":{
                                "Orderer":{
                                    "values":{
                                        "ConsensusType":{
                                            "value":{
                                                "metadata":{
                                                    "consenters":[
                                                        {
                                                            "client_tls_cert":"client tls cert",
                                                            "host":"orderer.example.com",
                                                            "port":7050,
                                                            "server_tls_cert":"server tls cert"
                                                        },
                                                        {
                                                            "client_tls_cert":"client tls cert",
                                                            "host":"orderer2.example.com",
                                                            "port":7050,
                                                            "server_tls_cert":"server tls cert"
                                                        }
                                                    ],
                                                    "options":{
                                                        "election_tick":10,
                                                        "heartbeat_tick":1,
                                                        "max_inflight_blocks":5,
                                                        "snapshot_interval_size":20971520,
                                                        "tick_interval":"500ms"
                                                    }
                                                },
                                                "state":"STATE_NORMAL",
                                                "type":"etcdraft"
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                },
                "signature":null
            }
        ]
    },
    "header":{

    },
    "metadata":{

    }
}

When the Orderer starts, it reads the latest configuration block of the system channel; if there is none, it falls back to genesis.block.

If the ConsensusType is etcdraft, the orderer sets up a clusterGRPCServer and passes it into initializeMultichannelRegistrar(...).

initializeMultichannelRegistrar(...) checks the ConsensusType again; if it is etcdraft, it calls initializeEtcdraftConsenter(...) to build the etcdraft Consenter instance.

Along the way, the cluster Service is wired up; it implements the ClusterServer core interface described above.

Finally, all consenters are passed into the multichannel Registrar’s Initialize and handled there.

Initialize calls HandleChain() of etcdraft.Consenter, one of the core interfaces described above.

// orderer/common/server/main.go
func Start(cmd string, conf *localconfig.TopLevel) {
    // Read genesis.block
    bootstrapBlock := extractBootstrapBlock(conf)
    ...
    // Create the LedgerFactory; if there is no config block for the system channel, nil is returned
    lf, _ := createLedgerFactory(conf, metricsProvider)
    sysChanLastConfigBlock := extractSysChanLastConfig(lf, bootstrapBlock)
    // Select the bootstrap block from sysChanLastConfigBlock and bootstrapBlock; if sysChanLastConfigBlock is not nil, it takes priority
    clusterBootBlock := selectClusterBootBlock(bootstrapBlock, sysChanLastConfigBlock)
    ...
    // If the consensus type in clusterBootBlock is `etcdraft`, perform the cluster initialization work
    clusterType := isClusterType(clusterBootBlock)
    ...
    // Initialize the multichannel registrar
    manager := initializeMultichannelRegistrar(clusterBootBlock, r, clusterDialer, clusterServerConfig, clusterGRPCServer, conf, signer, metricsProvider, opsSystem, lf, tlsCallback)
}

// orderer/common/server/main.go
func initializeMultichannelRegistrar(...) *multichannel.Registrar {
    genesisBlock := extractBootstrapBlock(conf)
    ...
    registrar := multichannel.NewRegistrar(*conf, lf, signer, metricsProvider, callbacks...)
    ...
    // Load the solo and kafka consenters
    consenters["solo"] = solo.New()
    var kafkaMetrics *kafka.Metrics
    consenters["kafka"], kafkaMetrics = kafka.New(conf.Kafka, metricsProvider, healthChecker)
    ...
    // Check whether the consensus is `etcdraft`; if so, add the etcdraft instance to consenters as well
    if isClusterType(bootstrapBlock) {
        initializeEtcdraftConsenter(consenters, conf, lf, clusterDialer, bootstrapBlock, ri, srvConf, srv, registrar, metricsProvider)
    }
    registrar.Initialize(consenters)
    return registrar
}

// orderer/common/multichannel/registrar.go
func (r *Registrar) Initialize(consenters map[string]consensus.Consenter) {
    r.consenters = consenters
    existingChains := r.ledgerFactory.ChainIDs()

    for _, chainID := range existingChains {
        ...
        // Check whether this is the system channel
        if _, ok := ledgerResources.ConsortiumsConfig(); ok {
            ...
            chain := newChainSupport(
                r,
                ledgerResources,
                r.consenters,
                r.signer,
                r.blockcutterMetrics,
            )
            ...
            defer chain.start()
        } else {
            ...
        }
    }
...
}

// orderer/common/multichannel/chainsupport.go
func newChainSupport(...) *ChainSupport {
    ...
    // Based on the ledger's ConsensusType, pick the consensus algorithm that will actually be used; in this article that is `etcdraft`
    consenterType := ledgerResources.SharedConfig().ConsensusType()
    consenter, ok := consenters[consenterType]
    ...
    // Call HandleChain
    cs.Chain, err = consenter.HandleChain(cs, metadata)
    ...
    return cs
}

EtcdRaft initialization

From here on, the code enters the core of etcdraft, where you can see how the system automatically sets the raftID, starting with HandleChain and ending with…

In HandleChain, the latest configuration of the current channel (or the system channel) is retrieved, along with the ConsensusMetadata, i.e. channel_group.groups.Orderer.ConsensusType.value.metadata, which contains the consenters list.

The consenters field is an array, and the system sets the raftID according to the index at which the current orderer’s certificate appears in that array. Great, really great!
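
A hedged sketch of that detection (hypothetical helper, not Fabric’s exact detectSelfID; import paths are the 1.4-era ones): the node’s raft ID is simply the key under which its own server TLS certificate appears in the consenter map built from the block metadata.

package sketch

import (
	"bytes"
	"errors"

	"github.com/hyperledger/fabric/protos/orderer/etcdraft"
)

// detectSelfIDSketch walks the consenters decoded from the ConsensusMetadata and
// returns the ID under which this orderer's own server TLS certificate appears.
func detectSelfIDSketch(consenters map[uint64]*etcdraft.Consenter, myServerTLSCert []byte) (uint64, error) {
	for id, consenter := range consenters {
		if bytes.Equal(consenter.ServerTlsCert, myServerTLSCert) {
			return id, nil
		}
	}
	return 0, errors.New("this orderer's certificate is not listed among the consenters")
}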

HandleChain also creates a cluster.RPC, which implements the ClusterClient core interface described above; since each channel gets its own RPC, ClusterServer and ClusterClient have a one-to-many relationship.

Finally, NewChain(...) is called to create the etcdraft.Chain, which implements the core Chain interface.

At this point, all of the core interfaces described above have concrete implementations in place.

// orderer/consensus/etcdraft/consenter.go
func (c *Consenter) HandleChain(support consensus.ConsenterSupport, metadata *common.Metadata) (consensus.Chain, error) {
    // Fetch the ConsensusMetadata
    m := &etcdraft.ConfigMetadata{}
    if err := proto.Unmarshal(support.SharedConfig().ConsensusMetadata(), m); err != nil {
        return nil, errors.Wrap(err, "failed to unmarshal consensus metadata")
    }
    ...
    consenters := map[uint64]*etcdraft.Consenter{}
    for i, consenter := range m.Consenters {
        consenters[blockMetadata.ConsenterIds[i]] = consenter
    }
    // Set the ID
    id, err := c.detectSelfID(consenters)
    ...
    // Initialize cluster.RPC; every chain gets its own rpc, which bears out the one-to-many relationship between ClusterServer and ClusterClient mentioned above
    rpc := &cluster.RPC{
        Timeout:       c.OrdererConfig.General.Cluster.RPCTimeout,
        Logger:        c.Logger,
        Channel:       support.ChainID(),
        Comm:          c.Communication,
        StreamsByType: cluster.NewStreamsByType(),
    }

    return NewChain(
        support,
        opts,
        c.Communication,
        rpc,
        func() (BlockPuller, error) { return newBlockPuller(support, c.Dialer, c.OrdererConfig.General.Cluster) },
        func() {
            c.InactiveChainRegistry.TrackChain(support.ChainID(), nil, func() { c.CreateChain(support.ChainID()) })
        },
        nil,
    )
}

// orderer/consensus/etcdraft/chain.go
func NewChain(...) (*Chain, error) {
    ...
    // This method is mostly initialization work; note that the rpc is handed to the Chain
    c := &Chain{
        configurator:     conf,
        rpc:              rpc,
        channelID:        support.ChainID(),
        raftID:           opts.RaftID,
        ...
    }
    ...
    // The raft config; it is also derived from the block.
    config := &raft.Config{
        ...
    }
    // node wraps the underlying raft node
    c.Node = &node{
        ...
    }

    return c, nil
}
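
The raft.Config the snippet elides is essentially a translation of the options we saw in the genesis block JSON (election_tick, heartbeat_tick, max_inflight_blocks, ...). Below is a hedged sketch of that mapping, using field names from etcd’s raft package; the options struct here is a stand-in for the decoded on-chain options, not Fabric’s actual type, and Fabric 1.4 vendors the raft package as github.com/coreos/etcd/raft.

package sketch

import "go.etcd.io/etcd/raft"

// options stands in for the etcdraft options decoded from the channel config
// (election_tick, heartbeat_tick, max_inflight_blocks in the JSON above).
type options struct {
	RaftID            uint64
	ElectionTick      uint32
	HeartbeatTick     uint32
	MaxInflightBlocks uint32
	MemoryStorage     *raft.MemoryStorage
}

// raftConfig maps the on-chain options onto etcd's raft.Config.
func raftConfig(o options) *raft.Config {
	return &raft.Config{
		ID:              o.RaftID,             // the raft ID detected in HandleChain
		ElectionTick:    int(o.ElectionTick),  // e.g. 10 ticks without a heartbeat before an election
		HeartbeatTick:   int(o.HeartbeatTick), // e.g. 1 tick between heartbeats
		MaxInflightMsgs: int(o.MaxInflightBlocks),
		Storage:         o.MemoryStorage,      // raft log entries kept in memory
	}
}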

EtcdRaft Startup

Now we go back to the Registrar.Initialize / newChainSupport() code quoted earlier: when Initialize returns, the deferred chain.start() call runs and invokes the chain’s Start method,

which starts node.start() in etcdraft, which in turn starts the underlying raft node from etcd (raft.StartNode / raft.RestartNode).

// orderer/consensus/etcdraft/chain.go
func (c *Chain) Start() {
    ...
    // Start the etcdraft node
    c.Node.start(c.fresh, isJoin)
    ...
    // serveRequest is a core function; raft role switching and a pile of other work happens in there
    go c.serveRequest()

    es := c.newEvictionSuspector()

    interval := DefaultLeaderlessCheckInterval
    if c.opts.LeaderCheckInterval != 0 {
        interval = c.opts.LeaderCheckInterval
    }

    c.periodicChecker = &PeriodicCheck{
        Logger:        c.logger,
        Report:        es.confirmSuspicion,
        CheckInterval: interval,
        Condition:     c.suspectEviction,
    }
    c.periodicChecker.Run()
}

// orderer/consensus/etcdraft/node.go
func (n *node) start(fresh, join bool) {
    ...
    var campaign bool
    // flag: is this a fresh node?
    if fresh {
        // flag: is it joining an existing channel?
        if join {
            raftPeers = nil
            n.logger.Info("Starting raft node to join an existing channel")
        } else {
            n.logger.Info("Starting raft node as part of a new channel")

            // determine the node to start campaign by selecting the node with ID equals to:
            //                hash(channelID) % cluster_size + 1
            sha := sha256.Sum256([]byte(n.chainID))
            number, _ := proto.DecodeVarint(sha[24:])
            if n.config.ID == number%uint64(len(raftPeers))+1 {
                campaign = true
            }
        }
        // Finally start the raft node.
        // This is a bit like writing etcd's config file:
        // raftPeers corresponds to ETCD_INITIAL_CLUSTER
        n.Node = raft.StartNode(n.config, raftPeers)
    } else {
        n.logger.Info("Restarting raft node")
        n.Node = raft.RestartNode(n.config)
    }

    go n.run(campaign)
}
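
The campaign rule hash(channelID) % cluster_size + 1 is worth a quick worked example: if the decoded varint were 14 and there were 3 peers, node 14 % 3 + 1 = 3 would campaign first. A minimal runnable sketch (hypothetical channel name and cluster size) that reproduces the computation from node.start above:

package main

import (
	"crypto/sha256"
	"fmt"

	"github.com/golang/protobuf/proto"
)

func main() {
	chainID := "mychannel"   // hypothetical channel name
	clusterSize := uint64(3) // hypothetical number of raft peers

	// Same computation as in node.start: hash the channel ID, decode a varint
	// from the tail of the hash, and map it onto a node ID in [1, clusterSize].
	sha := sha256.Sum256([]byte(chainID))
	number, _ := proto.DecodeVarint(sha[24:])
	fmt.Printf("node %d campaigns for leadership first\n", number%clusterSize+1)
}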

One transaction

The Order(...) method of the Chain interface is used to receive ordinary transaction messages.

When a message enters the Order method, it is wrapped in an orderer.SubmitRequest and handed to the Submit(...) method.

In Submit(...), the request is wrapped again in a submit struct and deposited into the Chain.submitC channel.

Submit then checks whether this node is the current leader; if not, it sends the data to the leader via rpc.

Chain.rpc is an implementation of ClusterClient that can be used to send messages to other orderers

st=>start
e=>end
opOrder=>operation: Order(...)
condLeader=>condition: IsLeader
opRPC=>operation: rpc.SendSubmit(...)

st->opOrder->condLeader(no)->opRPC->e
condLeader(yes)->e
// orderer/consensus/etcdraft/chain.go
func (c *Chain) Order(env *common.Envelope, configSeq uint64) error {
    c.Metrics.NormalProposalsReceived.Add(1)
    // Wrap env, configSeq and the channelID into a SubmitRequest and hand it to the Submit method
    return c.Submit(&orderer.SubmitRequest{LastValidationSeq: configSeq, Payload: env, Channel: c.channelID}, 0)
}

// orderer/consensus/etcdraft/chain.go
func (c *Chain) Submit(req *orderer.SubmitRequest, sender uint64) error {
    ...
    // Create a channel used to receive the network's current leader
    leadC := make(chan uint64, 1)
    select {
    // This case wraps req, together with the leader-reporting channel, and sends it to the Chain's submitC channel
    case c.submitC <- &submit{req, leadC}:
        // Wait for submitC to be handled and for the leader's id to come back
        lead := <-leadC
        // If there is no leader, report an error
        if lead == raft.None {
            c.Metrics.ProposalFailures.Add(1)
            return errors.Errorf("no Raft leader")
        }

        // If this node is not the leader, forward the message to the leader.
        if lead != c.raftID {
            if err := c.rpc.SendSubmit(lead, req); err != nil {
                c.Metrics.ProposalFailures.Add(1)
                return err
            }
        }
    ...
    return nil
}

Synchronization of a block

serveRequest() is, I think, the core method of etcdraft; if you are interested, it is worth digging into it more deeply.

It covers Leader/Follower role switching, the consensus flow itself, and more.

Here we continue from the previous step: consensus begins once the transaction message reaches the Leader.

If the node is the Leader, it orders the message and cuts blocks.

// orderer/consensus/etcdraft/chain.go
func (c *Chain) serveRequest() {
    ...
    // The goroutine defined in here is used further below
    becomeLeader := func() (chan<- *common.Block, context.CancelFunc) {
        ...
        ctx, cancel := context.WithCancel(context.Background())
        go func(ctx context.Context, ch <-chan *common.Block) {
            for {
                select {
                case b := <-ch:
                    data := utils.MarshalOrPanic(b)
                    // The block is handed to the raft node here for replication; the processing inside is very complex and belongs to raft itself, so we won't dig into it here.
                    if err := c.Node.Propose(ctx, data); err != nil {
                        c.logger.Errorf("Failed to propose block [%d] to raft and discard %d blocks in queue: %s", b.Header.Number, len(ch), err)
                        return
                    }
                    c.logger.Debugf("Proposed block [%d] to raft consensus", b.Header.Number)

                case <-ctx.Done():
                    c.logger.Debugf("Quit proposing blocks, discarded %d blocks in the queue", len(ch))
                    return
                }
            }
        }(ctx, ch)
        return ch, cancel
    }
    ...
    for {
        select {
        case s := <-submitC:
            // The leader cuts blocks here; if the requirements are not met yet (e.g. the batch is not big enough), the message stays pending
            batches, pending, err := c.ordered(s.req)
            if err != nil {
                c.logger.Errorf("Failed to order message: %s", err)
                continue
            }
            if pending {
                startTimer() // no-op if timer is already started
            } else {
                stopTimer()
            }

            // Build the block(s) and pass them to propC; propC is a channel, so the goroutine above will handle them
            c.propose(propC, bc, batches...)

            if c.configInflight {
                c.logger.Info("Received config transaction, pause accepting transaction till it is committed")
                submitC = nil
            } else if c.blockInflight >= c.opts.MaxInflightBlocks {
                c.logger.Debugf("Number of in-flight blocks (%d) reaches limit (%d), pause accepting transaction",
                    c.blockInflight, c.opts.MaxInflightBlocks)
                submitC = nil
            }
            ...
        case <-timer.C():
            ticking = false

            batch := c.support.BlockCutter().Cut()
            if len(batch) == 0 {
                c.logger.Warningf("Batch timer expired with no pending requests, this might indicate a bug")
                continue
            }

            c.logger.Debugf("Batch timer expired, creating block")
            c.propose(propC, bc, batch) // we are certain this is normal block, no need to block
        }
    }
}
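
The c.ordered(s.req) call above is where the shared BlockCutter comes in: normal transactions are handed to it and grouped into batches according to the channel’s BatchSize/BatchTimeout policy. Below is a hedged, simplified sketch of just that step (the real ordered() also re-validates messages whose configSeq is stale and treats config transactions separately; import paths are the 1.4-era ones):

package sketch

import (
	"github.com/hyperledger/fabric/orderer/common/blockcutter"
	cb "github.com/hyperledger/fabric/protos/common"
)

// cutBatches is a stand-in for the normal-transaction path of Chain.ordered.
func cutBatches(cutter blockcutter.Receiver, env *cb.Envelope) (batches [][]*cb.Envelope, pending bool) {
	// Ordered returns zero or more complete batches plus a flag saying whether
	// messages are still pending; serveRequest uses that flag to start or stop
	// its batch timer, and later calls Cut() when the timer fires.
	return cutter.Ordered(env)
}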

Storage of a block

According to raft, a replicated message goes from uncommitted to committed, and both stages are handled inside the serveRequest loop shown above.

Once an entry is committed, the block is written to the ledger.

// orderer/consensus/etcdraft/chain.go
func (c *Chain) serveRequest() {
    ...
    // The goroutine defined in here is used further below
    becomeLeader := func() (chan<- *common.Block, context.CancelFunc) {
        ...
        ctx, cancel := context.WithCancel(context.Background())
        go func(ctx context.Context, ch <-chan *common.Block) {
            for {
                select {
                case b := <-ch:
                    data := utils.MarshalOrPanic(b)
                    // The block is handed to the raft node here for replication; the processing inside is very complex and belongs to raft itself, so we won't dig into it here.
                    if err := c.Node.Propose(ctx, data); err != nil {
                        c.logger.Errorf("Failed to propose block [%d] to raft and discard %d blocks in queue: %s", b.Header.Number, len(ch), err)
                        return
                    }
                    c.logger.Debugf("Proposed block [%d] to raft consensus", b.Header.Number)

                case <-ctx.Done():
                    c.logger.Debugf("Quit proposing blocks, discarded %d blocks in the queue", len(ch))
                    return
                }
            }
        }(ctx, ch)
        return ch, cancel
    }
    ...
    for {
        select {
        ...
        case app := <-c.applyC:
            ...
            // This is where blocks from the raft log get persisted.
            c.apply(app.entries)
            ...
        }
    }
}

// orderer/consensus/etcdraft/chain.go
func (c *Chain) apply(ents []raftpb.Entry) {
    ...
    for i := range ents {
        // There are only two entry types, EntryNormal and EntryConfChange; block messages are EntryNormal
        switch ents[i].Type {
        case raftpb.EntryNormal:
            ...
            block := utils.UnmarshalBlockOrPanic(ents[i].Data)
            // The block is written here
            c.writeBlock(block, ents[i].Index)
            c.Metrics.CommittedBlockNumber.Set(float64(block.Header.Number))
        ...
        case raftpb.EntryConfChange:
            ...
        }

        if ents[i].Index > c.appliedIndex {
            c.appliedIndex = ents[i].Index
        }
    }

    if c.accDataSize >= c.sizeLimit {
        b := utils.UnmarshalBlockOrPanic(ents[position].Data)

        select {
        case c.gcC <- &gc{index: c.appliedIndex, state: c.confState, data: ents[position].Data}:
            c.logger.Infof("Accumulated %d bytes since last snapshot, exceeding size limit (%d bytes), "+
                "taking snapshot at block [%d] (index: %d), last snapshotted block number is %d, current nodes: %+v",
                c.accDataSize, c.sizeLimit, b.Header.Number, c.appliedIndex, c.lastSnapBlockNum, c.confState.Nodes)
            c.accDataSize = 0
            c.lastSnapBlockNum = b.Header.Number
            c.Metrics.SnapshotBlockNumber.Set(float64(b.Header.Number))
        default:
            c.logger.Warnf("Snapshotting is in progress, it is very likely that SnapshotIntervalSize is too small")
        }
    }

    return
}