264 lines
8.1 KiB
Go
264 lines
8.1 KiB
Go
package task
|
||
|
||
import (
|
||
"fmt"
|
||
"time"
|
||
|
||
"github.com/google/uuid"
|
||
"github.com/infraboard/mcube/tools/pretty"
|
||
)
|
||
|
||
func NewTask(spec *TaskSpec) *Task {
|
||
return &Task{
|
||
TaskSpec: spec,
|
||
TaskStatus: &TaskStatus{
|
||
Status: STATUS_PENDDING,
|
||
UpdateAt: time.Now(),
|
||
},
|
||
}
|
||
}
|
||
|
||
// Task combines a task definition and its runtime status by embedding both
// as pointers, so their fields and methods are promoted onto Task.
type Task struct {
	// Task definition.
	*TaskSpec
	// Task status.
	*TaskStatus
}
|
||
|
||
func (e *Task) TableName() string {
|
||
return "devops_tasks"
|
||
}
|
||
|
||
func (e *Task) String() string {
|
||
return fmt.Sprintf("%s", pretty.ToJSON(e))
|
||
}
|
||
|
||
func NewTaskSpec() *TaskSpec {
|
||
return &TaskSpec{
|
||
Id: uuid.NewString(),
|
||
CreateAt: time.Now(),
|
||
TimeoutSecond: 60 * 60 * 24,
|
||
InputParams: map[string]string{},
|
||
}
|
||
}
|
||
|
||
// TaskSpec is the definition of a task: what to run, with which parameters,
// and under which scheduling constraints. The struct tags define both the
// JSON field names and the GORM column mapping.
type TaskSpec struct {
	// Task ID (unique identifier, generated by the caller, e.g. a UUID; a unique ID is generated automatically if none is given).
	Id string `json:"id" gorm:"column:id;type:string;primary_key"`
	// Description, e.g. "build task", "deploy task".
	Description string `json:"description" gorm:"column:description;type:text"`
	// Task name, e.g. "build", "deploy". Each name corresponds to exactly one Task on the Agent side.
	Name string `json:"name" gorm:"column:name;type:varchar(255)"`
	// Creation time.
	CreateAt time.Time `json:"create_at" gorm:"column:create_at;type:datetime"`
	// Task definition, e.g. the name or a job definition.
	Define string `json:"define" gorm:"column:define;type:text"`
	// Run parameters, e.g. the code branch of a build task or the target environment of a deploy task; passed to the task script as environment variables.
	InputParams map[string]string `json:"input_params" gorm:"column:input_params;type:json;serializer:json;not null;default:'{}'"`
	// Pipeline task ID (ignore).
	PipelineTaskId string `json:"pipeline_task_id" gorm:"column:pipeline_task_id;type:varchar(100);index"`

	// Task timeout in seconds; 0 means no timeout.
	// NOTE(review): GetTimeout falls back to 60 minutes when this value is <= 0 — confirm which behavior is intended.
	TimeoutSecond int64 `json:"timeout_second" gorm:"column:timeout_second;type:bigint"`
	// Whether errors are ignored.
	IgnoreError *bool `json:"ignore_error" gorm:"column:ignore_error;type:bool;default:false"`
	// Agent that should execute the task. Empty means unspecified: the scheduler picks an Agent automatically based on the task type and Agent capabilities.
	ScheduleAgent string `json:"schedule_agent" gorm:"column:schedule_agent;type:varchar(255);index"`
	// List of task nodes this task depends on.
	DependsTasks []string `json:"depends_tasks" gorm:"column:depends_tasks;type:json;serializer:json;not null;default:'[]'"`
	// Execution conditions (legacy; kept for backward compatibility).
	When []Contiditon `json:"when" gorm:"column:when;type:json;serializer:json;"`
	// "when" condition expression (new DAG condition system).
	// Supports expressions such as: "always", "never", "params.env == 'prod'", "deps.build.status == 'success'"
	WhenCondition string `json:"when_condition" gorm:"column:when_condition;type:varchar(1000)"`
	// Additional attributes.
	Extras map[string]string `json:"extras" form:"extras" gorm:"column:extras;type:json;serializer:json;"`
	// Labels.
	Label map[string]string `json:"label" gorm:"column:label;type:json;serializer:json;"`
}
|
||
|
||
func (t *TaskSpec) GetWorkDir() string {
|
||
if t.PipelineTaskId != "" {
|
||
return fmt.Sprintf("%s/%s", t.PipelineTaskId, t.Id)
|
||
}
|
||
|
||
return t.Id
|
||
}
|
||
|
||
func (t *TaskSpec) SetInputParams(key, value string) *TaskSpec {
|
||
if t.InputParams == nil {
|
||
t.InputParams = make(map[string]string)
|
||
}
|
||
t.InputParams[key] = value
|
||
return t
|
||
}
|
||
|
||
func (t *TaskSpec) GetTimeout() time.Duration {
|
||
if t.TimeoutSecond > 0 {
|
||
return time.Duration(t.TimeoutSecond) * time.Second
|
||
}
|
||
|
||
return time.Duration(60) * time.Minute
|
||
}
|
||
|
||
// Contiditon is one entry of the legacy TaskSpec.When condition list: an
// input parameter, an operator and the values to compare against.
// Example: inputa == "a"
type Contiditon struct {
	// Input parameter.
	InputParam string `json:"input_param"`
	// Operator.
	Operator CONDITION_OPERATOR `json:"operator"`
	// List of values for In.
	Values []string `json:"values"`
}
|
||
|
||
// TaskStatus holds the runtime state of a task: lifecycle status, timestamps,
// scheduling information and output. The struct tags define both the JSON
// field names and the GORM column mapping.
type TaskStatus struct {
	// Status.
	Status STATUS `json:"status" gorm:"column:status;type:varchar(100);index"`
	// Related URL (e.g. log URL, result URL).
	RefURL string `json:"ref_url" gorm:"column:ref_url;type:varchar(255)"`
	// Failure reason. The same field is also written by Canceledf, Success and Skipf.
	Message string `json:"message" gorm:"column:message;type:text"`
	// Detail returned when the task is invoked asynchronously.
	Detail string `json:"detail,omitempty" gorm:"column:detail;type:text"`
	// Who started the task.
	RunBy string `json:"run_by" gorm:"column:run_by;type:varchar(100)"`
	// Start time.
	StartAt *time.Time `json:"start_at" gorm:"column:start_at;type:datetime"`
	// Update time.
	UpdateAt time.Time `json:"update_at" gorm:"column:update_at;type:datetime"`
	// End time.
	EndAt time.Time `json:"end_at" gorm:"column:end_at;type:datetime"`
	// Other information.
	Extras map[string]string `json:"extras" gorm:"column:extras;type:json;serializer:json;not null;default:'{}'"`

	// Scheduling time.
	ScheduledAt *time.Time `json:"scheduled_at" gorm:"column:scheduled_at;type:datetime"`
	// ID of the Agent that actually executes the task.
	ScheduledAgentId *string `json:"scheduled_agent" gorm:"column:scheduled_agent;type:varchar(255);index"`
	// Schedule confirmation timeout; defaults to 15 seconds.
	ScheduledConfirmTTL int64 `json:"scheduled_confirm_ttl" gorm:"column:scheduled_confirm_ttl;type:bigint;default:15"`
	// Schedule confirmation: the Agent confirms it received the task; set to true after the task has been dispatched successfully.
	ScheduledConfirmed *bool `json:"scheduled_confirmed" gorm:"column:scheduled_confirmed;type:boolean;default:false"`

	// Task output parameters (for use by the next task).
	Output map[string]string `json:"output,omitempty" gorm:"column:output;type:json;serializer:json;not null;default:'{}'"`
}
|
||
|
||
func (r *TaskStatus) String() string {
|
||
return pretty.ToJSON(r)
|
||
}
|
||
|
||
func (r *TaskStatus) SetScheduledConfirmed(confirmed bool) *TaskStatus {
|
||
r.ScheduledConfirmed = &confirmed
|
||
return r
|
||
}
|
||
|
||
func (r *TaskStatus) SetScheduledAgentId(agentId string) *TaskStatus {
|
||
r.ScheduledAgentId = &agentId
|
||
return r
|
||
}
|
||
|
||
func (r *TaskStatus) GetScheduledAgentId() string {
|
||
if r.ScheduledAgentId == nil {
|
||
return ""
|
||
}
|
||
return *r.ScheduledAgentId
|
||
}
|
||
|
||
func (r *TaskStatus) TableName() string {
|
||
return "devops_tasks"
|
||
}
|
||
|
||
func (r *TaskStatus) MarkedRunning() {
|
||
r.setStartAt(time.Now())
|
||
r.Status = STATUS_RUNNING
|
||
}
|
||
|
||
func (r *TaskStatus) IsRunning() bool {
|
||
return r.Status == STATUS_RUNNING
|
||
}
|
||
|
||
// MarkScheduled 标记任务已被分配给指定的Agent
|
||
// 用于Agent模式下的任务分发
|
||
// 同时设置确认超时时间窗口(默认15秒)
|
||
func (r *TaskStatus) MarkScheduled(agentId string) *TaskStatus {
|
||
now := time.Now()
|
||
r.ScheduledAt = &now
|
||
r.SetScheduledAgentId(agentId)
|
||
r.SetScheduledConfirmed(false)
|
||
// 确保设置确认超时时间,如果未设置则默认15秒
|
||
if r.ScheduledConfirmTTL <= 0 {
|
||
r.ScheduledConfirmTTL = 15
|
||
}
|
||
return r
|
||
}
|
||
|
||
// IsScheduled 检查任务是否已被分配给Agent
|
||
func (r *TaskStatus) IsScheduled() bool {
|
||
return r.ScheduledAgentId != nil && *r.ScheduledAgentId != ""
|
||
}
|
||
|
||
// IsScheduleConfirm 检查任务是否具有完整的调度信息(调度时间和超时时间都已设置)
|
||
// 用于判断是否需要进行调度超时检查
|
||
func (r *TaskStatus) IsScheduleConfirm() bool {
|
||
return r.ScheduledAt != nil && r.ScheduledConfirmTTL > 0
|
||
}
|
||
|
||
// ConfirmScheduled 确认Agent已接收任务分配
|
||
func (r *TaskStatus) ConfirmScheduled() *TaskStatus {
|
||
r.SetScheduledConfirmed(true)
|
||
return r
|
||
}
|
||
|
||
// IsScheduleConfirmed 检查任务分配是否已被Agent确认
|
||
func (r *TaskStatus) IsScheduleConfirmed() bool {
|
||
return r.ScheduledConfirmed != nil && *r.ScheduledConfirmed
|
||
}
|
||
|
||
func (r *TaskStatus) setStartAt(t time.Time) {
|
||
r.StartAt = &t
|
||
}
|
||
|
||
func (t *TaskStatus) WithExtra(key, value string) *TaskStatus {
|
||
t.Extras[key] = value
|
||
return t
|
||
}
|
||
|
||
func (t *TaskStatus) WithRefURL(refURL string) *TaskStatus {
|
||
t.RefURL = refURL
|
||
return t
|
||
}
|
||
|
||
func (t *TaskStatus) Failedf(format string, a ...any) *TaskStatus {
|
||
t.EndAt = time.Now()
|
||
t.Status = STATUS_FAILED
|
||
t.Message = fmt.Sprintf(format, a...)
|
||
return t
|
||
}
|
||
|
||
func (t *TaskStatus) Canceledf(format string, a ...any) *TaskStatus {
|
||
t.EndAt = time.Now()
|
||
t.Status = STATUS_CANCELED
|
||
t.Message = fmt.Sprintf(format, a...)
|
||
return t
|
||
}
|
||
|
||
func (t *TaskStatus) Success(format string, a ...any) *TaskStatus {
|
||
t.EndAt = time.Now()
|
||
t.Status = STATUS_SUCCESS
|
||
t.Message = fmt.Sprintf(format, a...)
|
||
return t
|
||
}
|
||
|
||
func (t *TaskStatus) Skipf(format string, a ...any) *TaskStatus {
|
||
t.EndAt = time.Now()
|
||
t.Status = STATUS_SKIP
|
||
t.Message = fmt.Sprintf(format, a...)
|
||
return t
|
||
}
|
||
|
||
func (t *TaskStatus) WithDetail(format string, a ...any) *TaskStatus {
|
||
t.Detail = fmt.Sprintf(format, a...)
|
||
return t
|
||
}
|