Background

When deploying an application that is highly available, we usually place a HAProxy in front of the application so that when any one Server fails, the HAProxy will automatically switch over, but HAProxy also has a single point of failure, so we need more than one HAProxy to ensure that the business is not interrupted, and this time we need another software to work with: Keepalived. Keepalived is only used to provide VIPs to ensure that when one Keepalived fails, the VIPs are automatically configured on other Keepalived nodes.

One problem with Keepalived is that the virtual route ID must be unique within the same network segment, so when we want to deploy multiple clusters within a network segment, we need to intervene to assign the virtual route ID, which is inconvenient. This time we will use Raft to implement the VIP logic ourselves.

hashicorp/raft

There are a number of open source implementations of Raft, of which the Raft library implemented by Hashicorp has been used by software such as Consul and has a friendly interface and has been chosen to be used for its implementation. There are many examples of Raft in use on Github, one of the simpler and more complete is otoolep/hraftd, let’s see how he uses it.

otoolep/hraftd

main.go

There are 4 main things done in main.go: store.New , store.Open , http.New , http.Start and first let’s see how the program is started.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func init() {
    // 设置命令行参数
	flag.BoolVar(&inmem, "inmem", false, "Use in-memory storage for Raft")
	...
	flag.Usage = func() {
		fmt.Fprintf(os.Stderr, "Usage: %s [options] <raft-data-path> \n", os.Args[0])
		flag.PrintDefaults()
	}
}

func main() {
    // 解析命令行参数
	flag.Parse()
    ...
    // 创建一个 Store 对象
	s := store.New(inmem)
	s.RaftDir = raftDir
    s.RaftBind = raftAddr
    // 运行 Store 
	if err := s.Open(joinAddr == "", nodeID); err != nil {
		log.Fatalf("failed to open store: %s", err.Error())
	}
    // 新建一个 http 对象并运行
	h := httpd.New(httpAddr, s)
	if err := h.Start(); err != nil {
		log.Fatalf("failed to start HTTP service: %s", err.Error())
	}

	// 如果 joinAddr 参数不为空,则处理 join 请求
	if joinAddr != "" {
		if err := join(joinAddr, raftAddr, nodeID); err != nil {
			log.Fatalf("failed to join node at %s: %s", joinAddr, err.Error())
		}
	}

	log.Println("hraftd started successfully")
    // 监听系统信号,若接收到 os.Interrupt 则程序退出
	terminate := make(chan os.Signal, 1)
	signal.Notify(terminate, os.Interrupt)
	<-terminate
	log.Println("hraftd exiting")
}

http

We know from main.go that http.Start is called, so let’s look at the http implementation first, regardless of what Store is.

