文档章节

ElasticSearch中bulkProcesser使用

candyleer
 candyleer
发布于 2017/02/11 22:38
字数 482
阅读 49
收藏 4

初次接触es,可能对增删改查很熟悉,以为能为得心应手,本次应用场景为 数据库变更一条记录,会触发更新es中的数据,每秒并发大概30条左右,测试环境一切工作正常(数据量较少),上线后发现日志中很多类似于下面的错误:

队列满了

org.elasticsearch.common.util.concurrent.EsRejectedExecutionException: rejected execution (queue capacity 200) on org.elasticsearch.search.action.SearchServiceTransportAction$23@5f804c60
    at org.elasticsearch.common.util.concurrent.EsAbortPolicy.rejectedExecution(EsAbortPolicy.java:62)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
    at org.elasticsearch.search.action.SearchServiceTransportAction.execute(SearchServiceTransportAction.java:509)
    at org.elasticsearch.search.action.SearchServiceTransportAction.sendExecuteScan(SearchServiceTransportAction.java:441)
    at org.elasticsearch.action.search.type.TransportSearchScanAction$AsyncAction.sendExecuteFirstPhase(TransportSearchScanAction.java:68)
    at org.elasticsearch.action.search.type.TransportSearchTypeAction$BaseAsyncAction.performFirstPhase(TransportSearchTypeAction.java:171)
    at org.elasticsearch.action.search.type.TransportSearchTypeAction$BaseAsyncAction.start(TransportSearchTypeAction.java:153)
    at org.elasticsearch.action.search.type.TransportSearchScanAction.doExecute(TransportSearchScanAction.java:52)
    at org.elasticsearch.action.search.type.TransportSearchScanAction.doExecute(TransportSearchScanAction.java:42)
  ...

更新版本错误:

Caused by: org.elasticsearch.index.engine.VersionConflictEngineException: [kpi][4] [opportunity][1442415600000]: version conflict, current [5933], provided [5932]
        at org.elasticsearch.index.engine.internal.InternalEngine.innerIndex(InternalEngine.java:582) [elasticsearch-1.4.4.jar:]
        at org.elasticsearch.index.engine.internal.InternalEngine.index(InternalEngine.java:522) [elasticsearch-1.4.4.jar:]
        at org.elasticsearch.index.shard.service.InternalIndexShard.index(InternalIndexShard.java:425) [elasticsearch-1.4.4.jar:]
        at org.elasticsearch.action.index.TransportIndexAction.shardOperationOnPrimary(TransportIndexAction.java:193) [elasticsearch-1.4.4.jar:]

经过高手指点,从单次的实时操作改为批量操作,这样做的好处有,减少网路开销,从消息大小,时间,消息数量三个维度来衡量 批量操作的维度,如果数据不是要求非常实时的操作(非常实时的存储应该也不会选择es),改为批量操作后,错误均修复,大概配置如下。

private BulkProcessor bulkProcessor;

    @PostConstruct
    public void init() {
        this.bulkProcessor = BulkProcessor.builder(
                esTransportMainClient.getClient(),
                new BulkProcessor.Listener() {

                    @Override
                    public void beforeBulk(long executionId, BulkRequest request) {
                        logger.info("---尝试插入{}条数据---", request.numberOfActions());
                    }

                    @Override
                    public void afterBulk(long executionId,
                                          BulkRequest request, BulkResponse response) {
                        logger.info("---尝试插入{}条数据成功---", request.numberOfActions());
                    }

                    @Override
                    public void afterBulk(long executionId,
                                          BulkRequest request, Throwable failure) {
                        logger.error("[es错误]---尝试插入数据失败---", failure);
                    }

                })
                .setBulkActions(1000)
                .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
                .setFlushInterval(TimeValue.timeValueSeconds(5))
                .setConcurrentRequests(2)
                .build();
    }

我这里全局保持一个bulkProcesser就可以维持正常业务了。

