文档章节

Java并发新构件之PriorityBlockingQueue

摆渡者
 摆渡者
发布于 2015/10/17 17:28
字数 1153
阅读 592
收藏 9
点赞 0
评论 0

An unbounded blocking queue that uses the same ordering rules as class PriorityQueue and supplies blocking retrieval operations. While this queue is logically unbounded, attempted additions may fail due to resource exhaustion (causing OutOfMemoryError). 

    PriorityBlockingQueue是一个很基础的优先级队列,它在PriorityQueue的基础上提供了可阻塞的读取操作。它是无限制的,就是说向Queue里面增加元素可能会失败(导致OurOfMemoryError)。下面是一个示例,其中在优先级队列中的对象是按照优先级顺序依次出队列的:

import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
    private static int counter = 1;
    private final int priority;
    private Random random = new Random(47);
    private final int id = counter++;//这个id不是static的,因此
    protected static List<PrioritizedTask> sequence = new ArrayList<>();
    public PrioritizedTask(int priority) {
        this.priority = priority;
        sequence.add(this);
    }
    @Override
    public int compareTo(PrioritizedTask o) {
        int val = this.priority - o.priority;
        //higher value, higher priority
        return val < 0 ? 1 : (val > 0 ? -1 : 0);
    }
    @Override
    public void run() {
        try {
            TimeUnit.MILLISECONDS.sleep(random.nextInt(250));
        } catch (InterruptedException e) {
        }
        System.out.println(this);
    }
    @Override
    public String toString() {
        return String.format("P=[%1$-3d]", priority) + ", ID=" + id;
    }
    public static class EndFlagTask extends PrioritizedTask {
        private ExecutorService exec;
        public EndFlagTask(ExecutorService executorService) {
            super(-1);//最低的优先级
            exec = executorService;
        }
        @Override
        public void run() {
            System.out.println(this + " calling shutdownNow()");
            exec.shutdownNow();
        }
    }
}

class PrioritizedTaskProducer implements Runnable {
    private Queue<Runnable> queue;
    private ExecutorService exec;
    public PrioritizedTaskProducer(Queue<Runnable> queue, ExecutorService exec) {
        this.queue = queue;
        this.exec = exec;
    }
    @Override
    public void run() {
        try {
            //慢慢的添加高优先级的任务
            for (int i = 0; i < 6; i++) {
                TimeUnit.MILLISECONDS.sleep(250);
                queue.add(new PrioritizedTask(9)); //6个优先级10
            }
            //先创建2个P=0的任务
            queue.add(new PrioritizedTask(0));
            queue.add(new PrioritizedTask(0));
            //添加低优先级的任务
            for (int i = 0; i < 6; i++) {// 优先级0-5
                queue.add(new PrioritizedTask(i));
            }
            //添加一个结束标志的任务
            queue.add(new PrioritizedTask.EndFlagTask(exec));
            
        } catch (InterruptedException e) {
            // TODO: handle exception
        }
        System.out.println("Finished PrioritizedTaskProducer.");
    }
}

class PrioritizedTaskConsumer implements Runnable {
    private PriorityBlockingQueue<Runnable> queue;
    public PrioritizedTaskConsumer(PriorityBlockingQueue<Runnable> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            //不停的从queue里面取任务,直到exec停止。
            while(!Thread.interrupted()) {
                //使用当前线程来跑这些任务
                queue.take().run();
            }
        } catch (InterruptedException e) {
            
        }
        System.out.println("Finished PrioritizedTaskConsumer.");
    }
}

public final class PriorityBlockingQueueDemo {
    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<>();
        exec.execute(new PrioritizedTaskProducer(queue, exec));
        exec.execute(new PrioritizedTaskConsumer(queue));
    }
}

执行结果:

