日韩成人免费在线_国产成人一二_精品国产免费人成电影在线观..._日本一区二区三区久久久久久久久不

當前位置:首頁 > 科技  > 軟件

依賴Kafka的Go單元測試例解

來源: 責編: 時間:2024-01-09 08:52:28 235觀看
導讀Kafka[1]是Apache基金會開源的一個分布式事件流處理平臺,是Java陣營(最初為Scala)中的一款殺手級應用,其提供的高可靠性、高吞吐量和低延遲的數據傳輸能力,讓其到目前為止依舊是現代企業級應用系統以及云原生應用系統中

Kafka[1]是Apache基金會開源的一個分布式事件流處理平臺,是Java陣營(最初為Scala)中的一款殺手級應用,其提供的高可靠性、高吞吐量和低延遲的數據傳輸能力,讓其到目前為止依舊是現代企業級應用系統以及云原生應用系統中使用的重要中間件。3X628資訊網——每日最新資訊28at.com

在日常開發Go程序時,我們經常會遇到一些依賴Kafka的代碼[2],如何對這些代碼進行測試,尤其是單測是擺在Go開發者前面的一個現實問題!3X628資訊網——每日最新資訊28at.com

有人說用mock,是個路子。但看過我的《單測時盡量用fake object[3]》一文的童鞋估計已經走在了尋找kafka fake object的路上了!Kafka雖好,但身形碩大,不那么靈巧。找到一個合適的fake object不容易。在這篇文章中,我們就來聊聊如何測試那些依賴kafka的代碼,再往本質一點說,就是和大家以找找那些合適的kafka fake object。3X628資訊網——每日最新資訊28at.com

1. 尋找fake object的策略

在《單測時盡量用fake object[4]》一文中,我們提到過,如果測試的依賴提供了tiny版本或某些簡化版,我們可以直接使用這些版本作為fake object的候選,就像etcd提供了用于測試的自身簡化版的實現(embed)[5]那樣。3X628資訊網——每日最新資訊28at.com

但Kafka并沒有提供tiny版本,我們也只能選擇《單測時盡量用fake object[6]》一文提到的另外一個策略,那就是利用容器來充當fake object,這是目前能搞到任意依賴的fake object的最簡單路徑了。也許以后WASI(WebAssembly System Interface)[7]成熟了,讓wasm脫離瀏覽器并可以在本地系統上飛起,到時候換用wasm也不遲。3X628資訊網——每日最新資訊28at.com

下面我們就按照使用容器的策略來找一找適合的kafka container。3X628資訊網——每日最新資訊28at.com

2. testcontainers-go

我們第一站就來到了testcontainers-go[8]。testcontainers-go是一個Go語言開源項目,專門用于簡化創建和清理基于容器的依賴項,常用于Go項目的單元測試、自動化集成或冒煙測試中。通過testcontainers-go提供的易于使用的API,開發人員能夠以編程方式定義作為測試的一部分而運行的容器,并在測試完成時清理這些資源。3X628資訊網——每日最新資訊28at.com

注:testcontainers[9]不僅提供Go API,它還覆蓋了主流的編程語言,包括:Java、.NET、Python、Node.js、Rust[10]等。3X628資訊網——每日最新資訊28at.com

在幾個月之前,testcontainers-go[11]項目還沒有提供對Kafka的直接支持,我們需要自己使用testcontainers.GenericContainer來自定義并啟動kafka容器。2023年9月,以KRaft模式運行的Kafka容器才被首次引入testcontainers-go項目[12]。3X628資訊網——每日最新資訊28at.com

目前testcontainers-go使用的kafka鏡像版本是confluentinc/confluent-local:7.5.0[13]。Confluent[14]是在kafka背后的那家公司,基于kafka提供商業化支持。今年初,Confluent還收購了Immerok,將apache的另外一個明星項目Flink招致麾下。3X628資訊網——每日最新資訊28at.com

confluent-local[15]并不是一個流行的kafka鏡像,它只是一個使用KRaft模式的零配置的、包含Confluent Community RestProxy的Apache Kafka,并且鏡像是實驗性的,僅應用于本地開發工作流,不應該用在支持生產工作負載。3X628資訊網——每日最新資訊28at.com

生產中最常用的開源kafka鏡像是confluentinc/cp-kafka鏡像[16],它是基于開源Kafka項目構建的,但在此基礎上添加了一些額外的功能和工具,以提供更豐富的功能和更易于部署和管理的體驗。cp-kafka鏡像的版本號并非kafka的版本號,其對應關系需要cp-kafka鏡像官網查詢。3X628資訊網——每日最新資訊28at.com

另外一個開發領域常用的kafka鏡像是bitnami的kafka鏡像。Bitnami是一個提供各種開源軟件的預打包鏡像和應用程序棧的公司。Bitnami Kafka鏡像是基于開源Kafka項目構建的,是一個可用于快速部署和運行Kafka的Docker鏡像。Bitnami Kafka鏡像與其內部的Kakfa的版本號保持一致。3X628資訊網——每日最新資訊28at.com

