文档章节

ElasticSearch Java api 详解_V1.0

逝去的回忆
 逝去的回忆
发布于 2017/05/12 16:50
字数 3005
阅读 716
收藏 6

Java 操作ES实例解析

集群的连接

作为Elasticsearch节点

实例化一个节点的客户端是获得客户端的最简单的方式。这个Client可以执行elasticsearch相关的操作。

import static org.elasticsearch.node.NodeBuilder.*;

// on startup
Node node = nodeBuilder().node();
Client client = node.client();

// on shutdown
node.close();

当你启动一个node,它就加入了elasticsearch集群。你可以通过简单的设置cluster.name或者明确地使用clusterName方法拥有不同的集群。

你能够在你项目的/src/main/resources/elasticsearch.yml文件中定义cluster.name。只要elasticsearch.yml在classpath目录下面,你就能够用到它来启动你的节点。

cluster.name: yourclustername

或者通过java:

Node node = nodeBuilder().clusterName("yourclustername").node();
Client client = node.client();

利用Client的好处是,操作可以自动地路由到这些操作被执行的节点,而不需要执行双跳(double hop)。例如,索引操作将会在该操作最终存在的分片上执行。

当你启动了一个节点,最重要的决定是是否它将保有数据。大多数情况下,我们仅仅需要用到clients,而不需要分片分配给它们。这可以通过设置node.data为false或者设置 node.client为true来简单实现。

import static org.elasticsearch.node.NodeBuilder.*;

// on startup
Node node = nodeBuilder().client(true).node();
Client client = node.client();

// on shutdown
node.close();

传输(transport)客户端

TransportClient利用transport模块远程连接一个elasticsearch集群。它并不加入到集群中,只是简单的获得一个或者多个初始化的transport地址,并以轮询的方式与这些地址进行通信。

// on startup
Client client = new TransportClient()
        .addTransportAddress(new InetSocketTransportAddress("host1", 9300))
        .addTransportAddress(new InetSocketTransportAddress("host2", 9300));

// on shutdown
client.close();

注意,如果你有一个与elasticsearch集群不同的集群,你可以设置机器的名字。

Settings settings = ImmutableSettings.settingsBuilder()
        .put("cluster.name", "myClusterName").build();
Client client =    new TransportClient(settings);
//Add transport addresses and do something with the client...

你也可以用elasticsearch.yml文件来设置。

这个客户端可以嗅到集群的其它部分,并将它们加入到机器列表。为了开启该功能,设置client.transport.sniff为true。

Settings settings = ImmutableSettings.settingsBuilder()
        .put("client.transport.sniff", true).build();
TransportClient client = new TransportClient(settings);

其它的transport客户端设置有如下几个:

Parameter Description
client.transport.ignore_cluster_name true:忽略连接节点的集群名验证
client.transport.ping_timeout ping一个节点的响应时间,默认是5s
client.transport.nodes_sampler_interval sample/ping 节点的时间间隔,默认是5s

 

Java 索引API

索引API允许开发者索引类型化的JSON文档到一个特定的索引,使其可以被搜索。

生成JSON文档

有几种不同的方式生成JSON文档

  • 利用byte[]或者作为一个String手动生成
  • 利用一个Map将其自动转换为相应的JSON
  • 利用第三方库如Jackson去序列化你的bean
  • 利用内置的帮助函数XContentFactory.jsonBuilder()

手动生成

需要注意的是,要通过Date Format编码日期。

String json = "{" +
        "\"user\":\"kimchy\"," +
        "\"postDate\":\"2013-01-30\"," +
        "\"message\":\"trying out Elasticsearch\"" +
    "}";

使用map

Map<String, Object> json = new HashMap<String, Object>();
json.put("user","kimchy");
json.put("postDate",new Date());
json.put("message","trying out Elasticsearch");

序列化bean

elasticsearch早就用到了Jackson,把它放在了org.elasticsearch.common.jackson下面。你可以在你的pom.xml文件里面添加你自己的Jackson版本。

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.1.3</version>
</dependency>

