实现ArrayBlockingQueue-阻塞队列

原创
2019/04/15 16:11
阅读数 88

一:基本特性

二:实现原理

三:实现代码

一:基本特性

    先进先出、写入队列空间不足时会阻塞、获取队列数据是队列为空会阻塞

二:实现原理

    初始化队列、写入队列、消费队列

初始化队列

private String[] items = new String[5];
private Object full = new Object();
private Object empty = new Object();
private static int count;
private static int putIndex = 0;
private static int getIndex = 0;

写入队列

/**
	 * 写入锁put
	 */
	public void put(String t) {
		synchronized (full) {
			while (count == items.length) {
				try {
					full.wait();
				} catch (InterruptedException e) {
					break;
				}
			}
		}
		synchronized (empty) {
			items[putIndex] = t;
			putIndex++;

			count++;
			if (putIndex == items.length) {
				putIndex = 0;
			}
			empty.notify();
		}

	}

消费队列

/**
	 * 消费锁
	 * @return
	 */
	public String get() {
		synchronized (empty) {
			while (count == 0) {
				try {
					empty.wait();
				} catch (InterruptedException e) {
					return null;
				}
			}
		}

		synchronized (full) {
			Object result = items[getIndex];
			items[getIndex] = null;

			getIndex++;
			count--;
			if (getIndex == items.length) {
				getIndex = 0;
			}

			full.notify();
			return (String) result;
		}
	}

这样就可以实现一个队列先进先出的功能。

原理很简单:写入队列,在写入队列是先判断队列中元素是否已满,如果满则进入等待状态;消费队列,在消费前先看下队列中是否有元素,如果有就消费,没有进入等待。和消费者生产者模式类似。

三:实现代码:

我们了解了基本的原理,现在看下jdk是如何实现的

package com.block;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
 * 重写ArrayBlockingQueue<E>
 * @author l
 *
 * @param <E>
 */
public class ArrayQueueryTest3<E> {

	private final E[] items;//数组
	private final ReentrantLock lock;//锁
	private final Condition notEmpty;//
	private final Condition notFull;//
	private int count;//总数
	private int takeIndex;//
	private int putIndex;//
	
	
	public ArrayQueueryTest3(int capacity){
		this(capacity,false);
	}
	
	public ArrayQueueryTest3(int capacity,boolean fair){
		if(capacity <= 0)
			throw new IllegalArgumentException();
		this.items = (E[]) new Object[capacity];
		lock = new ReentrantLock(fair);
		notEmpty = lock.newCondition();
		notFull = lock.newCondition();
	}
	
	public void put(E e) throws InterruptedException{
		if(e == null) throw new NullPointerException();
		final E[] items = this.items;
		final ReentrantLock lock = this.lock;
		lock.lockInterruptibly();
		
		try {
			try {
				while(count == items.length){
					notFull.await();//等待
				}
			} catch (Exception e2) {
				notFull.signal();
				throw e2;
			}
			insert(e);//插入队列
		} finally{
			lock.unlock();
		}
	}
	private void insert(E x){
		items[putIndex] = x;
		putIndex = inc(putIndex);
		++count;
		notEmpty.signal();
	}
	
	public E take() throws InterruptedException {
		final ReentrantLock lock = this.lock;
		lock.lockInterruptibly();
		try{
			try{
				while(count == 0)
					notEmpty.await();//等待
			}catch(InterruptedException ie){
				notEmpty.signal();
				throw ie;
			}
			E x = extract();
			return x;
		}finally{
			lock.unlock();
		}
	}
	
	public E poll(){
		final ReentrantLock lock = this.lock;
		lock.lock();
		try {
			if(count == 0)
				return null;
			E x = extract();
			return x;
		} finally {
			lock.unlock();
		}
	}
	
	private E extract(){
		final E[] items = this.items;
		E x = items[takeIndex];
		items[takeIndex] = null;
		takeIndex = inc(takeIndex);
		--count;
		notFull.signal();
		return x;
	}
	
	final int inc(int i){
		return (++i == items.length)? 0: i;
	}
	
	private boolean isEmpty(){
		final ReentrantLock lock = this.lock;
		lock.lock();
		try{
			return items.length == 0;
		}finally{
			lock.unlock();
		}
	}
	
	public int size(){
		final ReentrantLock lock = this.lock;
		lock.lock();
		try{
			return count;
		}finally{
			lock.unlock();
		}
	}
	
	public E poll(long timeout, TimeUnit unit) throws Exception{
		long nanos = unit.toNanos(timeout);
		final ReentrantLock lock = this.lock;
		lock.lockInterruptibly();
		try {
			for (;;) {
				if(count != 0){
					E x = extract();
					return x;
				}
				if(nanos <= 0)
					return null;
				try {
					nanos = notEmpty.awaitNanos(nanos);//等待时长
				} catch (Exception e) {
					notEmpty.signal();
					throw e;
				}
			}
		} finally {
			lock.unlock();
		}
	}
	
	public static void main(String[] args) throws Exception {
		ArrayQueueryTest3<String> at = new ArrayQueueryTest3<>(4, false);
		at.put("123");
		at.put("123");
		at.put("123");
		System.out.println(at.size());
		while(!at.isEmpty())
		//如果生产者没有数据,则会一直进入等待状态,即使生产出数据,依旧会处在等待状态
			System.out.println(at.take());
		//每隔3s会进行一次get即使没有也会去get,当生产数据后可以依旧可以正常执行
			System.out.println(at.poll(3000,TimeUnit.MILLISECONDS));
		
	}
}

 

展开阅读全文
JDK
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部