文档章节

elasticsearch批量数据导入和导出

寻梦2012
 寻梦2012
发布于 2015/12/29 10:08
字数 405
阅读 12254
收藏 11

 之前使用ES的时候建表Type时有个字段的类型搞错了。以至于用API查询时出错。所以就研究一下ES API做了一下ES批量导出和导入重构了Type

1:Java API批量导出

 Settings settings = ImmutableSettings.settingsBuilder().put("cluster.name", "elasticsearch-bigdata").build();
        Client client = new TransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress("10.58.71.6", 9300));
 SearchResponse response = client.prepareSearch("bigdata").setTypes("student")
               .setQuery(QueryBuilders.matchAllQuery()).setSize(10000).setScroll(new TimeValue(6000                  00))
                .setSearchType(SearchType.SCAN).execute().actionGet();//setSearchType(SearchType.Scan) 告诉ES不需要排序只要结果返回即可 setScroll(new TimeValue(600000)) 设置滚动的时间
        String scrollid = response.getScrollId();
        try {
        //把导出的结果以JSON的格式写到文件里
            BufferedWriter out = new BufferedWriter(new FileWriter("es", true));
            
            //每次返回数据10000条。一直循环查询直到所有的数据都查询出来
            while (true) {
                SearchResponse response2 = client.prepareSearchScroll(scrollid).setScroll(new TimeValue(1000000))
                        .execute().actionGet();
                SearchHits searchHit = response2.getHits();
                //再次查询不到数据时跳出循环
                if (searchHit.getHits().length == 0) {
                    break;
                }
                System.out.println("查询数量 :" + searchHit.getHits().length);
                for (int i = 0; i < searchHit.getHits().length; i++) {
                    String json = searchHit.getHits()[i].getSourceAsString();
                    out.write(json);
                    out.write("\r\n");
                }
            }
            System.out.println("查询结束");
            out.close();
        } catch (FileNotFoundException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

2:Java API 批量导入

Settings settings = ImmutableSettings.settingsBuilder().put("cluster.name", "elasticsearch-bigdata").build();
        Client client = new TransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress("10.58.71.6", 9300));
        try {
        //读取刚才导出的ES数据
            BufferedReader br = new BufferedReader(new FileReader("es"));
            String json = null;
            int count = 0;
            //开启批量插入
            BulkRequestBuilder bulkRequest = client.prepareBulk();
            while ((json = br.readLine()) != null) {
                bulkRequest.add(client.prepareIndex("bigdata", "student").setSource(json));
                //每一千条提交一次
                if (count% 1000==0) {
                    bulkRequest.execute().actionGet();
                    System.out.println("提交了:" + count);
                }
                count++;
            }
            bulkRequest.execute().actionGet();
            System.out.println("插入完毕");
            br.close();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

 以上就是ES的批量导出和导入了。



© 著作权归作者所有

共有 人打赏支持
寻梦2012
粉丝 30
博文 20
码字总数 12011
作品 0
程序员
私信 提问
加载中

评论(24)

寻梦2012
寻梦2012

引用来自“Brooo”的评论

问下,这个怎么用多线程去跑啊

引用来自“寻梦2012”的评论

晚上我看看给你恢复你明天看吧

引用来自“Brooo”的评论

你好,请问还在吗,我导出大概1亿条数据时总会报远程服务器关闭,怎么才能用多线程去写而不会数据重复交叉呢,新手不太懂

引用来自“寻梦2012”的评论

在。好的我给你看看,

引用来自“Brooo”的评论

查询数量 :250000
Exception in thread "main" NoNodeAvailableException[None of the configured nodes were available: [{centos1}{PCZUnlIoQ8OiEg4ZRtFweg}{10.10.11.147}{10.10.11.147:9300}{cluster=true}, {centos2}{LPkRbdiWQGiUPKU_L8jRuQ}{10.10.11.148}{10.10.11.148:9300}, {centos3}{VqauhzDzS-m-eioQtFyczg}{10.10.11.149}{10.10.11.149:9300}{cluster=true}]]; nested: NodeDisconnectedException[[centos1][10.10.11.147:9300][indices:data/read/scroll] disconnected];
  at org.elasticsearch.client.transport.TransportClientNodesService$RetryListener.onFailure(TransportClientNodesService.java:244)
  at org.elasticsearch.action.ActionListenerResponseHandler.handleException(ActionListenerResponseHandler.java:46)
  at org.elasticsearch.transport.TransportService$Adapter$3.run(TransportService.java:622)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
Caused by: NodeDisconnectedException[[centos1][10.10.11.147:9300][indices:data/read/scroll] disconnected]
报这个错误,这个是从三个节点的es集群往一个节点的ES库里面导数据
public static Client initClient() {
if(Exportclient == null) {
try {
Exportclient = TransportClient.builder().settings(config()).build()
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("10.10.11.102"), 9300))
;
} catch (UnknownHostException e) {
e.printStackTrace();
}
}
美女你导出1亿条用ES是错误的。你怎么会用ES导出这么多数据呢。你可以ES+Hadoop+hive去导数据。
寻梦2012
寻梦2012