这样,你就可以序列化你的bean为JSON。

import com.fasterxml.jackson.databind.*;

// instance a json mapper
ObjectMapper mapper = new ObjectMapper(); // create once, reuse

// generate json
String json = mapper.writeValueAsString(yourbeaninstance);

利用elasticsearch帮助类

elasticsearch提供了内置的帮助类来将数据转换为JSON

import static org.elasticsearch.common.xcontent.XContentFactory.*;

XContentBuilder builder = jsonBuilder()
    .startObject()
        .field("user", "kimchy")
        .field("postDate", new Date())
        .field("message", "trying out Elasticsearch")
    .endObject()

注意,你也可以使用startArray(String)endArray()方法添加数组。另外,field可以接收任何类型的对象,你可以直接传递数字、时间甚至XContentBuilder对象。

可以用下面的方法查看json。

String json = builder.string();

索引文档

下面的例子将JSON文档索引为一个名字为“twitter”,类型为“tweet”,id值为1的索引。

import static org.elasticsearch.common.xcontent.XContentFactory.*;

IndexResponse response = client.prepareIndex("twitter", "tweet", "1")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                    .endObject()
                  )
        .execute()
        .actionGet();

你也可以不提供id:

String json = "{" +
        "\"user\":\"kimchy\"," +
        "\"postDate\":\"2013-01-30\"," +
        "\"message\":\"trying out Elasticsearch\"" +
    "}";

IndexResponse response = client.prepareIndex("twitter", "tweet")
        .setSource(json)
        .execute()
        .actionGet();

IndexResponse将会提供给你索引信息

// Index name
String _index = response.getIndex();
// Type name
String _type = response.getType();
// Document ID (generated or not)
String _id = response.getId();
// Version (if it's the first time you index this document, you will get: 1)
long _version = response.getVersion();

如果你在索引时提供了过滤,那么IndexResponse将会提供一个过滤器(percolator )

IndexResponse response = client.prepareIndex("twitter", "tweet", "1")
        .setSource(json)
        .execute()
        .actionGet();

List<String> matches = response.matches();

 

Java 获取API

获取API允许你通过id从索引中获取类型化的JSON文档,如下例:

GetResponse response = client.prepareGet("twitter", "tweet", "1")
        .execute()
        .actionGet();

操作线程

The get API allows to set the threading model the operation will be performed when the actual execution of the API is performed on the same node (the API is executed on a shard that is allocated on the same server).

默认情况下,operationThreaded设置为true表示操作执行在不同的线程上面。下面是一个设置为false的例子。

GetResponse response = client.prepareGet("twitter", "tweet", "1")
        .setOperationThreaded(false)
        .execute()
        .actionGet();

 

删除API

删除api允许你通过id,从特定的索引中删除类型化的JSON文档。如下例:

DeleteResponse response = client.prepareDelete("twitter", "tweet", "1")
        .execute()
        .actionGet();

操作线程

The get API allows to set the threading model the operation will be performed when the actual execution of the API is performed on the same node (the API is executed on a shard that is allocated on the same server).

默认情况下,operationThreaded设置为true表示操作执行在不同的线程上面。下面是一个设置为false的例子。

DeleteResponse response = client.prepareDelete("twitter", "tweet", "1")
        .setOperationThreaded(false)
        .execute()
        .actionGet();

 

更新API

你能够创建一个UpdateRequest,然后将其发送给client。

UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("index");
updateRequest.type("type");
updateRequest.id("1");
updateRequest.doc(jsonBuilder()
        .startObject()
            .field("gender", "male")
        .endObject());
client.update(updateRequest).get();

或者你也可以利用prepareUpdate方法

1 client.prepareUpdate("ttl", "doc", "1")
2        .setScript("ctx._source.gender = \"male\""  , ScriptService.ScriptType.INLINE)
3        .get();

5 client.prepareUpdate("ttl", "doc", "1")
6        .setDoc(jsonBuilder()
7            .startObject()
8                .field("gender", "male")
9            .endObject())
10        .get();

