Lock
Lock是java.util.concurrent.locks
包下的接口,Lock 实现提供了比使用synchronized 方法和语句可获得的更广泛的锁定操作,它能以更优雅的方式处理线程同步问题。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public class LockTest { public static void main(String[] args) { final Outputter1 output = new Outputter1(); new Thread() { public void run() { output.output("zhangsan"); }; }.start(); new Thread() { public void run() { output.output("lisi"); }; }.start(); } } class Outputter1 { private Lock lock = new ReentrantLock(); public void output(String name) { lock.lock(); try { for(int i = 0; i < name.length(); i++) { System.out.print(name.charAt(i)); } } finally { lock.unlock(); } } }
|
这样就实现了和sychronized一样的同步效果,需要注意的是,用sychronized修饰的方法或者语句块在代码执行完之后锁自动释放,而用Lock需要我们手动释放锁。
如果说这就是Lock,那么它不能成为同步问题更完美的处理方式,下面要介绍的是读写锁(ReadWriteLock),我们会有一种需求,在对数据进行读写的时候,为了保证数据的一致性和完整性,需要读和写是互斥的,写和写是互斥的,但是读和读是不需要互斥的,这样读和读不互斥性能更高些。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| class Data { private int data; private ReadWriteLock rwl = new ReentrantReadWriteLock(); public void set(int data) { rwl.writeLock().lock(); try { System.out.println(Thread.currentThread().getName() + "准备写入数据"); try { Thread.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); } this.data = data; System.out.println(Thread.currentThread().getName() + "写入" + this.data); } finally { rwl.writeLock().unlock(); } } public void get() { rwl.readLock().lock(); try { System.out.println(Thread.currentThread().getName() + "准备读取数据"); try { Thread.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "读取" + this.data); } finally { rwl.readLock().unlock(); } } }
|
输出如下
1 2 3 4 5 6 7 8 9 10 11 12 13
| Thread-4准备读取数据 Thread-3准备读取数据 Thread-5准备读取数据 Thread-5读取18 Thread-4读取18 Thread-3读取18 Thread-2准备写入数据 Thread-2写入6 Thread-2准备写入数据 Thread-2写入10 Thread-1准备写入数据 Thread-1写入22 Thread-5准备读取数据
|
Condition
Condition 将 Object 监视器方法(wait、notify 和notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock实现组合使用,为每个对象提供多个等待 set (wait-set)。其中,Lock 替代了synchronized 方法和语句的使用,Condition 替代了 Object监视器方法的使用。下面将之前写过的一个线程通信的例子替换成用Condition实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| public class ThreadTest2 { public static void main(String[] args) { final Business business = new Business(); new Thread(new Runnable() { @Override public void run() { threadExecute(business, "sub"); } }).start(); threadExecute(business, "main"); } public static void threadExecute(Business business, String threadType) { for(int i = 0; i < 100; i++) { try { if("main".equals(threadType)) { business.main(i); } else { business.sub(i); } } catch (InterruptedException e) { e.printStackTrace(); } } } } class Business { private boolean bool = true; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); public void main(int loop) throws InterruptedException { lock.lock(); try { while(bool) { condition.await(); } for(int i = 0; i < 100; i++) { System.out.println("main thread seq of " + i + ", loop of " + loop); } bool = true; condition.signal(); } finally { lock.unlock(); } } public void sub(int loop) throws InterruptedException { lock.lock(); try { while(!bool) { condition.await(); } for(int i = 0; i < 10; i++) { System.out.println("sub thread seq of " + i + ", loop of " + loop); } bool = false; condition.signal(); } finally { lock.unlock(); } } }
|
在Condition中,用await()替换wait(),用signal()替换notify(),用signalAll()替换notifyAll(),传统线程的通信方式,Condition都可以实现,这里注意,Condition是被绑定到Lock上的,要创建一个Lock的Condition必须用newCondition()方法。
Condition的强大之处在于它可以为多个线程间建立不同的Condition,下面引入API中的一段代码,加以说明
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[100]; int putptr, takeptr, count; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal(); } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal(); return x; } finally { lock.unlock(); } } }
|
这是一个处于多线程工作环境下的缓存区,缓存区提供了两个方法,put和take,put是存数据,take是取数据,内部有个缓存队列,具体变量和方法说明见代码,这个缓存区类实现的功能:有多个线程往里面存数据和从里面取数据,其缓存队列(先进先出后进后出)能缓存的最大数值是100,多个线程间是互斥的,当缓存队列中存储的值达到100时,将写线程阻塞,并唤醒读线程,当缓存队列中存储的值为0时,将读线程阻塞,并唤醒写线程,这也是ArrayBlockingQueue的内部实现。
这就是多个Condition的强大之处,假设缓存队列中已经存满,那么阻塞的肯定是写线程,唤醒的肯定是读线程,相反,阻塞的肯定是读线程,唤醒的肯定是写线程,那么假设只有一个Condition会有什么效果呢,缓存队列中已经存满,这个Lock不知道唤醒的是读线程还是写线程了,如果唤醒的是读线程,皆大欢喜,如果唤醒的是写线程,那么线程刚被唤醒,又被阻塞了,这时又去唤醒,这样就浪费了很多时间
Callable和Future
Callable接口类似于Runnable,从名字就可以看出来了,但是Runnable不会返回结果,并且无法抛出返回结果的异常,而Callable功能更强大一些,被线程执行后,可以返回值,这个返回值可以被Future拿到,也就是说,Future可以拿到异步执行任务的返回值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public class CallableAndFuture { public static void main(String[] args) { Callable<Integer> callable = new Callable<Integer>() { public Integer call() throws Exception { return new Random().nextInt(100); } }; FutureTask<Integer> future = new FutureTask<Integer>(callable); new Thread(future).start(); try { Thread.sleep(5000); System.out.println(future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }
|
FutureTask实现了两个接口,Runnable和Future,所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值,那么这个组合的使用有什么好处呢?假设有一个很耗时的返回值需要计算,并且这个返回值不是立刻需要的话,那么就可以使用这个组合,用另一个线程去计算返回值,而当前线程在使用这个返回值之前可以做其它的操作,等到需要这个返回值时,再通过Future得到。
下面来看另一种方式使用Callable和Future,通过ExecutorService的submit方法执行Callable,并返回Future,代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public class CallableAndFuture { public static void main(String[] args) { ExecutorService threadPool = Executors.newSingleThreadExecutor(); Future<Integer> future = threadPool.submit(new Callable<Integer>() { public Integer call() throws Exception { return new Random().nextInt(100); } }); try { Thread.sleep(5000); System.out.println(future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }
|
执行多个带返回值的任务,并取得多个返回值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public class CallableAndFuture { public static void main(String[] args) { ExecutorService threadPool = Executors.newCachedThreadPool(); CompletionService<Integer> cs = new ExecutorCompletionService<Integer>(threadPool); for(int i = 1; i < 5; i++) { final int taskID = i; cs.submit(new Callable<Integer>() { public Integer call() throws Exception { return taskID; } }); } for(int i = 1; i < 5; i++) { try { System.out.println(cs.take().get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } } }
|
如果不使用CompletionService,可以先创建一个装Future类型的集合,用Executor提交的任务返回值添加到集合中,最后遍历集合取出数据,不同的是提交到CompletionService中的Future是按照完成的顺序排列的,而集合是按照添加顺序排序的。
CAS
在Java并发包中有这样一个包,java.util.concurrent.atomic,该包是对Java部分数据类型的原子封装,在原有数据类型的基础上,提供了原子性的操作方法,保证了线程安全。
阻塞队列BlockingQueue
BlockingQueue是个接口,有如下实现类:
- ArrayBlockQueue:一个由数组支持的有界阻塞队列。此队列按FIFO(先进先出)原则对元素进行排序。创建其对象必须明确大小,像数组一样。
- LinkedBlockQueue:一个可改变大小的阻塞队列。此队列按FIFO(先进先出)原则对元素进行排序。创建其对象如果没有明确大小,默认值是Integer.MAX_VALUE。链接队列的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可预知的性能要低。
- PriorityBlockingQueue:类似于LinkedBlockingQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数所带的Comparator决定的顺序。
- SynchronousQueue:同步队列。同步队列没有任何容量,每个插入必须等待另一个线程移除,反之亦然。
实现一个生产者消费者模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
| public class BigPlate { private BlockingQueue<Object> eggs = new ArrayBlockingQueue<Object>(5); public void putEgg(Object egg) { try { eggs.put(egg); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("放入鸡蛋"); } public Object getEgg() { Object egg = null; try { egg = eggs.take(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("拿到鸡蛋"); return egg; } static class AddThread extends Thread { private BigPlate plate; private Object egg = new Object(); public AddThread(BigPlate plate) { this.plate = plate; } public void run() { plate.putEgg(egg); } } static class GetThread extends Thread { private BigPlate plate; public GetThread(BigPlate plate) { this.plate = plate; } public void run() { plate.getEgg(); } } public static void main(String[] args) { BigPlate plate = new BigPlate(); for(int i = 0; i < 10; i++) { new Thread(new AddThread(plate)).start(); } for(int i = 0; i < 10; i++) { new Thread(new GetThread(plate)).start(); } } }
|
输出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| 放入鸡蛋 放入鸡蛋 放入鸡蛋 放入鸡蛋 放入鸡蛋 拿到鸡蛋 放入鸡蛋 拿到鸡蛋 拿到鸡蛋 拿到鸡蛋 放入鸡蛋 放入鸡蛋 放入鸡蛋 拿到鸡蛋 放入鸡蛋 拿到鸡蛋 拿到鸡蛋 拿到鸡蛋 拿到鸡蛋 拿到鸡蛋
|