文档章节

并发编程专题三-线程的并发工具类

DrakKing
 DrakKing
发布于 04/14 13:12
字数 1746
阅读 259
收藏 11

一、Fork-Join框架

1、分而治之

规模为N的问题,N<阈值,直接解决,N>阈值,将N分解为K个小规模子问题,子问题互相对立,与原问题形式相同,将子问题的解合并得到原问题的解

动态规范

2、工作密取 workStealing

就是在任务分割的时候,前面的任务执行可能会比后面的执行速度快,当前面的执行完,后面的还没执行的时候,执行完前面的任务的线程不会停止,而是从后面的任务的尾部取出子任务继续工作。Fork-Join就是实现了这样的机制。

Fork/Join使用的标准范式

代码如下:


import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
/**
 * @Auther: BlackKingW
 * @Date: 2019/4/14 12:09
 * @Description:
 */
public class SumArray {

    private static class SumTask extends RecursiveTask<Integer>{

        private final static int THRESHOLD = MakeArray.ARRAY_LENGTH/10;
        private int[] src; //表示我们要实际统计的数组
        private int fromIndex;//开始统计的下标
        private int toIndex;//统计到哪里结束的下标

        public SumTask(int[] src, int fromIndex, int toIndex) {
            this.src = src;
            this.fromIndex = fromIndex;
            this.toIndex = toIndex;
        }

		@Override
		protected Integer compute() {
			if(toIndex-fromIndex < THRESHOLD) {
				int count = 0;
				for(int i=fromIndex;i<=toIndex;i++) {
			    	//SleepTools.ms(1);
			    	count = count + src[i];
				}
				return count;
			}else {
				//fromIndex....mid....toIndex
				//1...................70....100
				int mid = (fromIndex+toIndex)/2;
				SumTask left = new SumTask(src,fromIndex,mid);
				SumTask right = new SumTask(src,mid+1,toIndex);
				invokeAll(left,right);
				return left.join()+right.join();
			}
		}
    }


    public static void main(String[] args) {

        ForkJoinPool pool = new ForkJoinPool();
        int[] src = MakeArray.makeArray();

        SumTask innerFind = new SumTask(src,0,src.length-1);

        long start = System.currentTimeMillis();

        pool.invoke(innerFind);//同步调用
        System.out.println("Task is Running.....");

        System.out.println("The count is "+innerFind.join()
                +" spend time:"+(System.currentTimeMillis()-start)+"ms");

    }
}

public class MakeArray {
    //数组长度
    public static final int ARRAY_LENGTH  = 100000000;

    public static int[] makeArray() {

        //new一个随机数发生器
        Random r = new Random();
        int[] result = new int[ARRAY_LENGTH];
        for(int i=0;i<ARRAY_LENGTH;i++){
            //用随机数填充数组
            result[i] =  r.nextInt(ARRAY_LENGTH*3);
        }
        return result;

    }
}

二、常用的并发工具类

1、CountDownLatch

作用:是一组线程等待其他的线程完成工作以后在执行,加强版join

await用来等待,countDown负责计数器的减一

代码示例:


import java.util.concurrent.CountDownLatch;

/**
 * @Auther: BlackKingW
 * @Date: 2019/4/14 12:09
 * @Description:
 */
public class UseCountDownLatch {
	
	static CountDownLatch latch = new CountDownLatch(6);

	//初始化线程(只有一步,有4个)
    private static class InitThread implements Runnable{

        @Override
        public void run() {
        	System.out.println("Thread_"+Thread.currentThread().getId()
        			+" ready init work......");
        	latch.countDown();//初始化线程完成工作了,countDown方法只扣减一次;
            for(int i =0;i<2;i++) {
            	System.out.println("Thread_"+Thread.currentThread().getId()
            			+" ........continue do its work");
            }
        }
    }
    
    //业务线程
    private static class BusiThread implements Runnable{

