elasticsearch批量数据导入和导出
elasticsearch批量数据导入和导出
寻梦2012 发表于2年前
elasticsearch批量数据导入和导出
  • 发表于 2年前
  • 阅读 7533
  • 收藏 11
  • 点赞 1
  • 评论 24

腾讯云 技术升级10大核心产品年终让利>>>   

摘要: elasticsearch批量数据导入和导出

 之前使用ES的时候建表Type时有个字段的类型搞错了。以至于用API查询时出错。所以就研究一下ES API做了一下ES批量导出和导入重构了Type

1:Java API批量导出

 Settings settings = ImmutableSettings.settingsBuilder().put("cluster.name", "elasticsearch-bigdata").build();
        Client client = new TransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress("10.58.71.6", 9300));
 SearchResponse response = client.prepareSearch("bigdata").setTypes("student")
               .setQuery(QueryBuilders.matchAllQuery()).setSize(10000).setScroll(new TimeValue(6000                  00))
                .setSearchType(SearchType.SCAN).execute().actionGet();//setSearchType(SearchType.Scan) 告诉ES不需要排序只要结果返回即可 setScroll(new TimeValue(600000)) 设置滚动的时间
        String scrollid = response.getScrollId();
        try {
        //把导出的结果以JSON的格式写到文件里
            BufferedWriter out = new BufferedWriter(new FileWriter("es", true));
            
            //每次返回数据10000条。一直循环查询直到所有的数据都查询出来
            while (true) {
                SearchResponse response2 = client.prepareSearchScroll(scrollid).setScroll(new TimeValue(1000000))
                        .execute().actionGet();
                SearchHits searchHit = response2.getHits();
                //再次查询不到数据时跳出循环
                if (searchHit.getHits().length == 0) {
                    break;
                }
                System.out.println("查询数量 :" + searchHit.getHits().length);
                for (int i = 0; i < searchHit.getHits().length; i++) {
                    String json = searchHit.getHits()[i].getSourceAsString();
                    out.write(json);
                    out.write("\r\n");
                }
            }
            System.out.println("查询结束");
            out.close();
        } catch (FileNotFoundException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

2:Java API 批量导入

Settings settings = ImmutableSettings.settingsBuilder().put("cluster.name", "elasticsearch-bigdata").build();
        Client client = new TransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress("10.58.71.6", 9300));
        try {
        //读取刚才导出的ES数据
            BufferedReader br = new BufferedReader(new FileReader("es"));
            String json = null;
            int count = 0;
            //开启批量插入
            BulkRequestBuilder bulkRequest = client.prepareBulk();
            while ((json = br.readLine()) != null) {
                bulkRequest.add(client.prepareIndex("bigdata", "student").setSource(json));
                //每一千条提交一次
                if (count% 1000==0) {
                    bulkRequest.execute().actionGet();
                    System.out.println("提交了:" + count);
                }
                count++;
            }
            bulkRequest.execute().actionGet();
            System.out.println("插入完毕");
            br.close();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

 以上就是ES的批量导出和导入了。



共有 人打赏支持
粉丝 30
博文 20
码字总数 12011
评论 (24)
刘伟
撸主, 你这个代码在while循环中, 每1000次批量增加索引后, 必须要重新初始化啊, 不然数据会重复的.
应该增加类似
bulkRequest = client.prepareBulk();
这种初始化代码.
刹那光明

引用来自“刘伟”的评论

撸主, 你这个代码在while循环中, 每1000次批量增加索引后, 必须要重新初始化啊, 不然数据会重复的.
应该增加类似
bulkRequest = client.prepareBulk();
这种初始化代码.
谢谢,我还在找怎么解决呢,帮了大忙了:+1:
小湘西
批量导出时scrollid 不需要实时更新吗?
寻梦2012

引用来自“小湘西”的评论

批量导出时scrollid 不需要实时更新吗?
不需要
小湘西

引用来自“小湘西”的评论

批量导出时scrollid 不需要实时更新吗?

引用来自“寻梦2012”的评论

不需要
但原文是这样说的:初始搜索请求和每个后续滚动请求返回一个新的 滚动ID——只有最近的滚动ID才能被使用(The initial search request and each subsequent scroll request returns a new _scroll_id — only the most recent _scroll_id should be used.)。 不是很明白,有点困惑啊,可否告知一下具体什么意识?
DimonHo
我导出的时候总是会中途被中断,报错org.elasticsearch.client.transport.NoNodeAvailableException: No node available. 不知道问题出在哪了。是不是需要先把index关闭才能执行导出操作?
寻梦2012

引用来自“DimonHo”的评论

我导出的时候总是会中途被中断,报错org.elasticsearch.client.transport.NoNodeAvailableException: No node available. 不知道问题出在哪了。是不是需要先把index关闭才能执行导出操作?
你这个可能是由于你多线程导出的,每个线程你新建了一个连接池。你把到处的代码贴出来看看
DimonHo

引用来自“DimonHo”的评论

我导出的时候总是会中途被中断,报错org.elasticsearch.client.transport.NoNodeAvailableException: No node available. 不知道问题出在哪了。是不是需要先把index关闭才能执行导出操作?

引用来自“寻梦2012”的评论

你这个可能是由于你多线程导出的,每个线程你新建了一个连接池。你把到处的代码贴出来看看
就是直接复制你的代码,第一次是中途中断了,后面再运行几次都是到SearchResponse response2 = client.prepareSearchScroll(scrollid).setScroll(new TimeValue(1000000))
            .execute().actionGet();这句代码就报no node avaiable了,后面我把size10000改成5000就可以了。是size太大,内存不足导致的?
寻梦2012

引用来自“DimonHo”的评论

我导出的时候总是会中途被中断,报错org.elasticsearch.client.transport.NoNodeAvailableException: No node available. 不知道问题出在哪了。是不是需要先把index关闭才能执行导出操作?

引用来自“寻梦2012”的评论

你这个可能是由于你多线程导出的,每个线程你新建了一个连接池。你把到处的代码贴出来看看

引用来自“DimonHo”的评论

就是直接复制你的代码,第一次是中途中断了,后面再运行几次都是到SearchResponse response2 = client.prepareSearchScroll(scrollid).setScroll(new TimeValue(1000000))
            .execute().actionGet();这句代码就报no node avaiable了,后面我把size10000改成5000就可以了。是size太大,内存不足导致的?
有可能。你一次取一百万。太多了。你改小点。
Brooo
我插入的时候数据多了好多倍,都是重复数据,这个怎么办啊
寻梦2012

引用来自“Brooo”的评论

我插入的时候数据多了好多倍,都是重复数据,这个怎么办啊
是你插入数据之前就有重复数据了还是直接插入的时候查重了数据
Brooo

引用来自“刘伟”的评论

撸主, 你这个代码在while循环中, 每1000次批量增加索引后, 必须要重新初始化啊, 不然数据会重复的.
应该增加类似
bulkRequest = client.prepareBulk();
这种初始化代码.
具体是加在哪,我这都是重复数据
Brooo

引用来自“Brooo”的评论

我插入的时候数据多了好多倍,都是重复数据,这个怎么办啊

引用来自“寻梦2012”的评论

是你插入数据之前就有重复数据了还是直接插入的时候查重了数据
插入的时候有的重复数据,重复的倍数等于插入的次数,改了好多次都没用
Brooo

引用来自“Brooo”的评论

我插入的时候数据多了好多倍,都是重复数据,这个怎么办啊

引用来自“寻梦2012”的评论

是你插入数据之前就有重复数据了还是直接插入的时候查重了数据
ExportES exportES=new ExportES();
exportES.getESinformatation();
exportES.insertESInformation();
可以这样直接调用吗,这两个方法一个是导出一个是导入的
寻梦2012

引用来自“Brooo”的评论

我插入的时候数据多了好多倍,都是重复数据,这个怎么办啊

引用来自“寻梦2012”的评论

是你插入数据之前就有重复数据了还是直接插入的时候查重了数据

引用来自“Brooo”的评论

ExportES exportES=new ExportES();
exportES.getESinformatation();
exportES.insertESInformation();
可以这样直接调用吗,这两个方法一个是导出一个是导入的
你发个博客把代码贴出来。把地址发给我。然后我看看你写的代码
Brooo

引用来自“Brooo”的评论

我插入的时候数据多了好多倍,都是重复数据,这个怎么办啊

引用来自“寻梦2012”的评论

是你插入数据之前就有重复数据了还是直接插入的时候查重了数据

引用来自“Brooo”的评论

ExportES exportES=new ExportES();
exportES.getESinformatation();
exportES.insertESInformation();
可以这样直接调用吗,这两个方法一个是导出一个是导入的

引用来自“寻梦2012”的评论

你发个博客把代码贴出来。把地址发给我。然后我看看你写的代码
https://my.oschina.net/u/3537613/blog/948324 ,你好,这里面是我的代码,实在感谢
Brooo
问下,这个怎么用多线程去跑啊
寻梦2012

引用来自“Brooo”的评论

问下,这个怎么用多线程去跑啊
我看看。我今天有事没在公司刚看到
寻梦2012

引用来自“Brooo”的评论

问下,这个怎么用多线程去跑啊
晚上我看看给你恢复你明天看吧
Brooo

引用来自“Brooo”的评论

问下,这个怎么用多线程去跑啊

引用来自“寻梦2012”的评论

晚上我看看给你恢复你明天看吧
你好,请问还在吗,我导出大概1亿条数据时总会报远程服务器关闭,怎么才能用多线程去写而不会数据重复交叉呢,新手不太懂
×
寻梦2012
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: