文档章节

elasticsearch批量数据导入和导出

寻梦2012
 寻梦2012
发布于 2015/12/29 10:08
字数 405
阅读 10572
收藏 11
点赞 1
评论 24

 之前使用ES的时候建表Type时有个字段的类型搞错了。以至于用API查询时出错。所以就研究一下ES API做了一下ES批量导出和导入重构了Type

1:Java API批量导出

 Settings settings = ImmutableSettings.settingsBuilder().put("cluster.name", "elasticsearch-bigdata").build();
        Client client = new TransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress("10.58.71.6", 9300));
 SearchResponse response = client.prepareSearch("bigdata").setTypes("student")
               .setQuery(QueryBuilders.matchAllQuery()).setSize(10000).setScroll(new TimeValue(6000                  00))
                .setSearchType(SearchType.SCAN).execute().actionGet();//setSearchType(SearchType.Scan) 告诉ES不需要排序只要结果返回即可 setScroll(new TimeValue(600000)) 设置滚动的时间
        String scrollid = response.getScrollId();
        try {
        //把导出的结果以JSON的格式写到文件里
            BufferedWriter out = new BufferedWriter(new FileWriter("es", true));
            
            //每次返回数据10000条。一直循环查询直到所有的数据都查询出来
            while (true) {
                SearchResponse response2 = client.prepareSearchScroll(scrollid).setScroll(new TimeValue(1000000))
                        .execute().actionGet();
                SearchHits searchHit = response2.getHits();
                //再次查询不到数据时跳出循环
                if (searchHit.getHits().length == 0) {
                    break;
                }
                System.out.println("查询数量 :" + searchHit.getHits().length);
                for (int i = 0; i < searchHit.getHits().length; i++) {
                    String json = searchHit.getHits()[i].getSourceAsString();
                    out.write(json);
                    out.write("\r\n");
                }
            }
            System.out.println("查询结束");
            out.close();
        } catch (FileNotFoundException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

2:Java API 批量导入

Settings settings = ImmutableSettings.settingsBuilder().put("cluster.name", "elasticsearch-bigdata").build();
        Client client = new TransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress("10.58.71.6", 9300));
        try {
        //读取刚才导出的ES数据
            BufferedReader br = new BufferedReader(new FileReader("es"));
            String json = null;
            int count = 0;
            //开启批量插入
            BulkRequestBuilder bulkRequest = client.prepareBulk();
            while ((json = br.readLine()) != null) {
                bulkRequest.add(client.prepareIndex("bigdata", "student").setSource(json));
                //每一千条提交一次
                if (count% 1000==0) {
                    bulkRequest.execute().actionGet();
                    System.out.println("提交了:" + count);
                }
                count++;
            }
            bulkRequest.execute().actionGet();
            System.out.println("插入完毕");
            br.close();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

 以上就是ES的批量导出和导入了。



© 著作权归作者所有

共有 人打赏支持
寻梦2012
粉丝 29
博文 20
码字总数 12011
作品 0
程序员
加载中

评论(24)

寻梦2012
寻梦2012

引用来自“Brooo”的评论

问下,这个怎么用多线程去跑啊

引用来自“寻梦2012”的评论

晚上我看看给你恢复你明天看吧

引用来自“Brooo”的评论

你好,请问还在吗,我导出大概1亿条数据时总会报远程服务器关闭,怎么才能用多线程去写而不会数据重复交叉呢,新手不太懂

引用来自“寻梦2012”的评论

在。好的我给你看看,

引用来自“Brooo”的评论

查询数量 :250000
Exception in thread "main" NoNodeAvailableException[None of the configured nodes were available: [{centos1}{PCZUnlIoQ8OiEg4ZRtFweg}{10.10.11.147}{10.10.11.147:9300}{cluster=true}, {centos2}{LPkRbdiWQGiUPKU_L8jRuQ}{10.10.11.148}{10.10.11.148:9300}, {centos3}{VqauhzDzS-m-eioQtFyczg}{10.10.11.149}{10.10.11.149:9300}{cluster=true}]]; nested: NodeDisconnectedException[[centos1][10.10.11.147:9300][indices:data/read/scroll] disconnected];
  at org.elasticsearch.client.transport.TransportClientNodesService$RetryListener.onFailure(TransportClientNodesService.java:244)
  at org.elasticsearch.action.ActionListenerResponseHandler.handleException(ActionListenerResponseHandler.java:46)
  at org.elasticsearch.transport.TransportService$Adapter$3.run(TransportService.java:622)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
Caused by: NodeDisconnectedException[[centos1][10.10.11.147:9300][indices:data/read/scroll] disconnected]
报这个错误,这个是从三个节点的es集群往一个节点的ES库里面导数据
public static Client initClient() {
if(Exportclient == null) {
try {
Exportclient = TransportClient.builder().settings(config()).build()
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("10.10.11.102"), 9300))
;
} catch (UnknownHostException e) {
e.printStackTrace();
}
}
美女你导出1亿条用ES是错误的。你怎么会用ES导出这么多数据呢。你可以ES+Hadoop+hive去导数据。
寻梦2012
寻梦2012

