文档章节

ElasticSearch中bulkProcesser使用

candyleer
 candyleer
发布于 2017/02/11 22:38
字数 482
阅读 53
收藏 4

初次接触es,可能对增删改查很熟悉,以为能为得心应手,本次应用场景为 数据库变更一条记录,会触发更新es中的数据,每秒并发大概30条左右,测试环境一切工作正常(数据量较少),上线后发现日志中很多类似于下面的错误:

队列满了

org.elasticsearch.common.util.concurrent.EsRejectedExecutionException: rejected execution (queue capacity 200) on org.elasticsearch.search.action.SearchServiceTransportAction$23@5f804c60
    at org.elasticsearch.common.util.concurrent.EsAbortPolicy.rejectedExecution(EsAbortPolicy.java:62)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
    at org.elasticsearch.search.action.SearchServiceTransportAction.execute(SearchServiceTransportAction.java:509)
    at org.elasticsearch.search.action.SearchServiceTransportAction.sendExecuteScan(SearchServiceTransportAction.java:441)
    at org.elasticsearch.action.search.type.TransportSearchScanAction$AsyncAction.sendExecuteFirstPhase(TransportSearchScanAction.java:68)
    at org.elasticsearch.action.search.type.TransportSearchTypeAction$BaseAsyncAction.performFirstPhase(TransportSearchTypeAction.java:171)
    at org.elasticsearch.action.search.type.TransportSearchTypeAction$BaseAsyncAction.start(TransportSearchTypeAction.java:153)
    at org.elasticsearch.action.search.type.TransportSearchScanAction.doExecute(TransportSearchScanAction.java:52)
    at org.elasticsearch.action.search.type.TransportSearchScanAction.doExecute(TransportSearchScanAction.java:42)
  ...

更新版本错误:

Caused by: org.elasticsearch.index.engine.VersionConflictEngineException: [kpi][4] [opportunity][1442415600000]: version conflict, current [5933], provided [5932]
        at org.elasticsearch.index.engine.internal.InternalEngine.innerIndex(InternalEngine.java:582) [elasticsearch-1.4.4.jar:]
        at org.elasticsearch.index.engine.internal.InternalEngine.index(InternalEngine.java:522) [elasticsearch-1.4.4.jar:]
        at org.elasticsearch.index.shard.service.InternalIndexShard.index(InternalIndexShard.java:425) [elasticsearch-1.4.4.jar:]
        at org.elasticsearch.action.index.TransportIndexAction.shardOperationOnPrimary(TransportIndexAction.java:193) [elasticsearch-1.4.4.jar:]

经过高手指点,从单次的实时操作改为批量操作,这样做的好处有,减少网路开销,从消息大小,时间,消息数量三个维度来衡量 批量操作的维度,如果数据不是要求非常实时的操作(非常实时的存储应该也不会选择es),改为批量操作后,错误均修复,大概配置如下。

private BulkProcessor bulkProcessor;

    @PostConstruct
    public void init() {
        this.bulkProcessor = BulkProcessor.builder(
                esTransportMainClient.getClient(),
                new BulkProcessor.Listener() {

                    @Override
                    public void beforeBulk(long executionId, BulkRequest request) {
                        logger.info("---尝试插入{}条数据---", request.numberOfActions());
                    }

                    @Override
                    public void afterBulk(long executionId,
                                          BulkRequest request, BulkResponse response) {
                        logger.info("---尝试插入{}条数据成功---", request.numberOfActions());
                    }

                    @Override
                    public void afterBulk(long executionId,
                                          BulkRequest request, Throwable failure) {
                        logger.error("[es错误]---尝试插入数据失败---", failure);
                    }

                })
                .setBulkActions(1000)
                .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
                .setFlushInterval(TimeValue.timeValueSeconds(5))
                .setConcurrentRequests(2)
                .build();
    }

我这里全局保持一个bulkProcesser就可以维持正常业务了。

每次使用的方法:

bulkProcessor.add(updateRequestBuilder.request());

此processer的含义为如果消息数量到达1000 或者消息大小到大5M 或者时间达到5s 任意条件满足,客户端就会把当前的数据提交到服务端处理。效率很高。

© 著作权归作者所有

共有 人打赏支持
candyleer
粉丝 1
博文 5
码字总数 1859
作品 0
成都
私信 提问
Elastic 在年度用户大会 Elastic{ON} 2018 上发布众多新功能和技术预览

下载超过 2.25 亿次,Elastic 公开 X-Pack 源代码 旧金山 (Elastic{ON} 2018) – 2018 年 2 月 27 日 – Elastic,Elasticsearch 和 Elastic Stack背后的公司,今天宣布其产品累计下载次数达...

Medcl
2018/03/01
0
0
Elasticsearch介绍和安装

版权声明:https://blog.csdn.net/weixin43814195?t=1 https://blog.csdn.net/weixin43814195/article/details/85275156 Elasticsearch 1.简介 1.1基本概念 Elasticsearch是基于Lucene的全文......

MIss.Fan
2018/12/27
0
0
CentOS7.3下ELK日志分析系统集群搭建

Elasticsearch是个基于Lucene实现的开源、分布式、restful的全文本搜索引擎,此外他还是一个分布式实时文档存储,其中每个文档的每个filed均是可被索引的数据,且可被搜索,也是一个带实时分...

wujunqi1996
2018/07/14
0
0
CentOS下使用ELK套件搭建日志分析和监控平台

1 概述 ELK套件(ELK stack)是指ElasticSearch、Logstash和Kibana三件套。这三个软件可以组成一套日志分析和监控工具。 由于三个软件各自的版本号太多,建议采用ElasticSearch官网推荐的搭配...

周宇1991
2018/06/29
0
0
centos 7( linux )下安装elasticsearch教程

目录 概述 环境准备 elaticsearch简介 安装elasticsearch 彩蛋 概述 很久没有写博客了,最近在做全文检索的项目,发现elasticsearch踩了不少坑,百度点进去又是坑,在此记录一下自己的踩坑历程。...

java_龙
2018/10/15
0
0

没有更多内容

加载失败,请刷新页面

加载更多

lucene 评分要素解析

基本规则:

Java搬砖工程师
28分钟前
0
0
ubutnu 14.04 安装JIRA

系统版本:Ubuntu 14.04 下载jira安装文件 sudo wget https://downloads.atlassian.com/software/jira/downloads/atlassian-jira-software-7.1.9-x64.bin 修改文件权限 sudo chmod 777 atl......

Kampfer
38分钟前
0
0
软件开发模型优缺点及其适用范围

瀑布模型、快速原型模型、增量模型、螺旋模型 瀑布模型也称软件生存周期模型。 优点: (1)它在软件工程中占有重要地位,它提供了软件开发的基本框架,这比依靠“个人技艺”开发软件好得多。...

无极之岚
39分钟前
0
0
孩子们各显神通对付 iOS 12「屏幕使用时间」的限制

简评:2018 年秋季,苹果公司推出了 iOS 12,其中备受好评的一项改变是:增加了屏幕使用时间限制,以减轻沉迷手机的状况。三个月过去后,这项功能似乎并没有对孩子造成太多困扰,道高一尺魔高...

极光推送
44分钟前
2
1
springCloud Spring Boot mybatis分布式微服务云架构-docker-feign-hystrix-ribbon(七)

简介 在上一节中,我们讨论了feign+hystrix在项目开发中,除了考虑正常的调用之外,负载均衡和故障转移也是关注的重点,这也是feign + ribbon+hystrix的优势所在,本节我们就讨论一下在feign...

sccspuercode
44分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部