手写AQS-非公平锁

原创
04/06 11:18
阅读数 74

1. Unsafe工具类

package com.shi.flink.unsafeTest;

import sun.misc.Unsafe;

import java.lang.reflect.Field;

/**
 * @author shiye
 * @create 2021-03-30 17:03
 */
public class UnsafeUtil {
    public static Unsafe getInstance() {
        Field field = null;
        try {
            field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            return (Unsafe) field.get(null);
        } catch (NoSuchFieldException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        }
        return null;
    }
}

2. 手写AQS抽象类

package com.shi.flink.shilock;

import com.shi.flink.unsafeTest.UnsafeUtil;
import sun.misc.Unsafe;

import java.util.concurrent.locks.AbstractOwnableSynchronizer;
import java.util.concurrent.locks.LockSupport;

/**
 * 自己写抽象AQS实现
 *
 * @author shiye
 * @create 2021-03-30 14:10
 */
public abstract class ShiAQS extends AbstractOwnableSynchronizer implements java.io.Serializable {
    private static final long serialVersionUID = 7373984972572414691L;

    /**
     * 头指针
     */
    private transient volatile Node head;

    /**
     * 尾指针
     */
    private transient volatile Node tail;

    /**
     * 状态值:
     * 0:空闲,
     * 1:正在有人使用
     */
    private volatile int state;

    /**
     * 获取当前状态
     *
     * @return
     */
    protected final int getState() {
        return state;
    }

    /**
     * 设置当前锁的状态
     *
     * @param state
     */
    public void setState(int state) {
        this.state = state;
    }

    /**
     * 使用unsafe类来初始化一些参数值
     */
    private static final Unsafe unsafe = UnsafeUtil.getInstance();
    private static long stateOffset;
    private static long headOffset;
    private static long tailOffset;
    private static long waitStatusOffset;
    private static long nextOffset;

    static {
        try {
            stateOffset = unsafe.objectFieldOffset(ShiAQS.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset(ShiAQS.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset(ShiAQS.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));
        } catch (NoSuchFieldException e) {
            e.printStackTrace();
        }
    }

    /**
     * 设置状态
     *
     * @param expect
     * @param update
     * @return
     */
    protected boolean compareAndSetState(int expect, int update) {
        //读取传入对象o在内存中偏移量为offset位置的值与期望值expected作比较
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

    /**
     * 设置头指针
     *
     * @param update
     * @return
     */
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }

    /**
     * 如果pre节点得waitStatus值为ws,
     * 则把signal赋值给waitStatus
     *
     * @param pre
     * @param ws
     * @param signal
     * @return
     */
    private static boolean compareAndSetWaitStatus(Node pre, int ws, int signal) {
        return unsafe.compareAndSwapInt(pre, waitStatusOffset, ws, signal);
    }

    /**
     * 设置尾指针
     *
     * @param expect
     * @param update
     * @return
     */
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }

    /**
     * 设置下一个节点
     *
     * @param node
     * @param expect
     * @param update
     * @return
     */
    private static final boolean compareAndSetNext(Node node,
                                                   Node expect,
                                                   Node update) {
        return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
    }

    /**
     * 解锁方法
     *
     * @param arg
     */
    protected void release(int arg) throws Exception {
        //尝试去释放占用锁得线程
        boolean tryRelease = tryRelease(arg);

        if (tryRelease) {
            Node h = head;
            if (h != null && h.waitStatus != 0) {
                unparkSuccessor(h);
            }
        }
    }

    /**
     * 尝试去释放占用锁得线程
     *
     * @param arg
     * @return
     * @throws Exception
     */
    protected boolean tryRelease(int arg) throws Exception {
        if (Thread.currentThread() != getExclusiveOwnerThread()) {
            //如果当前线程不是占用锁得线程就抛出异常
            throw new Exception("解锁失败,当前线程不是占用锁得线程无法解锁");
        } else {
            setExclusiveOwnerThread(null);
            this.setState(0);
            return true;
        }
    }

    /**
     * 打断某个线程
     *
     * @return
     */
    protected boolean interruptThread(Thread thread) throws Exception {
        Thread ownerThread = getExclusiveOwnerThread();
        if (ownerThread == thread) {
            //如果是正在运行得线程
            compareAndSetState(1, 0);
            setExclusiveOwnerThread(null);
        } else if (head != null) {
            //再对类中查找当前线程,并且取消排队
            for (Node next1 = head.next; next1 != null; next1 = next1.next) {
                if (next1.thread == thread) {
                    compareAndSetWaitStatus(next1, next1.waitStatus, 1);
                }
            }
        }
        //解锁
        thread.interrupt();
        System.out.println(thread.getName() + " 已经中断了 ====> ");
        unparkSuccessor(head);
        System.out.println(thread.getName() + " 已经结束了 ====> ");
        return false;
    }

    /**
     * 自定义一个内部类Node节点
     */
    static final class Node {

        //共享模式标记
        static final Node shared = new Node();

        //独占锁标记
        static final Node excusive = null;

