日韩成人免费在线_国产成人一二_精品国产免费人成电影在线观..._日本一区二区三区久久久久久久久不

當前位置:首頁 > 科技  > 軟件

如何使用Python、Apache Kafka和云平臺構建健壯的實時數據管道

來源: 責編: 時間:2024-01-26 09:02:43 235觀看
導讀譯者 | 李睿審校 | 重樓在當今競爭激烈的市場環境中,為了生存和發展,企業必須能夠實時收集、處理和響應數據。無論是檢測欺詐、個性化用戶體驗還是監控系統,現在都需要接近即時的數據。然而,構建和運行任務關鍵型實時數據

譯者 | 李睿zHC28資訊網——每日最新資訊28at.com

審校 | 重樓zHC28資訊網——每日最新資訊28at.com

在當今競爭激烈的市場環境中,為了生存和發展,企業必須能夠實時收集、處理和響應數據。無論是檢測欺詐、個性化用戶體驗還是監控系統,現在都需要接近即時的數據。zHC28資訊網——每日最新資訊28at.com

zHC28資訊網——每日最新資訊28at.com

然而,構建和運行任務關鍵型實時數據管道具有挑戰性。基礎設施必須具有容錯性、無限可擴展性,并與各種數據源和應用程序集成。這就是ApacheKafka、Python和云平臺的用武之地。zHC28資訊網——每日最新資訊28at.com

這個綜合指南中將介紹:zHC28資訊網——每日最新資訊28at.com

  • 概述Apache Kafka架構
  • 在云中運行Kafka集群
  • 使用Python構建實時數據管道
  • 使用PySpark進行擴展處理
  • 實際示例,例如用戶活動跟蹤、物聯網數據管道,并支持聊天分析

這里將包括大量的代碼片段、配置示例和文檔鏈接,以便獲得這些非常有用的技術的實踐經驗。zHC28資訊網——每日最新資訊28at.com

Apache Kafka架構介紹

Apache Kafka是一個分布式、分區、復制的提交日志,用于可靠且大規模地存儲數據流。Apache Kafka的核心是提供以下功能:zHC28資訊網——每日最新資訊28at.com

  • 發布-訂閱消息:Kafka允許廣播來自生產者的數據流,例如頁面瀏覽量、交易、用戶事件等,并支持消費者實時消費。
  • 消息存儲:Kafka在消息到達時將其持久保存在磁盤上,并在指定的時間內保留它們。消息通過指示日志中位置的偏移量來存儲和索引。
  • 容錯:數據在可配置數量的服務器上復制。如果一臺服務器宕機,另一臺服務器可以保證持續運行。
  • 橫向可擴展性:Kafka集群可以通過簡單地添加更多的服務器來彈性擴展。這允許無限的存儲和處理能力。

Kafka架構由以下主要組件組成:zHC28資訊網——每日最新資訊28at.com

(1)主題

消息被發布到名為“主題”的類別中。每個主題都充當消息提要或消息隊列。常見的場景是每個消息類型或數據流的一個主題。Kafka主題中的每條消息都有一個唯一的標識符,稱為偏移量,它代表了在主題中的位置。一個主題可以分為多個分區,這些分區是可以存儲在不同代理上的主題片段。分區允許Kafka通過在多個消費者之間分配負載來擴展和并行化數據處理。zHC28資訊網——每日最新資訊28at.com

(2)生產者

生產者是向Kafka主題發布消息的應用程序。它們連接到Kafka集群,序列化數據(例如JSON或Avro),分配一個密鑰,并將其發送到適當的主題。zHC28資訊網——每日最新資訊28at.com

例如,一個Web應用程序可以產生點擊流事件,或者一個移動應用程序可以產生使用統計。zHC28資訊網——每日最新資訊28at.com

(3)消費者

消費者從Kafka主題中讀取消息并進行處理。處理可能涉及解析數據、驗證、聚合、過濾、存儲到數據庫等。zHC28資訊網——每日最新資訊28at.com

消費者連接到Kafka集群,并訂閱一個或多個主題來獲取消息提要,然后根據用例需求進行處理。zHC28資訊網——每日最新資訊28at.com

(4)代理

這是一個Kafka服務器,它接收來自生產者的消息,分配偏移量,將消息提交到存儲中,并將數據提供給消費者。Kafka集群由多個代理組成,以實現可擴展性和容錯性。zHC28資訊網——每日最新資訊28at.com

