文档章节

对ElasticSearch读写操作的封装

林中漫步
 林中漫步
发布于 2016/06/30 12:43
字数 1513
阅读 282
收藏 1

1 依赖的jar包:

        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>1.7.1</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.4</version>
        </dependency>

2 Es环境配置:elasticsearch.yml

cluster.name: ${es.cluster.name}
node.name: "${es.node.name}"

client.transport.ip: ${es.client.transport.ip}
client.transport.port: ${es.client.transport.port}

3 封装的客户端工具类 ESClient

import com.alibaba.fastjson.JSONObject;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.index.query.FilterBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

public class ESClient {

    private static final Logger log = LoggerFactory.getLogger(ESClient.class);

    private TransportClient client;

    public ESClient() {
        this.init();
    }


    private void close() {
        if (this.client != null) {
            this.client.close();
        }
    }

    @Override
    public void finalize() throws Throwable {
        this.close();
        super.finalize();
    }

    private void init() {
        try {
            Settings settings = ImmutableSettings.settingsBuilder().loadFromClasspath("elasticsearch.yml")
                    .put("client.transport.sniff", true).build();
            this.client = new TransportClient(settings);

            int port = settings.getAsInt("client.transport.port", 9900);
            String[] ips = settings.getAsArray("client.transport.ip");

            for (String ip : ips) {
                log.info("the ip is:" + ip);
                client.addTransportAddress(new InetSocketTransportAddress(ip, port));
            }
            log.info("es连接成功:{},{}", client, JSONObject.toJSONString(client.listedNodes()));

        } catch (Exception e) {
            if (client != null) {
                client.close();
            }
            log.error("连接es失败!", e);
        }
    }

    /**
     * 为一份文档建立索引
     *
     * @param index 索引名,相当于关系型数据库的库名
     * @param type  文档类型,相当于关系型数据库的表名
     * @param json  json格式的数据集,必须含有属性"id"
     * @return
     */
    public IndexResponse indexDoc(String index, String type, String json) throws Exception {
        JSONObject kvMap = JSONObject.parseObject(json);
        return this.indexDoc(index, type, kvMap);
    }


    /**
     * 为一份文档建立索引
     *
     * @param index 索引名,相当于关系型数据库的库名
     * @param type  文档类型,相当于关系型数据库的表名
     * @param kvMap 键值对形式的数据集,map中必须有属性key: "id"
     * @return
     */
    public IndexResponse indexDoc(String index, String type, Map<String, Object> kvMap)
            throws Exception {
        if (!kvMap.containsKey("id")) {
            throw new Exception("创建索引时,传入的map或json串中没有属性'id'! ");
        }
        String id = (String) kvMap.get("id");
        if (id == null) {
            throw new Exception("创建索引时,传入的map或json的属性'id'的值为null! ");
        }

        IndexRequestBuilder builder = client.prepareIndex(index, type, id);
        IndexResponse response = builder.setSource(kvMap)
                .execute()
                .actionGet();
        return response;
    }


    /**
     * 为多份文档建立索引
     *
     * @param index 索引名,相当于关系型数据库的库名
     * @param type  文档类型,相当于关系型数据库的表名
     * @param jsons json格式的数据集,其下json串必须有属性"id"
     * @return
     */
    public BulkResponse batchIndexDocsForJson(String index, String type, List<String> jsons)
            throws Exception {
        if (jsons.isEmpty()) {
            throw new Exception("批量创建索引时,传入的参数'jsons'为空!");
        }

        List<Map<String, Object>> kvList = new ArrayList<Map<String, Object>>(jsons.size());
        for (String json : jsons) {
            JSONObject kvMap = JSONObject.parseObject(json);
            kvList.add(kvMap);
        }

        BulkResponse response = this.batchIndexDocsForMap(index, type, kvList);
        kvList.clear();
        return response;
    }


