ListenableFuture

ListenableFuture是可以监听的Future,它是对java原生Future的扩展增强。Future表示一个异步计算任务,当任务完成时可以得到计算结果。如果希望计算完成时马上就拿到结果展示给用户或者做另外的计算,就必须使用另一个线程不断的查询计算状态。这样做会使得代码复杂,且效率低下。如果使用ListenableFuture,Guava会帮助检测Future是否完成了,如果完成就自动调用回调函数,这样可以减少并发程序的复杂度。

MoreExecutors

该类是final类型的工具类,提供了很多静态方法。例如listeningDecorator方法初始化ListeningExecutorService方法,使用此实例submit方法即可初始化ListenableFuture对象。

  • getExitingExecutorService(ThreadPoolExecutor executor)

Executors的静态方法创建的线程池必须显式调用shutdown()方法关闭线程池,通常情况下我们并不知道线程池该何时关掉,MoreExecutors工具类提供了一个getExitingExecutorService(ThreadPoolExecutor executor)及其一系列重载方法,该方法会给指定的ThreadPoolExecutor添加一个钩子,在程序完成后它会通过使用守护线程和添加一个关闭钩子来等待他们完成,然后自动调用ThreadPoolExecutor的shutdown()方法。默认会等待120s等待线程池完成工作,120s后会关闭线程池,即便线程池中仍有工作未完成。MoreExecutors提供了相应的重载方法可以指定超时时间。

1
2
3
ExecutorService threadPool = MoreExecutors.getExitingExecutorService(
(ThreadPoolExecutor) Executors.newFixedThreadPool(1)
)

  • getExitingScheduledExecutorService(ScheduledThreadPoolExecutor executor)

基本同getExitingExecutorService方法。

  • addDelayedShutdownHook(ExecutorService service,long timeout,TimeUnit unit)

给指定的ExecutorService添加一个钩子,在程序完成后延迟指定时间再关闭线程池。

ListeningExecutorService

继承自ExecutorService,重写ExecutorService类中的submit方法,返回ListenableFuture对象

ListenableFuture

继承自Future类增加了addListener方法,该方法在给定的excutor上注册一个监听器,当计算完成时会马上调用该监听器。不能够确保监听器执行的顺序,但可以在计算完成时确保马上被调用

FutureCallback

该接口提供了OnSuccess和OnFailuren方法。获取异步计算的结果并回调

一个完整的例子如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/*static ListeningExecutorService service = MoreExecutors.listeningDecorator(
MoreExecutors.getExitingExecutorService(
(ThreadPoolExecutor) Executors.newFixedThreadPool(1)
)
);*/
public static void main(String[] args) throws Exception{
ExecutorService pool = Executors.newFixedThreadPool(1);
ListeningExecutorService service = MoreExecutors.listeningDecorator(pool);
MoreExecutors.addDelayedShutdownHook(service,3,TimeUnit.SECONDS);
ListenableFuture<Integer> listenableFuture = service.submit(()->{
Random random = new Random();
int i=0;
while (i<10){
i++;
Console.log(random.nextInt(100));
Thread.sleep(100);
}
return random.nextInt(100);
});
Futures.addCallback(listenableFuture, new FutureCallback<Integer>() {
@Override
public void onSuccess(@Nullable Integer result) {
Console.log("执行结果:"+result);
}
@Override
public void onFailure(Throwable t) {
Console.log("线程执行发生错误");
t.printStackTrace();
}
});
Console.log("主程序结束了");
System.exit(0);
}

输出结果

1
2
3
4
5
6
7
8
9
10
11
12
主程序结束了
47
62
48
57
50
80
52
52
99
62
执行结果:33

在上面的例子中,给ListeningExecutorService实例service添加一个钩子,设定在主程序结束了延迟3s关闭线程池。给service添加一个任务,执行10个打印,每个间隔约100ms,可以看出,在主程序结束后3s线程池才关闭,期间线程池完成了任务执行回调打印结果。如果指定的延迟时间到了任务还没完成,线程池也会强制关闭。如上将打印间隔时间改成1000ms,任务执行到约三分之一就结束了,结果也不会打印出来。

Futures