        //waitStatus值,指示线程已取消
        static final int cancelled = 1;

        //waitStatus值,用于指示后续线程需要解除等待状态
        static final int signal = -1;

        //waitStatus值,指示线程正在等待条件
        static final int condition = -2;

        //waitStatus值,指示下一个acquireShared应该 无条件传播
        static final int propagate = -3;

        //锁的等待状态
        volatile int waitStatus;

        //前指针
        volatile Node prev;

        //后指针
        volatile Node next;

        //线程
        volatile Thread thread;

        Node nextWaiter;

        //是否是共享锁
        final boolean isShared() {
            return nextWaiter == shared;
        }

        //无参构造
        public Node() {
        }

        public Node(Node nextWaiter, Thread thread) {
            this.nextWaiter = nextWaiter;
            this.thread = thread;
        }

        /**
         * 获取前节点
         *
         * @return
         */
        public Node getPrev() {
            Node p = prev;
            if (p == null) {
                throw new NullPointerException("前节点不能为空");
            }
            return p;
        }
    }


    /**
     * 获得
     *
     * @param arg
     */
    public void acquire(int arg) {
        //1.尝试去排队
        boolean tryAcquire = tryAcquire(arg);
        if (!tryAcquire) {
            //2.如果抢占锁失败,就去排队
            Node node = addWaiter(Node.excusive);

            //3.对已经再队列中的节点,进行休眠等侯
            acquireQueued(node, arg);
        }
    }

    /**
     * 先尝试去排队
     * 1.先获取锁得状态,如果状态为0,就尝试去占用一次锁
     * 否则返回占用失败
     *
     * @param arg
     * @return true:表示抢占锁成功
     * false:表示抢占所失败
     */
    public final boolean tryAcquire(int arg) {
        Thread current = Thread.currentThread();
        int state = getState();
        if (state == 0) {
            //如果空闲了,就尝试去占用一次锁
            if (compareAndSetState(0, arg)) {
                //抢占成功就返回true,并设置线程
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (getExclusiveOwnerThread() == current) {
            //如果当前当前线程多次抢占锁,就将状态+arg
            int nextState = state + arg;
            if (nextState < 0) {
                throw new Error("超过最大锁计数");
            }
            setState(nextState);
            return true;
        }
        return false;
    }

    /**
     * 添加等待队列
     *
     * @param mode
     */
    public Node addWaiter(Node mode) {
        Node node = new Node(mode, Thread.currentThread());
        Node temp = tail;
        if (temp == null) {
            //入队
            enQueue(node);
            return node;
        } else {
            //如果队列中不为空,就把当前节点添加到尾节点中
            node.prev = temp;
            if (compareAndSetTail(temp, node)) {
                temp.next = node;
                return node;
            }
        }
        return node;
    }

    /**
     * 入队
     * 把node节点添加到队列中,
     * 如果队列为null就初始化一个队列并且把node节点添加到尾节点中
     *
     * @param node 返回当前节点
     */
    public Node enQueue(Node node) {
        while (true) {
            Node temp = tail;
            if (temp == null) {
                //创建一个头指针
                compareAndSetHead(new Node());
                //让尾指针也指向头指针(空节点)
                tail = head;
            } else {
                node.prev = temp;
                if (compareAndSetTail(temp, node)) {
                    temp.next = node;
                    return node;
                }
            }
        }
    }

    /**
     * @param node 当前正在侯队中得节点
     * @param arg
     * @return
     */
    protected boolean acquireQueued(Node node, int arg) {
        boolean failed = true;

        try {
            //是否被打断,默认false
            boolean interrupted = false;
            while (true) {
                final Node p = node.getPrev();
                if (p == head && tryAcquire(arg)) {
                    //如果是他的头节点是head,并且尝试抢占锁成功就出队,让当前线程运行
                    setHead(node);
                    p.next = null;//利于gc回收
                    failed = false;
                    return interrupted;
                }

                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
                    //pre节点得waitStatus 设置成-1,并且让当前线程阻塞,打断当前线程
                    interrupted = true;
                }
            }

        } finally {
            if (failed) cancelAcquire(node);
        }
    }

    protected final void cancelAcquire(Node node) {
        if (node == null) return;

        node.thread = null;
        Node pre = node.prev;

        while (pre.waitStatus > 0) {
            node.prev = pre = pre.prev;
        }
        Node predNext = pre.next;
        node.waitStatus = Node.cancelled;

        if (node == tail && compareAndSetTail(node, pre)) {
            compareAndSetNext(pre, predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            if (pre != head &&
                    ((ws = pre.waitStatus) == Node.signal ||
                            (ws <= 0 && compareAndSetWaitStatus(pre, ws, Node.signal))) &&
                    pre.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pre, predNext, next);
            } else {
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }

    }

    /**
     * 解锁必须成功
     *
     * @param node
     */
    protected final void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0) {
            compareAndSetWaitStatus(node, ws, 0);
        }

        /**
         * AQS源码是这样实现得
         * 如果当前节点不为空,并且用户取消了,就从尾节点往前遍历一个,直到找到最前面得一个节点,解锁当前线程
         */
        Node next1 = node.next;
//        if (next1 == null || next1.waitStatus > 0) {
//            next1 = null;
//            for (Node t = tail; t != null && t != node; t = t.prev) {
//                if (t.waitStatus <= 0) {
//                    next1 = t;
//                }
//            }
//        }

        /**
         * 我自己实现,从前往后找
         */
        if (next1 != null && next1.waitStatus > 0) {
            for (next1 = next1.next; next1 != null; next1 = next1.next) {
                if (next1.waitStatus <= 0) {
                    break;
                }
            }
        }

        if (next1 != null) {
            //唤醒下一个线程
            System.out.println(next1.thread.getName() + " 开始唤醒了 ====> ");
            LockSupport.unpark(next1.thread);
            System.out.println(next1.thread.getName() + " 已经唤醒了 ====> ");

        }
    }