    /**
     * 为多份文档建立索引
     *
     * @param index  索引名,相当于关系型数据库的库名
     * @param type   文档类型,相当于关系型数据库的表名
     * @param kvList 键值对形式的数据集,其下map中必须有属性key: "id"
     * @return
     */
    public BulkResponse batchIndexDocsForMap(String index, String type, List<Map<String, Object>> kvList)
            throws Exception {
        if (kvList.isEmpty()) {
            throw new Exception("批量创建索引时,传入的参数'kvList'为空!");
        }

        List<IndexRequest> requestList = new ArrayList<IndexRequest>(kvList.size());

        for (Map<String, Object> kvMap : kvList) {
            if (!kvMap.containsKey("id")) {
                throw new Exception("批量创建索引时,传入的map或json串中没有属性'id'! ");
            }
            String id = (String) kvMap.get("id");
            if (id == null) {
                throw new Exception("批量创建索引时,传入的map或json的属性'id'的值为null! ");
            }

            IndexRequest request = client
                    .prepareIndex(index, type, id).setSource(kvMap)
                    .request();
            requestList.add(request);
        }

        BulkRequestBuilder bulkRequest = client.prepareBulk();
        for (IndexRequest request : requestList) {
            bulkRequest.add(request);
        }

        BulkResponse response = bulkRequest
                .execute()
                .actionGet();

        return response;
    }


    /**
     * 删除一个文档
     *
     * @param index 索引名,相当于关系型数据库的库名
     * @param type  文档类型,相当于关系型数据库的表名
     * @param id    键值对形式的数据集
     * @return
     */
    public DeleteResponse deleteDoc(String index, String type, String id) throws InterruptedException {
        DeleteRequestBuilder builder = client.prepareDelete(index, type, id);
        DeleteResponse response = builder
                .execute()
                .actionGet();
        return response;
    }

    /**
     * 根据条件删除多个文档
     *
     * @param index        索引名,相当于关系型数据库的库名
     * @param type         文档类型,相当于关系型数据库的表名
     * @param queryBuilder 查询器
     * @return
     */
    public void deleteDocsByQuery(String index, String type, QueryBuilder queryBuilder) {
        client.prepareDeleteByQuery(index).setTypes(type).setQuery(queryBuilder)
                .execute()
                .actionGet();
    }


    /**
     * 指定id获取文档
     *
     * @param index 索引名,相当于关系型数据库的库名
     * @param type  文档类型,相当于关系型数据库的表名
     * @param id    文档id
     * @return
     */
    public Map<String, Object> getDoc(String index, String type, String id) {
        GetResponse response = client.prepareGet(index, type, id)
                .execute()
                .actionGet();

        Map<String, Object> retMap = response.getSourceAsMap();
        return retMap;
    }


    public List<Map<String, Object>> search(String index, String type, QueryBuilder queryBuilder, FilterBuilder filterBuilder) {
        SearchRequestBuilder builder = client.prepareSearch(index).setTypes(type);
        if (queryBuilder != null) {
            builder = builder.setQuery(queryBuilder);
        }
        if (filterBuilder != null) {
            builder = builder.setPostFilter(filterBuilder);
        }

        SearchResponse searchResponse = builder.execute().actionGet();

        SearchHits hits = searchResponse.getHits();
        log.info("Es Hits count: " + hits.getTotalHits());

        List<Map<String, Object>> kvList = new ArrayList<Map<String, Object>>();

        SearchHit[] hitArray = hits.getHits();
        if (hitArray.length > 0) {
            for (SearchHit hit : hitArray) {
                Map<String, Object> kvMap = hit.getSource();
                kvMap.put("id", hit.getId());
                kvList.add(kvMap);
            }
        }
        return kvList;
    }
}

4 测试类 ESClientTest

import com.alibaba.fastjson.JSONObject;
import org.elasticsearch.index.query.FilterBuilder;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;


public class ESClientTest {

    private static final Logger log = LoggerFactory.getLogger(ESClientTest.class);

