日韩成人免费在线_国产成人一二_精品国产免费人成电影在线观..._日本一区二区三区久久久久久久久不

當前位置:首頁 > 科技  > 軟件

獲取雙異步返回值時,如何保證主線程不阻塞?

來源: 責編: 時間:2024-01-25 10:41:34 245觀看
導讀一、前情提要在上一篇文章中,使用雙異步后,如何保證數據一致性?,通過Future獲取異步返回值,輪詢判斷Future狀態,如果執行完畢或已取消,則通過get()獲取返回值,get()是阻塞的方法,因此會阻塞當前線程,如果通過new Runnable()執行

AMF28資訊網——每日最新資訊28at.com

一、前情提要

在上一篇文章中,使用雙異步后,如何保證數據一致性?,通過Future獲取異步返回值,輪詢判斷Future狀態,如果執行完畢或已取消,則通過get()獲取返回值,get()是阻塞的方法,因此會阻塞當前線程,如果通過new Runnable()執行get()方法,那么還是需要返回AsyncResult,然后再通過主線程去get()獲取異步線程返回結果。AMF28資訊網——每日最新資訊28at.com

寫法很繁瑣,還會阻塞主線程。AMF28資訊網——每日最新資訊28at.com

下面是FutureTask異步執行流程圖:AMF28資訊網——每日最新資訊28at.com

AMF28資訊網——每日最新資訊28at.com

二、JDK8的CompletableFuture

1、ForkJoinPool

Java8中引入了CompletableFuture,它實現了對Future的全面升級,可以通過回調的方式,獲取異步線程返回值。AMF28資訊網——每日最新資訊28at.com

CompletableFuture的異步執行通過ForkJoinPool實現, 它使用守護線程去執行任務。AMF28資訊網——每日最新資訊28at.com

ForkJoinPool在于可以充分利用多核CPU的優勢,把一個任務拆分成多個小任務,把多個小任務放到多個CPU上并行執行,當多個小任務執行完畢后,再將其執行結果合并起來。AMF28資訊網——每日最新資訊28at.com

Future的異步執行是通過ThreadPoolExecutor實現的。AMF28資訊網——每日最新資訊28at.com

AMF28資訊網——每日最新資訊28at.com

2、從ForkJoinPool和ThreadPoolExecutor探索CompletableFuture和Future的區別

  • ForkJoinPool中的每個線程都會有一個隊列,而ThreadPoolExecutor只有一個隊列,并根據queue類型不同,細分出各種線程池;
  • ForkJoinPool在使用過程中,會創建大量的子任務,會進行大量的gc,但是ThreadPoolExecutor不需要,因為ThreadPoolExecutor是任務分配平均的;
  • ThreadPoolExecutor中每個異步線程之間是相互獨立的,當執行速度快的線程執行完畢后,它就會一直處于空閑的狀態,等待其它線程執行完畢;
  • ForkJoinPool中每個異步線程之間并不是絕對獨立的,在ForkJoinPool線程池中會維護一個隊列來存放需要執行的任務,當線程自身任務執行完畢后,它會從其它線程中獲取未執行的任務并幫助它執行,直至所有線程執行完畢。

因此,在多線程任務分配不均時,ForkJoinPool的執行效率更高。但是,如果任務分配均勻,ThreadPoolExecutor的執行效率更高,因為ForkJoinPool會創建大量子任務,并對其進行大量的GC,比較耗時。AMF28資訊網——每日最新資訊28at.com

三、通過CompletableFuture優化 “通過Future獲取異步返回值”

1、通過Future獲取異步返回值關鍵代碼

(1)將異步方法的返回值改為Future<Integer>,將返回值放到new AsyncResult<>();中