(5)ZooKeeper

ZooKeeper處理代理之間的協調和共識,例如控制器選舉和主題配置。它維護Kafka操作所需的集群狀態和配置信息。zHC28資訊網——每日最新資訊28at.com

這涵蓋了Kafka的基礎知識。要深入了解,可以參考一些Kafka文檔。zHC28資訊網——每日最新資訊28at.com

以下了解如何通過在云中運行Kafka來簡化管理。zHC28資訊網——每日最新資訊28at.com

在云中運行Kafka

雖然Kafka具有高度可擴展性和可靠性,但它的運行涉及部署、基礎設施管理、監控、安全、故障處理、升級等方面的大量工作。zHC28資訊網——每日最新資訊28at.com

值得慶幸的是,Kafka現在是所有主要云計算提供商提供的完全托管服務:zHC28資訊網——每日最新資訊28at.com

服務zHC28資訊網——每日最新資訊28at.com

描述zHC28資訊網——每日最新資訊28at.com

定價zHC28資訊網——每日最新資訊28at.com

AWS MSKzHC28資訊網——每日最新資訊28at.com

在AWS上完全托管、高可用的Apache Kafka集群。處理基礎設施,擴展,安全,故障處理等。zHC28資訊網——每日最新資訊28at.com

基于代理的數量zHC28資訊網——每日最新資訊28at.com

Google Cloud Pub/SubzHC28資訊網——每日最新資訊28at.com

基于Kafka的無服務器實時消息服務。自動擴展,至少一次交付保證。zHC28資訊網——每日最新資訊28at.com

基于使用指標zHC28資訊網——每日最新資訊28at.com

Confluent CloudzHC28資訊網——每日最新資訊28at.com

完全管理的事件流平臺,由Apache Kafka提供支持。提供免費層。zHC28資訊網——每日最新資訊28at.com

基于功能的分層定價zHC28資訊網——每日最新資訊28at.com

Azure Event HubszHC28資訊網——每日最新資訊28at.com

Apache Kafka的高吞吐量事件攝取服務。與Azure數據服務的集成。zHC28資訊網——每日最新資訊28at.com

基于吞吐量單位zHC28資訊網——每日最新資訊28at.com

托管服務抽象了Kafka操作的復雜性,可以讓用戶專注數據管道。zHC28資訊網——每日最新資訊28at.com

接下來,將使用Python、Kafka和云平臺構建一個實時管道。也可以參考以下的指南作為另一個示例。zHC28資訊網——每日最新資訊28at.com

構建實時數據管道

Kafka的基本實時管道有兩個主要組件:向Kafka發布消息的生產者和訂閱主題并處理消息的消費者。zHC28資訊網——每日最新資訊28at.com

其架構遵循以下流程:zHC28資訊網——每日最新資訊28at.com

zHC28資訊網——每日最新資訊28at.com

為了進行簡化,將使用Confluent Kafka Python客戶端庫。zHC28資訊網——每日最新資訊28at.com

1. Python生產者

生產者應用程序從數據源收集數據并將其發布到Kafka主題。作為一個例子,假設有一個Python服務從一個Web應用程序收集用戶點擊流事件。zHC28資訊網——每日最新資訊28at.com

Web應用程序中,當用戶的行為像是頁面瀏覽或產品評級時,可以捕獲這些事件并將它們發送給Kafka。zHC28資訊網——每日最新資訊28at.com

可以抽象出Web應用程序如何收集數據的實現細節。zHC28資訊網——每日最新資訊28at.com

Python  from confluent_kafka import Producer import json # User event data event = { "timestamp": "2022-01-01T12:22:25",  "userid": "user123", "page": "/product123",  "action": "view" } # Convert to JSON event_json = json.dumps(event) # Kafka producer configuration  conf = { 'bootstrap.servers': 'my_kafka_cluster-xyz.cloud.provider.com:9092', 'client.id': 'clickstream-producer'  } # Create producer instance producer = Producer(conf) # Publish event  producer.produce(topic='clickstream', value=event_json) # Flush and close producer producer.flush() producer.close()

這將事件發布到云托管Kafka集群上的clickstream主題。zHC28資訊網——每日最新資訊28at.com