下面我們就來看看如何使用testcontainers-go的kafka來作為依賴kafka的Go單元測試用例的fake object。3X628資訊網——每日最新資訊28at.com

這第一個測試示例改編自testcontainers-go/kafka module的example_test.go:3X628資訊網——每日最新資訊28at.com

// testcontainers/kafka_setup/kafka_test.gopackage mainimport (    "context"    "fmt"    "testing" "github.com/testcontainers/testcontainers-go/modules/kafka")func TestKafkaSetup(t *testing.T) {    ctx := context.Background()    kafkaContainer, err := kafka.RunContainer(ctx, kafka.WithClusterID("test-cluster"))    if err != nil {        panic(err)    }    // Clean up the container    defer func() {        if err := kafkaContainer.Terminate(ctx); err != nil {            panic(err)        }    }()    state, err := kafkaContainer.State(ctx)    if err != nil {        panic(err)    }    if kafkaContainer.ClusterID != "test-cluster" {        t.Errorf("want test-cluster, actual %s", kafkaContainer.ClusterID)    }    if state.Running != true {        t.Errorf("want true, actual %t", state.Running)    }    brokers, _ := kafkaContainer.Brokers(ctx)    fmt.Printf("%q/n", brokers)}

在這個例子中,我們直接調用kafka.RunContainer創建了一個名為test-cluster的kafka實例,如果沒有通過WithImage向RunContainer傳入自定義鏡像,那么默認我們將啟動一個confluentinc/confluent-local:7.5.0的容器(注意:隨著時間變化,該默認容器鏡像的版本也會隨之改變)。3X628資訊網——每日最新資訊28at.com

通過RunContainer返回的kafka.KafkaContainer我們可以獲取到關于kafka容器的各種信息,比如上述代碼中的ClusterID、kafka Broker地址信息等。有了這些信息,我們后續便可以與以容器形式啟動的kafka建立連接并做數據的寫入和讀取操作了。3X628資訊網——每日最新資訊28at.com

我們先來看這個測試的運行結果,與預期一致:3X628資訊網——每日最新資訊28at.com

$ go test 2023/12/16 21:45:52 github.com/testcontainers/testcontainers-go - Connected to docker:   ... ...  Resolved Docker Host: unix:///var/run/docker.sock  Resolved Docker Socket Path: /var/run/docker.sock  Test SessionID: 19e47867b733f4da4f430d78961771ae3a1cc66c5deca083b4f6359c6d4b2468  Test ProcessID: 41b9ef62-2617-4189-b23a-1bfa4c06dfec2023/12/16 21:45:52 Creating container for image docker.io/testcontainers/ryuk:0.5.12023/12/16 21:45:53 Container created: 8f2240042c272023/12/16 21:45:53 Starting container: 8f2240042c272023/12/16 21:45:53 Container started: 8f2240042c272023/12/16 21:45:53 Waiting for container id 8f2240042c27 image: docker.io/testcontainers/ryuk:0.5.1. Waiting for: &{Port:8080/tcp timeout:<nil> PollInterval:100ms}2023/12/16 21:45:53 Creating container for image confluentinc/confluent-local:7.5.02023/12/16 21:45:53 Container created: a39a495aed0b2023/12/16 21:45:53 Starting container: a39a495aed0b2023/12/16 21:45:53 Container started: a39a495aed0b["localhost:1037"]2023/12/16 21:45:58 Terminating container: a39a495aed0b2023/12/16 21:45:58 Container terminated: a39a495aed0bPASSok   demo 6.236s

接下來,在上面用例的基礎上,我們再來做一個Kafka連接以及數據讀寫測試:3X628資訊網——每日最新資訊28at.com

