From 2f9d40a681e415f1dce3bcf5e8824c0cfa548922 Mon Sep 17 00:00:00 2001 From: yumaojun03 <719118794@qq.com> Date: Sun, 6 Apr 2025 15:47:46 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A1=A5=E5=85=85=E4=BB=BB=E5=8A=A1=E7=9A=84?= =?UTF-8?q?=E6=89=A7=E8=A1=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- devcloud-mini/mflow/flow.md | 110 ++++++++++++++++++++++++++++++++++++ 1 file changed, 110 insertions(+) create mode 100644 devcloud-mini/mflow/flow.md diff --git a/devcloud-mini/mflow/flow.md b/devcloud-mini/mflow/flow.md new file mode 100644 index 0000000..00d85f8 --- /dev/null +++ b/devcloud-mini/mflow/flow.md @@ -0,0 +1,110 @@ +# 流程解读 + + +## 执行任务 + +mflow/apps/task/impl/job_task.go + +1. RunJob +```go +func (i *impl) RunJob(ctx context.Context, in *pipeline.Task) ( + *task.JobTask, error) { + ins := task.NewJobTask(in) + + // 获取之前任务的状态, 因为里面有当前任务的审核状态 + err := i.GetJotTaskStatus(ctx, ins) + if err != nil { + return nil, err + } + + // 开启审核后, 执行任务则 调整审核状态为等待中 + if ins.Spec.Audit.Enable { + auditStatus := ins.AuditStatus() + switch auditStatus { + case pipeline.AUDIT_STAGE_PENDDING: + ins.AuditStatusFlowTo(pipeline.AUDIT_STAGE_WAITING) + case pipeline.AUDIT_STAGE_DENY: + return nil, fmt.Errorf("任务审核未通过") + } + i.log.Debug().Msgf("任务: %s 审核状态为, %s", ins.Spec.TaskId, auditStatus) + } + + // 如果不忽略执行, 并且审核通过, 则执行 + if in.Enabled() && ins.AuditPass() { + // 任务状态检查与处理 + switch ins.Status.Stage { + case task.STAGE_PENDDING: + ins.Status.Stage = task.STAGE_CREATING + case task.STAGE_ACTIVE: + return nil, exception.NewConflict("任务: %s 当前处于运行中, 需要等待运行结束后才能执行", in.TaskId) + } + + // 查询需要执行的Job: docker_build@v1 + req := job.NewDescribeJobRequestByName(in.JobName) + + j, err := i.job.DescribeJob(ctx, req) + if err != nil { + return nil, err + } + ins.Job = j + ins.Spec.JobId = j.Meta.Id + i.log.Info().Msgf("describe job success, %s[%s]", j.Spec.Name, j.Meta.Id) + + // 脱敏参数动态还原 + in.RunParams.RestoreSensitive(j.Spec.RunParams) + + // 合并允许参数(Job里面有默认值), 并检查参数合法性 + // 注意Param的合并是有顺序的,也就是参数优先级(低-->高): + // 1. 系统变量(默认禁止修改) + // 2. job默认变量 + // 3. job运行变量 + // 4. pipeline 运行变量 + // 5. pipeline 运行时变量 + params := job.NewRunParamSet() + params.Add(ins.SystemRunParam()...) + params.Add(j.Spec.RunParams.Params...) + params.Merge(in.RunParams.Params...) + err = i.LoadPipelineRunParam(ctx, in, params) + if err != nil { + return nil, err + } + + // 校验参数合法性 + err = params.Validate() + if err != nil { + return nil, fmt.Errorf("校验任务【%s】参数错误, %s", j.Spec.DisplayName, err) + } + i.log.Info().Msgf("params check ok, %s", params) + + // 获取执行器执行 + r := runner.GetRunner(j.Spec.RunnerType) + runReq := task.NewRunTaskRequest(ins.Spec.TaskId, j.Spec.RunnerSpec, params) + runReq.DryRun = in.RunParams.DryRun + runReq.Labels = in.Labels + runReq.ManualUpdateStatus = j.Spec.ManualUpdateStatus + + i.log.Debug().Msgf("[%s] start run task: %s", ins.Spec.PipelineTask, in.TaskName) + status, err := r.Run(ctx, runReq) + if err != nil { + return nil, fmt.Errorf("run job error, %s", err) + } + status.RunParams = params + ins.Status = status + + // 添加搜索标签 + ins.BuildSearchLabel() + } + + // 保存任务 + updateOpt := options.Update() + updateOpt.SetUpsert(true) + if _, err := i.jcol.UpdateByID(ctx, ins.Spec.TaskId, bson.M{"$set": ins}, updateOpt); err != nil { + return nil, exception.NewInternalServerError("upsert a job task document error, %s", err) + } + return ins, nil +} +``` + +## 任务状态更新 + +