文档章节

LockSupport实现线程挂起和唤醒——深入浅出原码分析

须臾之余
 须臾之余
发布于 07/18 15:48
字数 3031
阅读 48
收藏 0

面试题

(1)LockSupport比Object的wait/notify有两大优势,分别是什么?

(2)LockSupport源码是如何实现的,具体说说你的看法?

(1)LockSupport比Object的wait/notify有两大优势,分别是什么?

LockSupport是Java6引入的一个工具类,它简单灵活,应用广泛。

1.简单

俗话说,没有比较就没有伤害。这里咱们还是通过对比来介绍LockSupport的简单。

在没有LockSupport之前,线程的挂起和唤醒咱们都是通过Object的wait和notify/notifyAll方法实现。

写一段例子代码,线程A执行一段业务逻辑后调用wait阻塞住自己。主线程调用notify方法唤醒线程A,线程A然后打印自己执行的结果。

public class TestObjWait {

    public static void main(String[] args)throws Exception {
        final Object obj = new Object();
        Thread A = new Thread(new Runnable() {
            @Override
            public void run() {
                int sum = 0;
                for(int i=0;i<10;i++){
                    sum+=i;
                }
                try {
                    obj.wait();
                }catch (Exception e){
                    e.printStackTrace();
                }
                System.out.println(sum);
            }
        });
        A.start();
        //睡眠一秒钟,保证线程A已经计算完成,阻塞在wait方法
        Thread.sleep(1000);
        obj.notify();
    }
}

执行这段代码,不难发现这个错误

java.lang.IllegalMonitorStateException
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:502)
    at com.aaa.TestObjWait$1.run(TestObjWait.java:15)
    at java.lang.Thread.run(Thread.java:748)
45
Exception in thread "main" java.lang.IllegalMonitorStateException
    at java.lang.Object.notify(Native Method)
    at com.aaa.TestObjWait.main(TestObjWait.java:25)

原因很简单,wait和notify/notifyAll方法只能在同步代码块里用。所以将代码修改如下就可以正常运行

public class TestObjWait {

    public static void main(String[] args)throws Exception {
        final Object obj = new Object();
        Thread A = new Thread(new Runnable() {
            @Override
            public void run() {
                int sum = 0;
                for(int i=0;i<10;i++){
                    sum+=i;
                }
                try {
                    synchronized (obj){
                        obj.wait();
                    }
                }catch (Exception e){
                    e.printStackTrace();
                }
                System.out.println(sum);
            }
        });
        A.start();
        //睡眠一秒钟,保证线程A已经计算完成,阻塞在wait方法
        Thread.sleep(1000);
        synchronized (obj){
            obj.notify();
        }
    }
}

那如果咱们换成LockSupport呢?简单得很,看代码:

public class TestLo {
    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(() -> {
            int sum = 0;
            for (int i = 0; i < 10; i++) { //——步骤二
                sum += i;
            }
            //——这里会阻塞
            LockSupport.park(); //——步骤四
            System.out.println(sum); //——步骤五
        });
        thread.start(); //——步骤一
        Thread.sleep(1000);
        LockSupport.unpark(thread); //——步骤三
    }
}

直接调用即可,没有说非得在同步代码块里才能用,非常easy

2.灵活性

如果只是LockSupport在使用起来比Object的wait/notify简单,那还真没必要专门讲解下LockSupport。最主要的是灵活性。

上边的例子代码中,主线程调用了Thread.sleep(1000)方法来等待线程A计算完成进入wait状态。如果去掉Thread.sleep()调用,代码如下:

public class TestObjWait {

    public static void main(String[] args)throws Exception {
        final Object obj = new Object();
        Thread A = new Thread(new Runnable() {
            @Override
            public void run() {
                int sum = 0;
                for(int i=0;i<10;i++){
                    sum+=i;
                }
                try {
                    synchronized (obj){
                        obj.wait();
                    }
                }catch (Exception e){
                    e.printStackTrace();
                }
                System.out.println(sum);
            }
        });
        A.start();
        //睡眠一秒钟,保证线程A已经计算完成,阻塞在wait方法
        //Thread.sleep(1000);
        synchronized (obj){
            obj.notify();
        }
    }
}

多运行几次上面的代码,有时候能够正常打印结果并退出程序,但有时候线程无法打印结果阻塞了。

原因在于:主线程调用完notify后,线程A才进入wait方法,导致线程A一直阻塞了。由于线程A不是后台线程,所以整个程序无法退出。

那如果换成LockSupport呢?

LockSupport就支持主线程先调用unpark后,线程A再调用parl而不被阻塞吗?是的,没错,代码如下

public class TestObjWait {

    public static void main(String[] args)throws Exception {
        final Object obj = new Object();
        Thread A = new Thread(new Runnable() {
            @Override
            public void run() {
                int sum = 0;
                for(int i=0;i<10;i++){
                    sum+=i;
                }
                LockSupport.park();
                System.out.println(sum);
            }
        });
        A.start();
        //睡眠一秒钟,保证线程A已经计算完成,阻塞在wait方法
        //Thread.sleep(1000);
        LockSupport.unpark(A);
    }
}