引用来自“Brooo”的评论

问下,这个怎么用多线程去跑啊

引用来自“寻梦2012”的评论

晚上我看看给你恢复你明天看吧

引用来自“Brooo”的评论

你好,请问还在吗,我导出大概1亿条数据时总会报远程服务器关闭,怎么才能用多线程去写而不会数据重复交叉呢,新手不太懂

引用来自“寻梦2012”的评论

在。好的我给你看看,

引用来自“Brooo”的评论

查询数量 :250000
Exception in thread "main" NoNodeAvailableException[None of the configured nodes were available: [{centos1}{PCZUnlIoQ8OiEg4ZRtFweg}{10.10.11.147}{10.10.11.147:9300}{cluster=true}, {centos2}{LPkRbdiWQGiUPKU_L8jRuQ}{10.10.11.148}{10.10.11.148:9300}, {centos3}{VqauhzDzS-m-eioQtFyczg}{10.10.11.149}{10.10.11.149:9300}{cluster=true}]]; nested: NodeDisconnectedException[[centos1][10.10.11.147:9300][indices:data/read/scroll] disconnected];
  at org.elasticsearch.client.transport.TransportClientNodesService$RetryListener.onFailure(TransportClientNodesService.java:244)
  at org.elasticsearch.action.ActionListenerResponseHandler.handleException(ActionListenerResponseHandler.java:46)
  at org.elasticsearch.transport.TransportService$Adapter$3.run(TransportService.java:622)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
Caused by: NodeDisconnectedException[[centos1][10.10.11.147:9300][indices:data/read/scroll] disconnected]
报这个错误,这个是从三个节点的es集群往一个节点的ES库里面导数据
public static Client initClient() {
if(Exportclient == null) {
try {
Exportclient = TransportClient.builder().settings(config()).build()
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("10.10.11.102"), 9300))
;
} catch (UnknownHostException e) {
e.printStackTrace();
}
}
应该你的线程数超过了你ES集群的线程数吧。你把代码都贴出来
Brooo
Brooo

引用来自“Brooo”的评论

问下,这个怎么用多线程去跑啊

引用来自“寻梦2012”的评论

晚上我看看给你恢复你明天看吧

引用来自“Brooo”的评论

你好,请问还在吗,我导出大概1亿条数据时总会报远程服务器关闭,怎么才能用多线程去写而不会数据重复交叉呢,新手不太懂

引用来自“寻梦2012”的评论

在。好的我给你看看,
查询数量 :250000
Exception in thread "main" NoNodeAvailableException[None of the configured nodes were available: [{centos1}{PCZUnlIoQ8OiEg4ZRtFweg}{10.10.11.147}{10.10.11.147:9300}{cluster=true}, {centos2}{LPkRbdiWQGiUPKU_L8jRuQ}{10.10.11.148}{10.10.11.148:9300}, {centos3}{VqauhzDzS-m-eioQtFyczg}{10.10.11.149}{10.10.11.149:9300}{cluster=true}]]; nested: NodeDisconnectedException[[centos1][10.10.11.147:9300][indices:data/read/scroll] disconnected];
  at org.elasticsearch.client.transport.TransportClientNodesService$RetryListener.onFailure(TransportClientNodesService.java:244)
  at org.elasticsearch.action.ActionListenerResponseHandler.handleException(ActionListenerResponseHandler.java:46)
  at org.elasticsearch.transport.TransportService$Adapter$3.run(TransportService.java:622)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
Caused by: NodeDisconnectedException[[centos1][10.10.11.147:9300][indices:data/read/scroll] disconnected]
报这个错误,这个是从三个节点的es集群往一个节点的ES库里面导数据
public static Client initClient() {
if(Exportclient == null) {
try {
Exportclient = TransportClient.builder().settings(config()).build()
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("10.10.11.102"), 9300))
;
} catch (UnknownHostException e) {
e.printStackTrace();
}
}
寻梦2012
寻梦2012

引用来自“Brooo”的评论