// testcontainers/kafka_consumer_and_producer/kafka_test.gopackage mainimport ( "bytes" "context" "errors" "net" "strconv" "testing" "time" "github.com/testcontainers/testcontainers-go/modules/kafka" kc "github.com/segmentio/kafka-go" // kafka client)func createTopics(brokers []string, topics ...string) error { // to create topics when auto.create.topics.enable='false' conn, err := kc.Dial("tcp", brokers[0]) if err != nil {  return err } defer conn.Close() controller, err := conn.Controller() if err != nil {  return err } var controllerConn *kc.Conn controllerConn, err = kc.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))) if err != nil {  return err } defer controllerConn.Close() var topicConfigs []kc.TopicConfig for _, topic := range topics {  topicConfig := kc.TopicConfig{   Topic:             topic,   NumPartitions:     1,   ReplicationFactor: 1,  }  topicConfigs = append(topicConfigs, topicConfig) } err = controllerConn.CreateTopics(topicConfigs...) if err != nil {  return err } return nil}func newWriter(brokers []string, topic string) *kc.Writer { return &kc.Writer{  Addr:                   kc.TCP(brokers...),  Topic:                  topic,  Balancer:               &kc.LeastBytes{},  AllowAutoTopicCreation: true,  RequiredAcks:           0, }}func newReader(brokers []string, topic string) *kc.Reader { return kc.NewReader(kc.ReaderConfig{  Brokers:  brokers,  Topic:    topic,  GroupID:  "test-group",  MaxBytes: 10e6, // 10MB })}func TestProducerAndConsumer(t *testing.T) { ctx := context.Background() kafkaContainer, err := kafka.RunContainer(ctx, kafka.WithClusterID("test-cluster")) if err != nil {  t.Fatalf("want nil, actual %v/n", err) } // Clean up the container defer func() {  if err := kafkaContainer.Terminate(ctx); err != nil {   t.Fatalf("want nil, actual %v/n", err)  } }() state, err := kafkaContainer.State(ctx) if err != nil {  t.Fatalf("want nil, actual %v/n", err) } if state.Running != true {  t.Errorf("want true, actual %t", state.Running) } brokers, err := kafkaContainer.Brokers(ctx) if err != nil {  t.Fatalf("want nil, actual %v/n", err) } topic := "test-topic" w := newWriter(brokers, topic) defer w.Close() r := newReader(brokers, topic) defer r.Close() err = createTopics(brokers, topic) if err != nil {  t.Fatalf("want nil, actual %v/n", err) } time.Sleep(5 * time.Second) messages := []kc.Message{  {   Key:   []byte("Key-A"),   Value: []byte("Value-A"),  },  {   Key:   []byte("Key-B"),   Value: []byte("Value-B"),  },  {   Key:   []byte("Key-C"),   Value: []byte("Value-C"),  },  {   Key:   []byte("Key-D"),   Value: []byte("Value-D!"),  }, } const retries = 3 for i := 0; i < retries; i++ {  ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)  defer cancel()  // attempt to create topic prior to publishing the message  err = w.WriteMessages(ctx, messages...)  if errors.Is(err, kc.LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) {   time.Sleep(time.Millisecond * 250)   continue  }  if err != nil {   t.Fatalf("want nil, actual %v/n", err)  }  break } var getMessages []kc.Message for i := 0; i < len(messages); i++ {  m, err := r.ReadMessage(context.Background())  if err != nil {   t.Fatalf("want nil, actual %v/n", err)  }  getMessages = append(getMessages, m) } for i := 0; i < len(messages); i++ {  if !bytes.Equal(getMessages[i].Key, messages[i].Key) {   t.Errorf("want %s, actual %s/n", string(messages[i].Key), string(getMessages[i].Key))  }  if !bytes.Equal(getMessages[i].Value, messages[i].Value) {   t.Errorf("want %s, actual %s/n", string(messages[i].Value), string(getMessages[i].Value))  } }}

我們使用segmentio/kafka-go這個客戶端[17]來實現kafka的讀寫。關于如何使用segmentio/kafka-go這個客戶端,可以參考我之前寫的《Go社區主流Kafka客戶端簡要對比[18]》。3X628資訊網——每日最新資訊28at.com

這里我們在TestProducerAndConsumer這個用例中,先通過testcontainers-go的kafka.RunContainer啟動一個Kakfa實例,然后創建了一個topic: “test-topic”。我們在寫入消息前也可以不單獨創建這個“test-topic”,Kafka默認啟用topic自動創建,并且segmentio/kafka-go的高級API:Writer也支持AllowAutoTopicCreation的設置。不過topic的創建需要一些時間,如果要在首次寫入消息時創建topic,此次寫入可能會失敗,需要retry。3X628資訊網——每日最新資訊28at.com

向topic寫入一條消息(實際上是一個批量Message,包括四個key-value pair)后,我們調用ReadMessage從上述topic中讀取消息,并將讀取的消息與寫入的消息做比較。3X628資訊網——每日最新資訊28at.com

注:近期發現kafka-go的一個可能導致內存暴漲的問題[19],在kafka ack返回延遲變大的時候,可能觸發該問題。3X628資訊網——每日最新資訊28at.com

下面是執行該用例的輸出結果:3X628資訊網——每日最新資訊28at.com

$ go test2023/12/17 17:43:54 github.com/testcontainers/testcontainers-go - Connected to docker:   Server Version: 24.0.7  API Version: 1.43  Operating System: CentOS Linux 7 (Core)  Total Memory: 30984 MB  Resolved Docker Host: unix:///var/run/docker.sock  Resolved Docker Socket Path: /var/run/docker.sock  Test SessionID: f76fe611c753aa4ef1456285503b0935a29795e7c0fab2ea2588029929215a08  Test ProcessID: 27f531ee-9b5f-4e4f-b5f0-4681438710042023/12/17 17:43:54 Creating container for image docker.io/testcontainers/ryuk:0.5.12023/12/17 17:43:54 Container created: 577309098f4c2023/12/17 17:43:54 Starting container: 577309098f4c2023/12/17 17:43:54 Container started: 577309098f4c2023/12/17 17:43:54 Waiting for container id 577309098f4c image: docker.io/testcontainers/ryuk:0.5.1. Waiting for: &{Port:8080/tcp timeout:<nil> PollInterval:100ms}2023/12/17 17:43:54 Creating container for image confluentinc/confluent-local:7.5.02023/12/17 17:43:55 Container created: 1ee11e11742b2023/12/17 17:43:55 Starting container: 1ee11e11742b2023/12/17 17:43:55 Container started: 1ee11e11742b2023/12/17 17:44:15 Terminating container: 1ee11e11742b2023/12/17 17:44:15 Container terminated: 1ee11e11742bPASSok   demo 21.505s

