文档章节

Elasitcsearch High Level Rest Client学习笔记(三)批量api

木子SMZ
 木子SMZ
发布于 2018/07/18 18:27
字数 873
阅读 2357
收藏 0

Bulk Request

BulkRequest可以在一起从请求执行批量添加、更新和删除,至少需要添加一个操作

BulkRequest request = new BulkRequest(); //创建BulkRequest
request.add(new IndexRequest("posts", "doc", "1")  //添加操作
        .source(XContentType.JSON,"field", "foo"));
request.add(new IndexRequest("posts", "doc", "2")  //添加操作
        .source(XContentType.JSON,"field", "bar"));
request.add(new IndexRequest("posts", "doc", "3")  //添加操作
        .source(XContentType.JSON,"field", "baz"));

注意:每次只支持一种encoded,否则会报错

可以在同一个BulkRequest中添加不同类型操作

BulkRequest request = new BulkRequest();
request.add(new DeleteRequest("posts", "doc", "3")); 
request.add(new UpdateRequest("posts", "doc", "2") 
        .doc(XContentType.JSON,"other", "test"));
request.add(new IndexRequest("posts", "doc", "4")  
        .source(XContentType.JSON,"field", "baz"));

可选参数

超时时间设置

request.timeout(TimeValue.timeValueMinutes(2)); 
request.timeout("2m");

 

刷新策略

request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); 
request.setRefreshPolicy("wait_for");

 

设置副本shard活跃验证,执行index、update、delete操作前必须有多少个副本shard活跃

request.waitForActiveShards(2); 
request.waitForActiveShards(ActiveShardCount.ALL);

调用方式

同步

BulkResponse bulkResponse = client.bulk(request);

异步

client.bulkAsync(request, new ActionListener<BulkResponse>() {
    @Override
    public void onResponse(BulkResponse bulkResponse) {
        //成功
    }

    @Override
    public void onFailure(Exception e) {
        //失败
    }
});

响应对象

响应对象包括操作信息,并且可以便利每一个结果

for (BulkItemResponse bulkItemResponse : bulkResponse) { 
    DocWriteResponse itemResponse = bulkItemResponse.getResponse(); 

    if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
            || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {     //index操作
        IndexResponse indexResponse = (IndexResponse) itemResponse;

    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {     //update操作
        UpdateResponse updateResponse = (UpdateResponse) itemResponse;

    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {     //delete操作
        DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
    }
}

BulkResponce提供方法快速查看操作是否失败

if (bulkResponse.hasFailures()) { 
    //todo
}

BulkProcessor

RestHighLevelClient:执行BulkRequest并且返回BulkResponse

BulkProcessor.Listener:在bulk请求前后执行,并且可以处理失败情况

BulkProcessor.Listener listener = new BulkProcessor.Listener() { 
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        //bulk请求前执行
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        //bulk请求后执行
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        //失败后执行
    }
};

BulkProcessor bulkProcessor = BulkProcessor.builder(client::bulkAsync, listener).build();  //BulkProcessor通过 BulkProcessor.Builder build()方法构建, RestHighLevelClient.bulkAsync() 用来执行bulk请求

BulkProcessor.Builder提供方法使BulkProcessor调整请求参数

BulkProcessor.Builder builder = BulkProcessor.builder(client::bulkAsync, listener);
builder.setBulkActions(500); //按照数量批量处理(默认1000,-1禁用) 
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); //按照大小批量处理
builder.setConcurrentRequests(0); //并发处理线程
builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); //设置flush索引周期
builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3)); //回退策略,等待1秒并重试3次, BackoffPolicy.noBackoff()  BackoffPolicy.constantBackoff()  BackoffPolicy.exponentialBackoff()  查看更多选项

添加请求

IndexRequest one = new IndexRequest("posts", "doc", "1").
        source(XContentType.JSON, "title", "In which order are my Elasticsearch queries executed?");
IndexRequest two = new IndexRequest("posts", "doc", "2")
        .source(XContentType.JSON, "title", "Current status and upcoming changes in Elasticsearch");
IndexRequest three = new IndexRequest("posts", "doc", "3")
        .source(XContentType.JSON, "title", "The Future of Federated Search in Elasticsearch");

bulkProcessor.add(one);
bulkProcessor.add(two);
bulkProcessor.add(three);

BulkProcessor通过 BulkProcessor.Listener 监控请求, BulkProcessor.Listener 提供方法接受BulkRequest和BulkResponse