Confluent_Kafka Python客戶端在將消息發送到Kafka之前使用內部緩沖區來批處理消息。與單獨發送每條消息相比,這提高了效率。zHC28資訊網——每日最新資訊28at.com

在默認情況下,消息會在緩沖區中累積,直到:zHC28資訊網——每日最新資訊28at.com

(1)已達到緩沖區大小限制(默認為32MB)。zHC28資訊網——每日最新資訊28at.com

(2)調用flush()方法。zHC28資訊網——每日最新資訊28at.com

當調用flush()時,緩沖區中的任何消息都會立即發送到Kafka代理。zHC28資訊網——每日最新資訊28at.com

如果不調用flush(),而是依賴于緩沖區大小限制,那么在下一次自動刷新之前,如果發生故障,就有丟失事件的風險。調用flush()能夠更好地控制最小化潛在的消息丟失。zHC28資訊網——每日最新資訊28at.com

但是,在每次生產后調用flush()會帶來額外的開銷。找到合適的緩沖配置取決于特定的可靠性需求和吞吐量需求。zHC28資訊網——每日最新資訊28at.com

可以在事件發生時不斷添加事件來構建實時流。這為下游數據消費者提供了連續的事件提要。zHC28資訊網——每日最新資訊28at.com

2.Python消費者

接下來,有一個消費者應用程序來從Kafka攝取事件并處理它們。zHC28資訊網——每日最新資訊28at.com

例如,可能想要解析事件,篩選特定的子類型,并驗證模式。zHC28資訊網——每日最新資訊28at.com

Python  from confluent_kafka import Consumer import json # Kafka consumer configuration conf = {'bootstrap.servers': 'my_kafka_cluster-xyz.cloud.provider.com:9092',  'group.id': 'clickstream-processor', 'auto.offset.reset': 'earliest'} # Create consumer instance  consumer = Consumer(conf) # Subscribe to 'clickstream' topic consumer.subscribe(['clickstream']) # Poll Kafka for messages infinitely  while True: msg = consumer.poll(1.0) if msg is None: continue  # Parse JSON from message value event = json.loads(msg.value())  # Process event based on business logic if event['action'] == 'view': print('User viewed product page')  elif event['action'] == 'rating': # Validate rating, insert to DB etc  pass  print(event) # Print event   # Close consumer consumer.close()

這個輪詢clickstream主題以獲取新消息,使用它們,并根據事件類型采取行動——打印、更新數據庫等。zHC28資訊網——每日最新資訊28at.com

對于一個簡單的管道來說,這很有效。但如果每秒事件數增加100倍呢?消費者將無法跟上其增長。這就是像PySpark這樣的工具可以幫助擴展處理的地方。zHC28資訊網——每日最新資訊28at.com

3.使用PySpark進行擴展

PySpark為Apache Spark提供了一個Python APIApache Spark是一個為大規模數據處理優化的分布式計算框架。zHC28資訊網——每日最新資訊28at.com

使用PySpark,可以利用Spark的內存計算和并行執行來更快地使用Kafka流。zHC28資訊網——每日最新資訊28at.com

首先,將Kafka數據加載到DataFrame中,DataFrame可以使用Spark SQL或Python進行操作。zHC28資訊網——每日最新資訊28at.com

Python  from pyspark.sql import SparkSession # Initialize Spark session spark = SparkSession.builder / .appName('clickstream-consumer') / .getOrCreate() # Read stream from Kafka 'clickstream'  df = spark.readStream / .format("kafka") / .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092") / .option("subscribe", "clickstream") / .load() # Parse JSON from value df = df.selectExpr("CAST(value AS STRING)") df = df.select(from_json(col("value"), schema).alias("data")) Next, we can express whatever processing logic we need using DataFrame transformations: from pyspark.sql.functions import * # Filter for 'page view' events  views = df.filter(col("data.action") == "view") # Count views per page URL  counts = views.groupBy(col("data.page")) .count() .orderBy("count") # Print the stream  query = counts.writeStream / .outputMode("complete") /  .format("console") / .start()   query.awaitTermination()

它利用Spark的分布式運行時,在數據流上實時應用過濾、聚合和排序等操作。zHC28資訊網——每日最新資訊28at.com

還可以使用多個消費者組并行化消費,并將輸出接收器寫入數據庫、云存儲等。zHC28資訊網——每日最新資訊28at.com

