2026-03-15 17:03:15 +08:00

264 lines
8.1 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package task
import (
"fmt"
"time"
"github.com/google/uuid"
"github.com/infraboard/mcube/tools/pretty"
)
func NewTask(spec *TaskSpec) *Task {
return &Task{
TaskSpec: spec,
TaskStatus: &TaskStatus{
Status: STATUS_PENDDING,
UpdateAt: time.Now(),
},
}
}
type Task struct {
// 任务定义
*TaskSpec
// 任务状态
*TaskStatus
}
func (e *Task) TableName() string {
return "devops_tasks"
}
func (e *Task) String() string {
return fmt.Sprintf("%s", pretty.ToJSON(e))
}
func NewTaskSpec() *TaskSpec {
return &TaskSpec{
Id: uuid.NewString(),
CreateAt: time.Now(),
TimeoutSecond: 60 * 60 * 24,
InputParams: map[string]string{},
}
}
type TaskSpec struct {
// 任务Id(唯一标识,由调用方生成, 比如 uuid 如果没有自动生成唯一Id)
Id string `json:"id" gorm:"column:id;type:string;primary_key"`
// 描述, 比如 "构建任务", "部署任务"
Description string `json:"description" gorm:"column:description;type:text"`
// 任务名称, 比如 "build", "deploy" 每一个名称 在Agent测有一个唯一的Task与之对应
Name string `json:"name" gorm:"column:name;type:varchar(255)"`
// 创建时间
CreateAt time.Time `json:"create_at" gorm:"column:create_at;type:datetime"`
// 任务定义, 比如名称, job定义
Define string `json:"define" gorm:"column:define;type:text"`
// 运行参数, 比如构建任务的代码分支、部署任务的目标环境等 作为环境变量传递给任务脚本执行
InputParams map[string]string `json:"input_params" gorm:"column:input_params;type:json;serializer:json;not null;default:'{}'"`
// 流水线任务Id(忽略)
PipelineTaskId string `json:"pipeline_task_id" gorm:"column:pipeline_task_id;type:varchar(100);index"`
// 任务超时时间, 0表示不超时
TimeoutSecond int64 `json:"timeout_second" gorm:"column:timeout_second;type:bigint"`
// 是否忽略错误
IgnoreError *bool `json:"ignore_error" gorm:"column:ignore_error;type:bool;default:false"`
// 需要调度那个Agent执行, 为空表示不指定, 由调度系统根据任务类型和Agent能力自动选择一个Agent执行
ScheduleAgent string `json:"schedule_agent" gorm:"column:schedule_agent;type:varchar(255);index"`
// 依赖的任务节点列表
DependsTasks []string `json:"depends_tasks" gorm:"column:depends_tasks;type:json;serializer:json;not null;default:'[]'"`
// 执行条件(旧版,保留向后兼容)
When []Contiditon `json:"when" gorm:"column:when;type:json;serializer:json;"`
// when 条件表达式(新版 DAG 条件系统)
// 支持表达式如: "always", "never", "params.env == 'prod'", "deps.build.status == 'success'"
WhenCondition string `json:"when_condition" gorm:"column:when_condition;type:varchar(1000)"`
// 额外的其他属性
Extras map[string]string `json:"extras" form:"extras" gorm:"column:extras;type:json;serializer:json;"`
// 标签
Label map[string]string `json:"label" gorm:"column:label;type:json;serializer:json;"`
}
func (t *TaskSpec) GetWorkDir() string {
if t.PipelineTaskId != "" {
return fmt.Sprintf("%s/%s", t.PipelineTaskId, t.Id)
}
return t.Id
}
func (t *TaskSpec) SetInputParams(key, value string) *TaskSpec {
if t.InputParams == nil {
t.InputParams = make(map[string]string)
}
t.InputParams[key] = value
return t
}
func (t *TaskSpec) GetTimeout() time.Duration {
if t.TimeoutSecond > 0 {
return time.Duration(t.TimeoutSecond) * time.Second
}
return time.Duration(60) * time.Minute
}
// inputa == "a"
type Contiditon struct {
// 输入参数
InputParam string `json:"input_param"`
// 操作符
Operator CONDITION_OPERATOR `json:"operator"`
// In的值列吧
Values []string `json:"values"`
}
type TaskStatus struct {
// 状态
Status STATUS `json:"status" gorm:"column:status;type:varchar(100);index"`
// 关联URL (比如日志URL, 结果URL等)
RefURL string `json:"ref_url" gorm:"column:ref_url;type:varchar(255)"`
// 失败原因
Message string `json:"message" gorm:"column:message;type:text"`
// 异步任务调用时的返回详情
Detail string `json:"detail,omitempty" gorm:"column:detail;type:text"`
// 启动人
RunBy string `json:"run_by" gorm:"column:run_by;type:varchar(100)"`
// 开始时间
StartAt *time.Time `json:"start_at" gorm:"column:start_at;type:datetime"`
// 更新时间
UpdateAt time.Time `json:"update_at" gorm:"column:update_at;type:datetime"`
// 结束时间
EndAt time.Time `json:"end_at" gorm:"column:end_at;type:datetime"`
// 其他信息
Extras map[string]string `json:"extras" gorm:"column:extras;type:json;serializer:json;not null;default:'{}'"`
// 调度时间
ScheduledAt *time.Time `json:"scheduled_at" gorm:"column:scheduled_at;type:datetime"`
// 具体执行任务的AgentId
ScheduledAgentId *string `json:"scheduled_agent" gorm:"column:scheduled_agent;type:varchar(255);index"`
// 确认调度超时, 默认15秒
ScheduledConfirmTTL int64 `json:"scheduled_confirm_ttl" gorm:"column:scheduled_confirm_ttl;type:bigint;default:15"`
// 调度确认, Agent确认接收任务, 下发成功后设置为true
ScheduledConfirmed *bool `json:"scheduled_confirmed" gorm:"column:scheduled_confirmed;type:boolean;default:false"`
// 任务输出参数(供下一个任务使用)
Output map[string]string `json:"output,omitempty" gorm:"column:output;type:json;serializer:json;not null;default:'{}'"`
}
func (r *TaskStatus) String() string {
return pretty.ToJSON(r)
}
func (r *TaskStatus) SetScheduledConfirmed(confirmed bool) *TaskStatus {
r.ScheduledConfirmed = &confirmed
return r
}
func (r *TaskStatus) SetScheduledAgentId(agentId string) *TaskStatus {
r.ScheduledAgentId = &agentId
return r
}
func (r *TaskStatus) GetScheduledAgentId() string {
if r.ScheduledAgentId == nil {
return ""
}
return *r.ScheduledAgentId
}
func (r *TaskStatus) TableName() string {
return "devops_tasks"
}
func (r *TaskStatus) MarkedRunning() {
r.setStartAt(time.Now())
r.Status = STATUS_RUNNING
}
func (r *TaskStatus) IsRunning() bool {
return r.Status == STATUS_RUNNING
}
// MarkScheduled 标记任务已被分配给指定的Agent
// 用于Agent模式下的任务分发
// 同时设置确认超时时间窗口默认15秒
func (r *TaskStatus) MarkScheduled(agentId string) *TaskStatus {
now := time.Now()
r.ScheduledAt = &now
r.SetScheduledAgentId(agentId)
r.SetScheduledConfirmed(false)
// 确保设置确认超时时间如果未设置则默认15秒
if r.ScheduledConfirmTTL <= 0 {
r.ScheduledConfirmTTL = 15
}
return r
}
// IsScheduled 检查任务是否已被分配给Agent
func (r *TaskStatus) IsScheduled() bool {
return r.ScheduledAgentId != nil && *r.ScheduledAgentId != ""
}
// IsScheduleConfirm 检查任务是否具有完整的调度信息(调度时间和超时时间都已设置)
// 用于判断是否需要进行调度超时检查
func (r *TaskStatus) IsScheduleConfirm() bool {
return r.ScheduledAt != nil && r.ScheduledConfirmTTL > 0
}
// ConfirmScheduled 确认Agent已接收任务分配
func (r *TaskStatus) ConfirmScheduled() *TaskStatus {
r.SetScheduledConfirmed(true)
return r
}
// IsScheduleConfirmed 检查任务分配是否已被Agent确认
func (r *TaskStatus) IsScheduleConfirmed() bool {
return r.ScheduledConfirmed != nil && *r.ScheduledConfirmed
}
func (r *TaskStatus) setStartAt(t time.Time) {
r.StartAt = &t
}
func (t *TaskStatus) WithExtra(key, value string) *TaskStatus {
t.Extras[key] = value
return t
}
func (t *TaskStatus) WithRefURL(refURL string) *TaskStatus {
t.RefURL = refURL
return t
}
func (t *TaskStatus) Failedf(format string, a ...any) *TaskStatus {
t.EndAt = time.Now()
t.Status = STATUS_FAILED
t.Message = fmt.Sprintf(format, a...)
return t
}
func (t *TaskStatus) Canceledf(format string, a ...any) *TaskStatus {
t.EndAt = time.Now()
t.Status = STATUS_CANCELED
t.Message = fmt.Sprintf(format, a...)
return t
}
func (t *TaskStatus) Success(format string, a ...any) *TaskStatus {
t.EndAt = time.Now()
t.Status = STATUS_SUCCESS
t.Message = fmt.Sprintf(format, a...)
return t
}
func (t *TaskStatus) Skipf(format string, a ...any) *TaskStatus {
t.EndAt = time.Now()
t.Status = STATUS_SKIP
t.Message = fmt.Sprintf(format, a...)
return t
}
func (t *TaskStatus) WithDetail(format string, a ...any) *TaskStatus {
t.Detail = fmt.Sprintf(format, a...)
return t
}