文档章节

04.JUC 集合 - LinkedBlockingQueue

夕阳晒裤衩
 夕阳晒裤衩
发布于 2017/02/23 16:53
字数 959
阅读 10
收藏 0
点赞 0
评论 0

##基本概念

LinkedBlockingQueue 是一个用链表实现的有界阻塞队列

LinkedBlockingQueue 按照先进先出的原则对元素进行排序。

LinkedBlockingQueue 采用了双锁、双条件队列来提高读写效率。


##内部构造

LinkedBlockingQueue 内部维护着一个单向链表,且链表头节点的值永远为空。如下图所示:

输入图片说明

下面来看它的构成:

  • Node ,节点
static class Node<E> {
	E item;

	Node<E> next;

	Node(E x) {
		item = x;
	}
}
  • 构造函数
private transient Node<E> head;
private transient Node<E> last;
private final int capacity;

public LinkedBlockingQueue() {
	// 该队列是有界的,若不指定容量,默认为最大值
	this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
	if (capacity <= 0){
		// 抛出异常...
	}

	this.capacity = capacity;
	last = head = new Node<E>(null);
}

##双锁机制

由于采用了双锁机制,因此它的出队、入队操作可以同时进行,从而提高效率。

// 用于入队操作
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();

// 用于出队操作
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();

由于出入队可以同时进行,因此必须避免冲突问题。

  • 同一元素操作冲突:由于 LinkedBlockingQueue 采用了 FIFO(先进先出)的原则,实际上是双端操作,不会存在冲突。并且在其内部定义了两个变量:head、last。
// 出队修改头节点
private transient Node<E> head;

// 入队修改尾节点
private transient Node<E> last;
  • 元素数量操作冲突:因为入队数量要+1,出队数量要-1,因此需要保证它的可见性,这里采用了这里采用原子类来实现:
private final AtomicInteger count = new AtomicInteger(0);

##入队操作

  • offer,该操作成功返回 true,失败返回 false。
public boolean offer(E e) {
	if (e == null){
		// 抛出异常...
	}

	// 判断元素的个数是否超过容量4
	final AtomicInteger count = this.count;
	if (count.get() == capacity){
		return false;
	}
	
	int c = -1;
	
	Node<E> node = new Node(e);

	// 加锁
	final ReentrantLock putLock = this.putLock;
	putLock.lock();
	
	try {
		// 再次判断,存在等待获取锁期间,其他线程执行入队操作。
		if (count.get() < capacity) {
			// 入队操作
			enqueue(node);

			// 注意:数量+1,返回的旧值
			c = count.getAndIncrement();
			if (c + 1 < capacity){
				// 队列未满,唤醒因为队列满而阻塞的线程
				notFull.signal();
			}
		}
	} finally {
		putLock.unlock();
	}

	// 为 0 表示之前队列是空的,唤醒出队时因为空队列而进入 notEmpty 条件等待队列的线程
	if (c == 0){
		signalNotEmpty();
	}

	return c >= 0;
}
  • put,该操作成功返回 true,失败则进入阻塞。
 private final AtomicInteger count = new AtomicInteger(0);

public void put(E e) throws InterruptedException {

	if (e == null){
		// 抛出异常...
	}

	int c = -1;
	Node<E> node = new Node(e);
	final ReentrantLock putLock = this.putLock;
	final AtomicInteger count = this.count;
	putLock.lockInterruptibly();

	try {
		
		// 满队列,进入条件等待队列,线程阻塞
		while (count.get() == capacity) {
			notFull.await();
		}
		
		// 关键-> 入队操作
		enqueue(node);
		
		c = count.getAndIncrement();
		if (c + 1 < capacity){
			notFull.signal();
		}
		
	} finally {
		putLock.unlock();
	}
	
	if (c == 0){
		signalNotEmpty();
	}
}
  • 关键
private void enqueue(Node<E> node) {
	last = last.next = node;
}

private void signalNotEmpty() {
	final ReentrantLock takeLock = this.takeLock;
	takeLock.lock();
	try {
		notEmpty.signal();
	} finally {
		takeLock.unlock();
	}
}
  • 入队操作的过程如下所示:

输入图片说明


##出队操作

  • poll,成功返回被移除的元素,失败返回 null。