BulkProcessor.Listener listener = new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        int numberOfActions = request.numberOfActions(); //在每个execution前执行,可以获知每次执行多少个操作
        logger.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions);
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        if (response.hasFailures()) {  //在每个execution后执行,可以获知是否包含错误
            logger.warn("Bulk [{}] executed with failures", executionId);
        } else {
            logger.debug("Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis());
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        logger.error("Failed to execute bulk", failure); //发生错误时执行
    }
};

批量请求执行后需要关闭BulkProcessor。两种关闭方式选其一

awaitClose(),所有请求被处理后或者等待时间结束后关闭,返回ture表明所有请求已经完成,false说明等待时间结束后请求并未执行结束

boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);

close(),立即关闭BulkProcessor

bulkProcessor.close();

关闭processor之前,所有已经被添加的请求会被提交执行,并且不能再向其中添加请求

© 著作权归作者所有

木子SMZ
粉丝 2
博文 34
码字总数 22804
作品 0
昌平
程序员
私信 提问
Elasitcsearch High Level Rest Client学习笔记(一)

文档地址:https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.1/java-rest-high.html java doc地址: https://artifacts.elastic.co/javadoc/org/elasticsearch/client/el......

木子SMZ
2018/07/12
0
0
Elasticsearch Java Rest Client 上手指南(下)

High Level Rest Clent到现在还不是完成版。我试了一下,5.6版本的就这么些API 包含了基本的增删改查和批量操作 我翻了一下官方文档,凉凉。确实像官方文档说的那样,需要完善。虽然是High ...

MaxZing
2018/07/18
0
0
Elasitcsearch High Level Rest Client学习笔记(二) 基础API

1、index API IndexRequest request = new IndexRequest( String jsonString = "{" + request.source(jsonString, XContentType.JSON); //source可以有多种形式下面介绍 source可以以map的形......

木子SMZ
2018/07/17
0
0
Elasticsearch入门实践

一. 系统环境 操作系统:CentOS release 6.8 (Final) ES版本:6.1.1 二. 安装 先确认安装了Java运行时环境: 解压ES压缩包: 三. 启动 1. 启动ES单节点 当然,对于在后台以守护进程模式运行的...

哲别0
2018/06/06
0
0
High Level REST Client 访问阿里云6.3 Elasticsearch 实例实现

开发环境:InteliJ IDEA 操作系统 :macOS Mojave Elasticsearch 版本:阿里云 6.3.2withX-Pack 客户端版本:REST Client 6.3.2 1. 预先创建好阿里云 ES 实例,开启公网地址访问白名单。 2....

garygao305
01/15
0
0

没有更多内容

加载失败,请刷新页面

加载更多

零基础学画画应该从哪开始?

零基础学画画应该从哪开始?一种是从小有兴趣,喜欢涂鸦,喜欢将自己的创意和想法表现出来;另一种是长大后审美提高,开始对绘画艺术感兴趣,从而开始从零基础学起。 推荐大家可以搜一下:轻微...

设绘嗨
18分钟前
2
0
你编写的程序高效、优雅吗?阿里架构师教你编写高效优雅Java程序

面向对象 构造器参数太多怎么办? 用 builder 模式,用在 1、5 个或者 5 个以上的成员变量 2、参数不多,但是在未来,参数会增加 Builder 模式: 属于对象的创建模式,一般有 1. 抽象建造者:...

kx33389
23分钟前
2
0
PDF 文档操作Java类库Spire.PDF for Java v2.7.6发布上线!| 附下载

Spire.PDF for Java是一款专门对 PDF 文档进行操作的 Java 类库。该类库的主要功能在于帮助开发人员在 Java 应用程序(J2SE和J2EE)中生成 PDF 文档和操作现有 PDF 文档,并且运行环境无需安...

mnrssj
31分钟前
1
0
初探云原生应用管理(二): 为什么你必须尽快转向 Helm v3

在研究了一番“开放云原生应用中心(AppHub)”之后,程序员小张似乎已经明白了“云原生应用”到底是怎么一回事情。 “不就是 Helm 嘛!” 这不,小张这就准备把自己开发多年的“图书馆管理系...

zhaowei121
35分钟前
0
0
「工具」三分钟了解一款思维导图工具:XMind Zen

一款非常实用的商业思维导图软件,融合艺术与创造力。致力于高效的可视化思维,强调软件的跨平台使用,帮助用户提高生产效率。 相关信息 · 操作系统:macOS / Windows / Linux · 官方网站:...

极光推送
36分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部