文档章节

elasticsearch批量数据导入和导出

寻梦2012
 寻梦2012
发布于 2015/12/29 10:08
字数 405
阅读 4W
收藏 11

阿里云携手百名商业领袖、技术大咖,带您一探行进中的数字新基建!>>>

 之前使用ES的时候建表Type时有个字段的类型搞错了。以至于用API查询时出错。所以就研究一下ES API做了一下ES批量导出和导入重构了Type

1:Java API批量导出

 Settings settings = ImmutableSettings.settingsBuilder().put("cluster.name", "elasticsearch-bigdata").build();
        Client client = new TransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress("10.58.71.6", 9300));
 SearchResponse response = client.prepareSearch("bigdata").setTypes("student")
               .setQuery(QueryBuilders.matchAllQuery()).setSize(10000).setScroll(new TimeValue(6000                  00))
                .setSearchType(SearchType.SCAN).execute().actionGet();//setSearchType(SearchType.Scan) 告诉ES不需要排序只要结果返回即可 setScroll(new TimeValue(600000)) 设置滚动的时间
        String scrollid = response.getScrollId();
        try {
        //把导出的结果以JSON的格式写到文件里
            BufferedWriter out = new BufferedWriter(new FileWriter("es", true));
            
            //每次返回数据10000条。一直循环查询直到所有的数据都查询出来
            while (true) {
                SearchResponse response2 = client.prepareSearchScroll(scrollid).setScroll(new TimeValue(1000000))
                        .execute().actionGet();
                SearchHits searchHit = response2.getHits();
                //再次查询不到数据时跳出循环
                if (searchHit.getHits().length == 0) {
                    break;
                }
                System.out.println("查询数量 :" + searchHit.getHits().length);
                for (int i = 0; i < searchHit.getHits().length; i++) {
                    String json = searchHit.getHits()[i].getSourceAsString();
                    out.write(json);
                    out.write("\r\n");
                }
            }
            System.out.println("查询结束");
            out.close();
        } catch (FileNotFoundException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

2:Java API 批量导入

Settings settings = ImmutableSettings.settingsBuilder().put("cluster.name", "elasticsearch-bigdata").build();
        Client client = new TransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress("10.58.71.6", 9300));
        try {
        //读取刚才导出的ES数据
            BufferedReader br = new BufferedReader(new FileReader("es"));
            String json = null;
            int count = 0;
            //开启批量插入
            BulkRequestBuilder bulkRequest = client.prepareBulk();
            while ((json = br.readLine()) != null) {
                bulkRequest.add(client.prepareIndex("bigdata", "student").setSource(json));
                //每一千条提交一次
                if (count% 1000==0) {
                    bulkRequest.execute().actionGet();
                    System.out.println("提交了:" + count);
                }
                count++;
            }
            bulkRequest.execute().actionGet();
            System.out.println("插入完毕");
            br.close();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

 以上就是ES的批量导出和导入了。



© 著作权归作者所有

寻梦2012
粉丝 32
博文 20
码字总数 12011
作品 0
程序员
私信 提问
加载中

评论(24)

寻梦2012
寻梦2012 博主

引用来自“Brooo”的评论

问下,这个怎么用多线程去跑啊

引用来自“寻梦2012”的评论

晚上我看看给你恢复你明天看吧

引用来自“Brooo”的评论

你好,请问还在吗,我导出大概1亿条数据时总会报远程服务器关闭,怎么才能用多线程去写而不会数据重复交叉呢,新手不太懂

引用来自“寻梦2012”的评论

在。好的我给你看看,

引用来自“Brooo”的评论

查询数量 :250000
Exception in thread "main" NoNodeAvailableException[None of the configured nodes were available: [{centos1}{PCZUnlIoQ8OiEg4ZRtFweg}{10.10.11.147}{10.10.11.147:9300}{cluster=true}, {centos2}{LPkRbdiWQGiUPKU_L8jRuQ}{10.10.11.148}{10.10.11.148:9300}, {centos3}{VqauhzDzS-m-eioQtFyczg}{10.10.11.149}{10.10.11.149:9300}{cluster=true}]]; nested: NodeDisconnectedException[[centos1][10.10.11.147:9300][indices:data/read/scroll] disconnected];
  at org.elasticsearch.client.transport.TransportClientNodesService$RetryListener.onFailure(TransportClientNodesService.java:244)
  at org.elasticsearch.action.ActionListenerResponseHandler.handleException(ActionListenerResponseHandler.java:46)
  at org.elasticsearch.transport.TransportService$Adapter$3.run(TransportService.java:622)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
Caused by: NodeDisconnectedException[[centos1][10.10.11.147:9300][indices:data/read/scroll] disconnected]
报这个错误,这个是从三个节点的es集群往一个节点的ES库里面导数据
public static Client initClient() {
if(Exportclient == null) {
try {
Exportclient = TransportClient.builder().settings(config()).build()
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("10.10.11.102"), 9300))
;
} catch (UnknownHostException e) {
e.printStackTrace();
}
}
美女你导出1亿条用ES是错误的。你怎么会用ES导出这么多数据呢。你可以ES+Hadoop+hive去导数据。
寻梦2012
寻梦2012 博主

