2025-04-06 16:47:59 +08:00

356 lines
10 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 流程解读
## 执行任务
mflow/apps/task/impl/job_task.go
1. RunJob
```go
func (i *impl) RunJob(ctx context.Context, in *pipeline.Task) (
*task.JobTask, error) {
ins := task.NewJobTask(in)
// 获取之前任务的状态, 因为里面有当前任务的审核状态
err := i.GetJotTaskStatus(ctx, ins)
if err != nil {
return nil, err
}
// 开启审核后, 执行任务则 调整审核状态为等待中
if ins.Spec.Audit.Enable {
auditStatus := ins.AuditStatus()
switch auditStatus {
case pipeline.AUDIT_STAGE_PENDDING:
ins.AuditStatusFlowTo(pipeline.AUDIT_STAGE_WAITING)
case pipeline.AUDIT_STAGE_DENY:
return nil, fmt.Errorf("任务审核未通过")
}
i.log.Debug().Msgf("任务: %s 审核状态为, %s", ins.Spec.TaskId, auditStatus)
}
// 如果不忽略执行, 并且审核通过, 则执行
if in.Enabled() && ins.AuditPass() {
// 任务状态检查与处理
switch ins.Status.Stage {
case task.STAGE_PENDDING:
ins.Status.Stage = task.STAGE_CREATING
case task.STAGE_ACTIVE:
return nil, exception.NewConflict("任务: %s 当前处于运行中, 需要等待运行结束后才能执行", in.TaskId)
}
// 查询需要执行的Job: docker_build@v1
req := job.NewDescribeJobRequestByName(in.JobName)
j, err := i.job.DescribeJob(ctx, req)
if err != nil {
return nil, err
}
ins.Job = j
ins.Spec.JobId = j.Meta.Id
i.log.Info().Msgf("describe job success, %s[%s]", j.Spec.Name, j.Meta.Id)
// 脱敏参数动态还原
in.RunParams.RestoreSensitive(j.Spec.RunParams)
// 合并允许参数(Job里面有默认值), 并检查参数合法性
// 注意Param的合并是有顺序的也就是参数优先级(低-->高):
// 1. 系统变量(默认禁止修改)
// 2. job默认变量
// 3. job运行变量
// 4. pipeline 运行变量
// 5. pipeline 运行时变量
params := job.NewRunParamSet()
params.Add(ins.SystemRunParam()...)
params.Add(j.Spec.RunParams.Params...)
params.Merge(in.RunParams.Params...)
err = i.LoadPipelineRunParam(ctx, in, params)
if err != nil {
return nil, err
}
// 校验参数合法性
err = params.Validate()
if err != nil {
return nil, fmt.Errorf("校验任务【%s】参数错误, %s", j.Spec.DisplayName, err)
}
i.log.Info().Msgf("params check ok, %s", params)
// 获取执行器执行
r := runner.GetRunner(j.Spec.RunnerType)
runReq := task.NewRunTaskRequest(ins.Spec.TaskId, j.Spec.RunnerSpec, params)
runReq.DryRun = in.RunParams.DryRun
runReq.Labels = in.Labels
runReq.ManualUpdateStatus = j.Spec.ManualUpdateStatus
i.log.Debug().Msgf("[%s] start run task: %s", ins.Spec.PipelineTask, in.TaskName)
status, err := r.Run(ctx, runReq)
if err != nil {
return nil, fmt.Errorf("run job error, %s", err)
}
status.RunParams = params
ins.Status = status
// 添加搜索标签
ins.BuildSearchLabel()
}
// 保存任务
updateOpt := options.Update()
updateOpt.SetUpsert(true)
if _, err := i.jcol.UpdateByID(ctx, ins.Spec.TaskId, bson.M{"$set": ins}, updateOpt); err != nil {
return nil, exception.NewInternalServerError("upsert a job task document error, %s", err)
}
return ins, nil
}
```
## 任务状态更新
```go
// 更新Job状态
func (i *impl) UpdateJobTaskStatus(ctx context.Context, in *task.UpdateJobTaskStatusRequest) (
*task.JobTask, error) {
ins, err := i.DescribeJobTask(ctx, task.NewDescribeJobTaskRequest(in.Id))
if err != nil {
return nil, err
}
// 校验更新合法性
err = i.CheckAllowUpdate(ctx, ins, in.UpdateToken, in.ForceUpdateStatus)
if err != nil {
return nil, err
}
i.log.Debug().Msgf("更新任务状态: %s当前状态: %s, 更新状态: %s",
ins.Spec.TaskId, ins.Status.Stage, in.Stage)
// 状态更新
ins.Status.UpdateStatus(in)
// 更新数据库
if err := i.updateJobTaskStatus(ctx, ins); err != nil {
return nil, err
}
// Job Task状态变更回调
i.JobTaskStatusChangedCallback(ctx, ins)
// Pipeline Task 状态变更回调
if ins.Spec.PipelineTask != "" {
// 如果状态未变化, 不触发流水线更新
if !in.ForceTriggerPipeline && !ins.Status.Changed {
i.log.Debug().Msgf("task %s status not changed: [%s], skip update pipeline", in.Id, in.Stage)
return ins, nil
}
_, err := i.PipelineTaskStatusChanged(ctx, ins)
if err != nil {
return nil, err
}
}
return ins, nil
}
func (i *impl) JobTaskStatusChangedCallback(ctx context.Context, in *task.JobTask) {
// 状态未变化不通知
if in.Status == nil {
return
}
// 状态未变化不通知
if !in.Status.Changed {
i.log.Debug().Msgf("task %s status not changed [%s], skip status callback", in.Spec.TaskId, in.Status.Stage)
return
}
i.log.Debug().Msgf("task %s 执行状态变化回调...", in.Spec.TaskId)
// 个人通知
for index := range in.Spec.MentionUsers {
mu := in.Spec.MentionUsers[index]
i.TaskMention(ctx, mu, in)
}
if len(in.Spec.MentionUsers) > 0 {
i.updateJobTaskMentionUser(ctx, in.Spec.TaskId, in.Spec.MentionUsers)
}
// 群组通知
imRobotHooks := in.Spec.MatchedImRobotNotify(in.Status.Stage.String())
i.log.Debug().Msgf("task %s 群组通知: %v", in.Spec.TaskId, imRobotHooks)
i.hook.SendTaskStatus(ctx, imRobotHooks, in)
if len(imRobotHooks) > 0 {
i.updateJobTaskImRobotNotify(ctx, in.Spec.TaskId, imRobotHooks)
}
// WebHook回调
webhooks := in.Spec.MatchedWebHooks(in.Status.Stage.String())
i.log.Debug().Msgf("task %s WebHook通知: %v", in.Spec.TaskId, webhooks)
i.hook.SendTaskStatus(ctx, webhooks, in)
if len(webhooks) > 0 {
i.updateJobTaskWebHook(ctx, in.Spec.TaskId, webhooks)
}
}
```
## Pipeline 事件触发
```go
// Pipeline中任务有变化时,
// 如果执行成功则 继续执行, 如果失败则标记Pipeline结束
// 当所有任务成功结束时标记Pipeline执行成功
func (i *impl) PipelineTaskStatusChanged(ctx context.Context, in *task.JobTask) (
*task.PipelineTask, error) {
if in == nil || in.Status == nil {
return nil, exception.NewBadRequest("job task or job task status is nil")
}
if in.Spec.PipelineTask == "" {
return nil, exception.NewBadRequest("Pipeline Id参数缺失")
}
runErrorJobTasks := []*task.UpdateJobTaskStatusRequest{}
// 获取Pipeline Task, 因为Job Task是先保存在触发的回调, 这里获取的Pipeline Task是最新的
descReq := task.NewDescribePipelineTaskRequest(in.Spec.PipelineTask)
p, err := i.DescribePipelineTask(ctx, descReq)
if err != nil {
return nil, err
}
defer func() {
// 更新当前任务的pipeline task状态
i.mustUpdatePipelineStatus(ctx, p)
// 如果JobTask正常执行, 则等待回调更新, 如果执行失败 则需要立即更新JobTask状态
for index := range runErrorJobTasks {
_, err = i.UpdateJobTaskStatus(ctx, runErrorJobTasks[index])
if err != nil {
p.MarkedFailed(err)
i.mustUpdatePipelineStatus(ctx, p)
}
}
}()
// 更新Pipeline Task 运行时环境变量
p.Status.RuntimeEnvs.Merge(in.RuntimeRunParams()...)
switch in.Status.Stage {
case task.STAGE_PENDDING,
task.STAGE_SCHEDULING,
task.STAGE_CREATING,
task.STAGE_ACTIVE,
task.STAGE_CANCELING:
// Task状态无变化
return p, nil
case task.STAGE_CANCELED:
// 任务取消, pipeline 取消执行
p.MarkedCanceled()
return p, nil
case task.STAGE_FAILED:
// 任务执行结果确认失败
if in.IsConfirmEnabled() && in.IsConfirming() {
// 状态确认中
i.log.Debug().Msgf("%s 状态确认中", in.Spec.TaskName)
return p, nil
}
// 任务执行失败, 更新Pipeline状态为失败
if !in.Spec.RunParams.IgnoreFailed {
p.MarkedFailed(in.Status.MessageToError())
return p, nil
}
case task.STAGE_SUCCEEDED:
// 任务执行结果确认失败
if in.IsConfirmEnabled() && in.IsConfirming() {
// 状态确认中
i.log.Debug().Msgf("%s 状态确认中", in.Spec.TaskName)
return p, nil
}
// 任务运行成功, pipeline继续执行
i.log.Info().Msgf("task: %s run successed", in.Spec.TaskId)
}
/*
task执行成功或者忽略执行失败, 此时pipeline 仍然处于运行中, 需要获取下一个任务执行
*/
nexts, err := p.NextRun()
if err != nil {
p.MarkedFailed(err)
return p, nil
}
// 如果没有需要执行的任务, Pipeline执行结束, 更新Pipeline状态为成功
if nexts == nil || nexts.Len() == 0 {
p.MarkedSuccess()
return p, nil
}
// 如果有需要执行的JobTask, 继续执行
for index := range nexts.Items {
item := nexts.Items[index]
// 如果任务执行成功则等待任务的回调更新任务状态
// 如果任务执行失败, 直接更新任务状态
t, err := i.RunJob(ctx, item.Spec)
if err != nil {
updateT := task.NewUpdateJobTaskStatusRequest(item.Spec.TaskId)
updateT.UpdateToken = item.Spec.UpdateToken
updateT.MarkError(err)
runErrorJobTasks = append(runErrorJobTasks, updateT)
} else {
item.Status = t.Status
item.Job = t.Job
}
}
return p, nil
}
```
如何获取下一组需要执行的任务: 任务执行调度
```go
// 返回下个需要执行的JobTask, 允许一次并行执行多个(批量执行)
// Task DryRun属性要继承PipelineTask
func (p *PipelineTask) NextRun() (*JobTaskSet, error) {
set := NewJobTaskSet()
var stage *StageStatus
if p.Status == nil || p.Pipeline == nil {
return set, nil
}
// 需要未执行完成的Job Tasks
stages := p.Status.StageStatus
for i := range stages {
stage = stages[i]
// 找出Stage中未执行完的Job Task
set = stage.UnCompleteJobTask(p.Params.DryRun)
set.UpdateFromPipelineTask(p)
// 如果找到 直接Break
if set.Len() > 0 {
break
}
}
// 如果所有Stage寻找完都没找到, 表示PipelineTask执行完成
if set.Len() == 0 {
return set, nil
}
// TODO: 如果这些未执行当中的Job Task 有处于运行中的, 不会执行下个一个任务
if set.HasStage(STAGE_ACTIVE) {
return set, exception.NewConflict("Stage 还处于运行中")
}
// 当未执行的任务中,没有运行中的时,剩下的就是需要被执行的任务
tasks := set.GetJobTaskByStage(STAGE_PENDDING)
nextTasks := NewJobTaskSet()
stageSpec := p.Pipeline.GetStage(stage.Name)
if stageSpec.IsParallel {
// 并行任务 返回该Stage所有等待执行的job
nextTasks.Add(tasks...)
} else {
// 串行任务取第一个
nextTasks.Add(tasks[0])
}
return nextTasks.UpdateFromPipelineTask(p), nil
}
```