P=[9  ], ID=1
P=[9  ], ID=2
P=[9  ], ID=3
P=[9  ], ID=4
P=[9  ], ID=5
Finished PrioritizedTaskProducer.
P=[9  ], ID=6
P=[5  ], ID=14
P=[4  ], ID=13
P=[3  ], ID=12
P=[2  ], ID=11
P=[1  ], ID=10
P=[0  ], ID=7
P=[0  ], ID=9
P=[0  ], ID=8
P=[-1 ], ID=15 calling shutdownNow()
Finished PrioritizedTaskConsumer.

    PrioritizedTask对象的创建序列被记录在sequenceList中,用于和实际的顺序比较。run()方法将休眠一小段随机的时间,然后打印对象信息,而EndFlagTask提供了停止ExecutorService的功能,要确保它是队列中的最后一个对象,因此给它设置了最低的优先级(-1,优先级值越大,优先级越高)。

    PrioritizedTaskProducer和PrioritizedTaskConsumer通过PriorityBlockingQueue彼此链接。因为这种队列的阻塞特性提供了所有必须的同步,所以你应该注意到了,这里不需要任何显式的同步——不必考虑当你从这种队列中读取时,其中是否有元素,因为这个队列在没有元素时,将直接阻塞读取者。

    从执行结果中可以看到,最先出队列的是Priority为9的6个Task,因为这几个任务先创建。

Finished PrioritizedTaskProducer.

    这句话的打印表示生产者已经将所有的任务放到队列中了,由于将任务放到Queue中和从Queue中提取任务并执行时两个不同的任务(即Producer和Consumer),因此Producer先输出“Finished PrioritizedTaskProducer.”。输出这句话的时候,前面只有5个P=9的任务出列了,因此队列中还有1个P=9的任务没出列,同时还有后续放入各种任务。由于Queue中的任务里面,优先级P最高的是P=9的,因此第6个P=9的任务先出队列。剩下的任务按照P的大小依次出列。

    任务的ID属性表示了它们的创建顺序,因为ID是自增的,每创建一个任务,ID就增加。因此从

P=[5  ], ID=14

    可以很明显的看出:P=5的任务,它的ID最大,所以是最后创建的。从我们的代码中也可以看出来,P=5的任务的确是最后创建的。

    还有一点可以看出,当P相同的时候,出Queue的顺序是不确定的,例如:

P=[0  ], ID=7
P=[0  ], ID=9
P=[0  ], ID=8

    另外,在使用此类的时候需要注意:

This class does not permit null elements. A priority queue relying on natural ordering also does not permit insertion of non-comparable objects (doing so results in ClassCastException). 

© 著作权归作者所有

共有 人打赏支持
摆渡者
粉丝 318
博文 171
码字总数 205794
作品 0
浦东
程序员
读书笔记之《Java并发编程的艺术》-线程池和Executor的子孙们

读书笔记部分内容来源书出版书,版权归本书作者,如有错误,请指正。 欢迎star、fork,读书笔记系列会同步更新 git https://github.com/xuminwlt/j360-jdk module j360-jdk-thread/me.j360....

Hi徐敏
2015/11/11
0
1
Java并发编程利用 Condition 实现阻塞队列

什么是阻塞队列 BlockingQueue 队列是一种数据结构,它的特点是先进先出(First In First Out),它有两个基本操作:在队列尾部加入一个元素,从队列头部移除一个元素。队列在多线程应用中,...

行走在旅途中
2017/11/07
0
0
读书笔记之《Java并发编程的艺术》-并发编程容器和框架(重要)

读书笔记部分内容来源书出版书,版权归本书作者,如有错误,请指正。 欢迎star、fork,读书笔记系列会同步更新 git https://github.com/xuminwlt/j360-jdk module j360-jdk-thread/me.j360....

Hi徐敏
2015/11/11
0
1
[Java 并发编程] 集合框架之 同步容器类 & 并发容器类

吾生也有涯,而知也无涯。———《庄子》 通过上一篇文章,我们已经知道设计一个线程安全类的原则和步骤,以及在设计过程中我们应当注意的细节。实际上,Java 的集合库包含了线程安全集合和非...

seaicelin
05/25
0
0
maven私服nexus之校验和(checksums)

maven是开源的项目构建工具。 nexus可以用来搭建maven私服。 checksum(校验和):用于校验数据的完整性和准确性。 maven nexus中可以查看构件的Checksums:SHA1 checksum和MD5 checksum。 ...

donhui
2014/10/10
0
0
Java 并发工具包 java.util.concurrent 用户指南

译序 本指南根据 Jakob Jenkov 最新博客翻译,请随时关注博客更新:http://tutorials.jenkov.com/java-util-concurrent/index.html。 本指南已做成中英文对照阅读版的 pdf 文档,有兴趣的朋友...

pior
2015/10/26
0
0
Java PriorityQueue && PriorityBlockingQueue

