# 流程解读 ## 执行任务 mflow/apps/task/impl/job_task.go 1. RunJob ```go func (i *impl) RunJob(ctx context.Context, in *pipeline.Task) ( *task.JobTask, error) { ins := task.NewJobTask(in) // 获取之前任务的状态, 因为里面有当前任务的审核状态 err := i.GetJotTaskStatus(ctx, ins) if err != nil { return nil, err } // 开启审核后, 执行任务则 调整审核状态为等待中 if ins.Spec.Audit.Enable { auditStatus := ins.AuditStatus() switch auditStatus { case pipeline.AUDIT_STAGE_PENDDING: ins.AuditStatusFlowTo(pipeline.AUDIT_STAGE_WAITING) case pipeline.AUDIT_STAGE_DENY: return nil, fmt.Errorf("任务审核未通过") } i.log.Debug().Msgf("任务: %s 审核状态为, %s", ins.Spec.TaskId, auditStatus) } // 如果不忽略执行, 并且审核通过, 则执行 if in.Enabled() && ins.AuditPass() { // 任务状态检查与处理 switch ins.Status.Stage { case task.STAGE_PENDDING: ins.Status.Stage = task.STAGE_CREATING case task.STAGE_ACTIVE: return nil, exception.NewConflict("任务: %s 当前处于运行中, 需要等待运行结束后才能执行", in.TaskId) } // 查询需要执行的Job: docker_build@v1 req := job.NewDescribeJobRequestByName(in.JobName) j, err := i.job.DescribeJob(ctx, req) if err != nil { return nil, err } ins.Job = j ins.Spec.JobId = j.Meta.Id i.log.Info().Msgf("describe job success, %s[%s]", j.Spec.Name, j.Meta.Id) // 脱敏参数动态还原 in.RunParams.RestoreSensitive(j.Spec.RunParams) // 合并允许参数(Job里面有默认值), 并检查参数合法性 // 注意Param的合并是有顺序的,也就是参数优先级(低-->高): // 1. 系统变量(默认禁止修改) // 2. job默认变量 // 3. job运行变量 // 4. pipeline 运行变量 // 5. pipeline 运行时变量 params := job.NewRunParamSet() params.Add(ins.SystemRunParam()...) params.Add(j.Spec.RunParams.Params...) params.Merge(in.RunParams.Params...) err = i.LoadPipelineRunParam(ctx, in, params) if err != nil { return nil, err } // 校验参数合法性 err = params.Validate() if err != nil { return nil, fmt.Errorf("校验任务【%s】参数错误, %s", j.Spec.DisplayName, err) } i.log.Info().Msgf("params check ok, %s", params) // 获取执行器执行 r := runner.GetRunner(j.Spec.RunnerType) runReq := task.NewRunTaskRequest(ins.Spec.TaskId, j.Spec.RunnerSpec, params) runReq.DryRun = in.RunParams.DryRun runReq.Labels = in.Labels runReq.ManualUpdateStatus = j.Spec.ManualUpdateStatus i.log.Debug().Msgf("[%s] start run task: %s", ins.Spec.PipelineTask, in.TaskName) status, err := r.Run(ctx, runReq) if err != nil { return nil, fmt.Errorf("run job error, %s", err) } status.RunParams = params ins.Status = status // 添加搜索标签 ins.BuildSearchLabel() } // 保存任务 updateOpt := options.Update() updateOpt.SetUpsert(true) if _, err := i.jcol.UpdateByID(ctx, ins.Spec.TaskId, bson.M{"$set": ins}, updateOpt); err != nil { return nil, exception.NewInternalServerError("upsert a job task document error, %s", err) } return ins, nil } ``` ## 任务状态更新