我們看到默認情況下,testcontainer能滿足與kafka交互的基本需求,并且testcontainer提供了一系列Option(WithXXX)可以對container進行定制,以滿足一些擴展性的要求,但是這需要你對testcontainer提供的API有更全面的了解。3X628資訊網——每日最新資訊28at.com

除了開箱即用的testcontainer之外,我們還可以使用另外一種方便的基于容器的技術:docker-compose來定制和啟停我們需要的kafka image[20]。接下來,我們就來看看如何使用docker-compose建立fake kafka object。3X628資訊網——每日最新資訊28at.com

3. 使用docker-compose建立fake kafka

3.1 一個基礎的基于docker-compose的fake kafka實例模板

這次我們使用bitnami提供的kafka鏡像,我們先建立一個“等價”于上面“testcontainers-go”提供的kafka module的kafka實例,下面是docker-compose.yml:3X628資訊網——每日最新資訊28at.com

// docker-compose/bitnami/plaintext/docker-compose.ymlversion: "2"services:  kafka:    image: docker.io/bitnami/kafka:3.6    network_mode: "host"    volumes:      - "kafka_data:/bitnami"    environment:      # KRaft settings      - KAFKA_CFG_NODE_ID=0      - KAFKA_CFG_PROCESS_ROLES=controller,broker      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@localhost:9093      # Listeners      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT      # borrow from testcontainer      - KAFKA_CFG_BROKER_ID=0      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1      - KAFKA_CFG_OFFSETS_TOPIC_NUM_PARTITIONS=1      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=1      - KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS=0      - KAFKA_CFG_LOG_FLUSH_INTERVAL_MESSAGES=9223372036854775807volumes:  kafka_data:    driver: local

我們看到其中一些配置“借鑒”了testcontainers-go的kafka module,我們啟動一下該容器:3X628資訊網——每日最新資訊28at.com

$ docker-compose up -d[+] Running 2/2 ? Volume "plaintext_kafka_data"  Created                                                                                    0.0s  ? Container plaintext-kafka-1    Started                                                                                    0.1s

依賴該容器的go測試代碼與前面的TestProducerAndConsumer差不多,只是在開始處去掉了container的創建過程:3X628資訊網——每日最新資訊28at.com

// docker-compose/bitnami/plaintext/kafka_test.gofunc TestProducerAndConsumer(t *testing.T) { brokers := []string{"localhost:9092"} topic := "test-topic" w := newWriter(brokers, topic) defer w.Close() r := newReader(brokers, topic) defer r.Close() err := createTopics(brokers, topic) if err != nil {  t.Fatalf("want nil, actual %v/n", err) } time.Sleep(5 * time.Second) ... ...}

運行該測試用例,我們看到預期的結果:3X628資訊網——每日最新資訊28at.com

go testwrite message ok  Value-Awrite message ok  Value-Bwrite message ok  Value-Cwrite message ok  Value-D!PASSok   demo 15.143s

不過對于單元測試來說,顯然我們不能手動來啟動和停止kafka container,我們需要為每個用例填上setup和teardown,這樣也能保證用例間的相互隔離,于是我們增加了一個docker_compose_helper.go文件,在這個文件中我們提供了一些幫助testcase啟停kafka的helper函數:3X628資訊網——每日最新資訊28at.com

// docker-compose/bitnami/plaintext/docker_compose_helper.gopackage mainimport ( "fmt" "os/exec" "strings" "time")// helpler function for operating docker container through docker-compose commandconst ( defaultCmd     = "docker-compose" defaultCfgFile = "docker-compose.yml")func execCliCommand(cmd string, opts ...string) ([]byte, error) { cmds := cmd + " " + strings.Join(opts, " ") fmt.Println("exec command:", cmds) return exec.Command(cmd, opts...).CombinedOutput()}func execDockerComposeCommand(cmd string, cfgFile string, opts ...string) ([]byte, error) { var allOpts = []string{"-f", cfgFile} allOpts = append(allOpts, opts...) return execCliCommand(cmd, allOpts...)}func UpKakfa(composeCfgFile string) ([]byte, error) { b, err := execDockerComposeCommand(defaultCmd, composeCfgFile, "up", "-d") if err != nil {  return nil, err } time.Sleep(10 * time.Second) return b, nil}func UpDefaultKakfa() ([]byte, error) { return UpKakfa(defaultCfgFile)}func DownKakfa(composeCfgFile string) ([]byte, error) {    b, err := execDockerComposeCommand(defaultCmd, composeCfgFile, "down", "-v")    if err != nil {         return nil, err    }    time.Sleep(10 * time.Second)    return b, nil}func DownDefaultKakfa() ([]byte, error) { return DownKakfa(defaultCfgFile)}

