diff --git a/devcloud-mini/cmdb/apps/secret/api/api.go b/devcloud-mini/cmdb/apps/secret/api/api.go index 1d10044..3e4567d 100644 --- a/devcloud-mini/cmdb/apps/secret/api/api.go +++ b/devcloud-mini/cmdb/apps/secret/api/api.go @@ -11,6 +11,7 @@ import ( "github.com/infraboard/mcube/v2/ioc/config/gorestful" "github.com/infraboard/modules/iam/apps/endpoint" "gitlab.com/go-course-project/go17/devcloud-mini/cmdb/apps/secret" + audit "gitlab.com/go-course-project/go17/devcloud-mini/maudit/audit" permission "gitlab.com/go-course-project/go17/devcloud-mini/mcenter/permisson" ) @@ -36,6 +37,7 @@ func (r *SecretApiHandler) Init() error { ws.Route(ws.GET("").To(r.QuerySecret). Metadata(permission.Auth(true)). Metadata(permission.Permission(true)). + Metadata(audit.Audit(true)). Metadata(endpoint.META_RESOURCE_KEY, "secret"). Metadata(endpoint.META_ACTION_KEY, "list"). Doc("凭证列表"). diff --git a/devcloud-mini/cmdb/etc/application.toml b/devcloud-mini/cmdb/etc/application.toml index 95f8f9e..a43b020 100644 --- a/devcloud-mini/cmdb/etc/application.toml +++ b/devcloud-mini/cmdb/etc/application.toml @@ -2,7 +2,7 @@ name = "cmdb" description = "cmdb" internal_address = "http://127.0.0.1:8020" - internal_token = "bar3TjDvMxITfrGrVLvv3ujF" + internal_token = "hufyCWnmC1TapxC87b2W4tB5" [http] # 开启GRPC服务 @@ -22,4 +22,14 @@ [mongo] endpoints = ["127.0.0.1:27017"] - database = "go17" \ No newline at end of file + database = "go17" + +[kafka] + brokers = ["127.0.0.1:9092"] + scram_algorithm = "SHA512" + username = "" + password = "" + debug = false + +[auditor] + topic = "maudit_new" \ No newline at end of file diff --git a/devcloud-mini/docs/arch.drawio b/devcloud-mini/docs/arch.drawio index 47c8b7c..0b6adbd 100644 --- a/devcloud-mini/docs/arch.drawio +++ b/devcloud-mini/docs/arch.drawio @@ -8,13 +8,13 @@ - + - + - + @@ -24,8 +24,13 @@ + + + + + - + @@ -36,6 +41,17 @@ + + + + + + + + + + + diff --git a/devcloud-mini/maudit/apps/event/const.go b/devcloud-mini/maudit/apps/event/const.go new file mode 100644 index 0000000..5a399c1 --- /dev/null +++ b/devcloud-mini/maudit/apps/event/const.go @@ -0,0 +1,5 @@ +package event + +const ( + META_AUDIT_KEY = "audit" +) diff --git a/devcloud-mini/maudit/apps/event/consumer/consumer.go b/devcloud-mini/maudit/apps/event/consumer/consumer.go new file mode 100644 index 0000000..0935d86 --- /dev/null +++ b/devcloud-mini/maudit/apps/event/consumer/consumer.go @@ -0,0 +1,42 @@ +package consumer + +import ( + "context" + "io" + "time" + + "gitlab.com/go-course-project/go17/devcloud-mini/maudit/apps/event" +) + +// 读取消息,处理消息 +func (c *consumer) Run(ctx context.Context) error { + for { + m, err := c.reader.FetchMessage(ctx) + if err != nil { + if err == io.EOF { + c.log.Info().Msg("reader closed") + return nil + } + c.log.Error().Msgf("featch message error, %s", err) + time.Sleep(5 * time.Second) + continue + } + + // 处理消息 + e := event.NewEvent() + c.log.Debug().Msgf("message at topic/partition/offset %v/%v/%v", m.Topic, m.Partition, m.Offset) + + // 发送的数据时Json格式, 接收用的JSON, 发送也需要使用JSON + err = e.Load(m.Value) + if err == nil { + if err := event.GetService().SaveEvent(ctx, event.NewEventSet().Add(e)); err != nil { + c.log.Error().Msgf("save event error, %s", err) + } + } + + // 处理完消息后需要提交该消息已经消费完成, 消费者挂掉后保存消息消费的状态 + if err := c.reader.CommitMessages(ctx, m); err != nil { + c.log.Error().Msgf("failed to commit messages: %s", err) + } + } +} diff --git a/devcloud-mini/maudit/apps/event/consumer/impl.go b/devcloud-mini/maudit/apps/event/consumer/impl.go new file mode 100644 index 0000000..335951c --- /dev/null +++ b/devcloud-mini/maudit/apps/event/consumer/impl.go @@ -0,0 +1,61 @@ +package consumer + +import ( + "context" + + "github.com/infraboard/mcube/v2/ioc" + "github.com/infraboard/mcube/v2/ioc/config/log" + "github.com/rs/zerolog" + + ioc_kafka "github.com/infraboard/mcube/v2/ioc/config/kafka" + ioc_mongo "github.com/infraboard/mcube/v2/ioc/config/mongo" + kafka "github.com/segmentio/kafka-go" +) + +func init() { + ioc.Controller().Registry(&consumer{ + GroupId: "maudit", + Topics: []string{"maudit_new"}, + ctx: context.Background(), + }) +} + +// 业务具体实现 +type consumer struct { + // 继承模版 + ioc.ObjectImpl + + // 模块子Logger + log *zerolog.Logger + + // + reader *kafka.Reader + // 允许时上下文 + ctx context.Context + + // 消费组Id + GroupId string `toml:"group_id" json:"group_id" yaml:"group_id" env:"GROUP_ID"` + // 当前这个消费者 配置的topic + Topics []string `toml:"topic" json:"topic" yaml:"topic" env:"TOPIC"` +} + +// 对象名称 +func (i *consumer) Name() string { + return "maudit_consumer" +} + +// 初始化 +func (i *consumer) Init() error { + // 对象 + i.log = log.Sub(i.Name()) + i.log.Debug().Msgf("database: %s", ioc_mongo.Get().Database) + i.reader = ioc_kafka.ConsumerGroup(i.GroupId, i.Topics) + + go i.Run(i.ctx) + return nil +} + +func (i *consumer) Close(ctx context.Context) error { + i.ctx.Done() + return nil +} diff --git a/devcloud-mini/maudit/apps/event/impl/event.go b/devcloud-mini/maudit/apps/event/impl/event.go new file mode 100644 index 0000000..186e32e --- /dev/null +++ b/devcloud-mini/maudit/apps/event/impl/event.go @@ -0,0 +1,45 @@ +package impl + +import ( + "context" + + "gitlab.com/go-course-project/go17/devcloud-mini/maudit/apps/event" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo/options" +) + +// 存储 +// 选择MongoDB +func (i *impl) SaveEvent(ctx context.Context, in *event.EventSet) error { + i.log.Debug().Msgf("events: %s", in) + + _, err := i.col.InsertMany(ctx, in.ToDocs()) + if err != nil { + return err + } + return nil +} + +// 查询 +func (i *impl) QueryEvent(ctx context.Context, in *event.QueryEventRequest) (*event.EventSet, error) { + set := event.NewEventSet() + + filter := bson.M{} + + opt := options.Find() + opt.SetLimit(int64(in.PageSize)) + opt.SetSkip(in.ComputeOffset()) + cursor, err := i.col.Find(ctx, filter, opt) + if err != nil { + return nil, err + } + + for cursor.Next(ctx) { + e := event.NewEvent() + if err := cursor.Decode(e); err != nil { + return nil, err + } + set.Add(e) + } + return set, nil +} diff --git a/devcloud-mini/maudit/apps/event/impl/impl.go b/devcloud-mini/maudit/apps/event/impl/impl.go new file mode 100644 index 0000000..bee4edc --- /dev/null +++ b/devcloud-mini/maudit/apps/event/impl/impl.go @@ -0,0 +1,43 @@ +package impl + +import ( + "github.com/infraboard/mcube/v2/ioc" + "github.com/infraboard/mcube/v2/ioc/config/log" + "github.com/rs/zerolog" + "gitlab.com/go-course-project/go17/devcloud-mini/maudit/apps/event" + + ioc_mongo "github.com/infraboard/mcube/v2/ioc/config/mongo" + "go.mongodb.org/mongo-driver/mongo" +) + +func init() { + ioc.Controller().Registry(&impl{}) +} + +// 业务具体实现 +type impl struct { + // 继承模版 + ioc.ObjectImpl + + // 模块子Logger + log *zerolog.Logger + + // + col *mongo.Collection +} + +// 对象名称 +func (i *impl) Name() string { + return event.AppName +} + +// 初始化 +func (i *impl) Init() error { + // 对象 + i.log = log.Sub(i.Name()) + + i.log.Debug().Msgf("database: %s", ioc_mongo.Get().Database) + // 需要一个集合Collection + i.col = ioc_mongo.DB().Collection("events") + return nil +} diff --git a/devcloud-mini/maudit/apps/event/interface.go b/devcloud-mini/maudit/apps/event/interface.go index 0e4b82e..3c71c05 100644 --- a/devcloud-mini/maudit/apps/event/interface.go +++ b/devcloud-mini/maudit/apps/event/interface.go @@ -1 +1,34 @@ package event + +import ( + "context" + + "github.com/infraboard/mcube/v2/http/request" + "github.com/infraboard/mcube/v2/ioc" +) + +var ( + AppName = "event" +) + +func GetService() Service { + return ioc.Controller().Get(AppName).(Service) +} + +type Service interface { + // 存储 + SaveEvent(context.Context, *EventSet) error + // 查询 + QueryEvent(context.Context, *QueryEventRequest) (*EventSet, error) +} + +func NewQueryEventRequest() *QueryEventRequest { + return &QueryEventRequest{ + PageRequest: request.NewDefaultPageRequest(), + } +} + +type QueryEventRequest struct { + // 分页请求参数 + *request.PageRequest +} diff --git a/devcloud-mini/maudit/apps/event/kafka_test.go b/devcloud-mini/maudit/apps/event/kafka_test.go index 2620aca..8d7136d 100644 --- a/devcloud-mini/maudit/apps/event/kafka_test.go +++ b/devcloud-mini/maudit/apps/event/kafka_test.go @@ -1,6 +1,8 @@ package event_test import ( + "context" + "fmt" "net" "strconv" "testing" @@ -56,3 +58,49 @@ func TestListTopic(t *testing.T) { } t.Log(topics) } + +func TestKafkaWirteMessage(t *testing.T) { + w := &kafka.Writer{ + Addr: kafka.TCP("localhost:9092"), + Topic: "maudit_new", + Balancer: &kafka.LeastBytes{}, + } + err := w.WriteMessages(context.Background(), kafka.Message{ + // 支持 Writing to multiple topics + // NOTE: Each Message has Topic defined, otherwise an error is returned. + // Topic: "topic-A", + Key: []byte("Key-A"), + Value: []byte("Hello World!"), + }, + kafka.Message{ + Key: []byte("Key-B"), + Value: []byte("One!"), + }, + kafka.Message{ + Key: []byte("Key-C"), + Value: []byte("Two!"), + }) + if err != nil { + t.Fatal(err) + } +} + +func TestKafkaReadMessage(t *testing.T) { + r := kafka.NewReader(kafka.ReaderConfig{ + Brokers: []string{"localhost:9092"}, + Topic: "maudit_new", + GroupID: "maudit", + }) + // 自动化 1. 读取消息, 读出来 就代表已经被处理, FetchMessage, Commit(OK) + // r.ReadMessage(context.Background()) + // 手动操作: 2. 获取消息, commit(OK), 对消息可靠度有要求,自己严格控制,避免消息丢失 + for { + m, err := r.FetchMessage(context.Background()) + if err != nil { + t.Fatal(err) + } + fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value)) + // 标记消息已处理 + r.CommitMessages(context.Background(), m) + } +} diff --git a/devcloud-mini/maudit/apps/event/model.go b/devcloud-mini/maudit/apps/event/model.go index 0e4b82e..691ebbe 100644 --- a/devcloud-mini/maudit/apps/event/model.go +++ b/devcloud-mini/maudit/apps/event/model.go @@ -1 +1,95 @@ package event + +import ( + "encoding/json" + "time" + + "github.com/infraboard/mcube/v2/tools/pretty" + "github.com/rs/xid" + "github.com/segmentio/kafka-go" +) + +func NewEventSet() *EventSet { + return &EventSet{ + Items: []*Event{}, + } +} + +type EventSet struct { + // t + Total int64 `json:"total"` + // 列表 + Items []*Event `json:"items"` +} + +func (s *EventSet) String() string { + return pretty.ToJSON(s) +} + +func (s *EventSet) Add(items ...*Event) *EventSet { + s.Items = append(s.Items, items...) + return s +} + +func (s *EventSet) ToDocs() (docs []any) { + for i := range s.Items { + docs = append(docs, s.Items[i]) + } + return +} + +func NewEvent() *Event { + return &Event{ + Id: xid.New().String(), + Label: map[string]string{}, + Extras: map[string]string{}, + Time: time.Now().Unix(), + } +} + +// 用户操作事件 +// 如何映射成 MongoDB BSON +type Event struct { + // 事件Id, + // _id 在mongodb 表示的是对象Id + Id string `json:"id" bson:"_id"` + // 谁 + Who string `json:"who" bson:"who"` + // 在什么时间 + Time int64 `json:"time" bson:"time"` + // 操作人的Ip + Ip string `json:"ip" bson:"ip"` + // User Agent + UserAgent string `json:"user_agent" bson:"user_agent"` + + // 做了什么操作, 服务:资源:动作 + // 服务 + Service string `json:"service" bson:"service"` + // 资源 + ResourceType string `json:"resource_type" bson:"resource_type"` + // 动作 + Action string `json:"action" bson:"action"` + + // 详情信息 + ResourceId string `json:"resource_id" bson:"resource_id"` + // 状态码 404 + StatusCode int `json:"status_code" bson:"status_code"` + // 具体信息 + ErrorMessage string `json:"error_message" bson:"error_message"` + + // 标签 + Label map[string]string `json:"label" bson:"label"` + // 扩展信息 + Extras map[string]string `json:"extras" bson:"extras"` +} + +func (e *Event) Load(data []byte) error { + return json.Unmarshal(data, e) +} + +func (e *Event) ToKafkaMessage() kafka.Message { + data, _ := json.Marshal(e) + return kafka.Message{ + Value: data, + } +} diff --git a/devcloud-mini/maudit/apps/registry.go b/devcloud-mini/maudit/apps/registry.go index cff2ab9..d873dbf 100644 --- a/devcloud-mini/maudit/apps/registry.go +++ b/devcloud-mini/maudit/apps/registry.go @@ -1 +1,6 @@ package apps + +import ( + _ "gitlab.com/go-course-project/go17/devcloud-mini/maudit/apps/event/consumer" + _ "gitlab.com/go-course-project/go17/devcloud-mini/maudit/apps/event/impl" +) diff --git a/devcloud-mini/maudit/audit/README.md b/devcloud-mini/maudit/audit/README.md new file mode 100644 index 0000000..5c0fc80 --- /dev/null +++ b/devcloud-mini/maudit/audit/README.md @@ -0,0 +1,2 @@ +# 审计接入中间件 + diff --git a/devcloud-mini/maudit/audit/audit.go b/devcloud-mini/maudit/audit/audit.go new file mode 100644 index 0000000..503a1c0 --- /dev/null +++ b/devcloud-mini/maudit/audit/audit.go @@ -0,0 +1,124 @@ +package aduit + +import ( + "context" + + "github.com/emicklei/go-restful/v3" + "github.com/infraboard/mcube/v2/ioc" + "github.com/infraboard/mcube/v2/ioc/config/application" + "github.com/infraboard/mcube/v2/ioc/config/gorestful" + ioc_kafka "github.com/infraboard/mcube/v2/ioc/config/kafka" + "github.com/infraboard/mcube/v2/ioc/config/log" + "github.com/infraboard/modules/iam/apps/endpoint" + "github.com/infraboard/modules/iam/apps/token" + "github.com/rs/zerolog" + "github.com/segmentio/kafka-go" + "gitlab.com/go-course-project/go17/devcloud-mini/maudit/apps/event" +) + +func init() { + ioc.Config().Registry(&auditor{ + Topic: "maudit_new", + }) +} + +func Audit(v bool) (string, bool) { + return event.META_AUDIT_KEY, v +} + +type auditor struct { + ioc.ObjectImpl + + log *zerolog.Logger + + // 当前这个消费者 配置的topic + Topic string `toml:"topic" json:"topic" yaml:"topic" env:"TOPIC"` + // + wirter *kafka.Writer +} + +func (a *auditor) Name() string { + return "auditor" +} + +func (a *auditor) Init() error { + a.log = log.Sub("mauditor") + a.log.Debug().Msgf("maduit topic name: %s", a.Topic) + a.wirter = ioc_kafka.Producer(a.Topic) + + // 添加到中间件, 加到Root Router里面 + ws := gorestful.RootRouter() + ws.Filter(a.Audit()) + return nil +} + +// 补充中间件函数逻辑 +func (a *auditor) Audit() restful.FilterFunction { + return func(r1 *restful.Request, r2 *restful.Response, fc *restful.FilterChain) { + sr := r1.SelectedRoute() + md := NewMetaData(sr.Metadata()) + + // 开关打开,则开启审计 + if md.GetBool(event.META_AUDIT_KEY) { + + // 获取当前是否需要审计 + e := event.NewEvent() + + // 用户信息 + tk := token.GetTokenFromCtx(r1.Request.Context()) + if tk != nil { + e.Who = tk.UserName + e.Extras["namespace"] = tk.NamespaceName + } + + // ioc 里面获取当前应用的名称 + e.Service = application.Get().AppName + e.ResourceType = md.GetString(endpoint.META_RESOURCE_KEY) + e.Action = md.GetString(endpoint.META_ACTION_KEY) + + // {id} /:id + e.ResourceId = r1.PathParameter("id") + e.UserAgent = r1.Request.UserAgent() + e.Extras["method"] = sr.Method() + e.Extras["path"] = sr.Path() + e.Extras["operation"] = sr.Operation() + + // 补充处理后的数据 + e.StatusCode = r2.StatusCode() + // 发送给topic, 使用这个中间件的使用者,需要配置kafka + err := a.wirter.WriteMessages(context.Background(), e.ToKafkaMessage()) + if err != nil { + a.log.Error().Msgf("send message error, %s", err) + } else { + a.log.Debug().Msgf("send audit event ok, who: %s, resource: %s, action: %s", e.Who, e.ResourceType, e.Action) + } + } + + // 路有给后续逻辑 + fc.ProcessFilter(r1, r2) + } +} + +func NewMetaData(data map[string]any) *MetaData { + return &MetaData{ + data: data, + } +} + +type MetaData struct { + data map[string]any +} + +func (m *MetaData) GetString(key string) string { + if v, ok := m.data[key]; ok { + return v.(string) + } + return "" +} + +func (m *MetaData) GetBool(key string) bool { + if v, ok := m.data[key]; ok { + return v.(bool) + } + return false +} diff --git a/devcloud-mini/maudit/etc/application.toml b/devcloud-mini/maudit/etc/application.toml new file mode 100644 index 0000000..d1a9363 --- /dev/null +++ b/devcloud-mini/maudit/etc/application.toml @@ -0,0 +1,27 @@ +[app] + name = "mauit" + description = "maudit" + internal_address = "http://127.0.0.1:8020" + internal_token = "hufyCWnmC1TapxC87b2W4tB5" + +[http] + # 开启GRPC服务 + enable = true + # HTTP服务Host + host = "127.0.0.1" + # HTTP服务端口 + port = 8030 + +[event] + topics = ["maudit_new"] + +[mongo] + endpoints = ["127.0.0.1:27017"] + database = "go17" + +[kafka] + brokers = ["127.0.0.1:9092"] + scram_algorithm = "SHA512" + username = "" + password = "" + debug = false \ No newline at end of file diff --git a/devcloud-mini/maudit/main.go b/devcloud-mini/maudit/main.go new file mode 100644 index 0000000..4ccffcc --- /dev/null +++ b/devcloud-mini/maudit/main.go @@ -0,0 +1,14 @@ +package main + +import ( + "github.com/infraboard/mcube/v2/ioc/server/cmd" + + _ "gitlab.com/go-course-project/go17/devcloud-mini/maudit/apps" + // 开启API Doc + _ "github.com/infraboard/mcube/v2/ioc/apps/apidoc/restful" +) + +// mcube +func main() { + cmd.Start() +} diff --git a/devcloud-mini/mcenter/etc/application.toml b/devcloud-mini/mcenter/etc/application.toml index 0c99de3..5999ec4 100644 --- a/devcloud-mini/mcenter/etc/application.toml +++ b/devcloud-mini/mcenter/etc/application.toml @@ -2,7 +2,7 @@ name = "mcenter" description = "mcenter" internal_address = "http://127.0.0.1:8020" - internal_token = "bar3TjDvMxITfrGrVLvv3ujF" + internal_token = "hufyCWnmC1TapxC87b2W4tB5" [http] # 开启GRPC服务 diff --git a/go.mod b/go.mod index 0ee4240..4a6e2d1 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/google/uuid v1.6.0 github.com/gorilla/websocket v1.5.3 github.com/infraboard/mcube/v2 v2.0.52 - github.com/infraboard/modules v0.0.8 + github.com/infraboard/modules v0.0.9 github.com/rs/zerolog v1.32.0 github.com/segmentio/kafka-go v0.4.47 github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.1116 @@ -88,7 +88,7 @@ require ( github.com/redis/go-redis/extra/redisotel/v9 v9.0.5 // indirect github.com/redis/go-redis/v9 v9.5.1 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect - github.com/rs/xid v1.5.0 // indirect + github.com/rs/xid v1.5.0 github.com/spf13/cobra v1.8.1 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect diff --git a/go.sum b/go.sum index 5a7276c..56fd340 100644 --- a/go.sum +++ b/go.sum @@ -127,8 +127,8 @@ github.com/infraboard/mcenter v0.0.45 h1:zFVmurkjGXb582hEJS0YSLjyl4TcimwSCeEbjXa github.com/infraboard/mcenter v0.0.45/go.mod h1:YsGG69OTCgqsAb0VYr7pLNASElVtimX1WQ/ZzANQ9MI= github.com/infraboard/mcube/v2 v2.0.52 h1:cOzVjTz2LlIMvz1CCXrRwyX6uaF0JKxc7MBIRMLipFY= github.com/infraboard/mcube/v2 v2.0.52/go.mod h1:gnr0xPPDPHvCS6JAzvdjqJ62J2+vUZTkobomjTXKsx0= -github.com/infraboard/modules v0.0.8 h1:ytzkjMbRuFb6FoI3Md6xS5hITjFqIvhIMMBvMQUGuE4= -github.com/infraboard/modules v0.0.8/go.mod h1:2ENrxqKlWvkf9WgO9FvXw3bUHWtPut71WSwDvVga/PI= +github.com/infraboard/modules v0.0.9 h1:LKBegOLiiJdRyRTkuT7COxRP7hri9KUF+Ef9fAdVBxY= +github.com/infraboard/modules v0.0.9/go.mod h1:u6e8Lq8W6bNNU0qxgEkxCXzaeNb0BrrIMmPXaVo+3s8= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=