文档章节

Java并发新构件之PriorityBlockingQueue

摆渡者
 摆渡者
发布于 2015/10/17 17:28
字数 1153
阅读 664
收藏 9

An unbounded blocking queue that uses the same ordering rules as class PriorityQueue and supplies blocking retrieval operations. While this queue is logically unbounded, attempted additions may fail due to resource exhaustion (causing OutOfMemoryError). 

    PriorityBlockingQueue是一个很基础的优先级队列,它在PriorityQueue的基础上提供了可阻塞的读取操作。它是无限制的,就是说向Queue里面增加元素可能会失败(导致OurOfMemoryError)。下面是一个示例,其中在优先级队列中的对象是按照优先级顺序依次出队列的:

import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
    private static int counter = 1;
    private final int priority;
    private Random random = new Random(47);
    private final int id = counter++;//这个id不是static的,因此
    protected static List<PrioritizedTask> sequence = new ArrayList<>();
    public PrioritizedTask(int priority) {
        this.priority = priority;
        sequence.add(this);
    }
    @Override
    public int compareTo(PrioritizedTask o) {
        int val = this.priority - o.priority;
        //higher value, higher priority
        return val < 0 ? 1 : (val > 0 ? -1 : 0);
    }
    @Override
    public void run() {
        try {
            TimeUnit.MILLISECONDS.sleep(random.nextInt(250));
        } catch (InterruptedException e) {
        }
        System.out.println(this);
    }
    @Override
    public String toString() {
        return String.format("P=[%1$-3d]", priority) + ", ID=" + id;
    }
    public static class EndFlagTask extends PrioritizedTask {
        private ExecutorService exec;
        public EndFlagTask(ExecutorService executorService) {
            super(-1);//最低的优先级
            exec = executorService;
        }
        @Override
        public void run() {
            System.out.println(this + " calling shutdownNow()");
            exec.shutdownNow();
        }
    }
}

class PrioritizedTaskProducer implements Runnable {
    private Queue<Runnable> queue;
    private ExecutorService exec;
    public PrioritizedTaskProducer(Queue<Runnable> queue, ExecutorService exec) {
        this.queue = queue;
        this.exec = exec;
    }
    @Override
    public void run() {
        try {
            //慢慢的添加高优先级的任务
            for (int i = 0; i < 6; i++) {
                TimeUnit.MILLISECONDS.sleep(250);
                queue.add(new PrioritizedTask(9)); //6个优先级10
            }
            //先创建2个P=0的任务
            queue.add(new PrioritizedTask(0));
            queue.add(new PrioritizedTask(0));
            //添加低优先级的任务
            for (int i = 0; i < 6; i++) {// 优先级0-5
                queue.add(new PrioritizedTask(i));
            }
            //添加一个结束标志的任务
            queue.add(new PrioritizedTask.EndFlagTask(exec));
            
        } catch (InterruptedException e) {
            // TODO: handle exception
        }
        System.out.println("Finished PrioritizedTaskProducer.");
    }
}

class PrioritizedTaskConsumer implements Runnable {
    private PriorityBlockingQueue<Runnable> queue;
    public PrioritizedTaskConsumer(PriorityBlockingQueue<Runnable> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            //不停的从queue里面取任务,直到exec停止。
            while(!Thread.interrupted()) {
                //使用当前线程来跑这些任务
                queue.take().run();
            }
        } catch (InterruptedException e) {
            
        }
        System.out.println("Finished PrioritizedTaskConsumer.");
    }
}

public final class PriorityBlockingQueueDemo {
    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<>();
        exec.execute(new PrioritizedTaskProducer(queue, exec));
        exec.execute(new PrioritizedTaskConsumer(queue));
    }
}

执行结果:

P=[9  ], ID=1
P=[9  ], ID=2
P=[9  ], ID=3
P=[9  ], ID=4
P=[9  ], ID=5
Finished PrioritizedTaskProducer.
P=[9  ], ID=6
P=[5  ], ID=14
P=[4  ], ID=13
P=[3  ], ID=12
P=[2  ], ID=11
P=[1  ], ID=10
P=[0  ], ID=7
P=[0  ], ID=9
P=[0  ], ID=8
P=[-1 ], ID=15 calling shutdownNow()
Finished PrioritizedTaskConsumer.

    PrioritizedTask对象的创建序列被记录在sequenceList中,用于和实际的顺序比较。run()方法将休眠一小段随机的时间,然后打印对象信息,而EndFlagTask提供了停止ExecutorService的功能,要确保它是队列中的最后一个对象,因此给它设置了最低的优先级(-1,优先级值越大,优先级越高)。

    PrioritizedTaskProducer和PrioritizedTaskConsumer通过PriorityBlockingQueue彼此链接。因为这种队列的阻塞特性提供了所有必须的同步,所以你应该注意到了,这里不需要任何显式的同步——不必考虑当你从这种队列中读取时,其中是否有元素,因为这个队列在没有元素时,将直接阻塞读取者。

    从执行结果中可以看到,最先出队列的是Priority为9的6个Task,因为这几个任务先创建。