不管你执行多少次,这段代码都能正常打印结果并退出。这就是LockSupport最大的灵活所在。

总结一下,LockSupport比Object的wait/notify有两大优势

①LockSupport不需要在同步代码块里。所以线程间也不需要维护一个共享的同步对象了,实现了线程间的解耦。

②unpark函数可以优先于park调用,所以不需要担心线程间的执行先后顺序。

应用广泛

LockSupport在Java的工具类用应用很广泛,咱们这里找几个例子感受感受。以Java里最常用的类ThreadPoolExecutor为例。先看如下代码:

public class Test001 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor
                (5, 5, 1000, TimeUnit.SECONDS,
                        new ArrayBlockingQueue<>(1000));
        Future<String> future = poolExecutor.submit(new Callable<String>() {
            @Override
            public String call() throws InterruptedException {
                TimeUnit.SECONDS.sleep(5);
                return "hello";
            }
        });
        String result = future.get();
        System.out.println(result);
    }
}代码中,我们向线程池中

代码中我们向线程池中扔了一个任务,然后调用Future的get方法,同步阻塞等待线程池的执行结果。

这里就要问了:get方法是如何组塞住当前线程?线程池执行完任务后又是如何唤醒线程的呢?

咱们跟着源码一步步分析,先看线程池的submit方法的实现:

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

在submit方法里,线程池将我们提交的基于Callable实现的任务,封装为基于RunnableFuture实现的任务,然后将任务提交到线程池执行,并向当前线程返回RunnableFutrue。

进入newTaskFor方法,就一句话:return new FutureTask<T>(callable);

所以,咱们主线程调用future的get方法就是FutureTask的get方法,线程池执行的任务对象也是FutureTask的实例。

接下来看看FutureTask的get方法的实现:

/**
 * @throws CancellationException {@inheritDoc}
 */
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

比较简单,就是判断下当前任务是否执行完毕,如果执行完毕直接返回任务结果,否则进入awaitDone方法阻塞等待。

/**
 * Awaits completion or aborts on interrupt or timeout.
 *
 * @param timed true if use timed waits
 * @param nanos time to wait, if timed
 * @return state upon completion
 */
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q); //——使用CAS非阻塞算法,将线程封装为WaitNode,保存下来,以供后续唤醒线程时使用
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);//——调用了LockSupport的park/parkNanos阻塞住当前线程。
        }
        else
            LockSupport.park(this);
    }
}

上边已经说了阻塞等待任务结果的逻辑,接下来再看看线程池执行完任务,唤醒等待线程的逻辑实现。

前边说了,咱们提交的基于Callable实现的任务,已经被封装为FutureTask任务提交给了线程池执行,任务的执行就是FutureTask的run方法执行。如下是FutureTask的run方法:

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call(); //——执行我们提交的任务
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result); //任务执行完后调用set方法,唤醒工作线程的工作应该就是在这里了
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion(); //——进入这里看如何实现的
    }
}
private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { //——先通过CAS非阻塞算法将所有等待的线程拿出来
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t); //——然后再使用LockSupport的unpark唤醒每个线程
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // to reduce footprint
}

在使用线程池的过程中,不知道你有没有这么一个疑问:线程池里没有任务时,线程池里的线程在干嘛呢?

通过这篇文章《线程池的工作原理与源码解读》的读者一定知道,线程会调用队列的take方法阻塞等待新任务。那队列的take方法是不是也跟Future的get方法实现一样呢?咱们来看看源码实现。

以ArrayBlockingQueue为例,take方法实现如下:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await(); //——使用Lock的Condition的await方法实现线程阻塞,进入里面看看
        return dequeue();
    } finally {
        lock.unlock();
    }
}
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    long savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this); //——发现await方法里还是使用LockSupport.park方法阻塞自己
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
public static void park(Object blocker) {
    Thread t = Thread.currentThread();//——获取调用线程
    setBlocker(t, blocker); //——设置该线程的blocker变量
    UNSAFE.park(false, 0L); //——挂起线程
    setBlocker(t, null); //——线程被激活后,清除blocker变量,因为一般都是在线程阻塞时才分析原因
}

Thread类里面有个变量volatile Object parkBlocker,用来存放park方法传递的block对象,也就是把block变量存放到了调用park方法的线程的成员变量里面。

(2)LockSupport源码是如何实现的,具体说说你的看法?

学习要知其然,还要知其所以然。接下来不妨看看LockSupport的实现。

进入LockSupport的park方法,可以发现它是调用了Unsafe的park方法,这是一个本地native方法,只能通过openjdk的源码看看其本地实现了。 

它调用了线程的Parker类型对象的park方法,如下是Parker类的定义:

类中定义了一个int类型的_counter变量,咱们上文中讲灵活性的那一节说,可以先执行unpark后执行park,

就是通过这个变量实现,看park方法的实现代码(由于方法比较长就不整体截图了):

park方法会调用Atomic::xchg方法,这个方法会原子性的将_counter赋值为0,并返回赋值前的值。如果调用park方法前,

_counter大于0,则说明之前调用过unpark方法,所以park方法直接返回。