    /**
     * 将pre节点得waitStatus 设置成-1
     *
     * @param pre
     * @param node
     * @return
     */
    protected static boolean shouldParkAfterFailedAcquire(Node pre, Node node) {
        //获取node节点得前一个节点得状态
        int ws = pre.waitStatus;

        //如果是-1 就返回true
        if (ws == Node.signal) {
            return true;
        }

        if (ws > 0) {
            do {
                pre = pre.prev;
                node.prev = pre;
            } while (pre.waitStatus > 0);
            pre.next = node;
        } else {
            //设置成-1
            boolean flag = compareAndSetWaitStatus(pre, ws, Node.signal);
//            System.out.println("设置成-1是否成功:" + flag);
        }
        return false;
    }

    /**
     * 阻塞当前线程,并且返回当前线程得打断状态
     *
     * @return true: 打断线程成功
     */
    protected final boolean parkAndCheckInterrupt() {
        //打断线程,让线程阻塞
        LockSupport.park(this);
        return Thread.interrupted();
    }

    public Node getHead() {
        return head;
    }

    public void setHead(Node head) {
        this.head = head;
    }
}

3.非公平所实现

package com.shi.flink.shilock;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * 自定义非公平锁
 * @author shiye
 * @create 2021-03-30 11:02
 */
public class ShiNonfairLock extends ShiAQS implements Lock, java.io.Serializable {

    private static final long serialVersionUID = 7373984872572414699L;

    @Override
    public void lock() {
        if(compareAndSetState(0, 1)){
            //如果抢到了锁,就把当前线程设置进去
            setExclusiveOwnerThread(Thread.currentThread());
        }else{
//            否则就去排队
            acquire(1);
        }
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public boolean tryLock() {
        return false;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public void unlock() {
        try {
            super.release(1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public Condition newCondition() {
        return null;
    }


    /**
     *  打断某个线程(自己瞎写的,有bug)
     * @param thread
     * @return
     */
    public boolean interruptThread(Thread thread) throws Exception {
        return super.interruptThread(thread);
    }
}

4.测试

package com.shi.flink.shilock;

import java.util.concurrent.TimeUnit;

/**
 * @author shiye
 * @create 2021-03-31 17:09
 */
public class MyLockTest {

    public static void main(String[] args) throws Exception {
        ShiNonfairLock lock = new ShiNonfairLock();

        new Thread(() -> {
            try {
                System.out.println("A 线程进入到...加锁过程");
                lock.lock();
                System.out.println("A 已经抢占到锁...休眠10s后运行......");
                TimeUnit.SECONDS.sleep(10);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                System.out.println("A线程运行完成,开始解锁....");
                lock.unlock();
            }

        }, "A").start();


        TimeUnit.SECONDS.sleep(1);
        Thread B = new Thread(() -> {
            try {
                System.out.println("B 线程进入到...加锁过程");
                lock.lock();
                System.out.println("B 已经抢占到锁...休眠10s后运行......");
                TimeUnit.SECONDS.sleep(10);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                System.out.println("B线程运行完成,开始解锁....");
                lock.unlock();
            }

        }, "B");
        B.start();

        TimeUnit.SECONDS.sleep(1);
        new Thread(() -> {
            try {
                System.out.println("C 线程进入到...加锁过程");
                lock.lock();
                System.out.println("C 已经抢占到锁...休眠10s后运行......");
                TimeUnit.SECONDS.sleep(10);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                System.out.println("C线程运行完成,开始解锁....");
                lock.unlock();
            }

        }, "C").start();

        TimeUnit.SECONDS.sleep(1);
        new Thread(() -> {
            try {
                System.out.println("D 线程进入到...加锁过程");
                lock.lock();
                System.out.println("D 已经抢占到锁...休眠10s后运行......");
                TimeUnit.SECONDS.sleep(10);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                System.out.println("D线程运行完成,开始解锁....");
                lock.unlock();
            }

        }, "D").start();

//        TimeUnit.SECONDS.sleep(1);
//        System.out.println("强制让 " + B.getName() + " 线程中断...");
//        lock.interruptThread(B);
    }
}

 

 

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部