生产者-消费者模式的三种实现方式

原创
2016/08/28 11:31
阅读数 1.7K

1、查看程序中线程的名称与状态

	/**
	 * 4:Signal Dispatcher:RUNNABLE
	 * 3:Finalizer:WAITING
	 * 2:Reference Handler:WAITING
	 * 1:main:RUNNABLE
	 */
	@Test
	public void testName(){
		ThreadMXBean tb = ManagementFactory.getThreadMXBean();
		ThreadInfo[] infos = tb.dumpAllThreads(false, false);
		for(ThreadInfo info : infos){
			System.out.println(info.getThreadId()+":"
+info.getThreadName()+":"
+info.getThreadState());
		}
	}

2、等待通知机制

一个线程修改一个对象的值,而另一个线程感知到了变化,然后进行相应的操作,这个过程始于一个线程,而最终执行又是另一个线程。前者是生产者,后者就是消费者。

生产者-消费者模式隔离了“做什么”和“怎么做”,在功能层面上实现了解耦,因此具备很好的扩展伸缩能力。

 

3、等待通知基本步骤

A:等待方和通知方遵循如下原则

1)获取对象的锁(生产者和消费者必须是同一把锁)

2)如果条件不满足,则执行锁的wait方法,等待被notify,notify后仍要继续检查条件,所以是while

3)如果条件满足,则执行对应的逻辑以及执行锁的notify。

B:伪代码

synchronized(lock){  // 获取对象的锁
			while(list.size() - num < 0){ // 使用while做条件判断
				try {
					lock.wait(); // 条件不满足则等待
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			for(int i = 0; i < num ; i++){
				list.remove(0); // 改变条件
			}
			lock.notifyAll(); // 执行逻辑之后调用notify
		}

 

4、生产者-消费者模式的具体实现

生产者-消费者模式的核心在于容器的容量,如果容器已经满了则通知生产者不要生产;如果容器已经空了则通知消费者不要继续消费。因此容器的加减应该做同步,容器的容量作为通知的条件。这其实就是阻塞队列的实现。

1)使用Object的 wait 和 notify 实现

2)使用Lock的condition中 await 和 signal 实现

3)使用阻塞队列实现

使用Object的 wait 和 notify 实现 如下:

package more_service.base_info;

import java.util.ArrayList;
import java.util.List;

public class ThreadWaitNotifyTest {
	
	private static List<String> list = new ArrayList<String>();
	
	private static final int MAX = 15;
	
	private static Object lock = new Object();
	
	public static void main(String[] args) {
		Thread p1 = new Thread(new ProductThread(5));
		Thread p2 = new Thread(new ProductThread(5));
		Thread p3 = new Thread(new ProductThread(5));
		Thread p4 = new Thread(new ProductThread(5));
		
		Thread c1 = new Thread(new ConsumerThread(10));
		Thread c2 = new Thread(new ConsumerThread(10));
		
		c1.start();
		c2.start();
		p1.start();
		p2.start();
		p3.start();
		p4.start();
		
		
	}
	
