241 lines
8.1 KiB
Go
Raw Normal View History

2026-03-08 18:05:17 +08:00
package task
import (
"fmt"
"time"
"github.com/infraboard/mcube/tools/pretty"
)
type Task struct {
// 任务定义
*TaskSpec
// 任务状态
*TaskStatus
}
func (e *Task) TableName() string {
return "devops_tasks"
}
type TaskSpec struct {
// 任务Id(唯一标识,由调用方生成, 比如 uuid 如果没有自动生成唯一Id)
Id string `json:"id" gorm:"column:id;type:string;primary_key"`
// 描述, 比如 "构建任务", "部署任务"
Description string `json:"description" gorm:"column:description;type:text"`
// 任务名称, 比如 "build", "deploy" 每一个名称 在Agent测有一个唯一的Task与之对应
Name string `json:"name" gorm:"column:name;type:varchar(255)"`
// 创建时间
CreateAt time.Time `json:"create_at" gorm:"column:create_at;type:datetime"`
// 任务定义, 比如名称, job定义
Define string `json:"define" gorm:"column:define;type:text"`
// 运行参数, 比如构建任务的代码分支、部署任务的目标环境等 作为环境变量传递给任务脚本执行
InputParams map[string]string `json:"input_params" gorm:"column:input_params;type:json;serializer:json;not null;default:'{}'"`
// 流水线任务Id(忽略)
PipelineTaskId string `json:"pipeline_task_id" gorm:"column:pipeline_task_id;type:varchar(100);index"`
// 任务超时时间, 0表示不超时
TimeoutSecond int64 `json:"timeout_second" gorm:"column:timeout_second;type:bigint"`
// 是否忽略错误
IgnoreError *bool `json:"ignore_error" gorm:"column:ignore_error;type:bool;default:false"`
// 需要调度那个Agent执行, 为空表示不指定, 由调度系统根据任务类型和Agent能力自动选择一个Agent执行
ScheduleAgent string `json:"schedule_agent" gorm:"column:schedule_agent;type:varchar(255);index"`
// 依赖的任务节点列表
DependsTasks []string `json:"depends_tasks" gorm:"column:depends_tasks;type:json;serializer:json;not null;default:'[]'"`
// 执行条件(旧版,保留向后兼容)
When []Contiditon `json:"when" gorm:"column:when;type:json;serializer:json;"`
// when 条件表达式(新版 DAG 条件系统)
// 支持表达式如: "always", "never", "params.env == 'prod'", "deps.build.status == 'success'"
WhenCondition string `json:"when_condition" gorm:"column:when_condition;type:varchar(1000)"`
// 额外的其他属性
Extras map[string]string `json:"extras" form:"extras" gorm:"column:extras;type:json;serializer:json;"`
// 标签
Label map[string]string `json:"label" gorm:"column:label;type:json;serializer:json;"`
}
// inputa == "a"
type Contiditon struct {
// 输入参数
InputParam string `json:"input_param"`
// 操作符
Operator CONDITION_OPERATOR `json:"operator"`
// In的值列吧
Values []string `json:"values"`
}
type TaskStatus struct {
// 状态
Status STATUS `json:"status" gorm:"column:status;type:varchar(100);index"`
// 关联URL (比如日志URL, 结果URL等)
RefURL string `json:"ref_url" gorm:"column:ref_url;type:varchar(255)"`
// 失败原因
Message string `json:"message" gorm:"column:message;type:text"`
// 异步任务调用时的返回详情
Detail string `json:"detail,omitempty" gorm:"column:detail;type:text"`
// 启动人
RunBy string `json:"run_by" gorm:"column:run_by;type:varchar(100)"`
// 开始时间
StartAt *time.Time `json:"start_at" gorm:"column:start_at;type:datetime"`
// 更新时间
UpdateAt time.Time `json:"update_at" gorm:"column:update_at;type:datetime"`
// 结束时间
EndAt time.Time `json:"end_at" gorm:"column:end_at;type:datetime"`
// 其他信息
Extras map[string]string `json:"extras" gorm:"column:extras;type:json;serializer:json;not null;default:'{}'"`
// 调度时间
ScheduledAt *time.Time `json:"scheduled_at" gorm:"column:scheduled_at;type:datetime"`
// 具体执行任务的AgentId
ScheduledAgentId *string `json:"scheduled_agent" gorm:"column:scheduled_agent;type:varchar(255);index"`
// 确认调度超时, 默认15秒
ScheduledConfirmTTL int64 `json:"scheduled_confirm_ttl" gorm:"column:scheduled_confirm_ttl;type:bigint;default:15"`
// 调度确认, Agent确认接收任务, 下发成功后设置为true
ScheduledConfirmed *bool `json:"scheduled_confirmed" gorm:"column:scheduled_confirmed;type:boolean;default:false"`
// 任务输出参数(供下一个任务使用)
Output map[string]string `json:"output,omitempty" gorm:"column:output;type:json;serializer:json;not null;default:'{}'"`
}
// // 命令
// Command string `json:"command"`
// // 错误原因
// Error string `json:"error,omitempty"`
// // 命令退出码
// ExitCode int `json:"exit_code"`
// // 命令开始执行时间
// StartTime time.Time `json:"start_time"`
// // 命令结束执行时间
// EndTime *time.Time `json:"end_time"`
// // 命令执行时长
// Duration time.Duration `json:"duration"`
// // 命令执行是否成功
// Success bool `json:"success"`
// // 是否跳过执行(跳过视为成功,但标记为 Skip 以便管道状态同步)
// Skipped bool `json:"skipped,omitempty"`
// // 非错误的说明信息(比如跳过原因等)
// Message string `json:"message,omitempty"`
// // 元数据
// Metadata *CommandMetadata `json:"metadata"`
// // 文件内容集合
// FileContents map[string]string `json:"file_contents,omitempty"`
// // 脚本输出参数(供下一个任务使用)
// OutputParams map[string]string `json:"output_params,omitempty"`
func (r *TaskStatus) String() string {
return pretty.ToJSON(r)
}
func (r *TaskStatus) SetScheduledConfirmed(confirmed bool) *TaskStatus {
r.ScheduledConfirmed = &confirmed
return r
}
func (r *TaskStatus) SetScheduledAgentId(agentId string) *TaskStatus {
r.ScheduledAgentId = &agentId
return r
}
func (r *TaskStatus) GetScheduledAgentId() string {
if r.ScheduledAgentId == nil {
return ""
}
return *r.ScheduledAgentId
}
func (r *TaskStatus) TableName() string {
return "devops_tasks"
}
func (r *TaskStatus) MarkedRunning() {
r.setStartAt(time.Now())
r.Status = STATUS_RUNNING
}
func (r *TaskStatus) IsRunning() bool {
return r.Status == STATUS_RUNNING
}
// MarkScheduled 标记任务已被分配给指定的Agent
// 用于Agent模式下的任务分发
// 同时设置确认超时时间窗口默认15秒
func (r *TaskStatus) MarkScheduled(agentId string) *TaskStatus {
now := time.Now()
r.ScheduledAt = &now
r.SetScheduledAgentId(agentId)
r.SetScheduledConfirmed(false)
// 确保设置确认超时时间如果未设置则默认15秒
if r.ScheduledConfirmTTL <= 0 {
r.ScheduledConfirmTTL = 15
}
return r
}
// IsScheduled 检查任务是否已被分配给Agent
func (r *TaskStatus) IsScheduled() bool {
return r.ScheduledAgentId != nil && *r.ScheduledAgentId != ""
}
// IsScheduleConfirm 检查任务是否具有完整的调度信息(调度时间和超时时间都已设置)
// 用于判断是否需要进行调度超时检查
func (r *TaskStatus) IsScheduleConfirm() bool {
return r.ScheduledAt != nil && r.ScheduledConfirmTTL > 0
}
// ConfirmScheduled 确认Agent已接收任务分配
func (r *TaskStatus) ConfirmScheduled() *TaskStatus {
r.SetScheduledConfirmed(true)
return r
}
// IsScheduleConfirmed 检查任务分配是否已被Agent确认
func (r *TaskStatus) IsScheduleConfirmed() bool {
return r.ScheduledConfirmed != nil && *r.ScheduledConfirmed
}
func (r *TaskStatus) setStartAt(t time.Time) {
r.StartAt = &t
}
func (t *TaskStatus) WithExtra(key, value string) *TaskStatus {
t.Extras[key] = value
return t
}
func (t *TaskStatus) WithRefURL(refURL string) *TaskStatus {
t.RefURL = refURL
return t
}
func (t *TaskStatus) Failedf(format string, a ...any) *TaskStatus {
t.EndAt = time.Now()
t.Status = STATUS_FAILED
t.Message = fmt.Sprintf(format, a...)
return t
}
func (t *TaskStatus) Canceledf(format string, a ...any) *TaskStatus {
t.EndAt = time.Now()
t.Status = STATUS_CANCELED
t.Message = fmt.Sprintf(format, a...)
return t
}
func (t *TaskStatus) Success(format string, a ...any) *TaskStatus {
t.EndAt = time.Now()
t.Status = STATUS_SUCCESS
t.Message = fmt.Sprintf(format, a...)
return t
}
func (t *TaskStatus) Skipf(format string, a ...any) *TaskStatus {
t.EndAt = time.Now()
t.Status = STATUS_SKIP
t.Message = fmt.Sprintf(format, a...)
return t
}
func (t *TaskStatus) WithDetail(format string, a ...any) *TaskStatus {
t.Detail = fmt.Sprintf(format, a...)
return t
}