# 流程解读 ## 执行任务 mflow/apps/task/impl/job_task.go 1. RunJob ```go func (i *impl) RunJob(ctx context.Context, in *pipeline.Task) ( *task.JobTask, error) { ins := task.NewJobTask(in) // 获取之前任务的状态, 因为里面有当前任务的审核状态 err := i.GetJotTaskStatus(ctx, ins) if err != nil { return nil, err } // 开启审核后, 执行任务则 调整审核状态为等待中 if ins.Spec.Audit.Enable { auditStatus := ins.AuditStatus() switch auditStatus { case pipeline.AUDIT_STAGE_PENDDING: ins.AuditStatusFlowTo(pipeline.AUDIT_STAGE_WAITING) case pipeline.AUDIT_STAGE_DENY: return nil, fmt.Errorf("任务审核未通过") } i.log.Debug().Msgf("任务: %s 审核状态为, %s", ins.Spec.TaskId, auditStatus) } // 如果不忽略执行, 并且审核通过, 则执行 if in.Enabled() && ins.AuditPass() { // 任务状态检查与处理 switch ins.Status.Stage { case task.STAGE_PENDDING: ins.Status.Stage = task.STAGE_CREATING case task.STAGE_ACTIVE: return nil, exception.NewConflict("任务: %s 当前处于运行中, 需要等待运行结束后才能执行", in.TaskId) } // 查询需要执行的Job: docker_build@v1 req := job.NewDescribeJobRequestByName(in.JobName) j, err := i.job.DescribeJob(ctx, req) if err != nil { return nil, err } ins.Job = j ins.Spec.JobId = j.Meta.Id i.log.Info().Msgf("describe job success, %s[%s]", j.Spec.Name, j.Meta.Id) // 脱敏参数动态还原 in.RunParams.RestoreSensitive(j.Spec.RunParams) // 合并允许参数(Job里面有默认值), 并检查参数合法性 // 注意Param的合并是有顺序的,也就是参数优先级(低-->高): // 1. 系统变量(默认禁止修改) // 2. job默认变量 // 3. job运行变量 // 4. pipeline 运行变量 // 5. pipeline 运行时变量 params := job.NewRunParamSet() params.Add(ins.SystemRunParam()...) params.Add(j.Spec.RunParams.Params...) params.Merge(in.RunParams.Params...) err = i.LoadPipelineRunParam(ctx, in, params) if err != nil { return nil, err } // 校验参数合法性 err = params.Validate() if err != nil { return nil, fmt.Errorf("校验任务【%s】参数错误, %s", j.Spec.DisplayName, err) } i.log.Info().Msgf("params check ok, %s", params) // 获取执行器执行 r := runner.GetRunner(j.Spec.RunnerType) runReq := task.NewRunTaskRequest(ins.Spec.TaskId, j.Spec.RunnerSpec, params) runReq.DryRun = in.RunParams.DryRun runReq.Labels = in.Labels runReq.ManualUpdateStatus = j.Spec.ManualUpdateStatus i.log.Debug().Msgf("[%s] start run task: %s", ins.Spec.PipelineTask, in.TaskName) status, err := r.Run(ctx, runReq) if err != nil { return nil, fmt.Errorf("run job error, %s", err) } status.RunParams = params ins.Status = status // 添加搜索标签 ins.BuildSearchLabel() } // 保存任务 updateOpt := options.Update() updateOpt.SetUpsert(true) if _, err := i.jcol.UpdateByID(ctx, ins.Spec.TaskId, bson.M{"$set": ins}, updateOpt); err != nil { return nil, exception.NewInternalServerError("upsert a job task document error, %s", err) } return ins, nil } ``` ## 任务状态更新 ```go // 更新Job状态 func (i *impl) UpdateJobTaskStatus(ctx context.Context, in *task.UpdateJobTaskStatusRequest) ( *task.JobTask, error) { ins, err := i.DescribeJobTask(ctx, task.NewDescribeJobTaskRequest(in.Id)) if err != nil { return nil, err } // 校验更新合法性 err = i.CheckAllowUpdate(ctx, ins, in.UpdateToken, in.ForceUpdateStatus) if err != nil { return nil, err } i.log.Debug().Msgf("更新任务状态: %s,当前状态: %s, 更新状态: %s", ins.Spec.TaskId, ins.Status.Stage, in.Stage) // 状态更新 ins.Status.UpdateStatus(in) // 更新数据库 if err := i.updateJobTaskStatus(ctx, ins); err != nil { return nil, err } // Job Task状态变更回调 i.JobTaskStatusChangedCallback(ctx, ins) // Pipeline Task 状态变更回调 if ins.Spec.PipelineTask != "" { // 如果状态未变化, 不触发流水线更新 if !in.ForceTriggerPipeline && !ins.Status.Changed { i.log.Debug().Msgf("task %s status not changed: [%s], skip update pipeline", in.Id, in.Stage) return ins, nil } _, err := i.PipelineTaskStatusChanged(ctx, ins) if err != nil { return nil, err } } return ins, nil } func (i *impl) JobTaskStatusChangedCallback(ctx context.Context, in *task.JobTask) { // 状态未变化不通知 if in.Status == nil { return } // 状态未变化不通知 if !in.Status.Changed { i.log.Debug().Msgf("task %s status not changed [%s], skip status callback", in.Spec.TaskId, in.Status.Stage) return } i.log.Debug().Msgf("task %s 执行状态变化回调...", in.Spec.TaskId) // 个人通知 for index := range in.Spec.MentionUsers { mu := in.Spec.MentionUsers[index] i.TaskMention(ctx, mu, in) } if len(in.Spec.MentionUsers) > 0 { i.updateJobTaskMentionUser(ctx, in.Spec.TaskId, in.Spec.MentionUsers) } // 群组通知 imRobotHooks := in.Spec.MatchedImRobotNotify(in.Status.Stage.String()) i.log.Debug().Msgf("task %s 群组通知: %v", in.Spec.TaskId, imRobotHooks) i.hook.SendTaskStatus(ctx, imRobotHooks, in) if len(imRobotHooks) > 0 { i.updateJobTaskImRobotNotify(ctx, in.Spec.TaskId, imRobotHooks) } // WebHook回调 webhooks := in.Spec.MatchedWebHooks(in.Status.Stage.String()) i.log.Debug().Msgf("task %s WebHook通知: %v", in.Spec.TaskId, webhooks) i.hook.SendTaskStatus(ctx, webhooks, in) if len(webhooks) > 0 { i.updateJobTaskWebHook(ctx, in.Spec.TaskId, webhooks) } } ``` ## Pipeline 事件触发 ```go // Pipeline中任务有变化时, // 如果执行成功则 继续执行, 如果失败则标记Pipeline结束 // 当所有任务成功结束时标记Pipeline执行成功 func (i *impl) PipelineTaskStatusChanged(ctx context.Context, in *task.JobTask) ( *task.PipelineTask, error) { if in == nil || in.Status == nil { return nil, exception.NewBadRequest("job task or job task status is nil") } if in.Spec.PipelineTask == "" { return nil, exception.NewBadRequest("Pipeline Id参数缺失") } runErrorJobTasks := []*task.UpdateJobTaskStatusRequest{} // 获取Pipeline Task, 因为Job Task是先保存在触发的回调, 这里获取的Pipeline Task是最新的 descReq := task.NewDescribePipelineTaskRequest(in.Spec.PipelineTask) p, err := i.DescribePipelineTask(ctx, descReq) if err != nil { return nil, err } defer func() { // 更新当前任务的pipeline task状态 i.mustUpdatePipelineStatus(ctx, p) // 如果JobTask正常执行, 则等待回调更新, 如果执行失败 则需要立即更新JobTask状态 for index := range runErrorJobTasks { _, err = i.UpdateJobTaskStatus(ctx, runErrorJobTasks[index]) if err != nil { p.MarkedFailed(err) i.mustUpdatePipelineStatus(ctx, p) } } }() // 更新Pipeline Task 运行时环境变量 p.Status.RuntimeEnvs.Merge(in.RuntimeRunParams()...) switch in.Status.Stage { case task.STAGE_PENDDING, task.STAGE_SCHEDULING, task.STAGE_CREATING, task.STAGE_ACTIVE, task.STAGE_CANCELING: // Task状态无变化 return p, nil case task.STAGE_CANCELED: // 任务取消, pipeline 取消执行 p.MarkedCanceled() return p, nil case task.STAGE_FAILED: // 任务执行结果确认失败 if in.IsConfirmEnabled() && in.IsConfirming() { // 状态确认中 i.log.Debug().Msgf("%s 状态确认中", in.Spec.TaskName) return p, nil } // 任务执行失败, 更新Pipeline状态为失败 if !in.Spec.RunParams.IgnoreFailed { p.MarkedFailed(in.Status.MessageToError()) return p, nil } case task.STAGE_SUCCEEDED: // 任务执行结果确认失败 if in.IsConfirmEnabled() && in.IsConfirming() { // 状态确认中 i.log.Debug().Msgf("%s 状态确认中", in.Spec.TaskName) return p, nil } // 任务运行成功, pipeline继续执行 i.log.Info().Msgf("task: %s run successed", in.Spec.TaskId) } /* task执行成功或者忽略执行失败, 此时pipeline 仍然处于运行中, 需要获取下一个任务执行 */ nexts, err := p.NextRun() if err != nil { p.MarkedFailed(err) return p, nil } // 如果没有需要执行的任务, Pipeline执行结束, 更新Pipeline状态为成功 if nexts == nil || nexts.Len() == 0 { p.MarkedSuccess() return p, nil } // 如果有需要执行的JobTask, 继续执行 for index := range nexts.Items { item := nexts.Items[index] // 如果任务执行成功则等待任务的回调更新任务状态 // 如果任务执行失败, 直接更新任务状态 t, err := i.RunJob(ctx, item.Spec) if err != nil { updateT := task.NewUpdateJobTaskStatusRequest(item.Spec.TaskId) updateT.UpdateToken = item.Spec.UpdateToken updateT.MarkError(err) runErrorJobTasks = append(runErrorJobTasks, updateT) } else { item.Status = t.Status item.Job = t.Job } } return p, nil } ``` 如何获取下一组需要执行的任务: 任务执行调度 ```go // 返回下个需要执行的JobTask, 允许一次并行执行多个(批量执行) // Task DryRun属性要继承PipelineTask func (p *PipelineTask) NextRun() (*JobTaskSet, error) { set := NewJobTaskSet() var stage *StageStatus if p.Status == nil || p.Pipeline == nil { return set, nil } // 需要未执行完成的Job Tasks stages := p.Status.StageStatus for i := range stages { stage = stages[i] // 找出Stage中未执行完的Job Task set = stage.UnCompleteJobTask(p.Params.DryRun) set.UpdateFromPipelineTask(p) // 如果找到 直接Break if set.Len() > 0 { break } } // 如果所有Stage寻找完,都没找到, 表示PipelineTask执行完成 if set.Len() == 0 { return set, nil } // TODO: 如果这些未执行当中的Job Task 有处于运行中的, 不会执行下个一个任务 if set.HasStage(STAGE_ACTIVE) { return set, exception.NewConflict("Stage 还处于运行中") } // 当未执行的任务中,没有运行中的时,剩下的就是需要被执行的任务 tasks := set.GetJobTaskByStage(STAGE_PENDDING) nextTasks := NewJobTaskSet() stageSpec := p.Pipeline.GetStage(stage.Name) if stageSpec.IsParallel { // 并行任务 返回该Stage所有等待执行的job nextTasks.Add(tasks...) } else { // 串行任务取第一个 nextTasks.Add(tasks[0]) } return nextTasks.UpdateFromPipelineTask(p), nil } ```