Kafka[1] is an open-source distributed event streaming platform under the Apache Foundation and a killer application from the Java camp (originally written in Scala). The high reliability, high throughput, and low-latency data transport it provides have kept it an essential middleware in modern enterprise and cloud-native application systems to this day.

When writing Go programs day to day, we frequently encounter code that depends on Kafka[2]. How to test such code, and unit test it in particular, is a practical problem facing Go developers.

Some will say: use mocks. That is one option. But readers of my post "Prefer fake objects in unit tests"[3] are probably already off looking for a Kafka fake object! Kafka is great, but it is also bulky and not exactly nimble, so finding a suitable fake object is not easy. In this article, we will look at how to test code that depends on Kafka; put more plainly, we will hunt together for suitable Kafka fake objects.

In "Prefer fake objects in unit tests"[4], I mentioned that when a test dependency ships a tiny or otherwise simplified edition, we can use it directly as a fake object candidate, just as etcd provides a simplified embedded implementation of itself for testing (embed)[5].

Kafka offers no tiny edition, however, so we are left with the other strategy from "Prefer fake objects in unit tests"[6]: using a container as the fake object. This is currently the easiest way to obtain a fake object for virtually any dependency. Perhaps once WASI (WebAssembly System Interface)[7] matures and wasm can leave the browser and run at full speed on the local system, switching to wasm will still not be too late.

Following the container strategy, let's go find a suitable kafka container.
Our first stop is testcontainers-go[8]. testcontainers-go is an open-source Go project dedicated to simplifying the creation and cleanup of container-based dependencies; it is commonly used in unit tests, automated integration tests, and smoke tests of Go projects. Through the easy-to-use API testcontainers-go provides, developers can programmatically define containers that run as part of their tests and clean those resources up when the tests finish.

Note: testcontainers[9] is not limited to a Go API; it covers the mainstream programming languages, including Java, .NET, Python, Node.js, Rust[10], and more.

Until a few months ago, the testcontainers-go[11] project offered no direct Kafka support, and we had to define and start a kafka container ourselves via testcontainers.GenericContainer. A Kafka container running in KRaft mode was first introduced into the testcontainers-go project[12] in September 2023.
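For reference, here is a minimal sketch of that pre-module approach using testcontainers.GenericContainer. The image, port, environment variable, and the log line waited for are all illustrative assumptions, not the exact configuration the kafka module later adopted; a real hand-rolled kafka container needs considerably more listener setup:

// a minimal sketch of starting kafka via GenericContainer (pre-kafka-module style)
package main

import (
	"context"

	"github.com/testcontainers/testcontainers-go"
	"github.com/testcontainers/testcontainers-go/wait"
)

func runKafkaByHand(ctx context.Context) (testcontainers.Container, error) {
	req := testcontainers.ContainerRequest{
		Image:        "confluentinc/confluent-local:7.5.0", // illustrative image
		ExposedPorts: []string{"9093/tcp"},                 // illustrative port
		Env: map[string]string{
			"CLUSTER_ID": "test-cluster", // illustrative setting
		},
		// the log message below is an assumption; pick one your image actually prints
		WaitingFor: wait.ForLog("Kafka Server started"),
	}
	return testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
		ContainerRequest: req,
		Started:          true,
	})
}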
The kafka image testcontainers-go currently uses is confluentinc/confluent-local:7.5.0[13]. Confluent[14] is the company behind kafka, providing commercial support based on it. Early this year, Confluent also acquired Immerok, bringing another Apache star project, Flink, under its wing.

confluent-local[15] is not a widely used kafka image. It is merely a zero-configuration Apache Kafka running in KRaft mode, bundled with the Confluent Community RestProxy; the image is experimental, intended only for local development workflows, and should not be used to support production workloads.

The open-source kafka image most commonly used in production is confluentinc/cp-kafka[16]. It is built on the open-source Kafka project but adds extra features and tooling on top, offering richer functionality and an easier deployment and management experience. Note that the cp-kafka image version is not the Kafka version; the mapping between the two must be looked up on the cp-kafka image's official page.

Another kafka image commonly used in development is Bitnami's kafka image. Bitnami is a company that provides pre-packaged images and application stacks for a wide range of open-source software. The Bitnami Kafka image is built on the open-source Kafka project and can be used to deploy and run Kafka quickly. Its version number matches the version of the Kafka inside it.

Now let's see how to use testcontainers-go's kafka module as the fake object for Go unit tests that depend on kafka.

This first test example is adapted from example_test.go in the testcontainers-go/kafka module:
// testcontainers/kafka_setup/kafka_test.go
package main

import (
	"context"
	"fmt"
	"testing"

	"github.com/testcontainers/testcontainers-go/modules/kafka"
)

func TestKafkaSetup(t *testing.T) {
	ctx := context.Background()

	kafkaContainer, err := kafka.RunContainer(ctx, kafka.WithClusterID("test-cluster"))
	if err != nil {
		panic(err)
	}

	// Clean up the container
	defer func() {
		if err := kafkaContainer.Terminate(ctx); err != nil {
			panic(err)
		}
	}()

	state, err := kafkaContainer.State(ctx)
	if err != nil {
		panic(err)
	}

	if kafkaContainer.ClusterID != "test-cluster" {
		t.Errorf("want test-cluster, actual %s", kafkaContainer.ClusterID)
	}

	if state.Running != true {
		t.Errorf("want true, actual %t", state.Running)
	}

	brokers, _ := kafkaContainer.Brokers(ctx)
	fmt.Printf("%q\n", brokers)
}
In this example, we call kafka.RunContainer directly to create a kafka instance whose cluster ID is test-cluster. If no custom image is passed to RunContainer via WithImage, a confluentinc/confluent-local:7.5.0 container is started by default (note: the version of this default image will change over time).

Through the kafka.KafkaContainer returned by RunContainer, we can obtain all kinds of information about the kafka container, such as the ClusterID and the kafka broker addresses in the code above. With this information, we can later connect to the containerized kafka and write and read data.

Let's first look at the test's run result, which matches expectations:
$ go test
2023/12/16 21:45:52 github.com/testcontainers/testcontainers-go - Connected to docker: ...
  ...
  Resolved Docker Host: unix:///var/run/docker.sock
  Resolved Docker Socket Path: /var/run/docker.sock
  Test SessionID: 19e47867b733f4da4f430d78961771ae3a1cc66c5deca083b4f6359c6d4b2468
  Test ProcessID: 41b9ef62-2617-4189-b23a-1bfa4c06dfec
2023/12/16 21:45:52 Creating container for image docker.io/testcontainers/ryuk:0.5.1
2023/12/16 21:45:53 Container created: 8f2240042c27
2023/12/16 21:45:53 Starting container: 8f2240042c27
2023/12/16 21:45:53 Container started: 8f2240042c27
2023/12/16 21:45:53 Waiting for container id 8f2240042c27 image: docker.io/testcontainers/ryuk:0.5.1. Waiting for: &{Port:8080/tcp timeout:<nil> PollInterval:100ms}
2023/12/16 21:45:53 Creating container for image confluentinc/confluent-local:7.5.0
2023/12/16 21:45:53 Container created: a39a495aed0b
2023/12/16 21:45:53 Starting container: a39a495aed0b
2023/12/16 21:45:53 Container started: a39a495aed0b
["localhost:1037"]
2023/12/16 21:45:58 Terminating container: a39a495aed0b
2023/12/16 21:45:58 Container terminated: a39a495aed0b
PASS
ok      demo    6.236s
Next, building on the example above, let's add a test of the Kafka connection and of reading and writing data:
// testcontainers/kafka_consumer_and_producer/kafka_test.go
package main

import (
	"bytes"
	"context"
	"errors"
	"net"
	"strconv"
	"testing"
	"time"

	"github.com/testcontainers/testcontainers-go/modules/kafka"

	kc "github.com/segmentio/kafka-go" // kafka client
)

func createTopics(brokers []string, topics ...string) error {
	// to create topics when auto.create.topics.enable='false'
	conn, err := kc.Dial("tcp", brokers[0])
	if err != nil {
		return err
	}
	defer conn.Close()

	controller, err := conn.Controller()
	if err != nil {
		return err
	}
	var controllerConn *kc.Conn
	controllerConn, err = kc.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
	if err != nil {
		return err
	}
	defer controllerConn.Close()

	var topicConfigs []kc.TopicConfig
	for _, topic := range topics {
		topicConfig := kc.TopicConfig{
			Topic:             topic,
			NumPartitions:     1,
			ReplicationFactor: 1,
		}
		topicConfigs = append(topicConfigs, topicConfig)
	}

	err = controllerConn.CreateTopics(topicConfigs...)
	if err != nil {
		return err
	}
	return nil
}

func newWriter(brokers []string, topic string) *kc.Writer {
	return &kc.Writer{
		Addr:                   kc.TCP(brokers...),
		Topic:                  topic,
		Balancer:               &kc.LeastBytes{},
		AllowAutoTopicCreation: true,
		RequiredAcks:           0,
	}
}

func newReader(brokers []string, topic string) *kc.Reader {
	return kc.NewReader(kc.ReaderConfig{
		Brokers:  brokers,
		Topic:    topic,
		GroupID:  "test-group",
		MaxBytes: 10e6, // 10MB
	})
}

func TestProducerAndConsumer(t *testing.T) {
	ctx := context.Background()

	kafkaContainer, err := kafka.RunContainer(ctx, kafka.WithClusterID("test-cluster"))
	if err != nil {
		t.Fatalf("want nil, actual %v\n", err)
	}

	// Clean up the container
	defer func() {
		if err := kafkaContainer.Terminate(ctx); err != nil {
			t.Fatalf("want nil, actual %v\n", err)
		}
	}()

	state, err := kafkaContainer.State(ctx)
	if err != nil {
		t.Fatalf("want nil, actual %v\n", err)
	}
	if state.Running != true {
		t.Errorf("want true, actual %t", state.Running)
	}

	brokers, err := kafkaContainer.Brokers(ctx)
	if err != nil {
		t.Fatalf("want nil, actual %v\n", err)
	}

	topic := "test-topic"

	w := newWriter(brokers, topic)
	defer w.Close()
	r := newReader(brokers, topic)
	defer r.Close()

	err = createTopics(brokers, topic)
	if err != nil {
		t.Fatalf("want nil, actual %v\n", err)
	}
	time.Sleep(5 * time.Second)

	messages := []kc.Message{
		{
			Key:   []byte("Key-A"),
			Value: []byte("Value-A"),
		},
		{
			Key:   []byte("Key-B"),
			Value: []byte("Value-B"),
		},
		{
			Key:   []byte("Key-C"),
			Value: []byte("Value-C"),
		},
		{
			Key:   []byte("Key-D"),
			Value: []byte("Value-D!"),
		},
	}

	const retries = 3
	for i := 0; i < retries; i++ {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()

		// attempt to create topic prior to publishing the message
		err = w.WriteMessages(ctx, messages...)
		if errors.Is(err, kc.LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) {
			time.Sleep(time.Millisecond * 250)
			continue
		}
		if err != nil {
			t.Fatalf("want nil, actual %v\n", err)
		}
		break
	}

	var getMessages []kc.Message
	for i := 0; i < len(messages); i++ {
		m, err := r.ReadMessage(context.Background())
		if err != nil {
			t.Fatalf("want nil, actual %v\n", err)
		}
		getMessages = append(getMessages, m)
	}

	for i := 0; i < len(messages); i++ {
		if !bytes.Equal(getMessages[i].Key, messages[i].Key) {
			t.Errorf("want %s, actual %s\n", string(messages[i].Key), string(getMessages[i].Key))
		}
		if !bytes.Equal(getMessages[i].Value, messages[i].Value) {
			t.Errorf("want %s, actual %s\n", string(messages[i].Value), string(getMessages[i].Value))
		}
	}
}
We use the segmentio/kafka-go client[17] to implement the kafka reads and writes. For how to use this client, see my earlier post "A Brief Comparison of the Go Community's Leading Kafka Clients"[18].

In the TestProducerAndConsumer case, we first start a Kafka instance via testcontainers-go's kafka.RunContainer, then create a topic: "test-topic". We could also skip creating "test-topic" separately before writing: Kafka enables automatic topic creation by default, and segmentio/kafka-go's high-level Writer API supports the AllowAutoTopicCreation setting. Topic creation takes some time, though, so if the topic is created on the first message write, that write may fail and need a retry.

After writing a message to the topic (in fact a batched write containing four key-value pairs), we call ReadMessage to read messages back from the topic and compare what we read with what we wrote.

Note: I recently came across an issue in kafka-go that can cause memory usage to balloon[19]; it may be triggered when kafka ack latency grows.

Here is the output of running this test case:
$ go test
2023/12/17 17:43:54 github.com/testcontainers/testcontainers-go - Connected to docker: 
  Server Version: 24.0.7
  API Version: 1.43
  Operating System: CentOS Linux 7 (Core)
  Total Memory: 30984 MB
  Resolved Docker Host: unix:///var/run/docker.sock
  Resolved Docker Socket Path: /var/run/docker.sock
  Test SessionID: f76fe611c753aa4ef1456285503b0935a29795e7c0fab2ea2588029929215a08
  Test ProcessID: 27f531ee-9b5f-4e4f-b5f0-468143871004
2023/12/17 17:43:54 Creating container for image docker.io/testcontainers/ryuk:0.5.1
2023/12/17 17:43:54 Container created: 577309098f4c
2023/12/17 17:43:54 Starting container: 577309098f4c
2023/12/17 17:43:54 Container started: 577309098f4c
2023/12/17 17:43:54 Waiting for container id 577309098f4c image: docker.io/testcontainers/ryuk:0.5.1. Waiting for: &{Port:8080/tcp timeout:<nil> PollInterval:100ms}
2023/12/17 17:43:54 Creating container for image confluentinc/confluent-local:7.5.0
2023/12/17 17:43:55 Container created: 1ee11e11742b
2023/12/17 17:43:55 Starting container: 1ee11e11742b
2023/12/17 17:43:55 Container started: 1ee11e11742b
2023/12/17 17:44:15 Terminating container: 1ee11e11742b
2023/12/17 17:44:15 Container terminated: 1ee11e11742b
PASS
ok      demo    21.505s
We can see that, by default, testcontainers meets the basic needs of interacting with kafka, and it provides a series of Options (WithXXX) for customizing the container to meet further requirements, though that calls for a fuller understanding of the testcontainers API. A small sketch of such customization follows.
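Here is a minimal sketch of combining module options with a generic customizer, assuming testcontainers-go v0.26.x (where testcontainers.WithImage is available); the cluster ID and image tag are just examples:

// a minimal sketch: pin the image and cluster ID explicitly
package main

import (
	"context"

	"github.com/testcontainers/testcontainers-go"
	"github.com/testcontainers/testcontainers-go/modules/kafka"
)

func runCustomKafka(ctx context.Context) (*kafka.KafkaContainer, error) {
	return kafka.RunContainer(ctx,
		kafka.WithClusterID("my-cluster"),
		// pin the image instead of relying on the module's default tag
		testcontainers.WithImage("confluentinc/confluent-local:7.5.0"),
	)
}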
Besides the ready-to-use testcontainers, we can turn to another convenient container-based technique, docker-compose, to customize, start, and stop the kafka image we need[20]. Next, let's see how to build a fake kafka object with docker-compose.

This time we use the kafka image provided by bitnami. We first build a kafka instance "equivalent" to the one provided by the testcontainers-go kafka module above; here is the docker-compose.yml:
# docker-compose/bitnami/plaintext/docker-compose.yml
version: "2"

services:
  kafka:
    image: docker.io/bitnami/kafka:3.6
    network_mode: "host"
    volumes:
      - "kafka_data:/bitnami"
    environment:
      # KRaft settings
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@localhost:9093
      # Listeners
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      # borrow from testcontainer
      - KAFKA_CFG_BROKER_ID=0
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1
      - KAFKA_CFG_OFFSETS_TOPIC_NUM_PARTITIONS=1
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=1
      - KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS=0
      - KAFKA_CFG_LOG_FLUSH_INTERVAL_MESSAGES=9223372036854775807

volumes:
  kafka_data:
    driver: local
As you can see, some of the settings are "borrowed" from testcontainers-go's kafka module. Let's bring up the container:
$ docker-compose up -d
[+] Running 2/2
 ✔ Volume "plaintext_kafka_data"  Created  0.0s
 ✔ Container plaintext-kafka-1    Started  0.1s
The Go test code that depends on this container is much like the earlier TestProducerAndConsumer, except that the container-creation step at the beginning is gone:
// docker-compose/bitnami/plaintext/kafka_test.go

func TestProducerAndConsumer(t *testing.T) {
	brokers := []string{"localhost:9092"}
	topic := "test-topic"

	w := newWriter(brokers, topic)
	defer w.Close()
	r := newReader(brokers, topic)
	defer r.Close()

	err := createTopics(brokers, topic)
	if err != nil {
		t.Fatalf("want nil, actual %v\n", err)
	}
	time.Sleep(5 * time.Second)
	... ...
}
Running the test case, we see the expected result:
$ go test
write message ok Value-A
write message ok Value-B
write message ok Value-C
write message ok Value-D!
PASS
ok      demo    15.143s
For unit tests, though, we obviously cannot start and stop the kafka container by hand. We need setup and teardown around every test case, which also keeps the cases isolated from one another. So we add a docker_compose_helper.go file that provides some helper functions for test cases to start and stop kafka:
// docker-compose/bitnami/plaintext/docker_compose_helper.go
package main

import (
	"fmt"
	"os/exec"
	"strings"
	"time"
)

// helper functions for operating docker containers through the docker-compose command

const (
	defaultCmd     = "docker-compose"
	defaultCfgFile = "docker-compose.yml"
)

func execCliCommand(cmd string, opts ...string) ([]byte, error) {
	cmds := cmd + " " + strings.Join(opts, " ")
	fmt.Println("exec command:", cmds)
	return exec.Command(cmd, opts...).CombinedOutput()
}

func execDockerComposeCommand(cmd string, cfgFile string, opts ...string) ([]byte, error) {
	var allOpts = []string{"-f", cfgFile}
	allOpts = append(allOpts, opts...)
	return execCliCommand(cmd, allOpts...)
}

func UpKakfa(composeCfgFile string) ([]byte, error) {
	b, err := execDockerComposeCommand(defaultCmd, composeCfgFile, "up", "-d")
	if err != nil {
		return nil, err
	}
	time.Sleep(10 * time.Second)
	return b, nil
}

func UpDefaultKakfa() ([]byte, error) {
	return UpKakfa(defaultCfgFile)
}

func DownKakfa(composeCfgFile string) ([]byte, error) {
	b, err := execDockerComposeCommand(defaultCmd, composeCfgFile, "down", "-v")
	if err != nil {
		return nil, err
	}
	time.Sleep(10 * time.Second)
	return b, nil
}

func DownDefaultKakfa() ([]byte, error) {
	return DownKakfa(defaultCfgFile)
}
Sharp-eyed readers will have noticed that the UpKakfa and DownKakfa functions use a hard-coded "time.Sleep" to wait 10 seconds. This usually works once the image has been pulled locally, but it is not the most precise way to wait. testcontainers-go/wait[21] provides multiple strategies for waiting until the program inside a container has finished starting; if you want more precise waiting, take a look at the wait package. A rough port-polling alternative for the docker-compose case is sketched below.
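For the docker-compose helpers themselves, a cheap improvement over the fixed sleep is to poll the broker's TCP port until it accepts connections. This is only a sketch (a successful TCP dial does not guarantee the broker is fully ready, and the address and timeout values are illustrative):

// waitForBroker polls addr until a TCP connection succeeds or timeout expires.
package main

import (
	"fmt"
	"net"
	"time"
)

func waitForBroker(addr string, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		conn, err := net.DialTimeout("tcp", addr, time.Second)
		if err == nil {
			conn.Close() // the port accepts connections; the broker is at least listening
			return nil
		}
		time.Sleep(250 * time.Millisecond)
	}
	return fmt.Errorf("broker %s not reachable within %v", addr, timeout)
}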
Using these helper functions, let's rework the TestProducerAndConsumer case:
// docker-compose/bitnami/plaintext/kafka_test.go

func TestProducerAndConsumer(t *testing.T) {
	_, err := UpDefaultKakfa()
	if err != nil {
		t.Fatalf("want nil, actual %v\n", err)
	}

	t.Cleanup(func() {
		DownDefaultKakfa()
	})
	... ...
}
At the start of the case, we bring the kafka instance up through UpDefaultKakfa using docker-compose, then register a Cleanup function[22] to tear the kafka instance down once the test case finishes.

Here is the run result of the new version of the case:
$ go test
exec command: docker-compose -f docker-compose.yml up -d
write message ok Value-A
write message ok Value-B
write message ok Value-C
write message ok Value-D!
exec command: docker-compose -f docker-compose.yml down -v
PASS
ok      demo    36.402s
The biggest advantage of docker-compose is that the docker-compose.yml file lets you customize the object being faked with great flexibility; the difference from testcontainers-go is that you don't have to study the testcontainers-go API.

Below is an example of establishing a TLS connection to kafka and doing reads and writes over it.

Kafka's configuration is notoriously complex. Getting a TLS-based connection up cost me quite a few "experiments", the listeners and certificate settings in particular; without some hard time spent in the docs, you simply won't configure it correctly.

Here is a kafka instance running over a TLS-secured channel, configured from the bitnami/kafka image:
# docker-compose/bitnami/tls/docker-compose.yml
# config doc: https://github.com/bitnami/containers/blob/main/bitnami/kafka/README.md
version: "2"

services:
  kafka:
    image: docker.io/bitnami/kafka:3.6
    network_mode: "host"
    #ports:
    #- "9092:9092"
    environment:
      # KRaft settings
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@localhost:9094
      # Listeners
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,SECURED://:9093,CONTROLLER://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=SECURED://:9093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,SECURED:SSL,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SECURED
      # SSL settings
      - KAFKA_TLS_TYPE=PEM
      - KAFKA_TLS_CLIENT_AUTH=none
      - KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=
      # borrow from testcontainer
      - KAFKA_CFG_BROKER_ID=0
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1
      - KAFKA_CFG_OFFSETS_TOPIC_NUM_PARTITIONS=1
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=1
      - KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS=0
      - KAFKA_CFG_LOG_FLUSH_INTERVAL_MESSAGES=9223372036854775807
    volumes:
      # server.cert, server.key and ca.crt
      - "kafka_data:/bitnami"
      - "./kafka.keystore.pem:/opt/bitnami/kafka/config/certs/kafka.keystore.pem:ro"
      - "./kafka.keystore.key:/opt/bitnami/kafka/config/certs/kafka.keystore.key:ro"
      - "./kafka.truststore.pem:/opt/bitnami/kafka/config/certs/kafka.truststore.pem:ro"

volumes:
  kafka_data:
    driver: local
Here we use PEM-format certificates and keys. In the configuration above, the kafka.keystore.pem, kafka.keystore.key, and kafka.truststore.pem mounted under volumes correspond to the names long familiar in Go: server-cert.pem (server certificate), server-key.pem (server private key), and ca-cert.pem (CA certificate).

I put together a one-shot generation script, docker-compose/bitnami/tls/kafka-generate-cert.sh. Running it produces all the certificates we need and places them in the right locations (just press Enter at each command-line prompt):
$ bash kafka-generate-cert.sh
.........++++++
.............................++++++
You are about to be asked to enter information that will be incorporated
into your certificate request.
What you are about to enter is what is called a Distinguished Name or a DN.
There are quite a few fields but you can leave some blank
For some fields there will be a default value,
If you enter '.', the field will be left blank.
-----
Country Name (2 letter code) [XX]:
State or Province Name (full name) []:
Locality Name (eg, city) [Default City]:
Organization Name (eg, company) [Default Company Ltd]:
Organizational Unit Name (eg, section) []:
Common Name (eg, your name or your server's hostname) []:
Email Address []:

Please enter the following 'extra' attributes
to be sent with your certificate request
A challenge password []:
An optional company name []:
Signature ok
subject=/C=XX/L=Default City/O=Default Company Ltd
Getting Private key
.....................++++++
.........++++++
You are about to be asked to enter information that will be incorporated
into your certificate request.
What you are about to enter is what is called a Distinguished Name or a DN.
There are quite a few fields but you can leave some blank
For some fields there will be a default value,
If you enter '.', the field will be left blank.
-----
Country Name (2 letter code) [XX]:
State or Province Name (full name) []:
Locality Name (eg, city) [Default City]:
Organization Name (eg, company) [Default Company Ltd]:
Organizational Unit Name (eg, section) []:
Common Name (eg, your name or your server's hostname) []:
Email Address []:

Please enter the following 'extra' attributes
to be sent with your certificate request
A challenge password []:
An optional company name []:
Signature ok
subject=/C=XX/L=Default City/O=Default Company Ltd
Getting CA Private Key
Next, let's adapt the test case so it connects to kafka over TLS:
// docker-compose/bitnami/tls/kafka_test.go

func createTopics(brokers []string, tlsConfig *tls.Config, topics ...string) error {
	dialer := &kc.Dialer{
		Timeout:   10 * time.Second,
		DualStack: true,
		TLS:       tlsConfig,
	}

	conn, err := dialer.DialContext(context.Background(), "tcp", brokers[0])
	if err != nil {
		fmt.Println("creating topic: dialer dial error:", err)
		return err
	}
	defer conn.Close()
	fmt.Println("creating topic: dialer dial ok")
	... ...
}

func newWriter(brokers []string, tlsConfig *tls.Config, topic string) *kc.Writer {
	w := &kc.Writer{
		Addr:                   kc.TCP(brokers...),
		Topic:                  topic,
		Balancer:               &kc.LeastBytes{},
		AllowAutoTopicCreation: true,
		Async:                  true,
		//RequiredAcks:           0,
		Completion: func(messages []kc.Message, err error) {
			for _, message := range messages {
				if err != nil {
					fmt.Println("write message fail", err)
				} else {
					fmt.Println("write message ok", string(message.Topic), string(message.Value))
				}
			}
		},
	}

	if tlsConfig != nil {
		w.Transport = &kc.Transport{
			TLS: tlsConfig,
		}
	}
	return w
}

func newReader(brokers []string, tlsConfig *tls.Config, topic string) *kc.Reader {
	dialer := &kc.Dialer{
		Timeout:   10 * time.Second,
		DualStack: true,
		TLS:       tlsConfig,
	}

	return kc.NewReader(kc.ReaderConfig{
		Dialer:   dialer,
		Brokers:  brokers,
		Topic:    topic,
		GroupID:  "test-group",
		MaxBytes: 10e6, // 10MB
	})
}

func TestProducerAndConsumer(t *testing.T) {
	var err error
	_, err = UpDefaultKakfa()
	if err != nil {
		t.Fatalf("want nil, actual %v\n", err)
	}

	t.Cleanup(func() {
		DownDefaultKakfa()
	})

	brokers := []string{"localhost:9093"}
	topic := "test-topic"

	tlsConfig, _ := newTLSConfig()

	w := newWriter(brokers, tlsConfig, topic)
	defer w.Close()
	r := newReader(brokers, tlsConfig, topic)
	defer r.Close()

	err = createTopics(brokers, tlsConfig, topic)
	if err != nil {
		fmt.Printf("create topic error: %v, but it may not affect the later action, just ignore it\n", err)
	}
	time.Sleep(5 * time.Second)
	... ...
}

func newTLSConfig() (*tls.Config, error) {
	/*
		// load the CA certificate
		caCert, err := ioutil.ReadFile("/path/to/ca.crt")
		if err != nil {
			return nil, err
		}

		// load the client certificate and private key
		cert, err := tls.LoadX509KeyPair("/path/to/client.crt", "/path/to/client.key")
		if err != nil {
			return nil, err
		}

		// create a CertPool and add the CA certificate
		caCertPool := x509.NewCertPool()
		caCertPool.AppendCertsFromPEM(caCert)
	*/

	// create and return the TLS config
	return &tls.Config{
		//RootCAs:      caCertPool,
		//Certificates: []tls.Certificate{cert},
		InsecureSkipVerify: true,
	}, nil
}
In the code above, following segmentio/kafka-go's API, we added a tls.Config parameter to createTopics, newWriter, and newReader. In the test case we also create a tls.Config instance with newTLSConfig. Here we simplify everything and handshake with the kafka broker using InsecureSkipVerify=true, neither verifying the server certificate nor doing mutual TLS. A verifying alternative is sketched below.
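If you want to actually verify the broker instead, here is a minimal sketch, assuming the CA file produced earlier (kafka.truststore.pem) is available locally and the server certificate carries a CN/SAN matching serverName; the file name and serverName parameter are assumptions tied to this article's script:

// newVerifyingTLSConfig builds a tls.Config that verifies the broker's
// certificate against the given CA file instead of using InsecureSkipVerify.
package main

import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"os"
)

func newVerifyingTLSConfig(caFile, serverName string) (*tls.Config, error) {
	caCert, err := os.ReadFile(caFile) // e.g. "kafka.truststore.pem"
	if err != nil {
		return nil, err
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caCert) {
		return nil, fmt.Errorf("no certificates parsed from %s", caFile)
	}
	return &tls.Config{
		RootCAs:    pool,
		ServerName: serverName, // must match the broker certificate's CN/SAN
	}, nil
}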
Here is the test case's run result after the code changes:
$ go test
exec command: docker-compose -f docker-compose.yml up -d
creating topic: dialer dial ok
creating topic: get controller ok
creating topic: dial control listener ok
create topic error: EOF, but it may not affect the later action, just ignore it
write message error: [3] Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this broker
write message ok Value-A
write message ok Value-B
write message ok Value-C
write message ok Value-D!
exec command: docker-compose -f docker-compose.yml down -v
PASS
ok      demo    38.473s
Here we see that although createTopics connects fine to each of kafka's listeners, the topic-creation call returns EOF. This genuinely doesn't affect the subsequent actions, and I'm not sure whether it is a segmentio/kafka-go issue or a kafka instance issue. Also, the first message write fails because the topic or partition has not been created yet; after a retry, the messages are written normally.

This example shows that building fake objects on docker-compose offers broader flexibility; with a precise wait mechanism for container start and stop in place, I would likely choose this approach more often.

This article has shown how to unit test Go code that depends on Kafka and explored strategies for finding a suitable Kafka fake object.

For a system as complex as Kafka, finding the right fake object is not easy. This article therefore recommends using containers as fake objects and introduced two tools for simplifying the creation and cleanup of container-based dependencies: the testcontainers-go project and docker-compose. Compared with the kafka module only recently added to testcontainers-go, building a custom fake object with docker-compose is somewhat more flexible. With either approach, developers need a fairly complete and deep understanding of kafka configuration.

This article focused on the process of building a fake kafka with testcontainers-go and docker-compose; the test cases set up no explicit SUT (system under test), such as a white-box unit test of a particular function.

The source code for this article can be downloaded here[23].
[1] Kafka: https://kafka.apache.org
[2] code that depends on Kafka: https://tonybai.com/2023/09/04/slog-in-action-file-logging-rotation-and-kafka-integration/
[3] Prefer fake objects in unit tests: https://tonybai.com/2023/04/20/provide-fake-object-for-external-collaborators/
[4] Prefer fake objects in unit tests: https://tonybai.com/2023/04/20/provide-fake-object-for-external-collaborators/
[5] simplified embedded implementation of itself for testing (embed): https://github.com/etcd-io/etcd/blob/main/tests/integration/embed
[6] Prefer fake objects in unit tests: https://tonybai.com/2023/04/20/provide-fake-object-for-external-collaborators/
[7] WASI (WebAssembly System Interface): https://wasi.dev/
[8] testcontainers-go: https://golang.testcontainers.org/
[9] testcontainers: https://testcontainers.com
[10] Rust: https://tonybai.com/2023/02/22/rust-vs-go-in-2023/
[11] testcontainers-go: https://github.com/testcontainers/testcontainers-go/
[12] Kafka container running in KRaft mode first introduced into the testcontainers-go project: https://github.com/testcontainers/testcontainers-go/pull/1610
[13] confluentinc/confluent-local:7.5.0: https://hub.docker.com/r/confluentinc/confluent-local
[14] Confluent: https://www.confluent.io
[15] confluent-local: https://hub.docker.com/r/confluentinc/confluent-local
[16] confluentinc/cp-kafka image: https://hub.docker.com/r/confluentinc/cp-kafka
[17] the segmentio/kafka-go client: https://tonybai.com/2022/03/28/the-comparison-of-the-go-community-leading-kakfa-clients
[18] A Brief Comparison of the Go Community's Leading Kafka Clients: https://tonybai.com/2022/03/28/the-comparison-of-the-go-community-leading-kakfa-clients
[19] an issue in kafka-go that can cause memory usage to balloon: https://github.com/segmentio/kafka-go/pull/1117
[20] docker-compose to customize, start, and stop the kafka image we need: https://tonybai.com/2021/11/26/build-all-in-one-runtime-environment-with-docker-compose
[21] testcontainers-go/wait: https://pkg.go.dev/github.com/testcontainers/testcontainers-go@v0.26.0/wait
[22] register a Cleanup function: https://tonybai.com/2020/03/08/some-changes-in-go-1-14/
[23] here: https://github.com/bigwhite/experiments/tree/master/unit-testing-deps-on-kafka