public E poll() {
	final AtomicInteger count = this.count;
	if (count.get() == 0){
		return null;
	}

	E x = null;
	int c = -1;

	// 加锁
	final ReentrantLock takeLock = this.takeLock;
	takeLock.lock();
	
	try {
		if (count.get() > 0) {
			// 关键 -> 出队
			x = dequeue();
			c = count.getAndDecrement();
			if (c > 1){
				notEmpty.signal();
			}
		}
	} finally {
		takeLock.unlock();
	}

	// 表示出队前满队列,唤醒因入队时满队列而进入 notFull 条件等待队列的线程。
	if (c == capacity){
		signalNotFull();
	}
	return x;
}
  • take,成功返回被移除的元素,失败则线程阻塞。
public E take() throws InterruptedException {
	E x;
	int c = -1;
	final AtomicInteger count = this.count;
	final ReentrantLock takeLock = this.takeLock;
	takeLock.lockInterruptibly();
	try {
		// 空队列,进入条件等待队列,线程阻塞
		while (count.get() == 0) {
			notEmpty.await();
		}
		x = dequeue();
		c = count.getAndDecrement();
		if (c > 1){
			notEmpty.signal();
		}
			
	} finally {
		takeLock.unlock();
	}

	if (c == capacity){
		signalNotFull();
	}
	
	return x;
}
  • 关键
private E dequeue() {
	// 头节点、以及它的后继节点
	Node<E> h = head;
	Node<E> first = h.next;

	// 等于 h.next = null,即断开后指针
	h.next = h; 

	// 设置新的头节点
	head = first;

	E x = first.item;

	// 将节点的值置空
	first.item = null;
	return x;
}

private void signalNotFull() {
	final ReentrantLock putLock = this.putLock;
	putLock.lock();
	try {
		notFull.signal();
	} finally {
		putLock.unlock();
	}
}
  • 出队过程如下图所示:

输入图片说明


© 著作权归作者所有

共有 人打赏支持
夕阳晒裤衩
粉丝 3
博文 15
码字总数 18485
作品 0
天津
程序员
Java集合--阻塞队列(LinkedBlockingQueue)

1. LinkedBlockingQueue 上篇中,说到了ArrayBlockingQueue阻塞队列。在ArrayBlockingQueue中,底层使用了数组结构来实现。 那么,提到数组了就不得不提及链表。作为两对成双成对的老冤家,链...

贾博岩
2017/12/10
0
0
并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue用

在Java多线程应用中,队列的使用率很高,多数生产消费模型的首选数据结构就是队列。Java提供的线程安全的Queue可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是BlockingQueue,非阻塞...

凯文加内特
2014/07/31
0
0
并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue用法

在Java多线程应用中,队列的使用率很高,多数生产消费模型的首选数据结构就是队列。Java提供的线程安全的Queue可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是BlockingQueue,非阻塞...

F风向标F
2013/12/03
0
0
Java: Queue 各种方法的细小区别

Java提供了Quere,相当好用,在1.5版本中又有增强。 add 增加一个元索 如果队列已满,则抛出一个IIIegaISlabEepeplian异常 remove 移除并返回队列头部的元素 如果队列为空,则抛出一个NoSuc...

xiaomin0322
03/06
0
0
集合框架 Queue---BlockingQueue详解

本例介绍一个特殊的队列:BlockingQueue,如果BlockingQueue是空的,从BlockingQueue取东西的操作将会被阻断进入等待状态,直到BlockingQueue进了东西才会被唤醒,同样,如果BlockingQueue是...

长平狐
2012/11/28
587
0
java的concurrent包的存储类