问下,这个怎么用多线程去跑啊

引用来自“寻梦2012”的评论

晚上我看看给你恢复你明天看吧

引用来自“Brooo”的评论

你好,请问还在吗,我导出大概1亿条数据时总会报远程服务器关闭,怎么才能用多线程去写而不会数据重复交叉呢,新手不太懂
在。好的我给你看看,
Brooo
Brooo

引用来自“Brooo”的评论

问下,这个怎么用多线程去跑啊

引用来自“寻梦2012”的评论

晚上我看看给你恢复你明天看吧
你好,请问还在吗,我导出大概1亿条数据时总会报远程服务器关闭,怎么才能用多线程去写而不会数据重复交叉呢,新手不太懂
寻梦2012
寻梦2012

引用来自“Brooo”的评论

问下,这个怎么用多线程去跑啊
晚上我看看给你恢复你明天看吧
寻梦2012
寻梦2012

引用来自“Brooo”的评论

问下,这个怎么用多线程去跑啊
我看看。我今天有事没在公司刚看到
Brooo
Brooo
问下,这个怎么用多线程去跑啊
Brooo
Brooo

引用来自“Brooo”的评论

我插入的时候数据多了好多倍,都是重复数据,这个怎么办啊

引用来自“寻梦2012”的评论

是你插入数据之前就有重复数据了还是直接插入的时候查重了数据

引用来自“Brooo”的评论

ExportES exportES=new ExportES();
exportES.getESinformatation();
exportES.insertESInformation();
可以这样直接调用吗,这两个方法一个是导出一个是导入的

引用来自“寻梦2012”的评论

你发个博客把代码贴出来。把地址发给我。然后我看看你写的代码
https://my.oschina.net/u/3537613/blog/948324 ,你好,这里面是我的代码,实在感谢
寻梦2012
寻梦2012

引用来自“Brooo”的评论

我插入的时候数据多了好多倍,都是重复数据,这个怎么办啊

引用来自“寻梦2012”的评论

是你插入数据之前就有重复数据了还是直接插入的时候查重了数据

引用来自“Brooo”的评论

ExportES exportES=new ExportES();
exportES.getESinformatation();
exportES.insertESInformation();
可以这样直接调用吗,这两个方法一个是导出一个是导入的
你发个博客把代码贴出来。把地址发给我。然后我看看你写的代码
Elasticsearch 入门: _bulk 批量导入数据

批量导入数据 使用 Elasticsearch Bulk API 批量 update 步骤: 需求:我希望批量导入一个 type 的名词列表到 index 索引。 准备数据: 根据官方文档,Json 数据要准备成这个格式的: 其中 ...

王兵
05/10
0
0
初探 ELK - 每天5分钟玩转 Docker 容器技术(89)

在开源的日志管理方案中,最出名的莫过于 ELK 了。ELK 是三个软件的合称:Elasticsearch、Logstash、Kibana。 Elasticsearch 一个近乎实时查询的全文搜索引擎。Elasticsearch 的设计目标就是...

CloudMAN
2017/11/03
0
0
当ES赶超Redis,这份ES进修攻略不容错过!

从4月DB-Engines最新发布的全球数据库排名中,我们赫然发现ElasticSearch逆袭超越了Redis,从原先的第9名上升至第8名,而Redis则落后一名,排在了其后。 事实上,这场逆袭并不算太让人意外。...

DBAplus社群
04/15
0
0
基于ELK实时日志分析的最佳实践

在2018云栖大会深圳峰会大数据分析与可视化专场上,由阿里巴巴搜索引擎事业部开放搜索团队的吴迪带来了“基于ELK实时日志分析的最佳实践”的主题分享。介绍了传统的日志分析、ELK的概念和ELK...

smile小太阳
05/06
0
0
Nginx + Naxsi + Nxapi + ElasticSearch + Kibana 安装

Author: Xu FC Platform: CentOS 7 -- Linux localhost.localdomain 3.10.0-693.21.1.el7.x8664 #1 SMP Wed Mar 7 19:03:37 UTC 2018 x8664 x8664 x86_64 GNU/Linux 安装依赖 安装 libssl: ......

捞小虾
05/07
0
0
bboss elasticsearch v5.0.7.8 发布

The best elasticsearch highlevel java rest api-----bboss bboss elasticsearch v5.0.7.8发布 功能改进 1.新增spring boot starter模块,全面支持spring boot starter风格,支持单ES集群和......

bboss
06/27
0
0
kubernetes addons efk

