文档章节

Java并发新构件之PriorityBlockingQueue

摆渡者
 摆渡者
发布于 2015/10/17 17:28
字数 1153
阅读 636
收藏 9

An unbounded blocking queue that uses the same ordering rules as class PriorityQueue and supplies blocking retrieval operations. While this queue is logically unbounded, attempted additions may fail due to resource exhaustion (causing OutOfMemoryError). 

    PriorityBlockingQueue是一个很基础的优先级队列,它在PriorityQueue的基础上提供了可阻塞的读取操作。它是无限制的,就是说向Queue里面增加元素可能会失败(导致OurOfMemoryError)。下面是一个示例,其中在优先级队列中的对象是按照优先级顺序依次出队列的:

import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
    private static int counter = 1;
    private final int priority;
    private Random random = new Random(47);
    private final int id = counter++;//这个id不是static的,因此
    protected static List<PrioritizedTask> sequence = new ArrayList<>();
    public PrioritizedTask(int priority) {
        this.priority = priority;
        sequence.add(this);
    }
    @Override
    public int compareTo(PrioritizedTask o) {
        int val = this.priority - o.priority;
        //higher value, higher priority
        return val < 0 ? 1 : (val > 0 ? -1 : 0);
    }
    @Override
    public void run() {
        try {
            TimeUnit.MILLISECONDS.sleep(random.nextInt(250));
        } catch (InterruptedException e) {
        }
        System.out.println(this);
    }
    @Override
    public String toString() {
        return String.format("P=[%1$-3d]", priority) + ", ID=" + id;
    }
    public static class EndFlagTask extends PrioritizedTask {
        private ExecutorService exec;
        public EndFlagTask(ExecutorService executorService) {
            super(-1);//最低的优先级
            exec = executorService;
        }
        @Override
        public void run() {
            System.out.println(this + " calling shutdownNow()");
            exec.shutdownNow();
        }
    }
}

class PrioritizedTaskProducer implements Runnable {
    private Queue<Runnable> queue;
    private ExecutorService exec;
    public PrioritizedTaskProducer(Queue<Runnable> queue, ExecutorService exec) {
        this.queue = queue;
        this.exec = exec;
    }
    @Override
    public void run() {
        try {
            //慢慢的添加高优先级的任务
            for (int i = 0; i < 6; i++) {
                TimeUnit.MILLISECONDS.sleep(250);
                queue.add(new PrioritizedTask(9)); //6个优先级10
            }
            //先创建2个P=0的任务
            queue.add(new PrioritizedTask(0));
            queue.add(new PrioritizedTask(0));
            //添加低优先级的任务
            for (int i = 0; i < 6; i++) {// 优先级0-5
                queue.add(new PrioritizedTask(i));
            }
            //添加一个结束标志的任务
            queue.add(new PrioritizedTask.EndFlagTask(exec));
            
        } catch (InterruptedException e) {
            // TODO: handle exception
        }
        System.out.println("Finished PrioritizedTaskProducer.");
    }
}

class PrioritizedTaskConsumer implements Runnable {
    private PriorityBlockingQueue<Runnable> queue;
    public PrioritizedTaskConsumer(PriorityBlockingQueue<Runnable> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            //不停的从queue里面取任务,直到exec停止。
            while(!Thread.interrupted()) {
                //使用当前线程来跑这些任务
                queue.take().run();
            }
        } catch (InterruptedException e) {
            
        }
        System.out.println("Finished PrioritizedTaskConsumer.");
    }
}

public final class PriorityBlockingQueueDemo {
    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<>();
        exec.execute(new PrioritizedTaskProducer(queue, exec));
        exec.execute(new PrioritizedTaskConsumer(queue));
    }
}

执行结果:

P=[9  ], ID=1
P=[9  ], ID=2
P=[9  ], ID=3
P=[9  ], ID=4
P=[9  ], ID=5
Finished PrioritizedTaskProducer.
P=[9  ], ID=6
P=[5  ], ID=14
P=[4  ], ID=13
P=[3  ], ID=12
P=[2  ], ID=11
P=[1  ], ID=10
P=[0  ], ID=7
P=[0  ], ID=9
P=[0  ], ID=8
P=[-1 ], ID=15 calling shutdownNow()
Finished PrioritizedTaskConsumer.

    PrioritizedTask对象的创建序列被记录在sequenceList中,用于和实际的顺序比较。run()方法将休眠一小段随机的时间,然后打印对象信息,而EndFlagTask提供了停止ExecutorService的功能,要确保它是队列中的最后一个对象,因此给它设置了最低的优先级(-1,优先级值越大,优先级越高)。

    PrioritizedTaskProducer和PrioritizedTaskConsumer通过PriorityBlockingQueue彼此链接。因为这种队列的阻塞特性提供了所有必须的同步,所以你应该注意到了,这里不需要任何显式的同步——不必考虑当你从这种队列中读取时,其中是否有元素,因为这个队列在没有元素时,将直接阻塞读取者。

    从执行结果中可以看到,最先出队列的是Priority为9的6个Task,因为这几个任务先创建。

