文档章节

elasticsearch批量数据导入和导出

寻梦2012
 寻梦2012
发布于 2015/12/29 10:08
字数 405
阅读 11665
收藏 11

 之前使用ES的时候建表Type时有个字段的类型搞错了。以至于用API查询时出错。所以就研究一下ES API做了一下ES批量导出和导入重构了Type

1:Java API批量导出

 Settings settings = ImmutableSettings.settingsBuilder().put("cluster.name", "elasticsearch-bigdata").build();
        Client client = new TransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress("10.58.71.6", 9300));
 SearchResponse response = client.prepareSearch("bigdata").setTypes("student")
               .setQuery(QueryBuilders.matchAllQuery()).setSize(10000).setScroll(new TimeValue(6000                  00))
                .setSearchType(SearchType.SCAN).execute().actionGet();//setSearchType(SearchType.Scan) 告诉ES不需要排序只要结果返回即可 setScroll(new TimeValue(600000)) 设置滚动的时间
        String scrollid = response.getScrollId();
        try {
        //把导出的结果以JSON的格式写到文件里
            BufferedWriter out = new BufferedWriter(new FileWriter("es", true));
            
            //每次返回数据10000条。一直循环查询直到所有的数据都查询出来
            while (true) {
                SearchResponse response2 = client.prepareSearchScroll(scrollid).setScroll(new TimeValue(1000000))
                        .execute().actionGet();
                SearchHits searchHit = response2.getHits();
                //再次查询不到数据时跳出循环
                if (searchHit.getHits().length == 0) {
                    break;
                }
                System.out.println("查询数量 :" + searchHit.getHits().length);
                for (int i = 0; i < searchHit.getHits().length; i++) {
                    String json = searchHit.getHits()[i].getSourceAsString();
                    out.write(json);
                    out.write("\r\n");
                }
            }
            System.out.println("查询结束");
            out.close();
        } catch (FileNotFoundException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

2:Java API 批量导入

Settings settings = ImmutableSettings.settingsBuilder().put("cluster.name", "elasticsearch-bigdata").build();
        Client client = new TransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress("10.58.71.6", 9300));
        try {
        //读取刚才导出的ES数据
            BufferedReader br = new BufferedReader(new FileReader("es"));
            String json = null;
            int count = 0;
            //开启批量插入
            BulkRequestBuilder bulkRequest = client.prepareBulk();
            while ((json = br.readLine()) != null) {
                bulkRequest.add(client.prepareIndex("bigdata", "student").setSource(json));
                //每一千条提交一次
                if (count% 1000==0) {
                    bulkRequest.execute().actionGet();
                    System.out.println("提交了:" + count);
                }
                count++;
            }
            bulkRequest.execute().actionGet();
            System.out.println("插入完毕");
            br.close();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

 以上就是ES的批量导出和导入了。



© 著作权归作者所有

共有 人打赏支持
寻梦2012
粉丝 30
博文 20
码字总数 12011
作品 0
程序员
加载中

评论(24)

寻梦2012
寻梦2012

引用来自“Brooo”的评论

问下,这个怎么用多线程去跑啊

引用来自“寻梦2012”的评论

晚上我看看给你恢复你明天看吧

引用来自“Brooo”的评论

你好,请问还在吗,我导出大概1亿条数据时总会报远程服务器关闭,怎么才能用多线程去写而不会数据重复交叉呢,新手不太懂

引用来自“寻梦2012”的评论

在。好的我给你看看,

引用来自“Brooo”的评论

查询数量 :250000
Exception in thread "main" NoNodeAvailableException[None of the configured nodes were available: [{centos1}{PCZUnlIoQ8OiEg4ZRtFweg}{10.10.11.147}{10.10.11.147:9300}{cluster=true}, {centos2}{LPkRbdiWQGiUPKU_L8jRuQ}{10.10.11.148}{10.10.11.148:9300}, {centos3}{VqauhzDzS-m-eioQtFyczg}{10.10.11.149}{10.10.11.149:9300}{cluster=true}]]; nested: NodeDisconnectedException[[centos1][10.10.11.147:9300][indices:data/read/scroll] disconnected];
  at org.elasticsearch.client.transport.TransportClientNodesService$RetryListener.onFailure(TransportClientNodesService.java:244)
  at org.elasticsearch.action.ActionListenerResponseHandler.handleException(ActionListenerResponseHandler.java:46)
  at org.elasticsearch.transport.TransportService$Adapter$3.run(TransportService.java:622)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
Caused by: NodeDisconnectedException[[centos1][10.10.11.147:9300][indices:data/read/scroll] disconnected]
报这个错误,这个是从三个节点的es集群往一个节点的ES库里面导数据
public static Client initClient() {
if(Exportclient == null) {
try {
Exportclient = TransportClient.builder().settings(config()).build()
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("10.10.11.102"), 9300))
;
} catch (UnknownHostException e) {
e.printStackTrace();
}
}
美女你导出1亿条用ES是错误的。你怎么会用ES导出这么多数据呢。你可以ES+Hadoop+hive去导数据。
寻梦2012
寻梦2012

