Kubernetes CronJob Controller Source Code Analysis
A recent project of mine needed Kubernetes CronJobs, mainly to run a backup task on a schedule. When I first started using it, it didn't behave as expected, so I decided to read the CronJob Controller code and see how it actually implements the feature. It also turns out that nobody else seems to have written an analysis of the CronJob Controller code (maybe it's too simple to be worth writing about), so this is a good chance to record it.
The CronJob Controller code lives under kubernetes/pkg/controller/cronjob, and the main logic is in cronjob_controller.go in that directory. The analysis here is based on the v1.10.2 code, which you can view directly on GitHub.
Let's skip the controller creation process and jump straight to the part that runs:
func (jm *CronJobController) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    glog.Infof("Starting CronJob Manager")
    // Check things every 10 second.
    go wait.Until(jm.syncAll, 10*time.Second, stopCh)
    <-stopCh
    glog.Infof("Shutting down CronJob Manager")
}
This part is simple: it runs syncAll every 10 seconds. Let's look at that method next:
func (jm *CronJobController) syncAll() {
    // List children (Jobs) before parents (CronJob).
    // This guarantees that if we see any Job that got orphaned by the GC orphan finalizer,
    // we must also see that the parent CronJob has non-nil DeletionTimestamp (see #42639).
    // Note that this only works because we are NOT using any caches here.
    // First list all Jobs, mainly to build the mapping between each CronJob and the Jobs it created.
    jl, err := jm.kubeClient.BatchV1().Jobs(metav1.NamespaceAll).List(metav1.ListOptions{})
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("can't list Jobs: %v", err))
        return
    }
    js := jl.Items
    glog.V(4).Infof("Found %d jobs", len(js))
    // List all CronJobs.
    sjl, err := jm.kubeClient.BatchV1beta1().CronJobs(metav1.NamespaceAll).List(metav1.ListOptions{})
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("can't list CronJobs: %v", err))
        return
    }
    sjs := sjl.Items
    glog.V(4).Infof("Found %d cronjobs", len(sjs))
    // Build a map[uid][]Job that groups all Jobs by the UID of the CronJob that owns them
    // (a simplified sketch of groupJobsByParent follows after this function).
    jobsBySj := groupJobsByParent(js)
    glog.V(4).Infof("Found %d groups", len(jobsBySj))
    for _, sj := range sjs {
        // Call syncOne for each CronJob.
        syncOne(&sj, jobsBySj[sj.UID], time.Now(), jm.jobControl, jm.sjControl, jm.podControl, jm.recorder)
        // Delete excess Jobs according to the configuration, mainly SuccessfulJobsHistoryLimit and FailedJobsHistoryLimit.
        cleanupFinishedJobs(&sj, jobsBySj[sj.UID], jm.jobControl, jm.sjControl, jm.podControl, jm.recorder)
    }
}
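groupJobsByParent itself is not shown above. Roughly, it buckets Jobs by the UID of the owning CronJob, resolved through each Job's controller owner reference; the sketch below is my simplified reconstruction under that assumption (the real helper in the source may differ in details and logging):

import (
    batchv1 "k8s.io/api/batch/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/types"
)

// Sketch of groupJobsByParent: group Jobs by the UID of the CronJob that owns
// them, skipping Jobs whose controller reference is missing or is not a CronJob.
func groupJobsByParent(js []batchv1.Job) map[types.UID][]batchv1.Job {
    jobsBySj := make(map[types.UID][]batchv1.Job)
    for _, job := range js {
        // GetControllerOf returns the owner reference with Controller=true, if any.
        controllerRef := metav1.GetControllerOf(&job)
        if controllerRef == nil || controllerRef.Kind != "CronJob" {
            continue
        }
        jobsBySj[controllerRef.UID] = append(jobsBySj[controllerRef.UID], job)
    }
    return jobsBySj
}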
Then we reach the key method, syncOne:
func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, pc podControlInterface, recorder record.EventRecorder) {
    nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name)
    // First scan all child Jobs: look for orphans that are not in the Active list, and for Jobs that
    // have already finished but are still in the Active list; record an Event and clean up the stale state.
    childrenJobs := make(map[types.UID]bool)
    for _, j := range js {
        childrenJobs[j.ObjectMeta.UID] = true
        found := inActiveList(*sj, j.ObjectMeta.UID)
        if !found && !IsJobFinished(&j) {
            recorder.Eventf(sj, v1.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %v", j.Name)
            // We found an unfinished job that has us as the parent, but it is not in our Active list.
            // This could happen if we crashed right after creating the Job and before updating the status,
            // or if our jobs list is newer than our sj status after a relist, or if someone intentionally created
            // a job that they wanted us to adopt.
            // TODO: maybe handle the adoption case? Concurrency/suspend rules will not apply in that case, obviously, since we can't
            // stop users from creating jobs if they have permission. It is assumed that if a
            // user has permission to create a job within a namespace, then they have permission to make any scheduledJob
            // in the same namespace "adopt" that job. ReplicaSets and their Pods work the same way.
            // TBS: how to update sj.Status.LastScheduleTime if the adopted job is newer than any we knew about?
        } else if found && IsJobFinished(&j) {
            deleteFromActiveList(sj, j.ObjectMeta.UID)
            // TODO: event to call out failure vs success.
            recorder.Eventf(sj, v1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %v", j.Name)
        }
    }
    // Remove any job reference from the active list if the corresponding job does not exist any more.
    // Otherwise, the cronjob may be stuck in active mode forever even though there is no matching
    // job running.
    // Then check whether the Active list contains Jobs that no longer exist; if so, remove them as well.
    for _, j := range sj.Status.Active {
        if found := childrenJobs[j.UID]; !found {
            recorder.Eventf(sj, v1.EventTypeNormal, "MissingJob", "Active job went missing: %v", j.Name)
            deleteFromActiveList(sj, j.UID)
        }
    }
    // Update the CronJob's status.
    updatedSJ, err := sjc.UpdateStatus(sj)
    if err != nil {
        glog.Errorf("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err)
        return
    }
    *sj = *updatedSJ
    // If the CronJob is being deleted, leave it alone.
    if sj.DeletionTimestamp != nil {
        // The CronJob is being deleted.
        // Don't do anything other than updating status.
        return
    }
    // If scheduling is suspended, leave it alone as well.
    if sj.Spec.Suspend != nil && *sj.Spec.Suspend {
        glog.V(4).Infof("Not starting job for %s because it is suspended", nameForLog)
        return
    }
    // Compute the list of scheduled times that should have been run up to now.
    times, err := getRecentUnmetScheduleTimes(*sj, now)
    if err != nil {
        recorder.Eventf(sj, v1.EventTypeWarning, "FailedNeedsStart", "Cannot determine if job needs to be started: %v", err)
        glog.Errorf("Cannot determine if %s needs to be started: %v", nameForLog, err)
        return
    }
    // TODO: handle multiple unmet start times, from oldest to newest, updating status as needed.
    if len(times) == 0 {
        glog.V(4).Infof("No unmet start times for %s", nameForLog)
        return
    }
    if len(times) > 1 {
        glog.V(4).Infof("Multiple unmet start times for %s so only starting last one", nameForLog)
    }
    // Take the most recent of those times.
    scheduledTime := times[len(times)-1]
    tooLate := false
    // If StartingDeadlineSeconds is set, check whether the current time is already past the starting deadline.
    if sj.Spec.StartingDeadlineSeconds != nil {
        tooLate = scheduledTime.Add(time.Second * time.Duration(*sj.Spec.StartingDeadlineSeconds)).Before(now)
    }
    // If the starting deadline has passed, skip this run.
    if tooLate {
        glog.V(4).Infof("Missed starting window for %s", nameForLog)
        // TODO: generate an event for a miss. Use a warning level event because it indicates a
        // problem with the controller (restart or long queue), and is not expected by user either.
        // Since we don't set LastScheduleTime when not scheduling, we are going to keep noticing
        // the miss every cycle. In order to avoid sending multiple events, and to avoid processing
        // the sj again and again, we could set a Status.LastMissedTime when we notice a miss.
        // Then, when we call getRecentUnmetScheduleTimes, we can take max(creationTimestamp,
        // Status.LastScheduleTime, Status.LastMissedTime), and then so we won't generate
        // and event the next time we process it, and also so the user looking at the status
        // can see easily that there was a missed execution.
        return
    }
    // If ConcurrencyPolicy is Forbid (i.e. concurrent runs are not allowed) and there are still
    // active Jobs, don't start a new one.
    if sj.Spec.ConcurrencyPolicy == batchv1beta1.ForbidConcurrent && len(sj.Status.Active) > 0 {
        // Regardless which source of information we use for the set of active jobs,
        // there is some risk that we won't see an active job when there is one.
        // (because we haven't seen the status update to the SJ or the created pod).
        // So it is theoretically possible to have concurrency with Forbid.
        // As long the as the invokations are "far enough apart in time", this usually won't happen.
        //
        // TODO: for Forbid, we could use the same name for every execution, as a lock.
        // With replace, we could use a name that is deterministic per execution time.
        // But that would mean that you could not inspect prior successes or failures of Forbid jobs.
        glog.V(4).Infof("Not starting job for %s because of prior execution still running and concurrency policy is Forbid", nameForLog)
        return
    }
    // If ConcurrencyPolicy is Replace and there are still running Jobs, delete those Jobs first.
    if sj.Spec.ConcurrencyPolicy == batchv1beta1.ReplaceConcurrent {
        for _, j := range sj.Status.Active {
            // TODO: this should be replaced with server side job deletion
            // currently this mimics JobReaper from pkg/kubectl/stop.go
            glog.V(4).Infof("Deleting job %s of %s that was still running at next scheduled start time", j.Name, nameForLog)
            job, err := jc.GetJob(j.Namespace, j.Name)
            if err != nil {
                recorder.Eventf(sj, v1.EventTypeWarning, "FailedGet", "Get job: %v", err)
                return
            }
            if !deleteJob(sj, job, jc, pc, recorder, "") {
                return
            }
        }
    }
    // Build a Job object from the JobTemplate in the CronJob Spec. The Job's name carries a hash
    // derived from scheduledTime, currently its unix timestamp (see the sketch after this function).
    jobReq, err := getJobFromTemplate(sj, scheduledTime)
    if err != nil {
        glog.Errorf("Unable to make Job from template in %s: %v", nameForLog, err)
        return
    }
    // Call the API to create the new Job.
    jobResp, err := jc.CreateJob(sj.Namespace, jobReq)
    if err != nil {
        recorder.Eventf(sj, v1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
        return
    }
    glog.V(4).Infof("Created Job %s for %s", jobResp.Name, nameForLog)
    recorder.Eventf(sj, v1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)
    // ------------------------------------------------------------------ //
    // If this process restarts at this point (after posting a job, but
    // before updating the status), then we might try to start the job on
    // the next time. Actually, if we relist the SJs and Jobs on the next
    // iteration of syncAll, we might not see our own status update, and
    // then post one again. So, we need to use the job name as a lock to
    // prevent us from making the job twice (name the job with hash of its
    // scheduled time).
    // Add the just-started job to the status list.
    // Add the newly created Job to the CronJob's Active list, set LastScheduleTime, and update the CronJob.
    ref, err := getRef(jobResp)
    if err != nil {
        glog.V(2).Infof("Unable to make object reference for job for %s", nameForLog)
    } else {
        sj.Status.Active = append(sj.Status.Active, *ref)
    }
    sj.Status.LastScheduleTime = &metav1.Time{Time: scheduledTime}
    if _, err := sjc.UpdateStatus(sj); err != nil {
        glog.Infof("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err)
    }
    return
}
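The "job name as a lock" mentioned in the comments relies on the deterministic name that getJobFromTemplate builds. The sketch below is my simplified illustration of that naming (the helper name here is made up; the real function also copies labels/annotations and fills in the rest of the Job spec): the name is the CronJob's name plus the unix timestamp of the scheduled start time, so retrying the same scheduled time produces the same name and the second create simply fails with AlreadyExists.

import (
    "fmt"
    "time"
)

// Sketch of how the Job name is derived from the scheduled time in getJobFromTemplate.
func jobNameForScheduledTime(cronJobName string, scheduledTime time.Time) string {
    // Using the unix timestamp of the nominal start time makes the name deterministic
    // per schedule slot, which is what turns the name into a lock against double creation.
    return fmt.Sprintf("%s-%d", cronJobName, scheduledTime.Unix())
}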
The overall logic is fairly simple and clear. One method worth a closer look is getRecentUnmetScheduleTimes, which computes the list of start times that still need to be run:
func getRecentUnmetScheduleTimes(sj batchv1beta1.CronJob, now time.Time) ([]time.Time, error) {
    starts := []time.Time{}
    // Parse the Schedule with robfig/cron.
    sched, err := cron.ParseStandard(sj.Spec.Schedule)
    if err != nil {
        return starts, fmt.Errorf("Unparseable schedule: %s : %s", sj.Spec.Schedule, err)
    }
    // Determine the earliest time: if the CronJob has run before, use the last schedule time;
    // if it has never run, use the CronJob's creation time.
    var earliestTime time.Time
    if sj.Status.LastScheduleTime != nil {
        earliestTime = sj.Status.LastScheduleTime.Time
    } else {
        // If none found, then this is either a recently created scheduledJob,
        // or the active/completed info was somehow lost (contract for status
        // in kubernetes says it may need to be recreated), or that we have
        // started a job, but have not noticed it yet (distributed systems can
        // have arbitrary delays). In any case, use the creation time of the
        // CronJob as last known start time.
        earliestTime = sj.ObjectMeta.CreationTimestamp.Time
    }
    // If StartingDeadlineSeconds is set and now minus that value is later than the earliest time,
    // use that later time instead.
    if sj.Spec.StartingDeadlineSeconds != nil {
        // Controller is not going to schedule anything below this point
        schedulingDeadline := now.Add(-time.Second * time.Duration(*sj.Spec.StartingDeadlineSeconds))
        if schedulingDeadline.After(earliestTime) {
            earliestTime = schedulingDeadline
        }
    }
    // If the earliest time is later than now, there is nothing to do.
    if earliestTime.After(now) {
        return []time.Time{}, nil
    }
    // Compute every scheduled time between the earliest time and now.
    // A CronJob may have missed many runs, so all of them have to be computed, but past a certain
    // point that stops being meaningful; if more than 100 are found, give up with an error.
    for t := sched.Next(earliestTime); !t.After(now); t = sched.Next(t) {
        starts = append(starts, t)
        // An object might miss several starts. For example, if
        // controller gets wedged on friday at 5:01pm when everyone has
        // gone home, and someone comes in on tuesday AM and discovers
        // the problem and restarts the controller, then all the hourly
        // jobs, more than 80 of them for one hourly scheduledJob, should
        // all start running with no further intervention (if the scheduledJob
        // allows concurrency and late starts).
        //
        // However, if there is a bug somewhere, or incorrect clock
        // on controller's server or apiservers (for setting creationTimestamp)
        // then there could be so many missed start times (it could be off
        // by decades or more), that it would eat up all the CPU and memory
        // of this controller. In that case, we want to not try to list
        // all the missed start times.
        //
        // I've somewhat arbitrarily picked 100, as more than 80,
        // but less than "lots".
        if len(starts) > 100 {
            // We can't get the most recent times so just return an empty slice
            return []time.Time{}, fmt.Errorf("Too many missed start time (> 100). Set or decrease .spec.startingDeadlineSeconds or check clock skew.")
        }
    }
    return starts, nil
}
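To get a feel for how that loop expands a schedule, here is a small standalone example that uses robfig/cron's ParseStandard and Next directly; the schedule and the "3 hours ago" starting point are made up purely for illustration:

package main

import (
    "fmt"
    "time"

    "github.com/robfig/cron"
)

func main() {
    // An hourly schedule, in the same 5-field syntax the controller parses.
    sched, err := cron.ParseStandard("0 * * * *")
    if err != nil {
        panic(err)
    }

    // Pretend the last schedule time was 3 hours ago; collect every unmet
    // start time up to now, just like the loop in getRecentUnmetScheduleTimes.
    now := time.Now()
    earliestTime := now.Add(-3 * time.Hour)

    starts := []time.Time{}
    for t := sched.Next(earliestTime); !t.After(now); t = sched.Next(t) {
        starts = append(starts, t)
    }
    fmt.Printf("%d unmet start times, most recent at %v\n", len(starts), starts[len(starts)-1])
}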
That's essentially all of the main business logic. Overall it is quite brute-force and simple: no Informers or anything like that, just constant polling, computing which runs are due, and creating the Jobs.
One thing worth complaining about: in earlier versions of Kubernetes, CronJob was called ScheduledJob, and when it was renamed the variable names were never updated, so the code is still full of abbreviations like sj for ScheduledJob. When I first read it I kept wondering why the variable names looked so odd.
With the code covered, let's come back to the problem from the beginning: what went wrong? It turned out to be the time zone. The container running controller-manager uses UTC, but Unix cron syntax has no notion of time zones. As someone in China I naturally wrote the cron expression in Beijing time, and the result was off by 8 hours. Oops.
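A quick way to see the 8-hour skew: the schedule is evaluated against the wall clock of whatever location the controller's "now" is in, so the same expression fires at different instants under UTC and Asia/Shanghai. A small illustrative sketch (the schedule and the date are just examples):

package main

import (
    "fmt"
    "time"

    "github.com/robfig/cron"
)

func main() {
    // "2 AM every day" -- written with Beijing time in mind.
    sched, _ := cron.ParseStandard("0 2 * * *")

    beijing, _ := time.LoadLocation("Asia/Shanghai")
    // The same instant, expressed in the two locations.
    nowUTC := time.Date(2018, 6, 1, 12, 0, 0, 0, time.UTC)
    nowCST := nowUTC.In(beijing) // 2018-06-01 20:00 +0800

    // In a UTC container the schedule fires at 02:00 UTC, i.e. 10:00 Beijing time.
    fmt.Println("next run (UTC clock):    ", sched.Next(nowUTC))
    // If the clock were Beijing time, it would fire at 02:00 Beijing time instead.
    fmt.Println("next run (Beijing clock):", sched.Next(nowCST))
}

The two printed times are exactly 8 hours apart, which is precisely the skew I was seeing; shifting the schedule by 8 hours (or aligning the container's time zone) is what resolves it.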