java的concurrent包的存储类 简略的翻看一下concurrent包,一部分是通过继承AbstractQueuedSynchronizer实现(ReentrantLock、CountDownLatch、semaphore等),一部分通过lock实现(CycliBa...

GITTODO
2016/03/28
123
0
集合框架 Queue---LinkedBlockingQueue

java.util.concurrent 类 LinkedBlockingQueue java.lang.Objectjava.util.AbstractCollection 类型参数: - 此 collection 中所保存元素的类型。 所有已实现的接口: Serializable, Iterab......

长平狐
2012/11/28
242
0
java多线程中两个容器之间的同步

写了一个多线程的爬虫(对多线程不熟悉),但是每次队列和集合中都有重复的元素,我把代码逻辑贴上来,大家帮我看一下,谢谢了: 在进程中: queue = LinkedBlockingQueue set = Concurrent...

Mangoer
2015/07/16
369
3
阻塞队列(2)--LinkedBlockingDeque底层实现

2.1 LinkedBlockingQueue是什么? 1.1 LinkedBlockingQueue是一个阻塞式的队列,继承自AbstractBlockingQueue,间接的实现了Queue接口和Collection接口。底层以链表的形式保存数据(双向链表,...

yokol
06/21
0
0
源码|并发一枝花之BlockingQueue

今天来介绍Java并发编程中最受欢迎的同步类——堪称并发一枝花之BlockingQueue。 JDK版本:oracle java 1.8.0_102 继续阅读之前,需确保你对锁和条件队列的使用方法烂熟于心,特别是条件队列...

猴子007
2017/10/25
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

机器学习管理平台 MLFlow

最近工作很忙,博客一直都没有更新。抽时间给大家介绍一下Databrick开源的机器学习管理平台-MLFlow。 谈起Databrick,相信即使是不熟悉机器学习和大数据的工程湿们也都有所了解,它由Spark的...

naughty
今天
0
0
idea tomcat 远程调试

tomcat 配置 编辑文件${tomcat_home}/bin/catalina.sh,在文件开头添加如下代码。    CATALINA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=7829" Idea端配......

qwfys
今天
1
0
遍历目录下的文件每250M打包一个文件

#!/usr/bin/env python # -*- utf-8 -*- # @Time : 2018/7/20 0020 下午 10:16 # @Author : 陈元 # @Email : abcmeabc@163.com # @file : tarFile.py import os import tarfile import thr......

寻爱的小草
今天
1
0
expect同步文件&expect指定host和要同步的文件&构建文件分发系统&批量远程执行命令

20.31 expect脚本同步文件 expect通过与rsync结合,可以在一台机器上把文件自动同步到多台机器上 编写脚本 [root@linux-5 ~]# cd /usr/local/sbin[root@linux-5 sbin]# vim 4.expect#!/...

影夜Linux
今天
1
0
SpringBoot | 第九章:Mybatis-plus的集成和使用

前言 本章节开始介绍数据访问方面的相关知识点。对于后端开发者而言,和数据库打交道是每天都在进行的,所以一个好用的ORM框架是很有必要的。目前,绝大部分公司都选择MyBatis框架作为底层数...

oKong
今天
13
0
win10 上安装解压版mysql

1.效果 2. 下载MySQL 压缩版 下载地址: https://downloads.mysql.com/archives/community/ 3. 配置 3.1 将下载的文件解压到合适的位置 我最终将myql文件 放在:D:\develop\mysql 最终放的位...

Lucky_Me
今天
2
0
linux服务器修改mtu值优化cpu

一、jumbo frames 相关 1、什么是jumbo frames Jumbo frames 是指比标准Ethernet Frames长的frame,即比1518/1522 bit大的frames,Jumbo frame的大小是每个设备厂商规定的,不属于IEEE标准;...

问题终结者
今天
2
0
expect脚本同步文件expect脚本指定host和要同步的文件 构建文件分发系统批量远程执行命令

expect脚本同步文件 在一台机器上把文件同步到多台机器上 自动同步文件 vim 4.expect [root@yong-01 sbin]# vim 4.expect#!/usr/bin/expectset passwd "20655739"spawn rsync -av ro...

lyy549745
今天
1
0
36.rsync下 日志 screen

10.32/10.33 rsync通过服务同步 10.34 linux系统日志 10.35 screen工具 10.32/10.33 rsync通过服务同步: rsync还可以通过服务的方式同步。那需要开启一个服务,他的架构是cs架构,客户端服务...

王鑫linux
今天
1
0
matplotlib 保存图片时的参数

简单绘图 import matplotlib.pyplot as pltplt.plot(range(10)) 保存为csv格式,放大后依然很清晰 plt.savefig('t1.svg') 普通保存放大后会有点模糊文件大小20多k plt.savefig('t5.p...

阿豪boy
今天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部