JDK1.5以后java加入了一个并发包java.util.concurrent
用于替换传统的多线程使用场景。其中包含几个重要的接口Executor
,ExecutorService
,Future
,Callable
等。
Executor
接口只定义了一个方法void execute(Runnable command)
,它用来执行提交的任务,用户只需提交任务,不必关心任务何时执行以及任务的调度情况。用户不需要明确的像传统方式那样定义一个线程,如:new Thread(new(RunnableTask())).start()
。可以通过如下方式提交一个任务
1 2 3
| Executor executor = <em>anExecutor; executor.execute(new RunnableTask1()); executor.execute(new RunnableTask2());
|
ExecutorService
接口继承自Executor
接口,我们经常使用的也是这个接口。它提供了一系列管理异步任务的方法。java.util.concurrent
包中提供了一个实现ExecutorService
接口的虚拟类AbstractExecutorService
。
通常情况下我们不必自己去实现ExecutorService
接口,java提供了一个工具类Executors
,它提供了一些创建不同线程池的静态方法,如newFixedThreadPool
,newSingleThreadExecutor
,newCachedThreadPool
,newScheduledThreadPool
等。
FixedThreadPool
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| public class FixedThreadPool { ExecutorService threadPool = Executors.newFixedThreadPool(1); public void testFixedThreadPool(){ try { Task task1 = new Task("task1"); Task task2 = new Task("task2"); Future task1Future = threadPool.submit(task1); Future task2Future = threadPool.submit(task2); Thread.sleep(1000); Console.log("task1计算结果:{}",task1Future.get()); Console.log("task2计算结果:{}",task2Future.get()); }catch (InterruptedException e){ e.printStackTrace(); }catch (ExecutionException e) { e.printStackTrace(); }finally { threadPool.shutdown(); } } public static void main(String[] args){ FixedThreadPool fixedThreadPool = new FixedThreadPool(); fixedThreadPool.testFixedThreadPool(); } } public class Task implements Callable<Integer>{ private Integer[] numbers = new Integer[100]; String taskName; public Task(String taskName){ this.taskName = taskName; fillNumbers(); } public void fillNumbers(){ for(int i=0;i<numbers.length;i++){ numbers[i] = new Random().nextInt(100); } } @Override public Integer call() throws Exception { Console.log("线程{},开始计算",Thread.currentThread().getName()); Integer sum = 0; for(int i=0;i<numbers.length;i++){ sum+=numbers[i]; } Console.log("线程{},计算完成,结果是:{}",Thread.currentThread().getName(),sum); return sum; } }
|
输出结果
1 2 3 4 5 6
| 线程pool-1-thread-1,开始计算 线程pool-1-thread-1,计算完成,结果是:4356 线程pool-1-thread-1,开始计算 线程pool-1-thread-1,计算完成,结果是:5142 task1计算结果:4356 task2计算结果:5142
|
初始化一个只有一个线程的固定线程池,提交两个任务,因为池中线程数量只有一个,第一个任务未执行完之前,第二个任务需要等待第一个任务执行完成才能执行。如果因为任务执行失败造成线程被关闭,线程池会创建一个新的线程代替它的位置。
CachedThreadPool
使用方式同上,创建一个线程缓存区将初始化的线程缓存起来,如果有空闲的线程,则使用空闲线程,如果没有就创建新的线程。如果线程空闲超过60s,线程将被移除。
1
| ExecutorService threadPool = Executors.newCachedThreadPool();
|
CachedThreadPool中的线程数量上限是Integer.MAX_VALUE
,使用时要注意溢出的问题。
SingleThreadExecutor
创建一个单work线程的线程池,以无界队列的方式执行提交的任务,如果因为某个任务失败造成线程结束,线程池会创建一个新的线程继续完成接下来的任务。提交到线程中的任务是严格按照顺序执行的。任何时刻都只有一个任务在执行,等价于newFixedThreadPool(1)
。
ScheduledThreadPool
创建一个可以在指定延迟后执行或定期执行的线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public class ScheduledThreadPool { ScheduledExecutorService scheduleService = Executors.newScheduledThreadPool(1); public static void main(String[] args){ new ScheduledThreadPool().testScheduledThreadPool(); } public void testScheduledThreadPool(){ Console.log("延迟2s执行"); scheduleService.schedule(new Callable<Object>() { @Override public Object call() throws Exception { Console.log("执行具体任务--开始"); Thread.sleep(1000); Console.log("执行具体任务--结束"); return 5; } }, 2, TimeUnit.SECONDS); Console.log("执行其他任务"); scheduleService.scheduleAtFixedRate(new Runnable() { @Override public void run() { Console.log("执行周期性任务"); } },0,2,TimeUnit.SECONDS); } }
|
执行结果
1 2 3 4 5 6 7 8 9 10
| 延迟2s执行 执行其他任务 执行周期性任务 执行具体任务--开始 执行具体任务--结束 执行周期性任务 执行周期性任务 执行周期性任务 执行周期性任务 执行周期性任务
|
ForkJoinPool
java7中新引进的一种线程池,它使用一个无限队列保存需要执行的任务,可以通过构造函数指定线程数量,如果不指定,则默认使用CPU核心数作为线程数量。
不同于其它线程池,ForkJoinPool基于work-stealing模式,在该线程池的每个线程中会维护一个队列来存放需要被执行的任务。当线程自身队列中的任务都执行完毕后,它会从别的线程中拿到未被执行的任务并帮助它执行,尽量避免线程处于空闲状态。在任务负载不均衡的场景中非常有用;
ForkJoinPool实现了ExecutorService,可以提交实现Callable和Runnable接口的任务,也可以直接继承ForkJoinTask或RecursiveTask对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| public class ForkJoinPoolTest { ForkJoinPool forkJoinPool = new ForkJoinPool(); public static Double[] num = new Double[1000]; public static void main(String[] args){ ForkJoinPoolTest pool = new ForkJoinPoolTest(); for(int i=0;i<num.length;i++){ num[i] = Math.random(); } int count = pool.forkJoinPool.invoke(new ForkJoinTaskTest(1,1000)); Console.log("count:{}",count); for(int i=0;i<5;i++){ int result = pool.forkJoinPool.invoke(new ForkJoinTask2(i+1)); Console.log("result:{}",result); } } private static class ForkJoinTaskTest extends RecursiveTask<Integer> { private int start; private int end; public ForkJoinTaskTest(int start,int end){ this.start = start; this.end = end; } @Override protected Integer compute() { Integer subCount = 0; if(this.end - start<=10){ for(int i=this.start;i<this.end;i++){ if(num[i]<0.5){subCount++;} } }else{ int mid = (start+end)/2; ForkJoinTaskTest left = new ForkJoinTaskTest(start,mid); left.fork(); ForkJoinTaskTest right = new ForkJoinTaskTest(mid+1,end); right.fork(); subCount+=left.join(); subCount+=right.join(); } return subCount; } } private static class ForkJoinTask2 extends ForkJoinTask<Integer>{ private int result = 0; private int num; public ForkJoinTask2(int num){ this.num = num; } public Integer getRawResult() { return result; } protected void setRawResult(Integer value) { this.result = value; } protected boolean exec() { result = this.num*this.num; return true; } } }
|
继承ForkJoinTask时,重写的getRawResult和setRawResult两个方法是为了扩展和调试了,不要直接调用这个两个方法设置和获取值,应该调用对象的join或get方法。
Executors
工具提供了newWorkStealingPool
方法直接创建一个ForkJoinPool对象。