眼尖的童鞋可能看到:在UpKakfa和DownKafka函數中我們使用了硬編碼的“time.Sleep”來等待10s,通常在鏡像已經pull到本地后這是有效的,但卻不是最精確地等待方式,testcontainers-go/wait[21]中提供了等待容器內程序啟動完畢的多種策略,如果你想用更精確的等待方式,可以了解一下wait包。3X628資訊網——每日最新資訊28at.com

基于helper函數,我們改造一下TestProducerAndConsumer用例:3X628資訊網——每日最新資訊28at.com

// docker-compose/bitnami/plaintext/kafka_test.gofunc TestProducerAndConsumer(t *testing.T) {    _, err := UpDefaultKakfa()    if err != nil {        t.Fatalf("want nil, actual %v/n", err)    }    t.Cleanup(func() {        DownDefaultKakfa()    }) ... ...}

我們在用例開始處通過UpDefaultKakfa使用docker-compose將kafka實例啟動起來,然后注冊了Cleanup函數[22],用于在test case執行結束后銷毀kafka實例。3X628資訊網——每日最新資訊28at.com

下面是新版用例的執行結果:3X628資訊網——每日最新資訊28at.com

$ go testexec command: docker-compose -f docker-compose.yml up -dwrite message ok  Value-Awrite message ok  Value-Bwrite message ok  Value-Cwrite message ok  Value-D!exec command: docker-compose -f docker-compose.yml down -vPASSok   demo 36.402s

使用docker-compose的最大好處就是可以通過docker-compose.yml文件對要fake的object進行靈活的定制,這種定制與testcontainers-go的差別就是你無需去研究testcontiners-go的API。3X628資訊網——每日最新資訊28at.com

下面是使用tls連接與kafka建立連接并實現讀寫的示例。3X628資訊網——每日最新資訊28at.com

3.2 建立一個基于TLS連接的fake kafka實例

Kafka的配置復雜是有目共睹的,為了建立一個基于TLS連接,我也是花了不少時間做“試驗”,尤其是listeners以及證書的配置,不下點苦功夫讀文檔還真是配不出來。3X628資訊網——每日最新資訊28at.com

下面是一個基于bitnami/kafka鏡像配置出來的基于TLS安全通道上的kafka實例:3X628資訊網——每日最新資訊28at.com

// docker-compose/bitnami/tls/docker-compose.yml# config doc:  https://github.com/bitnami/containers/blob/main/bitnami/kafka/README.mdversion: "2"services:  kafka:    image: docker.io/bitnami/kafka:3.6    network_mode: "host"    #ports:      #- "9092:9092"    environment:      # KRaft settings      - KAFKA_CFG_NODE_ID=0      - KAFKA_CFG_PROCESS_ROLES=controller,broker      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@localhost:9094      # Listeners      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,SECURED://:9093,CONTROLLER://:9094      - KAFKA_CFG_ADVERTISED_LISTENERS=SECURED://:9093      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,SECURED:SSL,PLAINTEXT:PLAINTEXT      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SECURED      # SSL settings      - KAFKA_TLS_TYPE=PEM      - KAFKA_TLS_CLIENT_AUTH=none      - KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=      # borrow from testcontainer      - KAFKA_CFG_BROKER_ID=0      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1      - KAFKA_CFG_OFFSETS_TOPIC_NUM_PARTITIONS=1      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=1      - KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS=0      - KAFKA_CFG_LOG_FLUSH_INTERVAL_MESSAGES=9223372036854775807    volumes:      # server.cert, server.key and ca.crt      - "kafka_data:/bitnami"      - "./kafka.keystore.pem:/opt/bitnami/kafka/config/certs/kafka.keystore.pem:ro"      - "./kafka.keystore.key:/opt/bitnami/kafka/config/certs/kafka.keystore.key:ro"      - "./kafka.truststore.pem:/opt/bitnami/kafka/config/certs/kafka.truststore.pem:ro"volumes:  kafka_data:    driver: local

這里我們使用pem格式的證書和key,在上面配置中,volumes下面掛載的kafka.keystore.pem、kafka.keystore.key和kafka.truststore.pem分別對應了以前在Go中常用的名字:server-cert.pem(服務端證書), server-key.pem(服務端私鑰)和ca-cert.pem(CA證書)。3X628資訊網——每日最新資訊28at.com

這里整理了一個一鍵生成的腳本docker-compose/bitnami/tls/kafka-generate-cert.sh,我們執行該腳本生成所有需要的證書并放到指定位置(遇到命令行提示,只需要一路回車即可):3X628資訊網——每日最新資訊28at.com

