文档章节

生产者消费者运用

l
 lintalkliu
发布于 2017/07/18 16:52
字数 467
阅读 1
收藏 0

public class CallCeicApiByMutliThread {
    private static Log logger=LogFactory.getLog(CallCeicApiByMutliThread.class); 
    public static final  AtomicInteger reservations = new AtomicInteger(0);
    /**
     * 
     * 入口方法
     */
    public static void main(String[] args) {
        try{
            
            logger.info("*****************************************************:  CEIC Request Start");
            // Thead 数量
            int ceicThreadSize =Integer.valueOf(PropertyUtil.getValueByName("ceic.properties", "ceicThreadSize"));
            
            BlockingQueue<SeriesMasterDto> queue = new LinkedBlockingQueue<SeriesMasterDto>(ceicThreadSize);
            ConsumerCeic consumer = new ConsumerCeic(queue);  
            ProducerCeic producer = new ProducerCeic(queue);
            ExecutorService executorService =Executors.newFixedThreadPool(ceicThreadSize);
            executorService.execute(producer);
            for (int i = 0; i < ceicThreadSize; i++) {  
                executorService.execute(consumer);
            }
             while(true){
                 if(!ProducerCeic.isProductRunning && queue.isEmpty() && reservations.get()<=0){
                    ConsumerCeic.setCustomerRunning(false);
                    executorService.shutdown();
                    break;
                  }
             }
             
            logger.info("*****************************************************:  CEIC Request End");
             

        }catch(Exception e){
            // 统一的异常输出
            logger.error(e.getMessage(),e);
        }
    }
}

class ProducerCeic implements Runnable {  

    private static Log logger=LogFactory.getLog(ProducerCeic.class); 
    public static volatile boolean isProductRunning = true;
    BlockingQueue<SeriesMasterDto> queue;  
    public ProducerCeic(BlockingQueue<SeriesMasterDto> queue) {  
        this.queue = queue;  
    }  
  
    @Override  
    public void run() {  
        try {  
            while(isProductRunning){
                // seriesMaster数据取得
                List<SeriesMasterDto> resultDtoList = DAOFactory.getSeriesMasterDaoInstance().querySeriesMaster();
                for(SeriesMasterDto dto:resultDtoList){
                    queue.put(dto);//如果队列是满的话,会阻塞当前线程  
                    CallCeicApiByMutliThread.reservations.incrementAndGet();
                }
                // 一次执行
                isProductRunning= false;
            }
        }  catch (Exception e) {
            logger.error(e.getMessage(),e);
        }
    }  
}

class ConsumerCeic implements Runnable{  
    private static Log logger=LogFactory.getLog(ConsumerCeic.class);
    
    private static volatile boolean isCustomerRunning = true;
    public static void setCustomerRunning(boolean isCustomerRunning) {
        ConsumerCeic.isCustomerRunning = isCustomerRunning;
    }

    BlockingQueue<SeriesMasterDto> queue;  
    
    public ConsumerCeic(BlockingQueue<SeriesMasterDto> queue){  
        this.queue = queue;  
    }  
      
    @Override  
    public void run() {  
        
        while(isCustomerRunning){
            try {  
                SeriesMasterDto dto = queue.take();//如果队列为空,会阻塞当前线程  
                ceicOperator(dto);
            }  catch (Exception e) {
                logger.error(e.getMessage(),e);
            } finally{
                 CallCeicApiByMutliThread.reservations.decrementAndGet();
            }
        }
     
    }


