关于Java中锁使用的一些问题

原创
01/21 19:07
阅读数 238

1. 开始

首先,我们来看一个比较长,容易劝退的例子,看看你能发现哪些问题,以及可以优化的地方。

如果没什么耐心,也可以跳过,代码实现的基本逻辑就是:统一向远程服务器发送日志数据。

import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;


public class TaskHelper {

    private static SendDataTask sendDataTask;
    private static BlockingQueue<String> dataQueue = new LinkedBlockingQueue<>(100000);

    public static void main(String[] args) {
        sendDataTask = new SendDataTask();
        Thread thread = new Thread(sendDataTask, "SendDataTask");
        thread.setDaemon(true);
        thread.start();
    }

    public void submit(String data) {
        if (StringUtils.isNotBlank(data)) {
            dataQueue.add(data);
            sendDataTask.signalQueue();
        }
    }

    private static class SendDataTask implements Runnable {

        private ReentrantLock lock = new ReentrantLock();
        private Condition notEmpty = lock.newCondition();

        private int checkPeriod = 10 * 1000;
        private volatile boolean stop = false;

        @Override
        public void run() {
            while (!stop) {
                if (isEmpty()) {
                    awaitQueue();
                }
                try {
                    Thread.sleep(checkPeriod);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if (stop) {
                    break;
                }
                check();
            }
        }

        public void check() {
            boolean timedOut = false;
            List<String> dataList = new ArrayList<>();
            for (; ; ) {
                if (timedOut) {
                    return;
                }
                try {
                    String data = dataQueue.poll(100, TimeUnit.MILLISECONDS);
                    if (StringUtils.isBlank(data)) {
                        timedOut = true;
                        if (dataList.size() > 0) {
                            sendData(dataList);
                        }
                    } else {
                        dataList.add(data);
                        if (dataList.size() > 100) {
                            sendData(dataList);
                        }
                    }
                } catch (InterruptedException ignore) {
                }
            }
        }

        public void stop() {
            this.stop = true;
            //执行关闭逻辑
        }

        private boolean isEmpty() {
            return dataQueue.size() == 0;
        }

        private void awaitQueue() {
            boolean flag = lock.tryLock();
            if (flag) {
                try {
                    notEmpty.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        }

        public void signalQueue() {
            boolean flag = false;
            try {
                flag = lock.tryLock(100, TimeUnit.MILLISECONDS);
                if (flag)
                    notEmpty.signalAll();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (flag)
                    lock.unlock();
            }
        }

        public void sendData(List<String> data) {
            System.out.println("数据处理业务逻辑");
        }
    }

}

刚刚开始,我也想为什么不用Executors创建ThreadPoolExecutor来处理?

我想大概是:

  1. 不能丢数据,如果并发高,不好处理线程池的拒绝逻辑。
  2. 程序关闭的时候,需要保存未发送数据,避免丢数据
  3. 需要重试机制,处理网络异常情况

重试部分的逻辑我省略了,有兴趣可以了解一下下面这个库:

<dependency>
    <groupId>com.github.lowzj</groupId>
    <artifactId>java-retrying</artifactId>
    <version>1.2</version>
</dependency>

但是真的有必要即使用LinkedBlockingQueue,有使用Lock吗?

这个问题,我们放在后面解决,我们先来看点相关知识。

2. synchronized与Lock

synchronized与Lock最主要的区别是:

  1. synchronized锁是非公平的,Lock锁是否公平可以设置
  2. synchronized锁是不可中断的,Lock锁是否响应中断可以设置
  3. synchronized没有获取锁超时机制,Lock锁可以设置锁超时时间

其实,JDK对synchronized通过偏向锁(CAS线程ID)、轻量级锁(自旋)等方式优化,其实性能并不比Lock差,具体可以看看后面synchronized。

个人觉得,除非你需要设置公平锁、需要锁响应中断,需要获取锁超时、需要读写分离锁等,其他时候尽量使用synchronized。

因为性能问题,考虑使用Lock大多数时候是不值得的,除非的确对性能有极端要求,并且做了大量对比测试。

因为Lock使用的方式相对复杂,要考虑做更多测试,否则,一个Bug带来的问题绝对比可能的一点点性能降低带来的危害要大。

3. Lock与Condition

synchronized通过Object的wait、notify、notifyAll实现等待通知机制,Lock则需要Condition配合。

看一个简单的例子:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LockCondition {

    private final Lock lock = new ReentrantLock();
    private final Condition notFull  = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    private final Task[] tasks = new Task[100];
    private int addIndex;
    private int getIndex;
    private int count;

    public void addTask(Task task) throws InterruptedException {
        lock.lock();
        try {
            // 注意使用的是while,不是if
            while (count == tasks.length) {//如果队列满了
                notFull.await();//notFull条件不满足了,需要等待
            }
            tasks[addIndex++] = task;
            if (addIndex == tasks.length){
                addIndex = 0;
            }
            count++;
            notEmpty.signal();//添加数据之后,非空条件满足了,就可以唤醒等在这个条件上的线程了
        } finally {
            lock.unlock();
        }
    }

    public Task getTask() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {//如果队列为空
                notEmpty.await();//队列为空,非空条件不满足了,需要等待
            }
            Task task = tasks[getIndex++];
            if (getIndex == tasks.length) {
                getIndex = 0;
            }
            count--;
            notFull.signal();//数据被取出之后,notFull条件满足了,需要唤醒等待notFull条件的线程
            return task;
        } finally {
            lock.unlock();
        }
    }