接着往下看:

实际上Parker类用Posix的mutex,condition来实现的阻塞唤醒。如果对mutex和condition不熟,可以简单理解

为mutex就是Java里的synchronized,condition就是Object里的wait/notify操作。

park方法里调用pthread_mutex_trylock方法,就相当于Java线程进入Java的同步代码块,然后再次判断_counter

是否大于零,如果大于零则将_counter设置为零。最后调用pthread_mutex_unlock解锁,相当于Java执行完退出

同步代码块。如果_counter不大于零,则继续往下执行pthread_cond_wait方法,实现当前线程的阻塞。

最后再看看unpark方法的实现吧,这块就简单多了,直接上代码:

图中的1和4就相当于Java的进入synchronized和退出synchronized的加锁解锁操作,代码2将_counter设置为1,

同时判断先前_counter的值是否小于1,即这段代码:if(s<1)。如果不小于1,则就不会有线程被park,所以方法

直接执行完毕,否则就会执行代码3,来唤醒被阻塞的线程。

总结

通过阅读LockSupport的本地实现,我们不难发现这么个问题:多次调用unpark方法和调用一次unpark方法效果一样,

因为都是直接将_counter赋值为1,而不是加1。简单说就是:线程A连续调用两次LockSupport.unpark(B)方法唤醒线程B,

然后线程B调用两次LockSupport.park()方法, 线程B依旧会被阻塞。因为两次unpark调用效果跟一次调用一样,

只能让线程B的第一次调用park方法不被阻塞,第二次调用依旧会阻塞。

参考书籍

Java并发编程之美

参考链接

https://www.cnblogs.com/qingquanzi/p/8228422.html

 

© 著作权归作者所有

须臾之余
粉丝 125
博文 68
码字总数 178724
作品 0
吉安
程序员
私信 提问
JUC锁框架——LockSupport应用以及源码分析

LockSupport用法 在没有LockSupport之前,线程的挂起和唤醒咱们都是通过Object的wait和notify/notifyAll方法实现,而Object的wait和notify/notifyAll方法只能在同步代码块里用。而LockSuppo...

长头发-dawn
2018/09/11
61
0
concurrent包的同步控制工具

ReentrantLock 可重入锁,应用层面的锁,jdk6后性能和synchronized差不多,但是有更多的功能。 可以响应线程中断: 可以限时,这样可以避免死锁 ReentrantLock可以实现公平锁: 通过Conditi...

肥肥小浣熊
2017/11/06
0
0
阻塞和唤醒线程——LockSupport功能简介及原理浅析

1.LockSupport功能简介 在java并发包下各种同步组件的底层实现中,LockSupport的身影处处可见。JDK中的定义为用来创建锁和其他同步类的线程阻塞原语。 我们可以使用它来阻塞和唤醒线程,功能和...

takumiCX
2018/07/18
0
0
Java并发编程之Unsafe类和LockSupport类源码分析

一.Unsafe类的源码分析 JDK的rt.jar包中的Unsafe类提供了硬件级别的原子操作,Unsafe里面的方法都是native方法,通过使用JNI的方式来访问本地C++实现库。 rt.jar 中 Unsafe 类主要函数讲解,...

狂小白
2018/06/06
0
0
浅谈Java并发编程系列(九)—— AQS结构及原理分析

AQS介绍 AQS,即AbstractQueuedSynchronizer, 队列同步器,它是Java并发用来构建锁和其他同步组件的基础框架。来看下同步组件对AQS的使用: AQS是一个抽象类,主是是以继承的方式使用。AQS本...

codershamo
2017/02/24
0
0

没有更多内容

加载失败,请刷新页面

加载更多

MySQL左连接问题,右表做筛选,左表列依然在

两张表,一张user表,一张user_log表 CREATE TABLE `user` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(20) DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFA......

bengozhong
16分钟前
4
0
重新开始学Java——多线程基础

多线程 进程 主流计算机操作系统都支持同时运行多个任务 , 每个任务通常就是一个程序 , 每个运行中的程序就是一个进程或者多个进程 。 进程的特点 独立性 进程是系统中独立存在的实体 可以...

大家都是低调来的
17分钟前
2
0
注解在Java中是如何工作的?

> 来一点咖啡,准备好进入注解的世界。 注解一直是 Java 的一个非常重要的部分,它从 J2SE 5.0 开始就已经存在了。在我们的应用程序代码中,经常看到 @Override 和 @Deprecated 这样的注解。...

liululee
20分钟前
5
0
Docker 容器连接

Docker 容器连接 容器间的链接有两种方法,你选择其一即可 网络端口映射 docker run -d -P docker run -d -p-P :是容器内部端口随机映射到主机的高端口。-p : 是容器内部端口绑定到指定...

测者陈磊
23分钟前
4
0
车载导航应用中基于Sketch UI主题定制方案的实现

1.导读 关于应用的主题定制,相信大家或多或少都有接触,基本上,实现思路可以分为两类: 内置主题(应用内自定义style) 外部加载方式(资源apk形式、压缩资源、插件等) 其实,针对不同的主题...

阿里云官方博客
28分钟前
7
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部