ES Java client Document API整理

原创
2017/11/06 14:19
阅读数 2.2K

ES Java client Document API整理

对ES数据使用Java client进行处理设置。(这里有一个小细节就是:Java API版本一定要和ES版本对应,就是前面两位大版本一定要相等)

Maven仓库:

<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch</artifactId>
  <version>5.5.3</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.elasticsearch.client/transport -->
<dependency>
  <groupId>org.elasticsearch.client</groupId>
  <artifactId>transport</artifactId>
  <version>5.5.3</version>
</dependency>

api文档说明:

  • Client
    TransportClient client=null;
    Settings settings = Settings.builder()
        .put("cluster.name", "my-application")//指定集群名称
        .put("client.transport.sniff", true)//探测集群中机器状态
        .build();
    client = new PreBuiltTransportClient(settings);
    try {
      client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
    } catch (UnknownHostException e) {
      e.printStackTrace();
    }
    return  client;

Document APIS:

对单个文档进行的操作 

  • Index API 

setSource第一种用法

 IndexResponse response = client.prepareIndex("twitter", "tweet", "2")
        .setSource(jsonBuilder()
            .startObject()
            .field("user", "kimchy")
            .field("postDate", "niaho")
            .field("message", "trying out Elasticsearch")
            .endObject()
        ).get();

setSource第二种用法

    XContentBuilder builder = jsonBuilder()
        .startObject()
        .field("user", "kimchy")
        .field("postDate", new Date())
        .field("message", "trying out Elasticsearch")
        .endObject();

    IndexResponse response = client.prepareIndex("twitter", "tweet", "2")
        .setSource(builder.toString(), XContentType.JSON
        )
        .get();

通过查看源码,还有几种用法(还有一些过期的就没有推荐)

setSource(byte[] source, int offset, int length, XContentType xContentType)

setSource(Object... source)

setSource(XContentType xContentType, Object... source)

   对IndexResponse返回值的属性进行了解

  Result getResult()//结果

  public String getIndex()//索引名

  public ShardId getShardId()//分片id

  public String getType()

  public String getId()

  public long getVersion()//第一次存入的数据version 是1

  public boolean forcedRefresh()

  public void setForcedRefresh()

  public RestStatus status()
  • GET API 

prepareGet用法(index,type,id)比较局限的是必须要知道id的情况进行查询

    GetResponse getResponse=client.prepareGet("twitter", "tweet", "2")
    .setOperationThreaded(false)//是否异步线程
    .get();
    System.out.println(getResponse.toString());
  • Delete API

prepareDelete用法局限同上

    DeleteResponse response = client.prepareDelete("twitter", "tweet", "1")
    .setOperationThreaded(false)
    .get();
  •  Delete By Query API

直接查询删除

BulkByScrollResponse response =
    DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
        .filter(QueryBuilders.matchQuery("gender", "male")) 
        .source("persons")                                  
        .get();                                             

long deleted = response.getDeleted();

对成功失败需要一些处理的情况下

//对成功失败需要特殊处理的情况下使用监听
DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
    .filter(QueryBuilders.matchQuery("gender", "male"))                  
    .source("persons")                                                   
    .execute(new ActionListener<BulkByScrollResponse>() {           
        @Override
        public void onResponse(BulkByScrollResponse response) {
            long deleted = response.getDeleted();                        
        }
        @Override
        public void onFailure(Exception e) {
            // Handle the exception
        }
    });                       
  • Update API

对UpdateRequest进行操作1

UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("index");
updateRequest.type("type");
updateRequest.id("1");
updateRequest.doc(jsonBuilder()
        .startObject()
            .field("gender", "male")
        .endObject());
client.update(updateRequest).get();

对UpdateRequest进行操作2

UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
            .startObject()
                .field("gender", "male")
            .endObject());
client.update(updateRequest).get();

使用script进行更新操作(也可以使用ScriptService.ScriptType.FILE,用的就是文件名)

client.prepareUpdate("ttl", "doc", "1")
        .setScript(new Script("ctx._source.gender = \"male\""  , ScriptService.ScriptType.INLINE, null, null))
        .get();

upsert操作(存在就更新,不存在就插入)

IndexRequest indexRequest = new IndexRequest("index", "type", "1")
        .source(jsonBuilder()
            .startObject()
                .field("name", "Joe Smith")
                .field("gender", "male")
            .endObject());
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
            .startObject()
                .field("gender", "male")
            .endObject())
        .upsert(indexRequest);              
client.update(updateRequest).get();

对多个文档进行批量操作 

  • Multi Get API

可以跨索引查询

MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
    .add("twitter", "tweet", "1")           
    .add("twitter", "tweet", "2", "3", "4") 
    .add("another", "type", "foo")          
    .get();

for (MultiGetItemResponse itemResponse : multiGetItemResponses) { 
    GetResponse response = itemResponse.getResponse();
    if (response.isExists()) {                      
        String json = response.getSourceAsString(); 
    }
}
  • Bulk API

批量新增文档,批量删除文档都可以

// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                    .endObject()
                  )
        );

bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "another post")
                    .endObject()
                  )
        );

BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
    // process failures by iterating through each bulk response item
}

同时新增和删除

bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));
bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));
  • Using Bulk Processor

对批量处理结果进行处理

BulkProcessor bulkProcessor = BulkProcessor.builder(
        client,  
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId,
                                   BulkRequest request) { ... } 

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  BulkResponse response) { ... } 

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  Throwable failure) { ... } 
        })
        .setBulkActions(10000) 
        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) 
        .setFlushInterval(TimeValue.timeValueSeconds(5)) 
        .setConcurrentRequests(1) 
        .setBackoffPolicy(
            BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) 
        .build();
展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
3 收藏
0
分享
返回顶部
顶部