一、Main方法代码大放送
public static void main(String[] args) throws IOException {
Configuration conf = HBaseConfiguration.create();
//可以设置一些参数,比如元数据的一些东西
//conf.set("path", inputPath);
Job job = Job.getInstance(conf ,"mapreduce read data from bhase test.");
job.setJarByClass(MyMapper.class);
//全表扫描Hbase全库数据,这样并非不好,但是压力会大
Scan scan = new Scan();
//加上startKey与stopkey读取某个数据类型的数据
//Scan scan = new Scan(Bytes.toBytes("195861"),Bytes.toBytes("195861" + "1"));
//也可以读取某个分区的数据
//Scan scan = new Scan(Bytes.toBytes("195861-123456"),Bytes.toBytes("195861-654321"));
//需要扫描的列簇,如果不加将会扫描Hbase的所有列簇,这需要根据需求来确定
scan.addFamily(Bytes.toBytes("info"));
//只需要info列簇下的name列
scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));
//设置输出任务的输出路径
FileOutputFormat.setOutputPath(job, new Path("/tmp/test" + new Random().nextInt(10000)));
//这个是初始化读取Hbase的方法
TableMapReduceUtil.initTableMapperJob("user"(表名), scan, MyMapper.class, NullWritable.class(设置Map任务的输出Key), Text.class(设置Map任务的输出value), job);
//设置reduce的数量 根据需求来定 有些情况是不需要的
job.setNumReduceTasks(0);
//提交任务到集群
job.waitForCompletion(true);
}
二、Map任务代码大放送
public static class MyMapper extends TableMapper<NullWritable, Text> {
public HTableInterface user = null;
public MyMapper() {
}
@Override
protected void setup(Context context) throws IOException, InterruptedException {
user = new HTable(context.getConfiguration(), "user");
/**FileSystem fs = FileSystem.get(context.getConfiguration());
BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(new Path(context.getConfiguration().get("fileInput")))));
String line = null;
while ((line = reader.readLine()) != null) {
String userUrn = "195861-" + line;
userUrnList.add(userUrn);
}
reader.close();
*/
}
@Override
protected void map(ImmutableBytesWritable key, Result result, Context context) throws IOException, InterruptedException {
//TODO做一些事情 Put Delete?
...........................................
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
user .flushCommits();
user .close();
}
}