引用来自“Brooo”的评论

问下,这个怎么用多线程去跑啊

引用来自“寻梦2012”的评论

晚上我看看给你恢复你明天看吧

引用来自“Brooo”的评论

你好,请问还在吗,我导出大概1亿条数据时总会报远程服务器关闭,怎么才能用多线程去写而不会数据重复交叉呢,新手不太懂

引用来自“寻梦2012”的评论

在。好的我给你看看,

引用来自“Brooo”的评论

查询数量 :250000
Exception in thread "main" NoNodeAvailableException[None of the configured nodes were available: [{centos1}{PCZUnlIoQ8OiEg4ZRtFweg}{10.10.11.147}{10.10.11.147:9300}{cluster=true}, {centos2}{LPkRbdiWQGiUPKU_L8jRuQ}{10.10.11.148}{10.10.11.148:9300}, {centos3}{VqauhzDzS-m-eioQtFyczg}{10.10.11.149}{10.10.11.149:9300}{cluster=true}]]; nested: NodeDisconnectedException[[centos1][10.10.11.147:9300][indices:data/read/scroll] disconnected];
  at org.elasticsearch.client.transport.TransportClientNodesService$RetryListener.onFailure(TransportClientNodesService.java:244)
  at org.elasticsearch.action.ActionListenerResponseHandler.handleException(ActionListenerResponseHandler.java:46)
  at org.elasticsearch.transport.TransportService$Adapter$3.run(TransportService.java:622)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
Caused by: NodeDisconnectedException[[centos1][10.10.11.147:9300][indices:data/read/scroll] disconnected]
报这个错误,这个是从三个节点的es集群往一个节点的ES库里面导数据
public static Client initClient() {
if(Exportclient == null) {
try {
Exportclient = TransportClient.builder().settings(config()).build()
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("10.10.11.102"), 9300))
;
} catch (UnknownHostException e) {
e.printStackTrace();
}
}
应该你的线程数超过了你ES集群的线程数吧。你把代码都贴出来
Brooo
Brooo

引用来自“Brooo”的评论

问下,这个怎么用多线程去跑啊

引用来自“寻梦2012”的评论

晚上我看看给你恢复你明天看吧

引用来自“Brooo”的评论

你好,请问还在吗,我导出大概1亿条数据时总会报远程服务器关闭,怎么才能用多线程去写而不会数据重复交叉呢,新手不太懂

引用来自“寻梦2012”的评论

在。好的我给你看看,
查询数量 :250000
Exception in thread "main" NoNodeAvailableException[None of the configured nodes were available: [{centos1}{PCZUnlIoQ8OiEg4ZRtFweg}{10.10.11.147}{10.10.11.147:9300}{cluster=true}, {centos2}{LPkRbdiWQGiUPKU_L8jRuQ}{10.10.11.148}{10.10.11.148:9300}, {centos3}{VqauhzDzS-m-eioQtFyczg}{10.10.11.149}{10.10.11.149:9300}{cluster=true}]]; nested: NodeDisconnectedException[[centos1][10.10.11.147:9300][indices:data/read/scroll] disconnected];
  at org.elasticsearch.client.transport.TransportClientNodesService$RetryListener.onFailure(TransportClientNodesService.java:244)
  at org.elasticsearch.action.ActionListenerResponseHandler.handleException(ActionListenerResponseHandler.java:46)
  at org.elasticsearch.transport.TransportService$Adapter$3.run(TransportService.java:622)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
Caused by: NodeDisconnectedException[[centos1][10.10.11.147:9300][indices:data/read/scroll] disconnected]
报这个错误,这个是从三个节点的es集群往一个节点的ES库里面导数据
public static Client initClient() {
if(Exportclient == null) {
try {
Exportclient = TransportClient.builder().settings(config()).build()
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("10.10.11.102"), 9300))
;
} catch (UnknownHostException e) {
e.printStackTrace();
}
}
寻梦2012
寻梦2012 博主

引用来自“Brooo”的评论

问下,这个怎么用多线程去跑啊

引用来自“寻梦2012”的评论

晚上我看看给你恢复你明天看吧

引用来自“Brooo”的评论

你好,请问还在吗,我导出大概1亿条数据时总会报远程服务器关闭,怎么才能用多线程去写而不会数据重复交叉呢,新手不太懂
在。好的我给你看看,
Brooo
Brooo

引用来自“Brooo”的评论

问下,这个怎么用多线程去跑啊

引用来自“寻梦2012”的评论

晚上我看看给你恢复你明天看吧
你好,请问还在吗,我导出大概1亿条数据时总会报远程服务器关闭,怎么才能用多线程去写而不会数据重复交叉呢,新手不太懂
寻梦2012
寻梦2012 博主

引用来自“Brooo”的评论

问下,这个怎么用多线程去跑啊
晚上我看看给你恢复你明天看吧
寻梦2012
寻梦2012 博主