1-3行用脚本来更新索引,5-10行用doc来更新索引。

当然,java API也支持使用upsert。如果文档还不存在,会根据upsert内容创建一个新的索引。

IndexRequest indexRequest = new IndexRequest("index", "type", "1")
        .source(jsonBuilder()
            .startObject()
                .field("name", "Joe Smith")
                .field("gender", "male")
            .endObject());
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
            .startObject()
                .field("gender", "male")
            .endObject())
        .upsert(indexRequest);
client.update(updateRequest).get();

如果文档index/type/1已经存在,那么在更新操作完成之后,文档为:

{
    "name"  : "Joe Dalton",
    "gender": "male"
}

否则,文档为:

{
    "name" : "Joe Smith",
    "gender": "male"
}

 

bulk API

bulk API允许开发者在一个请求中索引和删除多个文档。下面是使用实例。

import static org.elasticsearch.common.xcontent.XContentFactory.*;

BulkRequestBuilder bulkRequest = client.prepareBulk();

// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                    .endObject()
                  )
        );

bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "another post")
                    .endObject()
                  )
        );

BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if (bulkResponse.hasFailures()) {
    // process failures by iterating through each bulk response item
}

 

搜索API

搜索API允许开发者执行一个搜索查询,返回满足查询条件的搜索信息。它能够跨索引以及跨类型执行。查询既可以用Java查询API也可以用Java过滤API。 查询的请求体由SearchSourceBuilder构建。

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.FilterBuilders.*;
import org.elasticsearch.index.query.QueryBuilders.*;

SearchResponse response = client.prepareSearch("index1", "index2")
        .setTypes("type1", "type2")
        .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
        .setQuery(QueryBuilders.termQuery("multi", "test"))             // Query
        .setPostFilter(FilterBuilders.rangeFilter("age").from(12).to(18))   // Filter
        .setFrom(0).setSize(60).setExplain(true)
        .execute()
        .actionGet();

注意,所有的参数都是可选的。下面是最简洁的形式。

// MatchAll on the whole cluster with all default options
SearchResponse response = client.prepareSearch().execute().actionGet();

 

搜索模式(Java Class)

SearchRequestBuilder reqBuilder = client.prepareSearch(App.ESProp.INDEX_NAME)
                .setTypes("task_info").setSearchType(SearchType.DEFAULT)
                .setExplain(true);
        QueryStringQueryBuilder queryString = QueryBuilders
                .queryString("中华");
        queryString.field("taskContent");
        queryString.minimumShouldMatch("1");
        reqBuilder.setQuery(QueryBuilders.boolQuery().should(queryString))
                .setExplain(true);
        SearchResponse resp = reqBuilder.execute().actionGet();
        SearchHit[] hits = resp.getHits().getHits();

        List<Map<String, Object>> results = new ArrayList<Map<String, Object>>();
        for (SearchHit hit : hits) {
            results.add(hit.getSource());
        }
        System.out.println("result ---->>>>");
        for (int i = 0; i < results.size(); i++) {
            System.out.println(results.get(i));
        }

