In Kubernetes, the Kubernetes API can be accessed both from outside and from inside the cluster: from outside by talking directly to the API exposed by the kube-apiserver, and from inside through the ClusterIP of the built-in kubernetes service. A Kubernetes cluster creates the kubernetes service right after initialization; this service is created and maintained by kube-apiserver, as shown below.

$ kubectl get service
NAME         TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)   AGE
kubernetes   ClusterIP   10.96.0.1    <none>        443/TCP   4d22h

$ kubectl get endpoints kubernetes
NAME         ENDPOINTS             AGE
kubernetes   192.168.99.113:6443   4d22h

The built-in kubernetes service cannot be removed, and its ClusterIP is the first IP in the range specified by the --service-cluster-ip-range parameter. The IP and port of the kubernetes endpoints are determined by the --advertise-address and --secure-port startup parameters.
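
For illustration, here is a minimal client-go sketch of in-cluster access through this service (a hedged example, not part of the apiserver source; the call signatures match the v1.16-era client-go, newer releases also take a context.Context). rest.InClusterConfig resolves the built-in kubernetes service via the KUBERNETES_SERVICE_HOST/KUBERNETES_SERVICE_PORT environment variables and the pod's service account credentials.

package main

import (
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
)

func main() {
    // Build a config that points at the built-in kubernetes service
    // (ClusterIP 10.96.0.1:443 in the example above).
    cfg, err := rest.InClusterConfig()
    if err != nil {
        panic(err)
    }
    client := kubernetes.NewForConfigOrDie(cfg)

    // Every request now goes through the kubernetes service to the apiserver.
    nsList, err := client.CoreV1().Namespaces().List(metav1.ListOptions{})
    if err != nil {
        panic(err)
    }
    fmt.Printf("found %d namespaces\n", len(nsList.Items))
}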

The kubernetes service is controlled by the bootstrap controller inside kube-apiserver, which has the following main functions:

  • Create the kubernetes service.
  • Create the default, kube-system, and kube-public namespaces, as well as the kube-node-lease namespace when the NodeLease feature is enabled.
  • Provide repair and checking for Service ClusterIPs.
  • Provide repair and checking for Service NodePorts.

The kubernetes service is exposed through a ClusterIP by default. To expose it through a NodePort instead, specify the port at kube-apiserver startup with the --kubernetes-service-node-port argument.
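
A condensed sketch of how that choice is made (based on the createPortAndServiceSpec helper in pkg/master/controller.go; the function name below is made up for illustration, extra service ports are omitted, corev1 is k8s.io/api/core/v1 and intstr is k8s.io/apimachinery/pkg/util/intstr):

// portAndTypeSketch is a hypothetical, simplified stand-in for createPortAndServiceSpec.
func portAndTypeSketch(servicePort, targetPort, nodePort int) ([]corev1.ServicePort, corev1.ServiceType) {
    ports := []corev1.ServicePort{{
        Protocol:   corev1.ProtocolTCP,
        Port:       int32(servicePort),         // 443 inside the cluster
        Name:       "https",
        TargetPort: intstr.FromInt(targetPort), // 6443 on the apiserver
    }}
    if nodePort > 0 {
        // --kubernetes-service-node-port > 0 switches the service to NodePort
        ports[0].NodePort = int32(nodePort)
        return ports, corev1.ServiceTypeNodePort
    }
    return ports, corev1.ServiceTypeClusterIP
}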

bootstrap controller source code analysis

kubernetes version: v1.16

The bootstrap controller is initialized and started in the InstallLegacyAPI method on the CreateKubeAPIServer call chain. Its startup and shutdown are driven by the apiserver's PostStartHook and PreShutdownHook.

k8s.io/kubernetes/pkg/master/master.go:406

func (m *Master) InstallLegacyAPI(......) error {
    legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
    if err != nil {
        return fmt.Errorf("Error building core storage: %v", err)
    }

    // initialize the bootstrap-controller
    controllerName := "bootstrap-controller"
    coreClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
    bootstrapController := c.NewBootstrapController(......)
    m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)
    m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook)

    if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
        return fmt.Errorf("Error in registering group versions: %v", err)
    }
    return nil
}

The registered post-start hooks are executed by RunPostStartHooks, which the kube-apiserver calls from its prepared.Run startup method.
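
For reference, a post-start hook is simply a function with the genericapiserver PostStartHookFunc signature; a minimal sketch of the shape only (not the bootstrap controller's actual hook body):

import genericapiserver "k8s.io/apiserver/pkg/server"

// The bootstrap controller's PostStartHook has this shape: it receives a
// PostStartHookContext (loopback client config, stop channel, ...) and is
// invoked by RunPostStartHooks once prepared.Run starts serving.
var hook genericapiserver.PostStartHookFunc = func(ctx genericapiserver.PostStartHookContext) error {
    // start background work here; a non-nil error is treated as fatal
    return nil
}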

NewBootstrapController

The bootstrap controller needs several parameters at initialization, mainly PublicIP, ServiceCIDR, and PublicServicePort. PublicIP is set with the --advertise-address command-line flag; if it is not specified, the system automatically selects a global IP on the host. PublicServicePort is set with the --secure-port startup flag (default 6443), and ServiceCIDR is set with the --service-cluster-ip-range flag (default 10.0.0.0/24).

k8s.io/kubernetes/pkg/master/controller.go:89

func (c *completedConfig) NewBootstrapController(......) *Controller {
    // 1. get the PublicServicePort
    _, publicServicePort, err := c.GenericConfig.SecureServing.HostPort()
    if err != nil {
        klog.Fatalf("failed to get listener address: %v", err)
    }

    // 2. specify the system namespaces that need to be created: kube-system and kube-public
    systemNamespaces := []string{metav1.NamespaceSystem, metav1.NamespacePublic}
    if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
        systemNamespaces = append(systemNamespaces, corev1.NamespaceNodeLease)
    }

    return &Controller{
        ......
        // ServiceClusterIPRegistry is initialized when CreateKubeAPIServer initializes the RESTStorage; it is backed by etcd
        ServiceClusterIPRegistry:          legacyRESTStorage.ServiceClusterIPAllocator,
        ServiceClusterIPRange:             c.ExtraConfig.ServiceIPRange,
        SecondaryServiceClusterIPRegistry: legacyRESTStorage.SecondaryServiceClusterIPAllocator,

        // SecondaryServiceClusterIPRange can only be used when IPv6DualStack is enabled
        SecondaryServiceClusterIPRange:    c.ExtraConfig.SecondaryServiceIPRange,

        ServiceClusterIPInterval: 3 * time.Minute,

        ServiceNodePortRegistry: legacyRESTStorage.ServiceNodePortAllocator,
        ServiceNodePortRange:    c.ExtraConfig.ServiceNodePortRange,
        ServiceNodePortInterval: 3 * time.Minute,

        // the IP the API server binds to; it is used as the endpoint IP of the kubernetes service
        PublicIP: c.GenericConfig.PublicAddress,
        // the first IP in the ClusterIP range
        ServiceIP:                 c.ExtraConfig.APIServerServiceIP,
        // the in-cluster service port, 443 by default
        ServicePort:               c.ExtraConfig.APIServerServicePort,
        ExtraServicePorts:         c.ExtraConfig.ExtraServicePorts,
        ExtraEndpointPorts:        c.ExtraConfig.ExtraEndpointPorts,
        // 6443 here (from --secure-port)
        PublicServicePort:         publicServicePort,

        // 0 by default, i.e. the ClusterIP mode
        KubernetesServiceNodePort: c.ExtraConfig.KubernetesServiceNodePort,
    }
}

The code to automatically select the global IP is shown below.

k8s.io/kubernetes/staging/src/k8s.io/apimachinery/pkg/util/net/interface.go:323

func ChooseHostInterface() (net.IP, error) {
    var nw networkInterfacer = networkInterface{}
    if _, err := os.Stat(ipv4RouteFile); os.IsNotExist(err) {
        return chooseIPFromHostInterfaces(nw)
    }
    routes, err := getAllDefaultRoutes()
    if err != nil {
        return nil, err
    }
    return chooseHostInterfaceFromRoute(routes, nw)
}

bootstrapController.Start

The four main functions of the bootstrap controller were listed above: repairing ClusterIPs, repairing NodePorts, updating the kubernetes service, and creating the system namespaces (default, kube-system, kube-public). After startup the bootstrap controller first runs the ClusterIP repair, the NodePort repair, and the kubernetes service sync once, and then runs these four jobs in an asynchronous loop. Its Start method is shown below.

k8s.io/kubernetes/pkg/master/controller.go:146

func (c *Controller) Start() {
    if c.runner != nil {
        return
    }

    // 1. on first start, remove this instance's configuration from the kubernetes endpoints;
    // at this point the kube-apiserver may not be ready yet
    endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts)
    if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err != nil {
        klog.Errorf("Unable to remove old endpoints from kubernetes service: %v", err)
    }

    // 2. initialize the repairClusterIPs and repairNodePorts objects
    repairClusterIPs := servicecontroller.NewRepair(......)
    repairNodePorts := portallocatorcontroller.NewRepair(......)

    // 3. run repairClusterIPs and repairNodePorts once first, i.e. as initialization
    if err := repairClusterIPs.RunOnce(); err != nil {
        klog.Fatalf("Unable to perform initial IP allocation check: %v", err)
    }
    if err := repairNodePorts.RunOnce(); err != nil {
        klog.Fatalf("Unable to perform initial service nodePort check: %v", err)
    }
    // 4. periodically run the four main functions of the bootstrap controller
    c.runner = async.NewRunner(c.RunKubernetesNamespaces, c.RunKubernetesService, repairClusterIPs.RunUntil, repairNodePorts.RunUntil)
    c.runner.Start()
}

c.RunKubernetesNamespaces

The main job of c.RunKubernetesNamespaces is to create the kube-system and kube-public namespaces, plus the kube-node-lease namespace if the NodeLease feature is enabled, and to re-check them every minute afterwards.

k8s.io/kubernetes/pkg/master/controller.go:199

func (c *Controller) RunKubernetesNamespaces(ch chan struct{}) {
    wait.Until(func() {
        for _, ns := range c.SystemNamespaces {
            if err := createNamespaceIfNeeded(c.NamespaceClient, ns); err != nil {
                runtime.HandleError(fmt.Errorf("unable to create required kubernetes system namespace %s: %v", ns, err))
            }
        }
    }, c.SystemNamespacesInterval, ch)
}
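
createNamespaceIfNeeded itself is a small helper: roughly, it checks whether the namespace exists, creates it if not, and treats an "already exists" error as success (a simplified sketch of the helper defined elsewhere in pkg/master; details may differ slightly).

func createNamespaceIfNeeded(c corev1client.NamespacesGetter, ns string) error {
    if _, err := c.Namespaces().Get(ns, metav1.GetOptions{}); err == nil {
        // the namespace already exists
        return nil
    }
    newNs := &corev1.Namespace{
        ObjectMeta: metav1.ObjectMeta{Name: ns},
    }
    _, err := c.Namespaces().Create(newNs)
    if err != nil && errors.IsAlreadyExists(err) {
        // another apiserver instance created it first
        err = nil
    }
    return err
}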

c.RunKubernetesService

The main purpose of c.RunKubernetesService is to check that the kubernetes service is in a normal state and to synchronize it periodically. It first polls the /healthz endpoint to check whether the apiserver is ready, and once it is, calls c.UpdateKubernetesService to update the state of the kubernetes service.

k8s.io/kubernetes/pkg/master/controller.go:210

func (c *Controller) RunKubernetesService(ch chan struct{}) {
    wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
        var code int
        c.healthClient.Get().AbsPath("/healthz").Do().StatusCode(&code)
        return code == http.StatusOK, nil
    }, ch)

    wait.NonSlidingUntil(func() {
        if err := c.UpdateKubernetesService(false); err != nil {
            runtime.HandleError(fmt.Errorf("unable to sync kubernetes service: %v", err))
        }
    }, c.EndpointInterval, ch)
}

c.UpdateKubernetesService

The main logic of c.UpdateKubernetesService is as follows.

  • Call createNamespaceIfNeeded to create the default namespace.
  • Call c.CreateOrUpdateMasterServiceIfNeeded to create the kubernetes service for the masters.
  • Call c.EndpointReconciler.ReconcileEndpoints to update the master endpoints.

k8s.io/kubernetes/pkg/master/controller.go:230

func (c *Controller) UpdateKubernetesService(reconcile bool) error {
    if err := createNamespaceIfNeeded(c.NamespaceClient, metav1.NamespaceDefault); err != nil {
        return err
    }

    servicePorts, serviceType := createPortAndServiceSpec(c.ServicePort, c.PublicServicePort, c.KubernetesServiceNodePort, "https", c.ExtraServicePorts)
    if err := c.CreateOrUpdateMasterServiceIfNeeded(kubernetesServiceName, c.ServiceIP, servicePorts, serviceType, reconcile); err != nil {
        return err
    }
    endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts)
    if err := c.EndpointReconciler.ReconcileEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts, reconcile); err != nil {
        return err
    }
    return nil
}

c.EndpointReconciler.ReconcileEndpoints

The concrete implementation of EndpointReconciler is determined by EndpointReconcilerType, which is set with the --endpoint-reconciler-type parameter. The possible values are master-count, lease, and none, each corresponding to a different EndpointReconciler implementation; in v1.16 the default is lease. Only the lease implementation is analyzed here.

A cluster may run multiple apiserver instances, so the endpoints of the apiserver service have to be managed centrally; c.EndpointReconciler.ReconcileEndpoints does exactly that. Every apiserver instance in the cluster creates a key under the corresponding directory in etcd and refreshes it periodically as a heartbeat; ReconcileEndpoints reads the apiserver instances from etcd and updates the endpoints accordingly.
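
The lease storage behind this is abstracted by a small interface in the same package; masterLeases in the code below is an etcd-backed implementation of it.

// k8s.io/kubernetes/pkg/master/reconcilers/lease.go
type Leases interface {
    // ListLeases retrieves a list of the current master IPs from storage
    ListLeases() ([]string, error)

    // UpdateLease adds or refreshes a master's lease
    UpdateLease(ip string) error

    // RemoveLease removes a master's lease
    RemoveLease(ip string) error
}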

k8s.io/kubernetes/pkg/master/reconcilers/lease.go:144

func (r *leaseEndpointReconciler) ReconcileEndpoints(......) error {
    r.reconcilingLock.Lock()
    defer r.reconcilingLock.Unlock()

    if r.stopReconcilingCalled {
        return nil
    }

    // update the lease information
    if err := r.masterLeases.UpdateLease(ip.String()); err != nil {
        return err
    }

    return r.doReconcile(serviceName, endpointPorts, reconcilePorts)
}
func (r *leaseEndpointReconciler) doReconcile(......) error {
    // 1. get the master endpoints
    e, err := r.epAdapter.Get(corev1.NamespaceDefault, serviceName, metav1.GetOptions{})
    shouldCreate := false
    if err != nil {
        if !errors.IsNotFound(err) {
            return err
        }

        shouldCreate = true
        e = &corev1.Endpoints{
            ObjectMeta: metav1.ObjectMeta{
                Name:      serviceName,
                Namespace: corev1.NamespaceDefault,
            },
        }
    }

    // 2. get all masters from etcd
    masterIPs, err := r.masterLeases.ListLeases()
    if err != nil {
        return err
    }

    if len(masterIPs) == 0 {
        return fmt.Errorf("no master IPs were listed in storage, refusing to erase all endpoints for the kubernetes service")
    }

    // 3. check the master information in the endpoints; update it if it differs from etcd
    formatCorrect, ipCorrect, portsCorrect := checkEndpointSubsetFormatWithLease(e, masterIPs, endpointPorts, reconcilePorts)
    if formatCorrect && ipCorrect && portsCorrect {
        return nil
    }

    if !formatCorrect {
        e.Subsets = []corev1.EndpointSubset{{
            Addresses: []corev1.EndpointAddress{},
            Ports:     endpointPorts,
        }}
    }
    if !formatCorrect || !ipCorrect {
        e.Subsets[0].Addresses = make([]corev1.EndpointAddress, len(masterIPs))
        for ind, ip := range masterIPs {
            e.Subsets[0].Addresses[ind] = corev1.EndpointAddress{IP: ip}
        }

        e.Subsets = endpointsv1.RepackSubsets(e.Subsets)
    }

    if !portsCorrect {
        e.Subsets[0].Ports = endpointPorts
    }

    if shouldCreate {
        if _, err = r.epAdapter.Create(corev1.NamespaceDefault, e); errors.IsAlreadyExists(err) {
            err = nil
        }
    } else {
        _, err = r.epAdapter.Update(corev1.NamespaceDefault, e)
    }
    return err
}

repairClusterIPs.RunUntil

The main problems that repairClusterIPs solves are:

  • Ensure that all ClusterIPs in the cluster are uniquely assigned.
  • Ensure that assigned ClusterIPs stay within the configured range.
  • Ensure that ClusterIPs assigned to services that were never actually created (because of crashes or similar failures) are reconciled.
  • Automatically migrate services from older Kubernetes versions into the atomic ipallocator model.

repairClusterIPs.RunUntil ultimately calls repairClusterIPs.runOnce to do the work; its main logic is shown below.
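
Before reading runOnce, it helps to see the allocator mechanics it relies on: re-allocating an IP that is already taken returns ipallocator.ErrAllocated, which is how a duplicate ClusterIP assignment surfaces. A small sketch (the ipallocator package lives at k8s.io/kubernetes/pkg/registry/core/service/ipallocator and is internal to the kubernetes repo):

_, cidr, _ := net.ParseCIDR("10.96.0.0/16")
rebuilt, _ := ipallocator.NewCIDRRange(cidr)

ip := net.ParseIP("10.96.0.10")
_ = rebuilt.Allocate(ip)    // the first service owns this ClusterIP
err := rebuilt.Allocate(ip) // a second service claims the same IP
if err == ipallocator.ErrAllocated {
    // runOnce reports this as a duplicate ClusterIP assignment
}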

k8s.io/kubernetes/pkg/registry/core/service/ipallocator/controller/repair.go:134

func (c *Repair) runOnce() error {
    ......

    // 1. first get a snapshot of the ClusterIPs already in use from etcd
    err = wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) {
        var err error
        snapshot, err = c.alloc.Get()
        if err != nil {
            return false, err
        }

        if c.shouldWorkOnSecondary() {
            secondarySnapshot, err = c.secondaryAlloc.Get()
            if err != nil {
                return false, err
            }
        }
        return true, nil
    })
    if err != nil {
        return fmt.Errorf("unable to refresh the service IP block: %v", err)
    }
    // 2. check whether the snapshot has been initialized
    if snapshot.Range == "" {
        snapshot.Range = c.network.String()
    }

    if c.shouldWorkOnSecondary() && secondarySnapshot.Range == "" {
        secondarySnapshot.Range = c.secondaryNetwork.String()
    }

    stored, err = ipallocator.NewFromSnapshot(snapshot)
    if c.shouldWorkOnSecondary() {
        secondaryStored, secondaryErr = ipallocator.NewFromSnapshot(secondarySnapshot)
    }

    if err != nil || secondaryErr != nil {
        return fmt.Errorf("unable to rebuild allocator from snapshots: %v", err)
    }
    // 3. get the service list
    list, err := c.serviceClient.Services(metav1.NamespaceAll).List(metav1.ListOptions{})
    if err != nil {
        return fmt.Errorf("unable to refresh the service IP block: %v", err)
    }

    // 4. convert the CIDR into the corresponding IP range format
    var rebuilt, secondaryRebuilt *ipallocator.Range
    rebuilt, err = ipallocator.NewCIDRRange(c.network)

    ......

    // 5. check each Service's ClusterIP to make sure it is in a normal state
    for _, svc := range list.Items {
        if !helper.IsServiceIPSet(&svc) {
            continue
        }
        ip := net.ParseIP(svc.Spec.ClusterIP)
        ......

        actualAlloc := c.selectAllocForIP(ip, rebuilt, secondaryRebuilt)
        switch err := actualAlloc.Allocate(ip); err {
        // 6. check whether the IP has leaked
        case nil:
            actualStored := c.selectAllocForIP(ip, stored, secondaryStored)
            if actualStored.Has(ip) {
                actualStored.Release(ip)
            } else {
                ......
            }
            delete(c.leaks, ip.String())
        // 7. the IP is allocated more than once
        case ipallocator.ErrAllocated:
            ......
        // 8. the IP is out of range
        case err.(*ipallocator.ErrNotInRange):
            ......
        // 9. the IP range is exhausted
        case ipallocator.ErrFull:
            ......
        default:
            ......
        }
    }
    // 10. check for leaked IPs by comparison
    c.checkLeaked(stored, rebuilt)
    if c.shouldWorkOnSecondary() {
        c.checkLeaked(secondaryStored, secondaryRebuilt)
    }

    // 11. update the snapshot
    err = c.saveSnapShot(rebuilt, c.alloc, snapshot)
    if err != nil {
        return err
    }

    if c.shouldWorkOnSecondary() {
        err := c.saveSnapShot(secondaryRebuilt, c.secondaryAlloc, secondarySnapshot)
        if err != nil {
            return nil
        }
    }
    return nil
}

repairNodePorts.RunUntil

repairNodePorts corrects the nodePort information recorded in services and ensures that all nodePort allocations are consistent with the cluster's allocator; an error is reported when a port is out of sync. The work is ultimately done by repairNodePorts.runOnce, whose main logic is similar to the ClusterIP repair.
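
For reference, gathering a service's node ports (the collectServiceNodePorts call in the code below) boils down to walking spec.ports plus the health-check node port; a simplified sketch:

// Simplified stand-in for the helper used by the portallocator repair controller.
func collectServiceNodePorts(svc *corev1.Service) []int {
    var ports []int
    for _, p := range svc.Spec.Ports {
        if p.NodePort != 0 {
            ports = append(ports, int(p.NodePort))
        }
    }
    // LoadBalancer services with externalTrafficPolicy=Local also reserve a
    // health-check node port.
    if svc.Spec.HealthCheckNodePort != 0 {
        ports = append(ports, int(svc.Spec.HealthCheckNodePort))
    }
    return ports
}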

k8s.io/kubernetes/pkg/registry/core/service/portallocator/controller/repair.go:84

func (c *Repair) runOnce() error {
    // 1. first get a snapshot of the nodePorts already in use from etcd
    err := wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) {
        var err error
        snapshot, err = c.alloc.Get()
        return err == nil, err
    })
    if err != nil {
        return fmt.Errorf("unable to refresh the port allocations: %v", err)
    }
    // 2. check whether the snapshot has been initialized
    if snapshot.Range == "" {
        snapshot.Range = c.portRange.String()
    }
    // 3. get the allocated nodePort information
    stored, err := portallocator.NewFromSnapshot(snapshot)
    if err != nil {
        return fmt.Errorf("unable to rebuild allocator from snapshot: %v", err)
    }
    // 4. get the service list
    list, err := c.serviceClient.Services(metav1.NamespaceAll).List(metav1.ListOptions{})
    if err != nil {
        return fmt.Errorf("unable to refresh the port block: %v", err)
    }

    rebuilt, err := portallocator.NewPortAllocator(c.portRange)
    if err != nil {
        return fmt.Errorf("unable to create port allocator: %v", err)
    }

    // 5. check each Service's nodePorts to make sure they are in a normal state
    for i := range list.Items {
        svc := &list.Items[i]
        ports := collectServiceNodePorts(svc)
        if len(ports) == 0 {
            continue
        }
        for _, port := range ports {
            switch err := rebuilt.Allocate(port); err {
            // 6. check whether the port has leaked
            case nil:
                if stored.Has(port) {
                    stored.Release(port)
                } else {
                    ......
                }
                delete(c.leaks, port)
            // 7. the port is allocated more than once
            case portallocator.ErrAllocated:
                ......
            // 8. the port is out of the allocation range
            case err.(*portallocator.ErrNotInRange):
                ......
            // 9. the port range is exhausted
            case portallocator.ErrFull:
                ......
            default:
                ......
            }
        }
    }
    // 10. check whether any ports have leaked
    stored.ForEach(func(port int) {
        count, found := c.leaks[port]
        switch {
        case !found:
            ......
            count = numRepairsBeforeLeakCleanup - 1
            fallthrough
        case count > 0:
            c.leaks[port] = count - 1
            if err := rebuilt.Allocate(port); err != nil {
                runtime.HandleError(fmt.Errorf("the node port %d may have leaked, but can not be allocated: %v", port, err))
            }
        default:
            ......
        }
    })

    // 11. update the snapshot
    if err := rebuilt.Snapshot(snapshot); err != nil {
        return fmt.Errorf("unable to snapshot the updated port allocations: %v", err)
    }
    ......
    return nil
}

The above is the main implementation of the bootstrap controller.

Summary

This article analyzed the implementation of the apiserver's own kubernetes service in kube-apiserver. The kubernetes service is controlled by the bootstrap controller, which ensures that the service and its endpoints stay in a normal state. Note that the endpoints of this service can be managed in three different ways, depending on the parameter specified at startup; this article only analyzed the lease implementation. If you use the master-count method, parameters such as the port and --apiserver-count must be configured identically on every master instance.