文档章节

ElasticSearch中bulkProcesser使用

candyleer
 candyleer
发布于 2017/02/11 22:38
字数 482
阅读 48
收藏 4

初次接触es,可能对增删改查很熟悉,以为能为得心应手,本次应用场景为 数据库变更一条记录,会触发更新es中的数据,每秒并发大概30条左右,测试环境一切工作正常(数据量较少),上线后发现日志中很多类似于下面的错误:

队列满了

org.elasticsearch.common.util.concurrent.EsRejectedExecutionException: rejected execution (queue capacity 200) on org.elasticsearch.search.action.SearchServiceTransportAction$23@5f804c60
    at org.elasticsearch.common.util.concurrent.EsAbortPolicy.rejectedExecution(EsAbortPolicy.java:62)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
    at org.elasticsearch.search.action.SearchServiceTransportAction.execute(SearchServiceTransportAction.java:509)
    at org.elasticsearch.search.action.SearchServiceTransportAction.sendExecuteScan(SearchServiceTransportAction.java:441)
    at org.elasticsearch.action.search.type.TransportSearchScanAction$AsyncAction.sendExecuteFirstPhase(TransportSearchScanAction.java:68)
    at org.elasticsearch.action.search.type.TransportSearchTypeAction$BaseAsyncAction.performFirstPhase(TransportSearchTypeAction.java:171)
    at org.elasticsearch.action.search.type.TransportSearchTypeAction$BaseAsyncAction.start(TransportSearchTypeAction.java:153)
    at org.elasticsearch.action.search.type.TransportSearchScanAction.doExecute(TransportSearchScanAction.java:52)
    at org.elasticsearch.action.search.type.TransportSearchScanAction.doExecute(TransportSearchScanAction.java:42)
  ...

更新版本错误:

Caused by: org.elasticsearch.index.engine.VersionConflictEngineException: [kpi][4] [opportunity][1442415600000]: version conflict, current [5933], provided [5932]
        at org.elasticsearch.index.engine.internal.InternalEngine.innerIndex(InternalEngine.java:582) [elasticsearch-1.4.4.jar:]
        at org.elasticsearch.index.engine.internal.InternalEngine.index(InternalEngine.java:522) [elasticsearch-1.4.4.jar:]
        at org.elasticsearch.index.shard.service.InternalIndexShard.index(InternalIndexShard.java:425) [elasticsearch-1.4.4.jar:]
        at org.elasticsearch.action.index.TransportIndexAction.shardOperationOnPrimary(TransportIndexAction.java:193) [elasticsearch-1.4.4.jar:]

经过高手指点,从单次的实时操作改为批量操作,这样做的好处有,减少网路开销,从消息大小,时间,消息数量三个维度来衡量 批量操作的维度,如果数据不是要求非常实时的操作(非常实时的存储应该也不会选择es),改为批量操作后,错误均修复,大概配置如下。

private BulkProcessor bulkProcessor;

    @PostConstruct
    public void init() {
        this.bulkProcessor = BulkProcessor.builder(
                esTransportMainClient.getClient(),
                new BulkProcessor.Listener() {

                    @Override
                    public void beforeBulk(long executionId, BulkRequest request) {
                        logger.info("---尝试插入{}条数据---", request.numberOfActions());
                    }

                    @Override
                    public void afterBulk(long executionId,
                                          BulkRequest request, BulkResponse response) {
                        logger.info("---尝试插入{}条数据成功---", request.numberOfActions());
                    }

                    @Override
                    public void afterBulk(long executionId,
                                          BulkRequest request, Throwable failure) {
                        logger.error("[es错误]---尝试插入数据失败---", failure);
                    }

                })
                .setBulkActions(1000)
                .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
                .setFlushInterval(TimeValue.timeValueSeconds(5))
                .setConcurrentRequests(2)
                .build();
    }

我这里全局保持一个bulkProcesser就可以维持正常业务了。

每次使用的方法:

bulkProcessor.add(updateRequestBuilder.request());

此processer的含义为如果消息数量到达1000 或者消息大小到大5M 或者时间达到5s 任意条件满足,客户端就会把当前的数据提交到服务端处理。效率很高。

© 著作权归作者所有

共有 人打赏支持
candyleer
粉丝 1
博文 5
码字总数 1859
作品 0
成都
Elasticsearch中文分词研究

一、ES分析器简介 ES是一个实时搜索与数据分析引擎,为了完成搜索功能,必须对原始数据进行分析、拆解,以建立索引,从而实现搜索功能; ES对数据分析、拆解过程如下: 首先,将一块文本分成...

zhaipengfei1231
04/18
0
0
当ES赶超Redis,这份ES进修攻略不容错过!

从4月DB-Engines最新发布的全球数据库排名中,我们赫然发现ElasticSearch逆袭超越了Redis,从原先的第9名上升至第8名,而Redis则落后一名,排在了其后。 事实上,这场逆袭并不算太让人意外。...

DBAplus社群
04/15
0
0
CentOS7.3下ELK日志分析系统集群搭建

Elasticsearch是个基于Lucene实现的开源、分布式、restful的全文本搜索引擎,此外他还是一个分布式实时文档存储,其中每个文档的每个filed均是可被索引的数据,且可被搜索,也是一个带实时分...

wujunqi1996
07/14
0
0
初探 ELK - 每天5分钟玩转 Docker 容器技术(89)

在开源的日志管理方案中,最出名的莫过于 ELK 了。ELK 是三个软件的合称:Elasticsearch、Logstash、Kibana。 Elasticsearch 一个近乎实时查询的全文搜索引擎。Elasticsearch 的设计目标就是...

CloudMAN
2017/11/03
0
0
CentOS下使用ELK套件搭建日志分析和监控平台

1 概述 ELK套件(ELK stack)是指ElasticSearch、Logstash和Kibana三件套。这三个软件可以组成一套日志分析和监控工具。 由于三个软件各自的版本号太多,建议采用ElasticSearch官网推荐的搭配...

周宇1991
06/29
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

咕泡-Factory设计模式笔记

个人感悟: 设计模式都是处理复杂问题的,如果问题本身很简单,使用设计模式反而累赘,增加了开发的复杂性 遇到最简单的情况,直接 new 如果创建对象的过程简单,但是需要匹配不同情况,返回...

职业搬砖20年
22分钟前
0
0
Java中的锁分类

在读很多并发文章中,会提及各种各样锁如公平锁,乐观锁等等,这篇文章介绍各种锁的分类。介绍的内容如下: 公平锁/非公平锁 可重入锁 独享锁/共享锁 互斥锁/读写锁 乐观锁/悲观锁 分段锁 偏...

Funcy1122
30分钟前
0
0
Ansible随机数

想为你的Ansible剧本取一个随机数?还想在接下来的运行中保持系统的等幂性?这里有一个答案。 假如,你要为一大批服务器设置cron任务,却不想让它们同时启动,你可以这样设置分钟数: minute...

大别阿郎
40分钟前
0
0
SpringCloud之服务注册中心Eureka

本系列介绍的配置均基于 Spring Boot 2.0.1.RELEASE 版本和 Spring Cloud Finchley.SR1 服务注册中心 Spring Cloud 已经帮我们实现了服务注册中心,我们只需要很简单的几个步骤就可以完成。 ...

熊小飞呀
今天
15
1
“Comparison method violates ...”异常的再现方法

前提条件:JDK8 代码: import java.util.ArrayList;import java.util.Collections;import java.util.Comparator;import java.util.List;public class Test { public stat......

hunterli
今天
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部