文档章节

对ElasticSearch读写操作的封装

林中漫步
 林中漫步
发布于 2016/06/30 12:43
字数 1513
阅读 298
收藏 1

1 依赖的jar包:

        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>1.7.1</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.4</version>
        </dependency>

2 Es环境配置:elasticsearch.yml

cluster.name: ${es.cluster.name}
node.name: "${es.node.name}"

client.transport.ip: ${es.client.transport.ip}
client.transport.port: ${es.client.transport.port}

3 封装的客户端工具类 ESClient

import com.alibaba.fastjson.JSONObject;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.index.query.FilterBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

public class ESClient {

    private static final Logger log = LoggerFactory.getLogger(ESClient.class);

    private TransportClient client;

    public ESClient() {
        this.init();
    }


    private void close() {
        if (this.client != null) {
            this.client.close();
        }
    }

    @Override
    public void finalize() throws Throwable {
        this.close();
        super.finalize();
    }

    private void init() {
        try {
            Settings settings = ImmutableSettings.settingsBuilder().loadFromClasspath("elasticsearch.yml")
                    .put("client.transport.sniff", true).build();
            this.client = new TransportClient(settings);

            int port = settings.getAsInt("client.transport.port", 9900);
            String[] ips = settings.getAsArray("client.transport.ip");

            for (String ip : ips) {
                log.info("the ip is:" + ip);
                client.addTransportAddress(new InetSocketTransportAddress(ip, port));
            }
            log.info("es连接成功:{},{}", client, JSONObject.toJSONString(client.listedNodes()));

        } catch (Exception e) {
            if (client != null) {
                client.close();
            }
            log.error("连接es失败!", e);
        }
    }

    /**
     * 为一份文档建立索引
     *
     * @param index 索引名,相当于关系型数据库的库名
     * @param type  文档类型,相当于关系型数据库的表名
     * @param json  json格式的数据集,必须含有属性"id"
     * @return
     */
    public IndexResponse indexDoc(String index, String type, String json) throws Exception {
        JSONObject kvMap = JSONObject.parseObject(json);
        return this.indexDoc(index, type, kvMap);
    }


    /**
     * 为一份文档建立索引
     *
     * @param index 索引名,相当于关系型数据库的库名
     * @param type  文档类型,相当于关系型数据库的表名
     * @param kvMap 键值对形式的数据集,map中必须有属性key: "id"
     * @return
     */
    public IndexResponse indexDoc(String index, String type, Map<String, Object> kvMap)
            throws Exception {
        if (!kvMap.containsKey("id")) {
            throw new Exception("创建索引时,传入的map或json串中没有属性'id'! ");
        }
        String id = (String) kvMap.get("id");
        if (id == null) {
            throw new Exception("创建索引时,传入的map或json的属性'id'的值为null! ");
        }

        IndexRequestBuilder builder = client.prepareIndex(index, type, id);
        IndexResponse response = builder.setSource(kvMap)
                .execute()
                .actionGet();
        return response;
    }


    /**
     * 为多份文档建立索引
     *
     * @param index 索引名,相当于关系型数据库的库名
     * @param type  文档类型,相当于关系型数据库的表名
     * @param jsons json格式的数据集,其下json串必须有属性"id"
     * @return
     */
    public BulkResponse batchIndexDocsForJson(String index, String type, List<String> jsons)
            throws Exception {
        if (jsons.isEmpty()) {
            throw new Exception("批量创建索引时,传入的参数'jsons'为空!");
        }

        List<Map<String, Object>> kvList = new ArrayList<Map<String, Object>>(jsons.size());
        for (String json : jsons) {
            JSONObject kvMap = JSONObject.parseObject(json);
            kvList.add(kvMap);
        }

        BulkResponse response = this.batchIndexDocsForMap(index, type, kvList);
        kvList.clear();
        return response;
    }


