文档章节

消费队列

 北极之北
发布于 2017/03/17 16:31
字数 346
阅读 8
收藏 0

package com.pingan.wifi.dahua.base;


import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.pingan.wifi.dahua.biz.UpgradeQueueService;


/**
 * 队列消费器
 * 
 * @author renpeng162
 * 
 */
@Component("upgradeQueue")
public class UpgradeQueue implements InitializingBean, DisposableBean {

    private static final Log logger = LogFactory.getLog(UpgradeQueue.class);

    // 要消费的队列名
    private String queueName = RedisKey.FUND_QUEUE_FIRST_BUG;

    // 消费线程池大小
    private int threadPoolSize = 1;

    // 消费线程池
    private List<ConsumerThread> consumerThreads;
    
    @Autowired
    private UpgradeQueueService queueService;

    // 消费线程启动延时
    private int startDelay = 10000;


    /**
     * 在队列消费器初始化时要先启动线程池中的线程
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        consumerThreads = new ArrayList<ConsumerThread>();
        
        for (int i = 0; i < threadPoolSize; i++) {
            ConsumerThread t = new ConsumerThread();
            t.setName("pools-consumer-" + this.queueName + "-" + i);
            t.setDaemon(true); // 把这些消费线程设置成守护线程,方便jvm正常退出
            t.start();
            consumerThreads.add(t);
        }
    }

    /**
     * 消费器销毁时,把所有的消费线程也关闭掉
     */
    @Override
    public void destroy() throws Exception {
        try {
            for (ConsumerThread ct : consumerThreads) {
                ct.shutdown();
            }
        } catch (Exception t) {
            logger.warn(t);
        }
    }

    private class UpgradeConsumerImpl implements UpgradeQueueConsumer {
        private volatile boolean status = true;

        @Override
        public boolean getStatus() {
            return this.status;
        }

        @Override
        public void setStatus(boolean status) {
            this.status = status;
        }

    }

    /**
     * 消费线程类定义
     * 
     * @author EX-ZHAOXIANGTAO001
     * 
     */
    private class ConsumerThread extends Thread {

        private UpgradeQueueConsumer msgConsumer = new UpgradeConsumerImpl();

        public ConsumerThread() {

        }

        @Override
        public void run() {
            logger.info("首先保证线程能够启动===========================状态:"+msgConsumer.getStatus());
            if (startDelay > 0) {
                try {
                    Thread.sleep(startDelay); // 先使线程挂起给定时间,已等待系统其它部分先正常启动
                } catch (InterruptedException e) {
                    logger.warn(e);
                }
            }
            queueService.pullMsg(queueName,msgConsumer);
        }

        public void shutdown() {
            this.msgConsumer.setStatus(false);
        }
    }

    public String getQueueName() {
        return queueName;
    }

    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }

    public int getThreadPoolSize() {
        return threadPoolSize;
    }

    public void setThreadPoolSize(int threadPoolSize) {
        this.threadPoolSize = threadPoolSize;
    }

    public int getStartDelay() {
        return startDelay;
    }

    public void setStartDelay(int startDelay) {
        this.startDelay = startDelay;
    }

}
 

© 著作权归作者所有

上一篇: dubbo
下一篇: 线程安全
粉丝 8
博文 23
码字总数 17514
作品 0
深圳
私信 提问
基于信号量的生产消费模型

一、前言 上篇 https://www.jianshu.com/p/6402676abc86 文章讲解了一个定时生产消费时候消费队列里面最多有几个元素的问题。本文来探讨另外一个问题,由于生产和消费线程执行的不确定性,会...

今天你不奋斗明天你就落后
2018/01/30
0
0
RocketMQ 源码分析Message 拉取与消费(下)

1、概述 本文接:《RocketMQ 源码分析Message 拉取与消费(上)》。 主要解析 在 消费 逻辑涉及到的源码。 2、Consumer MQ 提供了两类消费者: PushConsumer: 在大多数场景下使用。 名字虽然...

wangchen1999
2018/04/13
21
0
RocketMQ一个新的消费组初次启动时从何处开始消费呢?

1、抛出问题 一个新的消费组订阅一个已存在的Topic主题时,消费组是从该Topic的哪条消息开始消费呢? 首先翻阅DefaultMQPushConsumer的API时,setConsumeFromWhere(ConsumeFromWhere consum...

丁威
09/11
0
0
Rocketmq集群消费测试

一 机器部署 1、机器组成 7台机器,均为16G内存 每台服务器均有4个CPU,2核 2、运行环境配置 3、刷盘方式 每台机器master机器均采用异步刷盘方式 二 性能评测 1、评测目的 测试consumer端的集...

tantexian
2016/06/30
702
0
RocketMQ 主题扩分片后遇到的坑

消息组接到某项目组反馈,topic 在扩容后出现部分队列无法被消费者,导致消息积压,影响线上业务? 考虑到该问题是发送在真实的线上环境,为了避免泄密,本文先在笔者的虚拟机中来重现问题。...

丁威
09/16
0
0

没有更多内容

加载失败,请刷新页面

加载更多

nginx学习笔记

中间件位于客户机/ 服务器的操作系统之上,管理计算机资源和网络通讯。 是连接两个独立应用程序或独立系统的软件。 web请求通过中间件可以直接调用操作系统,也可以经过中间件把请求分发到多...

码农实战
今天
5
0
Spring Security 实战干货:玩转自定义登录

1. 前言 前面的关于 Spring Security 相关的文章只是一个预热。为了接下来更好的实战,如果你错过了请从 Spring Security 实战系列 开始。安全访问的第一步就是认证(Authentication),认证...

码农小胖哥
今天
12
0
JAVA 实现雪花算法生成唯一订单号工具类

import lombok.SneakyThrows;import lombok.extern.slf4j.Slf4j;import java.util.Calendar;/** * Default distributed primary key generator. * * <p> * Use snowflake......

huangkejie
昨天
12
0
PhotoShop 色调:RGB/CMYK 颜色模式

一·、 RGB : 三原色:红绿蓝 1.通道:通道中的红绿蓝通道分别对应的是红绿蓝三种原色(RGB)的显示范围 1.差值模式能模拟三种原色叠加之后的效果 2.添加-颜色曲线:调整图像RGB颜色----R色增强...

东方墨天
昨天
11
1
将博客搬至CSDN

将博客搬至CSDN

算法与编程之美
昨天
13
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部