MapReduce 之 HBase 两张表之间数据搬迁

原创
2015/11/21 15:44
阅读数 31

一、Main代码大放送

Job job = Job.getInstance(new Configuration(), "move");
job.setJarByClass(Move.class);
Scan scan = new Scan(Bytes.toBytes("195861-1000083607"), Bytes.toBytes("195861-1000083607" + "1"));

TableMapReduceUtil.initTableMapperJob(
                "bvuser",
                scan,
                MyMapper.class,
                ImmutableBytesWritable.class,
                Result.class,
                job);

TableMapReduceUtil.initTableReducerJob(
                "user",
                MyReducer.class,
                job);

job.setNumReduceTasks(1);

boolean b = job.waitForCompletion(true);

if (b) {
org.apache.hadoop.mapreduce.Counters counters = job.getCounters();
CounterGroup count = counters.getGroup("COUNT");
Counter total = count.findCounter("TOTAL");
LOG.info("=================" + total.getValue());
}

二、Map代码大放送

public static class MyMapper extends TableMapper<ImmutableBytesWritable, Result> {

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

三、Reduce代码大放送

public static class MyReducer extends TableReducer<ImmutableBytesWritable, Result, NullWritable> {
        //计数器
        Counter counter = null;
        public MyReducer() {
        }
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            //初始化计数器
            counter = context.getCounter("COUNT", "TOTAL");
            counter.setValue(0L);
        }
        @Override
        protected void reduce(ImmutableBytesWritable key, Iterable<Result> values, Context context) throws IOException, InterruptedException {
            Put put = null;
            for (Result result : values) {
                counter.increment(1);
                byte[] value = result.getValue(Bytes.toBytes("info"), Bytes.toBytes("screenName"));
                put = new Put(key.get());
                put.add(Bytes.toBytes("info"), Bytes.toBytes("screenName"), System.currentTimeMillis(), value);
                context.write(NullWritable.get(), put);
            }
        }
    }





展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部