    @Test
    public void testIndexDoc() {
        ESClient client = new ESClient();
        try {
            // 创建索引
            Map<String, Object> kvMap = new HashMap<String, Object>();
            kvMap.put("name", "hu");
            kvMap.put("age", "30");
            kvMap.put("gender", "f");
            kvMap.put("id", "33sdfa");
            client.indexDoc("test", "test_user", kvMap);

            // 创建索引
            String json = "{\"id\":\"55\",\"bb\":\"bbs\"}";
            client.indexDoc("test", "test_user", json);


            Thread.sleep(3000);

            // 查询结果
            QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
            List<Map<String, Object>> kvList = client.search("test", "test_user", queryBuilder, null);
            for (Map<String, Object> kv : kvList) {
                System.out.println(JSONObject.toJSONString(kv));
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    @Test
    public void testBatchIndexDocsForJson() {
        String json1 = "{\"id\":\"1\",\"ak\":\"av\",\"bk\":\"bv\"}";
        String json2 = "{\"id\":\"2\",\"ak\":\"av2\",\"bk\":\"bv2\"}";
        String json3 = "{\"id\":\"3\",\"ak\":\"av3\",\"bk\":\"bv3\"}";

        List<String> jsonList = new ArrayList<String>();
        jsonList.add(json1);
        jsonList.add(json2);
        jsonList.add(json3);

        ESClient client = new ESClient();
        try {
            // 创建索引
            client.batchIndexDocsForJson("test", "test_hw", jsonList);

            Thread.sleep(3000);

            // 查询结果
            QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
            List<Map<String, Object>> kvList = client.search("test", "test_hw", queryBuilder, null);
            for (Map<String, Object> kv : kvList) {
                System.out.println(JSONObject.toJSONString(kv));
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    @Test
    public void testBatchIndexDocsForMap() {
        Map<String, Object> kvMap = new HashMap<String, Object>();
        kvMap.put("id", "3");
        kvMap.put("name", "hu");
        kvMap.put("age", "30");
        kvMap.put("gender", "f");
        kvMap.put("asdf", "33");

        Map<String, Object> kvMap2 = new HashMap<String, Object>();
        kvMap2.put("id", "2");
        kvMap2.put("name", "wang");
        kvMap2.put("age", "35");
        kvMap2.put("gender", "f");

        List<Map<String, Object>> kvList = new ArrayList<Map<String, Object>>();
        kvList.add(kvMap);
        kvList.add(kvMap2);


        ESClient client = new ESClient();
        try {
            // 创建索引
            client.batchIndexDocsForMap("test", "test_user", kvList);

            Thread.sleep(3000);

            // 查询结果
            QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
            kvList = client.search("test", "test_user", queryBuilder, null);
            for (Map<String, Object> kv : kvList) {
                System.out.println(JSONObject.toJSONString(kv));
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    @Test
    public void testSearch() {
        ESClient client = new ESClient();
        try {

            QueryBuilder queryBuilder = QueryBuilders.termQuery("gender", "f");
            FilterBuilder filterBuilder = FilterBuilders.rangeFilter("age").from(32);

            List<Map<String, Object>> kvList = client.search("test", "test_user", queryBuilder, filterBuilder);
            for (Map<String, Object> kv : kvList) {
                System.out.println(JSONObject.toJSONString(kv));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    @Test
    public void testDeleteDoc() {
        ESClient client = new ESClient();
        try {

            client.deleteDoc("test", "test_user", "1");

            Thread.sleep(3000);

            // 查询结果
            QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
            List<Map<String, Object>> kvList = client.search("test", "test_hw", queryBuilder, null);
            for (Map<String, Object> kv : kvList) {
                System.out.println(JSONObject.toJSONString(kv));
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    @Test
    public void testDeleteDocs() {
        ESClient client = new ESClient();
        try {
            // 删除文档
            QueryBuilder queryBuilder = QueryBuilders.termQuery("ak", "av2");
            client.deleteDocsByQuery("test", "test_user", queryBuilder);

            // 查询结果
            queryBuilder = QueryBuilders.matchAllQuery();
            List<Map<String, Object>> kvList = client.search("test", "test_user", queryBuilder, null);
            for (Map<String, Object> kv : kvList) {
                System.out.println(JSONObject.toJSONString(kv));
            }

            queryBuilder = QueryBuilders.matchAllQuery();
            client.deleteDocsByQuery("test", "test_user", queryBuilder);

            queryBuilder = QueryBuilders.matchAllQuery();
            client.deleteDocsByQuery("test", "test_hw", queryBuilder);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

© 著作权归作者所有

共有 人打赏支持
林中漫步
粉丝 94
博文 55
码字总数 33247
作品 0
深圳
架构师
Lucene、solr以及elasticsearch之间的区别和联系

全球数据库排行:DB-Engines Ranking 首先分别说明三者的概念: Lucene是一套信息检索工具包,并不包含搜索引擎系统,它包含了索引结构、读写索引工具、相关性工具、排序等功能,因此在使用L...

吴伟祥
08/13
0
0
ElasticSearch的ik分词插件开发

ik插件,说白了,就是通过封装ik分词器,与ElasticSearch对接,让ElasticSearch能够驱动该分词器。那么,具体怎么与ElasticSearch对接呢?从下往上走,总共3步: 一、封装IK分析器 与Elastic...

萧十一郎君
2014/05/26
0
1
当ES赶超Redis,这份ES进修攻略不容错过!

从4月DB-Engines最新发布的全球数据库排名中,我们赫然发现ElasticSearch逆袭超越了Redis,从原先的第9名上升至第8名,而Redis则落后一名,排在了其后。 事实上,这场逆袭并不算太让人意外。...

DBAplus社群
04/15
0
0
elk5.6.0 centos7 及问题

elk5.6.0 centos7 及问题 将elasticsearch,kibana,logstash 三个压缩包放入/data/docker_images/elk 目录中 服务器ip:192.168.1.250 cd /data/docker_images/elk/ 1.安装elasticsearch 本......

zbill
06/26
0
0
快速上手 Elasticsearch 的几个建议

相信不少同学都听说过 Elasticsearch,作为目前最流行的搜索引擎实现方案,越来越多的公司在自己的架构中引入,而其应用场景也从搜索引擎扩展到了日志存储分析、大数据分析领域,本文尝试给初...

rockybean
05/21
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

文件的压缩与解压(linux)

Linux下*.tar.gz文件解压缩命令 1.压缩命令:   命令格式:tar -zcvf 压缩后文件名.tar.gz 被压缩文件名 可先切换到当前目录下。压缩文件名和被压缩文件名都可加入路径。 2.解压缩命令: ...

qimh
34分钟前
3
0
invalid character found in the request target 异常

这个异常时因为Tomcat 9不支持请求格式出现“{”等非法字符的问题 因为tomcat版本问题遇到的坑,记录一下。 问题 今天由于要测试一下订单详情页的异步查询,在本地起了一个服务,发送的请求是...

edwardGe
38分钟前
4
0
发现抓包软件fiddler的bug

1个请求他跳转之后,直接400,被拦在了Apache,使用fiddler 的,replay requests 是同样的结果,但是replay composer确是正常的。 也就是说这replay requests 是发原来的包,replay composer...

NLGBZJ
48分钟前
1
0
linux screen 命令详解

shell关闭后, 主机仍然运行 screen命令 启动jenkins以后, screen, 然后按ctrl+a 再按d 这样暂停了子界面, 这时候回到了父界面 用screen –ls查看目前子界面的状态 [root@free /]# screen -l...

SuShine
49分钟前
4
0
mac机器切换无线网络导致网页不能打开的问题

问题: 公司和家里使用不同的WI-FI,每次从家到公司时自动切换网络后,公司的许多地址不能访问, ping域名是可以ping同的,但是网页却打不开... 问题分析: 初步猜想是DNS缓存的问题? 对于MAC系统没...

Lennie002
52分钟前
2
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部