在上一篇文章中,使用雙異步后,如何保證數據一致性?,通過Future獲取異步返回值,輪詢判斷Future狀態,如果執行完畢或已取消,則通過get()獲取返回值,get()是阻塞的方法,因此會阻塞當前線程,如果通過new Runnable()執行get()方法,那么還是需要返回AsyncResult,然后再通過主線程去get()獲取異步線程返回結果。
寫法很繁瑣,還會阻塞主線程。
下面是FutureTask異步執行流程圖:
Java8中引入了CompletableFuture,它實現了對Future的全面升級,可以通過回調的方式,獲取異步線程返回值。
CompletableFuture的異步執行通過ForkJoinPool實現, 它使用守護線程去執行任務。
ForkJoinPool在于可以充分利用多核CPU的優勢,把一個任務拆分成多個小任務,把多個小任務放到多個CPU上并行執行,當多個小任務執行完畢后,再將其執行結果合并起來。
Future的異步執行是通過ThreadPoolExecutor實現的。
因此,在多線程任務分配不均時,ForkJoinPool的執行效率更高。但是,如果任務分配均勻,ThreadPoolExecutor的執行效率更高,因為ForkJoinPool會創建大量子任務,并對其進行大量的GC,比較耗時。
@Async("async-executor")public void readXls(String filePath, String filename) { try { // 此代碼為簡化關鍵性代碼 List<Future<Integer>> futureList = new ArrayList<>(); for (int time = 0; time < times; time++) { Future<Integer> sumFuture = readExcelDataAsyncFutureService.readXlsCacheAsync(); futureList.add(sumFuture); } }catch (Exception e){ logger.error("readXlsCacheAsync---插入數據異常:",e); }}
@Async("async-executor")public Future<Integer> readXlsCacheAsync() { try { // 此代碼為簡化關鍵性代碼 return new AsyncResult<>(sum); }catch (Exception e){ return new AsyncResult<>(0); }}
public static boolean getFutureResult(List<Future<Integer>> futureList, int excelRow) { int[] futureSumArr = new int[futureList.size()]; for (int i = 0;i<futureList.size();i++) { try { Future<Integer> future = futureList.get(i); while (true) { if (future.isDone() && !future.isCancelled()) { Integer futureSum = future.get(); logger.info("獲取Future返回值成功"+"----Future:" + future + ",Result:" + futureSum); futureSumArr[i] += futureSum; break; } else { logger.info("Future正在執行---獲取Future返回值中---等待3秒"); Thread.sleep(3000); } } } catch (Exception e) { logger.error("獲取Future返回值異常: ", e); } } boolean insertFlag = getInsertSum(futureSumArr, excelRow); logger.info("獲取所有異步線程Future的返回值成功,Excel插入結果="+insertFlag); return insertFlag;}
@Async("async-executor")public void readXls(String filePath, String filename) { List<CompletableFuture<Integer>> completableFutureList = new ArrayList<>(); for (int time = 0; time < times; time++) { // 此代碼為簡化關鍵性代碼 CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override public Integer get() { return readExcelDbJdk8Service.readXlsCacheAsyncMybatis(); } }).thenApply((result) -> {// 回調方法 return thenApplyTest2(result);// supplyAsync返回值 * 1 }).thenApply((result) -> { return thenApplyTest5(result);// thenApply返回值 * 1 }).exceptionally((e) -> { // 如果執行異常: logger.error("CompletableFuture.supplyAsync----異常:", e); return null; }); completableFutureList.add(completableFuture); }}
@Async("async-executor")public int readXlsCacheAsync() { try { // 此代碼為簡化關鍵性代碼 return sum; }catch (Exception e){ return -1; }}
public static boolean getCompletableFutureResult(List<CompletableFuture<Integer>> list, int excelRow){ logger.info("通過completableFuture.get()獲取每個異步線程的插入結果----開始"); int sum = 0; for (int i = 0; i < list.size(); i++) { Integer result = list.get(i).get(); sum += result; } boolean insertFlag = excelRow == sum; logger.info("全部執行完畢,excelRow={},入庫={}, 數據是否一致={}",excelRow,sum,insertFlag); return insertFlag;}
備注:因為CompletableFuture不阻塞主線程,主線程執行時間只有2秒,表格中統計的是異步線程全部執行完成的時間。
將核心線程數CorePoolSize設置成CPU的處理器數量,是不是效率最高的?
// 獲取CPU的處理器數量int curSystemThreads = Runtime.getRuntime().availableProcessors() * 2;// 測試電腦是24
因為在接口被調用后,開啟異步線程,執行入庫任務,因為測試機最多同時開啟24線程處理任務,故將10萬條數據拆分成等量的24份,也就是10萬/24 = 4166,那么我設置成4200,是不是效率最佳呢?
測試的過程中發現,好像真的是這樣的。
@Autowired@Qualifier("asyncTaskExecutor")private Executor asyncTaskExecutor;@Overridepublic void readXls(String filePath, String filename) { List<CompletableFuture<Integer>> completableFutureList = new ArrayList<>(); for (int time = 0; time < times; time++) { CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override public Integer get() { try { return readExcelDbJdk8Service.readXlsCacheAsync(sheet, row, start, finalEnd, insertBuilder); } catch (Exception e) { logger.error("CompletableFuture----readXlsCacheAsync---異常:", e); return -1; } }; },asyncTaskExecutor); completableFutureList.add(completableFuture); } // 不會阻塞主線程 CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[completableFutureList.size()])).whenComplete((r,e) -> { try { int insertSum = getCompletableFutureResult(completableFutureList, excelRow); } catch (Exception ex) { return; } });}
/** * 自定義異步線程池 */@Bean("asyncTaskExecutor")public AsyncTaskExecutor asyncTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //設置線程名稱 executor.setThreadNamePrefix("asyncTask-Executor"); //設置最大線程數 executor.setMaxPoolSize(200); //設置核心線程數 executor.setCorePoolSize(24); //設置線程空閑時間,默認60 executor.setKeepAliveSeconds(200); //設置隊列容量 executor.setQueueCapacity(50); /** * 當線程池的任務緩存隊列已滿并且線程池中的線程數目達到maximumPoolSize,如果還有任務到來就會采取任務拒絕策略 * 通常有以下四種策略: * ThreadPoolExecutor.AbortPolicy:丟棄任務并拋出RejectedExecutionException異常。 * ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。 * ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然后重新嘗試執行任務(重復此過程) * ThreadPoolExecutor.CallerRunsPolicy:重試添加當前的任務,自動重復調用 execute() 方法,直到成功 */ executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor;}
效率對比:
③通過CompletableFuture獲取異步返回值(12線程) < ②通過Future獲取異步返回值 < ④通過CompletableFuture獲取異步返回值(24線程) < ①不獲取異步返回值
不獲取異步返回值時性能最優,這不廢話嘛~
核心線程數相同的情況下,CompletableFuture的入庫效率要優于Future的入庫效率,10萬條數據大概要快4秒鐘,這還是相當驚人的,優化的價值就在于此。
CompletableFuture.allOf(CompletableFuture的可變數組).whenComplete((r,e) -> {})。
getCompletableFutureResult方法在 “3.2.2 通過completableFuture.get()獲取返回值”。
// 不會阻塞主線程CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[completableFutureList.size()])).whenComplete((r,e) -> { logger.info("全部執行完畢,解決主線程阻塞問題~"); try { int insertSum = getCompletableFutureResult(completableFutureList, excelRow); } catch (Exception ex) { logger.error("全部執行完畢,解決主線程阻塞問題,異常:", ex); return; }});// 會阻塞主線程//getCompletableFutureResult(completableFutureList, excelRow);logger.info("CompletableFuture----會阻塞主線程嗎?");
runAsync 方法不支持返回值。
可以通過runAsync執行沒有返回值的異步方法。
不會阻塞主線程。
// 分批異步讀取Excel內容并入庫int finalEnd = end;CompletableFuture.runAsync(() -> readExcelDbJdk8Service.readXlsCacheAsyncMybatis();
supplyAsync也可以異步處理任務,傳入的對象實現了Supplier接口。將Supplier作為參數并返回CompletableFuture結果值,這意味著它不接受任何輸入參數,而是將result作為輸出返回。
會阻塞主線程。
supplyAsync()方法關鍵代碼:
int finalEnd = end;CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override public Integer get() { return readExcelDbJdk8Service.readXlsCacheAsyncMybatis(); }});
@Overridepublic int readXlsCacheAsyncMybatis() { // 不為人知的操作 // 返回異步方法執行結果即可 return 100;}
thenRun()不接受參數,也沒有返回值,與runAsync()配套使用,恰到好處。
// JDK8的CompletableFutureCompletableFuture.runAsync(() -> readExcelDbJdk8Service.readXlsCacheAsyncMybatis()).thenRun(() -> logger.info("CompletableFuture----.thenRun()方法測試"));
thenAccept()接受參數,沒有返回值。
supplyAsync + thenAccept
CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override public Integer get() { return readExcelDbJdk8Service.readXlsCacheAsyncMybatis(); }}).thenAccept(x -> logger.info(".thenAccept()方法測試:" + x));
但是,此時無法通過completableFuture.get()獲取supplyAsync的返回值了。
thenApply在thenAccept的基礎上,可以再次通過completableFuture.get()獲取返回值。
supplyAsync + thenApply,典型的鏈式編程。
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override public Integer get() { return readExcelDbJdk8Service.readXlsCacheAsyncMybatis(); }}).thenApply((result) -> { return thenApplyTest2(result);// supplyAsync返回值 * 2}).thenApply((result) -> { return thenApplyTest5(result);// thenApply返回值 * 5});logger.info("readXlsCacheAsyncMybatis插入數據 * 2 * 5 = " + completableFuture.get());
CompletableFuture合并任務的代碼實例,這里就不多贅述了,一些語法糖而已,大家切記陷入低水平勤奮的怪圈。
本文中以下幾個方面對比了CompletableFuture和Future的差異:
Future提供了異步執行的能力,但Future.get()會通過輪詢的方式獲取異步返回值,get()方法還會阻塞主線程。
輪詢的方式非常消耗CPU資源,阻塞的方式顯然與我們的異步初衷背道而馳。
JDK8提供的CompletableFuture實現了Future接口,添加了很多Future不具備的功能,比如鏈式編程、異常處理回調函數、獲取異步結果不阻塞不輪詢、合并異步任務等。
獲取異步線程結果后,我們可以通過添加事務的方式,實現Excel入庫操作的數據一致性。
異步多線程情況下如何實現事務?
有的小伙伴可能會說:
這還不簡單?添加@Transactional注解,如果發生異常或入庫數據量不符,直接回滾就可以了~
那么,真的是這樣嗎?我們下期見~
本文鏈接:http://www.www897cc.com/showinfo-26-67849-0.html獲取雙異步返回值時,如何保證主線程不阻塞?
聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com