本文將深入解析 CompletableFuture,希望對(duì)各位讀者能有所幫助。
CompletableFuture 適用于以下場(chǎng)景
下面是一個(gè)演示 CompletableFuture 如何使用的代碼示例:
public class CompletableFutureExample { public static void main(String[] args) { // 創(chuàng)建CompletableFuture對(duì)象,并定義異步任務(wù) CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { // 異步任務(wù)的邏輯代碼 // 在這里執(zhí)行耗時(shí)操作或其他需要異步執(zhí)行的任務(wù) try { TimeUnit.SECONDS.sleep(2); // 模擬耗時(shí)操作 } catch (InterruptedException e) { e.printStackTrace(); } return "Hello, "; }); // 添加任務(wù)完成后的回調(diào)方法 CompletableFuture<String> resultFuture = future.thenApplyAsync(result -> { // 任務(wù)完成后的處理邏輯 // result為上一步任務(wù)的結(jié)果 return result + "World!"; }); // 組合多個(gè)CompletableFuture對(duì)象 CompletableFuture<String> combinedFuture = future.thenCombine(resultFuture, (result1, result2) -> { // 對(duì)多個(gè)CompletableFuture的結(jié)果進(jìn)行組合處理 return result1 + result2 + " Welcome to the CompletableFuture world!"; }); // 異常處理 CompletableFuture<String> exceptionHandledFuture = combinedFuture.exceptionally(ex -> { // 異常處理邏輯 System.out.println("任務(wù)執(zhí)行出現(xiàn)異常:" + ex.getMessage()); return "Fallback Result"; }); // 等待并獲取任務(wù)的結(jié)果 try { String result = exceptionHandledFuture.get(); System.out.println("任務(wù)的最終結(jié)果為:" + result); } catch (InterruptedException | ExecutionException e) { // 處理異常情況 e.printStackTrace(); } }}
結(jié)果輸出:
任務(wù)的最終結(jié)果為:Hello, Hello, World! Welcome to the CompletableFuture world!
首先,我們創(chuàng)建了一個(gè)CompletableFuture對(duì)象future。在future中,我們使用supplyAsync方法定義了一個(gè)異步任務(wù),其中 lambda表達(dá)式 中的代碼會(huì)在另一個(gè)線程中執(zhí)行。在這個(gè)例子中,我們模擬了一個(gè)耗時(shí)操作,通過(guò)TimeUnit.SECONDS.sleep(2)暫停了2秒鐘。
然后,我們添加了一個(gè)回調(diào)方法resultFuture。在這個(gè)回調(diào)方法中,將前一個(gè)異步任務(wù)的結(jié)果作為參數(shù)進(jìn)行處理,并返回處理后的新結(jié)果。在這個(gè)例子中,我們將前一個(gè)任務(wù)的結(jié)果與字符串 "World!" 連接起來(lái),形成新的結(jié)果。
接下來(lái),我們使用thenCombine方法組合了兩個(gè)CompletableFuture對(duì)象:future和resultFuture。在這個(gè)組合任務(wù)中,我們將兩個(gè)任務(wù)的結(jié)果進(jìn)行組合處理,返回最終的結(jié)果。在這個(gè)例子中,我們將前兩個(gè)任務(wù)的結(jié)果與字符串 " Welcome to the CompletableFuture world!" 連接起來(lái)。
此外,我們還處理了異常情況。通過(guò)exceptionally方法,我們定義了一個(gè)異常處理回調(diào)方法。如果在任務(wù)執(zhí)行過(guò)程中發(fā)生了異常,我們可以在這里對(duì)異常進(jìn)行處理,并返回一個(gè)默認(rèn)值作為結(jié)果。
最后,我們使用get方法等待并獲取最終的任務(wù)結(jié)果。需要注意的是,get方法可能會(huì)阻塞當(dāng)前線程,直到任務(wù)完成并返回結(jié)果。在這個(gè)例子中,我們使用try-catch塊捕獲可能的異常情況,并打印出最終的任務(wù)結(jié)果。
這個(gè)例子只是部分展示了CompletableFuture的功能,實(shí)際上它比你想象的還要強(qiáng)大!
CompletableFuture 的源碼非常龐大和復(fù)雜,涉及到并發(fā)、線程池、同步機(jī)制等多方面的知識(shí)。在這里,我們只重點(diǎn)介紹 CompletableFuture 的核心實(shí)現(xiàn)原理。
圖片
CompletableFuture 的作者是大名鼎鼎的 Doug Lea。CompletableFuture 類是實(shí)現(xiàn)了 Future 和 CompletionStage 接口的一個(gè)關(guān)鍵類。它可以表示異步計(jì)算的結(jié)果,并提供了一系列方法來(lái)操作和處理這些結(jié)果。
CompletableFuture 內(nèi)部使用了一個(gè)屬性result來(lái)保存計(jì)算結(jié)果,以及若干個(gè)屬性waiters來(lái)保存等待結(jié)果的任務(wù)。當(dāng)計(jì)算完成后,CompletableFuture將會(huì)通知所有等待結(jié)果的任務(wù),并將結(jié)果傳遞給它們。
為了實(shí)現(xiàn)鏈?zhǔn)讲僮鳎珻ompletableFuture還定義了內(nèi)部類:Completion, UniCompletion, 和 BiCompletion。
Completion, UniCompletion, 和 BiCompletion 是 CompletableFuture 內(nèi)部用于處理異步任務(wù)完成的輔助類。
這些輔助類在 CompletableFuture 的內(nèi)部被使用,以實(shí)現(xiàn)異步任務(wù)的執(zhí)行、結(jié)果的處理和組合等操作。它們提供了一種靈活的方式來(lái)處理異步任務(wù)的完成情況,并通過(guò)回調(diào)方法或其他一些方法來(lái)處理任務(wù)的結(jié)果和異常。
圖片
CompletableFuture中包含兩個(gè)字段:result 和 stack。result 用于存儲(chǔ)當(dāng)前CF的結(jié)果,stack (Completion)表示當(dāng)前CF完成后需要觸發(fā)的依賴動(dòng)作(Dependency Actions),去觸發(fā)依賴它的CF的計(jì)算,依賴動(dòng)作可以有多個(gè)(表示有多個(gè)依賴它的CF),以棧(Treiber stack)的形式存儲(chǔ),stack表示棧頂元素。
CompletableFuture 在設(shè)計(jì)思想上類似 “觀察者模式,每個(gè) CompletableFuture 都可以被看作一個(gè)被觀察者,其內(nèi)部有一個(gè)Completion類型的鏈表成員變量stack,用來(lái)存儲(chǔ)注冊(cè)到其中的所有觀察者。當(dāng)被觀察者執(zhí)行完成后會(huì)彈棧stack屬性,依次通知注冊(cè)到其中的觀察者。
CompletableFuture 的執(zhí)行流程如下:
當(dāng)異步任務(wù)完成時(shí),它會(huì)設(shè)置自己的結(jié)果值,將狀態(tài)標(biāo)記為已完成。
如果有其他線程在此之前調(diào)用了complete()、completeExceptionally()、cancel()等方法,可能會(huì)影響任務(wù)的最終狀態(tài)。
請(qǐng)注意,以上步驟的順序和具體實(shí)現(xiàn)可能略有不同,但大致上反映了CompletableFuture的執(zhí)行流程。在實(shí)際應(yīng)用中,我們可以根據(jù)需求選擇適合的方法來(lái)處理異步任務(wù)的完成情況、結(jié)果、異常以及任務(wù)之間的關(guān)系。
CompletableFuture類提供了一系列用于處理和組合異步任務(wù)的方法。以下是這些方法的介紹:
創(chuàng)建一個(gè) CompletableFuture 對(duì)象有以下幾種方法:
CompletableFuture<String> future = new CompletableFuture<>();
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { // 異步任務(wù)邏輯 return "Result";});CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { // 異步任務(wù)邏輯});
CompletableFuture<Integer> transformedFuture = originalFuture.thenApply(result -> { // 轉(zhuǎn)換邏輯 return result.length();});originalFuture.thenAccept(result -> { // 處理結(jié)果邏輯 System.out.println("Result: " + result);});CompletableFuture<Void> runnableFuture = originalFuture.thenRun(() -> { // 在結(jié)果完成后執(zhí)行的操作});
//CompletableFuture.completedFuture()直接創(chuàng)建一個(gè)已完成狀態(tài)的CompletableFutureCompletableFuture<String> cf2 = CompletableFuture.completedFuture("result");//先初始化一個(gè)未完成的CompletableFuture,然后通過(guò)complete()、completeExceptionally(),也完成該CompletableFutureCompletableFuture<String> cf = new CompletableFuture<>();cf.complete("success");
CompletionStage<Integer> stage = CompletableFuture.supplyAsync(() -> 42);CompletableFuture<Integer> future = stage.toCompletableFuture();
用于將當(dāng)前的 CompletionStage 對(duì)象轉(zhuǎn)換為一個(gè) CompletableFuture 對(duì)象。
以下是在 CompletableFuture 對(duì)象上異步執(zhí)行任務(wù)的一些方法示例:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { // 異步任務(wù)邏輯 return "Result";});
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { // 異步任務(wù)邏輯});
CompletableFuture提供了不同的方式來(lái)對(duì)異步任務(wù)進(jìn)行鏈?zhǔn)讲僮鳌?span style="display:none">igd28資訊網(wǎng)——每日最新資訊28at.com
CompletableFuture<Void> executedFuture = future.thenRun(() -> executeTask());
thenRun方法用于在CompletableFuture完成后執(zhí)行一個(gè)Runnable任務(wù)。它返回一個(gè)新的CompletableFuture對(duì)象,該對(duì)象沒(méi)有返回值。
CompletableFuture<Void> acceptedFuture = future.thenAccept(result -> processResult(result));
thenAccept方法用于在CompletableFuture完成后對(duì)結(jié)果進(jìn)行處理。它接收一個(gè)Consumer函數(shù)作為參數(shù),并返回一個(gè)新的CompletableFuture對(duì)象。
CompletableFuture<U> appliedFuture = future.thenApply(result -> transformResult(result));
thenApply方法用于在CompletableFuture完成后對(duì)結(jié)果進(jìn)行轉(zhuǎn)換。它接收一個(gè)Function函數(shù)作為參數(shù),并返回一個(gè)新的CompletableFuture對(duì)象。
CompletableFuture<U> composedFuture = future.thenCompose(result -> executeAnotherTask(result));
用于對(duì)異步任務(wù)的結(jié)果進(jìn)行處理,并返回一個(gè)新的異步任務(wù)。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 42);CompletableFuture<Void> whenCompleteFuture = future.whenComplete((result, exception) -> { if (exception != null) { System.out.println("Exception occurred: " + exception.getMessage()); } else { System.out.println("Result: " + result); }});whenCompleteFuture.join();
用于在異步任務(wù)完成后執(zhí)行指定的動(dòng)作。它允許你在任務(wù)完成時(shí)處理結(jié)果或處理異常。
CompletableFuture還提供了一系列方法來(lái)組合和處理多個(gè)異步任務(wù)的結(jié)果。
CompletableFuture<Void> allFuture = CompletableFuture.allOf(future1, future2, future3);
allOf方法接收一組CompletableFuture對(duì)象作為參數(shù),并返回一個(gè)新的CompletableFuture對(duì)象,該對(duì)象在所有給定的CompletableFuture都完成時(shí)完成。這樣我們可以等待所有任務(wù)都完成后再進(jìn)行下一步操作。
CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2, future3);
anyOf方法與allOf類似,不同之處在于它返回的CompletableFuture對(duì)象在任何一個(gè)給定的CompletableFuture完成時(shí)就完成。這樣我們可以獲取最先完成的任務(wù)的結(jié)果。
CompletableFuture<U> combinedFuture = future1.thenCombine(future2, (result1, result2) -> combineResults(result1, result2));
thenCombine方法接收兩個(gè)CompletableFuture對(duì)象和一個(gè)函數(shù)作為參數(shù),用于指定當(dāng)這兩個(gè)CompletableFuture都完成時(shí)如何處理它們的結(jié)果。返回的新的CompletableFuture對(duì)象將接收到計(jì)算后的結(jié)果。
CompletableFuture<U> resultFuture = future1.applyToEither(future2, result -> processResult(result));
applyToEither方法用于獲取兩個(gè)CompletableFuture中任意一個(gè)完成的結(jié)果,并對(duì)該結(jié)果進(jìn)行處理。它接收一個(gè)Function函數(shù)作為參數(shù),并返回一個(gè)新的CompletableFuture對(duì)象。
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10);CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 20);future1.acceptEither(future2, result -> { System.out.println("Result: " + result);});
用于在兩個(gè) CompletableFuture 對(duì)象中任意一個(gè)完成時(shí)執(zhí)行指定的操作。該方法接收兩個(gè)參數(shù):另一個(gè) CompletableFuture 對(duì)象和一個(gè)消費(fèi)者函數(shù)(Consumer)。當(dāng)其中任何一個(gè) CompletableFuture 完成時(shí),將其結(jié)果作為參數(shù)傳遞給消費(fèi)者函數(shù)進(jìn)行處理。
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 42);CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Hello");CompletableFuture<Void> combinedFuture = future1.runAfterBoth(future2, () -> { System.out.println("Both futures completed");});combinedFuture.join();
用于在兩個(gè)異步任務(wù)都完成后執(zhí)行指定的動(dòng)作,需要注意的是,runAfterBoth() 方法是一個(gè)非阻塞方法,動(dòng)作將在兩個(gè)異步任務(wù)都完成后立即執(zhí)行。
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } return 42;});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "Hello";});CompletableFuture<Void> eitherFuture = future1.runAfterEither(future2, () -> { System.out.println("One of the futures completed");});eitherFuture.join();
用于在兩個(gè)異步任務(wù)中任意一個(gè)完成后執(zhí)行指定的動(dòng)作。
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 42);CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Hello");CompletableFuture<Void> thenAcceptBothFuture = future1.thenAcceptBoth(future2, (result1, result2) -> { System.out.println("Action executed with thenAcceptBoth(): " + result1 + ", " + result2);});thenAcceptBothFuture.join();
用于在兩個(gè)異步任務(wù)都完成后執(zhí)行指定的動(dòng)作。它的作用是接收兩個(gè)異步任務(wù)的結(jié)果,并將結(jié)果作為參數(shù)傳遞給指定的消費(fèi)者函數(shù)。
CompletableFuture提供了多種方式來(lái)處理異步任務(wù)的異常情況。
CompletableFuture<U> exceptionHandledFuture = future.exceptionally(ex -> handleException(ex));
通過(guò)exceptionally方法,我們可以對(duì)CompletableFuture的異常情況進(jìn)行處理。它接收一個(gè)Function函數(shù)作為參數(shù),用于處理異常并返回一個(gè)新的CompletableFuture對(duì)象。
CompletableFuture<U> handledFuture = future.handle((result, ex) -> handleResult(result, ex));
handle方法可以同時(shí)處理正常結(jié)果和異常情況。它接收一個(gè)BiFunction函數(shù)作為參數(shù),用于處理結(jié)果和異常,并返回一個(gè)新的CompletableFuture對(duì)象。
future.completeExceptionally();
異常地完成 CompletableFuture,將結(jié)果設(shè)置為一個(gè)異常。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { throw new RuntimeException("Something went wrong");});boolean completedExceptionally = future.isCompletedExceptionally();System.out.println("Is completed exceptionally: " + completedExceptionally);
該方法返回一個(gè)布爾值,表示當(dāng)前異步任務(wù)是否已經(jīng)異常完成。
CompletableFuture<Integer> future = new CompletableFuture<>();future.obtrudeException(new RuntimeException("Something went wrong"));boolean completedExceptionally = future.isCompletedExceptionally();System.out.println("Is completed exceptionally: " + completedExceptionally);
用于強(qiáng)制將指定的異常作為異步任務(wù)的結(jié)果,調(diào)用 obtrudeException(Throwable ex) 方法后,異步任務(wù)將立即完成,并將指定的異常作為結(jié)果返回。
future.join()
join() 方法不會(huì)拋出已檢查異常,因?yàn)樗腔?nbsp;CompletableFuture 類設(shè)計(jì)的,如果異步任務(wù)拋出異常,join() 方法會(huì)將該異常包裝在 CompletionException 中并拋出。
future.get()
get() 方法會(huì)拋出一個(gè) InterruptedException 異常和一個(gè) ExecutionException 異常,前者表示獲取結(jié)果時(shí)被中斷,后者表示獲取結(jié)果時(shí)任務(wù)本身拋出了異常。
future.get(1,TimeUnit.Hours)
有異常則拋出異常,最長(zhǎng)等待一個(gè)小時(shí),一個(gè)小時(shí)之后,如果還沒(méi)有數(shù)據(jù),則異常。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { // 異步任務(wù)邏輯 return 42;});int result = future.getNow(0); // 獲取異步操作的結(jié)果,如果尚未完成,則返回默認(rèn)值0System.out.println("Result: " + result);
getNow(T value) 是 CompletableFuture 類的一個(gè)方法,用于獲取異步操作的結(jié)果,如果異步操作尚未完成,則返回給定的默認(rèn)值,該方法會(huì)立即返回結(jié)果,不會(huì)阻塞當(dāng)前線程。
CompletableFuture也支持超時(shí)控制和取消操作,以便更好地管理異步任務(wù)的執(zhí)行。
CompletableFuture<U> timeoutFuture = future.completeOnTimeout(defaultResult, timeout, timeUnit);
completeOnTimeout方法在指定的超時(shí)時(shí)間內(nèi)等待CompletableFuture的完成,如果超時(shí)則將其設(shè)置為默認(rèn)結(jié)果。它返回一個(gè)新的CompletableFuture對(duì)象。
boolean isCancelled = future.cancel(true);
cancel方法可用于取消CompletableFuture的執(zhí)行。它接收一個(gè)boolean參數(shù),指示是否中斷正在執(zhí)行的任務(wù)。返回值表示是否成功取消了任務(wù)。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { // 異步任務(wù)邏輯 return 42;});future.cancel(true); // 取消異步任務(wù)boolean isCancelled = future.isCancelled();System.out.println("Is cancelled: " + isCancelled);
isCancelled() 是 CompletableFuture 類的一個(gè)方法,用于判斷當(dāng)前異步任務(wù)是否已被取消。如果異步任務(wù)已被取消,則返回 true;否則返回 false。
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10);CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 20);CompletableFuture<Integer> combinedFuture = future1.thenCombine(future2, (result1, result2) -> result1 + result2);int numberOfDependents = combinedFuture.getNumberOfDependents();System.out.println("Number of dependents: " + numberOfDependents);
getNumberOfDependents() 用于獲取當(dāng)前 CompletableFuture 對(duì)象所依賴的其他異步任務(wù)的數(shù)量。如果沒(méi)有任何依賴任務(wù),或者所有依賴任務(wù)已經(jīng)完成,則返回的數(shù)量為0。
future.complete("米飯");
complete(T value):該方法返回布爾值,表示是否成功地將結(jié)果設(shè)置到 CompletableFuture 中。如果 CompletableFuture 未完成,則將結(jié)果設(shè)置,并返回 true;如果 CompletableFuture 已經(jīng)完成,則不進(jìn)行任何操作并返回 false。
CompletableFuture<Integer> future = new CompletableFuture<>();future.obtrudeValue(42);boolean completedNormally = future.isDone() && !future.isCompletedExceptionally();System.out.println("Is completed normally: " + completedNormally);
用于強(qiáng)制將指定的值作為異步任務(wù)的結(jié)果,調(diào)用 obtrudeValue(T value) 方法后,異步任務(wù)將立即完成,并將指定的值作為結(jié)果返回。
與 complete() 不同,obtrudeValue() 必須在任務(wù)已經(jīng)完成的情況下調(diào)用,否則會(huì)引發(fā) IllegalStateException 異常。并且complete() 方法對(duì)于已經(jīng)完成的任務(wù)會(huì)忽略額外的完成操作,并返回 false。而obtrudeValue() 方法即使任務(wù)已經(jīng)完成,仍然會(huì)強(qiáng)制使用新的結(jié)果值,并返回 true。
CompletableFuture<Integer> future = CompletableFuture.completedFuture(42);boolean done = future.isDone();System.out.println("Is done: " + done);
用于判斷當(dāng)前異步任務(wù)是否已經(jīng)完成(無(wú)論是正常完成還是異常完成)。
CompletableFuture也支持并發(fā)限制,以控制同時(shí)執(zhí)行的異步任務(wù)數(shù)量。
Executor executor = Executors.newFixedThreadPool(10);CompletableFuture<U> future = CompletableFuture.supplyAsync(() -> doSomething(), executor);
我們可以通過(guò)使用線程池來(lái)限制CompletableFuture的并發(fā)執(zhí)行數(shù)量。通過(guò)創(chuàng)建一個(gè)固定大小的線程池,并將其作為參數(shù)傳遞給CompletableFuture,就可以控制并發(fā)執(zhí)行任務(wù)的數(shù)量。
CompletableFuture類提供了許多方法,但實(shí)際上常用的方法只有幾個(gè)。為了方便記憶,以下是一些總結(jié)的規(guī)律:
掌握以上規(guī)律后,就可以基本記住大部分方法,剩下的其他方法可以單獨(dú)記憶。
本文詳細(xì)探討了 CompletableFuture 的原理和方法,學(xué)習(xí)了如何在任務(wù)完成后執(zhí)行操作、處理結(jié)果和轉(zhuǎn)換結(jié)果。
CompletableFuture是Java中強(qiáng)大的異步編程工具之一,合理利用它的方法和策略可以更好地處理異步任務(wù)和操作。
本文鏈接:http://www.www897cc.com/showinfo-26-60983-0.htmlCompletableFuture深度解析
聲明:本網(wǎng)頁(yè)內(nèi)容旨在傳播知識(shí),若有侵權(quán)等問(wèn)題請(qǐng)及時(shí)與本網(wǎng)聯(lián)系,我們將在第一時(shí)間刪除處理。郵件:2376512515@qq.com
上一篇: 匯聚軟件測(cè)試領(lǐng)域最新實(shí)操經(jīng)驗(yàn),2024 Gtest峰會(huì)不能錯(cuò)過(guò)!