    /**
     * 
     * 对CEIC XML数据进行CRUD操作
     * @param dto
     * @throws Exception
     */
    private static void ceicOperator(SeriesMasterDto dto) throws Exception{

        String seriesId = dto.getSeriesId();
        logger.info("*****************************************************:"+seriesId);
        // 0:insert. 1:update. 2. delete
        String operator = dto.actionId;
        if(operator.equals(ConstantUtil.CEIC_OPERATION_DELETE)){    
            
            // 2. delete(删除的场合,无需http请求)
            deleteSeriesData(seriesId);
        }else if(operator.equals(ConstantUtil.CEIC_OPERATION_INSERT)||operator.equals(ConstantUtil.CEIC_OPERATION_UPDATE)){

            String xmlFileHome =PropertyUtil.getValueByName("ceic.properties", "xmlfilehome");
            // 0:insert. 1:update.(只有插入和更新的场合,才需要http请求创建本地XML文件)
            // 创建URL
            String httpUrl = PropertyUtil.getValueByName("ceic.properties", "HTTP_URL_CEIC_SERIES_ID")+"&series="+seriesId;
            
            // http的get请求发出
            String strResultXml = RestfulAPIUtil.sendGet(httpUrl);
            
            if(null == strResultXml){
                logger.info("http reponse content is null");
                return;
            }
            
            // 生成XML文件
            File xmlFile = CeicFileUtil.createWriteFile(xmlFileHome, strResultXml,seriesId);

            // 解析XML文件并创建对应java对象
            IsearchResponse searchResponseDto = CeicFileUtil.readXmlByUnmarshal(xmlFile);
            
            // 请求返回的XML状态不是Success
            if(!searchResponseDto.getStatus().getMessage().equals("Success")){
                logger.info("xml's status is not success, file's name is  " + xmlFile );
            }else if(searchResponseDto.getExtendedSeriesList() == null){
                logger.info("xml's extendedSeriesList is NULL, file's name is  " + xmlFile );
            }else{
                if(operator.equals(ConstantUtil.CEIC_OPERATION_INSERT)){
                    // insert 
                    insertSeriesData(searchResponseDto,seriesId);
                }else if(operator.equals(ConstantUtil.CEIC_OPERATION_UPDATE)){
                    // update 
                    updateSeriesData(searchResponseDto,seriesId);
                }
            }
        }else{
            logger.info(" do not any operation, seriesId is :"+seriesId);
        }
    }
}

© 著作权归作者所有

l
粉丝 0
博文 6
码字总数 729
作品 0
南京
私信 提问
Spring for Apache Kafka实战

背景介绍 Kafka是一个分布式的、可分区的、可复制的消息系统,在现在的互联网公司,应用广泛,在我们公司在主要运用在定时推送业务,批量数据处理,日志上传等方面,我发现网上大部分博客,在...

谢一鸣
2018/07/26
0
0
并发编程(四)——Java中的阻塞队列

什么是阻塞队列? 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可...

whc20011
2016/10/31
23
0
Spring Cloud 2.x系列之整合rocketMQ

RocketMQ出了4的版本,而且本身这个mq有事务消息,在分布式的场景中有很好的启发性和作用,而且本身它也是阿里开源到apache的一个项目,从出身还是实力来说都很不错的。 1、新建项目sc-rock...

技术小能手
2018/11/12
0
0
Java中的阻塞队列

什么是阻塞队列? 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可...

vshcxl
2016/12/13
11
0
阻塞队列_BlockingQueue

阻塞队列_BlockingQueue 1. 什么是阻塞队列? 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,...

秋风醉了
2016/06/08
149
0

没有更多内容

加载失败,请刷新页面

加载更多

java发送html模板的高逼格邮件

最近做了一个监测k8s服务pod水平伸缩发送邮件的功能(当pod的cpu/内存达到指定阈值后会水平扩展出多个pod、或者指定时间内pod数应扩展到指定数量),一开始写了个格式很low的邮件,像下面这样...

码农实战
5分钟前
2
0
php-fpm配置文件详解/MariaDB密码重置、慢查询日志

来源:https://blog.csdn.net/Powerful_Fy php-fpm主配置文件路径:/usr/local/php-fpm/etc/php-fpm.conf #位于安装php安装目录下的etc/目录中,该文件中最后一行将配置文件指向:include=/...

asnfuy
10分钟前
2
0
川普给埃尔多安和内堪尼亚胡的信

任性 https://twitter.com/netanyahu/status/1186647558401253377 https://edition.cnn.com/2019/10/16/politics/trump-erdogan-letter/index.htm...

Iridium
31分钟前
10
0
golang-mysql-原生

db.go package mainimport ("database/sql""time"_ "github.com/go-sql-driver/mysql")var (db *sql.DBdsn = "root:123456@tcp(127.0.0.1:3306)/test?charset=u......

李琼涛
59分钟前
5
0
编程作业20191021092341

1编写一个程序,把用分钟表示的时间转换成用小时和分钟表示的时 间。使用#define或const创建一个表示60的符号常量或const变量。通过while 循环让用户重复输入值,直到用户输入小于或等于0的值...

1李嘉焘1
今天
7
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部