JDK1.5以后java加入了一个并发包java.util.concurrent用于替换传统的多线程使用场景。其中包含几个重要的接口Executor,ExecutorService,Future,Callable等。

Executor接口只定义了一个方法void execute(Runnable command),它用来执行提交的任务,用户只需提交任务,不必关心任务何时执行以及任务的调度情况。用户不需要明确的像传统方式那样定义一个线程,如:new Thread(new(RunnableTask())).start()。可以通过如下方式提交一个任务

1
2
3
Executor executor = <em>anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());

ExecutorService接口继承自Executor接口,我们经常使用的也是这个接口。它提供了一系列管理异步任务的方法。java.util.concurrent包中提供了一个实现ExecutorService接口的虚拟类AbstractExecutorService

通常情况下我们不必自己去实现ExecutorService接口,java提供了一个工具类Executors,它提供了一些创建不同线程池的静态方法,如newFixedThreadPool,newSingleThreadExecutor,newCachedThreadPool,newScheduledThreadPool等。

FixedThreadPool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class FixedThreadPool {
ExecutorService threadPool = Executors.newFixedThreadPool(1);
public void testFixedThreadPool(){
try {
Task task1 = new Task("task1");
Task task2 = new Task("task2");
Future task1Future = threadPool.submit(task1);
Future task2Future = threadPool.submit(task2);
//做其它任务
Thread.sleep(1000);
Console.log("task1计算结果:{}",task1Future.get());
Console.log("task2计算结果:{}",task2Future.get());
}catch (InterruptedException e){
e.printStackTrace();
}catch (ExecutionException e) {
e.printStackTrace();
}finally {
threadPool.shutdown();
}
}
public static void main(String[] args){
FixedThreadPool fixedThreadPool = new FixedThreadPool();
fixedThreadPool.testFixedThreadPool();
}
}
public class Task implements Callable<Integer>{
private Integer[] numbers = new Integer[100];
String taskName;
public Task(String taskName){
this.taskName = taskName;
fillNumbers();
}
public void fillNumbers(){
for(int i=0;i<numbers.length;i++){
numbers[i] = new Random().nextInt(100);
}
}
@Override
public Integer call() throws Exception {
Console.log("线程{},开始计算",Thread.currentThread().getName());
Integer sum = 0;
for(int i=0;i<numbers.length;i++){
sum+=numbers[i];
}
Console.log("线程{},计算完成,结果是:{}",Thread.currentThread().getName(),sum);
return sum;
}
}

输出结果

1
2
3
4
5
6
线程pool-1-thread-1,开始计算
线程pool-1-thread-1,计算完成,结果是:4356
线程pool-1-thread-1,开始计算
线程pool-1-thread-1,计算完成,结果是:5142
task1计算结果:4356
task2计算结果:5142

初始化一个只有一个线程的固定线程池,提交两个任务,因为池中线程数量只有一个,第一个任务未执行完之前,第二个任务需要等待第一个任务执行完成才能执行。如果因为任务执行失败造成线程被关闭,线程池会创建一个新的线程代替它的位置。

CachedThreadPool
使用方式同上,创建一个线程缓存区将初始化的线程缓存起来,如果有空闲的线程,则使用空闲线程,如果没有就创建新的线程。如果线程空闲超过60s,线程将被移除。

1
ExecutorService threadPool = Executors.newCachedThreadPool();

CachedThreadPool中的线程数量上限是Integer.MAX_VALUE,使用时要注意溢出的问题。

SingleThreadExecutor

创建一个单work线程的线程池,以无界队列的方式执行提交的任务,如果因为某个任务失败造成线程结束,线程池会创建一个新的线程继续完成接下来的任务。提交到线程中的任务是严格按照顺序执行的。任何时刻都只有一个任务在执行,等价于newFixedThreadPool(1)

ScheduledThreadPool

创建一个可以在指定延迟后执行或定期执行的线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class ScheduledThreadPool {
ScheduledExecutorService scheduleService = Executors.newScheduledThreadPool(1);
public static void main(String[] args){
new ScheduledThreadPool().testScheduledThreadPool();
}
public void testScheduledThreadPool(){
Console.log("延迟2s执行");
scheduleService.schedule(new Callable<Object>() {
@Override
public Object call() throws Exception {
Console.log("执行具体任务--开始");
Thread.sleep(1000);
Console.log("执行具体任务--结束");
return 5;
}
}, 2, TimeUnit.SECONDS);
Console.log("执行其他任务");
//立即执行,每2s执行一次
scheduleService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
Console.log("执行周期性任务");
}
},0,2,TimeUnit.SECONDS);
}
}

执行结果

1
2
3
4
5
6
7
8
9
10
延迟2s执行
执行其他任务
执行周期性任务
执行具体任务--开始
执行具体任务--结束
执行周期性任务
执行周期性任务
执行周期性任务
执行周期性任务
执行周期性任务

ForkJoinPool

java7中新引进的一种线程池,它使用一个无限队列保存需要执行的任务,可以通过构造函数指定线程数量,如果不指定,则默认使用CPU核心数作为线程数量。

不同于其它线程池,ForkJoinPool基于work-stealing模式,在该线程池的每个线程中会维护一个队列来存放需要被执行的任务。当线程自身队列中的任务都执行完毕后,它会从别的线程中拿到未被执行的任务并帮助它执行,尽量避免线程处于空闲状态。在任务负载不均衡的场景中非常有用;

ForkJoinPool实现了ExecutorService,可以提交实现Callable和Runnable接口的任务,也可以直接继承ForkJoinTask或RecursiveTask对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class ForkJoinPoolTest {
ForkJoinPool forkJoinPool = new ForkJoinPool();
public static Double[] num = new Double[1000];
public static void main(String[] args){
ForkJoinPoolTest pool = new ForkJoinPoolTest();
for(int i=0;i<num.length;i++){
num[i] = Math.random();
}
int count = pool.forkJoinPool.invoke(new ForkJoinTaskTest(1,1000));
Console.log("count:{}",count);
//Test norm ForkJoinTask
for(int i=0;i<5;i++){
// int result = pool.forkJoinPool.submit(new ForkJoinTask2(i+1)).fork().join();
int result = pool.forkJoinPool.invoke(new ForkJoinTask2(i+1));
Console.log("result:{}",result);
}
}
private static class ForkJoinTaskTest extends RecursiveTask<Integer> {
private int start;
private int end;
public ForkJoinTaskTest(int start,int end){
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
Integer subCount = 0;
if(this.end - start<=10){
for(int i=this.start;i<this.end;i++){
if(num[i]<0.5){subCount++;}
}
}else{
int mid = (start+end)/2;
ForkJoinTaskTest left = new ForkJoinTaskTest(start,mid);
left.fork();
ForkJoinTaskTest right = new ForkJoinTaskTest(mid+1,end);
right.fork();
subCount+=left.join();
subCount+=right.join();
}
return subCount;
}
}
private static class ForkJoinTask2 extends ForkJoinTask<Integer>{
private int result = 0;
private int num;
public ForkJoinTask2(int num){
this.num = num;
}
public Integer getRawResult() {
return result;
}
protected void setRawResult(Integer value) {
this.result = value;
}
protected boolean exec() {
result = this.num*this.num;
return true;
}
}
}

继承ForkJoinTask时,重写的getRawResult和setRawResult两个方法是为了扩展和调试了,不要直接调用这个两个方法设置和获取值,应该调用对象的join或get方法。

Executors工具提供了newWorkStealingPool方法直接创建一个ForkJoinPool对象。