        @Override
        public void run() {
        	try {
				latch.await();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
            for(int i =0;i<3;i++) {
            	System.out.println("BusiThread_"+Thread.currentThread().getId()
            			+" do business-----");
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
    	//单独的初始化线程,初始化分为2步,需要扣减两次
        new Thread(new Runnable() {
            @Override
            public void run() {
            	SleepTools.ms(1); //休眠1s
                System.out.println("Thread_"+Thread.currentThread().getId()
            			+" ready init work step 1st......");
                latch.countDown();//每完成一步初始化工作,扣减一次
                System.out.println("begin step 2nd.......");
                SleepTools.ms(1);
                System.out.println("Thread_"+Thread.currentThread().getId()
            			+" ready init work step 2nd......");
                latch.countDown();//每完成一步初始化工作,扣减一次
            }
        }).start();
        new Thread(new BusiThread()).start();
        for(int i=0;i<=3;i++){
            Thread thread = new Thread(new InitThread());
            thread.start();
        }

        latch.await();
        System.out.println("Main do ites work........");
    }
}

2、CyclicBarrier

让一组线程达到某个屏障,被阻塞,一直到组内最后一个线程达到屏障时,屏障开放,所有被阻塞的线程会继续运行CyclicBarrier(int parties)

CyclicBarrier(int parties, Runnable barrierAction),屏障开放,barrierAction定义的任务会执行

代码示例:

import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;

/**
 * @Auther: BlackKingW
 * @Date: 2019/4/14 12:09
 * @Description:
 */
public class UseCyclicBarrier {
	
	private static CyclicBarrier barrier 
		= new CyclicBarrier(5,new CollectThread());
	
    private static ConcurrentHashMap<String,Long> resultMap
            = new ConcurrentHashMap<>();//存放子线程工作结果的容器

    public static void main(String[] args) {
        for(int i=0;i<=4;i++){
            Thread thread = new Thread(new SubThread());
            thread.start();
        }

    }

    //负责屏障开放以后的工作
    private static class CollectThread implements Runnable{

        @Override
        public void run() {
            StringBuilder result = new StringBuilder();
            for(Map.Entry<String,Long> workResult:resultMap.entrySet()){
            	result.append("["+workResult.getValue()+"]");
            }
            System.out.println(" the result = "+ result);
            System.out.println("do other business........");
        }
    }

    //工作线程
    private static class SubThread implements Runnable{

        @Override
        public void run() {
        	long id = Thread.currentThread().getId();//线程本身的处理结果
            resultMap.put(Thread.currentThread().getId()+"",id);
            Random r = new Random();//随机决定工作线程的是否睡眠
            try {
                if(r.nextBoolean()) {
                	Thread.sleep(2000+id);
                	System.out.println("Thread_"+id+" ....do something ");
                }
                System.out.println(id+"....is await");
                barrier.await();
            	Thread.sleep(1000+id);
                System.out.println("Thread_"+id+" ....do its business ");
            } catch (Exception e) {
                e.printStackTrace();
            }

        }
    }
}

CountDownLatch和CyclicBarrier辨析

1、countdownlatch放行由第三者控制,CyclicBarrier放行由一组线程本身控制
2、countdownlatch放行条件》=线程数,CyclicBarrier放行条件=线程数

3、Semaphore

控制同时访问某个特定资源的线程数量,用在流量控制


import java.sql.Connection;
import java.util.LinkedList;
import java.util.concurrent.Semaphore;
/**
 * @Auther: BlackKingW
 * @Date: 2019/4/14 12:09
 * @Description:
 */
public class DBPoolSemaphore {
	
	private final static int POOL_SIZE = 10;
	private final Semaphore useful,useless;//useful表示可用的数据库连接,useless表示已用的数据库连接
	
	public DBPoolSemaphore() {
		this. useful = new Semaphore(POOL_SIZE);
		this.useless = new Semaphore(0);
	}
	
	//存放数据库连接的容器
	private static LinkedList<Connection> pool = new LinkedList<Connection>();
	//初始化池
	static {
        for (int i = 0; i < POOL_SIZE; i++) {
            pool.addLast(SqlConnectImpl.fetchConnection());
        }
	}

	/*归还连接*/
	public void returnConnect(Connection connection) throws InterruptedException {
		if(connection!=null) {
			System.out.println("当前有"+useful.getQueueLength()+"个线程等待数据库连接!!"
					+"可用连接数:"+useful.availablePermits());
			useless.acquire();
			synchronized (pool) {
				pool.addLast(connection);
			}	
			useful.release();
		}
	}
	
	/*从池子拿连接*/
	public Connection takeConnect() throws InterruptedException {
		useful.acquire();
		Connection conn;
		synchronized (pool) {
			conn = pool.removeFirst();
		}
		useless.release();
		return conn;
	}
	
}

4、Exchange

两个线程间的数据交换, 用的比较少

代码示例:

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Exchanger;
/**
 * @Auther: BlackKingW
 * @Date: 2019/4/14 12:09
 * @Description:
 */
public class UseExchange {
    private static final Exchanger<Set<String>> exchange 
    	= new Exchanger<Set<String>>();

    public static void main(String[] args) {

    	//第一个线程
        new Thread(new Runnable() {
            @Override
            public void run() {
            	Set<String> setA = new HashSet<String>();//存放数据的容器
                try {
                	/*添加数据
                	 * set.add(.....)
                	 * */
                	setA = exchange.exchange(setA);//交换set
                	/*处理交换后的数据*/
                } catch (InterruptedException e) {
                }
            }
        }).start();

      //第二个线程
        new Thread(new Runnable() {
            @Override
            public void run() {
            	Set<String> setB = new HashSet<String>();//存放数据的容器
                try {
                	/*添加数据
                	 * set.add(.....)
                	 * set.add(.....)
                	 * */
                	setB = exchange.exchange(setB);//交换set
                	/*处理交换后的数据*/
                } catch (InterruptedException e) {
                }
            }
        }).start();

    }
}

5、Callable、Future和FutureTask

类之间的关系

isDone,结束,正常还是异常结束,或者自己取消,返回true;

isCancelled 任务完成前被取消,返回true;

cancel(boolean):

  1. 任务还没开始,返回false
  2. 任务已经启动,cancel(true),中断正在运行的任务,中断成功,返回true,cancel(false),不会去中断已经运行的任务
  3. 任务已经结束,返回false

代码示例:



import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;


/**
 * @Auther: BlackKingW
 * @Date: 2019/4/14 12:09
 * @Description:
 */
public class UseFuture {
	
	/*实现Callable接口,允许有返回值*/
	private static class UseCallable implements Callable<Integer>{

		private int sum;
		@Override
		public Integer call() throws Exception {
			System.out.println("Callable子线程开始计算");
			Thread.sleep(2000);
			for(int i=0;i<5000;i++) {
				sum = sum+i;
			}
			System.out.println("Callable子线程计算完成,结果="+sum);
			return sum;
		}

	}
	
	public static void main(String[] args) 
			throws InterruptedException, ExecutionException {
		
		UseCallable useCallable = new UseCallable();
		FutureTask<Integer> futureTask = new FutureTask<Integer>(useCallable);
		new Thread(futureTask).start();
		Random r = new Random();
		SleepTools.second(1);
		if(r.nextBoolean()) {//随机决定是获得结果还是终止任务
			System.out.println("Get UseCallable result = "+futureTask.get());
		}else {
			System.out.println("中断计算");
			futureTask.cancel(true);
		}
		
	}

}

场景举例:包含图片和文字的文档的处理:图片(云上),可以用future去取图片,主线程继续解析文字。

© 著作权归作者所有

DrakKing
粉丝 5
博文 6
码字总数 12357
作品 0
杭州
技术主管
私信 提问
读书笔记之《Java并发编程的艺术》-并发编程容器和框架(重要)

读书笔记部分内容来源书出版书,版权归本书作者,如有错误,请指正。 欢迎star、fork,读书笔记系列会同步更新 git https://github.com/xuminwlt/j360-jdk module j360-jdk-thread/me.j360....

Hi徐敏
2015/11/11
0
1
【Java并发专题】27篇文章详细总结Java并发基础知识

努力的意义,就是,在以后的日子里,放眼望去全是自己喜欢的人和事! github:https://github.com/CL0610/Java-concurrency,欢迎题issue和Pull request。所有的文档都是自己亲自码的,如果觉...

你听___
2018/05/06
0
0
读书笔记之《Java并发编程的艺术》-线程池和Executor的子孙们

读书笔记部分内容来源书出版书,版权归本书作者,如有错误,请指正。 欢迎star、fork,读书笔记系列会同步更新 git https://github.com/xuminwlt/j360-jdk module j360-jdk-thread/me.j360....

Hi徐敏
2015/11/11
0
1
JAVA并发编程JUC基础学习(简介)

之前写过一篇并发编程的简单实例应用,Future快速实现并发编程,可以很快的在自己的项目中应用,但并不系统,之前说过总结一篇(或者一系列)java.util.concurrent 这个并发编程工具类的学习...

小海bug
02/22
0
0
牛逼哄哄的Dubbo框架,底层到底是什么原理?

搞了N年Java,仍有不少朋友困惑:用了很多年Dubbo,觉得自己挺厉害,跳槽面试时一问RPC,一问底层通讯,一问NIO和AIO,就一脸懵逼,到底该怎么办? (大家有没有这样的感触?Dubbo用得很熟,...

Java猫
03/27
0
0

没有更多内容

加载失败,请刷新页面

加载更多

OSChina 周二乱弹 —— 吾不好梦中插人

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @鱼豆腐233 :#今日歌曲分享# 分享My Chemical Romance的单曲《I Don't Love You》: 《I Don't Love You》- My Chemical Romance 手机党少年们...

小小编辑
52分钟前
20
5
ss5 vpn 安装(linux版本)

1. 创建一个文件夹 /ss5 你也可以自定义,不过后续的地方需要注意自己的地址 2. 下载ss5文件(如果你的服务器没有安装wget请使用 yum -y install wget 命令安装 如果连yum都没安装自己查去)(下...

太黑_thj
今天
2
0
八、RabbitMQ的集群原理

集群架构 写在前面 RabbitMQ集群是按照低延迟环境设计的,千万不要跨越WAN或者互联网来搭建RabbitMQ集群。如果一定要在高延迟环境下使用RabbitMQ集群,可以参考使用Shovel和Federation工具。...

XuePeng77
今天
5
0
mac系统下,brew 安装mysql,用终端可以连接,navicat却连接不上?

问题: 1.报错? 2059 - Authentication plugin 'caching_sha2_password' cannot be loaded: dlopen(../Frameworks/caching_sha2_password.so, 2): image not found 2.自己通过设置,已经把密......

写bug的攻城狮
昨天
3
0
老生常谈,HashMap的死循环

问题 最近的几次面试中,我都问了是否了解HashMap在并发使用时可能发生死循环,导致cpu100%,结果让我很意外,都表示不知道有这样的问题,让我意外的是面试者的工作年限都不短。 由于HashMap...

群星纪元
昨天
7
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部