fork-join框架 fork-join提供了一种方法来将大任务拆分成小任务(fork),再将小任务合并(join)来完成计算。如果想完成这种递归计算需要扩展RecursiveTask<T>
类或者RecursiveAction
类(如果不需要返回值的话)。再覆盖compute()
方法,在compute()
方法中实现子任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 class MyTask extends RecursiveTask <Integer > { private static final Integer ADJUST_VALUE = 10 ; private int begin; private int end; private int result; MyTask(int begin,int end){ this .begin = begin; this .end = end; } @Override protected Integer compute () { if ((end - begin) <= ADJUST_VALUE){ for (int i = begin; i <= end; i++){ result = result + i; } }else { int middle = (end + begin) / 2 ; MyTask t1 = new MyTask(begin,middle); MyTask t2 = new MyTask(middle+1 ,end); t1.fork(); t2.fork(); result = t1.join() + t2.join(); } return result; } } public class ForkJoinDemo { public static void main (String[] args) { MyTask myTask = new MyTask(0 ,100 ); ForkJoinPool threadPool = new ForkJoinPool(); ForkJoinTask<Integer> forkJoinTask = threadPool.submit(myTask); System.out.println(forkJoinTask.invoke()); threadPool.shutdown(); } }
异步计算CompletableFuture CompletableFuture
类实现了Future
接口,一旦某个任务完成便会立刻获取任务的结果。CompletableFuture
可以采用两种方式完成:得到一个结果,或者捕获异常。要处理这两种情况需要使用whenComplete
方法。
1 2 CompletableFuture<T> whenComplete (BiConsumer<? super T,? super Throwable> action)
传两个参数,一个是正常时的参数,一个是异常。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class CompletableFutureDemo { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> { System.out.println(Thread.currentThread().getName()); }); completableFuture.get(); CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName()); return 1024 ; }); completableFuture1.whenComplete((t,u) -> { System.out.println("t:" +t); System.out.println("u:" +u); }).exceptionally(f -> { System.out.println("Exception" +f.getMessage()); return 0 ; }).get(); } }
也可以直接使用exceptionally()
它接受一个函数式接口,它在出现异常时执行。
1 2 3 completableFuture1.exceptionally( fn -> { return 0 ; });
组合CompletableFuture 由于CompletableFuture
是异步的,所以如果有多个CompletableFuture
任务等待执行,那它们的执行顺序将是不可预知的,为了使这些异步任务按照一定顺序执行引入了如下方法:
thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
执行两个动作并用给定函数组合结果
thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
void版thenCombine()
runAfterBoth(CompletionStage<?> other, Runnable action)
两个都完成后执行runnable
applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)
得到其中一个结果时,传入给定的函数
示例:
1 2 3 4 5 CompletableFuture.completedFuture(url) .thenComposeAsync(fun,executor) .thenApply(fun) .thenCompose(fun) .thenAccept(fun);