Kubernetes CronJob Controller Source Code Analysis

A recent project of mine needed Kubernetes CronJobs, mainly to run a scheduled backup task. When I first started using it, things did not run the way I expected, so I decided to read the CronJob Controller code and see how the feature is actually implemented. I also noticed that nobody else seems to have written a walkthrough of the CronJob Controller code (maybe it is too simple to be worth writing about), so this is a good opportunity to put one down.

The CronJob Controller code lives under kubernetes/pkg/controller/cronjob, and the main logic is in cronjob_controller.go in that directory. The analysis here is based on the v1.10.2 code, which can be viewed directly on GitHub.

Let's skip over how the controller is constructed and go straight to the part that runs it:

func (jm *CronJobController) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	glog.Infof("Starting CronJob Manager")
	// Check things every 10 second.
	go wait.Until(jm.syncAll, 10*time.Second, stopCh)
	<-stopCh
	glog.Infof("Shutting down CronJob Manager")
}

This part is simple: it runs syncAll every 10 seconds until stopCh is closed. Let's look at that method next:

func (jm *CronJobController) syncAll() {
	// List children (Jobs) before parents (CronJob).
	// This guarantees that if we see any Job that got orphaned by the GC orphan finalizer,
	// we must also see that the parent CronJob has non-nil DeletionTimestamp (see #42639).
	// Note that this only works because we are NOT using any caches here.
	// First list all the Jobs, mainly so we can work out which Jobs belong to which CronJob
	jl, err := jm.kubeClient.BatchV1().Jobs(metav1.NamespaceAll).List(metav1.ListOptions{})
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("can't list Jobs: %v", err))
		return
	}
	js := jl.Items
	glog.V(4).Infof("Found %d jobs", len(js))
	// Then list all the CronJobs
	sjl, err := jm.kubeClient.BatchV1beta1().CronJobs(metav1.NamespaceAll).List(metav1.ListOptions{})
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("can't list CronJobs: %v", err))
		return
	}
	sjs := sjl.Items
	glog.V(4).Infof("Found %d cronjobs", len(sjs))

	// Build a map[uid][]Job, grouping every Job under the UID of the CronJob that owns it
	jobsBySj := groupJobsByParent(js)
	glog.V(4).Infof("Found %d groups", len(jobsBySj))

	for _, sj := range sjs {
		// Call syncOne for each CronJob
		syncOne(&sj, jobsBySj[sj.UID], time.Now(), jm.jobControl, jm.sjControl, jm.podControl, jm.recorder)
		// Delete surplus Jobs according to the configuration, mainly SuccessfulJobsHistoryLimit and FailedJobsHistoryLimit
		cleanupFinishedJobs(&sj, jobsBySj[sj.UID], jm.jobControl, jm.sjControl, jm.podControl, jm.recorder)
	}
}
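
One helper worth calling out is groupJobsByParent, which builds that map[uid][]Job. A simplified sketch of the idea, based on ownerReferences, might look like the following; this is my own illustration rather than the upstream code, but the grouping logic is the same in spirit:

// A simplified sketch of grouping Jobs by the UID of their parent CronJob
// via ownerReferences. My own illustration, not the upstream
// groupJobsByParent code.
package main

import (
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
)

func groupJobsByCronJobUID(jobs []batchv1.Job) map[types.UID][]batchv1.Job {
	groups := make(map[types.UID][]batchv1.Job)
	for i := range jobs {
		// Only Jobs whose controlling owner is a CronJob get grouped;
		// anything else is simply skipped here.
		ref := metav1.GetControllerOf(&jobs[i])
		if ref == nil || ref.Kind != "CronJob" {
			continue
		}
		groups[ref.UID] = append(groups[ref.UID], jobs[i])
	}
	return groups
}

func main() {
	isController := true
	job := batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{
			Name: "backup-1526781600",
			OwnerReferences: []metav1.OwnerReference{{
				Kind:       "CronJob",
				Name:       "backup",
				UID:        types.UID("cronjob-uid"),
				Controller: &isController,
			}},
		},
	}
	fmt.Println(groupJobsByCronJobUID([]batchv1.Job{job})) // one group, keyed by "cronjob-uid"
}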

That brings us to the key method, syncOne:

func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, pc podControlInterface, recorder record.EventRecorder) {
	nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name)

	// First scan all the child Jobs: flag orphans that are not in the Active list, as well as Jobs that have finished but are still in the Active list; record an Event for each and clean up the stale status entries.
	childrenJobs := make(map[types.UID]bool)
	for _, j := range js {
		childrenJobs[j.ObjectMeta.UID] = true
		found := inActiveList(*sj, j.ObjectMeta.UID)
		if !found && !IsJobFinished(&j) {
			recorder.Eventf(sj, v1.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %v", j.Name)
			// We found an unfinished job that has us as the parent, but it is not in our Active list.
			// This could happen if we crashed right after creating the Job and before updating the status,
			// or if our jobs list is newer than our sj status after a relist, or if someone intentionally created
			// a job that they wanted us to adopt.

			// TODO: maybe handle the adoption case?  Concurrency/suspend rules will not apply in that case, obviously, since we can't
			// stop users from creating jobs if they have permission.  It is assumed that if a
			// user has permission to create a job within a namespace, then they have permission to make any scheduledJob
			// in the same namespace "adopt" that job.  ReplicaSets and their Pods work the same way.
			// TBS: how to update sj.Status.LastScheduleTime if the adopted job is newer than any we knew about?
		} else if found && IsJobFinished(&j) {
			deleteFromActiveList(sj, j.ObjectMeta.UID)
			// TODO: event to call out failure vs success.
			recorder.Eventf(sj, v1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %v", j.Name)
		}
	}

	// Remove any job reference from the active list if the corresponding job does not exist any more.
	// Otherwise, the cronjob may be stuck in active mode forever even though there is no matching
	// job running.
	// Then check whether the Active list references Jobs that no longer exist; if so, remove them as well
	for _, j := range sj.Status.Active {
		if found := childrenJobs[j.UID]; !found {
			recorder.Eventf(sj, v1.EventTypeNormal, "MissingJob", "Active job went missing: %v", j.Name)
			deleteFromActiveList(sj, j.UID)
		}
	}
	// Update the CronJob's status
	updatedSJ, err := sjc.UpdateStatus(sj)
	if err != nil {
		glog.Errorf("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err)
		return
	}
	*sj = *updatedSJ

	// If the CronJob is being deleted, do nothing beyond the status update
	if sj.DeletionTimestamp != nil {
		// The CronJob is being deleted.
		// Don't do anything other than updating status.
		return
	}
	// If scheduling is suspended, do nothing either
	if sj.Spec.Suspend != nil && *sj.Spec.Suspend {
		glog.V(4).Infof("Not starting job for %s because it is suspended", nameForLog)
		return
	}
	// Compute the list of schedule times that should have produced a Job by now
	times, err := getRecentUnmetScheduleTimes(*sj, now)
	if err != nil {
		recorder.Eventf(sj, v1.EventTypeWarning, "FailedNeedsStart", "Cannot determine if job needs to be started: %v", err)
		glog.Errorf("Cannot determine if %s needs to be started: %v", nameForLog, err)
		return
	}
	// TODO: handle multiple unmet start times, from oldest to newest, updating status as needed.
	if len(times) == 0 {
		glog.V(4).Infof("No unmet start times for %s", nameForLog)
		return
	}
	if len(times) > 1 {
		glog.V(4).Infof("Multiple unmet start times for %s so only starting last one", nameForLog)
	}
	// Take the most recent of those scheduled times
	scheduledTime := times[len(times)-1]
	tooLate := false
	// If StartingDeadlineSeconds is set, check whether we are already past the deadline for starting this run
	if sj.Spec.StartingDeadlineSeconds != nil {
		tooLate = scheduledTime.Add(time.Second * time.Duration(*sj.Spec.StartingDeadlineSeconds)).Before(now)
	}
	// If the deadline has passed, skip it
	if tooLate {
		glog.V(4).Infof("Missed starting window for %s", nameForLog)
		// TODO: generate an event for a miss.  Use a warning level event because it indicates a
		// problem with the controller (restart or long queue), and is not expected by user either.
		// Since we don't set LastScheduleTime when not scheduling, we are going to keep noticing
		// the miss every cycle.  In order to avoid sending multiple events, and to avoid processing
		// the sj again and again, we could set a Status.LastMissedTime when we notice a miss.
		// Then, when we call getRecentUnmetScheduleTimes, we can take max(creationTimestamp,
		// Status.LastScheduleTime, Status.LastMissedTime), and then so we won't generate
		// and event the next time we process it, and also so the user looking at the status
		// can see easily that there was a missed execution.
		return
	}
	// If ConcurrencyPolicy is Forbid (no concurrent runs) and there is still an active Job, don't create a new one
	if sj.Spec.ConcurrencyPolicy == batchv1beta1.ForbidConcurrent && len(sj.Status.Active) > 0 {
		// Regardless which source of information we use for the set of active jobs,
		// there is some risk that we won't see an active job when there is one.
		// (because we haven't seen the status update to the SJ or the created pod).
		// So it is theoretically possible to have concurrency with Forbid.
		// As long the as the invokations are "far enough apart in time", this usually won't happen.
		//
		// TODO: for Forbid, we could use the same name for every execution, as a lock.
		// With replace, we could use a name that is deterministic per execution time.
		// But that would mean that you could not inspect prior successes or failures of Forbid jobs.
		glog.V(4).Infof("Not starting job for %s because of prior execution still running and concurrency policy is Forbid", nameForLog)
		return
	}
	// If ConcurrencyPolicy is Replace and there are still running Jobs, delete them first
	if sj.Spec.ConcurrencyPolicy == batchv1beta1.ReplaceConcurrent {
		for _, j := range sj.Status.Active {
			// TODO: this should be replaced with server side job deletion
			// currently this mimics JobReaper from pkg/kubectl/stop.go
			glog.V(4).Infof("Deleting job %s of %s that was still running at next scheduled start time", j.Name, nameForLog)

			job, err := jc.GetJob(j.Namespace, j.Name)
			if err != nil {
				recorder.Eventf(sj, v1.EventTypeWarning, "FailedGet", "Get job: %v", err)
				return
			}
			if !deleteJob(sj, job, jc, pc, recorder, "") {
				return
			}
		}
	}
	// Build a Job object from the JobTemplate in the CronJob spec; a hash of scheduledTime (currently its unix timestamp) is appended to the Job's name
	jobReq, err := getJobFromTemplate(sj, scheduledTime)
	if err != nil {
		glog.Errorf("Unable to make Job from template in %s: %v", nameForLog, err)
		return
	}
	// Call the API to create the new Job
	jobResp, err := jc.CreateJob(sj.Namespace, jobReq)
	if err != nil {
		recorder.Eventf(sj, v1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
		return
	}
	glog.V(4).Infof("Created Job %s for %s", jobResp.Name, nameForLog)
	recorder.Eventf(sj, v1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)

	// ------------------------------------------------------------------ //

	// If this process restarts at this point (after posting a job, but
	// before updating the status), then we might try to start the job on
	// the next time.  Actually, if we relist the SJs and Jobs on the next
	// iteration of syncAll, we might not see our own status update, and
	// then post one again.  So, we need to use the job name as a lock to
	// prevent us from making the job twice (name the job with hash of its
	// scheduled time).

	// Add the just-started job to the status list.
	// Add the just-created Job to the CronJob's Active list, set LastScheduleTime, and update the CronJob's status
	ref, err := getRef(jobResp)
	if err != nil {
		glog.V(2).Infof("Unable to make object reference for job for %s", nameForLog)
	} else {
		sj.Status.Active = append(sj.Status.Active, *ref)
	}
	sj.Status.LastScheduleTime = &metav1.Time{Time: scheduledTime}
	if _, err := sjc.UpdateStatus(sj); err != nil {
		glog.Infof("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err)
	}

	return
}
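
The comment about the Job name deserves a second look, because the name is exactly the "lock" the upstream comment talks about: getJobFromTemplate appends a hash of scheduledTime to the CronJob's name, and that hash is currently just the Unix timestamp, so trying to create the same run twice should fail with an AlreadyExists error instead of producing a duplicate Job. A rough sketch of the naming scheme (my own illustration, with a made-up timestamp):

// A rough sketch of the Job-naming scheme used as the "create once" lock,
// mimicking getJobFromTemplate; the helper name and example values are my own.
package main

import (
	"fmt"
	"time"
)

func jobNameForSchedule(cronJobName string, scheduledTime time.Time) string {
	// The "hash" is currently just the Unix timestamp of the scheduled time,
	// so the same schedule slot always maps to the same Job name.
	return fmt.Sprintf("%s-%d", cronJobName, scheduledTime.Unix())
}

func main() {
	scheduled := time.Date(2018, 5, 20, 2, 0, 0, 0, time.UTC)
	fmt.Println(jobNameForSchedule("backup", scheduled)) // backup-1526781600
}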

The overall logic is fairly simple and clear. One method worth a closer look is getRecentUnmetScheduleTimes, which computes the list of schedule times that still need to be run:

func getRecentUnmetScheduleTimes(sj batchv1beta1.CronJob, now time.Time) ([]time.Time, error) {
	starts := []time.Time{}
	// Parse the schedule using robfig/cron
	sched, err := cron.ParseStandard(sj.Spec.Schedule)
	if err != nil {
		return starts, fmt.Errorf("Unparseable schedule: %s : %s", sj.Spec.Schedule, err)
	}

	// Determine the earliest time to consider: if the CronJob has run before, use its last schedule time; otherwise use the CronJob's creation time
	var earliestTime time.Time
	if sj.Status.LastScheduleTime != nil {
		earliestTime = sj.Status.LastScheduleTime.Time
	} else {
		// If none found, then this is either a recently created scheduledJob,
		// or the active/completed info was somehow lost (contract for status
		// in kubernetes says it may need to be recreated), or that we have
		// started a job, but have not noticed it yet (distributed systems can
		// have arbitrary delays).  In any case, use the creation time of the
		// CronJob as last known start time.
		earliestTime = sj.ObjectMeta.CreationTimestamp.Time
	}
	// If StartingDeadlineSeconds is set and now minus that value is later than the earliest time, use the later of the two
	if sj.Spec.StartingDeadlineSeconds != nil {
		// Controller is not going to schedule anything below this point
		schedulingDeadline := now.Add(-time.Second * time.Duration(*sj.Spec.StartingDeadlineSeconds))

		if schedulingDeadline.After(earliestTime) {
			earliestTime = schedulingDeadline
		}
	}
	// If the earliest time is still in the future, there is nothing to do
	if earliestTime.After(now) {
		return []time.Time{}, nil
	}
	// Compute every schedule time between the earliest time and now.
	// A CronJob may have missed many runs, so all of them need to be collected; but past a certain point that stops being meaningful, so anything beyond 100 is treated as an error.
	for t := sched.Next(earliestTime); !t.After(now); t = sched.Next(t) {
		starts = append(starts, t)
		// An object might miss several starts. For example, if
		// controller gets wedged on friday at 5:01pm when everyone has
		// gone home, and someone comes in on tuesday AM and discovers
		// the problem and restarts the controller, then all the hourly
		// jobs, more than 80 of them for one hourly scheduledJob, should
		// all start running with no further intervention (if the scheduledJob
		// allows concurrency and late starts).
		//
		// However, if there is a bug somewhere, or incorrect clock
		// on controller's server or apiservers (for setting creationTimestamp)
		// then there could be so many missed start times (it could be off
		// by decades or more), that it would eat up all the CPU and memory
		// of this controller. In that case, we want to not try to list
		// all the missed start times.
		//
		// I've somewhat arbitrarily picked 100, as more than 80,
		// but less than "lots".
		if len(starts) > 100 {
			// We can't get the most recent times so just return an empty slice
			return []time.Time{}, fmt.Errorf("Too many missed start time (> 100). Set or decrease .spec.startingDeadlineSeconds or check clock skew.")
		}
	}
	return starts, nil
}
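
To make that loop a bit more concrete, here is a small standalone program that uses github.com/robfig/cron in the same way; the schedule and timestamps are made up for the example:

// A standalone illustration of the missed-starts loop above; the import
// assumes the v1 robfig/cron package that Kubernetes vendors.
package main

import (
	"fmt"
	"time"

	"github.com/robfig/cron"
)

func main() {
	// "At minute 0 of every hour", in the 5-field standard cron format.
	sched, err := cron.ParseStandard("0 * * * *")
	if err != nil {
		panic(err)
	}

	lastSchedule := time.Date(2018, 5, 20, 0, 0, 0, 0, time.UTC)
	now := time.Date(2018, 5, 20, 3, 10, 0, 0, time.UTC)

	// Walk forward from the last known schedule time, collecting every
	// start time that has already passed -- the same loop as above.
	var missed []time.Time
	for t := sched.Next(lastSchedule); !t.After(now); t = sched.Next(t) {
		missed = append(missed, t)
	}
	fmt.Println(missed) // three entries: 01:00, 02:00 and 03:00 on 2018-05-20
}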

That is essentially all of the main business logic. Overall it is quite "brute-force" and simple: no Informers or anything like that, just a constant polling loop that works out which runs are due and creates the Jobs.

One thing worth grumbling about: in earlier versions of Kubernetes this resource was called ScheduledJob and was later renamed to CronJob, but many variables in the code still use the ScheduledJob abbreviation sj. They renamed the resource without renaming the variables, so when I first read the code I kept wondering why the variable names looked so strange.

With the code covered, back to the question from the beginning: what was actually wrong? It turned out to be the time zone. The container running controller-manager uses UTC, but Unix cron syntax has no notion of time zones, and as someone in China I naturally wrote the cron expression in Beijing time, so everything ended up 8 hours off. Facepalm.
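
For anyone hitting the same pitfall: since the schedule is effectively evaluated against the controller-manager's clock, which in most images runs in UTC, a cron expression intended for Beijing time has to be shifted back by 8 hours; a backup meant for 02:00 Beijing time needs to be written as "0 18 * * *" (18:00 UTC of the previous day) rather than "0 2 * * *". A quick sanity check of the offset (illustrative only, any date works):

// A quick check of the 8-hour offset between Beijing time and the UTC clock
// the controller-manager container runs on (assumes tzdata for Asia/Shanghai
// is available on the machine).
package main

import (
	"fmt"
	"time"
)

func main() {
	beijing, err := time.LoadLocation("Asia/Shanghai")
	if err != nil {
		panic(err)
	}
	// 02:00 on May 20th, Beijing time ...
	intended := time.Date(2018, 5, 20, 2, 0, 0, 0, beijing)
	// ... is 18:00 on May 19th in UTC, which is what the controller sees.
	fmt.Println(intended.UTC()) // 2018-05-19 18:00:00 +0000 UTC
}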