Background

I previously wrote an article, “Kubernetes in Action - Smooth Node Removal”, about how to remove nodes from a K8s cluster. Today I’ll take a look at what the kubectl drain command does and how it does it.

kubectl

K8s uses cobra as its command-line builder (I don’t find cobra particularly pleasant to use, and its documentation is unclear). The actual processing logic is in pkg/kubectl/cmd/cmd.go, with a unified entry point in cmd/kubectl/kubectl.go.

   ...
groups := templates.CommandGroups{
	{
		Message: "Basic Commands (Beginner):",
		...
	},
	{
		Message: "Deploy Commands:",
		...
	},
	{
		Message: "Cluster Management Commands:",
		Commands: []*cobra.Command{
			certificates.NewCmdCertificate(f, ioStreams),
			clusterinfo.NewCmdClusterInfo(f, ioStreams),
			top.NewCmdTop(f, ioStreams),
			drain.NewCmdCordon(f, ioStreams),
			drain.NewCmdUncordon(f, ioStreams),
			drain.NewCmdDrain(f, ioStreams),
			taint.NewCmdTaint(f, ioStreams),
		},
	},
   ...
}
   groups.Add(cmds)

As you can see from the kubectl entry point where all the subcommands are registered, the drain-related commands we’re looking at today all belong to the Cluster Management Commands group and consist of:

  • cordon
  • uncordon
  • drain
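
For readers who haven’t used cobra, the registration shown above boils down to attaching *cobra.Command values to a root command. A minimal, self-contained sketch in the same spirit (demo names only, not kubectl’s actual code):

package main

import (
	"fmt"
	"os"

	"github.com/spf13/cobra"
)

func main() {
	// Root command, analogous to the top-level `kubectl` command.
	root := &cobra.Command{Use: "kubectl-demo"}

	// A subcommand, analogous to `kubectl cordon NODE`.
	cordon := &cobra.Command{
		Use:   "cordon NODE",
		Short: "Mark node as unschedulable (demo only)",
		Args:  cobra.ExactArgs(1),
		Run: func(cmd *cobra.Command, args []string) {
			fmt.Printf("would cordon node %q\n", args[0])
		},
	}

	root.AddCommand(cordon)
	if err := root.Execute(); err != nil {
		os.Exit(1)
	}
}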

cordon

Let’s start with the cordon command. Its purpose is to mark a node as unschedulable, so that K8s stops scheduling new resources onto that node while maintenance is being performed on it.

func NewCmdCordon(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *cobra.Command {
	o := NewDrainCmdOptions(f, ioStreams)

	cmd := &cobra.Command{
		Use:                   "cordon NODE",
		DisableFlagsInUseLine: true,
		Short:                 i18n.T("Mark node as unschedulable"),
		Long:                  cordonLong,
		Example:               cordonExample,
		Run: func(cmd *cobra.Command, args []string) {
			cmdutil.CheckErr(o.Complete(f, cmd, args))
			cmdutil.CheckErr(o.RunCordonOrUncordon(true))
		},
	}
	cmd.Flags().StringVarP(&o.drainer.Selector, "selector", "l", o.drainer.Selector, "Selector (label query) to filter on")
	cmdutil.AddDryRunFlag(cmd)
	return cmd
}

Looking directly at the contents of Run, and ignoring cmdutil.CheckErr for the moment, two main methods are executed here: o.Complete and o.RunCordonOrUncordon. The fundamental job of kubectl is to send the corresponding HTTP requests to the APIServer, and kubectl adds a layer of encapsulation through Builder and Visitor, which keeps the implementation of each subcommand consistent and concise.

// Builder provides convenience functions for taking arguments and parameters
// from the command line and converting them to a list of resources to iterate
// over using the Visitor interface.
type Builder struct {
    ...
}
// Visitor lets clients walk a list of resources.
type Visitor interface {
	Visit(VisitorFunc) error
}
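
To get a feel for the pattern before diving into the kubectl code, here is a stripped-down, illustrative version of Visitor and VisitorList (the names mirror kubectl’s, but the types here are made up for the sketch):

package main

import "fmt"

// Info is a toy stand-in for resource.Info.
type Info struct {
	Name string
}

// VisitorFunc is the callback applied to each resource.
type VisitorFunc func(*Info, error) error

// Visitor lets clients walk a list of resources.
type Visitor interface {
	Visit(VisitorFunc) error
}

// InfoVisitor visits a single Info.
type InfoVisitor struct {
	Info *Info
}

func (v InfoVisitor) Visit(fn VisitorFunc) error {
	return fn(v.Info, nil)
}

// VisitorList visits every Visitor it contains, stopping at the first error.
type VisitorList []Visitor

func (l VisitorList) Visit(fn VisitorFunc) error {
	for _, v := range l {
		if err := v.Visit(fn); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	visitors := VisitorList{
		InfoVisitor{Info: &Info{Name: "node-1"}},
		InfoVisitor{Info: &Info{Name: "node-2"}},
	}
	// The caller only supplies the per-resource callback.
	_ = visitors.Visit(func(info *Info, err error) error {
		fmt.Println("visiting", info.Name)
		return nil
	})
}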

The corresponding builder is constructed in o.Complete:

...
    // Build the builder instance from the command-line arguments
	builder := f.NewBuilder().
		WithScheme(scheme.Scheme, scheme.Scheme.PrioritizedVersionsAllGroups()...).
		NamespaceParam(o.Namespace).DefaultNamespace().
		ResourceNames("nodes", args...).
		SingleResourceType().
		Flatten()

	if len(o.drainer.Selector) > 0 {
		builder = builder.LabelSelectorParam(o.drainer.Selector).
			ResourceTypes("nodes")
	}
    // builder.Do returns a Result object that carries a Visitor
    r := builder.Do()
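
The same Builder is exported from k8s.io/cli-runtime and can be used outside kubectl as well. A rough sketch of fetching a node with it, assuming kubeconfig-based genericclioptions.ConfigFlags (fetchNode is a made-up helper):

package example

import (
	"fmt"

	"k8s.io/cli-runtime/pkg/genericclioptions"
	"k8s.io/cli-runtime/pkg/resource"
)

// fetchNode resolves a node name into resource.Info objects via the same
// Builder/Visitor machinery that kubectl uses internally.
func fetchNode(name string) error {
	// ConfigFlags reads kubeconfig / in-cluster configuration and satisfies
	// the RESTClientGetter interface the Builder needs.
	flags := genericclioptions.NewConfigFlags(true)

	result := resource.NewBuilder(flags).
		Unstructured().                               // decode into unstructured objects
		NamespaceParam("default").DefaultNamespace(). // nodes are cluster-scoped, so this is effectively ignored
		ResourceNames("nodes", name).                 // equivalent to the args "nodes <name>"
		SingleResourceType().
		Flatten().
		Do()

	// Visit walks every resolved resource.Info, just as o.Complete does.
	return result.Visit(func(info *resource.Info, err error) error {
		if err != nil {
			return err
		}
		fmt.Println("resolved", info.Name)
		return nil
	})
}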

Next, let’s see what builder.Do() does to return the Result.

func (b *Builder) Do() *Result {
    // Call visitorResult to get a Result
	r := b.visitorResult()
    ...
	return r
}
...
func (b *Builder) visitorResult() *Result {
    ...
	// Skip the other branches and look at the simplest path: building the Result by name
	if len(b.names) != 0 {
		return b.visitByName()
    }
    ...
}
...
func (b *Builder) visitByName() *Result {
    // Declare the Result object
	result := &Result{
		singleItemImplied:  len(b.names) == 1,
		targetsSingleItems: true,
    }
    ...
    // Get the K8s client
	client, err := b.getClient(mapping.GroupVersionKind.GroupVersion())
	...
	visitors := []Visitor{}
	for _, name := range b.names {
		info := &Info{
			Client:    client,
			Mapping:   mapping,
			Namespace: selectorNamespace,
			Name:      name,
			Export:    b.export,
		}
		visitors = append(visitors, info)
    }
    // VisitorList also implements Visit: it iterates over the visitors and calls each one's Visit method
	result.visitor = VisitorList(visitors)
	result.sources = visitors
	return result
}

Having seen how the Result object is obtained, let’s look at how o.Complete handles it: it passes a VisitorFunc in. The visitors held by the Result all implement the Visit interface, whose role is to receive the VisitorFunc and execute it.

return r.Visit(func(info *resource.Info, err error) error {
		...
    })
...
func (v DecoratedVisitor) Visit(fn VisitorFunc) error {
	return v.visitor.Visit(func(info *Info, err error) error {
        ...
		for i := range v.decorators {
			if err := v.decorators[i](info, nil); err != nil {
				return err
			}
		}
		return fn(info, nil)
	})
}

Next, see what o.RunCordonOrUncordon does.

func (o *DrainCmdOptions) RunCordonOrUncordon(desired bool) error {
	cordonOrUncordon := "cordon"
	if !desired {
		cordonOrUncordon = "un" + cordonOrUncordon
	}
    // The nodeInfos list collected via Visit
	for _, nodeInfo := range o.nodeInfos {
        ...
		gvk := nodeInfo.ResourceMapping().GroupVersionKind
		if gvk.Kind == "Node" {
			c, err := drain.NewCordonHelperFromRuntimeObject(nodeInfo.Object, scheme.Scheme, gvk)
			if updateRequired := c.UpdateIfRequired(desired); !updateRequired {
				...
			} else {
				if o.drainer.DryRunStrategy != cmdutil.DryRunClient {
                    ...
                    // Patch the configuration of the corresponding node
					err, patchErr := c.PatchOrReplace(o.drainer.Client, o.drainer.DryRunStrategy == cmdutil.DryRunServer)
					...
				}
			}
        }
        ...
	}
	return fatal
}
...
func (c *CordonHelper) PatchOrReplace(clientset kubernetes.Interface, serverDryRun bool) (error, error) {
	client := clientset.CoreV1().Nodes()
    oldData, err := json.Marshal(c.node)
    // Update the Unschedulable field of the node Spec
	c.node.Spec.Unschedulable = c.desired
	newData, err := json.Marshal(c.node)
    // Build the merge patch by diffing the old and new objects
	patchBytes, patchErr := strategicpatch.CreateTwoWayMergePatch(oldData, newData, c.node)
	if patchErr == nil {
		...
		_, err = client.Patch(context.TODO(), c.node.Name, types.StrategicMergePatchType, patchBytes, patchOptions)
	} 
	...
}
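
The patch-based update above can be reproduced with client-go directly. A minimal sketch of cordoning a node this way, assuming an existing kubernetes.Interface client and an already-fetched *corev1.Node (cordonNode is a made-up helper):

package example

import (
	"context"
	"encoding/json"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
	"k8s.io/client-go/kubernetes"
)

// cordonNode marks a node unschedulable with a strategic merge patch,
// mirroring the approach taken by CordonHelper.PatchOrReplace.
func cordonNode(ctx context.Context, client kubernetes.Interface, node *corev1.Node) error {
	oldData, err := json.Marshal(node)
	if err != nil {
		return err
	}

	modified := node.DeepCopy()
	modified.Spec.Unschedulable = true
	newData, err := json.Marshal(modified)
	if err != nil {
		return err
	}

	// Diff the old and new objects into a strategic merge patch.
	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, corev1.Node{})
	if err != nil {
		return err
	}

	_, err = client.CoreV1().Nodes().Patch(ctx, node.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
	return err
}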

Drain

After Cordon, on to Drain.

func NewCmdDrain(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *cobra.Command {
    ...
	cmd := &cobra.Command{
		...
		Run: func(cmd *cobra.Command, args []string) {
			cmdutil.CheckErr(o.Complete(f, cmd, args))
			cmdutil.CheckErr(o.RunDrain())
		},
    }
    ...

Looking directly at o.RunDrain, the first thing executed is o.RunCordonOrUncordon, which marks the node as unschedulable. So the blog post I wrote earlier was actually wrong: if you want to take a node offline, simply running kubectl drain is enough.

func (o *DrainCmdOptions) RunDrain() error {
    if err := o.RunCordonOrUncordon(true); err != nil {
		return err
	}
    ...
	drainedNodes := sets.NewString()
	var fatal error
	for _, info := range o.nodeInfos {
        // Evict the Pods
		if err := o.deleteOrEvictPodsSimple(info); err == nil {
			drainedNodes.Insert(info.Name)
			printObj(info.Object, o.Out)
		} else {
			fatal = err
			...
			// If evicting the Pods fails, print the nodes that still remain to be drained
			if len(remainingNodes) > 0 {
				fmt.Fprintf(o.ErrOut, "There are pending nodes to be drained:\n")
				for _, nodeName := range remainingNodes {
					fmt.Fprintf(o.ErrOut, " %s\n", nodeName)
				}
			}
			break
		}
	}
}

In deleteOrEvictPodsSimple, the Pods on the node are first looked up by node name, and then the eviction is performed.

func (o *DrainCmdOptions) deleteOrEvictPodsSimple(nodeInfo *resource.Info) error {
	list, errs := o.drainer.GetPodsForDeletion(nodeInfo.Name)
    ...
	if err := o.drainer.DeleteOrEvictPods(list.Pods()); err != nil {
        ...
	}
}
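
Under the hood, GetPodsForDeletion lists every Pod whose spec.nodeName matches the target node and then runs the list through the filters described next. A hedged sketch of just the listing step with client-go (podsOnNode is a made-up helper):

package example

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/kubernetes"
)

// podsOnNode lists every Pod scheduled onto the given node, across all namespaces.
func podsOnNode(ctx context.Context, client kubernetes.Interface, nodeName string) error {
	selector := fields.SelectorFromSet(fields.Set{"spec.nodeName": nodeName}).String()
	pods, err := client.CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{
		FieldSelector: selector,
	})
	if err != nil {
		return err
	}
	for _, pod := range pods.Items {
		fmt.Printf("%s/%s\n", pod.Namespace, pod.Name)
	}
	return nil
}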

Here GetPodsForDeletion applies a set of filters covering the following scenarios; note that the filters run in a strict order.

func (d *Helper) makeFilters() []podFilter {
	return []podFilter{
        // Pods already marked for deletion (DeletionTimestamp is set)
        d.skipDeletedFilter,
        // Pods that belong to a DaemonSet
        d.daemonSetFilter,
        // A mirror pod is really a static pod, i.e. one defined under /etc/kubernetes/manifests/
        // whose lifecycle is managed by the kubelet;
        // its `Annotations` contain `kubernetes.io/config.mirror`
        d.mirrorPodFilter,
        // Pods that use local storage (the Pod's Volumes field is not empty)
        d.localStorageFilter,
        // Pods not managed by a controller (their `Controlled By` is empty)
		d.unreplicatedFilter,
	}
}
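
Conceptually, each filter looks at one Pod and decides whether the drain may delete or evict it; because the filters run in order, the first rule that refuses a Pod decides its fate. An illustrative, simplified sketch of that chaining (podFilterStatus, filterPods and skipDeleted are made up, not the real Helper types):

package example

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// podFilterStatus is a simplified stand-in for the real filter result.
type podFilterStatus struct {
	delete bool
	reason string
}

// podFilter inspects one Pod and reports whether it may be drained.
type podFilter func(corev1.Pod) podFilterStatus

// filterPods applies the filters in order; the first filter that refuses a
// Pod decides its fate, mirroring the strict ordering of makeFilters.
func filterPods(pods []corev1.Pod, filters []podFilter) (deletable []corev1.Pod, skipped []string) {
	for _, pod := range pods {
		ok := true
		for _, f := range filters {
			if st := f(pod); !st.delete {
				skipped = append(skipped, fmt.Sprintf("%s: %s", pod.Name, st.reason))
				ok = false
				break
			}
		}
		if ok {
			deletable = append(deletable, pod)
		}
	}
	return deletable, skipped
}

// Example filter: skip Pods that are already being deleted.
func skipDeleted(pod corev1.Pod) podFilterStatus {
	if pod.ObjectMeta.DeletionTimestamp != nil {
		return podFilterStatus{delete: false, reason: "already being deleted"}
	}
	return podFilterStatus{delete: true}
}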

Once the filtered list of Pods is obtained, the eviction is carried out: a goroutine is started for each Pod, which then waits until that Pod’s eviction is complete.

func (d *Helper) evictPods(pods []corev1.Pod, policyGroupVersion string, getPodFn func(namespace, name string) (*corev1.Pod, error)) error {
	returnCh := make(chan error, 1)
    ...
	ctx, cancel := context.WithTimeout(d.getContext(), globalTimeout)
	defer cancel()
	for _, pod := range pods {
		go func(pod corev1.Pod, returnCh chan error) {
			for {
				...
				select {
				case <-ctx.Done():
					// Eviction timed out (global timeout reached)
					returnCh <- fmt.Errorf("error when evicting pod %q: global timeout reached: %v", pod.Name, globalTimeout)
					return
				default:
                }
                // Evict the Pod; this ultimately calls d.Client.PolicyV1beta1().Evictions(eviction.Namespace).Evict(eviction)
				err := d.EvictPod(pod, policyGroupVersion)
				...
			}
			...
			params := waitForDeleteParams{
				...
            }
            // Wait for the eviction to complete
			_, err := waitForDelete(params)
			if err == nil {
				returnCh <- nil
			} else {
				returnCh <- fmt.Errorf("error when waiting for pod %q terminating: %v", pod.Name, err)
			}
		}(pod, returnCh)
    }
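
The eviction itself goes through the Eviction subresource of the Pod. A minimal sketch with client-go (evictPod is a made-up helper; it uses the policy/v1 group version, whereas the kubectl code above goes through policy/v1beta1, and which one is available depends on the cluster version):

package example

import (
	"context"

	policyv1 "k8s.io/api/policy/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// evictPod asks the API server to evict one Pod, which respects PodDisruptionBudgets.
func evictPod(ctx context.Context, client kubernetes.Interface, namespace, name string) error {
	eviction := &policyv1.Eviction{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: namespace,
			Name:      name,
		},
	}
	// The API server rejects the request (HTTP 429) if a PodDisruptionBudget
	// currently forbids the eviction.
	return client.PolicyV1().Evictions(namespace).Evict(ctx, eviction)
}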

If the deletion does not complete immediately, waitForDelete passes a ConditionFunc into the WaitFor loop; the ConditionFunc reports completion once the Pod no longer exists, or once a Pod with the same name but a different ObjectMeta UID has taken its place.

func WaitFor(wait WaitFunc, fn ConditionFunc, done <-chan struct{}) error {
	stopCh := make(chan struct{})
	defer close(stopCh)
	c := wait(stopCh)
	for {
		select {
		case _, open := <-c:
			ok, err := runConditionWithCrashProtection(fn)
			if err != nil {
				return err
			}
			if ok {
				return nil
			}
			if !open {
				return ErrWaitTimeout
			}
		case <-done:
			return ErrWaitTimeout
		}
	}
}
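
The same ConditionFunc idea can be exercised with the polling helpers in k8s.io/apimachinery/pkg/util/wait. A hedged sketch that polls until a Pod is gone, or has been replaced by a new Pod with a different UID, which is roughly the condition waitForDelete checks (waitForPodGone is a made-up helper):

package example

import (
	"context"
	"time"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
)

// waitForPodGone polls until the named Pod no longer exists, or exists with a
// different UID (i.e. the original Pod has been deleted and recreated).
func waitForPodGone(client kubernetes.Interface, namespace, name string, oldUID types.UID, timeout time.Duration) error {
	return wait.PollImmediate(time.Second, timeout, func() (bool, error) {
		pod, err := client.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{})
		if apierrors.IsNotFound(err) {
			return true, nil // the Pod is gone
		}
		if err != nil {
			return false, err
		}
		if pod.ObjectMeta.UID != oldUID {
			return true, nil // a new Pod with the same name has taken its place
		}
		return false, nil // still terminating, keep polling
	})
}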

Summary

The implementation of the kubectl drain command is quite simple, with no particularly complex logic. An important reason K8s can keep it this simple is that everything is declarative: you declare the desired state and wait for it to be reconciled, with no extra dirty work to do afterwards. Note that during eviction not every Pod ends up rescheduled onto another node, so before taking a node offline you still need to check carefully whether there are standalone Pods running on it, Pods using local storage, and so on.

It’s been a while since I last studied design patterns combined like this; I’m looking for a chance to re-learn them.