HBase BulkLoad Put

原创
2018/04/20 17:16
阅读数 1.4K

先不多说,先贴代码(代码中包含业务内容,选择性忽略),当前使用的Hadoop版本:2.7.3,HBase版本:1.2.6。

代码

// HFileCreate.java
package com.huangshihe.ecommerce.hbasesimulation;

import com.huangshihe.ecommerce.ecommercehbase.hbaseservice.constants.CommonConstant;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * HFile生成.
 * <p>
 * Create Date: 2018-04-12 21:00
 *
 * @author huangshihe
 */
public class HFileCreate {

    private static final Logger LOGGER = LoggerFactory.getLogger(HFileCreate.class);

    private static Simulation _simulation;

    private static List<String> _qualifiers;

    public static void buildSimulation(Simulation simulation) {
        _simulation = simulation;
        _qualifiers = simulation.getQualifierIndexs();
        LOGGER.debug("qualifiers:{}", _qualifiers);
    }

    private HFileCreate() {

    }


    public static class HFileImportMapper2 extends
            Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

        private static final String regEx = "\"?(\\[([0-9 ,-]*)\\])\"?";
        private final Pattern pattern = Pattern.compile(regEx);


        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            LOGGER.debug("line:{}", line);
            // "[0, 0, 0, 0, 0, 0][0, 0, 0, 0, 0, 0][0, 0, 31, 62, 70, 70]
            // [0, 0, 1, 98, -70, -107, -60, 0][0, 0, 0, 1, 70, 70]",
            //"[0, 0, 0, 3, 0, 0][0, 0, 0, 2][0, 0, 0, 4][-1, -1, -1, -48][0, 0, 0, 64][1][0]"
            String[] lines = line.split("\",\"");
            String rowkeyStr = lines[0];
            LOGGER.debug("rowkeyStr:{}", rowkeyStr);
            String qualifierStr = lines[1];
            LOGGER.debug("qualifierStr:{}", qualifierStr);
            Matcher matcher = pattern.matcher(rowkeyStr);
            byte[] rowkeyByte = new byte[0];
            while (matcher.find()) {
                String str = matcher.group(2);
                String[] strs = str.split(",");
                byte[] b = new byte[strs.length];
                for (int i = 0; i < strs.length; i++) {
                    b[i] = Byte.parseByte(strs[i].trim());
                }
                rowkeyByte = Bytes.add(rowkeyByte, b);
            }
            LOGGER.debug("rowkeyByte:{}", Arrays.toString(rowkeyByte));

            final ImmutableBytesWritable rowkey = new ImmutableBytesWritable(rowkeyByte);

            // 拆解qualifier
            matcher = pattern.matcher(qualifierStr);

            Put put = new Put(rowkeyByte);

            // 当前的qualifier坐标
            int qualifierIndex = 0;
            List<KeyValue> kvList = new ArrayList<>(_qualifiers.size());
            while (matcher.find()) {
                // 获取其中一个qualifier
                String str = matcher.group(2);  // 由逗号分隔的字节数组
                String[] strs = str.split(",");
                byte[] b = new byte[strs.length];
                for (int i = 0; i < strs.length; i++) {
                    // 将str中的字节还原为真实的字节,并放入数组中
                    b[i] = Byte.parseByte(strs[i].trim());
                }
                // 获取qualifier!
                String qualifier;
                if (_qualifiers.size() > qualifierIndex) {
                    qualifier = _qualifiers.get(qualifierIndex++);
                } else {
                    LOGGER.error("qualifiers不足!qualifiers:{}, qualifierIndex", _qualifiers, qualifierIndex);
                    throw new IllegalArgumentException("qualifiers不足!");
                }
                put.addColumn(Bytes.toBytes(CommonConstant.FAMILY_NAME), Bytes.toBytes(qualifier), b);
            }
            context.write(rowkey, put);

        }
    }

}