上面的实例中,包含了一个简单的查询,在此有几点个人的理解,请看下面;

  •  基本查询器

             SearchResponse response = client.prepareSearch().execute().actionGet();// 获取全部

             SearchRequestBuilder searchRequestBuilder = client.prepareSearch("index1", "index2"); 在索引为index1 和index2中进行文档查询 
             searchRequestBuilder.setTypes("type1", "type2"); // es 的搜索 Search 不但联合多个库(index1、index2),而是可以是跨类型的(即跨表的 type1、type2)
             searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);  //设置查询类型  
             searchRequestBuilder.setFrom(0).setSize(10);  //设置分页信息  
             searchRequestBuilder.addSort("crawlDate", SortOrder.DESC);  // 按照时间降序  
             searchRequestBuilder.setExplain(true);  // 设置是否按查询匹配度排序  
    
             searchRequestBuilder.setSearchType,设置搜索类型,主要的搜索类型有:
             QUERY_THEN_FETCH:查询是针对所有的块执行的,但返回的是足够的信息,而不是文档内容(Document)。结果会被排序和分级,基于此,只有相关的块的文档对象会被返回。由于被取到的仅仅是这些,故而返回的 hit 的大小正好等于指定的 size。这对于有许多块的 index 来说是很便利的(返回结果不会有重复的,因为块被分组了)
             QUERY_AND_FETCH:最原始(也可能是最快的)实现就是简单的在所有相关的 shard上执行检索并返回结果。每个 shard 返回一定尺寸的结果。由于每个shard已经返回了一定尺寸的hit,这种类型实际上是返回多个 shard的一定尺寸的结果给调用者。
             DFS_QUERY_THEN_FETCH:与 QUERY_THEN_FETCH 相同,预期一个初始的散射相伴用来为更准确的 score 计算分配了的term频率。
             DFS_QUERY_AND_FETCH:与 QUERY_AND_FETCH 相同,预期一个初始的散射相伴用来为更准确的 score 计算分配了的term频率。
             SCAN:在执行了没有进行任何排序的检索时执行浏览。此时将会自动的开始滚动结果集。
             COUNT:只计算结果的数量,也会执行 facet。 

  •  Match Query (链接内有详细解释)
    QueryBuilder qb = QueryBuilders.matchQuery("name", "kimchy elasticsearch");
    //name是field,kimchy elasticsearch是要查询的字符串

     

  •  MultiMatch Query (链接内有详细解释)
    QueryBuilder qb = QueryBuilders.multiMatchQuery(
        "kimchy elasticsearch",     // Text you are looking for
         //kimchy elasticsearch是要查询的字符串
        "user", "message"           // Fields you query on
         //user 和 message都是field
        );

     

 

  •  构建文本查询器

            QueryStringQueryBuilder queryString = QueryBuilders.queryString("\"" + content + "\"");  构建文本查询器
            queryString.field(k);   设置匹配字段值

  •  termQuery

           强制匹配原则,禁止进行分词搜索 

  •  Should

           should查询中会默认将查询分成多个termQuery查询,他的精准值采用minimumShouldMatch参数进行设置。

 

Spring ES 操作简介

连接ES客户端

    @Bean
    public ElasticsearchTemplate elasticsearchTemplate() {
        return new ElasticsearchTemplate(client());
    }
    
    @Bean
    public Client client(){
        Settings settings = ImmutableSettings.settingsBuilder()
                .put("cluster.name", "elasticsearch")
                .put("client.transport.ping_timeout", "3s").build();
        
        TransportClient client= new TransportClient(settings);
        TransportAddress address = new InetSocketTransportAddress("120.24.165.15", 9300); 
        client.addTransportAddress(address);
        return client;
    }
    
    @Bean
    public ElasticsearchActionService elasticsearchService() {
        ElasticsearchActionService elasticsearchService = new ElasticsearchActionService();
        elasticsearchService.init(elasticsearchTemplate());
        return elasticsearchService;
    }

