java多线程Master-Work模式的实现

原创
2018/08/17 11:19
阅读数 856

    关于多线程一直都不是很懂,看了很多教程可还是一头雾水。但master-work这种模式在多线程中很常用,索性把看的教程代码写成文章。方便自己以后在工作中遇到了进行copy。

    一、创建类

    1,需要执行的任务类

package com.dgw.thread18;

public class Task {
	private String taskId;//任务id
	private String taskName;//任务名称
	
	public String getTaskId() {
		return taskId;
	}
	public void setTaskId(String taskId) {
		this.taskId = taskId;
	}
	public String getTaskName() {
		return taskName;
	}
	public void setTaskName(String taskName) {
		this.taskName = taskName;
	}
	
}

    2,work类(执行子任务的类)

package com.dgw.thread18;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class WorkerThread implements Runnable{

	private ConcurrentLinkedQueue<Task> workQueue;//用来承装任务的集合
	private ConcurrentHashMap<String, Object> resultMap;//用一个容器承装每一个worker执行任务的结果集
	
    public void setWorkerQueue(ConcurrentLinkedQueue<Task> workQueue) {
		this.workQueue=workQueue;
	}

	public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
		this.resultMap=resultMap;
	}
	
	@Override
	public void run() {
		while(true){
			Task task = this.workQueue.poll();
			if(task==null) break;
			//正真的去做业务处理
			Object output=handle(task);
			System.out.println(Thread.currentThread().getName()+"线程 ,执行"+output);
			this.resultMap.put(task.getTaskId(), output);
		}
	}

	//处理任务
	private Object handle(Task task) {
		Object output=null;
		try {
			//表示处理任务的耗时
			Thread.sleep(500);
			output=task.getTaskName();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		return output;
	}

	

}

    3,master类(执行主任务的类)

package com.dgw.thread18;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class MasterThread {
	
	//1,用来承装任务的集合
	private ConcurrentLinkedQueue<Task>  workQueue=new ConcurrentLinkedQueue<>();
	
	//2,用HashMap承装所有的work对象
	private Map<String,Thread> workers=new HashMap<>();
	
	//3,用一个容器承装每一个worker执行任务的结果集
	private ConcurrentHashMap<String, Object> resultMap=new ConcurrentHashMap<>();
	
	//4,构造方法
	public MasterThread(WorkerThread worker,int workerCount){
		
		//每一个worker对象都需要有Master的引用 ,workQueue用于任务领取,resultMap用于结果集提交
		worker.setWorkerQueue(this.workQueue);
		worker.setResultMap(this.resultMap);
		
		for (int i = 0; i < workerCount; i++) {
			//key表示每一个worker的名字,value表示形成执行对象
			workers.put("子节点"+i, new Thread(worker));
		}
	}
	
	//5,提交方法
	public void submit(Task task){
		workQueue.add(task);
	}
	
	//6,需要一个执行的方法,启动应用程序让所有的work工作
	public void execute(){
		for(Map.Entry<String, Thread> me:workers.entrySet()){
			me.getValue().start();
		}
	}
	
	//7,判断线程是否执行完毕
	public boolean isComplete() {
		for(Map.Entry<String, Thread> me:workers.entrySet()){
			if(me.getValue().getState() !=Thread.State.TERMINATED){
				return false;
			}
		}
		return true;
	}

	//8,返回结果集
	public int getResult() {
		int ret=0;
		for(Map.Entry<String, Object> me:resultMap.entrySet()){
			//汇总
			ret+=1;
		}
		return ret;
	}

	public void love() {
		long start=System.currentTimeMillis();
		while(true){
			if(this.isComplete()){
				long end=System.currentTimeMillis()-start;
				int result = this.getResult();
				System.out.println("最终结果:"+result+",最终耗时:"+end);
				break;
			}
		}
	}
	
}

    4,测试类

package com.dgw.thread18;

public class TestMasterWorker {
	
	public static void main(String[] args) {
		
		MasterThread masterThread = new MasterThread(new WorkerThread(), Runtime.getRuntime().availableProcessors());
		//添加100个任务
		for(int i=0;i<20;i++){
			Task task= new Task();
			task.setTaskId(""+i);
			task.setTaskName("任务"+i);
			masterThread.submit(task);
		}
		
		//执行任务
		masterThread.execute();
		
		masterThread.love();
	}
}

    二、要点说明和执行流程分析

        1,执行流程

            在主程序中 MasterThread masterThread = new MasterThread(new WorkerThread(), Runtime.getRuntime().availableProcessors());会调用MasterThread 类的构造方法,将new WorkerThread()传入。Runtime.getRuntime().availableProcessors()为java虚拟机可争夺到的cpu数,即本程序中创建的线程数。在构造方法中往WorkerThread中添加MasterThread 类中的任务集合和处理结果集合。根据可用cpu数创建承装线程的集合,在测试类(主线程)中添加20个任务,并通过MasterThread类中的submit()方法添加到workQueue中。在测试类(主线程)中通过调WorkerThread中的execute()方法启动每一个线程。在每个线程中去执行任务方法,并返回执行结果。

     2,要点说明

        在本程序中虽然只有20个任务,但可以创建多个线程21个或30个(但任务数小于线程数时,有的线程执行不到任务就结束了)

        在本程序中只会创建一个WorkerThread对象,多个线程持有WorkerThread对象。

 

    三、执行结果输出

    

Thread-2线程 ,执行任务1
Thread-0线程 ,执行任务0
Thread-3线程 ,执行任务3
Thread-1线程 ,执行任务2
Thread-1线程 ,执行任务7
Thread-0线程 ,执行任务5
Thread-2线程 ,执行任务4
Thread-3线程 ,执行任务6
Thread-2线程 ,执行任务10
Thread-1线程 ,执行任务8
Thread-3线程 ,执行任务11
Thread-0线程 ,执行任务9
Thread-2线程 ,执行任务12
Thread-3线程 ,执行任务14
Thread-0线程 ,执行任务15
Thread-1线程 ,执行任务13
Thread-1线程 ,执行任务19
Thread-2线程 ,执行任务16
Thread-3线程 ,执行任务17
Thread-0线程 ,执行任务18
最终结果:20,最终耗时:2500

 

    

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部