生产者和消费问题

原创
2014/09/09 17:54
阅读数 240

       本文使用java语言借助java并发库去实现生产者和消费者问题。主要设计思路:1.物料池是共享容器;2.生产者只负责生产物料,添加到物料池中;3.消费者从池中获取物料。在这里使用ReenTranLock控制共享容器的同步,使用Conditona做线程间的通知,当物料池满的时候挂起生产者,并且唤醒消费者去消费池中物料,当池中无物料的时候,挂起消费者,唤醒生产者生产物料。

      在编码之前我需要先对生产者、消费者、物料池做一个简单的分析:

       1.消费者和生产者他们的任务都是单一的,消费者消费物料,生产者生产物料,消费者和生产者对外只需要记住是哪个物料池就行了。

       2.共享数据控制同步应该同一个类中完成,这样控制方便,而且简单。

       3.发出通知的应该是物料池。因为只有它自己知道自己的状态,消费者和生产者才不会关心它。冷暖自知!!

     在做了简单的分析之后,清楚了各个对象的功能。接下来就是设计了。涉及到具体的地方无会在代码中注释,就不在这干巴巴的说了。

     核心-物料池

package com.autonavi.pc;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/***
 * 物料池
 * @author 零下三度
 *
 */
public class Pool {
	
	private ReentrantLock lock = new ReentrantLock();
	private Condition putCondition = lock.newCondition();
	private Condition getCondition = lock.newCondition();
	private Product[] productPool = new Product[10];
	private int size;//池中物料的数量
	private int currPutIndex;//当前添加物料的索引
	private int currGetIndex;//当前获取物料的索引
	private Pool(){
		size = 0;
		currPutIndex = 0;
		currGetIndex = 0;
	}
	
	public static Pool getPool(){
		return new Pool();
	}

	/**
	 * 生产者向物料池添加一个商品
	 * @param product
	 * @throws InterruptedException 
	 */
	public void put(Product product) throws InterruptedException{
		try{
			lock.lock();
			//如果物料池满了,则不再允许向物料池中添加物料。
			while(size == productPool.length){
				System.out.println("物料池已经满了,暂时不能添加产品了,请耐心等待.....当前池中物料为:"+size);
				putCondition.await();
			}
			productPool[currPutIndex] = product;
	        if(++currPutIndex == productPool.length){
	        	currPutIndex = 0;
	        }
			++size;
			//注意:由于终端是共享资源,放在此处才能看到真正的测试结果过
			System.out.println(product.toString()+"已经添加到物料池中,当前池中产品个数:"+size+",currPutIndex="+currPutIndex);
			//添加了物料,池中有可用的物料,通知消费者可以从池中获取物料
			getCondition.signal();
		}finally{
			lock.unlock();
		}
	}
	/***
	 * 消费者从物料池中获取一个商品。
	 * @return
	 * @throws InterruptedException 
	 */
	public Product get() throws InterruptedException{
		try{
			lock.lock();
			//如果池中没有物料,则禁止消费者从池中获取物料
			while(size == 0){
				System.out.println("目前没有物料,暂时无法获取产品,请耐心等待.....当前池中物料数量:"+size);
				getCondition.await();
			}
			Product p = productPool[currGetIndex];
			productPool[currGetIndex] = null;
			if(++currGetIndex == productPool.length){
				currGetIndex = 0;
			}
			--size;
			System.out.println("出库的是:"+p.toString()+"当前池中还有产品个数:"+size+",currGetIndex="+currGetIndex);
			putCondition.signal();
			return p;
		}finally{
			lock.unlock();
		}
	}
}

生产者:

package com.autonavi.pc;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/***
 * 生产者
 * @author 零下三度
 *
 */
public class Producers implements Runnable{
	
	private AtomicLong id = new AtomicLong(0);
	private Pool pool;
	private String productName;
	
	public Producers(){
	}
	
	public Producers(Pool pool,String productName,AtomicLong id){
		this.id = id;
		this.pool = pool;
		this.productName = productName;
	}

	public Pool getPool() {
		return pool;
	}


	public void setPool(Pool pool) {
		this.pool = pool;
	}

	public String getProductName() {
		return productName;
	}


	public void setProductName(String productName) {
		this.productName = productName;
	}

	
	public AtomicLong getId() {
		return id;
	}

	
	public void setId(AtomicLong id) {
		this.id = id;
	}


	/***
	 * 生产者的工作就是生产商品,添加到物料吃中
	 * 至于什么时候停止,什么时候开始,是需要被人去给他消息的。
	 */
	public void run() {
		Product p;
		try {
			while(true){
				TimeUnit.MILLISECONDS.sleep(200);
				p = new Product(""+id.incrementAndGet(),productName);
				pool.put(p);
				
			} 
		}catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	public void start(){
		Thread t = new Thread(this);
		t.start();
	}
	

}

消费者:

package com.autonavi.pc;

import java.util.concurrent.TimeUnit;

/***
 * 消费者
 * @author 零下三度
 *
 */
public class Consumers implements Runnable{
	
	private Pool pool;
	
	

	public Pool getPool() {
		return pool;
	}



	public void setPool(Pool pool) {
		this.pool = pool;
	}



	/***
	 * 消费者的工作就是消费物料池中的商品
	 */
	public void run() {
		try {
			Product p;
			while(true){
				TimeUnit.MILLISECONDS.sleep(200);
				p = pool.get();
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	
	public void start(){
		Thread t = new Thread(this);
		t.start();
	}

}

 物料-产品:

package com.autonavi.pc;
/***
 * 商品
 * @author 零下三度
 *
 */
public class Product {

	private String id;
	private String name;
	
	public Product(){
		
	}
	
	public Product(String id, String name) {
		this.id = id;
		this.name = name;
	}

	public String getId() {
		return id;
	}
	public void setId(String id) {
		this.id = id;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	
	
	@Override
	public String toString() {
		String val = "id:"+id+"\n"+"name:"+name+"\n";
		return val;
	}

	@Override
	public boolean equals(Object obj) {
		if(obj != null && obj instanceof Product){
			Product p = (Product)obj;
			if(this.id != null && this.id.equals(p.getId())){
				return true;
			}
		}
		return false;
	}

	@Override
	public int hashCode() {
		return id.hashCode();
	}
	
	
	
	
}

测试用例:

package com.autonavi.pc;

import java.util.concurrent.atomic.AtomicLong;

public class ProductsAndConsumersTest {

	public static void main(String[] args) throws InterruptedException {
		Pool pool = Pool.getPool();
		Producers p = new Producers(pool,"产品A",new AtomicLong(0));
		System.out.println("启动生产者");
		p.start();
		Consumers c = new Consumers();
		c.setPool(pool);
		System.out.println("启动消费者");
		c.start();

	}

}


展开阅读全文
打赏
0
9 收藏
分享
加载中
更多评论
打赏
0 评论
9 收藏
0
分享
返回顶部
顶部