初始化索引(库)

  • 初始化文档库,建立索引,实现批量新增数据。
        private ElasticsearchTemplate elasticsearchTemplate;
    
        @Autowired
        private Client esClient;
    
        public void init(ElasticsearchTemplate clzz) {
            elasticsearchTemplate = (ElasticsearchTemplate) clzz;
            if (!elasticsearchTemplate.indexExists(App.ESProp.INDEX_NAME)) {
                elasticsearchTemplate.createIndex(App.ESProp.INDEX_NAME);
            }
            elasticsearchTemplate.putMapping(TaskInfo.class);
            elasticsearchTemplate.putMapping(NewsInfo.class);
        }
    
        /**
         * 新增或者修改文档信息
        * @author 高国藩
        * @date 2017年5月12日 下午3:16:27
        * @param taskInfoList
        * @return
         */
        public boolean update(List<TaskInfo> taskInfoList) {
            List<IndexQuery> queries = new ArrayList<IndexQuery>();
            for (TaskInfo taskInfo : taskInfoList) {
                IndexQuery indexQuery = new IndexQueryBuilder().withId(taskInfo.getTaskId()).withObject(taskInfo).build();
                queries.add(indexQuery);
            }
            elasticsearchTemplate.bulkIndex(queries);
            return true;
        }
    

     

  • 采用注解方式,初始化Mapping文件(class)
    package com.sk.system.es;
    
    import org.springframework.data.annotation.Id;
    import org.springframework.data.elasticsearch.annotations.Document;
    import org.springframework.data.elasticsearch.annotations.Field;
    import org.springframework.data.elasticsearch.annotations.FieldIndex;
    import org.springframework.data.elasticsearch.annotations.FieldType;
    
    import com.sk.browser.config.App;
    /**
    * store 是否存储   FieldIndex.not_analyzed 不进行分词   indexAnalyzer="ik" 使用IK进行分词处理
    */
    //@Document(indexName = APP.ESProp.INDEX_NAME, type = APP.ESProp.TYPE_TASK_INFO, indexStoreType = APP.ESProp.INDEX_STORE_TYPE, shards = APP.ESProp.SHARDS, replicas = APP.ESProp.REPLICAS, refreshInterval = APP.ESProp.REFRESH_INTERVAL)
    @Document(indexName = App.ESProp.INDEX_NAME, type = App.ESProp.TYPE_TASK_INFO)
    public class TaskInfo {
        @Id //标注ID,将作为文档ID存在
        @Field(index = FieldIndex.not_analyzed, store = true)
        private String taskId;
        
        @Field(type = FieldType.Integer, index = FieldIndex.not_analyzed, store = true)
        private Integer userId;
    
        @Field(type = FieldType.String, indexAnalyzer="ik", searchAnalyzer="ik", store = true)
        private String taskContent;
    
        @Field(type = FieldType.String, indexAnalyzer="ik", searchAnalyzer="ik", store = true)
        private String taskArea;
        
        @Field(type = FieldType.String, indexAnalyzer="ik", searchAnalyzer="ik", store = true)
        private String taskTags;
        
        @Field(type = FieldType.Integer, index = FieldIndex.not_analyzed, store = true)
        private Integer taskState;
    
        @Field(type = FieldType.String, index = FieldIndex.not_analyzed, store = true)
        private String updateTime;
    
        @Field(type = FieldType.String, indexAnalyzer="ik", searchAnalyzer="ik", store = true)
        private String userNickName;
        
        public String getTaskId() {
            return taskId;
        }
    
        public void setTaskId(String taskId) {
            this.taskId = taskId;
        }
    
        public Integer getUserId() {
            return userId;
        }
    
        public void setUserId(Integer userId) {
            this.userId = userId;
        }
    
        public String getTaskContent() {
            return taskContent;
        }
    
        public void setTaskContent(String taskContent) {
            this.taskContent = taskContent;
        }
    
        public String getTaskArea() {
            return taskArea;
        }
    
        public void setTaskArea(String taskArea) {
            this.taskArea = taskArea;
        }
    
        public String getTaskTags() {
    		return taskTags;
    	}
    
    	public void setTaskTags(String taskTags) {
    		this.taskTags = taskTags;
    	}
    
    	public Integer getTaskState() {
            return taskState;
        }
    
        public void setTaskState(Integer taskState) {
            this.taskState = taskState;
        }
    
        public String getUpdateTime() {
            return updateTime;
        }
    
        public void setUpdateTime(String updateTime) {
            this.updateTime = updateTime;
        }
    
        public String getUserNickName() {
            return userNickName;
        }
    
        public void setUserNickName(String userNickName) {
            this.userNickName = userNickName;
        }
    
        @Override
        public String toString() {
            return "TaskInfo [taskId=" + taskId + ", userId=" + userId
                    + ", taskContent=" + taskContent + ", taskArea=" + taskArea
                    + ", taskState=" + taskState
                    + ", updateTime=" + updateTime + ", userNickName="
                    + userNickName + "]";
        }
    
    	public TaskInfo(String taskId, Integer userId, String taskContent,
    			String taskArea, String taskTags, Integer taskState,
    			String updateTime, String userNickName) {
    		this.taskId = taskId;
    		this.userId = userId;
    		this.taskContent = taskContent;
    		this.taskArea = taskArea;
    		this.taskTags = taskTags;
    		this.taskState = taskState;
    		this.updateTime = updateTime;
    		this.userNickName = userNickName;
    	}
        public TaskInfo() {
    		// TODO Auto-generated constructor stub
    	}
    }
    

     

 