Java PriorityQueue && PriorityBlockingQueue 我们知道队列是遵循先进先出(First-In-First-Out)模式的,但有些时候需要在队列中基于优先级处理对象。举个例子,比方说我们有一个每日交易时...

秋风醉了
2015/01/12
0
0
烛窗夜记(一),java 并发包中的各种队列

java 并发包 ArrayBlockingQueue 阻塞队列(以array实现,没有取到数据阻塞,插入数据时也可能阻塞) LinkedBlockingQueue 阻塞队列(以链表实现,一般情况下不如ArrayBlockingQueue 性能好)...

jingshishengxu
2011/01/17
0
0
JAVA线程15 - 新特性:阻塞队列&阻塞栈

一、阻塞队列 1. 概述 阻塞队列是Java5线程新特征中的内容,Java定义了阻塞队列的接口java.util.concurrent.BlockingQueue。 阻塞队列是一个指定长度的队列,如果队列满了,添加新元素的操作...

小米米儿小
2014/03/05
0
0
002_JVM内存结构及调优学习

常遇到关于内存溢出的错误 java.lang包下 StackOverflowError 很少 OutOfMemoryError: heap space(堆空间) 比较常见 OutOfMemoryError: PermGen space 经常出现 2. Java虚拟机结构和属性...

英年求肥
2014/11/11
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

JPA @MappedSuperclass 注解说明

基于代码复用和模型分离的思想,在项目开发中使用JPA的@MappedSuperclass注解将实体类的多个属性分别封装到不同的非实体类中。 1.@MappedSuperclass注解只能标准在类上:@Target({java.lang....

海博1600
7分钟前
0
0
Scala Configuration 相关API

Play使用了 Typesafe config library,但是也提供了一个有着更多Scala高级特性的的 Configuration 封装。不熟悉Typesafe配置的开发者可以移步 configuration文件的语法和特性文档。 读取配置...

Landas
今天
1
0
使用cookie技术 记住账号

1. 效果 2. 实现过程 2.1 前端 将用户的选中传递给后台 这个参数的获取是 参考:https://my.oschina.net/springMVCAndspring/blog/1860498 // var rememberLogin = $("#rememberLoginId").i...

Lucky_Me
今天
1
0
《趣谈网络协议》02之网络分层的真实含义

一、提出问题 1.提出问题 当你听到什么二层设备、三层设备、四层 LB 和七层 LB 中层的时候,是否有点一头雾水,不知道这些所谓的层,对应的各种协议具体要做什么“工作”? 2.这四个问题你弄...

aibinxiao
今天
2
0
Python3学习日志二 Python中的集合set和字典dict

1.集合set 定义一个集合set 我们可以看到定义集合set有两种不同的形式,如果要定义一个空的集合set不能用{}而是要用set();另外,集合是无序的,而且set中的元素是不可重复的,如果你定义了一...

Mr_bullshit
今天
0
0
adb 操作指令详解

ADB,即 Android Debug Bridge,它是 Android 开发/测试人员不可替代的强大工具,也是 Android 设备玩家的好玩具。 注:有部分命令的支持情况可能与 Android 系统版本及定制 ROM 的实现有关。...

孟飞阳
今天
0
0
nodejs安装以及环境配置(很好的node安装和配置文章,少走很多弯路)

一、安装环境 1、本机系统:Windows 10 Pro(64位) 2、Node.js:v6.9.2LTS(64位) 二、安装Node.js步骤 1、下载对应你系统的Node.js版本:https://nodejs.org/en/download/ 2、选安装目录进...

sprouting
今天
1
0
Redisson

了解了Redisson,发现使用挺简单的,接下来准备深入学习一下。 Redisson介绍 Redisson是架设于Redis基础之上的一个Java驻内存数据网格(In-Memory Data Grid) Redisson在基于NIO的Netty框架上...

to_ln
今天
0
0
python有哪些好玩的应用实现,用python爬虫做一个二维码生成器

python爬虫不止可以批量下载数据,还可以有很多有趣的应用,之前也发过很多,比如天气预报实时查询、cmd版的实时翻译、快速浏览论坛热门帖等等,这些都可以算是爬虫的另一个应用方向! 今天给...

python玩家
今天
0
0
python爬虫日志(3)-爬去异步加载网页

在浏览器检查元素页面中,选取Network中的XHR选项即可观察每次加载页面,网页发出的请求,观察url的规律即可利用封装的函数对每一页进行爬取。

茫羽行
今天
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部