线程补充
线程补充
CompletableFuture
Future的问题
- 阻塞
- 不能合并结果
- 不能异常处理
- 不能回调
在Java 8中, 新增加了一个包含50个方法左右的类: CompletableFuture,结合了Future的优点,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法
实例化
supply 开头:这种方法,可以返回异步线程执行之后的结果
run 开头:这种不会返回结果,就只是执行线程任务
没有指定Executor的方法会使用
ForkJoinPool.commonPool()作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行
// 带返回值,泛型就是返回值类型
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
// 不带返回值
public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);j
获取结果
get() 方法同样会阻塞直到任务完成
- getNow的参数valueIfAbsent的意思是当计算结果不存在或者Now时刻没有完成任务,给定一个确定的值
join() 与get() 区别在于join() 返回计算的结果或者抛出一个unchecked异常(CompletionException),而get() 返回一个具体的异常
public T get()
public T get(long timeout, TimeUnit unit)
public T getNow(T valueIfAbsent)
public T join()
处理结果1 -- complete、exceptionally
//可以处理异常,无返回结果
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
//可以处理异常,有返回结果(必须return),作为出现异常时的默认返回结果
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
public class ThreadTest {
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int i = 10 / 0;
return i;
}, executor).whenComplete((result,exception)->{
//虽然能得到异常信息,但是没法修改返回数据
System.out.println("异步任务完成了...结果是"+result+";异常是"+exception);
//可以感知异常,同时返回默认值
}).exceptionally(throwable -> {
return 10;
});
Integer integer = future.get();
System.out.println("main.............end......."+integer);
}
异步回调 -- thenApply、thenAccept、thenRun
Function参数是一个函数时接口, 入参是上次返回结果, 且有返回值
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
Consumer参数是一个函数时接口, 只有入参, 无返回值
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
没有入参, 感知任务的完成, 通知或者做相应处理
public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
提示
一般而言,commonPool为了提高性能,并不会立马收回线程,thenApply中回调任务和supplyAsync中的异步任务使用的是同一个线程
这里存在一个特殊情况,即如果supplyAsync中的任务是立即返回结果(不是耗时的任务),那么thenApply 回调任务也会在主线程执行
要更好地控制执行回调任务的线程,可以使用异步回调。如果使用thenApplyAsync()回调,那么它将在从ForkJoinPool.commonPool()获取另一个线程执行(概率获取),一般情况是接着使用上一次的线程
如果将Executor传递给thenApplyAsync()回调,则该回调的异步任务将在从Excutor的线程池中获取的线程中执行
异步编排 -- thenCompose、thenCombine
thenCompose --- 编排2个依赖关系的异步任务
public CompletableFuture<U> thenCompose(
Function<? super T, ? extends CompletionStage<U>> fn)
public CompletableFuture<U> thenComposeAsync(
Function<? super T, ? extends CompletionStage<U>> fn)
public CompletableFuture<U> thenComposeAsync(
Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)
提示
thenApply是为了应用异步任务的结果, 是对结果的链式转化
thenCompose是为了异步任务的链式调用, 将结果传递给另一个异步任务时不出现层层的嵌套
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 10)
.thenApply(result -> "Result: " + result);
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 10)
.thenCompose(result -> CompletableFuture.supplyAsync(() -> "Result: " + result));
thenCombine --- 编排2个非依赖关系的异步任务
public <U,V> CompletableFuture<V> thenCombine(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn, Executor executor)
处理结果3 -- handle
handle 和 thenApply 方法处理方式基本一样
唯一的不同是参数BiFunction(上次返回结果,上次异常),handle方法会给出异常,可以在内部处理,而apply方法只有一个返回结果,如果异常了,会被直接抛出,交给上一层处理
如果不想每个链式调用都处理异常,那就使用apply
handle 与 complete 区别:handle有返回结果,complete无返回结果
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Execut
/**
* 方法执行完成后的处理
*/
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int i = 10 / 5;
return i;
}, executor).handle((result,exception)->{
if (result != null){
return result*2;
}
if (exception != null){
return 0;
}
return 0;
});
Integer integer = future.get();
System.out.println("main.............end......."+integer);
两任务组合
thenRun:不能获取上一步的执行结果
thenAcceptAsync:能接受上一步结果,但是无返回结果(一般是终端操作)
thenApplyAsync:能接受上一步结果,有返回结果(流式调用)
thenCompose:对上任务结果组合
任务1.thenCompose(任务1结果 -> { });thenCombine:处理两个任务的结果
任务1.thenCombine( 任务2,( 任务1结果,任务2结果)->{ });xxxBothXxx:执行完所有,才能处理
//两任务都完成,才执行,有返回结果 public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action); public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action); public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor);xxxEitherXxx:任意执行完,就处理
//两任务任意执行完,就处理 public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action); public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action); public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? supe ;
CompletableFuture<List<String>> total = CompletableFuture.supplyAsync(() -> {
// 第一个任务获取美术课需要带的东西,返回一个list
List<String> stuff = new ArrayList<>();
stuff.add("画笔");
stuff.add("颜料");
return stuff;
}).thenCompose(list -> {
// 向第二个任务传递参数list(上一个任务美术课所需的东西list)
CompletableFuture<List<String>> insideFuture = CompletableFuture.supplyAsync(() -> {
List<String> stuff = new ArrayList<>();
// 第二个任务获取劳技课所需的工具
stuff.add("剪刀");
stuff.add("折纸");
// 合并两个list,获取课程所需所有工具
List<String> allStuff = Stream.of(list, stuff).flatMap(Collection::stream).collect(Collectors.toList());
return allStuff;
});
return insideFuture;
});
System.out.println(total.join().size());
// 美术课准备
CompletableFuture<List<String>> painting = CompletableFuture.supplyAsync(() -> {
// 第一个任务获取美术课需要带的东西,返回一个list
List<String> stuff = new ArrayList<>();
stuff.add("画笔");
stuff.add("颜料");
return stuff;
});
// 手工课准备
CompletableFuture<List<String>> handWork = CompletableFuture.supplyAsync(() -> {
// 第二个任务获取劳技课需要带的东西,返回一个list
List<String> stuff = new ArrayList<>();
stuff.add("剪刀");
stuff.add("折纸");
return stuff;
});
// 都准备好后,开始处理
CompletableFuture<List<String>> total = painting
// 传入handWork列表,然后得到两个CompletableFuture的参数Stuff1和2
.thenCombine(handWork, (stuff1, stuff2) -> {
// 合并成新的list
List<String> totalStuff = Stream.of(stuff1, stuff1)
.flatMap(Collection::stream)
.collect(Collectors.toList());
return totalStuff;
});
System.out.println(JSONObject.toJSONString(total.join()));
多任务组合
allOf
anyOf
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
public static CompletableFuture<Void> anyOf(CompletableFuture<?>... cfs);
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
try {
//使用sleep()模拟耗时操作
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
return 2;
});
CompletableFuture.allOf(future1, future1);
// 输出3
System.out.println(future1.join()+future2.join());
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
throw new NullPointerException();
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
try {
// 睡眠3s模拟延时
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
});
CompletableFuture<Object> anyOf = CompletableFuture
.anyOf(future, future2)
.exceptionally(error -> {
error.printStackTrace();
return 2;
});
System.out.println(anyOf.join());