這允許在Kafka的數據上構建可擴展的流處理。zHC28資訊網——每日最新資訊28at.com

現在已經介紹了端到端管道,以下了解應用它的一些實際例。zHC28資訊網——每日最新資訊28at.com

實際用例

以下探索一些實際用例,在這些用例中,這些技術可以幫助大規模地處理大量實時數據。zHC28資訊網——每日最新資訊28at.com

1.用戶活動跟蹤

許多現代網絡和移動應用程序跟蹤用戶的行為,例如頁面瀏覽量、按鈕點擊、交易等,以收集使用情況分析。zHC28資訊網——每日最新資訊28at.com

(1)問題zHC28資訊網——每日最新資訊28at.com

  • 數據量可以隨著數百萬活躍用戶而大規模擴展。
  • 需要實時洞察以檢測問題并個性化內容。
  • 希望為歷史報表存儲匯總數據。

(2)解決方案zHC28資訊網——每日最新資訊28at.com

  • 使用Python或任何語言將點擊流事件攝取到Kafka主題中。
  • 使用PySpark進行清理、聚合和分析。
  • 將輸出保存到數據庫,例如Cassandra的儀表板。
  • 使用Spark ML實時警報檢測異常。

2.物聯網數據管道

物聯網傳感器產生大量的實時遙測數據,例如溫度、壓力、位置等。zHC28資訊網——每日最新資訊28at.com

(1)問題zHC28資訊網——每日最新資訊28at.com

  • 每秒產生數百萬個傳感器事件。
  • 需要清洗、改造、豐富。
  • 需要實時監控和歷史存儲。

(2)解決方案zHC28資訊網——每日最新資訊28at.com

  • 使用語言SDK收集Kafka主題中的傳感器數據。
  • 使用PySpark進行數據整理和連接外部數據。
  • 將數據流輸入機器學習模型進行實時預測。
  • 將聚合數據存儲在時間序列數據庫中以實現可視化。

3.客戶支持聊天分析zHC28資訊網——每日最新資訊28at.com

像Zendesk這樣的聊天平臺捕獲了大量的客戶支持對話。zHC28資訊網——每日最新資訊28at.com

(1)問題zHC28資訊網——每日最新資訊28at.com

  • 每月產生數百萬條聊天信息。
  • 需要了解客戶痛點和代理表現。
  • 必須發現負面情緒和緊急問題。

(2)解決方案zHC28資訊網——每日最新資訊28at.com

  • 使用連接器將聊天記錄導入Kafka主題。
  • 使用PySpark SQL和DataFrames進行聚合和處理。
  • 將數據輸入NLP模型,對情緒和意圖進行分類。
  • 存儲洞察到數據庫的歷史報告。
  • 為聯絡中心操作提供實時儀表板。

這個例演示了如何將這些技術應用于涉及大量快速移動數據的實際業務問題。zHC28資訊網——每日最新資訊28at.com

結論

綜上所述, Python、Kafka和云平臺為構建健壯的、可擴展的實時數據管道提供了一個很好的組合。zHC28資訊網——每日最新資訊28at.com

原文標題:Building Robust Real-Time Data Pipelines With Python, Apache Kafka, and the Cloud,作者:Dmitrii MitiaevzHC28資訊網——每日最新資訊28at.com

本文鏈接:http://www.www897cc.com/showinfo-26-68326-0.html如何使用Python、Apache Kafka和云平臺構建健壯的實時數據管道

聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com

上一篇: 接口響應以XML數據格式輸出,這些方法你都知道嗎?

下一篇: Spring實現Kafka重試Topic,真的太香了

標簽:
  • 熱門焦點
Top 主站蜘蛛池模板: 寿光市| 吕梁市| 滦南县| 郯城县| 天峨县| 神农架林区| 敦煌市| 封开县| 桑日县| 册亨县| 河西区| 肥城市| 黄骅市| 宜州市| 厦门市| 平利县| 林甸县| 平陆县| 界首市| 博湖县| 怀宁县| 南平市| 突泉县| 宿迁市| 伊川县| 临清市| 巴南区| 双桥区| 卫辉市| 赤峰市| 修文县| 基隆市| 会泽县| 遵义县| 如东县| 唐山市| 禄丰县| 安新县| 梅州市| 甘肃省| 旬阳县|