Finished PrioritizedTaskProducer.

    这句话的打印表示生产者已经将所有的任务放到队列中了,由于将任务放到Queue中和从Queue中提取任务并执行时两个不同的任务(即Producer和Consumer),因此Producer先输出“Finished PrioritizedTaskProducer.”。输出这句话的时候,前面只有5个P=9的任务出列了,因此队列中还有1个P=9的任务没出列,同时还有后续放入各种任务。由于Queue中的任务里面,优先级P最高的是P=9的,因此第6个P=9的任务先出队列。剩下的任务按照P的大小依次出列。

    任务的ID属性表示了它们的创建顺序,因为ID是自增的,每创建一个任务,ID就增加。因此从

P=[5  ], ID=14

    可以很明显的看出:P=5的任务,它的ID最大,所以是最后创建的。从我们的代码中也可以看出来,P=5的任务的确是最后创建的。

    还有一点可以看出,当P相同的时候,出Queue的顺序是不确定的,例如:

P=[0  ], ID=7
P=[0  ], ID=9
P=[0  ], ID=8

    另外,在使用此类的时候需要注意:

This class does not permit null elements. A priority queue relying on natural ordering also does not permit insertion of non-comparable objects (doing so results in ClassCastException). 

© 著作权归作者所有

共有 人打赏支持
摆渡者
粉丝 333
博文 171
码字总数 205876
作品 0
浦东
程序员
私信 提问
Java并发编程利用 Condition 实现阻塞队列

什么是阻塞队列 BlockingQueue 队列是一种数据结构,它的特点是先进先出(First In First Out),它有两个基本操作:在队列尾部加入一个元素,从队列头部移除一个元素。队列在多线程应用中,...

行走在旅途中
2017/11/07
0
0
读书笔记之《Java并发编程的艺术》-线程池和Executor的子孙们

读书笔记部分内容来源书出版书,版权归本书作者,如有错误,请指正。 欢迎star、fork,读书笔记系列会同步更新 git https://github.com/xuminwlt/j360-jdk module j360-jdk-thread/me.j360....

Hi徐敏
2015/11/11
0
1
读书笔记之《Java并发编程的艺术》-并发编程容器和框架(重要)

读书笔记部分内容来源书出版书,版权归本书作者,如有错误,请指正。 欢迎star、fork,读书笔记系列会同步更新 git https://github.com/xuminwlt/j360-jdk module j360-jdk-thread/me.j360....

Hi徐敏
2015/11/11
0
1
【死磕Java并发】—– 死磕 Java 并发精品合集

【死磕 Java 并发】系列是 LZ 在 2017 年写的第一个死磕系列,一直没有做一个合集,这篇博客则是将整个系列做一个概览。 先来一个总览图: 【高清图,请关注“Java技术驿站”公众号,回复:脑...

chenssy
07/22
0
0
[Java 并发编程] 集合框架之 同步容器类 & 并发容器类

吾生也有涯,而知也无涯。———《庄子》 通过上一篇文章,我们已经知道设计一个线程安全类的原则和步骤,以及在设计过程中我们应当注意的细节。实际上,Java 的集合库包含了线程安全集合和非...

seaicelin
05/25
0
0

没有更多内容

加载失败,请刷新页面

加载更多

降压变换器 Buck

特点 输入输出极性相同。 工作过程 在 MOS 导通时,输入电源通过 L 和 C 滤波后向负载端提供电流;当 MOS 断开后,L 通过二极管续流,保持负载电流连续。输出电压因为占空比的作用,不会超过...

colinux
今天
2
0
Apache日志不记录访问静态文件,访问日志切割,静态元素过期时间设置

Apache配置不记录访问静态文件的日志 网站大多元素为静态文件,如图片、css、js等,这些元素可以不用记录 vhost原始配置 <VirtualHost *:80> ServerAdmin test@163.com DocumentRoo...

野雪球
今天
3
0
聊聊storm的ICommitterTridentSpout

序 本文主要研究一下storm的ICommitterTridentSpout ICommitterTridentSpout storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/ICommitterTridentSpout.java public interface......

go4it
今天
4
0
Ubuntu常用操作

查看端口号 netstat -anp |grep 端口号 查看已使用端口情况 netstat -nultp(此处不用加端口号) netstat -anp |grep 82查看82端口的使用情况 查找被占用的端口: netstat -tln netstat -tl...

hc321
昨天
4
0
网站cdn的静态资源突然访问变的缓慢,问题排查流程

1.首先我查看了一下是否自己的网络问题,通过对比其他资源的访问速度和下载速度,确认不是 2.通过ping 和 tracert 判断cdn域名能否正常访问,(最后回想感觉这一步可以省略,因为每次最终能访...

小海bug
昨天
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部