??先來看一個日常生活快遞寄件場景,從寄件人(寄件)到收件人(收件),全流程如下:
圖片
當你準備寄送一個包裹時,通常你可以有兩種寄件方式:
??方案一、你親自前往快遞服務點,填寫寄件單、交付包裹、等待工作人員處理,最后得到一張寄送單據。你必須在服務點等待直到所有步驟都完成。這個過程是同步的。
??方案二、你可以選擇在線預約快遞上門取件服務,填寫相關信息后,你的請求就被提交給系統。此時,你可以繼續進行其他事情,而不需要等待快遞員到達。系統會在后臺異步處理你的請求,安排合適的快遞員前來取件。這樣,你就可以在等待的過程中做其他事情,無需阻塞在快遞服務點。
??這種寄件方式提高了效率,讓用戶可以更加靈活地安排自己的時間。在后臺系統中,快遞公司可以通過合理的任務調度,處理多個異步請求,提高寄件服務的整體吞吐量。這種方式類似于在后端異步處理任務,而用戶無需等待任務完成,可以繼續進行其他操作,提高了整個寄件過程的并發性和響應性。這個過程就是異步。
我們通過這個例子抽象出同步模型和異步模型:
圖片
同步模型:一個任務做完做下一個任務,阻塞
異步模型:做當前任務,只需要開啟而不需要關心另一個任務如何執行,非阻塞
??有了上邊的模型,對于同步和異步的概念就有了初步的認識。事實上,在架構設計中,異步思想是指通過異步處理來提高系統的性能、可伸縮性和響應速度。
以下是SpringColud微服務架構的基本套件:
圖片
在架構設計中,異步思想可以應用在多個方面。常見的異步實踐包括:
......
??接下來,我們針對實際項目中的異步設計逐個探究。可能做不到面面俱到,但是可以為真實的場景中的方案設計打開思路。
??Spring Cloud Gateway基于Project Reactor反應式編程和WebFlux框架,通過路由、過濾器、事件等機制實現了靈活的網關服務。它適用于構建微服務架構中的業務網關,具有高性能、可擴展性和豐富的功能。
官網地址:https://spring.io/projects/spring-cloud-gateway/
圖片
性能比較
??對 Zuul/Spring Cloud Gateway 的一些性能分析可以參考 Spring Cloud Gateway 作者 Spencer Gibb 提供的項目:https://github.com/spencergibb/spring-cloud-gateway-bench。
圖片
摘自SpringCloud GateWay作者spencergibb 提供的一個壓測報告
??總的來說,Gateway在處理IO密集型請求場景下有著更大的優勢。原因是: 隨著Spring 5 推出的WebFlux,它是完全異步且非阻塞的,底層也是基于Netty實現的。我們分別對Reactor模型和Netty做一個簡單介紹。
圖片
??其中:mainReactor主要負責連接處理(不參與數據處理),而subReactor負責數據的讀取(不參與連接). 不再是單線程模型那樣,接收請求和處理數據都是在一個Reactor下進行。
核心主要是基于NIO的Netty框架,原理說明如下:
組件關系:
圖片
概念說明:
每個服務器中都會有一個 Boss(老板),會有一群做事情的WorkerBoss(員工) 會不停地接收新的連接,將連接分配給一個個 Worker 處理連接
執行過程:
圖片
Netty 執行過程:
關于SpringCloud GateWay的使用,請自行查閱官網。這里只介紹如何體現NIO異步非阻塞原理的。
場景分析:
??比如:商城首頁菜單樹。一般這種場景我們允許在一定時間數據不一致性。那么就可以使用定時任務+消息隊列。如每隔5分鐘同步一次,達到數據最終一致。
圖片
注意事項:
??這種數據同步方案主要適用于數據實時性要求不高的場景,因為:定時任務處理存在一定時間間隔,會有同步延時。同時在時間窗口期數據可能發生變更。還有就是數據最終一致性的保證,主要取決于MQ的可靠性。
場景分析:
??三方平臺交互,上游系統(A)的數據和下游系統(B)的數據進行接口規范轉化。此處可能涉及到很多業務轉到同一個平臺或者不同平臺。而我們接口轉化的功能是一致的。當然你可以使用Feign直接調用。但是流量增加、網絡阻塞時可能會出現調用失敗,導致未能成功送達下游。因此我們可以這樣設計:
注意事項:
??這種異步設計一方面為了系統內部服務之間解耦,另一方面起到了削峰填谷的作用。但是引入消息隊列和轉化服務,增加了系統的復雜性。因為鏈路較長,出現問題時排查起來比較困難。因此要在數據庫中盡可能存留記錄明細,方便審查。另外,也可能出現消息積壓等問題。當然這是消息隊列存在的共性問題。
場景分析:
??日常我們會遇到很多這種發短信的情況。比如,
......
那么對于短信場景,我們如何設計呢?
圖片
注意事項:這種異步設計一方面為了將發送短信的功能獨立出來。
場景分析:
??在業務系統中,一般我們會進行日志采集和可視化展示。ELK 是由 Elasticsearch、Logstash 和 Kibana 組成的一套日志管理和分析解決方案。結合 Kafka 使用時,通常用于搭建一個高效的日志處理系統。
圖片
ELK 工作流程并結合 Kafka 的工作流程描述:
Logstash 作為 Kafka 消費者,通過 Kafka Input 插件訂閱一個或多個 Kafka 主題。
Logstash 接收到 Kafka 中的日志消息后,可以進行多種操作,如解析日志、添加字段、過濾、轉換格式等。
Logstash 處理日志并發送到 Elasticsearch:
Logstash 通過 Elasticsearch Output 插件將處理后的日志數據發送到 Elasticsearch 集群。
Logstash 可以將日志數據根據配置的索引模式(Index Pattern)劃分到不同的索引中,以便更好地管理和查詢。
Elasticsearch 存儲和索引日志數據:
Elasticsearch 接收 Logstash 發送過來的日志數據,并將其存儲在分布式索引中。
Elasticsearch 提供了強大的全文搜索和分析功能,支持對大量的日志數據進行高效的查詢和分析。
Kibana 可視化和查詢:
Kibana 作為 Elasticsearch 的前端界面,提供了豐富的可視化工具和查詢界面。
用戶可以使用 Kibana 創建儀表板、圖表,執行復雜的查詢,實時監控日志數據等。
整個工作流程如下:
+----------------------+ +----------------------+ +----------------------+| Producer | ----> | Kafka | ----> | Logstash | | (Log Generator) | | (Message Broker) | | | +----------------------+ +----------------------+ +----------+-----------+ | | v +----------------------+ | Elasticsearch | | (Log Storage) | +----------------------+ | | v +----------------------+ | Kibana | | (Visualization Tool)| +----------------------+
注意事項:
??整個 ELK + Kafka 的架構可以幫助實現高效的日志收集、處理和可視化,適用于大規模分布式系統中的日志管理。
??當使用多線程和 CompletableFuture 來執行批處理任務時,可以通過將任務分成多個子任務,并使用 CompletableFuture 來異步執行這些子任務。主要思想如下:
圖片
假設我們有一個批處理任務,需要對一組數據進行處理:
import java.util.ArrayList;import java.util.List;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;import java.util.concurrent.Executors;import java.util.concurrent.ThreadPoolExecutor;public class BatchProcessingExample { public static void main(String[] args) { // 模擬一組數據 List<Integer> data = generateData(10); // 定義線程池 ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5); // 將數據分成多個子任務進行處理 List<CompletableFuture<Void>> futures = new ArrayList<>(); int batchSize = 3; for (int i = 0; i < data.size(); i += batchSize) { List<Integer> batch = data.subList(i, Math.min(i + batchSize, data.size())); CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { // 在這里執行批處理的具體邏輯 processBatch(batch); }, executor); futures.add(future); } // 等待所有子任務完成 CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); // 在所有子任務完成后關閉線程池 allOf.thenRun(executor::shutdown); try { // 等待所有任務完成 allOf.get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } private static List<Integer> generateData(int size) { List<Integer> data = new ArrayList<>(); for (int i = 1; i <= size; i++) { data.add(i); } return data; } private static void processBatch(List<Integer> batch) { // 模擬批處理邏輯 for (Integer value : batch) { System.out.println(Thread.currentThread().getName() + " - Processing: " + value); } }}
??以上我們模擬了一組數據,然后將數據分成多個批次,每個批次使用 CompletableFuture 異步執行。CompletableFuture.allOf 用于等待所有子任務完成。
在這個示例中,主要體現了以下異步的思想和操作:
CompletableFuture.runAsync(() -> { // 執行異步任務的邏輯}, executor);
CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
allOf.thenRun(executor::shutdown);
??這些異步操作幫助提高程序的并發性和響應性,特別在處理批量任務時,可以更有效地利用系統資源。異步編程模型能夠允許程序在等待某些操作完成的同時繼續執行其他操作,從而提高系統的效率。
??異步設計在處理并發和提高系統性能方面具有優勢,但也帶來了一些可能的問題。以上提供的場景和方案僅供參考。使用過程中應當根據業務特征合理選擇具體方案。
本文鏈接:http://www.www897cc.com/showinfo-26-70427-0.html聊聊項目實戰中的異步設計
聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com
下一篇: Python 實現定時任務的九種方案