    /**
     * 为多份文档建立索引
     *
     * @param index  索引名,相当于关系型数据库的库名
     * @param type   文档类型,相当于关系型数据库的表名
     * @param kvList 键值对形式的数据集,其下map中必须有属性key: "id"
     * @return
     */
    public BulkResponse batchIndexDocsForMap(String index, String type, List<Map<String, Object>> kvList)
            throws Exception {
        if (kvList.isEmpty()) {
            throw new Exception("批量创建索引时,传入的参数'kvList'为空!");
        }

        List<IndexRequest> requestList = new ArrayList<IndexRequest>(kvList.size());

        for (Map<String, Object> kvMap : kvList) {
            if (!kvMap.containsKey("id")) {
                throw new Exception("批量创建索引时,传入的map或json串中没有属性'id'! ");
            }
            String id = (String) kvMap.get("id");
            if (id == null) {
                throw new Exception("批量创建索引时,传入的map或json的属性'id'的值为null! ");
            }

            IndexRequest request = client
                    .prepareIndex(index, type, id).setSource(kvMap)
                    .request();
            requestList.add(request);
        }

        BulkRequestBuilder bulkRequest = client.prepareBulk();
        for (IndexRequest request : requestList) {
            bulkRequest.add(request);
        }

        BulkResponse response = bulkRequest
                .execute()
                .actionGet();

        return response;
    }


    /**
     * 删除一个文档
     *
     * @param index 索引名,相当于关系型数据库的库名
     * @param type  文档类型,相当于关系型数据库的表名
     * @param id    键值对形式的数据集
     * @return
     */
    public DeleteResponse deleteDoc(String index, String type, String id) throws InterruptedException {
        DeleteRequestBuilder builder = client.prepareDelete(index, type, id);
        DeleteResponse response = builder
                .execute()
                .actionGet();
        return response;
    }

    /**
     * 根据条件删除多个文档
     *
     * @param index        索引名,相当于关系型数据库的库名
     * @param type         文档类型,相当于关系型数据库的表名
     * @param queryBuilder 查询器
     * @return
     */
    public void deleteDocsByQuery(String index, String type, QueryBuilder queryBuilder) {
        client.prepareDeleteByQuery(index).setTypes(type).setQuery(queryBuilder)
                .execute()
                .actionGet();
    }


    /**
     * 指定id获取文档
     *
     * @param index 索引名,相当于关系型数据库的库名
     * @param type  文档类型,相当于关系型数据库的表名
     * @param id    文档id
     * @return
     */
    public Map<String, Object> getDoc(String index, String type, String id) {
        GetResponse response = client.prepareGet(index, type, id)
                .execute()
                .actionGet();

        Map<String, Object> retMap = response.getSourceAsMap();
        return retMap;
    }


    public List<Map<String, Object>> search(String index, String type, QueryBuilder queryBuilder, FilterBuilder filterBuilder) {
        SearchRequestBuilder builder = client.prepareSearch(index).setTypes(type);
        if (queryBuilder != null) {
            builder = builder.setQuery(queryBuilder);
        }
        if (filterBuilder != null) {
            builder = builder.setPostFilter(filterBuilder);
        }

        SearchResponse searchResponse = builder.execute().actionGet();

        SearchHits hits = searchResponse.getHits();
        log.info("Es Hits count: " + hits.getTotalHits());

        List<Map<String, Object>> kvList = new ArrayList<Map<String, Object>>();

        SearchHit[] hitArray = hits.getHits();
        if (hitArray.length > 0) {
            for (SearchHit hit : hitArray) {
                Map<String, Object> kvMap = hit.getSource();
                kvMap.put("id", hit.getId());
                kvList.add(kvMap);
            }
        }
        return kvList;
    }
}

4 测试类 ESClientTest

import com.alibaba.fastjson.JSONObject;
import org.elasticsearch.index.query.FilterBuilder;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;


public class ESClientTest {

    private static final Logger log = LoggerFactory.getLogger(ESClientTest.class);