每次使用的方法:

bulkProcessor.add(updateRequestBuilder.request());

此processer的含义为如果消息数量到达1000 或者消息大小到大5M 或者时间达到5s 任意条件满足,客户端就会把当前的数据提交到服务端处理。效率很高。

© 著作权归作者所有

共有 人打赏支持
candyleer
粉丝 1
博文 5
码字总数 1859
作品 0
成都
Elastic Search学习笔记2——安装head插件

下载head插件地址 http://mobz.github.io/elasticsearch-head/ 安装 在V2中,进入elasticsearch/bin目录 使用plugin可以直接安装插件 plugin install mobz/elasticsearch-head 在浏览器中输入...

晨猫
03/09
0
0
ES(elasticsearch)搜索引擎

ES(elasticsearch)搜索引擎 0、授人以渔,少走半年弯路! 死磕 Elasticsearch 方法论:普通程序员高效精进的 10 大狠招! 一、Elasitcsearch基础篇 1.1 Elasitcsearch基础认知 1、Elasticse...

Ocean_K
09/11
0
0
初探 ELK - 每天5分钟玩转 Docker 容器技术(89)

在开源的日志管理方案中,最出名的莫过于 ELK 了。ELK 是三个软件的合称:Elasticsearch、Logstash、Kibana。 Elasticsearch 一个近乎实时查询的全文搜索引擎。Elasticsearch 的设计目标就是...

CloudMAN
2017/11/03
0
0
Elasticsearch中文分词研究

一、ES分析器简介 ES是一个实时搜索与数据分析引擎,为了完成搜索功能,必须对原始数据进行分析、拆解,以建立索引,从而实现搜索功能; ES对数据分析、拆解过程如下: 首先,将一块文本分成...

zhaipengfei1231
04/18
0
0
分布式搜索elasticsearch 安装

单机安装 安装我就以自己的项目为主我自己本身就是一下小项目从部署向下 安装步骤: 1. 安装jdk或者jre,然后设置好环境变更JAVA_HOME; 2. 下载elasticsearch,将之解压,到你项目文件下 我...

空_明
2014/01/17
0
0

没有更多内容

加载失败,请刷新页面

加载更多

自定义Ubuntu/Windows双系统引导菜单主题

学习Linux自然少不了要装双系统,其中Ubuntu便是我们用的最多的Linux系统。装完双系统后,Ubuntu会自动生成grub开机引导及菜单,及其丑陋,而且很多我们用不到的选项。今天我们就介绍burg:修...

Linux就该这么学
6分钟前
0
0
Go 并发(二)

Go Mutex 通过Mutex和信道处理竞态条件。 临界区 当程序并发运行时,多个协程不应该同时访问那些修改共享资源的代码,这些修改共享资源的代码称为临界区。 Go中通过Mutex可以避免同时访问临界...

春哥大魔王的博客
8分钟前
0
0
CentOS 7安装和部署Docker

Docker 要求 CentOS 系统的内核版本高于 3.10 ,查看本页面的前提条件来验证你的CentOS 版本是否支持 Docker 。通过 uname -r 命令查看你当前的内核版本 uname -r3.10.0-514.el7.x86_64 1、...

狼王黄师傅
12分钟前
0
0
php扩展可以通过pecl 或者phpize 安装

pecl 算是 php 扩展的一个官方聚合平台,一些比较有名,有特点的扩展会被 pecl 收录,收录后可以通过 pecl 的方式安装。但是更多的扩展是没有收录在 pecl 上的,这些扩展还是需要通过 phpize...

bengozhong
12分钟前
0
0
CentOS中如何安装7ZIP

执行以下命令下载安装: wget http://nchc.dl.sourceforge.net/project/p7zip/p7zip/9.20.1/p7zip_9.20.1_src_all.tar.bz2tar -jxvf p7zip_9.20.1_src_all.tar.bz2cd p7zip_9.20.1make......

凯文加内特
18分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部