引用来自“Brooo”的评论

问下,这个怎么用多线程去跑啊

引用来自“寻梦2012”的评论

晚上我看看给你恢复你明天看吧

引用来自“Brooo”的评论

你好,请问还在吗,我导出大概1亿条数据时总会报远程服务器关闭,怎么才能用多线程去写而不会数据重复交叉呢,新手不太懂

引用来自“寻梦2012”的评论

在。好的我给你看看,

引用来自“Brooo”的评论

查询数量 :250000
Exception in thread "main" NoNodeAvailableException[None of the configured nodes were available: [{centos1}{PCZUnlIoQ8OiEg4ZRtFweg}{10.10.11.147}{10.10.11.147:9300}{cluster=true}, {centos2}{LPkRbdiWQGiUPKU_L8jRuQ}{10.10.11.148}{10.10.11.148:9300}, {centos3}{VqauhzDzS-m-eioQtFyczg}{10.10.11.149}{10.10.11.149:9300}{cluster=true}]]; nested: NodeDisconnectedException[[centos1][10.10.11.147:9300][indices:data/read/scroll] disconnected];
  at org.elasticsearch.client.transport.TransportClientNodesService$RetryListener.onFailure(TransportClientNodesService.java:244)
  at org.elasticsearch.action.ActionListenerResponseHandler.handleException(ActionListenerResponseHandler.java:46)
  at org.elasticsearch.transport.TransportService$Adapter$3.run(TransportService.java:622)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
Caused by: NodeDisconnectedException[[centos1][10.10.11.147:9300][indices:data/read/scroll] disconnected]
报这个错误,这个是从三个节点的es集群往一个节点的ES库里面导数据
public static Client initClient() {
if(Exportclient == null) {
try {
Exportclient = TransportClient.builder().settings(config()).build()
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("10.10.11.102"), 9300))
;
} catch (UnknownHostException e) {
e.printStackTrace();
}
}
应该你的线程数超过了你ES集群的线程数吧。你把代码都贴出来
Brooo
Brooo

引用来自“Brooo”的评论

问下,这个怎么用多线程去跑啊

引用来自“寻梦2012”的评论

晚上我看看给你恢复你明天看吧

引用来自“Brooo”的评论

你好,请问还在吗,我导出大概1亿条数据时总会报远程服务器关闭,怎么才能用多线程去写而不会数据重复交叉呢,新手不太懂

引用来自“寻梦2012”的评论

在。好的我给你看看,
查询数量 :250000
Exception in thread "main" NoNodeAvailableException[None of the configured nodes were available: [{centos1}{PCZUnlIoQ8OiEg4ZRtFweg}{10.10.11.147}{10.10.11.147:9300}{cluster=true}, {centos2}{LPkRbdiWQGiUPKU_L8jRuQ}{10.10.11.148}{10.10.11.148:9300}, {centos3}{VqauhzDzS-m-eioQtFyczg}{10.10.11.149}{10.10.11.149:9300}{cluster=true}]]; nested: NodeDisconnectedException[[centos1][10.10.11.147:9300][indices:data/read/scroll] disconnected];
  at org.elasticsearch.client.transport.TransportClientNodesService$RetryListener.onFailure(TransportClientNodesService.java:244)
  at org.elasticsearch.action.ActionListenerResponseHandler.handleException(ActionListenerResponseHandler.java:46)
  at org.elasticsearch.transport.TransportService$Adapter$3.run(TransportService.java:622)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
Caused by: NodeDisconnectedException[[centos1][10.10.11.147:9300][indices:data/read/scroll] disconnected]
报这个错误,这个是从三个节点的es集群往一个节点的ES库里面导数据
public static Client initClient() {
if(Exportclient == null) {
try {
Exportclient = TransportClient.builder().settings(config()).build()
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("10.10.11.102"), 9300))
;
} catch (UnknownHostException e) {
e.printStackTrace();
}
}
寻梦2012
寻梦2012

引用来自“Brooo”的评论

问下,这个怎么用多线程去跑啊

引用来自“寻梦2012”的评论

晚上我看看给你恢复你明天看吧

引用来自“Brooo”的评论

你好,请问还在吗,我导出大概1亿条数据时总会报远程服务器关闭,怎么才能用多线程去写而不会数据重复交叉呢,新手不太懂
在。好的我给你看看,
Brooo
Brooo

引用来自“Brooo”的评论

问下,这个怎么用多线程去跑啊

引用来自“寻梦2012”的评论

晚上我看看给你恢复你明天看吧
你好,请问还在吗,我导出大概1亿条数据时总会报远程服务器关闭,怎么才能用多线程去写而不会数据重复交叉呢,新手不太懂
寻梦2012
寻梦2012

