diff --git a/devcloud/audit/apps/operator/README.md b/devcloud/audit/apps/event/README.md similarity index 100% rename from devcloud/audit/apps/operator/README.md rename to devcloud/audit/apps/event/README.md diff --git a/devcloud/audit/apps/operator/api/api.go b/devcloud/audit/apps/event/api/api.go similarity index 100% rename from devcloud/audit/apps/operator/api/api.go rename to devcloud/audit/apps/event/api/api.go diff --git a/devcloud/audit/apps/event/consumer/README.md b/devcloud/audit/apps/event/consumer/README.md new file mode 100644 index 0000000..50f758b --- /dev/null +++ b/devcloud/audit/apps/event/consumer/README.md @@ -0,0 +1,2 @@ +# 消息kafka的实践并存储 + diff --git a/devcloud/audit/apps/event/consumer/consumer.go b/devcloud/audit/apps/event/consumer/consumer.go new file mode 100644 index 0000000..edb9a0d --- /dev/null +++ b/devcloud/audit/apps/event/consumer/consumer.go @@ -0,0 +1,40 @@ +package consumer + +import ( + "context" + "io" + + "github.com/infraboard/modules/maudit/apps/event" +) + +// 读取消息,处理消息, 使用同步方法, 会阻塞 +func (c *consumer) Run(ctx context.Context) error { + for { + m, err := c.reader.FetchMessage(ctx) + if err != nil { + if err == io.EOF { + c.log.Info().Msg("reader closed") + return nil + } + c.log.Error().Msgf("featch message error, %s", err) + continue + } + + // 处理消息 + e := event.NewEvent() + c.log.Debug().Msgf("message at topic/partition/offset %v/%v/%v", m.Topic, m.Partition, m.Offset) + + // 发送的数据时Json格式, 接收用的JSON, 发送也需要使用JSON + err = e.Load(m.Value) + if err == nil { + if err := event.GetService().SaveEvent(ctx, event.NewEventSet().Add(e)); err != nil { + c.log.Error().Msgf("save event error, %s", err) + } + } + + // 处理完消息后需要提交该消息已经消费完成, 消费者挂掉后保存消息消费的状态 + if err := c.reader.CommitMessages(ctx, m); err != nil { + c.log.Error().Msgf("failed to commit messages: %s", err) + } + } +} diff --git a/devcloud/audit/apps/event/consumer/impl.go b/devcloud/audit/apps/event/consumer/impl.go new file mode 100644 index 0000000..af9892e --- /dev/null +++ b/devcloud/audit/apps/event/consumer/impl.go @@ -0,0 +1,59 @@ +package consumer + +import ( + "context" + + "github.com/infraboard/mcube/v2/ioc" + "github.com/infraboard/mcube/v2/ioc/config/log" + "github.com/rs/zerolog" + + ioc_kafka "github.com/infraboard/mcube/v2/ioc/config/kafka" + kafka "github.com/segmentio/kafka-go" +) + +func init() { + ioc.Controller().Registry(&consumer{ + GroupId: "audit", + Topics: []string{"audit_go18"}, + ctx: context.Background(), + }) +} + +// 业务具体实现 +type consumer struct { + // 继承模版 + ioc.ObjectImpl + + // 模块子Logger + log *zerolog.Logger + + // Kafka消费者 + reader *kafka.Reader + // 允许时上下文 + ctx context.Context + + // 消费组Id + GroupId string `toml:"group_id" json:"group_id" yaml:"group_id" env:"GROUP_ID"` + // 当前这个消费者 配置的topic + Topics []string `toml:"topic" json:"topic" yaml:"topic" env:"TOPIC"` +} + +// 对象名称 +func (i *consumer) Name() string { + return "maudit_consumer" +} + +// 初始化 +func (i *consumer) Init() error { + // 对象 + i.log = log.Sub(i.Name()) + i.reader = ioc_kafka.ConsumerGroup(i.GroupId, i.Topics) + + go i.Run(i.ctx) + return nil +} + +func (i *consumer) Close(ctx context.Context) error { + i.ctx.Done() + return nil +} diff --git a/devcloud/audit/apps/event/impl/event.go b/devcloud/audit/apps/event/impl/event.go new file mode 100644 index 0000000..f0be8be --- /dev/null +++ b/devcloud/audit/apps/event/impl/event.go @@ -0,0 +1,46 @@ +package impl + +import ( + "context" + + "122.51.31.227/go-course/go18/devcloud/audit/apps/event" + "github.com/infraboard/mcube/v2/types" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo/options" +) + +// 存储 +func (s *EventServiceImpl) SaveEvent(ctx context.Context, in *types.Set[*event.Event]) error { + s.log.Debug().Msgf("events: %s", in) + + _, err := s.col.InsertMany(ctx, in.ToAny()) + if err != nil { + return err + } + return nil +} + +func (s *EventServiceImpl) QueryEvent(ctx context.Context, in *event.QueryEventRequest) (*types.Set[*event.Event], error) { + set := types.NewSet[*event.Event]() + + // 查询条件 + filter := bson.M{} + + opt := options.Find() + opt.SetLimit(int64(in.PageSize)) + opt.SetSkip(in.ComputeOffset()) + + cursor, err := s.col.Find(ctx, filter, opt) + if err != nil { + return nil, err + } + + for cursor.Next(ctx) { + e := event.NewEvent() + if err := cursor.Decode(e); err != nil { + return nil, err + } + set.Add(e) + } + return set, nil +} diff --git a/devcloud/audit/apps/event/impl/impl.go b/devcloud/audit/apps/event/impl/impl.go new file mode 100644 index 0000000..e237463 --- /dev/null +++ b/devcloud/audit/apps/event/impl/impl.go @@ -0,0 +1,43 @@ +package impl + +import ( + "122.51.31.227/go-course/go18/devcloud/audit/apps/event" + "github.com/infraboard/mcube/v2/ioc" + "github.com/infraboard/mcube/v2/ioc/config/log" + "github.com/rs/zerolog" + + ioc_mongo "github.com/infraboard/mcube/v2/ioc/config/mongo" + "go.mongodb.org/mongo-driver/mongo" +) + +func init() { + ioc.Controller().Registry(&EventServiceImpl{}) +} + +// 业务具体实现 +type EventServiceImpl struct { + // 继承模版 + ioc.ObjectImpl + + // 模块子Logger + log *zerolog.Logger + + // MongoDB集合 + col *mongo.Collection +} + +// 对象名称 +func (i *EventServiceImpl) Name() string { + return event.AppName +} + +// 初始化 +func (i *EventServiceImpl) Init() error { + // 对象 + i.log = log.Sub(i.Name()) + + i.log.Debug().Msgf("database: %s", ioc_mongo.Get().Database) + // 需要一个集合Collection + i.col = ioc_mongo.DB().Collection("events") + return nil +} diff --git a/devcloud/audit/apps/event/interface.go b/devcloud/audit/apps/event/interface.go new file mode 100644 index 0000000..06aa983 --- /dev/null +++ b/devcloud/audit/apps/event/interface.go @@ -0,0 +1,35 @@ +package event + +import ( + "context" + + "github.com/infraboard/mcube/v2/http/request" + "github.com/infraboard/mcube/v2/ioc" + "github.com/infraboard/mcube/v2/types" +) + +var ( + AppName = "event" +) + +func GetService() Service { + return ioc.Controller().Get(AppName).(Service) +} + +type Service interface { + // 存储 + SaveEvent(context.Context, *types.Set[*Event]) error + // 查询 + QueryEvent(context.Context, *QueryEventRequest) (*types.Set[*Event], error) +} + +func NewQueryEventRequest() *QueryEventRequest { + return &QueryEventRequest{ + PageRequest: request.NewDefaultPageRequest(), + } +} + +type QueryEventRequest struct { + // 分页请求参数 + *request.PageRequest +} diff --git a/devcloud/audit/apps/event/model.go b/devcloud/audit/apps/event/model.go new file mode 100644 index 0000000..fc3efd8 --- /dev/null +++ b/devcloud/audit/apps/event/model.go @@ -0,0 +1,65 @@ +package event + +import ( + "encoding/json" + "time" + + "github.com/rs/xid" + "github.com/segmentio/kafka-go" +) + +func NewEvent() *Event { + return &Event{ + Id: xid.New().String(), + Label: map[string]string{}, + Extras: map[string]string{}, + Time: time.Now().Unix(), + } +} + +// 用户操作事件 +// 如何映射成 MongoDB BSON +type Event struct { + // 事件Id, + // _id 在mongodb 表示的是对象Id + Id string `json:"id" bson:"_id"` + // 谁 + Who string `json:"who" bson:"who"` + // 在什么时间 + Time int64 `json:"time" bson:"time"` + // 操作人的Ip + Ip string `json:"ip" bson:"ip"` + // User Agent + UserAgent string `json:"user_agent" bson:"user_agent"` + + // 做了什么操作, 服务:资源:动作 + // 服务 + Service string `json:"service" bson:"service"` + // 资源 + ResourceType string `json:"resource_type" bson:"resource_type"` + // 动作 + Action string `json:"action" bson:"action"` + + // 详情信息 + ResourceId string `json:"resource_id" bson:"resource_id"` + // 状态码 404 + StatusCode int `json:"status_code" bson:"status_code"` + // 具体信息 + ErrorMessage string `json:"error_message" bson:"error_message"` + + // 标签 + Label map[string]string `json:"label" bson:"label"` + // 扩展信息 + Extras map[string]string `json:"extras" bson:"extras"` +} + +func (e *Event) Load(data []byte) error { + return json.Unmarshal(data, e) +} + +func (e *Event) ToKafkaMessage() kafka.Message { + data, _ := json.Marshal(e) + return kafka.Message{ + Value: data, + } +} diff --git a/devcloud/audit/apps/operator/impl/impl.go b/devcloud/audit/apps/operator/impl/impl.go deleted file mode 100644 index 4f9d22e..0000000 --- a/devcloud/audit/apps/operator/impl/impl.go +++ /dev/null @@ -1 +0,0 @@ -package impl diff --git a/devcloud/audit/apps/registry.go b/devcloud/audit/apps/registry.go index d6baaa1..d61a6fd 100644 --- a/devcloud/audit/apps/registry.go +++ b/devcloud/audit/apps/registry.go @@ -1,6 +1,6 @@ package apps import ( - _ "122.51.31.227/go-course/go18/devcloud/audit/apps/operator/api" - _ "122.51.31.227/go-course/go18/devcloud/audit/apps/operator/impl" + _ "122.51.31.227/go-course/go18/devcloud/audit/apps/event/api" + _ "122.51.31.227/go-course/go18/devcloud/audit/apps/event/impl" ) diff --git a/devcloud/etc/application.toml b/devcloud/etc/application.toml index 3c7ce50..964bffe 100644 --- a/devcloud/etc/application.toml +++ b/devcloud/etc/application.toml @@ -14,6 +14,16 @@ auto_migrate = true debug = true +[mongo] + endpoints = ["127.0.0.1:27017"] + username = "" + password = "" + +[kafka] + brokers = ["127.0.0.1:9092"] + username = "" + password = "" + [http] host = "127.0.0.1" port = 8080 diff --git a/devcloud/main.go b/devcloud/main.go index 5f87654..9b47c04 100644 --- a/devcloud/main.go +++ b/devcloud/main.go @@ -5,6 +5,8 @@ import ( // mcenter 业务对象 _ "122.51.31.227/go-course/go18/devcloud/mcenter/apps" + // audit 业务对象 + _ "122.51.31.227/go-course/go18/devcloud/audit/apps" // 非功能性模块 _ "github.com/infraboard/mcube/v2/ioc/apps/apidoc/restful"