补充对接Task模块

This commit is contained in:
yumaojun03 2025-07-06 18:10:36 +08:00
parent 0abb789a44
commit ba04b02cf8
14 changed files with 346 additions and 13 deletions

View File

@ -1 +1,7 @@
package apps
import (
_ "122.51.31.227/go-course/go18/devcloud/cmdb/apps/resource/impl"
_ "122.51.31.227/go-course/go18/devcloud/cmdb/apps/secret/impl"
_ "122.51.31.227/go-course/go18/devcloud/cmdb/apps/secret/sync"
)

View File

@ -32,7 +32,3 @@ const (
)
type TYPE string
const ()
type STATUS string

View File

@ -0,0 +1,5 @@
package secret
const (
TASK_LABLE_SECRET_ID = "secret_id"
)

View File

@ -1 +1,31 @@
package impl
import (
"122.51.31.227/go-course/go18/devcloud/cmdb/apps/secret"
"github.com/infraboard/mcube/v2/ioc"
"github.com/infraboard/mcube/v2/ioc/config/datasource"
)
func init() {
ioc.Controller().Registry(&SecretServiceImpl{})
}
var _ secret.Service = (*SecretServiceImpl)(nil)
type SecretServiceImpl struct {
ioc.ObjectImpl
}
func (s *SecretServiceImpl) Name() string {
return secret.APP_NAME
}
func (s *SecretServiceImpl) Init() error {
if datasource.Get().AutoMigrate {
err := datasource.DB().AutoMigrate(&secret.Secret{})
if err != nil {
return err
}
}
return nil
}

View File

@ -0,0 +1,15 @@
package impl_test
import (
"122.51.31.227/go-course/go18/devcloud/cmdb/apps/secret"
"122.51.31.227/go-course/go18/devcloud/cmdb/test"
)
var (
svc secret.Service
)
func init() {
test.DevelopmentSetUp()
svc = secret.GetService()
}

View File

@ -1 +1,98 @@
package impl
import (
"context"
"fmt"
"122.51.31.227/go-course/go18/devcloud/cmdb/apps/secret"
"github.com/infraboard/mcube/v2/exception"
"github.com/infraboard/mcube/v2/ioc/config/datasource"
"github.com/infraboard/mcube/v2/types"
"github.com/infraboard/modules/task/apps/task"
"gorm.io/gorm"
)
// CreateSecret implements secret.Service.
func (s *SecretServiceImpl) CreateSecret(ctx context.Context, in *secret.CreateSecretRequest) (*secret.Secret, error) {
ins := secret.NewSecret(in)
// 需要加密
if err := ins.EncryptedApiSecret(); err != nil {
return nil, err
}
// upsert, gorm save
if err := datasource.DBFromCtx(ctx).Save(ins).Error; err != nil {
return nil, err
}
return ins, nil
}
// DescribeSecret implements secret.Service.
func (s *SecretServiceImpl) DescribeSecret(ctx context.Context, in *secret.DescribeSecretRequeset) (*secret.Secret, error) {
// 取出后,需要解密
ins := &secret.Secret{}
query := in.GormResourceFilter(datasource.DBFromCtx(ctx).Model(&secret.Secret{}))
if err := query.Where("id = ?", in.Id).Take(ins).Error; err != nil {
if err == gorm.ErrRecordNotFound {
return nil, exception.NewNotFound("secret %s not found", in.Id)
}
return nil, err
}
ins.SetIsEncrypted(true)
if err := ins.DecryptedApiSecret(); err != nil {
return nil, err
}
// 解密过后的数据
return ins, nil
}
// QuerySecret implements secret.Service.
func (s *SecretServiceImpl) QuerySecret(ctx context.Context, in *secret.QuerySecretRequest) (*types.Set[*secret.Secret], error) {
set := types.New[*secret.Secret]()
query := in.GormResourceFilter(datasource.DBFromCtx(ctx).Model(&secret.Secret{}))
err := query.Count(&set.Total).Error
if err != nil {
return nil, err
}
err = query.
Order("create_at desc").
Offset(int(in.ComputeOffset())).
Limit(int(in.PageSize)).
Find(&set.Items).
Error
if err != nil {
return nil, err
}
return set, nil
}
// SyncResource implements secret.Service.
// 资源同步接口
func (s *SecretServiceImpl) SyncResource(ctx context.Context, in *secret.SyncResourceRequest) (*types.Set[*task.Task], error) {
// 直接初始化并返回,避免警告
taskSet := types.NewSet[*task.Task]()
// 查询Secret信息
ins, err := s.DescribeSecret(ctx, secret.NewDescribeSecretRequeset(in.Id))
if err != nil {
return taskSet, err
}
if !ins.GetEnabled() {
return taskSet, fmt.Errorf("secret %s not enabled", ins.Name)
}
// 获取syncer, 执行同步
for _, rs := range ins.ResourceType {
syncer := secret.GetSyncer(rs)
taskInfo := syncer.Sync(ctx, ins, rs)
taskSet.Add(taskInfo)
}
return taskSet, nil
}

