The Kubernetes Leader Election Mechanism

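The previous post left one question open: after turning on kube-proxy's tracing logs, besides the entries from the periodic iptables sync, there were also some Calling handler.OnEndpointsUpdate entries, and these are actually rather unusual: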

13:55:13 localhost kube-proxy[20761]: I0609 13:55:13.290051   20761 config.go:167] Calling handler.OnEndpointsUpdate
13:55:14 localhost kube-proxy[20761]: I0609 13:55:14.502924   20761 config.go:167] Calling handler.OnEndpointsUpdate
13:55:15 localhost kube-proxy[20761]: I0609 13:55:15.299633   20761 config.go:167] Calling handler.OnEndpointsUpdate
13:55:16 localhost kube-proxy[20761]: I0609 13:55:16.515500   20761 config.go:167] Calling handler.OnEndpointsUpdate
13:55:17 localhost kube-proxy[20761]: I0609 13:55:17.316952   20761 config.go:167] Calling handler.OnEndpointsUpdate
13:55:18 localhost kube-proxy[20761]: I0609 13:55:18.525537   20761 config.go:167] Calling handler.OnEndpointsUpdate
13:55:19 localhost kube-proxy[20761]: I0609 13:55:19.326566   20761 config.go:167] Calling handler.OnEndpointsUpdate
13:55:20 localhost kube-proxy[20761]: I0609 13:55:20.541238   20761 config.go:167] Calling handler.OnEndpointsUpdate

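That is roughly one entry per second. Anyone familiar with the K8S list-watch mechanism should see the cause at a glance: kube-proxy watches Endpoints for changes and reacts to them, so whenever some Endpoints object is updated, this log line is emitted. There is nothing wrong with the mechanism itself, and in a normal K8S cluster this output would be unremarkable. In our cluster, though, it is odd, because our current use case does not need Endpoints at all! So what is triggering the Endpoints updates?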

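To find the cause, we first need to know whether such Endpoints really exist and whether they really are being updated. That is easy to check, since kubectl supports watching resources: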

[root@ ~]# kubectl get endpoints --all-namespaces --watch
NAMESPACE     NAME                      ENDPOINTS                              AGE
default       kubernetes                192.168.0.1:6443                       71d
kube-system   kube-controller-manager   <none>                                 71d
kube-system   kube-scheduler            <none>                                 71d
kube-system   kube-controller-manager   <none>                                 71d
kube-system   kube-scheduler            <none>                                 71d
kube-system   kube-controller-manager   <none>                                 71d
kube-system   kube-scheduler            <none>                                 71d
kube-system   kube-controller-manager   <none>                                 71d

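Sure enough, there are two Endpoints objects in the kube-system namespace, kube-scheduler and kube-controller-manager, and both are being modified constantly. The odd part is: who is modifying them, and why? So let's fetch one of them and look at the details: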

[root@ ~]# kubectl -n kube-system get endpoints kube-scheduler -oyaml
apiVersion: v1
kind: Endpoints
metadata:
  annotations:
    control-plane.alpha.kubernetes.io/leader: '{"holderIdentity":"192-168-0-1_e1e84d39-8c11-492b-8ee0-7d6eac6b3186","leaseDurationSeconds":15,"acquireTime":"2021-05-08T10:47:31Z","renewTime":"2021-06-16T10:41:23Z","leaderTransitions":9}'
  creationTimestamp: "2021-04-06T08:55:30Z"
  name: kube-scheduler
  namespace: kube-system
  resourceVersion: "24773276"
  selfLink: /api/v1/namespaces/kube-system/endpoints/kube-scheduler
  uid: ae3d725e-679d-4410-8655-7ddacb633d1f

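The annotations contain a key, control-plane.alpha.kubernetes.io/leader, which looks related to leader election. I already knew that the K8S scheduler and controller-manager, as well as some custom controllers, use leader election to make sure only one instance is working at any given time, but I had never looked closely at how it is implemented. Could it be based on Endpoints?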
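To make the annotation easier to read, here is a minimal Go sketch that decodes the value shown above. The leaderRecord struct and parseLeaderRecord helper are hypothetical names introduced for illustration; the struct only mirrors the JSON fields visible in the annotation and is not the client-go type itself.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// leaderRecord mirrors the JSON fields visible in the annotation.
// It is a local stand-in for illustration, not the client-go type.
type leaderRecord struct {
	HolderIdentity       string    `json:"holderIdentity"`
	LeaseDurationSeconds int       `json:"leaseDurationSeconds"`
	AcquireTime          time.Time `json:"acquireTime"`
	RenewTime            time.Time `json:"renewTime"`
	LeaderTransitions    int       `json:"leaderTransitions"`
}

// annotation is the value copied from the kube-scheduler Endpoints object.
const annotation = `{"holderIdentity":"192-168-0-1_e1e84d39-8c11-492b-8ee0-7d6eac6b3186","leaseDurationSeconds":15,"acquireTime":"2021-05-08T10:47:31Z","renewTime":"2021-06-16T10:41:23Z","leaderTransitions":9}`

// parseLeaderRecord decodes an annotation value into a leaderRecord.
func parseLeaderRecord(raw string) (leaderRecord, error) {
	var rec leaderRecord
	err := json.Unmarshal([]byte(raw), &rec)
	return rec, err
}

func main() {
	rec, err := parseLeaderRecord(annotation)
	if err != nil {
		panic(err)
	}
	fmt.Println("holder:     ", rec.HolderIdentity)
	fmt.Println("lease (s):  ", rec.LeaseDurationSeconds)
	fmt.Println("held for:   ", rec.RenewTime.Sub(rec.AcquireTime))
	fmt.Println("transitions:", rec.LeaderTransitions)
}
```

Read this way, the record says that one instance has held the lock since acquireTime, keeps pushing renewTime forward, and that leadership has changed hands 9 times so far.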

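Let's first check whether the scheduler's startup flags include anything relevant that helps explain the current behaviour. Sure enough, kube-scheduler --help prints some related options: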

Leader election flags:

  --leader-elect
            Start a leader election client and gain leadership before executing the main loop. Enable this when running replicated components for high availability. (default true)
  --leader-elect-lease-duration duration
            The duration that non-leader candidates will wait after observing a leadership renewal until attempting to acquire leadership of a led but unrenewed leader slot. This is effectively the
            maximum duration that a leader can be stopped before it is replaced by another candidate. This is only applicable if leader election is enabled. (default 15s)
  --leader-elect-renew-deadline duration
            The interval between attempts by the acting master to renew a leadership slot before it stops leading. This must be less than or equal to the lease duration. This is only applicable if leader election is enabled. (default 10s)
  --leader-elect-resource-lock endpoints
            The type of resource object that is used for locking during leader election. Supported options are endpoints (default) and `configmaps`. (default "endpoints")
  --leader-elect-resource-name string
            The name of resource object that is used for locking during leader election. (default "kube-scheduler")
  --leader-elect-resource-namespace string
            The namespace of resource object that is used for locking during leader election. (default "kube-system")
  --leader-elect-retry-period duration
            The duration the clients should wait between attempting acquisition and renewal of a leadership. This is only applicable if leader election is enabled. (default 2s)

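So the options do exist. For the scheduler, the default is to use an endpoints object named kube-scheduler as the lock; the type can be changed to configmaps, and the name can be changed too. The interval between renew attempts (--leader-elect-retry-period) defaults to 2s, which matches the earlier log output nicely: kube-scheduler and kube-controller-manager each update their own Endpoints object every 2s, so together it looks like one update per second, consistent with what the log shows.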

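So how exactly does this election mechanism work in K8S? Let's keep going and read the code. We run K8S v1.16.15 in production, so all the code below is from that version.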

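Overall, the logic splits into two parts:

  1. The control part, which handles control-level operations such as checking whether the lock is currently held, renewing the lock, and releasing the lock
  2. The storage part, which stores the lock itself, for example in an Endpoints resource as described above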

Let's start with the entry point of the control part:

if cc.LeaderElection != nil {
  cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
    OnStartedLeading: run,
    OnStoppedLeading: func() {
      klog.Fatalf("leaderelection lost")
    },
  }
  leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
  if err != nil {
    return fmt.Errorf("couldn't create leader elector: %v", err)
  }

  leaderElector.Run(ctx)

  return fmt.Errorf("lost lease")
}

The entry code is simple: create a LeaderElector and call Run(). Let's look at its internals; this code lives in client-go:

// Run starts the leader election loop
func (le *LeaderElector) Run(ctx context.Context) {
	defer func() {
		runtime.HandleCrash()
		le.config.Callbacks.OnStoppedLeading()
	}()
	if !le.acquire(ctx) {
		return // ctx signalled done
	}
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	go le.config.Callbacks.OnStartedLeading(ctx)
	le.renew(ctx)
}

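The main logic is also clear: first try to acquire; on success, invoke the callback and keep renewing. Inside acquire and renew, the main logic is to call another function, tryAcquireOrRenew, repeatedly at the configured interval. That is the function to look at: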

// tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired,
// else it tries to renew the lease if it has already been acquired. Returns true
// on success else returns false.
func (le *LeaderElector) tryAcquireOrRenew() bool {
	now := metav1.Now()
	leaderElectionRecord := rl.LeaderElectionRecord{
		HolderIdentity:       le.config.Lock.Identity(),
		LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
		RenewTime:            now,
		AcquireTime:          now,
	}

	// 1. obtain or create the ElectionRecord
	oldLeaderElectionRecord, err := le.config.Lock.Get()
	if err != nil {
		if !errors.IsNotFound(err) {
			klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
			return false
		}
		if err = le.config.Lock.Create(leaderElectionRecord); err != nil {
			klog.Errorf("error initially creating leader election record: %v", err)
			return false
		}
		le.observedRecord = leaderElectionRecord
		le.observedTime = le.clock.Now()
		return true
	}

	// 2. Record obtained, check the Identity & Time
	if !reflect.DeepEqual(le.observedRecord, *oldLeaderElectionRecord) {
		le.observedRecord = *oldLeaderElectionRecord
		le.observedTime = le.clock.Now()
	}
	if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
		le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
		!le.IsLeader() {
		klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
		return false
	}

	// 3. We're going to try to update. The leaderElectionRecord is set to it's default
	// here. Let's correct it before updating.
	if le.IsLeader() {
		leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
	} else {
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
	}

	// update the lock itself
	if err = le.config.Lock.Update(leaderElectionRecord); err != nil {
		klog.Errorf("Failed to update lock: %v", err)
		return false
	}
	le.observedRecord = leaderElectionRecord
	le.observedTime = le.clock.Now()
	return true
}

The logic is not complicated either; it mainly calls the Get(), Create(), and Update() functions of the lock storage. That about covers the control part.

Now for the storage logic. The entry point is in cmd/kube-scheduler/app/options/options.go:

func makeLeaderElectionConfig(config kubeschedulerconfig.KubeSchedulerLeaderElectionConfiguration, client clientset.Interface, recorder record.EventRecorder) (*leaderelection.LeaderElectionConfig, error) {
	hostname, err := os.Hostname()
	if err != nil {
		return nil, fmt.Errorf("unable to get hostname: %v", err)
	}
	// add a uniquifier so that two processes on the same host don't accidentally both become active
	id := hostname + "_" + string(uuid.NewUUID())

	rl, err := resourcelock.New(config.ResourceLock,
		config.ResourceNamespace,
		config.ResourceName,
		client.CoreV1(),
		client.CoordinationV1(),
		resourcelock.ResourceLockConfig{
			Identity:      id,
			EventRecorder: recorder,
		})
	if err != nil {
		return nil, fmt.Errorf("couldn't create resource lock: %v", err)
	}

	return &leaderelection.LeaderElectionConfig{
		Lock:          rl,
		LeaseDuration: config.LeaseDuration.Duration,
		RenewDeadline: config.RenewDeadline.Duration,
		RetryPeriod:   config.RetryPeriod.Duration,
		WatchDog:      leaderelection.NewLeaderHealthzAdaptor(time.Second * 20),
		Name:          "kube-scheduler",
	}, nil
}

The most important call here is resourcelock.New(), which creates a different instance depending on the lock type. Let's see which implementations K8S provides:

// Manufacture will create a lock of a given type according to the input parameters
func New(lockType string, ns string, name string, coreClient corev1.CoreV1Interface, coordinationClient coordinationv1.CoordinationV1Interface, rlc ResourceLockConfig) (Interface, error) {
	switch lockType {
	case EndpointsResourceLock:
		return &EndpointsLock{
			EndpointsMeta: metav1.ObjectMeta{
				Namespace: ns,
				Name:      name,
			},
			Client:     coreClient,
			LockConfig: rlc,
		}, nil
	case ConfigMapsResourceLock:
		return &ConfigMapLock{
			ConfigMapMeta: metav1.ObjectMeta{
				Namespace: ns,
				Name:      name,
			},
			Client:     coreClient,
			LockConfig: rlc,
		}, nil
	case LeasesResourceLock:
		return &LeaseLock{
			LeaseMeta: metav1.ObjectMeta{
				Namespace: ns,
				Name:      name,
			},
			Client:     coordinationClient,
			LockConfig: rlc,
		}, nil
	default:
		return nil, fmt.Errorf("Invalid lock-type %s", lockType)
	}
}

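A quick look shows that besides the endpoints and configmaps types seen above, there is also a leases type. The implementations are simple; take Create for the endpoints type as an example: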

// Create attempts to create a LeaderElectionRecord annotation
func (el *EndpointsLock) Create(ler LeaderElectionRecord) error {
	recordBytes, err := json.Marshal(ler)
	if err != nil {
		return err
	}
	el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Create(&v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:      el.EndpointsMeta.Name,
			Namespace: el.EndpointsMeta.Namespace,
			Annotations: map[string]string{
				LeaderElectionRecordAnnotationKey: string(recordBytes),
			},
		},
	})
	return err
}

It is very simple: it calls the Endpoints interface of the kube client to create an Endpoints object. The other operations work the same way.

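At this point the overall election logic is basically clear, and nothing looks seriously wrong. Still, the two implementations based on endpoints and configmaps do not feel very elegant. On every node, kube-proxy watches Endpoints for changes, and kubelet watches ConfigMaps. Whichever of the two is chosen, the constant renewal of the lock gets pushed to every node, which quietly adds a bit of extra load on the cluster.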

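The upstream implementation reflects this too: leases is a standalone resource that no other component watches, which goes some way toward solving the problem. Unfortunately, because upgrades need to be smooth, components cannot simply switch straight to a leases lock. So upstream made some improvements in the subsequent 1.17 release; for details see: "migrate leader election to lease API" #81030 and "Migrate components to EndpointsLeases leader election lock" #84084.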

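Back to our own workloads: we have no desire to upgrade K8S for now. Fortunately, our machines currently do not need in-cluster access to the apiserver and have no need for ClusterIP, so we do not actually need kube-proxy. We might as well go all the way and take kube-proxy offline. That also removes the dependency on the ipvs module and fixes the log flood from the previous post, so it kills two birds with one stone.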