跳至主要內容

线程补充

程序员李某某大约 7 分钟

线程补充

CompletableFuture

Future的问题

  • 阻塞
  • 不能合并结果
  • 不能异常处理
  • 不能回调

在Java 8中, 新增加了一个包含50个方法左右的类: CompletableFuture,结合了Future的优点,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法

实例化

  • supply 开头:这种方法,可以返回异步线程执行之后的结果

  • run 开头:这种不会返回结果,就只是执行线程任务

  • 没有指定Executor的方法会使用ForkJoinPool.commonPool()作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行

// 带返回值,泛型就是返回值类型
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
// 不带返回值
public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);j

获取结果

get() 方法同样会阻塞直到任务完成

  • getNow的参数valueIfAbsent的意思是当计算结果不存在或者Now时刻没有完成任务,给定一个确定的值

join() 与get() 区别在于join() 返回计算的结果或者抛出一个unchecked异常(CompletionException),而get() 返回一个具体的异常

public T    get()
public T    get(long timeout, TimeUnit unit)
public T    getNow(T valueIfAbsent)
public T    join()

处理结果1 -- complete、exceptionally

//可以处理异常,无返回结果
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
//可以处理异常,有返回结果(必须return),作为出现异常时的默认返回结果
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
public class ThreadTest {
    public static ExecutorService executor = Executors.newFixedThreadPool(10);
 
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            int i = 10 / 0;
            return i;
        }, executor).whenComplete((result,exception)->{
            //虽然能得到异常信息,但是没法修改返回数据
            System.out.println("异步任务完成了...结果是"+result+";异常是"+exception);
        //可以感知异常,同时返回默认值
        }).exceptionally(throwable -> {
            return 10;
        });
        Integer integer = future.get();
        System.out.println("main.............end......."+integer);
    }

异步回调 -- thenApply、thenAccept、thenRun

Function参数是一个函数时接口, 入参是上次返回结果, 且有返回值

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

Consumer参数是一个函数时接口, 只有入参, 无返回值

public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)

没有入参, 感知任务的完成, 通知或者做相应处理

public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)

提示

一般而言,commonPool为了提高性能,并不会立马收回线程,thenApply中回调任务和supplyAsync中的异步任务使用的是同一个线程

这里存在一个特殊情况,即如果supplyAsync中的任务是立即返回结果(不是耗时的任务),那么thenApply 回调任务也会在主线程执行

要更好地控制执行回调任务的线程,可以使用异步回调。如果使用thenApplyAsync()回调,那么它将在从ForkJoinPool.commonPool()获取另一个线程执行(概率获取),一般情况是接着使用上一次的线程

如果将Executor传递给thenApplyAsync()回调,则该回调的异步任务将在从Excutor的线程池中获取的线程中执行

异步编排 -- thenCompose、thenCombine

thenCompose --- 编排2个依赖关系的异步任务

public CompletableFuture<U> thenCompose(
     Function<? super T, ? extends CompletionStage<U>> fn)
public CompletableFuture<U> thenComposeAsync(
     Function<? super T, ? extends CompletionStage<U>> fn)
public CompletableFuture<U> thenComposeAsync(
     Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)

提示

thenApply是为了应用异步任务的结果, 是对结果的链式转化

thenCompose是为了异步任务的链式调用, 将结果传递给另一个异步任务时不出现层层的嵌套

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 10)
  .thenApply(result -> "Result: " + result);
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 10)
 .thenCompose(result -> CompletableFuture.supplyAsync(() -> "Result: " + result));

thenCombine --- 编排2个非依赖关系的异步任务

public <U,V> CompletableFuture<V> thenCombine(
     CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn, Executor executor)

处理结果3 -- handle

handle 和 thenApply 方法处理方式基本一样

唯一的不同是参数BiFunction(上次返回结果,上次异常),handle方法会给出异常,可以在内部处理,而apply方法只有一个返回结果,如果异常了,会被直接抛出,交给上一层处理

如果不想每个链式调用都处理异常,那就使用apply