引用来自“Brooo”的评论

问下,这个怎么用多线程去跑啊
晚上我看看给你恢复你明天看吧
寻梦2012
寻梦2012

引用来自“Brooo”的评论

问下,这个怎么用多线程去跑啊
我看看。我今天有事没在公司刚看到
Brooo
Brooo
问下,这个怎么用多线程去跑啊
Brooo
Brooo

引用来自“Brooo”的评论

我插入的时候数据多了好多倍,都是重复数据,这个怎么办啊

引用来自“寻梦2012”的评论

是你插入数据之前就有重复数据了还是直接插入的时候查重了数据

引用来自“Brooo”的评论

ExportES exportES=new ExportES();
exportES.getESinformatation();
exportES.insertESInformation();
可以这样直接调用吗,这两个方法一个是导出一个是导入的

引用来自“寻梦2012”的评论

你发个博客把代码贴出来。把地址发给我。然后我看看你写的代码
https://my.oschina.net/u/3537613/blog/948324 ,你好,这里面是我的代码,实在感谢
寻梦2012
寻梦2012

引用来自“Brooo”的评论

我插入的时候数据多了好多倍,都是重复数据,这个怎么办啊

引用来自“寻梦2012”的评论

是你插入数据之前就有重复数据了还是直接插入的时候查重了数据

引用来自“Brooo”的评论

ExportES exportES=new ExportES();
exportES.getESinformatation();
exportES.insertESInformation();
可以这样直接调用吗,这两个方法一个是导出一个是导入的
你发个博客把代码贴出来。把地址发给我。然后我看看你写的代码
elasticsearch使用java api批量插入数据

使用java api批量插入数据,主要参考官网的api,主要是client和Document APIs-Index API部分。 pom依赖 主要代码 参考: https://www.elastic.co/guide/en/elasticsearch/client/java-api/cu...

cjun1990
2016/10/20
0
5
ElasticSearch ik,elasticsearch-jdbc 使用 和 yii2 实例

1.ES安装 mac安装: a.由于ES为java开发 所以首先安装下载jdk,由于最新版本对jdk要求>1.8.0,所以可以通过brew 升级jdk b.brew 安装ES 启动 elasticsearch 后,在浏览器地址:127.0.0.1:9200,浏览...

罗培海
2017/12/16
0
0
ES(elasticsearch)搜索引擎

ES(elasticsearch)搜索引擎 0、授人以渔,少走半年弯路! 死磕 Elasticsearch 方法论:普通程序员高效精进的 10 大狠招! 一、Elasitcsearch基础篇 1.1 Elasitcsearch基础认知 1、Elasticse...

Ocean_K
09/11
0
0
Elasticsearch 入门: _bulk 批量导入数据

批量导入数据 使用 Elasticsearch Bulk API 批量 update 步骤: 需求:我希望批量导入一个 type 的名词列表到 index 索引。 准备数据: 根据官方文档,Json 数据要准备成这个格式的: 其中 ...

王兵
05/10
0
0
初探 ELK - 每天5分钟玩转 Docker 容器技术(89)

在开源的日志管理方案中,最出名的莫过于 ELK 了。ELK 是三个软件的合称:Elasticsearch、Logstash、Kibana。 Elasticsearch 一个近乎实时查询的全文搜索引擎。Elasticsearch 的设计目标就是...

CloudMAN
2017/11/03
0
0

没有更多内容

加载失败,请刷新页面

加载更多

mysql 数据类型及占用字节数

数字类型 TINYINT                           1 字节 SMALLINT                          2 个字节 MEDIUMINT                         3 个字节...

会游泳的鱼_
今天
3
0
高性能mysql:创建高性能的索引

性能优化简介 MySQL性能定义为完成某件任务所需要的时间量度,换句话说,性能即响应时间,这是一个非常重要的原则。我们通过任务和时间而不是资源来测量性能。数据库服务器的目的是执行SQL语...

背后的辛酸
今天
7
0
HTTP get、post 中请求json与map传参格式

import java.io.IOException;import java.net.URI;import java.net.URISyntaxException;import java.nio.charset.Charset;import java.util.ArrayList;import java.util.List;im......

寒风中的独狼
今天
5
0
IDEA中tomcat启动慢 耗时10分钟

用idea中的tomcat以debug模式启动,会非常的慢,而正常启动没啥问题;原因是debug模式中View Breakpoints断点代码,断点的是jar包,而现在启动由于jar包发生变化,导致启动时一直处于等待中。...

GoodMarver
今天
6
0
Linux学习-10月18(awk)

9.6/9.7 awk 一、awk简介   1. awk是一种编程语言,用于对文本和数据进行处理的   2. 具有强大的文本格式化能力   3. 利用命令awk,可以将一些文本整理成为我们想要的样子   4. 命令awk...

wxy丶
今天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部