@Async("async-executor")public void readXls(String filePath, String filename) {    try {     // 此代碼為簡化關鍵性代碼        List<Future<Integer>> futureList = new ArrayList<>();        for (int time = 0; time < times; time++) {            Future<Integer> sumFuture = readExcelDataAsyncFutureService.readXlsCacheAsync();            futureList.add(sumFuture);        }    }catch (Exception e){        logger.error("readXlsCacheAsync---插入數據異常:",e);    }}
@Async("async-executor")public Future<Integer> readXlsCacheAsync() {    try {        // 此代碼為簡化關鍵性代碼        return new AsyncResult<>(sum);    }catch (Exception e){        return new AsyncResult<>(0);    }}

(2)通過Future<Integer>.get()獲取返回值

public static boolean getFutureResult(List<Future<Integer>> futureList, int excelRow) {    int[] futureSumArr = new int[futureList.size()];    for (int i = 0;i<futureList.size();i++) {        try {            Future<Integer> future = futureList.get(i);            while (true) {                if (future.isDone() && !future.isCancelled()) {                    Integer futureSum = future.get();                    logger.info("獲取Future返回值成功"+"----Future:" + future                            + ",Result:" + futureSum);                    futureSumArr[i] += futureSum;                    break;                } else {                    logger.info("Future正在執行---獲取Future返回值中---等待3秒");                    Thread.sleep(3000);                }            }        } catch (Exception e) {            logger.error("獲取Future返回值異常: ", e);        }    }        boolean insertFlag = getInsertSum(futureSumArr, excelRow);    logger.info("獲取所有異步線程Future的返回值成功,Excel插入結果="+insertFlag);    return insertFlag;}

2、通過CompletableFuture獲取異步返回值關鍵代碼

(1)將異步方法的返回值改為 int

@Async("async-executor")public void readXls(String filePath, String filename) { List<CompletableFuture<Integer>> completableFutureList = new ArrayList<>();    for (int time = 0; time < times; time++) {     // 此代碼為簡化關鍵性代碼        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(new Supplier<Integer>() {         @Override         public Integer get() {             return readExcelDbJdk8Service.readXlsCacheAsyncMybatis();         }     }).thenApply((result) -> {// 回調方法         return thenApplyTest2(result);// supplyAsync返回值 * 1     }).thenApply((result) -> {         return thenApplyTest5(result);// thenApply返回值 * 1     }).exceptionally((e) -> { // 如果執行異常:         logger.error("CompletableFuture.supplyAsync----異常:", e);         return null;     });      completableFutureList.add(completableFuture);    }}
@Async("async-executor")public int readXlsCacheAsync() {    try {        // 此代碼為簡化關鍵性代碼        return sum;    }catch (Exception e){        return -1;    }}

(2)通過completableFuture.get()獲取返回值

public static boolean getCompletableFutureResult(List<CompletableFuture<Integer>> list, int excelRow){    logger.info("通過completableFuture.get()獲取每個異步線程的插入結果----開始");    int sum = 0;    for (int i = 0; i < list.size(); i++) {        Integer result = list.get(i).get();        sum += result;    }    boolean insertFlag = excelRow == sum;    logger.info("全部執行完畢,excelRow={},入庫={}, 數據是否一致={}",excelRow,sum,insertFlag);    return insertFlag;}

3、效率對比

(1)測試環境

  • 12個邏輯處理器的電腦;
  • Excel中包含10萬條數據;
  • Future的自定義線程池,核心線程數為24;
  • ForkJoinPool的核心線程數為24;

(2)統計四種情況下10萬數據入庫時間

  • 不獲取異步返回值
  • 通過Future獲取異步返回值
  • 通過CompletableFuture獲取異步返回值,默認ForkJoinPool線程池的核心線程數為本機邏輯處理器數量,測試電腦為12;
  • 通過CompletableFuture獲取異步返回值,修改ForkJoinPool線程池的核心線程數為24。

備注:因為CompletableFuture不阻塞主線程,主線程執行時間只有2秒,表格中統計的是異步線程全部執行完成的時間。AMF28資訊網——每日最新資訊28at.com

(3)設置核心線程數

將核心線程數CorePoolSize設置成CPU的處理器數量,是不是效率最高的?AMF28資訊網——每日最新資訊28at.com

// 獲取CPU的處理器數量int curSystemThreads = Runtime.getRuntime().availableProcessors() * 2;// 測試電腦是24

因為在接口被調用后,開啟異步線程,執行入庫任務,因為測試機最多同時開啟24線程處理任務,故將10萬條數據拆分成等量的24份,也就是10萬/24 = 4166,那么我設置成4200,是不是效率最佳呢?AMF28資訊網——每日最新資訊28at.com

測試的過程中發現,好像真的是這樣的。AMF28資訊網——每日最新資訊28at.com

自定義ForkJoinPool線程池
@Autowired@Qualifier("asyncTaskExecutor")private Executor asyncTaskExecutor;@Overridepublic void readXls(String filePath, String filename) {  List<CompletableFuture<Integer>> completableFutureList = new ArrayList<>();    for (int time = 0; time < times; time++) {  CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(new Supplier<Integer>() {         @Override         public Integer get() {             try {                 return readExcelDbJdk8Service.readXlsCacheAsync(sheet, row, start, finalEnd, insertBuilder);             } catch (Exception e) {                 logger.error("CompletableFuture----readXlsCacheAsync---異常:", e);                 return -1;             }         };     },asyncTaskExecutor);      completableFutureList.add(completableFuture); } // 不會阻塞主線程    CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[completableFutureList.size()])).whenComplete((r,e) -> {        try {            int insertSum = getCompletableFutureResult(completableFutureList, excelRow);        } catch (Exception ex) {            return;        }    });}
自定義線程池
/** * 自定義異步線程池 */@Bean("asyncTaskExecutor")public AsyncTaskExecutor asyncTaskExecutor() {    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();    //設置線程名稱    executor.setThreadNamePrefix("asyncTask-Executor");    //設置最大線程數    executor.setMaxPoolSize(200);    //設置核心線程數    executor.setCorePoolSize(24);    //設置線程空閑時間,默認60    executor.setKeepAliveSeconds(200);    //設置隊列容量    executor.setQueueCapacity(50);    /**     * 當線程池的任務緩存隊列已滿并且線程池中的線程數目達到maximumPoolSize,如果還有任務到來就會采取任務拒絕策略     * 通常有以下四種策略:     * ThreadPoolExecutor.AbortPolicy:丟棄任務并拋出RejectedExecutionException異常。     * ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。     * ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然后重新嘗試執行任務(重復此過程)     * ThreadPoolExecutor.CallerRunsPolicy:重試添加當前的任務,自動重復調用 execute() 方法,直到成功     */    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());    executor.initialize();    return executor;}

AMF28資訊網——每日最新資訊28at.com

(4)統計分析

效率對比:AMF28資訊網——每日最新資訊28at.com

③通過CompletableFuture獲取異步返回值(12線程) <  ②通過Future獲取異步返回值 <  ④通過CompletableFuture獲取異步返回值(24線程) <  ①不獲取異步返回值AMF28資訊網——每日最新資訊28at.com

不獲取異步返回值時性能最優,這不廢話嘛~AMF28資訊網——每日最新資訊28at.com

核心線程數相同的情況下,CompletableFuture的入庫效率要優于Future的入庫效率,10萬條數據大概要快4秒鐘,這還是相當驚人的,優化的價值就在于此。AMF28資訊網——每日最新資訊28at.com

AMF28資訊網——每日最新資訊28at.com

四、通過CompletableFuture.allOf解決阻塞主線程問題

1、語法

CompletableFuture.allOf(CompletableFuture的可變數組).whenComplete((r,e) -> {})。AMF28資訊網——每日最新資訊28at.com

2、代碼實例

getCompletableFutureResult方法在 “3.2.2 通過completableFuture.get()獲取返回值”。AMF28資訊網——每日最新資訊28at.com

// 不會阻塞主線程CompletableFuture.allOf(completableFutureList.toArray(new   CompletableFuture[completableFutureList.size()])).whenComplete((r,e) -> {    logger.info("全部執行完畢,解決主線程阻塞問題~");    try {        int insertSum = getCompletableFutureResult(completableFutureList, excelRow);    } catch (Exception ex) {        logger.error("全部執行完畢,解決主線程阻塞問題,異常:", ex);        return;    }});// 會阻塞主線程//getCompletableFutureResult(completableFutureList, excelRow);logger.info("CompletableFuture----會阻塞主線程嗎?");

AMF28資訊網——每日最新資訊28at.com

五、CompletableFuture中花俏的語法糖

1、runAsync

runAsync 方法不支持返回值。AMF28資訊網——每日最新資訊28at.com

可以通過runAsync執行沒有返回值的異步方法。AMF28資訊網——每日最新資訊28at.com

不會阻塞主線程。AMF28資訊網——每日最新資訊28at.com

// 分批異步讀取Excel內容并入庫int finalEnd = end;CompletableFuture.runAsync(() -> readExcelDbJdk8Service.readXlsCacheAsyncMybatis();

2、supplyAsync

supplyAsync也可以異步處理任務,傳入的對象實現了Supplier接口。將Supplier作為參數并返回CompletableFuture結果值,這意味著它不接受任何輸入參數,而是將result作為輸出返回。AMF28資訊網——每日最新資訊28at.com

會阻塞主線程。AMF28資訊網——每日最新資訊28at.com

supplyAsync()方法關鍵代碼:AMF28資訊網——每日最新資訊28at.com

int finalEnd = end;CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(new Supplier<Integer>() {    @Override    public Integer get() {        return readExcelDbJdk8Service.readXlsCacheAsyncMybatis();    }});
@Overridepublic int readXlsCacheAsyncMybatis() {    // 不為人知的操作    // 返回異步方法執行結果即可 return 100;}

六、順序執行異步任務

1、thenRun

thenRun()不接受參數,也沒有返回值,與runAsync()配套使用,恰到好處。AMF28資訊網——每日最新資訊28at.com

// JDK8的CompletableFutureCompletableFuture.runAsync(() -> readExcelDbJdk8Service.readXlsCacheAsyncMybatis()).thenRun(() -> logger.info("CompletableFuture----.thenRun()方法測試"));

AMF28資訊網——每日最新資訊28at.com

2、thenAccept

thenAccept()接受參數,沒有返回值。AMF28資訊網——每日最新資訊28at.com

supplyAsync + thenAcceptAMF28資訊網——每日最新資訊28at.com

  • 異步線程順序執行
  • supplyAsync的異步返回值,可以作為thenAccept的參數使用
  • 不會阻塞主線程
CompletableFuture.supplyAsync(new Supplier<Integer>() {    @Override    public Integer get() {        return readExcelDbJdk8Service.readXlsCacheAsyncMybatis();    }}).thenAccept(x -> logger.info(".thenAccept()方法測試:" + x));

AMF28資訊網——每日最新資訊28at.com

但是,此時無法通過completableFuture.get()獲取supplyAsync的返回值了。AMF28資訊網——每日最新資訊28at.com

3、thenApply

thenApply在thenAccept的基礎上,可以再次通過completableFuture.get()獲取返回值。AMF28資訊網——每日最新資訊28at.com

supplyAsync + thenApply,典型的鏈式編程。AMF28資訊網——每日最新資訊28at.com

  • 異步線程內方法順序執行。
  • supplyAsync 的返回值,作為第 1 個thenApply的參數,進行業務處理。
  • 第 1 個thenApply的返回值,作為第 2 個thenApply的參數,進行業務處理。
  • 最后,通過future.get()方法獲取最終的返回值。
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override    public Integer get() {        return readExcelDbJdk8Service.readXlsCacheAsyncMybatis();    }}).thenApply((result) -> {    return thenApplyTest2(result);// supplyAsync返回值 * 2}).thenApply((result) -> {    return thenApplyTest5(result);// thenApply返回值 * 5});logger.info("readXlsCacheAsyncMybatis插入數據 * 2 * 5 = " + completableFuture.get());

AMF28資訊網——每日最新資訊28at.com

七、CompletableFuture合并任務

  • thenCombine,多個異步任務并行處理,有返回值,最后合并結果返回新的CompletableFuture對象。
  • thenAcceptBoth,多個異步任務并行處理,無返回值。
  • acceptEither,多個異步任務并行處理,無返回值。
  • applyToEither,,多個異步任務并行處理,有返回值。

CompletableFuture合并任務的代碼實例,這里就不多贅述了,一些語法糖而已,大家切記陷入低水平勤奮的怪圈。AMF28資訊網——每日最新資訊28at.com

八、CompletableFuture VS Future總結

本文中以下幾個方面對比了CompletableFuture和Future的差異:AMF28資訊網——每日最新資訊28at.com

  • ForkJoinPool和ThreadPoolExecutor的實現原理,探索了CompletableFuture和Future的差異。
  • 通過代碼實例的形式簡單介紹了CompletableFuture中花俏的語法糖。
  • 通過CompletableFuture優化了 “通過Future獲取異步返回值”。
  • 通過CompletableFuture.allOf解決阻塞主線程問題。

Future提供了異步執行的能力,但Future.get()會通過輪詢的方式獲取異步返回值,get()方法還會阻塞主線程。AMF28資訊網——每日最新資訊28at.com

輪詢的方式非常消耗CPU資源,阻塞的方式顯然與我們的異步初衷背道而馳。AMF28資訊網——每日最新資訊28at.com

JDK8提供的CompletableFuture實現了Future接口,添加了很多Future不具備的功能,比如鏈式編程、異常處理回調函數、獲取異步結果不阻塞不輪詢、合并異步任務等。AMF28資訊網——每日最新資訊28at.com

獲取異步線程結果后,我們可以通過添加事務的方式,實現Excel入庫操作的數據一致性。AMF28資訊網——每日最新資訊28at.com

異步多線程情況下如何實現事務?AMF28資訊網——每日最新資訊28at.com

有的小伙伴可能會說:AMF28資訊網——每日最新資訊28at.com

這還不簡單?添加@Transactional注解,如果發生異常或入庫數據量不符,直接回滾就可以了~AMF28資訊網——每日最新資訊28at.com

那么,真的是這樣嗎?我們下期見~AMF28資訊網——每日最新資訊28at.com

本文鏈接:http://www.www897cc.com/showinfo-26-67849-0.html獲取雙異步返回值時,如何保證主線程不阻塞?

聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com

上一篇: Node問題:如何正確安裝nvm?Mac和Win雙教程!

下一篇: lowcode-cms開源社區源碼設計分享

標簽:
  • 熱門焦點
  • 8月總票房已突破10億!《封神》第一:口碑已經成了

    8月5日消息,據燈塔專業版數據,截至8月5日9時35分,8月總票房(含預售)已突破10億。其中,《封神》以大比分的優勢領先。根據官方消息,目前該片總票房已經超過14.
  • 三言兩語說透設計模式的藝術-單例模式

    寫在前面單例模式是一種常用的軟件設計模式,它所創建的對象只有一個實例,且該實例易于被外界訪問。單例對象由于只有一個實例,所以它可以方便地被系統中的其他對象共享,從而減少
  • 十個簡單但很有用的Python裝飾器

    裝飾器(Decorators)是Python中一種強大而靈活的功能,用于修改或增強函數或類的行為。裝飾器本質上是一個函數,它接受另一個函數或類作為參數,并返回一個新的函數或類。它們通常用
  • 虛擬鍵盤 API 的妙用

    你是否在遇到過這樣的問題:移動設備上有一個固定元素,當激活虛擬鍵盤時,該元素被隱藏在了鍵盤下方?多年來,這一直是 Web 上的默認行為,在本文中,我們將探討這個問題、為什么會發生
  • JVM優化:實戰OutOfMemoryError異常

    一、Java堆溢出堆內存中主要存放對象、數組等,只要不斷地創建這些對象,并且保證 GC Roots 到對象之間有可達路徑來避免垃 圾收集回收機制清除這些對象,當這些對象所占空間超過
  • 電視息屏休眠仍有網絡上傳 愛奇藝被質疑“薅消費者羊毛”

    記者丨寧曉敏 見習生丨汗青出品丨鰲頭財經(theSankei) 前不久,愛奇藝發布了一份亮眼的一季報,不僅營收和會員營收創造歷史最佳表現,其運營利潤也連續6個月實現增長。自去年年初
  • 自律,給不了Keep自由!

    來源 | 互聯網品牌官作者 | 李大為編排 | 又耳 審核 | 谷曉輝自律能不能給用戶自由暫時不好說,但大概率不能給Keep自由。近日,全球最大的在線健身平臺Keep正式登陸港交所,努力
  • 造車兩年股價跌六成,小米的估值邏輯變了嗎?

    如果從小米官宣造車后的首個交易日起持有小米集團的股票,那么截至2023年上半年最后一個交易日,投資者將浮虧59.16%,同區間的恒生科技指數跌幅為52.78%
  • 三星Galaxy Z Fold/Flip 5國行售價曝光 :最低7499元/12999元起

    據官方此前宣布,三星將于7月26日也就是明天在韓國首爾舉辦Unpacked活動,屆時將帶來帶來包括Galaxy Buds 3、Galaxy Watch 6、Galaxy Tab S9、Galaxy
Top 主站蜘蛛池模板: 威远县| 宁陵县| 米易县| 雅安市| 永宁县| 安远县| 台山市| 马边| 永安市| 九江市| 宜春市| 府谷县| 宣恩县| 汾西县| 永平县| 东辽县| 阿克苏市| 应城市| 尼木县| 高雄县| 包头市| 济南市| 宁远县| 平武县| 武安市| 朝阳市| 沛县| 乐山市| 牡丹江市| 金乡县| 洪湖市| 桐庐县| 台南市| 汽车| 乌什县| 通榆县| 疏附县| 北流市| 临武县| 县级市| 金溪县|