package task import ( "fmt" "time" "github.com/infraboard/mcube/tools/pretty" ) type Task struct { // 任务定义 *TaskSpec // 任务状态 *TaskStatus } func (e *Task) TableName() string { return "devops_tasks" } type TaskSpec struct { // 任务Id(唯一标识,由调用方生成, 比如 uuid, 如果没有自动生成唯一Id) Id string `json:"id" gorm:"column:id;type:string;primary_key"` // 描述, 比如 "构建任务", "部署任务" Description string `json:"description" gorm:"column:description;type:text"` // 任务名称, 比如 "build", "deploy" 每一个名称 在Agent测有一个唯一的Task与之对应 Name string `json:"name" gorm:"column:name;type:varchar(255)"` // 创建时间 CreateAt time.Time `json:"create_at" gorm:"column:create_at;type:datetime"` // 任务定义, 比如名称, job定义 Define string `json:"define" gorm:"column:define;type:text"` // 运行参数, 比如构建任务的代码分支、部署任务的目标环境等 作为环境变量传递给任务脚本执行 InputParams map[string]string `json:"input_params" gorm:"column:input_params;type:json;serializer:json;not null;default:'{}'"` // 流水线任务Id(忽略) PipelineTaskId string `json:"pipeline_task_id" gorm:"column:pipeline_task_id;type:varchar(100);index"` // 任务超时时间, 0表示不超时 TimeoutSecond int64 `json:"timeout_second" gorm:"column:timeout_second;type:bigint"` // 是否忽略错误 IgnoreError *bool `json:"ignore_error" gorm:"column:ignore_error;type:bool;default:false"` // 需要调度那个Agent执行, 为空表示不指定, 由调度系统根据任务类型和Agent能力自动选择一个Agent执行 ScheduleAgent string `json:"schedule_agent" gorm:"column:schedule_agent;type:varchar(255);index"` // 依赖的任务节点列表 DependsTasks []string `json:"depends_tasks" gorm:"column:depends_tasks;type:json;serializer:json;not null;default:'[]'"` // 执行条件(旧版,保留向后兼容) When []Contiditon `json:"when" gorm:"column:when;type:json;serializer:json;"` // when 条件表达式(新版 DAG 条件系统) // 支持表达式如: "always", "never", "params.env == 'prod'", "deps.build.status == 'success'" WhenCondition string `json:"when_condition" gorm:"column:when_condition;type:varchar(1000)"` // 额外的其他属性 Extras map[string]string `json:"extras" form:"extras" gorm:"column:extras;type:json;serializer:json;"` // 标签 Label map[string]string `json:"label" gorm:"column:label;type:json;serializer:json;"` } // inputa == "a" type Contiditon struct { // 输入参数 InputParam string `json:"input_param"` // 操作符 Operator CONDITION_OPERATOR `json:"operator"` // In的值列吧 Values []string `json:"values"` } type TaskStatus struct { // 状态 Status STATUS `json:"status" gorm:"column:status;type:varchar(100);index"` // 关联URL (比如日志URL, 结果URL等) RefURL string `json:"ref_url" gorm:"column:ref_url;type:varchar(255)"` // 失败原因 Message string `json:"message" gorm:"column:message;type:text"` // 异步任务调用时的返回详情 Detail string `json:"detail,omitempty" gorm:"column:detail;type:text"` // 启动人 RunBy string `json:"run_by" gorm:"column:run_by;type:varchar(100)"` // 开始时间 StartAt *time.Time `json:"start_at" gorm:"column:start_at;type:datetime"` // 更新时间 UpdateAt time.Time `json:"update_at" gorm:"column:update_at;type:datetime"` // 结束时间 EndAt time.Time `json:"end_at" gorm:"column:end_at;type:datetime"` // 其他信息 Extras map[string]string `json:"extras" gorm:"column:extras;type:json;serializer:json;not null;default:'{}'"` // 调度时间 ScheduledAt *time.Time `json:"scheduled_at" gorm:"column:scheduled_at;type:datetime"` // 具体执行任务的AgentId ScheduledAgentId *string `json:"scheduled_agent" gorm:"column:scheduled_agent;type:varchar(255);index"` // 确认调度超时, 默认15秒 ScheduledConfirmTTL int64 `json:"scheduled_confirm_ttl" gorm:"column:scheduled_confirm_ttl;type:bigint;default:15"` // 调度确认, Agent确认接收任务, 下发成功后设置为true ScheduledConfirmed *bool `json:"scheduled_confirmed" gorm:"column:scheduled_confirmed;type:boolean;default:false"` // 任务输出参数(供下一个任务使用) Output map[string]string `json:"output,omitempty" gorm:"column:output;type:json;serializer:json;not null;default:'{}'"` } // // 命令 // Command string `json:"command"` // // 错误原因 // Error string `json:"error,omitempty"` // // 命令退出码 // ExitCode int `json:"exit_code"` // // 命令开始执行时间 // StartTime time.Time `json:"start_time"` // // 命令结束执行时间 // EndTime *time.Time `json:"end_time"` // // 命令执行时长 // Duration time.Duration `json:"duration"` // // 命令执行是否成功 // Success bool `json:"success"` // // 是否跳过执行(跳过视为成功,但标记为 Skip 以便管道状态同步) // Skipped bool `json:"skipped,omitempty"` // // 非错误的说明信息(比如跳过原因等) // Message string `json:"message,omitempty"` // // 元数据 // Metadata *CommandMetadata `json:"metadata"` // // 文件内容集合 // FileContents map[string]string `json:"file_contents,omitempty"` // // 脚本输出参数(供下一个任务使用) // OutputParams map[string]string `json:"output_params,omitempty"` func (r *TaskStatus) String() string { return pretty.ToJSON(r) } func (r *TaskStatus) SetScheduledConfirmed(confirmed bool) *TaskStatus { r.ScheduledConfirmed = &confirmed return r } func (r *TaskStatus) SetScheduledAgentId(agentId string) *TaskStatus { r.ScheduledAgentId = &agentId return r } func (r *TaskStatus) GetScheduledAgentId() string { if r.ScheduledAgentId == nil { return "" } return *r.ScheduledAgentId } func (r *TaskStatus) TableName() string { return "devops_tasks" } func (r *TaskStatus) MarkedRunning() { r.setStartAt(time.Now()) r.Status = STATUS_RUNNING } func (r *TaskStatus) IsRunning() bool { return r.Status == STATUS_RUNNING } // MarkScheduled 标记任务已被分配给指定的Agent // 用于Agent模式下的任务分发 // 同时设置确认超时时间窗口(默认15秒) func (r *TaskStatus) MarkScheduled(agentId string) *TaskStatus { now := time.Now() r.ScheduledAt = &now r.SetScheduledAgentId(agentId) r.SetScheduledConfirmed(false) // 确保设置确认超时时间,如果未设置则默认15秒 if r.ScheduledConfirmTTL <= 0 { r.ScheduledConfirmTTL = 15 } return r } // IsScheduled 检查任务是否已被分配给Agent func (r *TaskStatus) IsScheduled() bool { return r.ScheduledAgentId != nil && *r.ScheduledAgentId != "" } // IsScheduleConfirm 检查任务是否具有完整的调度信息(调度时间和超时时间都已设置) // 用于判断是否需要进行调度超时检查 func (r *TaskStatus) IsScheduleConfirm() bool { return r.ScheduledAt != nil && r.ScheduledConfirmTTL > 0 } // ConfirmScheduled 确认Agent已接收任务分配 func (r *TaskStatus) ConfirmScheduled() *TaskStatus { r.SetScheduledConfirmed(true) return r } // IsScheduleConfirmed 检查任务分配是否已被Agent确认 func (r *TaskStatus) IsScheduleConfirmed() bool { return r.ScheduledConfirmed != nil && *r.ScheduledConfirmed } func (r *TaskStatus) setStartAt(t time.Time) { r.StartAt = &t } func (t *TaskStatus) WithExtra(key, value string) *TaskStatus { t.Extras[key] = value return t } func (t *TaskStatus) WithRefURL(refURL string) *TaskStatus { t.RefURL = refURL return t } func (t *TaskStatus) Failedf(format string, a ...any) *TaskStatus { t.EndAt = time.Now() t.Status = STATUS_FAILED t.Message = fmt.Sprintf(format, a...) return t } func (t *TaskStatus) Canceledf(format string, a ...any) *TaskStatus { t.EndAt = time.Now() t.Status = STATUS_CANCELED t.Message = fmt.Sprintf(format, a...) return t } func (t *TaskStatus) Success(format string, a ...any) *TaskStatus { t.EndAt = time.Now() t.Status = STATUS_SUCCESS t.Message = fmt.Sprintf(format, a...) return t } func (t *TaskStatus) Skipf(format string, a ...any) *TaskStatus { t.EndAt = time.Now() t.Status = STATUS_SKIP t.Message = fmt.Sprintf(format, a...) return t } func (t *TaskStatus) WithDetail(format string, a ...any) *TaskStatus { t.Detail = fmt.Sprintf(format, a...) return t }