© 著作权归作者所有

逝去的回忆
粉丝 16
博文 136
码字总数 219872
作品 0
深圳
高级程序员
私信 提问
ES(elasticsearch)搜索引擎

ES(elasticsearch)搜索引擎 0、授人以渔,少走半年弯路! 死磕 Elasticsearch 方法论:普通程序员高效精进的 10 大狠招! 一、Elasitcsearch基础篇 1.1 Elasitcsearch基础认知 1、Elasticse...

Ocean_K
2018/09/11
0
0
ElasticSearch Client详解

从本文开始,将与大家进入到Elasticsearch的精妙世界中来,基于当前最新的6.4.x版本。 本文将重点探讨ElasticSearch Client的相关知识,重点关注TransportClient与Rest Client。Elasticsear...

丁威
03/10
0
0
elasticsearch使用指南之ElasticSearch Client详解

从本文开始,将与大家进入到Elasticsearch的精妙世界中来,基于当前最新的6.4.x版本。 本文将重点探讨ElasticSearch Client的相关知识,重点关注TransportClient与Rest Client。Elasticsear...

唯有坚持不懈
03/31
0
0
elasticsearch和mysql的数据同步采用哪种方案合适。

最近公司在使用elasticsearch,使用的是6.5版本的。其中有一个需求就是需要把数据库已有的数据同步到elasticsearch中来,调研了几种方案。 logstash,elasticsearch-jdbc,自己实现。 暂时采...

lanceli
03/18
0
0
spring boot2集成ES详解

一:运行环境 JDK:1.8 ES:5.6.4 二:学习内容 如何构建spring-data-elasticsearch环境? 如何实现常用的增删改查? 如何实现对象嵌套也就是1对多这种关系? 三:JAVA依赖环境 根据spring-...

woter
2018/07/09
0
0

没有更多内容

加载失败,请刷新页面

加载更多

面试爱奇艺,竟然挂在第5轮……

今天给大家分享我曾经在爱奇艺的面试,过程还是比较有意思的,可以给大家一些参考 <br> 聊骚阶段 嗲妹妹:你好,我是爱奇艺的HR,我们正在招聘运维开发岗位,请问您最近有在看工作机会吗? ...

上海小胖
26分钟前
0
0
Jenkins系列_插件安装及报错处理

进入Jenkins之后我们可以进行插件的安装,插件管理位于以下模块: 发现上面报了一堆错误,是因为插件的依赖没有安装好,那么这一节,就先把这些错误解决掉吧。解决完成后,也就基本会使用插件...

shzwork
今天
2
0
mysql mysql的所有查询语句和聚合函数(整理一下,忘记了可以随时看看)

查询所有字段 select * from 表名; 查询自定字段 select 字段名 from 表名; 查询指定数据 select * from 表名 where 条件; 带关键字IN的查询 select * from 表名 where 条件 [not] in(元素...

edison_kwok
昨天
9
0
解决多线程并行加载缓存问题(利用guava实现)

依赖 com.google.guava:guava:20.0 import com.google.common.cache.Cache;import com.google.common.cache.CacheBuilder;import java.util.concurrent.ExecutionException;import j......

暗中观察
昨天
4
0
利用VisualVM 内存查看

准备工作,建几个测试类。等下就是要查看这几个类里面的属性 package visualvm;public class MultiObject { private String str; private int i; MultiObject(String str...

冷基
昨天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部