handle 与 complete 区别:handle有返回结果,complete无返回结果

public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Execut
/**
 * 方法执行完成后的处理
 */

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    int i = 10 / 5;
    return i;
}, executor).handle((result,exception)->{
    if (result != null){
        return result*2;
    }
    if (exception != null){
        return 0;
    }
    return 0;
});
Integer integer = future.get();
System.out.println("main.............end......."+integer);

两任务组合

  • thenRun:不能获取上一步的执行结果

  • thenAcceptAsync:能接受上一步结果,但是无返回结果(一般是终端操作)

  • thenApplyAsync:能接受上一步结果,有返回结果(流式调用)

  • thenCompose:对上任务结果组合

    任务1.thenCompose(任务1结果 -> {
      
    });
    
  • thenCombine:处理两个任务的结果

    任务1.thenCombine( 任务2,( 任务1结果,任务2结果)->{
      
    });
    
  • xxxBothXxx:执行完所有,才能处理

    //两任务都完成,才执行,有返回结果
    public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
    public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
    public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action,     Executor executor);
    
  • xxxEitherXxx:任意执行完,就处理

    //两任务任意执行完,就处理
    public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);
    public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);
    public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? supe  ;
    
CompletableFuture<List<String>> total = CompletableFuture.supplyAsync(() -> {
    // 第一个任务获取美术课需要带的东西,返回一个list
    List<String> stuff = new ArrayList<>();
    stuff.add("画笔");
    stuff.add("颜料");
    return stuff;
}).thenCompose(list -> {
    // 向第二个任务传递参数list(上一个任务美术课所需的东西list)
    CompletableFuture<List<String>> insideFuture = CompletableFuture.supplyAsync(() -> {
        List<String> stuff = new ArrayList<>();
        // 第二个任务获取劳技课所需的工具
        stuff.add("剪刀");
        stuff.add("折纸");
        // 合并两个list,获取课程所需所有工具
        List<String> allStuff = Stream.of(list, stuff).flatMap(Collection::stream).collect(Collectors.toList());
        return allStuff;
    });
    return insideFuture;
});
System.out.println(total.join().size());
// 美术课准备
CompletableFuture<List<String>> painting = CompletableFuture.supplyAsync(() -> {
     // 第一个任务获取美术课需要带的东西,返回一个list
     List<String> stuff = new ArrayList<>();
     stuff.add("画笔");
     stuff.add("颜料");
     return stuff;
});
// 手工课准备
CompletableFuture<List<String>> handWork = CompletableFuture.supplyAsync(() -> {
     // 第二个任务获取劳技课需要带的东西,返回一个list
     List<String> stuff = new ArrayList<>();
     stuff.add("剪刀");
     stuff.add("折纸");
     return stuff;
});
// 都准备好后,开始处理
CompletableFuture<List<String>> total = painting
     // 传入handWork列表,然后得到两个CompletableFuture的参数Stuff1和2
     .thenCombine(handWork, (stuff1, stuff2) -> {
         // 合并成新的list
         List<String> totalStuff = Stream.of(stuff1, stuff1)
                 .flatMap(Collection::stream)
                 .collect(Collectors.toList());
         return totalStuff;
});
System.out.println(JSONObject.toJSONString(total.join()));

多任务组合

allOf

anyOf

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
public static CompletableFuture<Void> anyOf(CompletableFuture<?>... cfs);
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
    try {
        //使用sleep()模拟耗时操作
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 1;
});

CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
    return 2;
});
CompletableFuture.allOf(future1, future1);
// 输出3
System.out.println(future1.join()+future2.join());
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    throw new NullPointerException();
});

CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
    try {
        // 睡眠3s模拟延时
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 1;
});
CompletableFuture<Object> anyOf = CompletableFuture
        .anyOf(future, future2)
        .exceptionally(error -> {
            error.printStackTrace();
            return 2;
        });
System.out.println(anyOf.join());

ThreadLocal

上次编辑于:
贡献者: liyuanhao,李元昊