// Controller.java
/*
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
*/
Thread thread = new Thread(() -> {
            LOGGER.debug("begin create HFiles...");
            // 连接HBase
            Configuration conf = HBaseConnectionManager.getInstance().getNewConfiguration();
            /////////////////////////////////
            IHBaseDao hbaseDao = new HBaseDaoImpl();
            /////////////////////////////////

            // 获取所有需要保存到HFile为文件
            for (File datFile : FileKit.getAllFiles(Constants.SIMULATION_HBASE_DIR, "(.*)\\.dat")) {
                // tableName,表名:表名前缀+文件名(去后缀)
                String tableName = OriginalConstant.TABLE_NAME_PRE + FileKit.getFileNameStr(datFile);
                String datFilePath = datFile.getAbsolutePath();
                // 运行前,删除已存在的中间输出目录
                try {
                    LOGGER.debug("simulation file dir:{}", Constants.SIMULATION_HFILE_DIR);
                    FileSystem fs = FileSystem.get(URI.create(Constants.SIMULATION_HFILE_DIR), conf);

                    fs.delete(new Path(Constants.SIMULATION_HFILE_DIR), true);
                    fs.close();
                } catch (IOException e) {
                    LOGGER.error("io exception... detail:{}", e);
                    throw new IllegalArgumentException(e);
                }

                // 如果表不存在,创建表
                if (!hbaseDao.isExists(tableName)) {
                    hbaseDao.createTable(tableName, CommonConstant.FAMILY_NAME, OriginalConstant.TTL);
                }
                // 获取表
                Table table = hbaseDao.getTable(tableName);


                // 生成Job,注意以下的包别导错!
                try {

                    ///////////////////////////
                    // 通过调试,发现默认添加的序列器只有三个,这里手动添加其他的
                    // 其中Put.class用的是org.apache.hadoop.hbase.mapreduce.MutationSerialization,否则将报空指针
                    conf.set("io.serializations","org.apache.hadoop.io.serializer.JavaSerialization,"
                            + "org.apache.hadoop.io.serializer.WritableSerialization,"
                            + "org.apache.hadoop.hbase.mapreduce.KeyValueSerialization,"
                            + "org.apache.hadoop.hbase.mapreduce.MutationSerialization,"
                            + "org.apache.hadoop.hbase.mapreduce.ResultSerialization"
                    );
                    ///////////////////////////

                    Job job = Job.getInstance(conf, "create HFile");
                    job.setJarByClass(Controller.class);

                    // in/out format
                    job.setInputFormatClass(TextInputFormat.class);
                    job.setOutputFormatClass(HFileOutputFormat2.class);

                    // 设置mapper类
                    job.setMapperClass(HFileCreate.HFileImportMapper2.class);
                    // HFileOutputFormat2.configureIncrementalLoadMap 没有指定Put默认为PutSortReducer,所以这里手动指定
                    job.setReducerClass(PutSortReducer.class);


                    // 设置输出的key和value类
                    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
                    job.setMapOutputValueClass(Put.class);
//                    job.setMapOutputValueClass(KeyValue.class);

                    // speculation
                    job.setSpeculativeExecution(false);
                    job.setReduceSpeculativeExecution(false);

                    // 设置输入/输出路径
                    FileInputFormat.setInputPaths(job, datFilePath);
                    FileOutputFormat.setOutputPath(job, new Path(Constants.SIMULATION_HFILE_DIR));

//                    HFileOutputFormat2.configureIncrementalLoad(job, table, hbaseDao.getRegionLocator(table));
                    HFileOutputFormat2.configureIncrementalLoadMap(job, table);
                    // 设置等待
                    job.waitForCompletion(true);
                } catch (IOException e) {
                    LOGGER.error("io exception... detail:{}", e);
                    throw new IllegalArgumentException(e);
                } catch (InterruptedException e) {
                    LOGGER.error("interrupted exception... detail:{}", e);
                    throw new IllegalArgumentException(e);
                } catch (ClassNotFoundException e) {
                    LOGGER.error("class not found! detail:{}", e);
                    throw new IllegalArgumentException(e);
                } catch (Exception e) {
                    LOGGER.error("unknown error, detail:{}", e);
                    throw new IllegalArgumentException(e);
                } finally {
                    try {
                        if (table != null) {
                            table.close();
                        }
                    } catch (IOException e) {
                        LOGGER.error("close table error!");
                    }
                }

            }

注意点

有几个需要注意的点:

  1. 这里使用的是HFileOutputFormat2.configureIncrementalLoadMap(job, table);而不是configureIncrementalLoad方法。在configureIncrementalLoad方法中指定了Put和KeyValue的reducer类,而configureIncrementalLoadMap方法没有,所以在生成job时需要手动指定。另外在使用configureIncrementalLoad方法过程中遇到了环境变量等问题,这里暂时没有解决,直接使用configureIncrementalLoadMap方法了。

  2. 在使用configureIncrementalLoadMap方法,序列化类会缺少,所以这里也手动添加: conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization," + "org.apache.hadoop.io.serializer.WritableSerialization," + "org.apache.hadoop.hbase.mapreduce.KeyValueSerialization," + "org.apache.hadoop.hbase.mapreduce.MutationSerialization," + "org.apache.hadoop.hbase.mapreduce.ResultSerialization" );

  3. 代码完整地址:(分支可能会变化) https://github.com/htzy/e-commerce/tree/b_0_0_1/tools/hbase-simulation

乱七八糟(以下为追加模式)

1. 问题:org.apache.hadoop.hbase.client.Put cannot be cast to org.apache.hadoop.hbase.Cell

解决过程:

以下内容中的: https://www.2cto.com/net/201710/692437.html 第4点报错类似 输入图片说明

换成KeyValue之后,报错: http://mail-archives.apache.org/mod_mbox/hbase-user/201405.mbox/%3CCAAMYKhq1gP8O2FJREEUZZpTxeGPO9w7Eu94RmKYMkxopOFekbw@mail.gmail.com%3E  尝试解决: https://stackoverflow.com/questions/37047145/why-hbase-keyvaluesortreducer-need-to-sort-all-keyvalue https://www.2cto.com/net/201710/692437.html

报错: Added a key not lexically larger than previous \x00\x00\x06\x1F\x00\x00\x00\x00\x06C\x00\x00\x00\x00\x0A\x1CFF\x00\x00\x01b\xD9{\xEC\x00\x00\x00\x00\x00FF/t:6/1524154091347/Put/vlen=1/seqid=0,

lastCell = \x00\x00\x06\x1F\x00\x00\x00\x00\x06C\x00\x00\x00\x00\x0A\x1CFF\x00\x00\x01b\xD9{\xEC\x00\x00\x00\x00\x00FF/t:7/1524154091347/Put/vlen=1/seqid=0


Current cell = \x00\x00\x06\x1F\x00\x00\x00\x00\x06>\x00\x00\x00\x00"\xF0FF\x00\x00\x01b\xD9{\xEC\x00\x00\x00\x00\x00FF/t:1/1524154499615/Put/vlen=6/seqid=0,

lastCell = \x00\x00\x06\x1F\x00\x00\x00\x00\x06>\x00\x00\x00\x00"\xF0FF\x00\x00\x01b\xD9{\xEC\x00\x00\x00\x00\x00FF/t:5/1524154499615/Put/vlen=4/seqid=0

手动排序后,只写入一条,则可以成功!如果rowkey一样,则无法写入!

kvList.sort((o1, o2) -> Bytes.compareTo(o2.getRowArray(), o1.getRowArray()));

如果同一个rowkey,只写入一个keyValue,那么不管多少条rowkey都可以成功!fuck 

手动指定reducer类为:job.setReducerClass(PutSortReducer.class);即可以使用Put了!

2. 问题获取Put序列化器失败

解决过程: 参考:https://blog.csdn.net/opensure/article/details/47054861 (Bulk Load-HBase数据导入最佳实践) 在使用BulkLoad生成HFile文件过程中报错: 具体的错误类似于: http://mail-archives.apache.org/mod_mbox/hbase-user/201504.mbox/raw/%3CCAPQV63WOjomoddv0X+VKbnMzguf6ViHuNnJPSyNE_7hsZjOzvw@mail.gmail.com%3E

输入图片说明

获取Put.class的序列化器失败!

调试:

输入图片说明

发现SerializationFactory中的获取序列化器,在接受Put.class一直失败

输入图片说明

而目前发现实现了该序列化接口的只有: 输入图片说明

输入图片说明

所以,索性:tmd在生成job时直接添加:

输入图片说明

 

展开阅读全文
打赏
1
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
1
分享
返回顶部
顶部