引用来自“Brooo”的评论

问下,这个怎么用多线程去跑啊

引用来自“寻梦2012”的评论

晚上我看看给你恢复你明天看吧

引用来自“Brooo”的评论

你好,请问还在吗,我导出大概1亿条数据时总会报远程服务器关闭,怎么才能用多线程去写而不会数据重复交叉呢,新手不太懂

引用来自“寻梦2012”的评论

在。好的我给你看看,

引用来自“Brooo”的评论

查询数量 :250000
Exception in thread "main" NoNodeAvailableException[None of the configured nodes were available: [{centos1}{PCZUnlIoQ8OiEg4ZRtFweg}{10.10.11.147}{10.10.11.147:9300}{cluster=true}, {centos2}{LPkRbdiWQGiUPKU_L8jRuQ}{10.10.11.148}{10.10.11.148:9300}, {centos3}{VqauhzDzS-m-eioQtFyczg}{10.10.11.149}{10.10.11.149:9300}{cluster=true}]]; nested: NodeDisconnectedException[[centos1][10.10.11.147:9300][indices:data/read/scroll] disconnected];
  at org.elasticsearch.client.transport.TransportClientNodesService$RetryListener.onFailure(TransportClientNodesService.java:244)
  at org.elasticsearch.action.ActionListenerResponseHandler.handleException(ActionListenerResponseHandler.java:46)
  at org.elasticsearch.transport.TransportService$Adapter$3.run(TransportService.java:622)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
Caused by: NodeDisconnectedException[[centos1][10.10.11.147:9300][indices:data/read/scroll] disconnected]
报这个错误,这个是从三个节点的es集群往一个节点的ES库里面导数据
public static Client initClient() {
if(Exportclient == null) {
try {
Exportclient = TransportClient.builder().settings(config()).build()
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("10.10.11.102"), 9300))
;
} catch (UnknownHostException e) {
e.printStackTrace();
}
}
应该你的线程数超过了你ES集群的线程数吧。你把代码都贴出来
Brooo
Brooo

引用来自“Brooo”的评论

问下,这个怎么用多线程去跑啊

引用来自“寻梦2012”的评论

晚上我看看给你恢复你明天看吧

引用来自“Brooo”的评论

你好,请问还在吗,我导出大概1亿条数据时总会报远程服务器关闭,怎么才能用多线程去写而不会数据重复交叉呢,新手不太懂

引用来自“寻梦2012”的评论

在。好的我给你看看,
查询数量 :250000
Exception in thread "main" NoNodeAvailableException[None of the configured nodes were available: [{centos1}{PCZUnlIoQ8OiEg4ZRtFweg}{10.10.11.147}{10.10.11.147:9300}{cluster=true}, {centos2}{LPkRbdiWQGiUPKU_L8jRuQ}{10.10.11.148}{10.10.11.148:9300}, {centos3}{VqauhzDzS-m-eioQtFyczg}{10.10.11.149}{10.10.11.149:9300}{cluster=true}]]; nested: NodeDisconnectedException[[centos1][10.10.11.147:9300][indices:data/read/scroll] disconnected];
  at org.elasticsearch.client.transport.TransportClientNodesService$RetryListener.onFailure(TransportClientNodesService.java:244)
  at org.elasticsearch.action.ActionListenerResponseHandler.handleException(ActionListenerResponseHandler.java:46)
  at org.elasticsearch.transport.TransportService$Adapter$3.run(TransportService.java:622)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
Caused by: NodeDisconnectedException[[centos1][10.10.11.147:9300][indices:data/read/scroll] disconnected]
报这个错误,这个是从三个节点的es集群往一个节点的ES库里面导数据
public static Client initClient() {
if(Exportclient == null) {
try {
Exportclient = TransportClient.builder().settings(config()).build()
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("10.10.11.102"), 9300))
;
} catch (UnknownHostException e) {
e.printStackTrace();
}
}
寻梦2012
寻梦2012

引用来自“Brooo”的评论

问下,这个怎么用多线程去跑啊

引用来自“寻梦2012”的评论

晚上我看看给你恢复你明天看吧

引用来自“Brooo”的评论

你好,请问还在吗,我导出大概1亿条数据时总会报远程服务器关闭,怎么才能用多线程去写而不会数据重复交叉呢,新手不太懂
在。好的我给你看看,
Brooo
Brooo

引用来自“Brooo”的评论

问下,这个怎么用多线程去跑啊

引用来自“寻梦2012”的评论

晚上我看看给你恢复你明天看吧
你好,请问还在吗,我导出大概1亿条数据时总会报远程服务器关闭,怎么才能用多线程去写而不会数据重复交叉呢,新手不太懂
寻梦2012
寻梦2012

引用来自“Brooo”的评论

