关于ReadWriteLock的一些使用
博客专区 > snecker 的博客 > 博客详情
关于ReadWriteLock的一些使用
snecker 发表于3年前
关于ReadWriteLock的一些使用
  • 发表于 3年前
  • 阅读 106
  • 收藏 11
  • 点赞 0
  • 评论 0

新睿云服务器60天免费使用,快来体验!>>>   

最近遇到一个业务场景

  1. 有三个耗时线程。
  2. 线程A、线程B相互独立,并且会周期性执行sql,通过时间 增量获取新数据
  3. 线程C也是增量获取数据,并且操作依赖于线程A和线程B产生的数据; 可以简单理解为:独立同步A表、B表,同步C表的时候需要依赖A、B表的数据,也可以把C表看成A、B表的关联关系表。

如何让这三条线程相互协作呢?

最开始的简单想法是:

1.直接把三条线程的方法体抽出来,合并成一个单线程执行,即先执行A,再执行B(或者2个线程执行A,B)之后执行C线程方法体。

问题:如果是耗时操作,那么有可能出现等到我执行C线程的时候,A或者B数据源有更新,当然C执行的时候是获取不到A或B的最新数据的,那样就有可能导致丢失有效关联关系。而且单线程也无法提升整体性能。

2.利用A、B数据源的最后更新时间,对比C表最后更新时间,取最小值,然后与上次更新时间形成时间差,再同步这个时间差内产生的数据。 缺点:需要准确维护最后更新日期,若丢失时间则需要重新计算

3.利用读写锁来控制同步。A、B线程写数据的时候用writeLock锁住,C中对A、B引用块加读锁,同步用writeLock.condition的 await()、signalAll()方法来控制

重点说下第三种 先让code来说话吧

package com.xx;

/**
 * Created on 2015/8/13.
 */

import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class TestCondition2 {

    /**
     * 场景1:
     * 线程C开始同步,发现依赖的线程A或B数据不全,然后循环等待
     * 场景2:
     * A或B在同步的时候,禁止线程C同步。
     *
     * @param args
     */

    public static void main(String[] args) {
        ReadWriteLock orgReadWriteLock = new ReentrantReadWriteLock();//默认非公平锁
        Lock orgReadLock = orgReadWriteLock.readLock();
        Lock orgWriteLock = orgReadWriteLock.writeLock();
        Condition orgHasDataCond = orgReadWriteLock.writeLock().newCondition();//写锁支持condition,readLock().newCondition() throws UnsupportedOperationException.

        ReadWriteLock invRWLock = new ReentrantReadWriteLock();
        Lock invReadLock = invRWLock.readLock();
        Lock invWriteLock = invRWLock.writeLock();
        Condition invCondition = invWriteLock.newCondition();

        ExecutorService executor = Executors.newFixedThreadPool(4);
        Map mapA = new HashMap<>();
        Map mapB = new HashMap<>();
        Random r = new Random();
        //线程1:专门写A数据
        //线程2:专门写B数据,和A数据是独立的
        //线程3:依赖线程1和线程2
        //1
        Runnable r1 = () -> {
            while (true) {
                orgWriteLock.lock();
                try {
                    mapA.put("A", r.nextInt(10));
                    //signal
                    orgHasDataCond.signalAll();
                } finally {
                    orgWriteLock.unlock();
                }
                System.out.println("线程A等待2s。。。" + mapA.get("A"));
                try {
                    Thread.sleep(2000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        //2
        Runnable r2 = () -> {
            while (true) {
                invWriteLock.lock();
                try {
                    mapB.put("B", r.nextInt(20));
                    invCondition.signalAll();
                } finally {
                    invWriteLock.unlock();
                }
                System.out.println("线程B等待1s。。。" + mapB.get("B"));
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        Runnable r3 = () -> {
            while (true) {
                try {
                    //拿到线程A的值
                    Object a = null;
                    //依赖A
                    try {
                        orgReadLock.lock();
                        a = mapA.get("A");
                        System.out.println("线程3取到A的值:" + a);
                    } finally {
                        //unlock
                        orgReadLock.unlock();
                    }

                    try {
                        orgWriteLock.lock();
                        //线程3要求mapA小于5,否则一直等下去
                        while ((int) (a = mapA.get("A")) < 5) {
                            System.out.println("======线程3取到A的值小于5,等待。。。当前a=" + a);
                            orgHasDataCond.await();
                        }
                        System.out.println("【SUCCESSFUL】==========线程3终于取到了A:" + a);
                    } finally {
                        orgWriteLock.unlock();
                    }
                    //依赖B
                    Object b = null;
                    try {
                        invReadLock.lock();
                        b = mapB.get("B");
                        System.out.println("线程3取到B的值:" + b);
                    } finally {
                        //unlock
                        invReadLock.unlock();
                    }

                    try {
                        invWriteLock.lock();
                        //线程3要求mapA小于5,否则一直等下去
                        while ((int) (b = mapB.get("B")) < 15) {
                            System.out.println("======线程3取到B的值小于15,等待大于15。。。当前B=" + b);
                            invCondition.await();
                        }
                        System.out.println("【SUCCESSFUL】==========线程3终于取到了B:" + b);
                    } finally {
                        invWriteLock.unlock();
                    }
//                    orgReadLock.lock();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                try {
                    Thread.sleep(5000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        //
        executor.execute(r1);
        executor.execute(r2);
        executor.execute(r3);
        executor.shutdown();
    }
}

运行结果

线程B等待1s。。。13
线程A等待2s。。。7
【SUCCESSFUL】==========线程3终于取到了A:7
线程3取到B的值:13
======线程3取到B的值小于15,等待大于15。。。当前B=13
======线程3取到B的值小于15,等待大于15。。。当前B=8
线程B等待1s。。。8
线程B等待1s。。。5
======线程3取到B的值小于15,等待大于15。。。当前B=5
线程A等待2s。。。1
线程B等待1s。。。3
======线程3取到B的值小于15,等待大于15。。。当前B=3
======线程3取到B的值小于15,等待大于15。。。当前B=1
线程B等待1s。。。1
线程A等待2s。。。4
线程B等待1s。。。7
======线程3取到B的值小于15,等待大于15。。。当前B=7
【SUCCESSFUL】==========线程3终于取到了B:15
线程B等待1s。。。15
线程A等待2s。。。5
线程B等待1s。。。1
线程B等待1s。。。7
线程A等待2s。。。1
线程B等待1s。。。17
线程B等待1s。。。16
线程A等待2s。。。4
线程3取到A的值:4
======线程3取到A的值小于5,等待。。。当前a=4

需要注意的是:condition如果和writelock绑定的话需要在writelock持有的情况下调用,否则抛出IllegalMonitorException。 引用官方一句

[IllegalMonitorException]Thrown to indicate that a thread has attempted to wait on an object's monitor or to notify other threads waiting on an object's monitor without owning the specified monitor.

标签: java readwritelock
  • 打赏
  • 点赞
  • 收藏
  • 分享
共有 人打赏支持
粉丝 1
博文 38
码字总数 9666
×
snecker
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: