1. The Main Code

The driver builds a Scan over a single row-key prefix, wires table "bvuser" in as the MapReduce input and table "user" as the output, then reads a custom counter back once the job finishes.
// HBaseConfiguration.create() loads hbase-site.xml (ZooKeeper quorum and friends);
// a bare new Configuration() would miss those settings.
Job job = Job.getInstance(HBaseConfiguration.create(), "move");
job.setJarByClass(Move.class);

// Start row is inclusive, stop row exclusive, so appending "1" to the key
// limits the scan to the row "195861-1000083607" itself (plus any keys that
// happen to sort between the two bounds).
Scan scan = new Scan(Bytes.toBytes("195861-1000083607"), Bytes.toBytes("195861-1000083607" + "1"));

// Read from the source table "bvuser", emitting (row key, Result) pairs...
TableMapReduceUtil.initTableMapperJob(
        "bvuser",
        scan,
        MyMapper.class,
        ImmutableBytesWritable.class,
        Result.class,
        job);
// ...and let the reducer write its Puts into the target table "user".
TableMapReduceUtil.initTableReducerJob(
        "user",
        MyReducer.class,
        job);
job.setNumReduceTasks(1);

boolean b = job.waitForCompletion(true);
if (b) {
    // Read our custom counter back from the completed job.
    org.apache.hadoop.mapreduce.Counters counters = job.getCounters();
    CounterGroup count = counters.getGroup("COUNT");
    Counter total = count.findCounter("TOTAL");
    LOG.info("=================" + total.getValue());
}
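One tuning sketch worth considering before the job is submitted (the values here are illustrative, not from the original code): raising scanner caching and disabling block caching is the usual way to keep a bulk MapReduce scan from straining the region servers.

scan.setCaching(500);        // rows fetched per RPC round trip instead of the client default
scan.setCacheBlocks(false);  // a one-pass scan shouldn't evict hot data from the block cache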
2. The Mapper Code

The mapper is a pure pass-through: it forwards each scanned row key and its full Result to the reducer unchanged.
public static class MyMapper extends TableMapper<ImmutableBytesWritable, Result> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        // Identity map: emit the row key and the whole Result as-is.
        context.write(key, value);
    }
}
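If you would rather drop useless rows before they cross the shuffle, the filtering can live on the map side instead. A hypothetical variant (FilteringMapper is not part of the original job) that only forwards rows carrying info:screenName:

public static class FilteringMapper extends TableMapper<ImmutableBytesWritable, Result> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        // Forward only the rows that actually contain the column the reducer copies.
        if (value.containsColumn(Bytes.toBytes("info"), Bytes.toBytes("screenName"))) {
            context.write(key, value);
        }
    }
}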
3. The Reducer Code

For every incoming row the reducer increments the COUNT:TOTAL counter, pulls the info:screenName cell out of the Result, and writes it into the target table as a Put under the same row key.
public static class MyReducer extends TableReducer<ImmutableBytesWritable, Result, NullWritable> {

    // Custom counter; the driver reads it back as group "COUNT", name "TOTAL"
    private Counter counter = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Initialize the counter once per reduce task
        counter = context.getCounter("COUNT", "TOTAL");
        counter.setValue(0L);
    }

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Result> values, Context context) throws IOException, InterruptedException {
        for (Result result : values) {
            counter.increment(1);
            // getValue returns null when the row has no info:screenName cell
            byte[] value = result.getValue(Bytes.toBytes("info"), Bytes.toBytes("screenName"));
            Put put = new Put(key.get());
            // Put.add was deprecated in favor of addColumn in HBase 1.0+; use whichever matches your client
            put.add(Bytes.toBytes("info"), Bytes.toBytes("screenName"), System.currentTimeMillis(), value);
            context.write(NullWritable.get(), put);
        }
    }
}
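To spot-check the migration once the job reports success, you can read a row straight back from "user" with the plain client API. A minimal, self-contained sketch assuming the HBase 1.x client (the class name VerifyMove and the row key are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class VerifyMove {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();  // reads hbase-site.xml from the classpath
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("user"))) {
            Get get = new Get(Bytes.toBytes("195861-1000083607"));  // illustrative row key (the scan prefix above)
            Result r = table.get(get);
            System.out.println("screenName = " + Bytes.toString(
                    r.getValue(Bytes.toBytes("info"), Bytes.toBytes("screenName"))));
        }
    }
}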