$bash kafka-generate-cert.sh .........++++++.............................++++++You are about to be asked to enter information that will be incorporatedinto your certificate request.What you are about to enter is what is called a Distinguished Name or a DN.There are quite a few fields but you can leave some blankFor some fields there will be a default value,If you enter '.', the field will be left blank.-----Country Name (2 letter code) [XX]:State or Province Name (full name) []:Locality Name (eg, city) [Default City]:Organization Name (eg, company) [Default Company Ltd]:Organizational Unit Name (eg, section) []:Common Name (eg, your name or your server's hostname) []:Email Address []:Please enter the following 'extra' attributesto be sent with your certificate requestA challenge password []:An optional company name []:Signature oksubject=/C=XX/L=Default City/O=Default Company LtdGetting Private key.....................++++++.........++++++You are about to be asked to enter information that will be incorporatedinto your certificate request.What you are about to enter is what is called a Distinguished Name or a DN.There are quite a few fields but you can leave some blankFor some fields there will be a default value,If you enter '.', the field will be left blank.-----Country Name (2 letter code) [XX]:State or Province Name (full name) []:Locality Name (eg, city) [Default City]:Organization Name (eg, company) [Default Company Ltd]:Organizational Unit Name (eg, section) []:Common Name (eg, your name or your server's hostname) []:Email Address []:Please enter the following 'extra' attributesto be sent with your certificate requestA challenge password []:An optional company name []:Signature oksubject=/C=XX/L=Default City/O=Default Company LtdGetting CA Private Key

接下來,我們來改造用例,使之支持以tls方式建立到kakfa的連接:3X628資訊網——每日最新資訊28at.com

//docker-compose/bitnami/tls/kafka_test.gofunc createTopics(brokers []string, tlsConfig *tls.Config, topics ...string) error {    dialer := &kc.Dialer{        Timeout:   10 * time.Second,        DualStack: true,        TLS:       tlsConfig,    }    conn, err := dialer.DialContext(context.Background(), "tcp", brokers[0])    if err != nil {        fmt.Println("creating topic: dialer dial error:", err)        return err    }    defer conn.Close()    fmt.Println("creating topic: dialer dial ok") ... ...}func newWriter(brokers []string, tlsConfig *tls.Config, topic string) *kc.Writer {    w := &kc.Writer{        Addr:                   kc.TCP(brokers...),        Topic:                  topic,        Balancer:               &kc.LeastBytes{},        AllowAutoTopicCreation: true,        Async:                  true,        //RequiredAcks:           0,        Completion: func(messages []kc.Message, err error) {            for _, message := range messages {                if err != nil {                    fmt.Println("write message fail", err)                } else {                    fmt.Println("write message ok", string(message.Topic), string(message.Value))                }            }        },    }    if tlsConfig != nil {        w.Transport = &kc.Transport{            TLS: tlsConfig,        }    }    return w}func newReader(brokers []string, tlsConfig *tls.Config, topic string) *kc.Reader {    dialer := &kc.Dialer{        Timeout:   10 * time.Second,        DualStack: true,        TLS:       tlsConfig,    }    return kc.NewReader(kc.ReaderConfig{        Dialer:   dialer,        Brokers:  brokers,        Topic:    topic,        GroupID:  "test-group",        MaxBytes: 10e6, // 10MB    })}func TestProducerAndConsumer(t *testing.T) {    var err error    _, err = UpDefaultKakfa()    if err != nil {        t.Fatalf("want nil, actual %v/n", err)    }    t.Cleanup(func() {        DownDefaultKakfa()    })    brokers := []string{"localhost:9093"}    topic := "test-topic"    tlsConfig, _ := newTLSConfig()    w := newWriter(brokers, tlsConfig, topic)    defer w.Close()    r := newReader(brokers, tlsConfig, topic)    defer r.Close()    err = createTopics(brokers, tlsConfig, topic)    if err != nil {        fmt.Printf("create topic error: %v, but it may not affect the later action, just ignore it/n", err)    }    time.Sleep(5 * time.Second) ... ...}func newTLSConfig() (*tls.Config, error) {    /*       // 加載 CA 證書       caCert, err := ioutil.ReadFile("/path/to/ca.crt")       if err != nil {               return nil, err       }       // 加載客戶端證書和私鑰       cert, err := tls.LoadX509KeyPair("/path/to/client.crt", "/path/to/client.key")       if err != nil {               return nil, err       }       // 創建 CertPool 并添加 CA 證書       caCertPool := x509.NewCertPool()       caCertPool.AppendCertsFromPEM(caCert)    */    // 創建并返回 TLS 配置    return &tls.Config{        //RootCAs:      caCertPool,        //Certificates: []tls.Certificate{cert},        InsecureSkipVerify: true,    }, nil}

在上述代碼中,我們按照segmentio/kafka-go為createTopics、newWriter和newReader都加上了tls.Config參數,此外在測試用例中,我們用newTLSConfig創建一個tls.Config的實例,在這里我們一切簡化處理,采用InsecureSkipVerify=true的方式與kafka broker服務端進行握手,既不驗證服務端證書,也不做雙向認證(mutual TLS)。3X628資訊網——每日最新資訊28at.com

