Thread.AQS
Thread.AQS
脑丨残 发表于5个月前
Thread.AQS
  • 发表于 5个月前
  • 阅读 5
  • 收藏 0
  • 点赞 0
  • 评论 0

腾讯云实验室 1小时搭建人工智能应用,让技术更容易入门 免费体验 >>>   

AQS

  • AbstractQueuedSynchronizer队列同步器,Lock接口实现的核心,可自定义同步器。
     *      +------+  prev +-----+       +-----+
     * head |      | <---- |     | <---- |     |  tail
     *      +------+       +-----+       +-----+
  • CLH锁实现图,将每个线程构造成Node节点,加入链表,新加入线程在列表队列最后,每次头结点获取到所,后续节点继续获取锁。
  • AQS暴露操作方法,隐藏实现细节,集成AQS,重写获取锁方法,可自定义同步锁。
  • 内部通过voliate修饰的int变量,unsafe方法提供的compareAndSet方法提供同步,逐步包装成同步队列,同步锁。
  • 原理总结,用AQS的volatile的同步特性,设置int锁数量,unsafe对象的CAS同步设置方法,构造同步队列,最后unsafe的park,unpark直接操作线程挂起释放。
static final class Node {
       
        static final Node SHARED = new Node();
        static final Node EXCLUSIVE = null;
        static final int CANCELLED =  1;
 
        static final int SIGNAL    = -1;
     
        static final int CONDITION = -2;
  
        static final int PROPAGATE = -3;
        volatile int waitStatus;
        volatile Node prev;
        volatile Node next;
        volatile Thread thread;
    
        Node nextWaiter;
 
    }
  • 以Lock的实现Reentranlock为例
public class ReentrantLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = 7373984872572414699L;
    private final Sync sync;
    //实现AQS重写锁
    //Lock的所有操作,通过代理内部类调用AQS,先看非公平锁
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;
        abstract void lock();
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
    }
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
        //2. sync的非公平实现,快速调用父类AQS的CAS设置state是,如果失败,进入acquire构造Node节点,加入CLK
        //
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
        
        //3. AQS的acruird,tryAcquire交给子类自定实现,addWaiter加入CLK节点,acquireQueued,所有节点自旋,获取同步状态
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
    public ReentrantLock() {
        sync = new NonfairSync();
    }
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
    //1. 外部调用Lock
    public void lock() {
        sync.lock();
    }
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
    public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }
    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
    public Condition newCondition() {
        return sync.newCondition();
    }
}

    //4. AQS中的实现,tryAcquire交给子类
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
      //5. ReentranLock里Sync非公平锁实现
      final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            //获取AQS中state
            int c = getState();
            if (c == 0) {
                //AQS的CAS同步设置state,并设置持有线程
                if (compareAndSetState(0, acquires)) {
                    //设置持有线程
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //可重入关键,同一线程可重复获取锁
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    //6. 加入CLK链表尾部,通过AQS,CAS设置tail节点,构造链表见自定容器Stack
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            //快速设置tail,如果失败进入enq
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    //死循环设置尾节点,直到成功,以死循环的方式,同步设置tail节点
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    //7. 最后,每个构造的节点,都以自旋的方式,获取前一节点,如果前一节点是头节点,并且获取到锁,把当前自己设置成头结点,获的锁。
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //parkAndCheckInterrupt 调用LockSupport挂起线程,进入阻塞状态
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    //unlock,调用AQS释放
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
            //LockSupport的unparking,释放线程挂起状态    
            unparkSuccessor(h);
            return true;
        }
        return false;
    }
     //1. 交给子类实现
     protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
标签: thread
共有 人打赏支持
粉丝 8
博文 55
码字总数 19033
×
脑丨残
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: