go20/devops/agent/README.md

404 lines
12 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# DevOps Agent
1. WebSocket Agent (Jenkins Node), 把自己注册到 Api Server 作为一个任务运行阶段
2. 需要执行来自于 Api Server 下发的任务, 执行中需要把日志和执行结果返回给 Api Server
3. 怎么运行任务喃,我们是封装一个 script的模块, 然后调用这个模块来运行任务
4. 任务需要有任务名称, 与 任务参数 这些基础信息
## 开发脚本执行引擎
### 执行引擎功能开发
```go
// ExecuteScript 执行脚本的核心方法,接受一个 ExecuteScriptRequest 请求,返回一个 ExecutionResult 结果
func (e *ScriptExcutor) ExecuteScript(ctx context.Context, in *ExecuteScriptRequest) (*ExecutionResult, error) {
// 确保工作目录前缀, 避免脚本执行在不安全的目录下, 同时也方便清理执行结果
in = in.WithWorkspacePrefix(e.WorkDirPrefix)
// 确保工作目录存在
if err := e.ensureWorkDir(in.workDir); err != nil {
e.log.Error().Str("task_id", in.metadata.ID).Str("work_dir", in.workDir).Err(err).Msg("创建工作目录失败")
return nil, err
}
// 确保元数据存在
if in.metadata == nil {
in.metadata = &CommandMetadata{
ID: generateID(),
CreatedBy: getCurrentUser(),
CreatedAt: time.Now(),
WorkDir: in.workDir,
EnvVars: in.envVars,
Timeout: in.timeout,
}
}
// zerolog 的用法
e.log.Info().Str("task_id", in.metadata.ID).Str("script_path", in.ScriptPath).Msg("准备执行脚本")
// 脚本路径消毒
scriptPath := e.WithPrefixScripPath(in.ScriptPath)
fullScriptPath, err := e.sanitizeScriptPath(scriptPath)
if err != nil {
e.log.Error().Str("task_id", in.metadata.ID).Str("script_path", scriptPath).Err(err).Msg("脚本路径验证失败")
return nil, err
}
e.log.Info().Str("task_id", in.metadata.ID).Str("full_script_path", fullScriptPath).Msg("脚本路径验证成功")
// 校验脚本完整性
if err := e.integrityManager.VerifyScript(fullScriptPath); err != nil {
e.log.Error().Str("task_id", in.metadata.ID).Str("script_path", fullScriptPath).Err(err).Msg("脚本完整性校验失败")
return nil, fmt.Errorf("脚本完整性校验失败: %v", err)
}
// 构建完整的shell命令
shellArgs := []string{fullScriptPath}
shellArgs = append(shellArgs, in.Args...)
shellCommand := strings.Join(shellArgs, " ")
e.log.Info().Str("task_id", in.metadata.ID).Str("command", shellCommand).Msg("脚本执行命令")
// 注入标准变量
e.InjectEnv(in)
// 根据超时设置创建 context
execCtx := ctx
var cancel context.CancelFunc
if in.timeout > 0 {
execCtx, cancel = context.WithTimeout(ctx, in.timeout)
defer cancel()
e.log.Info().Str("task_id", in.metadata.ID).Dur("timeout", in.timeout).Msg("设置脚本执行超时时间")
}
// 创建命令
cmd := exec.CommandContext(execCtx, "/bin/sh", "-c", shellCommand)
cmd.Dir = in.workDir
cmd.Env = in.buildEnv()
// 根据参数设置进程组属性
if in.createProcessGroup {
cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true, // 创建新的进程组
}
}
// 根据参数设置自定义 Cancel 函数
if in.useProcessGroupKill {
// 自定义 Cancel 函数,确保杀死进程组
cmd.Cancel = func() error {
if cmd.Process == nil {
return nil
}
// 使用负 PID 杀死整个进程组
return syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
}
}
in.cmd = cmd
// 脚本执行生成调试脚本(避免被脚本内部的 git clone 等操作清空)
if in.isDebugScriptEnabled() {
if debugScriptPath, writeErr := in.WriteDebugScript(fullScriptPath, in.Args); writeErr != nil {
e.log.Warn().Str("task_id", in.metadata.ID).Err(writeErr).Msg("生成调试脚本失败")
} else if debugScriptPath != "" {
e.log.Info().Str("task_id", in.metadata.ID).Str("debug_script", debugScriptPath).Msg("已生成调试脚本,可进入工作目录执行 ./debug.sh 进行调试")
}
}
// 执行脚本,并且把结果写到输出到文件里面
// 开始执行命令
// 添加到运行中的命令列表中
e.runningCommands[in.workDir] = in
defer delete(e.runningCommands, in.workDir)
taskId := "unknown"
if in.metadata != nil {
taskId = in.metadata.ID
}
// 获取日志写入器
logWriter, err := in.getLogWriter()
if err != nil {
e.log.Error().Str("task_id", taskId).Err(err).Msg("创建日志写入器失败")
return nil, err
}
defer func() {
if closer, ok := logWriter.(io.Closer); ok {
closer.Close()
}
}()
// 命令的日志输出到制定的地方
in.cmd.Stdout = logWriter
in.cmd.Stderr = logWriter
// 记录执行开始, 包括时间、工作目录、脚本路径和元数据等信息
startTime := time.Now()
logHeader := fmt.Sprintf("\n=== Execution Started ===\nTime: %s\nWorkDir: %s\nScript: %s\nMetadata: %s\n",
startTime.Format("2006-01-02 15:04:05"),
in.workDir,
fullScriptPath,
in.metadata.String(),
)
fmt.Fprint(logWriter, logHeader)
fmt.Fprint(logWriter, "\n")
// 执行命令
e.log.Info().Str("task_id", taskId).Msg("开始执行命令")
err = in.cmd.Run()
endTime := time.Now()
duration := endTime.Sub(startTime)
// 构建执行结果
result := &ExecutionResult{
Command: fullScriptPath,
StartTime: startTime,
EndTime: &endTime,
Duration: duration,
Success: err == nil,
Metadata: in.metadata,
}
if err != nil {
// 判断是否为超时错误
if in.cmd.ProcessState != nil && in.cmd.ProcessState.ExitCode() == -1 {
e.log.Error().Str("task_id", taskId).Dur("duration", duration).Dur("timeout", in.timeout).Err(err).Msg("命令执行超时")
} else {
e.log.Error().Str("task_id", taskId).Dur("duration", duration).Err(err).Msg("命令执行失败")
}
// 错误信息
result.Error = err.Error()
if in.cmd.ProcessState != nil {
result.ExitCode = in.cmd.ProcessState.ExitCode()
} else {
result.ExitCode = -1
}
} else {
e.log.Info().Str("task_id", taskId).Dur("duration", duration).Msg("命令执行成功")
}
// 解析脚本输出参数(无论成功或失败都收集)
outputFile := filepath.Join(in.workDir, "output.env")
if outputParams, parseErr := ParseOutputParams(outputFile); parseErr == nil && len(outputParams) > 0 {
result.OutputParams = outputParams
e.log.Info().Str("task_id", taskId).Bool("success", result.Success).Int("param_count", len(outputParams)).Msg("成功解析脚本输出参数")
} else if parseErr != nil && !os.IsNotExist(parseErr) {
e.log.Warn().Str("task_id", taskId).Str("output_file", outputFile).Err(parseErr).Msg("解析输出参数文件失败")
}
// 保存执行结果(无论成功或失败都保存结果,方便后续查询和调试)
e.log.Debug().Str("task_id", taskId).Str("result_file", in.GetResultFilePath()).Msg("准备保存执行结果")
if saveErr := in.saveResult(result); saveErr != nil {
e.log.Error().Str("task_id", taskId).Err(saveErr).Msg("保存执行结果失败")
}
// 记录执行结束
logFooter := fmt.Sprintf("\n=== Execution Finished ===\nTime: %s\nDuration: %v\nSuccess: %t\nExitCode: %d\nError: %v\n",
endTime.Format("2006-01-02 15:04:05"),
duration,
result.Success,
result.ExitCode,
err,
)
fmt.Fprint(logWriter, logFooter)
return result, err
}
```
### 执行引擎调试
1. 构造单元测试的环境(ioc), 被测试的对象在ioc里面, 名字叫: script_excutor, 运行单元测试的时候需要ioc容器启动并且完成初始化
```go
package test
import (
"os"
"github.com/infraboard/mcube/v2/ioc"
)
// 用于设置测试环境的函数,比如初始化日志系统,配置环境变量等
func Setup() {
// 配置文件的逻辑
// 和我们本地运行时的保存一致, etc/application.toml, 这是一个相对路径(工程名称)
// 点击单元测试的时候, 单元测试的运行目录是变化的,因此这里需要给绝对逻辑
ioc.DevelopmentSetupWithPaths(os.Getenv("workspaceFolder") + "/agent/etc/application.toml")
}
```
2. 通过单测验证脚本运行功能
```go
func TestScriptExcutor_ExecuteScript(t *testing.T) {
// 直接使用单元测试的上下文, 方便取消
req := script.NewExecuteScriptRequest("task_debug.sh", []string{})
req.SetWorkDir("task-01")
req.SetTimeout(30 * time.Second)
req.SetDebugScript(true)
req.SetLogFile("stdout.txt")
req.SetLogCallback(func(s string) {
fmt.Print(s)
})
resp, err := script.ExecuteScript(t.Context(), req)
if err != nil {
t.Fatalf("failed to execute script: %v", err)
}
t.Logf("script execution result: %+v", resp)
}
```
## 注册脚本执行引擎
1. 注册
```go
// 把ScriptExcutor 托管给IOC管理
func init() {
ioc.Controller().Registry(&ScriptExcutor{
WorkDirPrefix: "workspace",
ScriptDirPrefix: "scripts",
IsVerifyScriptIntegrity: false,
})
}
```
2. 初始化
```go
func (e *ScriptExcutor) Init() error {
// 初始化日志记录器
e.log = log.Sub(e.Name())
// 初始化脚本完整性管理器
e.integrityManager = NewScriptIntegrityManager(e.ScriptDirPrefix, e.IsVerifyScriptIntegrity)
return nil
}
func (e *ScriptExcutor) Name() string {
return APP_NAME
}
```
## 服务启动
1. 服务配置
```toml
[app]
name = "devops_agent"
[http]
port = 8848
[script_excutor]
work_dir_prefix = "workspace"
script_dir_prefix = "scripts"
is_verify_script_integrity = false
```
2. 服务启动
```go
import (
"github.com/infraboard/mcube/v2/ioc/server/cmd"
// 工程引用到的一些通用工具(API)
// 这些接口 不是用来做功能实现, 提供健康检查,性能,等额外信息
// 健康检查
_ "github.com/infraboard/mcube/v2/ioc/apps/health/restful"
// 非业务模块
_ "github.com/infraboard/mcube/v2/ioc/apps/metric/restful"
// 开启API Doc
_ "github.com/infraboard/mcube/v2/ioc/apps/apidoc/restful"
)
func main() {
// 启动服务(ioc)
cmd.Start()
}
```
## 使用接口触发任务执行
```go
package api
import (
"devops/agent/tasks"
"devops/server/apps/task"
restfulspec "github.com/emicklei/go-restful-openapi/v2"
"github.com/emicklei/go-restful/v3"
"github.com/google/uuid"
"github.com/infraboard/mcube/v2/exception"
"github.com/infraboard/mcube/v2/http/restful/response"
"github.com/infraboard/mcube/v2/ioc"
"github.com/infraboard/mcube/v2/ioc/config/gorestful"
)
func init() {
ioc.Api().Registry(&TaskApiHandler{})
}
// 一个API, 提供给外部调用的接口来触发一个Task的执行
// 参数 TaskSpec, 返回 Task
type TaskApiHandler struct {
ioc.ObjectImpl
}
func (h *TaskApiHandler) Name() string {
return "tasks"
}
func (i *TaskApiHandler) Version() string {
return "v1"
}
func (h *TaskApiHandler) Init() error {
ws := gorestful.ObjectRouter(h)
// restfulspec "github.com/emicklei/go-restful-openapi/v2"
// 这个库是 go-restful 的一个扩展库,用于生成 OpenAPI 规范的文档
tags := []string{"Task管理"}
ws.Route(ws.POST("").To(h.RunTask).
Doc("运行任务").
Metadata(restfulspec.KeyOpenAPITags, tags).
Reads(task.TaskSpec{}).
Writes(task.Task{}).
Returns(200, "OK", task.Task{}))
return nil
}
// RunTask 处理运行任务的API请求
// Body 参数是 TaskSpec, 返回 Task
func (h *TaskApiHandler) RunTask(r *restful.Request, w *restful.Response) {
// 解析请求参数,构造 TaskSpec
req := &task.TaskSpec{}
if err := r.ReadEntity(req); err != nil {
// "github.com/infraboard/mcube/v2/http/restful/response"
// Failed 封装的 GoRestful框架的 异常返回
response.Failed(w, err)
return
}
// 补充TaskId
if req.Id == "" {
req.Id = uuid.NewString()
}
// 找到对应的 TaskRunner, 这里我们通过 TaskSpec 中的 Name 字段来找到对应的 TaskRunner
taskDebugRunner := tasks.GetTaskRunner(req.Name)
if taskDebugRunner == nil {
response.Failed(w, exception.NewInternalServerError("%s not found", req.Name))
return
}
// 处理请求
taskResp, err := taskDebugRunner.Run(r.Request.Context(), req)
if err != nil {
response.Failed(w, err)
return
}
response.Success(w, taskResp)
}
```