构建自定义的同步工具:
状态依赖性的管理:
- 依赖状态的操作可以一直阻塞直到可以继续执行,这比使他们先失败再实现起来更为方便且更不易出错。
- 有界缓存的几种实现
/**
* 有界缓存实现的基类
*/
public abstract class BaseBoundedBuffer<V> {
private final V[] buf;
private int tail;
private int head;
private int count;
protected BaseBoundedBuffer(int capacity){
this.buf = (V[]) new Object[capacity];
}
protected synchronized final void doPut(V v){
buf[tail] = v;
if (++tail == buf.length){
tail = 0;
}
++count;
}
protected synchronized final V doTake(){
V v = buf[head];
buf[head] = null; //let gc collect
if (++head == buf.length){
head = 0;
}
--count;
return v;
}
public synchronized final boolean isFull(){
return count == buf.length;
}
public synchronized final boolean isEmpty(){
return count == 0;
}
}
- 将前提条件的失败传给调用者
/**
* 当不满足前提条件时,有界缓存不会执行相应的操作
*/
public class GrumyBoundedBuffer<V> extends BaseBoundedBuffer<V> {
public GrumyBoundedBuffer(int size){
super(size);
}
public synchronized void put(V v){
if (isFull()){
throw new BufferFullException();
}
doPut(v);
}
public synchronized V take(){
if (isEmpty())
throw new BufferEmptyExeption();
return doTake();
}
}
- 对上面有界缓存类的调用
while (true){
try {
String item = buffer.take();
} catch (BufferEmptyExeption e) { //调用者来处理异常
e.printStackTrace();
}
}
这样将状态依赖性交给调用着来管理,可能导致一些功能不能实现,如维持FIFO顺序。
通过轮询与休眠来实现简单的阻塞:
/**
* 使用简单阻塞实现的有界缓存
*/
public class SleepyBounedBuffer<V> extends BaseBoundedBuffer<V> {
private static long SLEEP_TIME;
public SleepyBounedBuffer(int size) {
super(size);
}
public void put(V v) throws InterruptedException{
while (true){
synchronized(this){
if (!isFull()){
doPut(v);
return;
}
}
Thread.sleep(SLEEP_TIME);
}
}
public V take() throws InterruptedException{
while (true){
synchronized(this){
if (!isEmpty()){
return doTake();
}
}
Thread.sleep(SLEEP_TIME);
}
}
}
上面的休眠时间的设置,需要在
响应性与
CPU使用率之间的权衡。
条件队列:
- 条件队列使得一组线程(等待线程集合)以某种特定的方式来等待待定条件为真。
/**
* 使用条件队列实现的有界缓存
* 每个对象都可看作一个条件队列
*/
public class BoundedBuffer<V> extends BaseBoundedBuffer<V> {
public BoundedBuffer(int capacity) {
super(capacity);
}
public synchronized void put(V v) throws InterruptedException{
while (isFull()){
wait();
}
doPut(v);
notifyAll();
}
public synchronized V take() throws InterruptedException{
while (isEmpty()){
wait();
}
V v = doTake();
notifyAll();
return v;
}
}
使用条件队列:
- 条件队列使构建高效以及高可响应性的状态依赖变得更容易,但同时也很容易被不正确地使用。
条件谓词:
- 要想正确使用条件队列,关键是找出对象在哪个谓词条件上等待。
- 条件谓词是使某个操作成为状态依赖操作的前提条件。在有界缓存中,比如take()方法的条件谓词就是"缓存不为空"。
- 条件谓词是由类中各个状态变量构成的表达式。比如判断"缓存不为空时",是以状态count==0来进行判断的。
- 将与条件队列相关联的条件谓词以及在这些条件谓词上等待的操作都写入文档。
- 在条件等待中存在一种重要的三元关系:加锁, wait, 条件谓词(正如上面的代码)。
- 每一次wai调用都隐式地与特定的条件谓词相关联,当调用某个条件谓词的wait时,调用者必须已经持有与条件队列相关的锁,并且这个锁必须保护着构成条件谓词的状态变量。
过早唤醒:
当使用条件等待时(如Object.wait(), 或Condition.await()):
- 通常都有一个条件谓词--包括一些对象状态的测试,线程在执行前必须首先通过这些测试。
- 在调用wait之前测试条件谓词,并且从wait中返回时再次进行测试。
- 在一个循环中调用wait。
- 确保使用与条件队列相关的锁来保护构成条件谓词的各个状态变量。
- 当调用wait, notify或notifyAll等方法时,一定要持有与条件队列相关的锁。
- 在检查条件谓词之后以及开始执行相应的操作之前,不要释放锁。
丢失的信号:
- 丢失的信号:线程必须等待一个已经为真的条件。(即错过被唤醒的机会)。
通知:
- 每当在等待一个条件时,一定要确保在条件谓词变为真时通过某种方式发出通知。
- 上面的代码我们使用notifyAll进行唤醒,而不是用notify,是为了防止某种意义上的"信号丢失",比如notify总是去唤醒一个等待条件为假的线程,而阻止了等待条件为真的线程被执行。
- 在以下情况时,则可以用notify而不是notifyAll:
1. 所有等待线程的类型都相同,只有一个条件谓词与条件队列相关,并且每个线程从wait返回后执行相同的操作。
2. 单进单出。在条件变量上的每次通知,最多只能唤醒一个线程来执行。
阀门类:
/**
* 使用wait和notifyAll来实现可重新关闭的阀门
*/
public class ThreadGate {
private boolean isOpen;
private int generation;
public synchronized void close(){
isOpen = false;
}
public synchronized void open(){
++ generation;
- isOpen = true;
notifyAll();
}
public synchronized void await() throws InterruptedException{
int arrivalGeneration = generation;
while (!isOpen && arrivalGeneration == generation){
wait();
}
}
}
子类的安全问题:
- 如果在实施子类化时违背了条件通知或单次通知的某个需求,那么在子类中可以增加合适的通知机制来代表基类。
封装条件队列:
- 将条件队列对象封装起来,与线程安全类的最常见设计模式并不一致,在这种模式中建议使用对象自身既是锁,又是条件队列。
入口协议与出口协议:
- 入口出口协议用来描述wait和notify方法的正确使用。
- 入口协议就是该操作的条件谓词,出口协议则包括,检查被该操作修改的所有状态变量,并确认它们是否使某个其他的条件谓词变为真,如果是则通知相关的条件队列。
显示的Condition对象:
public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
}
- 基于Condition实现的有界缓冲队列
/**
* 使用显示条件变量的有界缓存
*/
public class ConditionBoundedBuffer<T> {
private static final int BUFFER_SIZE = 10;
protected final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
private final T[] items = (T[])new Object[BUFFER_SIZE];
private int tail, head, count;
public void put(T x) throws InterruptedException{
lock.lock();
try {
while (count == items.length){
notFull.await(); //队列已满等待。
}
items[tail] = x;
if (++ tail == items.length){
tail = 0;
}
++ count;
notEmpty.signal(); //队列非空,唤醒其他等待条件
} finally{
lock.unlock();
}
}
public T take() throws InterruptedException{
lock.lock();
try {
while (count == 0){
notEmpty.await();
}
T x = items[head];
items[head] = null; // let gc collect
--count;
notFull.signal();
return x;
} finally{
lock.unlock();
}
}
}
Synchronizer剖析:
/**
* 使用Lock来实现信号量
* 不是java.util.concurrent.Semaphore的真实实现
*/
public class SemaphoreOnLock {
private final Lock lock = new ReentrantLock();
private final Condition permitsAvailable = lock.newCondition(); //等待可用条件
private int permits; //允许请求的线程数
public SemaphoreOnLock(int initialPermits) {
lock.lock();
try {
permits = initialPermits;
} finally{
lock.unlock();
}
}
public void acruire() throws InterruptedException{
lock.lock();
try {
while (permits <= 0){
permitsAvailable.await();
}
-- permits;
} finally {
lock.unlock();
}
}
public void release(){
lock.lock();
try {
++permits;
permitsAvailable.signal(); //已可用信号通知
} finally{
lock.unlock();
}
}
}
- ReentrantLock与Semaphore都使用了同一个基类AbstractQueueSynchronizer(AQS)。
AbstractQueuedSynchronizer类:
一个简单的闭锁:
/**
* 使用AbstractQueuedSynchronizer实现的二元闭锁
*/
public class OneShotLatch {
private Sync sync = new Sync();
public void signal(){
sync.releaseShared(0);
}
public void await() throws InterruptedException{
sync.acquireSharedInterruptibly(0);
}
private class Sync extends AbstractQueuedSynchronizer{
private static final long serialVersionUID = -4086289179080702705L;
protected int tryAcquireShared(int ingored){
// 如果闭锁是开的(state == 1), 那么这个操作符成功,否则将失败
return (getState() == 1) ? 1 : -1;
}
protected boolean tryReleaseShared(int ignored){
setState(1); // 打开闭锁
return true; // 其他的线程可以获取该闭锁
}
}
}
java.util.concurrent同步器类中的CAS:
protected boolean tryAcquire(int ignored){
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0){ //若为被锁住
if (compareAndSetState(0, 1)){ //设置锁住
owner = current;
return true;
}
} else if(current == owner){ //锁重入
setState(c + 1);
return true;
}
return false;
}
Semaphore与CountDownLatch:
//Semaphore的tryAcquireShared, truReleaseShared
protected int tryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
protected boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
FutureTask:
- 在FutureTask中,AQS同步状态被用来保存任务的状态。
ReentrantReadWriteLock:
不吝指正。