    @Test
    public void testIndexDoc() {
        ESClient client = new ESClient();
        try {
            // 创建索引
            Map<String, Object> kvMap = new HashMap<String, Object>();
            kvMap.put("name", "hu");
            kvMap.put("age", "30");
            kvMap.put("gender", "f");
            kvMap.put("id", "33sdfa");
            client.indexDoc("test", "test_user", kvMap);

            // 创建索引
            String json = "{\"id\":\"55\",\"bb\":\"bbs\"}";
            client.indexDoc("test", "test_user", json);


            Thread.sleep(3000);

            // 查询结果
            QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
            List<Map<String, Object>> kvList = client.search("test", "test_user", queryBuilder, null);
            for (Map<String, Object> kv : kvList) {
                System.out.println(JSONObject.toJSONString(kv));
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    @Test
    public void testBatchIndexDocsForJson() {
        String json1 = "{\"id\":\"1\",\"ak\":\"av\",\"bk\":\"bv\"}";
        String json2 = "{\"id\":\"2\",\"ak\":\"av2\",\"bk\":\"bv2\"}";
        String json3 = "{\"id\":\"3\",\"ak\":\"av3\",\"bk\":\"bv3\"}";

        List<String> jsonList = new ArrayList<String>();
        jsonList.add(json1);
        jsonList.add(json2);
        jsonList.add(json3);

        ESClient client = new ESClient();
        try {
            // 创建索引
            client.batchIndexDocsForJson("test", "test_hw", jsonList);

            Thread.sleep(3000);

            // 查询结果
            QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
            List<Map<String, Object>> kvList = client.search("test", "test_hw", queryBuilder, null);
            for (Map<String, Object> kv : kvList) {
                System.out.println(JSONObject.toJSONString(kv));
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    @Test
    public void testBatchIndexDocsForMap() {
        Map<String, Object> kvMap = new HashMap<String, Object>();
        kvMap.put("id", "3");
        kvMap.put("name", "hu");
        kvMap.put("age", "30");
        kvMap.put("gender", "f");
        kvMap.put("asdf", "33");

        Map<String, Object> kvMap2 = new HashMap<String, Object>();
        kvMap2.put("id", "2");
        kvMap2.put("name", "wang");
        kvMap2.put("age", "35");
        kvMap2.put("gender", "f");

        List<Map<String, Object>> kvList = new ArrayList<Map<String, Object>>();
        kvList.add(kvMap);
        kvList.add(kvMap2);


        ESClient client = new ESClient();
        try {
            // 创建索引
            client.batchIndexDocsForMap("test", "test_user", kvList);

            Thread.sleep(3000);

            // 查询结果
            QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
            kvList = client.search("test", "test_user", queryBuilder, null);
            for (Map<String, Object> kv : kvList) {
                System.out.println(JSONObject.toJSONString(kv));
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    @Test
    public void testSearch() {
        ESClient client = new ESClient();
        try {

            QueryBuilder queryBuilder = QueryBuilders.termQuery("gender", "f");
            FilterBuilder filterBuilder = FilterBuilders.rangeFilter("age").from(32);

            List<Map<String, Object>> kvList = client.search("test", "test_user", queryBuilder, filterBuilder);
            for (Map<String, Object> kv : kvList) {
                System.out.println(JSONObject.toJSONString(kv));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    @Test
    public void testDeleteDoc() {
        ESClient client = new ESClient();
        try {

            client.deleteDoc("test", "test_user", "1");

            Thread.sleep(3000);

            // 查询结果
            QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
            List<Map<String, Object>> kvList = client.search("test", "test_hw", queryBuilder, null);
            for (Map<String, Object> kv : kvList) {
                System.out.println(JSONObject.toJSONString(kv));
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    @Test
    public void testDeleteDocs() {
        ESClient client = new ESClient();
        try {
            // 删除文档
            QueryBuilder queryBuilder = QueryBuilders.termQuery("ak", "av2");
            client.deleteDocsByQuery("test", "test_user", queryBuilder);

            // 查询结果
            queryBuilder = QueryBuilders.matchAllQuery();
            List<Map<String, Object>> kvList = client.search("test", "test_user", queryBuilder, null);
            for (Map<String, Object> kv : kvList) {
                System.out.println(JSONObject.toJSONString(kv));
            }

            queryBuilder = QueryBuilders.matchAllQuery();
            client.deleteDocsByQuery("test", "test_user", queryBuilder);

            queryBuilder = QueryBuilders.matchAllQuery();
            client.deleteDocsByQuery("test", "test_hw", queryBuilder);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

© 著作权归作者所有

共有 人打赏支持
林中漫步
粉丝 96
博文 55
码字总数 33247
作品 0
深圳
架构师
Lucene、solr以及elasticsearch之间的区别和联系

全球数据库排行:DB-Engines Ranking 首先分别说明三者的概念: Lucene是一套信息检索工具包,并不包含搜索引擎系统,它包含了索引结构、读写索引工具、相关性工具、排序等功能,因此在使用L...

吴伟祥
08/13
0
0
ElasticSearch的ik分词插件开发

ik插件,说白了,就是通过封装ik分词器,与ElasticSearch对接,让ElasticSearch能够驱动该分词器。那么,具体怎么与ElasticSearch对接呢?从下往上走,总共3步: 一、封装IK分析器 与Elastic...

萧十一郎君
2014/05/26
0
1
当ES赶超Redis,这份ES进修攻略不容错过!

从4月DB-Engines最新发布的全球数据库排名中,我们赫然发现ElasticSearch逆袭超越了Redis,从原先的第9名上升至第8名,而Redis则落后一名,排在了其后。 事实上,这场逆袭并不算太让人意外。...

DBAplus社群
04/15
0
0
elk5.6.0 centos7 及问题

elk5.6.0 centos7 及问题 将elasticsearch,kibana,logstash 三个压缩包放入/data/docker_images/elk 目录中 服务器ip:192.168.1.250 cd /data/docker_images/elk/ 1.安装elasticsearch 本......

zbill
06/26
0
0
使用logstash+elasticsearch+kibana快速搭建日志平台

日志的分析和监控在系统开发中占非常重要的地位,系统越复杂,日志的分析和监控就越重要,常见的需求有: 根据关键字查询日志详情 监控系统的运行状况 统计分析,比如接口的调用次数、执行时间...

eddy_linux
2015/11/13
0
0

没有更多内容

加载失败,请刷新页面

加载更多

[Hive]JsonSerde使用指南

注意: 重要的是每行必须是一个完整的JSON,一个JSON不能跨越多行,也就是说,serde不会对多行的Json有效。 因为这是由Hadoop处理文件的工作方式决定,文件必须是可拆分的,例如,Hadoop将在...

Mr_yul
34分钟前
1
0
54:mysql修改密码|连接mysql|mysql常用命令

1、mysql修改密码: root用户时mysql的超级管理员,默认mysql的密码是空的,直接可以连接上去的,不过这样不安全; 注释:为了方便的使用mysql,需要把mysql加入到环境变量里; #后续自己输入mys...

芬野de博客
41分钟前
1
0
鼠标单击复制粘贴标签中的内容

<span ref="spanContentOne" id="spanContentOne" style="font-size: 14px;">或许不是最亮眼,总比瞎买强一点</span><!--<input type="button" @click="copyClick('1')" value="复制" />-......

帝子兮
45分钟前
1
0
使用axel多线程疯狂下载

在Linux中比较常见见的下载工具是curl和wget,但是下载比较大的文件两者都不支持多线程, 断点续传的作用不见得能发挥到最大。今天介绍一个axel工具,开启多线程疯狂下载。 安装 Fedora/Cen...

linuxprobe16
47分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部