View File

@ -0,0 +1,58 @@
package impl_test
import (
"testing"
"time"
"122.51.31.227/go-course/go18/devcloud/cmdb/apps/resource"
"122.51.31.227/go-course/go18/devcloud/cmdb/apps/secret"
)
func TestCreateSecret(t *testing.T) {
req := secret.NewCreateSecretRequest()
req.Name = "腾讯云只读账号"
req.Vendor = resource.VENDOR_TENCENT
req.ApiKey = "xxx"
req.ApiSecret = "xx"
req.SetEnabled(true)
req.ResourceType = append(req.ResourceType, resource.TYPE_VM)
req.Regions = []string{"ap-shanghai", "ap-guangzhou"}
ins, err := svc.CreateSecret(t.Context(), req)
if err != nil {
t.Fatal(err)
}
t.Log(ins)
}
func TestQuerySecret(t *testing.T) {
req := secret.NewQuerySecretRequest()
set, err := svc.QuerySecret(t.Context(), req)
if err != nil {
t.Fatal(err)
}
t.Log(set)
}
const (
SECRET_ID = "ed02f7cd-ee5b-33f0-bdf5-e305ecb3efb4"
)
func TestDescribeSecret(t *testing.T) {
req := secret.NewDescribeSecretRequeset(SECRET_ID)
ins, err := svc.DescribeSecret(t.Context(), req)
if err != nil {
t.Fatal(err)
}
t.Log(ins)
}
func TestSyncResource(t *testing.T) {
req := secret.NewSyncResourceRequest(SECRET_ID)
ins, err := svc.SyncResource(t.Context(), req)
if err != nil {
t.Fatal(err)
}
t.Log(ins)
time.Sleep(3 * time.Second)
}

View File

