Semaphore 源码分析

原创
2018/06/20 11:24
阅读数 195

需要提前了解的知识点: AbstractQueuedSynchronizer 实现原理

类介绍

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。比如控制用户的访问量,同一时刻只允许1000个用户同时使用系统,如果超过1000个并发,则需要等待。

使用场景

比如模拟一个停车场停车信号,假设停车场只有两个车位,一开始两个车位都是空的。这时如果同时来了两辆车,看门人允许它们进入停车场,然后放下车拦。以后来的车必须在入口等待,直到停车场中有车辆离开。这时,如果有一辆车离开停车场,看门人得知后,打开车拦,放入一辆,如果又离开一辆,则又可以放入一辆,如此往复。

public class SemaphoreDemo {
    private static Semaphore s = new Semaphore(2);
    public static void main(String[] args) {
        ExecutorService pool = Executors.newCachedThreadPool();
        pool.submit(new ParkTask("1"));
        pool.submit(new ParkTask("2"));
        pool.submit(new ParkTask("3"));
        pool.submit(new ParkTask("4"));
        pool.submit(new ParkTask("5"));
        pool.submit(new ParkTask("6"));
        pool.shutdown();
    }

    static class ParkTask implements Runnable {
        private String name;
        public ParkTask(String name) {
            this.name = name;
        }
        @Override
        public void run() {
            try {
                s.acquire();
                System.out.println("Thread "+this.name+" start...");
                TimeUnit.SECONDS.sleep(new Random().nextInt(10));
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                s.release();
            }
        }
    }
}

Semaphore 源码分析

Semaphore 通过使用内部类Sync继承AQS来实现。
支持公平锁和非公平锁。内部使用的AQS的共享锁。
具体实现可参考 AbstractQueuedSynchronizer 源码分析

Semaphore 的结构如下:

Semaphore构造

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

构造方法指定信号量的许可数量,默认采用的是非公平锁,也只可以指定为公平锁。
permits赋值给AQS中的state变量。

acquire:可响应中断的获得信号量

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}

获得信号量方法,这两个方法支持 Interrupt中断机制,可使用acquire() 方法每次获取一个信号量,也可以使用acquire(int permits) 方法获取指定数量的信号量 。

acquire:不可响应中断的获取信号量

public void acquireUninterruptibly() {
    sync.acquireShared(1);
}

public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);
}

这两个方法不响应Interrupt中断机制,其它功能同acquire方法机制。

tryAcquire 方法,尝试获得信号量

public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;
}

public boolean tryAcquire(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
        throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}

尝试获得信号量有三个方法。

  1. 尝试获取信号量,如果获取成功则返回true,否则马上返回false,不会阻塞当前线程。
  2. 尝试获取信号量,如果在指定的时间内获得信号量,则返回true,否则返回false
  3. 尝试获取指定数量的信号量,如果在指定的时间内获得信号量,则返回true,否则返回false。

release 释放信号量

public void release() {
    sync.releaseShared(1);
}

调用AQS中的releaseShared方法,使得state每次减一来控制信号量。

availablePermits方法,获取当前剩余的信号量数量

public int availablePermits() {
    return sync.getPermits();
}

//=========Sync类========
final int getPermits() {
    return getState();
 }

该方法返回AQS中state变量的值,当前剩余的信号量个数

drainPermits方法

public int drainPermits() {
    return sync.drainPermits();
}

//=========Sync类========
final int drainPermits() {
    for (;;) {
        int current = getState();
        if (current == 0 || compareAndSetState(current, 0))
            return current;
    }
}

获取并返回立即可用的所有许可。Sync类的drainPermits方法,获取1个信号量后将可用的信号量个数置为0。例如总共有10个信号量,已经使用了5个,再调用drainPermits方法后,可以获得一个信号量,剩余4个信号量就消失了,总共可用的信号量就变成6个了。

reducePermits 方法

protected void reducePermits(int reduction) {
    if (reduction < 0) throw new IllegalArgumentException();
    sync.reducePermits(reduction);
}

//=========Sync类========
final void reducePermits(int reductions) {
    for (;;) {
        int current = getState();
        int next = current - reductions;
        if (next > current) // underflow
            throw new Error("Permit count underflow");
        if (compareAndSetState(current, next))
            return;
    }
}

该方法是protected 方法,减少信号量个数

判断AQS等待队列中是否还有Node

public final boolean hasQueuedThreads() {
    return sync.hasQueuedThreads();
}

//=========AbstractQueuedSynchronizer类========
public final boolean hasQueuedThreads() {
   //头结点不等于尾节点就说明链表中还有元素
   return head != tail;
}

getQueuedThreads方法

protected Collection<Thread> getQueuedThreads() {
    return sync.getQueuedThreads();
}

//=========AbstractQueuedSynchronizer类========
public final Collection<Thread> getQueuedThreads() {
    ArrayList<Thread> list = new ArrayList<Thread>();
    for (Node p = tail; p != null; p = p.prev) {
        Thread t = p.thread;
        if (t != null)
            list.add(t);
    }
    return list;
}

该方法获取AQS中等待队列中所有未获取信号量的线程相关的信息(等待获取信号量的线程相关信息)。

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部