1
2
3
4
5
6
7
8
// Start starts the service.
func (s *Service) Start() error {
    // Service 实现了 ServeHTTP 方法
	http.Handle("/", s)
	go func() {
		err := server.Serve(s.ln)
    ...
}
1
2
3
4
5
func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if strings.HasPrefix(r.URL.Path, "/key") {
		s.handleKeyRequest(w, r)
    // 先忽略其他分支
}

The main request handler is the s.handleKeyRequest method.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (s *Service) handleKeyRequest(w http.ResponseWriter, r *http.Request) {
	...
	switch r.Method {
	case "GET":
		k := getKey()
		if k == "" {
			w.WriteHeader(http.StatusBadRequest)
		}
		v, err := s.store.Get(k)
		...
		io.WriteString(w, string(b))
	case "POST":
		// Read the value from the POST body.
		m := map[string]string{}
		if err := json.NewDecoder(r.Body).Decode(&m); err != nil {
			w.WriteHeader(http.StatusBadRequest)
			return
		}
		for k, v := range m {
			if err := s.store.Set(k, v); err != nil {
				w.WriteHeader(http.StatusInternalServerError)
				return
			}
        }
    ...

In s.handleKeyRequest, the store method is called according to the request method. This is also defined in the http module.

1
2
3
4
5
6
type Store interface {
	Get(key string) (string, error)
	Set(key, value string) error
	Delete(key string) error
	Join(nodeID string, addr string) error
}

Apart from Join, which looks strange, everything else is the interface of a K/V system, so let’s take a look at the implementation of Store’s specific methods.

store

The writing of this module involves specific concepts in Raft, so we recommend reading the Raft blog by siddontang for a quick overview (the links are listed in the references).

Here is an example of setting up a Key.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
func (s *Store) Set(key, value string) error {
	if s.raft.State() != raft.Leader {
		return fmt.Errorf("not leader")
	}

    // 封装具体执行的动作
	c := &command{
		Op:    "set",
		Key:   key,
		Value: value,
	}
	b, err := json.Marshal(c)
    ...
    // 将 command 应用于 FSM
	f := s.raft.Apply(b, raftTimeout)
	return f.Error()
}

View the FSM Apply method implementation.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// Apply applies a Raft log entry to the key-value store.
func (f *fsm) Apply(l *raft.Log) interface{} {
	var c command
	// 根据操作动作的不同,执行不同的方法,这里以设置 Key 为例
	switch c.Op {
	case "set":
		return f.applySet(c.Key, c.Value)
	...
	}
}
1
2
3
4
5
6
7
8
func (f *fsm) applySet(key, value string) interface{} {
    // 互斥锁
    f.mu.Lock()
    defer f.mu.Unlock()
    // 设置 Map 中的 Key/Value
	f.m[key] = value
	return nil
}

raft

Having seen the specific actions implemented above, let’s take a look at Raft’s specific launch.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (s *Store) Open(enableSingle bool, localID string) error {
	// 设置 Raft 配置
	config := raft.DefaultConfig()
	config.LocalID = raft.ServerID(localID)
    // 设置 Raft 通信
	addr, err := net.ResolveTCPAddr("tcp", s.RaftBind)
    ...
	transport, err := raft.NewTCPTransport(s.RaftBind, addr, 3, 10*time.Second, os.Stderr)
    // 设置 Raft 存储对象
	snapshots, err := raft.NewFileSnapshotStore(s.RaftDir, retainSnapshotCount, os.Stderr)
	var logStore raft.LogStore
	var stableStore raft.StableStore
	if s.inmem {
		logStore = raft.NewInmemStore()
		stableStore = raft.NewInmemStore()
	} else {
		boltDB, err := raftboltdb.NewBoltStore(filepath.Join(s.RaftDir, "raft.db"))
		if err != nil {
			return fmt.Errorf("new bolt store: %s", err)
		}
		logStore = boltDB
		stableStore = boltDB
	}
    // 创建 raft 示例,并使用该 raft 实例启动集群
	ra, err := raft.NewRaft(config, (*fsm)(s), logStore, stableStore, snapshots, transport)
	if err != nil {
		return fmt.Errorf("new raft: %s", err)
	}
	s.raft = ra
	if enableSingle {
		...
		ra.BootstrapCluster(configuration)
	}
}

Here you can see that Raft requires the interfaces FSM and Snapshot, the specific implementation is based on the requirements, generally similar to hraftd, roughly understand the use of hashicorp/raft, then let’s implement the specific VIP function.

VIP

network

Since this is IP related, you will need to configure the time for the corresponding network card, in Linux we can do this by using the ip command, in Golang we use vishvananda/netlink.

netlink.AddrAdd adds an IP address to the specified network device.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
func (nc *NetworkConfig) addIP() error {
	res, err := nc.IsSet()
	if err != nil {
		return errors.Wrap(err, "ip check in AddIP failed")

	}
	if res {
		return nil
	}
	if err := netlink.AddrAdd(nc.link, nc.address); err != nil {
		return errors.Wrap(err, "could not add ip")
	}
	return nil
}

netlink.AddrDel can remove the IP from the specified network device.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
func (nc *NetworkConfig) delIP() error {
	res, err := nc.IsSet()
	if err != nil {
		return errors.Wrap(err, "ip check in DelIP failed")
	}

	if !res {
		return nil
	}

	if err := netlink.AddrDel(nc.link, nc.address); err != nil {
		return errors.Wrap(err, "could not delete ip")
	}

	return nil
}

raft

We don’t need to provide an HTTP interface, we just write the Raft implementation. Unlike the hraftd implementation, where information needs to be written and read, our VIP only depends on the Raft election leader, so we just need to write the corresponding methods, no additional operations are needed

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
type FSM struct {
}

func (fsm FSM) Apply(log *raft.Log) interface{} {
	return nil
}

func (fsm FSM) Restore(snap io.ReadCloser) error {
	return nil
}

func (fsm FSM) Snapshot() (raft.FSMSnapshot, error) {
	return Snapshot{}, nil
}

type Snapshot struct {
}

func (snapshot Snapshot) Persist(sink raft.SnapshotSink) error {
	return nil
}

func (snapshot Snapshot) Release() {
}

serve

Now that the basic methods have been implemented, the cluster start-up logic is written.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
func (manager *VIPManager) Start() error {
	// 初始化 raft 配置、存储对象、通信
	for id, ip := range manager.peers {
		configuration.Servers = append(configuration.Servers, raft.Server{ID: raft.ServerID(id), Address: raft.ServerAddress(ip)})
	}

	// 启动 Raft 集群,这里与 hraftd 不同,需要注意
	if error := raft.BootstrapCluster(config, logStore, stableStore, snapshots, transport, configuration); error != nil {
		return error
	}

	// 创建 raft 实例
	raftServer, error := raft.NewRaft(config, manager.fsm, logStore, stableStore, snapshots, transport)
    ...
	ticker := time.NewTicker(time.Second)
	isLeader := false
    // 服务启动时先删除 VIP,防止集群中同时存在节点都配置了 VIP
	manager.deleteIP(false)
	go func() {
		for {
			select {
            // 如果 当前节点是 Leader 节点,则设置 VIP
			case leader := <-raftServer.LeaderCh():
				if leader {
					isLeader = true
					log.Info("Leading")
					manager.addIP(true)

                }
            // 定时检测,如果是 Leader,则检测 VIP 是否正确设置,如果没有就再次配置 VIP
			case <-ticker.C:
				if isLeader {
					result, error := manager.networkConfigurator.IsSet()
					if error != nil {
						log.WithFields(log.Fields{"error": error, "ip": manager.networkConfigurator.IP(), "interface": manager.networkConfigurator.Interface()}).Error("Could not check ip")
					}

					if result == false {
						log.Error("Lost IP")

						manager.addIP(true)
					}
				}
            ...
			}
		}
	}()
}

The advantage of this is that even if there is only one node in the cluster, the first node, then the cluster will work, the disadvantage is that the cluster starts in a fixed order, the first node must be started first and then the other nodes are added to the Raft cluster with a Join request (we ignore the Join reads).

Think again about our requirements: clustering, high availability, failure. When these words are put together, we know that the hraftd approach is not for us for the following reasons.

  1. the cluster node boot order requirement
  2. different node configuration files, some of which require Join parameters

So we are using raft.BootstrapCluster directly to start the cluster, although only one node cluster does not work properly, but this is tolerable.

ARP

After implementing the above features, I thought I was done and started self-testing, but during the test I found a very strange phenomenon, although we achieved the automatic drift of VIP failures through Raft’s own election, but the actual test found that service access was not immediately restored as the VIP was rebuilt, and checking ARP records found that the ARP records of each node in the cluster about the VIP were different, or even completely different.

Let’s revisit the ARP protocol content.

ARP is a network transport protocol that finds data link layer addresses by resolving network layer addresses, and is extremely important in IPv4. . ARP may also refer to a process that manages its associated addresses in most operating systems.

In the Ethernet protocol it is specified that in order for a host on the same LAN to communicate directly with another host, the MAC address of the target host must be known. In contrast, in the TCP/IP protocol, the network and transport layers are only concerned with the IP address of the target host. This leads to the fact that when using IP protocols in Ethernet, the Ethernet protocol at the data link layer receives data from the upper IP protocols containing only the IP address of the destination host. A method is then needed to obtain the MAC address of the destination host based on its IP address. This is what the ARP protocol does. Address resolution is the process by which a host converts the destination IP address into the destination MAC address before sending the frame.

If the VIP is different from the MAC address of the real node, it is equivalent to an ARP attack, so after our Leader node has set the VIP, it also needs to send an ARP request broadcast to tell the other nodes in the broadcast domain the correct MAC address of the VIP. The method used is gratuitous ARP (free ARP). Let’s look directly at an open source ARP implementation for this requirement.

google/seesaw is a load balancer that implements this functionality in.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// ARPSendGratuitous sends a gratuitous ARP message via the specified interface.
func (ncc *SeesawNCC) ARPSendGratuitous(arp *ncctypes.ARPGratuitous, out *int) error {
	iface, err := net.InterfaceByName(arp.IfaceName)
	if err != nil {
		return fmt.Errorf("failed to get interface %q: %v", arp.IfaceName, err)
	}
	log.V(2).Infof("Sending gratuitous ARP for %s (%s) via %s", arp.IP, iface.HardwareAddr, iface.Name)
	m, err := gratuitousARPReply(arp.IP, iface.HardwareAddr)
	if err != nil {
		return err
	}
	return sendARP(iface, m)
}

Summary

Although our company’s Slogan is Make IT Simple, we feel more and more that Hashicorp is the faithful embodiment of this statement. Their Terraform, Vault, Consul, Nomad, Vagrant, etc. are all software that make the application and management of infrastructure easier and more convenient, which is still very cool.

The exact implementation of this article can be seen at sparrow.