From 1a473d8b3ac8a81c5719f02c35789c0892517719 Mon Sep 17 00:00:00 2001 From: yumaojun03 <719118794@qq.com> Date: Sun, 6 Apr 2025 16:47:59 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A1=A5=E5=85=85pipeline=E6=B5=81=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- devcloud-mini/mflow/flow.md | 165 ++++++++++++++++++++++++++++++++++++ 1 file changed, 165 insertions(+) diff --git a/devcloud-mini/mflow/flow.md b/devcloud-mini/mflow/flow.md index 3e80f69..c1f24c7 100644 --- a/devcloud-mini/mflow/flow.md +++ b/devcloud-mini/mflow/flow.md @@ -188,4 +188,169 @@ func (i *impl) JobTaskStatusChangedCallback(ctx context.Context, in *task.JobTas i.updateJobTaskWebHook(ctx, in.Spec.TaskId, webhooks) } } +``` + +## Pipeline 事件触发 + +```go +// Pipeline中任务有变化时, +// 如果执行成功则 继续执行, 如果失败则标记Pipeline结束 +// 当所有任务成功结束时标记Pipeline执行成功 +func (i *impl) PipelineTaskStatusChanged(ctx context.Context, in *task.JobTask) ( + *task.PipelineTask, error) { + if in == nil || in.Status == nil { + return nil, exception.NewBadRequest("job task or job task status is nil") + } + + if in.Spec.PipelineTask == "" { + return nil, exception.NewBadRequest("Pipeline Id参数缺失") + } + + runErrorJobTasks := []*task.UpdateJobTaskStatusRequest{} + // 获取Pipeline Task, 因为Job Task是先保存在触发的回调, 这里获取的Pipeline Task是最新的 + descReq := task.NewDescribePipelineTaskRequest(in.Spec.PipelineTask) + p, err := i.DescribePipelineTask(ctx, descReq) + if err != nil { + return nil, err + } + + defer func() { + // 更新当前任务的pipeline task状态 + i.mustUpdatePipelineStatus(ctx, p) + + // 如果JobTask正常执行, 则等待回调更新, 如果执行失败 则需要立即更新JobTask状态 + for index := range runErrorJobTasks { + _, err = i.UpdateJobTaskStatus(ctx, runErrorJobTasks[index]) + if err != nil { + p.MarkedFailed(err) + i.mustUpdatePipelineStatus(ctx, p) + } + } + }() + + // 更新Pipeline Task 运行时环境变量 + p.Status.RuntimeEnvs.Merge(in.RuntimeRunParams()...) + + switch in.Status.Stage { + case task.STAGE_PENDDING, + task.STAGE_SCHEDULING, + task.STAGE_CREATING, + task.STAGE_ACTIVE, + task.STAGE_CANCELING: + // Task状态无变化 + return p, nil + case task.STAGE_CANCELED: + // 任务取消, pipeline 取消执行 + p.MarkedCanceled() + return p, nil + case task.STAGE_FAILED: + // 任务执行结果确认失败 + if in.IsConfirmEnabled() && in.IsConfirming() { + // 状态确认中 + i.log.Debug().Msgf("%s 状态确认中", in.Spec.TaskName) + return p, nil + } + // 任务执行失败, 更新Pipeline状态为失败 + if !in.Spec.RunParams.IgnoreFailed { + p.MarkedFailed(in.Status.MessageToError()) + return p, nil + } + case task.STAGE_SUCCEEDED: + // 任务执行结果确认失败 + if in.IsConfirmEnabled() && in.IsConfirming() { + // 状态确认中 + i.log.Debug().Msgf("%s 状态确认中", in.Spec.TaskName) + return p, nil + } + // 任务运行成功, pipeline继续执行 + i.log.Info().Msgf("task: %s run successed", in.Spec.TaskId) + } + + /* + task执行成功或者忽略执行失败, 此时pipeline 仍然处于运行中, 需要获取下一个任务执行 + */ + nexts, err := p.NextRun() + if err != nil { + p.MarkedFailed(err) + return p, nil + } + + // 如果没有需要执行的任务, Pipeline执行结束, 更新Pipeline状态为成功 + if nexts == nil || nexts.Len() == 0 { + p.MarkedSuccess() + return p, nil + } + + // 如果有需要执行的JobTask, 继续执行 + for index := range nexts.Items { + item := nexts.Items[index] + // 如果任务执行成功则等待任务的回调更新任务状态 + // 如果任务执行失败, 直接更新任务状态 + t, err := i.RunJob(ctx, item.Spec) + if err != nil { + updateT := task.NewUpdateJobTaskStatusRequest(item.Spec.TaskId) + updateT.UpdateToken = item.Spec.UpdateToken + updateT.MarkError(err) + runErrorJobTasks = append(runErrorJobTasks, updateT) + } else { + item.Status = t.Status + item.Job = t.Job + } + } + + return p, nil +} +``` + +如何获取下一组需要执行的任务: 任务执行调度 +```go +// 返回下个需要执行的JobTask, 允许一次并行执行多个(批量执行) +// Task DryRun属性要继承PipelineTask +func (p *PipelineTask) NextRun() (*JobTaskSet, error) { + set := NewJobTaskSet() + var stage *StageStatus + + if p.Status == nil || p.Pipeline == nil { + return set, nil + } + + // 需要未执行完成的Job Tasks + stages := p.Status.StageStatus + for i := range stages { + stage = stages[i] + + // 找出Stage中未执行完的Job Task + set = stage.UnCompleteJobTask(p.Params.DryRun) + set.UpdateFromPipelineTask(p) + // 如果找到 直接Break + if set.Len() > 0 { + break + } + } + + // 如果所有Stage寻找完,都没找到, 表示PipelineTask执行完成 + if set.Len() == 0 { + return set, nil + } + + // TODO: 如果这些未执行当中的Job Task 有处于运行中的, 不会执行下个一个任务 + if set.HasStage(STAGE_ACTIVE) { + return set, exception.NewConflict("Stage 还处于运行中") + } + + // 当未执行的任务中,没有运行中的时,剩下的就是需要被执行的任务 + tasks := set.GetJobTaskByStage(STAGE_PENDDING) + + nextTasks := NewJobTaskSet() + stageSpec := p.Pipeline.GetStage(stage.Name) + if stageSpec.IsParallel { + // 并行任务 返回该Stage所有等待执行的job + nextTasks.Add(tasks...) + } else { + // 串行任务取第一个 + nextTasks.Add(tasks[0]) + } + + return nextTasks.UpdateFromPipelineTask(p), nil +} ``` \ No newline at end of file