一、简介 这个附加组件由Elasticsearch, Fluentd和Kibana组合而成。 通过结合这三个工具,我们获得了一个可扩展的,灵活的,易于使用的日志收集和分析管道。 Elasticsearch是一个搜索引擎,...

Bravepro
06/29
0
0
ELK系列一:ELK安装配置及nginx日志分析

本文分三个部分介绍了elk、elk安装配置及基于filebeat分析nginx日志的配置。 第一部分:elk介绍 一、什么是elk ELK 其实并不是一款软件,而是一整套解决方案,是三个软件产品的首字母缩写,E...

hnr1017
07/03
0
0
elasticsearch搜索引擎相关资料(更新中)

最近需要用到elasticsearch搜索引擎,所以搜集了很多相关资料,先放在这里(未详细整理) 一、步骤总结:(linux环境下) 1. 安装 (1)下载elasticsearch安装包:http://www.elasticsearch....

核桃人
03/08
0
0
elasticsearch数据库的数据修改问题

最近在学习elasticsearch数据库,但是感觉网上的资料并不是那么的丰富,大部分都是在转载elasticsearch权威指南的内容。 现在想问一下大家,当elasticsearch数据库中的数据需要批量修改的时候...

Death黎明
05/21
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

数据库事务的四大特性以及事务的隔离级别

本篇讲述数据库中事务的四大特性(ACID),并且将会详细地说明事务的隔离级别。 如果一个数据库声称支持事务的操作,那么该数据库必须要具备以下四个特性: ⑴ 原子性(Atomicity) 原子性是...

Java大蜗牛
12分钟前
0
0
Spring Boot 整合 MyBatis/通用Mapper/PageHelper分页插件

整合MyBatis 整合通用Mapper 1. POM依赖配置 <properties><mapper.starter.version>2.0.3-beta1</mapper.starter.version></properties><!-- 通用Mapper --><dependency><groupId>t......

OSC_fly
20分钟前
0
0
CentOS7 双网卡绑定

环境 操作系统 CentOS7.5,禁用 NetworkManager 服务 网卡 eth0 网卡 eth1 绑定网卡 bond0 网卡 eth0 配置 修改 /etc/sysconfig/network-scripts/ifcfg-eth0 TYPE=EthernetBOOTPROTO=noneD......

Colben
22分钟前
0
0
zk实战--rpc框架集群化

在看此篇内容时需要浏览下面内容 netty实战--手写rpc框架 前文功能简介以及功能扩充 利用netty来实现一个点对点的rpc调用。客户端和服务端都是靠手写地址进行socket同学的,无法1对多,也无法...

xpbob
38分钟前
11
0
springboot 发送邮件

获取授权码 添加配置 # 账号和密码spring.mail.username=aaa@qq.comspring.mail.password=bbb# 服务器地址spring.mail.host=smtp.qq.comspring.mail.properties.mail.smtp.ssl.en...

阿豪boy
39分钟前
0
0
如何使用GNU Ring?

文章名:如何使用GNU Ring? 作者:冰焰火灵X 1079092922@qq.com 文章许可:CC BY-SA 4.0 ##1. 安装 下载GNU Ring 点击左边选择你的系统版本(这里以 GNU/Linux 为例,我使用的是Mint 18.3)...

ICE冰焰火灵X
42分钟前
4
0
深入理解springMVC

什么是spring MVC Spring MVC属于SpringFrameWork的后续产品,已经融合在Spring Web Flow里面。Spring 框架提供了构建 Web 应用程序的全功能 MVC 模块。使用 Spring 可插入的 MVC 架构,从而...

Java填坑之路
47分钟前
1
0
《射雕英雄传》书摘

1. 我虽是个飘泊江湖的贫家女子,可不是低三下四、不知自爱之人。你如真心爱我,须当敬我重我。我此生决无别念,就是钢刀架颈,也决意跟定了你。将来……将来如有洞房花烛之日,自然……自能...

k91191
58分钟前
1
0
解决:modal中datePicker 选中时,会触发modal的hidden.bs.modal事件

最近项目中发现了一个bug,具体表现为选中模态框上datepicker组件上的日期时,会触发模态框的关闭事件,导致数据编辑无法正常进行。网上搜索了下,解决方法如下: $('.datepicker').on('hid...

Funcy1122
今天
0
0
Redis分布式锁的正确实现方式

前言 分布式锁一般有三种实现方式: 1.数据库乐观锁 2.基于Redis的分布式锁; 3.基于Zookeeper的分布式锁。本篇博客将介绍第二种方式,基于Redis实现分布式锁。虽然网上已经有各种介绍Redis...

大海201506
今天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部