    private static class Task{

    }
}

这个例子虽然非常粗糙,但是基本包含使用Lock和Condition的要素了:

  1. Condition是通过Lock获取
  2. await放在while循环中,而不是if
  3. await、signal都必须先获取到锁lock之后执行
  4. 在finally中释放锁:lock.unlock()
  5. 条件不满足的时候执行await
  6. 条件满足之后执行signal

想要更好的知道Lock的使用方式,JDK的BlockingQueue的相关实现类绝对是最好的教程实例,读读这些类的源码吧,例如LinkedBlockingQueue。

4. LinkedBlockingQueue主要方法

添加元素方法 说明
add 向队列添加元素,如果队列满,则抛出异常
put 向队列添加元素,阻塞到添加成功
offer 向队列添加元素,没有添加成功返回false,可以设置一个添加超时时间
获取元素方法 说明
remove 获取并删除队列头部元素,如果队列为空,则抛出异常
take 从队列添获取元素,一直等待到获取成功
poll 从队列添获取元素,没有返回null,可以设置一个获取超时时间
peek 从队列添获取元素,没有返回null
drainTo 从队列添获取元素,一次可以获取多个,可以设置最大获取个数

JDK中的其他阻塞队列方法逻辑大多类似,基本就是分:

  1. 是否抛出异常(add remove)
  2. 阻塞(put take)
  3. 超时还是直接返回(offer poll)

5. 关于synchronized

5.1 示例说明

public class Synchronize {

    private Object lock = new Object();

    public static synchronized void doSomethingA(){

    }

    public static void doSomethingB(){
        synchronized (Synchronize.class){

        }
    }

    public synchronized void doSomethingC(){

    }

    public void doSomethingD(){
        synchronized (this){

        }
    }

    public void doSomethingE(){
        synchronized (lock){
            
        }
    }
}

如果是静态方法,锁对象就是类的class,也就是下面2个方法加锁方式是等价的:

public static synchronized void doSomethingA(){

}

public static void doSomethingB(){
    synchronized (Synchronize.class){

    }
}

如果是实例方法,所对象是实例本身,也就是下面2个方法加锁方式是基本等价的:

public synchronized void doSomethingC(){

}

public void doSomethingD(){
    synchronized (this){

    }
}

静态方法和实例方法使用的是不同的锁,所以他们之间不会相互影响。

如果某些方法需要减小锁的粒度,可以new一个新对象做为锁对象,来减少锁的争用。

5.2 锁升级

之所以有锁升级过程是因为JDK对synchronized关键字做了优化。

无锁 -> 偏向锁 -> 轻量级锁 -> 重量级锁

每个Java对象有一个对象头,以32位为例。

无锁时候:

对象的HashCode 分代年龄 是否偏向锁 锁标志位
25位 4位 1位 2位(01)

偏向锁:

线程ID Epoch(偏向锁时间) 分代年龄 是否偏向锁 锁标志位
23位 2位 4位 1位 2位(01)

因为锁不仅不存在多线程竞争,而且总是由同一线程多次获得,偏向锁通过对线程ID的CAS操作来避免同步。

可以通过XX:-UseBiasedLocking=false设置偏向锁开启与关闭,可以通过XX:BiasedLockingStartupDelay=0参数设置偏向锁是否延迟开启,0表示关闭延迟开启。

轻量级锁:

轻量级锁考虑的是因为,大多数时候一个线程持有锁的时间非常短,在没有获取到锁的时候通过短暂的自旋来避免上下文切换开销。

自旋的逻辑大概就是通过while(true)空等,会消耗CPU。

举个生活中的例子,就像烧开水,开始等的时候发现水没有开,就直接回去了,后来你知道水马上要开了,再去检查的时候,你决定等几秒钟,以免刚回去可能还没坐下水就开了。

可以通过-XX:+UseSpinning参数来设置是否启用锁自旋,使用-XX:PreBlockSpin参数设置自旋次数

重量级锁:

重量级的锁就是synchronize最开始的实现方式,对象内部的监视器锁monitor,依赖于底层的操作系统的Mutex Lock实现。

6. 关于Java对象头

这里不细介绍了,可以使用open jdk的jol查看:

<dependency>
    <groupId>org.openjdk.jol</groupId>
    <artifactId>jol-core</artifactId>
    <version>0.14</version>
</dependency>
import org.junit.Test;
import org.openjdk.jol.info.ClassLayout;
import org.openjdk.jol.info.GraphLayout;

public class JOLTest {

    private static class User{
        private Integer id;
        private String name;

        public Integer getId() {
            return id;
        }

        public void setId(Integer id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public synchronized void doSomething(){
            System.out.println(ClassLayout.parseInstance(this).toPrintable());
            System.out.println("#################");
        }
    }

    @Test
    public void test(){
        User user = new User();
        user.setId(1);
        user.setName("tim");
        //查看对象内部信息
        System.out.println(ClassLayout.parseInstance(user).toPrintable());
        System.out.println("-------------");
        //查看对象外部信息
        System.out.println(GraphLayout.parseInstance(user).toPrintable());
        System.out.println("-------------");
        //查看对象占用空间总大小
        System.out.println(GraphLayout.parseInstance(user).totalSize());
        System.out.println("-------------");
        user.doSomething();
    }
}

可以使用-XX:+UseCompressedOops参数对比一下指针是否压缩的差别。

7. 回到开始

回到我们最开始的程序,看看有哪些问题?

其实,最重要的就2个问题:

  1. LinkedBlockingQueue队列已经实现了锁相关的逻辑,我们自己使用的时候没必要在加一次锁
  2. 拿着锁sleep,Thread.sleep()

其实,JDK已经为我们提供了足够多的工具,很多时候我们没有必要写太多相对底层的代码。

下面,提供一个简化版的示例,仅供参考:

import java.util.LinkedList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;


public class MyTaskHelper {

    private static final BlockingQueue<String> DATA_QUEUE = new LinkedBlockingQueue<>(1000);

    private static volatile boolean stop = false;

    private static Thread thread;

    static {
        start();
    }

    public static void addTask(String task) throws InterruptedException {
        DATA_QUEUE.put(task);
    }

    public static synchronized void start(){
        if(stop){
            if(thread == null || !thread.isAlive()) {
                thread = new Thread(new SendDataTask(), "MyTaskHelper");
                thread.setDaemon(true);
                thread.start();
            }
            stop = false;
        }
    }

    public static void stop(){
        stop = true;
        System.out.println("处理未发送数据逻辑");
    }

    private static class SendDataTask implements Runnable {

        @Override
        public void run() {
            LinkedList<String> datas = new LinkedList<>();
            while (!stop){
                datas.clear();
                DATA_QUEUE.drainTo(datas,100);
                System.out.println("发送日志数据逻辑 datas");
            }
        }
    }
}

8. 资料参考

OpenJDK对象头

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部