5.5 KiB
5.5 KiB
流程解读
执行任务
mflow/apps/task/impl/job_task.go
- RunJob
func (i *impl) RunJob(ctx context.Context, in *pipeline.Task) (
*task.JobTask, error) {
ins := task.NewJobTask(in)
// 获取之前任务的状态, 因为里面有当前任务的审核状态
err := i.GetJotTaskStatus(ctx, ins)
if err != nil {
return nil, err
}
// 开启审核后, 执行任务则 调整审核状态为等待中
if ins.Spec.Audit.Enable {
auditStatus := ins.AuditStatus()
switch auditStatus {
case pipeline.AUDIT_STAGE_PENDDING:
ins.AuditStatusFlowTo(pipeline.AUDIT_STAGE_WAITING)
case pipeline.AUDIT_STAGE_DENY:
return nil, fmt.Errorf("任务审核未通过")
}
i.log.Debug().Msgf("任务: %s 审核状态为, %s", ins.Spec.TaskId, auditStatus)
}
// 如果不忽略执行, 并且审核通过, 则执行
if in.Enabled() && ins.AuditPass() {
// 任务状态检查与处理
switch ins.Status.Stage {
case task.STAGE_PENDDING:
ins.Status.Stage = task.STAGE_CREATING
case task.STAGE_ACTIVE:
return nil, exception.NewConflict("任务: %s 当前处于运行中, 需要等待运行结束后才能执行", in.TaskId)
}
// 查询需要执行的Job: docker_build@v1
req := job.NewDescribeJobRequestByName(in.JobName)
j, err := i.job.DescribeJob(ctx, req)
if err != nil {
return nil, err
}
ins.Job = j
ins.Spec.JobId = j.Meta.Id
i.log.Info().Msgf("describe job success, %s[%s]", j.Spec.Name, j.Meta.Id)
// 脱敏参数动态还原
in.RunParams.RestoreSensitive(j.Spec.RunParams)
// 合并允许参数(Job里面有默认值), 并检查参数合法性
// 注意Param的合并是有顺序的,也就是参数优先级(低-->高):
// 1. 系统变量(默认禁止修改)
// 2. job默认变量
// 3. job运行变量
// 4. pipeline 运行变量
// 5. pipeline 运行时变量
params := job.NewRunParamSet()
params.Add(ins.SystemRunParam()...)
params.Add(j.Spec.RunParams.Params...)
params.Merge(in.RunParams.Params...)
err = i.LoadPipelineRunParam(ctx, in, params)
if err != nil {
return nil, err
}
// 校验参数合法性
err = params.Validate()
if err != nil {
return nil, fmt.Errorf("校验任务【%s】参数错误, %s", j.Spec.DisplayName, err)
}
i.log.Info().Msgf("params check ok, %s", params)
// 获取执行器执行
r := runner.GetRunner(j.Spec.RunnerType)
runReq := task.NewRunTaskRequest(ins.Spec.TaskId, j.Spec.RunnerSpec, params)
runReq.DryRun = in.RunParams.DryRun
runReq.Labels = in.Labels
runReq.ManualUpdateStatus = j.Spec.ManualUpdateStatus
i.log.Debug().Msgf("[%s] start run task: %s", ins.Spec.PipelineTask, in.TaskName)
status, err := r.Run(ctx, runReq)
if err != nil {
return nil, fmt.Errorf("run job error, %s", err)
}
status.RunParams = params
ins.Status = status
// 添加搜索标签
ins.BuildSearchLabel()
}
// 保存任务
updateOpt := options.Update()
updateOpt.SetUpsert(true)
if _, err := i.jcol.UpdateByID(ctx, ins.Spec.TaskId, bson.M{"$set": ins}, updateOpt); err != nil {
return nil, exception.NewInternalServerError("upsert a job task document error, %s", err)
}
return ins, nil
}
任务状态更新
// 更新Job状态
func (i *impl) UpdateJobTaskStatus(ctx context.Context, in *task.UpdateJobTaskStatusRequest) (
*task.JobTask, error) {
ins, err := i.DescribeJobTask(ctx, task.NewDescribeJobTaskRequest(in.Id))
if err != nil {
return nil, err
}
// 校验更新合法性
err = i.CheckAllowUpdate(ctx, ins, in.UpdateToken, in.ForceUpdateStatus)
if err != nil {
return nil, err
}
i.log.Debug().Msgf("更新任务状态: %s,当前状态: %s, 更新状态: %s",
ins.Spec.TaskId, ins.Status.Stage, in.Stage)
// 状态更新
ins.Status.UpdateStatus(in)
// 更新数据库
if err := i.updateJobTaskStatus(ctx, ins); err != nil {
return nil, err
}
// Job Task状态变更回调
i.JobTaskStatusChangedCallback(ctx, ins)
// Pipeline Task 状态变更回调
if ins.Spec.PipelineTask != "" {
// 如果状态未变化, 不触发流水线更新
if !in.ForceTriggerPipeline && !ins.Status.Changed {
i.log.Debug().Msgf("task %s status not changed: [%s], skip update pipeline", in.Id, in.Stage)
return ins, nil
}
_, err := i.PipelineTaskStatusChanged(ctx, ins)
if err != nil {
return nil, err
}
}
return ins, nil
}
func (i *impl) JobTaskStatusChangedCallback(ctx context.Context, in *task.JobTask) {
// 状态未变化不通知
if in.Status == nil {
return
}
// 状态未变化不通知
if !in.Status.Changed {
i.log.Debug().Msgf("task %s status not changed [%s], skip status callback", in.Spec.TaskId, in.Status.Stage)
return
}
i.log.Debug().Msgf("task %s 执行状态变化回调...", in.Spec.TaskId)
// 个人通知
for index := range in.Spec.MentionUsers {
mu := in.Spec.MentionUsers[index]
i.TaskMention(ctx, mu, in)
}
if len(in.Spec.MentionUsers) > 0 {
i.updateJobTaskMentionUser(ctx, in.Spec.TaskId, in.Spec.MentionUsers)
}
// 群组通知
imRobotHooks := in.Spec.MatchedImRobotNotify(in.Status.Stage.String())
i.log.Debug().Msgf("task %s 群组通知: %v", in.Spec.TaskId, imRobotHooks)
i.hook.SendTaskStatus(ctx, imRobotHooks, in)
if len(imRobotHooks) > 0 {
i.updateJobTaskImRobotNotify(ctx, in.Spec.TaskId, imRobotHooks)
}
// WebHook回调
webhooks := in.Spec.MatchedWebHooks(in.Status.Stage.String())
i.log.Debug().Msgf("task %s WebHook通知: %v", in.Spec.TaskId, webhooks)
i.hook.SendTaskStatus(ctx, webhooks, in)
if len(webhooks) > 0 {
i.updateJobTaskWebHook(ctx, in.Spec.TaskId, webhooks)
}
}