引用来自“Brooo”的评论

问下,这个怎么用多线程去跑啊
我看看。我今天有事没在公司刚看到
Brooo
Brooo
问下,这个怎么用多线程去跑啊
Brooo
Brooo

引用来自“Brooo”的评论

我插入的时候数据多了好多倍,都是重复数据,这个怎么办啊

引用来自“寻梦2012”的评论

是你插入数据之前就有重复数据了还是直接插入的时候查重了数据

引用来自“Brooo”的评论

ExportES exportES=new ExportES();
exportES.getESinformatation();
exportES.insertESInformation();
可以这样直接调用吗,这两个方法一个是导出一个是导入的

引用来自“寻梦2012”的评论

你发个博客把代码贴出来。把地址发给我。然后我看看你写的代码
https://my.oschina.net/u/3537613/blog/948324 ,你好,这里面是我的代码,实在感谢
寻梦2012
寻梦2012 博主

引用来自“Brooo”的评论

我插入的时候数据多了好多倍,都是重复数据,这个怎么办啊

引用来自“寻梦2012”的评论

是你插入数据之前就有重复数据了还是直接插入的时候查重了数据

引用来自“Brooo”的评论

ExportES exportES=new ExportES();
exportES.getESinformatation();
exportES.insertESInformation();
可以这样直接调用吗,这两个方法一个是导出一个是导入的
你发个博客把代码贴出来。把地址发给我。然后我看看你写的代码
ELK学习笔记之Elasticsearch和Kibana数据导出实战

0x00 问题引出 以下两个导出问题来自Elastic中文社区。 问题1、kibana怎么导出查询数据? 问题2:elasticsearch数据导出 就像数据库数据导出一样,elasticsearch可以么? 或者找到它磁盘上存放...

osc_9hx9qg8o
2019/11/27
5
0
Elasticsearch 高效数据同步工具 - elasticsearch-datatran

Elasticsearch-datatran 是由 bboss 开源的一款将各种数据源中的海量数据同步到 Elasticsearch 的高效数据同步工具。 功能特点 1.支持多种数据源 数据库表数据同步到Elasticsearch 数据库表数...

bboss
03/23
3K
0
elasticsearch-dsl笔记

一、elasticsearch安装 安装java1.8以上 安装elasticsearch-rtf(https://github.com/medcl/elasticsearch-rtf) head插件和kibana的安装 head:A web front end for an elastic search clu......

osc_rszue8l4
2018/12/10
5
0
基于docker使用elasticsearch-dump,es数据导入导出

1.拉去dump镜像 [root@localhost ~]# docker pull taskrabbit/elasticsearch-dump 2.相关实例 1.创建文件存放路径 [root@localhost ~]# mkdir -p /data/ 2.将索引数据备份到文件 [root@loca......

osc_vyztkm1b
2019/12/26
3
0
ElasticSearch ik,elasticsearch-jdbc 使用 和 yii2 实例

1.ES安装+ik插件 mac安装和使用: a.由于ES为java开发 所以首先安装下载jdk,由于最新版本对jdk要求>1.8.0,所以可以通过brew 升级jdk b.brew 安装ES 启动 elasticsearch 后,在浏览器地址:127....

罗培海
2017/12/16
529
0

没有更多内容

加载失败,请刷新页面

加载更多

史上最全的“文件或目录损坏且无法读取”的解决办法大集合

问题描述: G盘打不开文件或目录损坏且无法读取,是因为这个I盘的文件系统内部结构损坏导致的。史上最全的“文件或目录损坏且无法读取”的解决办法大集合具体的恢复方法看正文 工具/软件:极...

计算无敌
今天
9
0
2048游戏的最佳算法是什么? - What is the optimal algorithm for the game 2048?

问题: I have recently stumbled upon the game 2048 . 我最近偶然发现了2048游戏。 You merge similar tiles by moving them in any of the four directions to make "bigger" tiles. 您可......

javail
今天
9
0
Spring Cloud Ribbon 客户端负载均衡

Ribbon客户端组件提供一系列完善的配置选项,比如连接超时、重试、重试算法等,内置可插拔、可定制的负载均衡组件。下面是用到的一些负载均衡策略: 简单轮询负载均衡 加权轮询负载均衡 区域...

泥瓦匠BYSocket
今天
7
0
为什么在Python 3中“范围(1000000000000000(1000000000000001))”这么快?

问题: It is my understanding that the range() function, which is actually an object type in Python 3 , generates its contents on the fly, similar to a generator. 据我了解, ra......

技术盛宴
今天
9
0
OSChina 周四乱弹 —— 卖全家桶!

Osc乱弹歌单(2020)请戳(这里) 【今日歌曲】 @巴拉迪维 :陈慧娴的单曲《与泪抱拥》 陈慧娴的嗓音加上向雪怀的词,这样的经典组合真不多。#今日歌曲推荐# 《与泪抱拥》- 陈慧娴 手机党少年...

小小编辑
今天
15
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部