譯者 | 李睿
審校 | 重樓
在當今競爭激烈的市場環境中,為了生存和發展,企業必須能夠實時收集、處理和響應數據。無論是檢測欺詐、個性化用戶體驗還是監控系統,現在都需要接近即時的數據。
然而,構建和運行任務關鍵型實時數據管道具有挑戰性。基礎設施必須具有容錯性、無限可擴展性,并與各種數據源和應用程序集成。這就是ApacheKafka、Python和云平臺的用武之地。
這個綜合指南中將介紹:
這里將包括大量的代碼片段、配置示例和文檔鏈接,以便獲得這些非常有用的技術的實踐經驗。
Apache Kafka是一個分布式、分區、復制的提交日志,用于可靠且大規模地存儲數據流。Apache Kafka的核心是提供以下功能:
Kafka架構由以下主要組件組成:
消息被發布到名為“主題”的類別中。每個主題都充當消息提要或消息隊列。常見的場景是每個消息類型或數據流的一個主題。Kafka主題中的每條消息都有一個唯一的標識符,稱為偏移量,它代表了在主題中的位置。一個主題可以分為多個分區,這些分區是可以存儲在不同代理上的主題片段。分區允許Kafka通過在多個消費者之間分配負載來擴展和并行化數據處理。
生產者是向Kafka主題發布消息的應用程序。它們連接到Kafka集群,序列化數據(例如JSON或Avro),分配一個密鑰,并將其發送到適當的主題。
例如,一個Web應用程序可以產生點擊流事件,或者一個移動應用程序可以產生使用統計。
消費者從Kafka主題中讀取消息并進行處理。處理可能涉及解析數據、驗證、聚合、過濾、存儲到數據庫等。
消費者連接到Kafka集群,并訂閱一個或多個主題來獲取消息提要,然后根據用例需求進行處理。
這是一個Kafka服務器,它接收來自生產者的消息,分配偏移量,將消息提交到存儲中,并將數據提供給消費者。Kafka集群由多個代理組成,以實現可擴展性和容錯性。
ZooKeeper處理代理之間的協調和共識,例如控制器選舉和主題配置。它維護Kafka操作所需的集群狀態和配置信息。
這涵蓋了Kafka的基礎知識。要深入了解,可以參考一些Kafka文檔。
以下了解如何通過在云中運行Kafka來簡化管理。
雖然Kafka具有高度可擴展性和可靠性,但它的運行涉及部署、基礎設施管理、監控、安全、故障處理、升級等方面的大量工作。
值得慶幸的是,Kafka現在是所有主要云計算提供商提供的完全托管服務:
服務 | 描述 | 定價 |
AWS MSK | 在AWS上完全托管、高可用的Apache Kafka集群。處理基礎設施,擴展,安全,故障處理等。 | 基于代理的數量 |
Google Cloud Pub/Sub | 基于Kafka的無服務器實時消息服務。自動擴展,至少一次交付保證。 | 基于使用指標 |
Confluent Cloud | 完全管理的事件流平臺,由Apache Kafka提供支持。提供免費層。 | 基于功能的分層定價 |
Azure Event Hubs | Apache Kafka的高吞吐量事件攝取服務。與Azure數據服務的集成。 | 基于吞吐量單位 |
托管服務抽象了Kafka操作的復雜性,可以讓用戶專注數據管道。
接下來,將使用Python、Kafka和云平臺構建一個實時管道。也可以參考以下的指南作為另一個示例。
Kafka的基本實時管道有兩個主要組件:向Kafka發布消息的生產者和訂閱主題并處理消息的消費者。
其架構遵循以下流程:
為了進行簡化,將使用Confluent Kafka Python客戶端庫。
生產者應用程序從數據源收集數據并將其發布到Kafka主題。作為一個例子,假設有一個Python服務從一個Web應用程序收集用戶點擊流事件。
在Web應用程序中,當用戶的行為像是頁面瀏覽或產品評級時,可以捕獲這些事件并將它們發送給Kafka。
可以抽象出Web應用程序如何收集數據的實現細節。
Python from confluent_kafka import Producer import json # User event data event = { "timestamp": "2022-01-01T12:22:25", "userid": "user123", "page": "/product123", "action": "view" } # Convert to JSON event_json = json.dumps(event) # Kafka producer configuration conf = { 'bootstrap.servers': 'my_kafka_cluster-xyz.cloud.provider.com:9092', 'client.id': 'clickstream-producer' } # Create producer instance producer = Producer(conf) # Publish event producer.produce(topic='clickstream', value=event_json) # Flush and close producer producer.flush() producer.close()
這將事件發布到云托管Kafka集群上的clickstream主題。
Confluent_Kafka Python客戶端在將消息發送到Kafka之前使用內部緩沖區來批處理消息。與單獨發送每條消息相比,這提高了效率。
在默認情況下,消息會在緩沖區中累積,直到:
(1)已達到緩沖區大小限制(默認為32MB)。
(2)調用flush()方法。
當調用flush()時,緩沖區中的任何消息都會立即發送到Kafka代理。
如果不調用flush(),而是依賴于緩沖區大小限制,那么在下一次自動刷新之前,如果發生故障,就有丟失事件的風險。調用flush()能夠更好地控制最小化潛在的消息丟失。
但是,在每次生產后調用flush()會帶來額外的開銷。找到合適的緩沖配置取決于特定的可靠性需求和吞吐量需求。
可以在事件發生時不斷添加事件來構建實時流。這為下游數據消費者提供了連續的事件提要。
接下來,有一個消費者應用程序來從Kafka攝取事件并處理它們。
例如,可能想要解析事件,篩選特定的子類型,并驗證模式。
Python from confluent_kafka import Consumer import json # Kafka consumer configuration conf = {'bootstrap.servers': 'my_kafka_cluster-xyz.cloud.provider.com:9092', 'group.id': 'clickstream-processor', 'auto.offset.reset': 'earliest'} # Create consumer instance consumer = Consumer(conf) # Subscribe to 'clickstream' topic consumer.subscribe(['clickstream']) # Poll Kafka for messages infinitely while True: msg = consumer.poll(1.0) if msg is None: continue # Parse JSON from message value event = json.loads(msg.value()) # Process event based on business logic if event['action'] == 'view': print('User viewed product page') elif event['action'] == 'rating': # Validate rating, insert to DB etc pass print(event) # Print event # Close consumer consumer.close()
這個輪詢clickstream主題以獲取新消息,使用它們,并根據事件類型采取行動——打印、更新數據庫等。
對于一個簡單的管道來說,這很有效。但如果每秒事件數增加100倍呢?消費者將無法跟上其增長。這就是像PySpark這樣的工具可以幫助擴展處理的地方。
PySpark為Apache Spark提供了一個Python API,Apache Spark是一個為大規模數據處理優化的分布式計算框架。
使用PySpark,可以利用Spark的內存計算和并行執行來更快地使用Kafka流。
首先,將Kafka數據加載到DataFrame中,DataFrame可以使用Spark SQL或Python進行操作。
Python from pyspark.sql import SparkSession # Initialize Spark session spark = SparkSession.builder / .appName('clickstream-consumer') / .getOrCreate() # Read stream from Kafka 'clickstream' df = spark.readStream / .format("kafka") / .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092") / .option("subscribe", "clickstream") / .load() # Parse JSON from value df = df.selectExpr("CAST(value AS STRING)") df = df.select(from_json(col("value"), schema).alias("data")) Next, we can express whatever processing logic we need using DataFrame transformations: from pyspark.sql.functions import * # Filter for 'page view' events views = df.filter(col("data.action") == "view") # Count views per page URL counts = views.groupBy(col("data.page")) .count() .orderBy("count") # Print the stream query = counts.writeStream / .outputMode("complete") / .format("console") / .start() query.awaitTermination()
它利用Spark的分布式運行時,在數據流上實時應用過濾、聚合和排序等操作。
還可以使用多個消費者組并行化消費,并將輸出接收器寫入數據庫、云存儲等。
這允許在Kafka的數據上構建可擴展的流處理。
現在已經介紹了端到端管道,以下了解應用它的一些實際用例。
以下探索一些實際用例,在這些用例中,這些技術可以幫助大規模地處理大量實時數據。
許多現代網絡和移動應用程序跟蹤用戶的行為,例如頁面瀏覽量、按鈕點擊、交易等,以收集使用情況分析。
(1)問題
(2)解決方案
物聯網傳感器產生大量的實時遙測數據,例如溫度、壓力、位置等。
(1)問題
(2)解決方案
3.客戶支持聊天分析
像Zendesk這樣的聊天平臺捕獲了大量的客戶支持對話。
(1)問題
(2)解決方案
這個用例演示了如何將這些技術應用于涉及大量快速移動數據的實際業務問題。
綜上所述, Python、Kafka和云平臺為構建健壯的、可擴展的實時數據管道提供了一個很好的組合。
原文標題:Building Robust Real-Time Data Pipelines With Python, Apache Kafka, and the Cloud,作者:Dmitrii Mitiaev
本文鏈接:http://www.www897cc.com/showinfo-26-68326-0.html如何使用Python、Apache Kafka和云平臺構建健壯的實時數據管道
聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com