package event_test

import (
	"net"
	"strconv"
	"testing"

	"github.com/segmentio/kafka-go"
)

// TestCreateTopic creates the "maudit_new" topic (6 partitions, replication
// factor 1) on the Kafka cluster reachable at localhost:9092.
//
// It is an integration test: it needs a live broker at that address and fails
// if any connection step or the topic creation itself returns an error.
func TestCreateTopic(t *testing.T) {
	// 1. Connect to any broker so we can ask it who the controller is.
	conn, err := kafka.Dial("tcp", "localhost:9092")
	if err != nil {
		// Fail only this test instead of panicking, which would abort the
		// whole test binary and hide the results of every other test.
		t.Fatalf("dial kafka broker: %v", err)
	}
	defer conn.Close()

	// 2. Look up the broker that conn reports as the cluster controller.
	controller, err := conn.Controller()
	if err != nil {
		t.Fatalf("look up kafka controller: %v", err)
	}

	// 3. Open a second connection directly to the controller broker.
	controllerAddr := net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))
	controllerConn, err := kafka.Dial("tcp", controllerAddr)
	if err != nil {
		t.Fatalf("dial kafka controller %s: %v", controllerAddr, err)
	}
	defer controllerConn.Close()

	// 4. Issue the topic-creation request over the controller connection.
	err = controllerConn.CreateTopics(kafka.TopicConfig{
		Topic:             "maudit_new",
		NumPartitions:     6,
		ReplicationFactor: 1,
	})
	if err != nil {
		t.Fatal(err)
	}
}

// TestListTopic reads the partitions known to the broker at localhost:9092
// and logs a map of topic name to the number of partitions returned for it.
//
// It is an integration test: it needs a live broker at that address.
func TestListTopic(t *testing.T) {
	// 1. Connect to the broker.
	conn, err := kafka.Dial("tcp", "localhost:9092")
	if err != nil {
		// Fail only this test instead of panicking, which would abort the
		// whole test binary and hide the results of every other test.
		t.Fatalf("dial kafka broker: %v", err)
	}
	defer conn.Close()

	// 2. Called with no arguments, so no topic filter is passed.
	partitions, err := conn.ReadPartitions()
	if err != nil {
		t.Fatal(err)
	}

	// 3. Count partitions per topic; each distinct key is one topic.
	topics := map[string]int{}
	for _, p := range partitions {
		topics[p.Topic]++
	}
	t.Log(topics)
}