@ -4,10 +4,12 @@ import (
"context"
"122.51.31.227/go-course/go18/devcloud/cmdb/apps/resource"
"122.51.31.227/go-course/go18/devcloud/mcenter/apps/policy"
"github.com/infraboard/mcube/v2/http/request"
"github.com/infraboard/mcube/v2/ioc"
"github.com/infraboard/mcube/v2/tools/pretty"
"github.com/infraboard/mcube/v2/types"
"github.com/infraboard/modules/task/apps/task"
)
const (
@ -34,10 +36,8 @@ type Service interface {
// 这个接口调用持续30分钟...
// 需要拆解为异步任务: 用户调用了同步后, 里面返回, 这个同步任务在后台执行(Gorouties), 需要查询同步日志(Ws)
// 执行同步
SyncResource(context.Context, *SyncResourceRequest) (*SyncResourceTask, error)
// 查询同步日志
QuerySyncLog(context.Context, *QuerySyncLogRequest) (*types.Set[*SyncRecord], error)
// 使用task模块来执行异步任务, 通过TaskId查询异步任务状态
SyncResource(context.Context, *SyncResourceRequest) (*types.Set[*task.Task], error)
}
// 同步记录(task) 有状态
@ -70,6 +70,7 @@ func NewQuerySecretRequest() *QuerySecretRequest {
}
type QuerySecretRequest struct {
policy.ResourceScope
// 分页请求
*request.PageRequest
}
@ -81,6 +82,7 @@ func NewDescribeSecretRequeset(id string) *DescribeSecretRequeset {
}
type DescribeSecretRequeset struct {
policy.ResourceScope
Id string `json:"id"`
}

View File

@ -5,6 +5,7 @@ import (
"fmt"
"122.51.31.227/go-course/go18/devcloud/cmdb/apps/resource"
"122.51.31.227/go-course/go18/devcloud/mcenter/apps/policy"
"github.com/google/uuid"
"github.com/infraboard/devops/pkg/model"
"github.com/infraboard/mcube/v2/crypto/cbc"
@ -28,7 +29,10 @@ func NewSecret(in *CreateSecretRequest) *Secret {
type Secret struct {
model.Meta
CreateSecretRequest `bson:"inline"`
// 资源范围, Namespace是继承的, Scope是API添加的
policy.ResourceLabel
// 资源定义
CreateSecretRequest
}
func (s *Secret) TableName() string {
@ -54,6 +58,8 @@ func NewCreateSecretRequest() *CreateSecretRequest {
}
type CreateSecretRequest struct {
// 是否启用
Enabled *bool `json:"enabled" gorm:"column:enabled"`
// 名称
Name string `json:"name" gorm:"column:name"`
// 尝试
@ -76,8 +82,21 @@ type CreateSecretRequest struct {
isEncrypted bool `gorm:"-"`
}
func (r *CreateSecretRequest) SetIsEncrypted(v bool) {
func (r *CreateSecretRequest) SetIsEncrypted(v bool) *CreateSecretRequest {
r.isEncrypted = v
return r
}
func (r *CreateSecretRequest) SetEnabled(v bool) *CreateSecretRequest {
r.Enabled = &v
return r
}
func (r *CreateSecretRequest) GetEnabled() bool {
if r.Enabled == nil {
return false
}
return *r.Enabled
}
func (r *CreateSecretRequest) GetSyncLimit() int64 {

View File

@ -0,0 +1,34 @@
package secret
import (
"context"
"fmt"
"122.51.31.227/go-course/go18/devcloud/cmdb/apps/resource"
"github.com/infraboard/modules/task/apps/task"
)
const (
SYNCER_PREFIX = "syncer"
)
var (
syncers = map[string]Syncer{}
)
func GetSyncerName(t resource.TYPE) string {
return fmt.Sprintf("%s_%s", SYNCER_PREFIX, t)
}
func GetSyncer(t resource.TYPE) Syncer {
return syncers[fmt.Sprintf("%s_%s", SYNCER_PREFIX, t)]
}
func RegistrySyncer(t resource.TYPE, s Syncer) {
syncers[fmt.Sprintf("%s_%s", SYNCER_PREFIX, t)] = s
}
type Syncer interface {
// 资源同步
Sync(context.Context, *Secret, resource.TYPE) *task.Task
}

View File

@ -0,0 +1,47 @@
package sync
import (
"context"
"time"
"122.51.31.227/go-course/go18/devcloud/cmdb/apps/resource"
"122.51.31.227/go-course/go18/devcloud/cmdb/apps/secret"
"github.com/infraboard/mcube/v2/ioc"
"github.com/infraboard/mcube/v2/ioc/config/log"
"github.com/infraboard/modules/task/apps/task"
"github.com/rs/zerolog"
)
func init() {
ioc.Controller().Registry(&VmSyncerServiceImpl{})
}
var _ secret.Syncer = (*VmSyncerServiceImpl)(nil)
type VmSyncerServiceImpl struct {
ioc.ObjectImpl
log *zerolog.Logger
}
func (s *VmSyncerServiceImpl) Name() string {
return secret.GetSyncerName(resource.TYPE_VM)
}
func (s *VmSyncerServiceImpl) Init() error {
s.log = log.Sub(s.Name())
secret.RegistrySyncer(resource.TYPE_VM, s)
return nil
}
// Sync implements secret.Syncer.
func (s *VmSyncerServiceImpl) Sync(ctx context.Context, secretIns *secret.Secret, rs resource.TYPE) *task.Task {
return task.GetService().Run(ctx, task.NewFnTask(func(ctx context.Context, req any) error {
// taskInfo := task.GetTaskFromCtx(ctx)
s.log.Debug().Msg("test for vm sync wait")
time.Sleep(1 * time.Second)
// secrt同步
return nil
}, secretIns).SetAsync(true).SetLabel(secret.TASK_LABLE_SECRET_ID, secretIns.Id))
}

View File

@ -0,0 +1,21 @@
package test
import (
"os"
"github.com/infraboard/mcube/v2/ioc"
// 要注册哪些对象, Book, Comment
// 加载的业务对象
_ "122.51.31.227/go-course/go18/devcloud/cmdb/apps"
// task模块
_ "github.com/infraboard/modules/task/apps"
// _ "122.51.31.227/go-course/go18/devcloud/mcenter/apps"
// _ "122.51.31.227/go-course/go18/devcloud/mpaas/apps"
)
func DevelopmentSetUp() {
// import 后自动执行的逻辑
// 工具对象的初始化, 需要的是绝对路径
ioc.DevelopmentSetupWithPath(os.Getenv("CONFIG_PATH"))
}

3
go.mod
View File

@ -10,7 +10,7 @@ require (
github.com/google/uuid v1.6.0
github.com/infraboard/devops v0.0.6
github.com/infraboard/mcube/v2 v2.0.61
github.com/infraboard/modules v0.0.12
github.com/infraboard/modules v0.0.13
github.com/rs/xid v1.6.0
github.com/rs/zerolog v1.34.0
github.com/segmentio/kafka-go v0.4.47
@ -117,5 +117,6 @@ require (
modernc.org/mathutil v1.5.0 // indirect
modernc.org/memory v1.5.0 // indirect
modernc.org/sqlite v1.23.1 // indirect
resty.dev/v3 v3.0.0-beta.2 // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
)

6
go.sum
View File

@ -99,8 +99,8 @@ github.com/infraboard/devops v0.0.6 h1:oo7RfRBxu9hbI/+bYzXuX40BfG1hRyVtxC0fBhoFG
github.com/infraboard/devops v0.0.6/go.mod h1:Ac+W3wFy5pG9EH7f1W8DxiAZRhnTDFHrp27WKkYnNoU=
github.com/infraboard/mcube/v2 v2.0.61 h1:al8Z+poXXOjfTIAY48ujFzV1uYzH/N7/xmve/ZXArbo=
github.com/infraboard/mcube/v2 v2.0.61/go.mod h1:TbYs8cnD8Cg19sTdU0D+vqWAN+LzoxhMYWmAC2pfJkQ=
github.com/infraboard/modules v0.0.12 h1:vQqm+JwzmhL+hcD9SV+WVlp9ecInc7NsbGahcTmJ0Wk=
github.com/infraboard/modules v0.0.12/go.mod h1:NdgdH/NoeqibJmFPn9th+tisMuR862/crbXeH4FPMaU=
github.com/infraboard/modules v0.0.13 h1:wykjQswgVnqv3mL2qtjqAui/AWuZWPaG/dkzITmDkrc=
github.com/infraboard/modules v0.0.13/go.mod h1:B+wkbV8oGOmSwribb3y/+F9PAp8H8UzhkrK7YySuYIk=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 h1:L0QtFUgDarD7Fpv9jeVMgy/+Ec0mtnmYuImjTz6dtDA=
@ -353,5 +353,7 @@ modernc.org/memory v1.5.0/go.mod h1:PkUhL0Mugw21sHPeskwZW4D6VscE/GQJOnIpCnW6pSU=
modernc.org/sqlite v1.23.1 h1:nrSBg4aRQQwq59JpvGEQ15tNxoO5pX/kUjcRNwSAGQM=
modernc.org/sqlite v1.23.1/go.mod h1:OrDj17Mggn6MhE+iPbBNf7RGKODDE9NFT0f3EwDzJqk=
nullprogram.com/x/optparse v1.0.0/go.mod h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50=
resty.dev/v3 v3.0.0-beta.2 h1:xu4mGAdbCLuc3kbk7eddWfWm4JfhwDtdapwss5nCjnQ=
resty.dev/v3 v3.0.0-beta.2/go.mod h1:OgkqiPvTDtOuV4MGZuUDhwOpkY8enjOsjjMzeOHefy4=
sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E=
sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY=