2025-03-23 16:28:38 +08:00

107 lines
2.5 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package event_test
import (
"context"
"fmt"
"net"
"strconv"
"testing"
"github.com/segmentio/kafka-go"
)
// 创建Topic
func TestCreateTopic(t *testing.T) {
// 1. 连上kafka
conn, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {
panic(err.Error())
}
defer conn.Close()
// contoller 管理, 获取zk地址后管理集群状态
controller, err := conn.Controller()
if err != nil {
panic(err.Error())
}
var controllerConn *kafka.Conn
controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
panic(err.Error())
}
defer controllerConn.Close()
// contoller 集群的维护
err = controllerConn.CreateTopics(kafka.TopicConfig{Topic: "maudit_new", NumPartitions: 6, ReplicationFactor: 1})
if err != nil {
t.Fatal(err)
}
}
// 查询Topic列表
func TestListTopic(t *testing.T) {
// 1. 连上kafka
conn, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {
panic(err.Error())
}
defer conn.Close()
partitions, err := conn.ReadPartitions()
if err != nil {
t.Fatal(err)
}
topics := map[string]int{}
for _, p := range partitions {
topics[p.Topic]++
}
t.Log(topics)
}
func TestKafkaWirteMessage(t *testing.T) {
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092"),
Topic: "maudit_new",
Balancer: &kafka.LeastBytes{},
}
err := w.WriteMessages(context.Background(), kafka.Message{
// 支持 Writing to multiple topics
// NOTE: Each Message has Topic defined, otherwise an error is returned.
// Topic: "topic-A",
Key: []byte("Key-A"),
Value: []byte("Hello World!"),
},
kafka.Message{
Key: []byte("Key-B"),
Value: []byte("One!"),
},
kafka.Message{
Key: []byte("Key-C"),
Value: []byte("Two!"),
})
if err != nil {
t.Fatal(err)
}
}
func TestKafkaReadMessage(t *testing.T) {
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "maudit_new",
GroupID: "maudit",
})
// 自动化 1. 读取消息, 读出来 就代表已经被处理, FetchMessage, Commit(OK)
// r.ReadMessage(context.Background())
// 手动操作: 2. 获取消息, commit(OK), 对消息可靠度有要求,自己严格控制,避免消息丢失
for {
m, err := r.FetchMessage(context.Background())
if err != nil {
t.Fatal(err)
}
fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
// 标记消息已处理
r.CommitMessages(context.Background(), m)
}
}