文档章节

使用BufferedMutatorParams高效往hbase中插入

爱运动的小乌龟
 爱运动的小乌龟
发布于 2017/06/17 18:02
字数 320
阅读 559
收藏 0

package test.TestHbase;

import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.util.Bytes;

public class TestHbaseMutator {

    public static void main(String[] args) throws IOException {

        Configuration config = HBaseConfiguration.create();
        // 一定要添加下面两个配置,不然找不到自己的Hbase位置
        config.set("hbase.rootdir", "hdfs://crxy99:9000/hbase");
        // 使用eclipse时必须添加这个,否则无法定位
        config.set("hbase.zookeeper.quorum", "crxy99,crxy100,crxy101");
        //获取链接
        final Connection connection = ConnectionFactory
                .createConnection(config);
        // 创建一个线程池,里面有10线程
        final ExecutorService threadPool = Executors.newFixedThreadPool(10);
        ScheduledExecutorService scheduledThreadPool = Executors
                .newScheduledThreadPool(1);
        //开启线程池监控
        scheduledThreadPool.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                System.out.println(threadPool);

            }
        }, 3, 5, TimeUnit.SECONDS);
        //向hbase中插入一千万条数据
        for (int i = 0; i < 1000; i++) {
            final int basix = i;
            threadPool.submit(new Runnable() {

                @Override
                public void run() {

                    try {
                        final List<Mutation> mutator = new ArrayList<>();
                        for (int j = 0; j < 1000; j++) {
                            Put puts = new Put(Bytes.toBytes(basix));
                            puts.add(Bytes.toBytes("f6"),
                                    Bytes.toBytes(basix + 2),
                                    Bytes.toBytes(basix + j));
                            mutator.add(puts);
                        }
                        mutatorToHbase(connection, "t6", mutator);
                    } catch (IOException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }

                }
            });
        }

    }

    //向hbase插入数据
    public static void mutatorToHbase(Connection connect, String tableName,
            List<Mutation> list) throws IOException {

        BufferedMutatorParams params = new BufferedMutatorParams(
                TableName.valueOf(tableName));
        //开启hbase插入异常监控
        BufferedMutator.ExceptionListener listeners = new BufferedMutator.ExceptionListener() {

            @Override
            public void onException(RetriesExhaustedWithDetailsException e,
                    BufferedMutator mutator)
                    throws RetriesExhaustedWithDetailsException {
                for (int i = 0; i < e.getNumExceptions(); i++) {
                    System.out.println("插入失败 " + e.getRow(i) + ".");
                }
            }
        };
        //开启异常监听
        params.listener(listeners);
        BufferedMutator bufferedMutator = connect.getBufferedMutator(params);
        //批量提交数据插入。
        bufferedMutator.mutate(list);
        bufferedMutator.close();

    }

}
 

© 著作权归作者所有

爱运动的小乌龟
粉丝 3
博文 149
码字总数 43760
作品 0
朝阳
私信 提问
利用BulkLoad导入Hbase表

1、插入HBase表传统方法具有什么问题? 我们先看下 HBase 的写流程: 通常 MapReduce 在写HBase时使用的是TableOutputFormat方式,在map/reduce中直接生成put对象写入HBase,该方式在大量数据...

混绅士
2018/06/28
0
0
实现HBase与Hive之间数据互通互导

今天主要给大家讲解一下,如何使用Hive来查询及操作HBase里面的数据,也就是实现二者的数据互通互导。 首先不了解HBase的同学可以看一下我之前写过的一片博文: 一、那么看完之后我们先来简单...

马修
2018/08/27
0
0
云HBase发布全文索引服务,轻松应对复杂查询

云HBase发布了“全文索引服务”功能,自2019年01月25日后创建的云HBase实例,可以在控制台免费开启此“全文索引服务”功能。使用此功能可以让用户在HBase之上构建功能更丰富的搜索业务,不再...

阿里云云栖社区
02/19
16
0
区分 hdfs hbase hive hbase适用场景

Hive 不想用程序语言开发MapReduce的朋友比如DB们,熟悉SQL的朋友可以使用Hive开离线的进行数据处理与分析工作。 注意Hive现在适合在离线下进行数据的操作,就是说不适合在挂在真实的生产环境...

八戒_o
2015/11/19
1K
0
hive 与 hbase 结合

一、hive与hbase的结合 Hive会经常和Hbase结合使用,把Hbase作为Hive的存储路径,所以Hive整合Hbase尤其重要。使用Hive读取Hbase中的数据,可以使用HQL语句在HBase表上进行查询、插入操作;甚...

meteor_hy
2018/06/26
0
0

没有更多内容

加载失败,请刷新页面

加载更多

rime设置为默认简体

转载 https://github.com/ModerRAS/ModerRAS.github.io/blob/master/_posts/2018-11-07-rime%E8%AE%BE%E7%BD%AE%E4%B8%BA%E9%BB%98%E8%AE%A4%E7%AE%80%E4%BD%93.md 写在开始 我的Arch Linux上......

zhenruyan
今天
5
0
简述TCP的流量控制与拥塞控制

1. TCP流量控制 流量控制就是让发送方的发送速率不要太快,要让接收方来的及接收。 原理是通过确认报文中窗口字段来控制发送方的发送速率,发送方的发送窗口大小不能超过接收方给出窗口大小。...

鏡花水月
今天
10
0
OSChina 周日乱弹 —— 别问,问就是没空

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @tom_tdhzz :#今日歌曲推荐# 分享容祖儿/彭羚的单曲《心淡》: 《心淡》- 容祖儿/彭羚 手机党少年们想听歌,请使劲儿戳(这里) @wqp0010 :周...

小小编辑
今天
1K
11
golang微服务框架go-micro 入门笔记2.1 micro工具之micro api

micro api micro 功能非常强大,本文将详细阐述micro api 命令行的功能 重要的事情说3次 本文全部代码https://idea.techidea8.com/open/idea.shtml?id=6 本文全部代码https://idea.techidea8....

非正式解决方案
今天
5
0
Spring Context 你真的懂了吗

今天介绍一下大家常见的一个单词 context 应该怎么去理解,正确的理解它有助于我们学习 spring 以及计算机系统中的其他知识。 1. context 是什么 我们经常在编程中见到 context 这个单词,当...

Java知其所以然
昨天
9
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部