fork-join框架

fork-join提供了一种方法来将大任务拆分成小任务(fork),再将小任务合并(join)来完成计算。如果想完成这种递归计算需要扩展RecursiveTask<T>类或者RecursiveAction类(如果不需要返回值的话)。再覆盖compute()方法,在compute()方法中实现子任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class MyTask extends RecursiveTask<Integer>{
private static final Integer ADJUST_VALUE = 10;

private int begin;
private int end;
private int result;

MyTask(int begin,int end){
this.begin = begin;
this.end = end;
}

@Override
protected Integer compute() {
if((end - begin) <= ADJUST_VALUE){
for (int i = begin; i <= end; i++){
result = result + i;
}
}else {
int middle = (end + begin) / 2;
MyTask t1 = new MyTask(begin,middle);
MyTask t2 = new MyTask(middle+1,end);
t1.fork();
t2.fork();
result = t1.join() + t2.join();
}
return result;
}
}
public class ForkJoinDemo {
public static void main(String[] args) {
MyTask myTask = new MyTask(0,100);
ForkJoinPool threadPool = new ForkJoinPool();
ForkJoinTask<Integer> forkJoinTask = threadPool.submit(myTask);
System.out.println(forkJoinTask.invoke());
threadPool.shutdown();
}
}

异步计算CompletableFuture

CompletableFuture类实现了Future接口,一旦某个任务完成便会立刻获取任务的结果。CompletableFuture可以采用两种方式完成:得到一个结果,或者捕获异常。要处理这两种情况需要使用whenComplete方法。

1
2
CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action) 
//Returns a new CompletionStage with the same result or exception as this stage, that executes the given action when this stage completes.

传两个参数,一个是正常时的参数,一个是异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
* @Author:Gao
* @Date:2020-07-27 00:16
*/
public class CompletableFutureDemo {

public static void main(String[] args) throws ExecutionException, InterruptedException {
/**
* CompletableFuture.runAsync():
* public static CompletableFuture<Void> runAsync(Runnable runnable) {
* return asyncRunStage(asyncPool, runnable);
* }
* runAsync()传的是Runnable没有返回值
*/
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName());
});
completableFuture.get();

/**
* 异步回调
* CompletableFuture.supplyAsync():
* public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
* return asyncSupplyStage(asyncPool, supplier);
* }
* supplyAsync()传的是个供给型接口有返回值
*/
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
// int age = 10/0;
return 1024;
});
completableFuture1.whenComplete((t,u) -> {
System.out.println("t:"+t);
System.out.println("u:"+u);
}).exceptionally(f -> {
System.out.println("Exception"+f.getMessage());
return 0;
}).get();
}
}

也可以直接使用exceptionally()它接受一个函数式接口,它在出现异常时执行。

1
2
3
completableFuture1.exceptionally( fn -> {
return 0;
});

组合CompletableFuture

由于CompletableFuture是异步的,所以如果有多个CompletableFuture任务等待执行,那它们的执行顺序将是不可预知的,为了使这些异步任务按照一定顺序执行引入了如下方法:

thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)执行两个动作并用给定函数组合结果

thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)void版thenCombine()

runAfterBoth(CompletionStage<?> other, Runnable action)两个都完成后执行runnable

applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)得到其中一个结果时,传入给定的函数

示例:

1
2
3
4
5
CompletableFuture.completedFuture(url)
.thenComposeAsync(fun,executor)
.thenApply(fun)
.thenCompose(fun)
.thenAccept(fun);