问下,这个怎么用多线程去跑啊
晚上我看看给你恢复你明天看吧
寻梦2012
寻梦2012

引用来自“Brooo”的评论

问下,这个怎么用多线程去跑啊
我看看。我今天有事没在公司刚看到
Brooo
Brooo
问下,这个怎么用多线程去跑啊
Brooo
Brooo

引用来自“Brooo”的评论

我插入的时候数据多了好多倍,都是重复数据,这个怎么办啊

引用来自“寻梦2012”的评论

是你插入数据之前就有重复数据了还是直接插入的时候查重了数据

引用来自“Brooo”的评论

ExportES exportES=new ExportES();
exportES.getESinformatation();
exportES.insertESInformation();
可以这样直接调用吗,这两个方法一个是导出一个是导入的

引用来自“寻梦2012”的评论

你发个博客把代码贴出来。把地址发给我。然后我看看你写的代码
https://my.oschina.net/u/3537613/blog/948324 ,你好,这里面是我的代码,实在感谢
寻梦2012
寻梦2012

引用来自“Brooo”的评论

我插入的时候数据多了好多倍,都是重复数据,这个怎么办啊

引用来自“寻梦2012”的评论

是你插入数据之前就有重复数据了还是直接插入的时候查重了数据

引用来自“Brooo”的评论

ExportES exportES=new ExportES();
exportES.getESinformatation();
exportES.insertESInformation();
可以这样直接调用吗,这两个方法一个是导出一个是导入的
你发个博客把代码贴出来。把地址发给我。然后我看看你写的代码
elasticsearch使用java api批量插入数据

使用java api批量插入数据,主要参考官网的api,主要是client和Document APIs-Index API部分。 pom依赖 主要代码 参考: https://www.elastic.co/guide/en/elasticsearch/client/java-api/cu...

cjun1990
2016/10/20
0
5
Elasticsearch _bulk批量导入数据问题

各位大神好,在下是Elasticsearch初学者,遇到如下问题 ,还请帮忙解答,不胜感激。 1 问题1,本人用如下命令批量导入数据到Elasticsearch: curl -XPOST http://192.168.6.64:9200/myindex...

余智君sir
2017/04/03
702
2
ElasticSearch ik,elasticsearch-jdbc 使用 和 yii2 实例

1.ES安装+ik插件 mac安装和使用: a.由于ES为java开发 所以首先安装下载jdk,由于最新版本对jdk要求>1.8.0,所以可以通过brew 升级jdk b.brew 安装ES 启动 elasticsearch 后,在浏览器地址:127....

罗培海
2017/12/16
0
0
ES(elasticsearch)搜索引擎

ES(elasticsearch)搜索引擎 0、授人以渔,少走半年弯路! 死磕 Elasticsearch 方法论:普通程序员高效精进的 10 大狠招! 一、Elasitcsearch基础篇 1.1 Elasitcsearch基础认知 1、Elasticse...

Ocean_K
09/11
0
0
如何用 Node.js 和 Elasticsearch 构建搜索引擎

Elasticsearch 是一款开源的搜索引擎,由于其高性能和分布式系统架构而备受关注。本文将讨论其关键特性,并手把手教你如何用它创建 Node.js 搜索引擎。 Elasticsearch 概述 Elasticsearch 底...

oschina
2016/09/29
10.2K
6

没有更多内容

加载失败,请刷新页面

加载更多

EOS docker开发环境

使用eos docker镜像是部署本地EOS开发环境的最轻松愉快的方法。使用官方提供的eos docker镜像,你可以快速建立一个eos开发环境,可以迅速启动开发节点和钱包服务器、创建账户、编写智能合约....

汇智网教程
今天
15
0
《唐史原来超有趣》的读后感优秀范文3700字

《唐史原来超有趣》的读后感优秀范文3700字: 作者:花若离。我今天分享的内容《唐史原来超有趣》这本书的读后感,我将这本书看了一遍之后就束之高阁了,不过里面的内容一直在在脑海中回放,...

原创小博客
今天
19
0
IC-CAD Methodology知识图谱

CAD (Computer Aided Design),计算机辅助设计,指利用计算机及其图形设备帮助设计人员进行设计工作,这个定义同样可以用来近似描述IC公司CAD工程师这个岗位的工作。 早期IC公司的CAD岗位最初...

李艳青1987
今天
19
0
CompletableFuture get方法一直阻塞或抛出TimeoutException

问题描述 最近刚刚上线的服务突然抛出大量的TimeoutException,查询后发现是使用了CompletableFuture,并且在执行future.get(5, TimeUnit.SECONDS);时抛出了TimeoutException异常,导致接口响...

xiaolyuh
今天
10
0
dubbo 搭建与使用

官网:http://dubbo.apache.org/en-us/ 一,安装监控中心(可以不安装) admin管理控制台,monitor监控中心 下载 bubbo ops 这个是新版的,需要node.js环境,我没有就用老版的了...

小兵胖胖
今天
21
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部