该工具类提供了许多静态工具常用的如下:

  • addCallback(ListenableFuture future,FutureCallback callback):给ListenableFuture添加一个回调,在任务完成后执行callback,无法保证回调的执行顺序,但能保证回调一定会被执行。
  • addCallback(ListenableFuture future,FutureCallback callback,Executor executor):重载方法,指定executor执行callback
  • allAsList(Iterable<? extends ListenableFuture<? extends V>> futures:创建一个新的ListenableFuture,其值是一个list,如果传入的所有future都成功了,该list包含了传入的所有future的值,值的顺序与传入的future顺序一致,如果传入的future有一个失败,则返回的future立即失败。如果取消返回的 future,则会尝试取消传入的所有future,同样,传入的future有一个失败或是取消,返回的future也会同样失败或取消。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    static ListeningExecutorService service = MoreExecutors.listeningDecorator(
    MoreExecutors.getExitingExecutorService(
    (ThreadPoolExecutor) Executors.newFixedThreadPool(1)
    )
    );
    private static abstract class Task<T> implements Callable<T>{
    public T t;
    public Task(T t){
    this.t = t;
    }
    }
    public static void testAllAsList(){
    List<ListenableFuture<Object>> list = new ArrayList<>();
    Random random = new Random();
    for(int i=0;i<3;i++){
    ListenableFuture<Object> future = service.submit(new Task<Object>(i) {
    @Override
    public Object call() throws Exception {
    Thread.sleep(1000);
    /*if(if(ObjectUtils.equals(this.t,1)){//注释段1
    throw new RuntimeException("手动抛出任务失败");
    }){
    throw new RuntimeException("手动抛出异常");
    }*/
    Console.log("任务 {} 执行完成",this.t);
    return this.t;
    }
    });
    list.add(future);
    }
    // list.get(1).cancel(true);//注释段2
    ListenableFuture<List<Object>> combinedFuture = Futures.allAsList(list);
    Futures.addCallback(combinedFuture, new FutureCallback<List<Object>>() {
    @Override
    public void onSuccess(@Nullable List<Object> result) {
    for(Object obj:result){
    Console.log("返回结果:{}",obj);
    }
    }
    @Override
    public void onFailure(Throwable t) {
    Console.log("任务失败了");
    t.printStackTrace();
    }
    });
    // combinedFuture.cancel(true); //注释段3
    }

返回结果

1
2
3
4
5
6
任务 0 执行完成
任务 1 执行完成
任务 2 执行完成
返回结果:0
返回结果:1
返回结果:2

如果解开注释段1在第二个任务手动抛出异常,打印如下:

1
2
3
4
任务 0 执行完成
任务失败了
java.lang.RuntimeException: 手动抛出异常
任务 2 执行完成

可以看出第二个任务出现异常执行失败,第一个和第三个任务正常执行,但是在回调中并没有执行onSuccess,而是执行onFailure。第二个任务发生异常时,combinedFuture立即失败,但是并不影响线程池中第三个任务的执行。

如果解开注释段2取消第二个任务的执行,打印如下

1
2
3
4
任务失败了
java.util.concurrent.CancellationException: Task was cancelled.
任务 0 执行完成
任务 2 执行完成

取消第二个任务,combinedFuture立即取消并执行回调的异常,但是线程池的第三个任务仍正常执行。

如果解开注释段3,通过combinedFuture取消任务,打印如下:

1
2
任务失败了
java.util.concurrent.CancellationException: Task was cancelled.

可见通过combinedFuture取消任务,会同时取消传入的所有正在执行中(cancel方法传入true)或未执行的任务。

注意:在任务中一定要关注任务的中断标记,cancel不一定能使当前正在执行的任务停止,IO阻塞、synchronized锁阻塞时无法被停止,Lock可以被停止。

  • successfulAsList:返回一个ListenableFuture,基本同allAsList,该Future的结果包含所有成功的Future,按照原来的顺序,当其中之一Failed或者cancel,则用null替代。
  • ListenableFuture transform(ListenableFuture input,final Function<? super I, ? extends O> function)

将input返回值通过指定的function中,转换成一个新的ListenableFuture,新future的返回值是function的返回值。表达有点绕口,说个案例比较容易明白,提交一个任务向远端调用一个接口,将返回值做一些复杂耗时处理再入库,这涉及到两个耗时过程,如果按照传统的方式,submit一个任务返回ListenableFuture,然后监听任务结束,在回调中将结果处理入库过程包装成另一个任务提交,再监听结束。大致代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
ListenableFuture future1 = service.submit(()->{
//执行远端接口调用
returrn 调用结果
});
//监听future1
Futures.addCallback(future1,new FutureCallback(){
public void onSuccess(@Nullable Object result) {
ListenableFuture future2 = ListenableFuture future1 = service.submit((result)->{
//处理调用结果并入库
returrn 入库结果
});
//监听future2
Futures.addCallback(future2,new FutureCallback(){
//此处省略代码
})
}
});

从上面可以看出,代码冗长,多次监听回调,如果采用transform,代码会精简很多

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public static void testTransform() throws Exception{
ListenableFuture<Integer> future1 = service.submit(()->{
//执行远端接口调用,这里模拟返回1
return 1;
});
Function<Integer,String> func = new Function<Integer, String>() {
@Nullable
@Override
public String apply(@Nullable Integer input) {
//处理调用结果并入库,这里模拟如果为:input+"_123"
return input+"_123";
}
};
ListenableFuture<String> finalFuture = Futures.transform(future1,func);
Console.log("结果:{}",finalFuture.get());
/*AsyncFunction<Integer,String> func = new AsyncFunction<Integer, String>() {
@Override
public ListenableFuture<String> apply(@Nullable Integer input) throws Exception {
return service.submit(()->input+"_123");
}
};
ListenableFuture<String> finalFuture = Futures.transformAsync(future1,func);
Console.log("结果:{}",finalFuture.get());*/
}

输出结果如下:

1
结果:1_123

在上面的例子中,只看到显式地提交了第一个任务,没给第一个任务添加监听也并没有显式提交第二个任务。实际上guava已经做了这些工作了。如果取消finalFuture,则future1也会取消(如果能取消,同样如果取消future1,finalFuture会执行回调(如果添加了回调)并尝试取消自身。

注释部分是另一种实现,Function内部也是通过AsyncFunction实现的。

transform方法还包含一个重载方法,允许在第三个参数传入一个Executor,如果指定,则function在指定的Executor执行。