Finished PrioritizedTaskProducer.

    这句话的打印表示生产者已经将所有的任务放到队列中了,由于将任务放到Queue中和从Queue中提取任务并执行时两个不同的任务(即Producer和Consumer),因此Producer先输出“Finished PrioritizedTaskProducer.”。输出这句话的时候,前面只有5个P=9的任务出列了,因此队列中还有1个P=9的任务没出列,同时还有后续放入各种任务。由于Queue中的任务里面,优先级P最高的是P=9的,因此第6个P=9的任务先出队列。剩下的任务按照P的大小依次出列。

    任务的ID属性表示了它们的创建顺序,因为ID是自增的,每创建一个任务,ID就增加。因此从

P=[5  ], ID=14

    可以很明显的看出:P=5的任务,它的ID最大,所以是最后创建的。从我们的代码中也可以看出来,P=5的任务的确是最后创建的。

    还有一点可以看出,当P相同的时候,出Queue的顺序是不确定的,例如:

P=[0  ], ID=7
P=[0  ], ID=9
P=[0  ], ID=8

    另外,在使用此类的时候需要注意:

This class does not permit null elements. A priority queue relying on natural ordering also does not permit insertion of non-comparable objects (doing so results in ClassCastException). 

© 著作权归作者所有

共有 人打赏支持
摆渡者
粉丝 326
博文 171
码字总数 205876
作品 0
浦东
程序员
Java并发编程利用 Condition 实现阻塞队列

什么是阻塞队列 BlockingQueue 队列是一种数据结构,它的特点是先进先出(First In First Out),它有两个基本操作:在队列尾部加入一个元素,从队列头部移除一个元素。队列在多线程应用中,...

行走在旅途中
2017/11/07
0
0
读书笔记之《Java并发编程的艺术》-线程池和Executor的子孙们

读书笔记部分内容来源书出版书,版权归本书作者,如有错误,请指正。 欢迎star、fork,读书笔记系列会同步更新 git https://github.com/xuminwlt/j360-jdk module j360-jdk-thread/me.j360....

Hi徐敏
2015/11/11
0
1
读书笔记之《Java并发编程的艺术》-并发编程容器和框架(重要)

读书笔记部分内容来源书出版书,版权归本书作者,如有错误,请指正。 欢迎star、fork,读书笔记系列会同步更新 git https://github.com/xuminwlt/j360-jdk module j360-jdk-thread/me.j360....

Hi徐敏
2015/11/11
0
1
【死磕Java并发】—– 死磕 Java 并发精品合集

【死磕 Java 并发】系列是 LZ 在 2017 年写的第一个死磕系列,一直没有做一个合集,这篇博客则是将整个系列做一个概览。 先来一个总览图: 【高清图,请关注“Java技术驿站”公众号,回复:脑...

chenssy
07/22
0
0
[Java 并发编程] 集合框架之 同步容器类 & 并发容器类

吾生也有涯,而知也无涯。———《庄子》 通过上一篇文章,我们已经知道设计一个线程安全类的原则和步骤,以及在设计过程中我们应当注意的细节。实际上,Java 的集合库包含了线程安全集合和非...

seaicelin
05/25
0
0

没有更多内容

加载失败,请刷新页面

加载更多

MySQL autocommit探究

-- sessionA:tx_isolation=REPEATABLE-READmysql> select connection_id();+-----------------+| connection_id() |+-----------------+| 28 |+-----------------+......

安小乐
8分钟前
4
0
c++多线程锁 Mutex  自动判断死锁

c++多线程锁可以使用absl::Mutex std::mutex这两种,下面是demo代码。 使用absl:Mutex的时候打印: [mutex.cc : 1338] RAW: Cycle: [mutex.cc : 1352] RAW: mutex@0x683b68 stack: @ 0x43856......

青黑
27分钟前
1
0
Blockathon2018(成都站)比赛落幕,留给我们这些区块链应用思考

9月14日,HiBlock区块链社区主办的第二届Blockathon在成都菁融国际广场成功举行,30名参赛者分为5支队伍在48小时内完成区块链项目的创意、开发及路演,经过紧张的开发及现场评选,最终币托(...

HiBlock
32分钟前
0
0
71.告警系统主脚本 配置文件 监控项目

20.20 告警系统主脚本(main.sh) 20.21 告警系统配置文件 20.22 告警系统监控项目 20.20 告警系统主脚本(main.sh): ~1.约定:把以后所有的shell脚本放在/usr/local/sbin下,也方便我们查...

王鑫linux
39分钟前
0
0
装饰者模式

装饰者模式 Q:何为装饰模式? ()地给一个对象添加一些额外的(),并且()时,并不影响原对象。扩展功能来说,装饰器模式相比生成子类更为灵活。 Q:使用场景? 1.想要在不影响其他对象的情况下...

阿元
59分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部