Fork/Join 框架(一):创建Fork/Join

原创
2015/11/02 00:04
阅读数 264

说到Fork/Join 框架 ,得不提起执行器框架(Executor Framework),它将任务的创建和执行进行了分离,通过Executor Framework只需要实现Runnable接口的对象和使用Executor对象,然后将Runnable 对象发送个执行器。执行器在负责运行这些任务所需要的线程,包括线程的创建,线程的管理以及线程的结束。

java 7 又更近了一步,它包括了ExecutorService 接口的另一种实现,用来解决特殊类型的问题。就是Fork/Join 框架,有时也称为”分解/合并框架“。

Fork/Join 框架,是用来解决能够通过分治技术,将问题拆分为小任务的问题。它和执行器框架的只要区别在于工作窃取算法。

下面实现简单的例子,我们实现一项更新产品价格的任务。最初的任务将负责更新列表中的所有的元素的价格。如果一个任务需要更新大于10个元素,将分为两个部分去执行。然后再去更新各自部分的产品价格。

1、创建一个产品类,用来存储产品的名称和价格
package five2;
/**
 * 
 * @author qpx
 *
 */
public class Product {
	
	private String name;
	private double price;
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public double getPrice() {
		return price;
	}
	public void setPrice(double price) {
		this.price = price;
	}
	
	

}

  2、创建生成一个随机产品列表的类

package five2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * 随机产品列表
 * @author qpx
 *
 */
public class productListGenetator {
	
	public List<Product> genetate(int size){
		
		List<Product> products = new ArrayList<>();
		for(int i = 0 ;i<size;i++){
			Product product = new Product();
			product.setName("Product"+i);
			product.setPrice(10);
			products.add(product);
			
			
		}
		return products;
		
	}
	
	public static void main(String[] args) {
		List<Integer> aaa = new ArrayList<Integer>();
		aaa.add(1);
		aaa.add(0, 2);
		
		System.out.println(Arrays.toString(aaa.toArray()));
	}

}

 3、创建一个Task 的类,继承RecursiveAction 类,这个是主任务类

package five2;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

public class Task extends RecursiveAction {

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;

	private List<Product> products;

	private int first;
	private int last;

	private double increment;

	public Task(List<Product> products, int first, int last, double increment) {
		super();
		this.products = products;
		this.first = first;
		this.last = last;
		this.increment = increment;
	}

	@Override
	protected void compute() {
		// TODO Auto-generated method stub‘
		if (this.last - this.first < 10) {

			updateprices();
		} else {
			int middle = (last + first) / 2+1;
			//System.out.println("middle:"+middle);
			System.out.printf(Thread.currentThread().getName()+"  Task:Pending tasks:%s\n", getQueuedTaskCount());
			Task t1 = new Task(products, first, (middle), increment);
			//System.out.println("t1:first:"+first+",last:"+middle);
			Task t2 = new Task(products, middle, last, increment);
			//System.out.println("t2:first:"+middle+",last:"+last);
			invokeAll(t1, t2);

		}

	}

	private void updateprices() {
		// TODO Auto-generated method stub
		for (int i = first; i < last; i++) {
			Product product = products.get(i);
			product.setPrice(product.getPrice() * (1 + increment));
			System.out.println(Thread.currentThread().getName() + "的i值:"    
                    + i);    

		}
	}

	public static void main(String[] args) throws InterruptedException {
			productListGenetator a = new productListGenetator();
			List<Product> products = a.genetate(1000);
			
			Task task = new Task(products,0,products.size(),0.20);
			
			//ForkJoinPool pool = new ForkJoinPool(10);
		        ForkJoinPool pool = new ForkJoinPool();

			
			pool.submit(task);
			
			
			do{
				System.out.printf("Main: 线程数量:%d\n",pool.getActiveThreadCount());
				
				System.out.printf("Main: Thread 窃取数量:%d\n",pool.getStealCount());
				
				System.out.printf("Main: Thread 平行数量:%d\n",pool.getParallelism());
				
				
				TimeUnit.MILLISECONDS.sleep(5);

				
				
			}while(!task.isDone());
			pool.shutdown();
			
			
			if(task.isCompletedNormally()){
				System.out.printf("Main: The process has completed normally.\n");
				
				
			}
			
			for(int i = 0;i<products.size();i++){
				
				Product p = products.get(i);
				if(p.getPrice()!=12){
					
					System.out.printf("Product %s:%f\n",p.getName(),p.getName());
				}
				
				
			}
			
			System.out.printf("Main:End of the Program.\n");
	}
}

 注意:我们采用了无参的构造方式创建了

ForkJoinPool pool = new ForkJoinPool(); 他将执行默认的配置,创建一个线程书等于计算机CPU数目的线程池。

   另外ForkJionPool 类还提供了以下方法用于执行任务。

   

execute(Runnabletask) 注意的是使用Runnable对象时,ForkJionPool 不会采用工作窃取算法。仅仅实用ForkJoinTask类的时候采用工作窃取算法
invoke(ForkJoinTask<T> list)  execute方法是异步调用的,此方法是同步调用的



展开阅读全文
加载中

作者的其它热门文章

打赏
0
8 收藏
分享
打赏
0 评论
8 收藏
0
分享
返回顶部
顶部