下面是修改代碼后的測試用例執行結果:3X628資訊網——每日最新資訊28at.com

$ go testexec command: docker-compose -f docker-compose.yml up -dcreating topic: dialer dial okcreating topic: get controller okcreating topic: dial control listener okcreate topic error: EOF, but it may not affect the later action, just ignore itwrite message error: [3] Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this brokerwrite message ok  Value-Awrite message ok  Value-Bwrite message ok  Value-Cwrite message ok  Value-D!exec command: docker-compose -f docker-compose.yml down -vPASSok   demo 38.473s

這里我們看到:createTopics雖然連接kafka的各個listener都ok,但調用topic創建時,返回EOF,但這的確不影響后續action的執行,不確定這是segmentio/kafka-go的問題,還是kafka實例的問題。另外首次寫入消息時,也因為topic或partition未建立而失敗,retry后消息正常寫入。3X628資訊網——每日最新資訊28at.com

通過這個例子我們看到,基于docker-compose建立fake object有著更廣泛的靈活性,如果做好容器啟動和停止的精準wait機制的話,我可能會更多選擇這種方式。3X628資訊網——每日最新資訊28at.com

4. 小結

本文介紹了如何在Go編程中進行依賴Kafka的單元測試,并探討了尋找適合的Kafka fake object的策略。3X628資訊網——每日最新資訊28at.com

對于Kafka這樣的復雜系統來說,找到合適的fake object并不容易。因此,本文推薦使用容器作為fake object的策略,并分別介紹了使用testcontainers-go項目和使用docker-compose作為簡化創建和清理基于容器的依賴項的工具。相對于剛剛加入testcontainers-go項目沒多久的kafka module而言,使用docker-compose自定義fake object更加靈活一些。但無論哪種方法,開發人員都需要對kafka的配置有一個較為整體和深入的理解。3X628資訊網——每日最新資訊28at.com

文中主要聚焦使用testcontainers-go和docker-compose建立fake kafka的過程,而用例并沒有建立明確的sut(被測目標),比如針對某個函數的白盒單元測試。3X628資訊網——每日最新資訊28at.com

文本涉及的源碼可以在這里[23]下載。3X628資訊網——每日最新資訊28at.com

參考資料

[1] Kafka: https://kafka.apache.org3X628資訊網——每日最新資訊28at.com

[2] 依賴Kafka的代碼: https://tonybai.com/2023/09/04/slog-in-action-file-logging-rotation-and-kafka-integration/3X628資訊網——每日最新資訊28at.com

[3] 單測時盡量用fake object: https://tonybai.com/2023/04/20/provide-fake-object-for-external-collaborators/3X628資訊網——每日最新資訊28at.com

[4] 單測時盡量用fake object: https://tonybai.com/2023/04/20/provide-fake-object-for-external-collaborators/3X628資訊網——每日最新資訊28at.com

[5] 用于測試的自身簡化版的實現(embed): https://github.com/etcd-io/etcd/blob/main/tests/integration/embed3X628資訊網——每日最新資訊28at.com

[6] 單測時盡量用fake object: https://tonybai.com/2023/04/20/provide-fake-object-for-external-collaborators/3X628資訊網——每日最新資訊28at.com

[7] WASI(WebAssembly System Interface): https://wasi.dev/3X628資訊網——每日最新資訊28at.com

[8] testcontainers-go: https://golang.testcontainers.org/3X628資訊網——每日最新資訊28at.com

[9] testcontainers: https://testcontainers.com3X628資訊網——每日最新資訊28at.com

[10] Rust: https://tonybai.com/2023/02/22/rust-vs-go-in-2023/3X628資訊網——每日最新資訊28at.com

[11] testcontainers-go: https://github.com/testcontainers/testcontainers-go/3X628資訊網——每日最新資訊28at.com

[12] 以KRaft模式運行的Kafka容器才被首次引入testcontainers-go項目: https://github.com/testcontainers/testcontainers-go/pull/16103X628資訊網——每日最新資訊28at.com

[13] confluentinc/confluent-local:7.5.0: https://hub.docker.com/r/confluentinc/confluent-local3X628資訊網——每日最新資訊28at.com

[14] Confluent: https://www.confluent.io3X628資訊網——每日最新資訊28at.com

[15] confluent-local: https://hub.docker.com/r/confluentinc/confluent-local3X628資訊網——每日最新資訊28at.com

[16] confluentinc/cp-kafka鏡像: https://hub.docker.com/r/confluentinc/cp-kafka3X628資訊網——每日最新資訊28at.com

[17] 使用segmentio/kafka-go這個客戶端: https://tonybai.com/2022/03/28/the-comparison-of-the-go-community-leading-kakfa-clients3X628資訊網——每日最新資訊28at.com

[18] Go社區主流Kafka客戶端簡要對比: https://tonybai.com/2022/03/28/the-comparison-of-the-go-community-leading-kakfa-clients3X628資訊網——每日最新資訊28at.com

[19] 發現kafka-go的一個可能導致內存暴漲的問題: https://github.com/segmentio/kafka-go/pull/11173X628資訊網——每日最新資訊28at.com

[20] docker-compose來定制和啟停我們需要的kafka image: https://tonybai.com/2021/11/26/build-all-in-one-runtime-environment-with-docker-compose3X628資訊網——每日最新資訊28at.com

[21] testcontainers-go/wait: https://pkg.go.dev/github.com/testcontainers/testcontainers-go@v0.26.0/wait3X628資訊網——每日最新資訊28at.com

[22] 注冊了Cleanup函數: https://tonybai.com/2020/03/08/some-changes-in-go-1-14/3X628資訊網——每日最新資訊28at.com

[23] 這里: https://github.com/bigwhite/experiments/tree/master/unit-testing-deps-on-kafka3X628資訊網——每日最新資訊28at.com

[24] Gopher部落知識星球: https://public.zsxq.com/groups/512844588445443X628資訊網——每日最新資訊28at.com

[25] 鏈接地址: https://m.do.co/c/bff6eed926873X628資訊網——每日最新資訊28at.com

本文鏈接:http://www.www897cc.com/showinfo-26-58960-0.html依賴Kafka的Go單元測試例解

聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com

上一篇: 請注意,你的 Pulsar 集群可能有刪除數據的風險

下一篇: 分享CodePen上六個酷炫demo特效

標簽:
  • 熱門焦點
  • 一加Ace2 Pro真機揭曉 鈦空灰配色質感拉滿

    終于,在經過了幾波預熱之后,一加Ace2 Pro的外觀真機圖在網上出現了。還是博主數碼閑聊站曝光的,這次的外觀設計還是延續了一加11的方案,只是細節上有了調整,例如新加入了鈦空灰
  • 2023年Q2用戶偏好榜:12+256G版本成新主流

    3月份的性能榜、性價比榜和好評榜之后,就要輪到2023年的第二季度偏好榜了,上半年的新機潮已經過去,最明顯的肯定就是大內存和存儲的機型了,另外部分中端機也取消了屏幕塑料支架
  • 十個可以手動編寫的 JavaScript 數組 API

    JavaScript 中有很多API,使用得當,會很方便,省力不少。 你知道它的原理嗎? 今天這篇文章,我們將對它們進行一次小總結。現在開始吧。1.forEach()forEach()用于遍歷數組接收一參
  • 使用LLM插件從命令行訪問Llama 2

    最近的一個大新聞是Meta AI推出了新的開源授權的大型語言模型Llama 2。這是一項非常重要的進展:Llama 2可免費用于研究和商業用途。(幾小時前,swyy發現它已從LLaMA 2更名為Lla
  • 新電商三兄弟,“抖快紅”成團!

    來源:價值研究所作 者:Hernanderz 隨著內容電商的概念興起,抖音、快手、小紅書組成的&ldquo;新電商三兄弟&rdquo;成為業內一股不可忽視的勢力,給阿里、京東、拼多多帶去了巨大壓
  • 網傳小米汽車開始篩選交付中心 建筑面積不低于3000平方米

    7月7日消息,近日有微博網友@長三角行健者爆料稱,據經銷商集團反饋,小米汽車目前已經開始了交付中心的篩選工作,要求候選場地至少有120個車位,建筑不能低
  • iQOO Neo8 Pro真機諜照曝光:天璣9200+和V1+旗艦雙芯加持

    去年10月,iQOO推出了iQOO Neo7系列機型,不僅搭載了天璣9000+,而且是同價位唯一一款天璣9000+直屏旗艦,一經上市便受到了用戶的廣泛關注。在時隔半年后,
  • 微軟發布Windows 11新版 引入全新任務欄狀態

    近日,微軟發布了Windows 11新版,而Build 22563更新主要引入了幾周前曝光的平板模式任務欄等,系統更流暢了。更新中,Windows 11加入了專門針對平板優化的任務欄
  • 最薄的14英寸游戲筆記本電腦 Alienware X14已可以購買

    2022年1月份在國際消費電子展(CES2022)上首次亮相的Alienware新品——Alienware X14現在已經可以購買了,這款筆記本電腦被譽為世界上最薄的 14 英寸游戲筆
Top 主站蜘蛛池模板: 吉林省| 瓮安县| 平邑县| 黔西| 阿巴嘎旗| 微山县| 留坝县| 疏勒县| 图木舒克市| 徐汇区| 旬阳县| 广安市| 嵊州市| 乐东| 许昌县| 社旗县| 惠来县| 洛川县| 华容县| 修文县| 甘南县| 鄂尔多斯市| 颍上县| 青河县| 永和县| 广河县| 昭通市| 佛坪县| 乌什县| 宣恩县| 潮州市| 姜堰市| 陇南市| 沾化县| 宣城市| 孝昌县| 南漳县| 和政县| 弋阳县| 武威市| 横山县|