# DevOps Agent 1. WebSocket Agent (Jenkins Node), 把自己注册到 Api Server 作为一个任务运行阶段 2. 需要执行来自于 Api Server 下发的任务, 执行中需要把日志和执行结果返回给 Api Server 3. 怎么运行任务喃,我们是封装一个 script的模块, 然后调用这个模块来运行任务 4. 任务需要有任务名称, 与 任务参数 这些基础信息 ## 开发脚本执行引擎 ### 执行引擎功能开发 ```go // ExecuteScript 执行脚本的核心方法,接受一个 ExecuteScriptRequest 请求,返回一个 ExecutionResult 结果 func (e *ScriptExcutor) ExecuteScript(ctx context.Context, in *ExecuteScriptRequest) (*ExecutionResult, error) { // 确保工作目录前缀, 避免脚本执行在不安全的目录下, 同时也方便清理执行结果 in = in.WithWorkspacePrefix(e.WorkDirPrefix) // 确保工作目录存在 if err := e.ensureWorkDir(in.workDir); err != nil { e.log.Error().Str("task_id", in.metadata.ID).Str("work_dir", in.workDir).Err(err).Msg("创建工作目录失败") return nil, err } // 确保元数据存在 if in.metadata == nil { in.metadata = &CommandMetadata{ ID: generateID(), CreatedBy: getCurrentUser(), CreatedAt: time.Now(), WorkDir: in.workDir, EnvVars: in.envVars, Timeout: in.timeout, } } // zerolog 的用法 e.log.Info().Str("task_id", in.metadata.ID).Str("script_path", in.ScriptPath).Msg("准备执行脚本") // 脚本路径消毒 scriptPath := e.WithPrefixScripPath(in.ScriptPath) fullScriptPath, err := e.sanitizeScriptPath(scriptPath) if err != nil { e.log.Error().Str("task_id", in.metadata.ID).Str("script_path", scriptPath).Err(err).Msg("脚本路径验证失败") return nil, err } e.log.Info().Str("task_id", in.metadata.ID).Str("full_script_path", fullScriptPath).Msg("脚本路径验证成功") // 校验脚本完整性 if err := e.integrityManager.VerifyScript(fullScriptPath); err != nil { e.log.Error().Str("task_id", in.metadata.ID).Str("script_path", fullScriptPath).Err(err).Msg("脚本完整性校验失败") return nil, fmt.Errorf("脚本完整性校验失败: %v", err) } // 构建完整的shell命令 shellArgs := []string{fullScriptPath} shellArgs = append(shellArgs, in.Args...) shellCommand := strings.Join(shellArgs, " ") e.log.Info().Str("task_id", in.metadata.ID).Str("command", shellCommand).Msg("脚本执行命令") // 注入标准变量 e.InjectEnv(in) // 根据超时设置创建 context execCtx := ctx var cancel context.CancelFunc if in.timeout > 0 { execCtx, cancel = context.WithTimeout(ctx, in.timeout) defer cancel() e.log.Info().Str("task_id", in.metadata.ID).Dur("timeout", in.timeout).Msg("设置脚本执行超时时间") } // 创建命令 cmd := exec.CommandContext(execCtx, "/bin/sh", "-c", shellCommand) cmd.Dir = in.workDir cmd.Env = in.buildEnv() // 根据参数设置进程组属性 if in.createProcessGroup { cmd.SysProcAttr = &syscall.SysProcAttr{ Setpgid: true, // 创建新的进程组 } } // 根据参数设置自定义 Cancel 函数 if in.useProcessGroupKill { // 自定义 Cancel 函数,确保杀死进程组 cmd.Cancel = func() error { if cmd.Process == nil { return nil } // 使用负 PID 杀死整个进程组 return syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) } } in.cmd = cmd // 脚本执行生成调试脚本(避免被脚本内部的 git clone 等操作清空) if in.isDebugScriptEnabled() { if debugScriptPath, writeErr := in.WriteDebugScript(fullScriptPath, in.Args); writeErr != nil { e.log.Warn().Str("task_id", in.metadata.ID).Err(writeErr).Msg("生成调试脚本失败") } else if debugScriptPath != "" { e.log.Info().Str("task_id", in.metadata.ID).Str("debug_script", debugScriptPath).Msg("已生成调试脚本,可进入工作目录执行 ./debug.sh 进行调试") } } // 执行脚本,并且把结果写到输出到文件里面 // 开始执行命令 // 添加到运行中的命令列表中 e.runningCommands[in.workDir] = in defer delete(e.runningCommands, in.workDir) taskId := "unknown" if in.metadata != nil { taskId = in.metadata.ID } // 获取日志写入器 logWriter, err := in.getLogWriter() if err != nil { e.log.Error().Str("task_id", taskId).Err(err).Msg("创建日志写入器失败") return nil, err } defer func() { if closer, ok := logWriter.(io.Closer); ok { closer.Close() } }() // 命令的日志输出到制定的地方 in.cmd.Stdout = logWriter in.cmd.Stderr = logWriter // 记录执行开始, 包括时间、工作目录、脚本路径和元数据等信息 startTime := time.Now() logHeader := fmt.Sprintf("\n=== Execution Started ===\nTime: %s\nWorkDir: %s\nScript: %s\nMetadata: %s\n", startTime.Format("2006-01-02 15:04:05"), in.workDir, fullScriptPath, in.metadata.String(), ) fmt.Fprint(logWriter, logHeader) fmt.Fprint(logWriter, "\n") // 执行命令 e.log.Info().Str("task_id", taskId).Msg("开始执行命令") err = in.cmd.Run() endTime := time.Now() duration := endTime.Sub(startTime) // 构建执行结果 result := &ExecutionResult{ Command: fullScriptPath, StartTime: startTime, EndTime: &endTime, Duration: duration, Success: err == nil, Metadata: in.metadata, } if err != nil { // 判断是否为超时错误 if in.cmd.ProcessState != nil && in.cmd.ProcessState.ExitCode() == -1 { e.log.Error().Str("task_id", taskId).Dur("duration", duration).Dur("timeout", in.timeout).Err(err).Msg("命令执行超时") } else { e.log.Error().Str("task_id", taskId).Dur("duration", duration).Err(err).Msg("命令执行失败") } // 错误信息 result.Error = err.Error() if in.cmd.ProcessState != nil { result.ExitCode = in.cmd.ProcessState.ExitCode() } else { result.ExitCode = -1 } } else { e.log.Info().Str("task_id", taskId).Dur("duration", duration).Msg("命令执行成功") } // 解析脚本输出参数(无论成功或失败都收集) outputFile := filepath.Join(in.workDir, "output.env") if outputParams, parseErr := ParseOutputParams(outputFile); parseErr == nil && len(outputParams) > 0 { result.OutputParams = outputParams e.log.Info().Str("task_id", taskId).Bool("success", result.Success).Int("param_count", len(outputParams)).Msg("成功解析脚本输出参数") } else if parseErr != nil && !os.IsNotExist(parseErr) { e.log.Warn().Str("task_id", taskId).Str("output_file", outputFile).Err(parseErr).Msg("解析输出参数文件失败") } // 保存执行结果(无论成功或失败都保存结果,方便后续查询和调试) e.log.Debug().Str("task_id", taskId).Str("result_file", in.GetResultFilePath()).Msg("准备保存执行结果") if saveErr := in.saveResult(result); saveErr != nil { e.log.Error().Str("task_id", taskId).Err(saveErr).Msg("保存执行结果失败") } // 记录执行结束 logFooter := fmt.Sprintf("\n=== Execution Finished ===\nTime: %s\nDuration: %v\nSuccess: %t\nExitCode: %d\nError: %v\n", endTime.Format("2006-01-02 15:04:05"), duration, result.Success, result.ExitCode, err, ) fmt.Fprint(logWriter, logFooter) return result, err } ``` ### 执行引擎调试 1. 构造单元测试的环境(ioc), 被测试的对象在ioc里面, 名字叫: script_excutor, 运行单元测试的时候,需要ioc容器启动,并且完成初始化 ```go package test import ( "os" "github.com/infraboard/mcube/v2/ioc" ) // 用于设置测试环境的函数,比如初始化日志系统,配置环境变量等 func Setup() { // 配置文件的逻辑 // 和我们本地运行时的保存一致, etc/application.toml, 这是一个相对路径(工程名称) // 点击单元测试的时候, 单元测试的运行目录是变化的,因此这里需要给绝对逻辑 ioc.DevelopmentSetupWithPaths(os.Getenv("workspaceFolder") + "/agent/etc/application.toml") } ``` 2. 通过单测验证脚本运行功能 ```go func TestScriptExcutor_ExecuteScript(t *testing.T) { // 直接使用单元测试的上下文, 方便取消 req := script.NewExecuteScriptRequest("task_debug.sh", []string{}) req.SetWorkDir("task-01") req.SetTimeout(30 * time.Second) req.SetDebugScript(true) req.SetLogFile("stdout.txt") req.SetLogCallback(func(s string) { fmt.Print(s) }) resp, err := script.ExecuteScript(t.Context(), req) if err != nil { t.Fatalf("failed to execute script: %v", err) } t.Logf("script execution result: %+v", resp) } ``` ## 注册脚本执行引擎 1. 注册 ```go // 把ScriptExcutor 托管给IOC管理 func init() { ioc.Controller().Registry(&ScriptExcutor{ WorkDirPrefix: "workspace", ScriptDirPrefix: "scripts", IsVerifyScriptIntegrity: false, }) } ``` 2. 初始化 ```go func (e *ScriptExcutor) Init() error { // 初始化日志记录器 e.log = log.Sub(e.Name()) // 初始化脚本完整性管理器 e.integrityManager = NewScriptIntegrityManager(e.ScriptDirPrefix, e.IsVerifyScriptIntegrity) return nil } func (e *ScriptExcutor) Name() string { return APP_NAME } ``` ## 服务启动 1. 服务配置 ```toml [app] name = "devops_agent" [http] port = 8848 [script_excutor] work_dir_prefix = "workspace" script_dir_prefix = "scripts" is_verify_script_integrity = false ``` 2. 服务启动 ```go import ( "github.com/infraboard/mcube/v2/ioc/server/cmd" // 工程引用到的一些通用工具(API) // 这些接口 不是用来做功能实现, 提供健康检查,性能,等额外信息 // 健康检查 _ "github.com/infraboard/mcube/v2/ioc/apps/health/restful" // 非业务模块 _ "github.com/infraboard/mcube/v2/ioc/apps/metric/restful" // 开启API Doc _ "github.com/infraboard/mcube/v2/ioc/apps/apidoc/restful" ) func main() { // 启动服务(ioc) cmd.Start() } ``` ## 使用接口触发任务执行 ```go package api import ( "devops/agent/tasks" "devops/server/apps/task" restfulspec "github.com/emicklei/go-restful-openapi/v2" "github.com/emicklei/go-restful/v3" "github.com/google/uuid" "github.com/infraboard/mcube/v2/exception" "github.com/infraboard/mcube/v2/http/restful/response" "github.com/infraboard/mcube/v2/ioc" "github.com/infraboard/mcube/v2/ioc/config/gorestful" ) func init() { ioc.Api().Registry(&TaskApiHandler{}) } // 一个API, 提供给外部调用的接口,来触发一个Task的执行 // 参数 TaskSpec, 返回 Task type TaskApiHandler struct { ioc.ObjectImpl } func (h *TaskApiHandler) Name() string { return "tasks" } func (i *TaskApiHandler) Version() string { return "v1" } func (h *TaskApiHandler) Init() error { ws := gorestful.ObjectRouter(h) // restfulspec "github.com/emicklei/go-restful-openapi/v2" // 这个库是 go-restful 的一个扩展库,用于生成 OpenAPI 规范的文档 tags := []string{"Task管理"} ws.Route(ws.POST("").To(h.RunTask). Doc("运行任务"). Metadata(restfulspec.KeyOpenAPITags, tags). Reads(task.TaskSpec{}). Writes(task.Task{}). Returns(200, "OK", task.Task{})) return nil } // RunTask 处理运行任务的API请求 // Body 参数是 TaskSpec, 返回 Task func (h *TaskApiHandler) RunTask(r *restful.Request, w *restful.Response) { // 解析请求参数,构造 TaskSpec req := &task.TaskSpec{} if err := r.ReadEntity(req); err != nil { // "github.com/infraboard/mcube/v2/http/restful/response" // Failed 封装的 GoRestful框架的 异常返回 response.Failed(w, err) return } // 补充TaskId if req.Id == "" { req.Id = uuid.NewString() } // 找到对应的 TaskRunner, 这里我们通过 TaskSpec 中的 Name 字段来找到对应的 TaskRunner taskDebugRunner := tasks.GetTaskRunner(req.Name) if taskDebugRunner == nil { response.Failed(w, exception.NewInternalServerError("%s not found", req.Name)) return } // 处理请求 taskResp, err := taskDebugRunner.Run(r.Request.Context(), req) if err != nil { response.Failed(w, err) return } response.Success(w, taskResp) } ```