	public static void product(int num){
		synchronized(lock){
			while(list.size() + num > MAX){
				try {
					lock.wait();
					System.out.println(Thread.currentThread().getName()+"---- 已经满了,不能继续生产");
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			for(int i = 0; i < num ; i++){
				list.add(Thread.currentThread().getName()+":"+i);
				System.out.println(Thread.currentThread().getName()+":"+i);
			}
			lock.notifyAll();
		}
	}
	
	public static void consume(int num){
		synchronized(lock){
			while(list.size() - num < 0){
				try {
					lock.wait();
					System.out.println(Thread.currentThread().getName()+"---- 已经空了,不能继续消费了");
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			for(int i = 0; i < num ; i++){
				System.out.println(Thread.currentThread().getName()+":"+list.remove(0));
			}
			lock.notifyAll();
		}
	}
	
	
	static class ProductThread implements Runnable{
		
		private int num;
		
		public ProductThread(int num){
			this.num = num;
		}
		
		public void run() {
			product(this.num);
		}
	}
	
	
	static class ConsumerThread implements Runnable{

		private int num;
		
		public ConsumerThread(int num){
			this.num = num;
		}
		
		public void run() {
			consume(this.num);
		}
		
	}
	

}

使用Lock的condition中 await 和 signal 实现如下:

/**
 * <p>项目名称:mvn
 * <p>Package名称:com.hnust.test
 * 文件名称:LockTest.java 
 * 版本:1.00 
 * 创建日期:2015年9月13日
 * Copyright©2014 HNUST .All Rights Reserved.
 */
package com.hnust.test;
 
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
 
/**
 *@author:Heweipo
 *@version 1.00
 *
 */
public class LockTest {
     
    public static void main(String[] args) {
         
        Data data = new Data();
        MyTask1 t1 = new MyTask1(data);
        MyTask2 t2 = new MyTask2(data);
         
        t1.start();
        t2.start();
         
    }
}
 
class Data {
    private int number = 0;
    private Lock lock = new ReentrantLock();
    private Condition c1 = lock.newCondition();
    private Condition c2 = lock.newCondition();
     
    public Data(){
        System.out.println( c1 == c2);
        lock = new ReentrantReadWriteLock().readLock();
    }
     
    public int increase(){
        lock.lock();
        try{
            if(number != 0){
                try {
                    c1.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            number++;
            c2.signal();
        }finally{
            lock.unlock();
        }
        return number;
    }
     
    public int decrease(){
        lock.lock();
        try{
            if(number != 1){
                try {
                    c2.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            number--;
            c1.signal();
        }finally{
            lock.unlock();
        }
        return number;
    }
     
}
 
class MyTask1 extends Thread{
     
    private Data data;
     
    public MyTask1(Data data){
        this.data = data;
    }
     
    public void run() {
        for(int i = 0 ; i < 10 ; i ++){
            System.out.println("decrease:"+data.decrease());
        }
    }
     
}
 
class MyTask2 extends Thread{
     
    private Data data;
     
    public MyTask2(Data data){
        this.data = data;
    }
     
    public void run() {
        for(int i = 0 ; i < 10 ; i ++){
            System.out.println("increase:"+data.increase());
        }
    }
     
}

使用阻塞队列实现如下:

package more_service.base_info;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import more_service.base_info.ThreadWaitNotifyTest.ConsumerThread;
import more_service.base_info.ThreadWaitNotifyTest.ProductThread;

public class BlockingQueueTest {
	
	public static BlockingQueue<String> queue = new LinkedBlockingQueue(3);
	
	public static void main(String[] args) {
		Thread p1 = new Thread(new ProductThread("1"));
		Thread p2 = new Thread(new ProductThread("2"));
		Thread p3 = new Thread(new ProductThread("3"));
		Thread p4 = new Thread(new ProductThread("4"));
		
		Thread c1 = new Thread(new ConsumerThread());
		Thread c2 = new Thread(new ConsumerThread());
		Thread c3 = new Thread(new ConsumerThread());
		Thread c4 = new Thread(new ConsumerThread());
		
		c1.start();
		c2.start();
		c3.start();
		c4.start();
		p1.start();
		p2.start();
		p3.start();
		p4.start();
	}
	
	static class ProductThread implements Runnable{
		
		private String name;
		
		public ProductThread(String name){
			this.name = name;
		}
		
		public void run() {
			try {
				System.out.println("put:"+name);
				queue.put(name);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
	
	
	static class ConsumerThread implements Runnable{
		public void run() {
			try {
				System.out.println("take:"+queue.take());
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		
	}
	
	

}

 

展开阅读全文
打赏
0
1 收藏
分享
加载中
更多评论
打赏
0 评论
1 收藏
0
分享
返回顶部
顶部