使用BufferedMutatorParams高效往hbase中插入

原创
2017/06/17 18:02
阅读数 5.5K

package test.TestHbase;

import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.util.Bytes;

public class TestHbaseMutator {

    public static void main(String[] args) throws IOException {

        Configuration config = HBaseConfiguration.create();
        // 一定要添加下面两个配置,不然找不到自己的Hbase位置
        config.set("hbase.rootdir", "hdfs://crxy99:9000/hbase");
        // 使用eclipse时必须添加这个,否则无法定位
        config.set("hbase.zookeeper.quorum", "crxy99,crxy100,crxy101");
        //获取链接
        final Connection connection = ConnectionFactory
                .createConnection(config);
        // 创建一个线程池,里面有10线程
        final ExecutorService threadPool = Executors.newFixedThreadPool(10);
        ScheduledExecutorService scheduledThreadPool = Executors
                .newScheduledThreadPool(1);
        //开启线程池监控
        scheduledThreadPool.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                System.out.println(threadPool);

            }
        }, 3, 5, TimeUnit.SECONDS);
        //向hbase中插入一千万条数据
        for (int i = 0; i < 1000; i++) {
            final int basix = i;
            threadPool.submit(new Runnable() {

                @Override
                public void run() {

                    try {
                        final List<Mutation> mutator = new ArrayList<>();
                        for (int j = 0; j < 1000; j++) {
                            Put puts = new Put(Bytes.toBytes(basix));
                            puts.add(Bytes.toBytes("f6"),
                                    Bytes.toBytes(basix + 2),
                                    Bytes.toBytes(basix + j));
                            mutator.add(puts);
                        }
                        mutatorToHbase(connection, "t6", mutator);
                    } catch (IOException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }

                }
            });
        }

    }

    //向hbase插入数据
    public static void mutatorToHbase(Connection connect, String tableName,
            List<Mutation> list) throws IOException {

        BufferedMutatorParams params = new BufferedMutatorParams(
                TableName.valueOf(tableName));
        //开启hbase插入异常监控
        BufferedMutator.ExceptionListener listeners = new BufferedMutator.ExceptionListener() {

            @Override
            public void onException(RetriesExhaustedWithDetailsException e,
                    BufferedMutator mutator)
                    throws RetriesExhaustedWithDetailsException {
                for (int i = 0; i < e.getNumExceptions(); i++) {
                    System.out.println("插入失败 " + e.getRow(i) + ".");
                }
            }
        };
        //开启异常监听
        params.listener(listeners);
        BufferedMutator bufferedMutator = connect.getBufferedMutator(params);
        //批量提交数据插入。
        bufferedMutator.mutate(list);
        bufferedMutator.close();

    }

}
 

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部