Mapreduce读写Hbase

原创
2015/11/20 18:42
阅读数 47

一、Main方法代码大放送

    

public static void main(String[] args)  throws IOException {
        Configuration conf = HBaseConfiguration.create();
        //可以设置一些参数,比如元数据的一些东西
        //conf.set("path", inputPath);
        Job job = Job.getInstance(conf ,"mapreduce read data from bhase test.");
        job.setJarByClass(MyMapper.class);
        //全表扫描Hbase全库数据,这样并非不好,但是压力会大
        Scan scan = new Scan();
        //加上startKey与stopkey读取某个数据类型的数据
        //Scan scan = new Scan(Bytes.toBytes("195861"),Bytes.toBytes("195861" + "1"));
        //也可以读取某个分区的数据
        //Scan scan = new Scan(Bytes.toBytes("195861-123456"),Bytes.toBytes("195861-654321"));
        //需要扫描的列簇,如果不加将会扫描Hbase的所有列簇,这需要根据需求来确定
        scan.addFamily(Bytes.toBytes("info"));
        //只需要info列簇下的name列
        scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));
        //设置输出任务的输出路径
        FileOutputFormat.setOutputPath(job, new Path("/tmp/test" + new Random().nextInt(10000)));
        //这个是初始化读取Hbase的方法
        TableMapReduceUtil.initTableMapperJob("user"(表名), scan, MyMapper.class, NullWritable.class(设置Map任务的输出Key), Text.class(设置Map任务的输出value), job);
        //设置reduce的数量 根据需求来定 有些情况是不需要的
        job.setNumReduceTasks(0);
        
        //提交任务到集群
        job.waitForCompletion(true);
      
    }

二、Map任务代码大放送

public static class MyMapper extends TableMapper<NullWritable, Text> {

        public HTableInterface user = null;

        public MyMapper() {

        }

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            user = new HTable(context.getConfiguration(), "user");
            /**FileSystem fs = FileSystem.get(context.getConfiguration());
            BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(new Path(context.getConfiguration().get("fileInput")))));
            String line = null;
            while ((line = reader.readLine()) != null) {
                String userUrn = "195861-" + line;
                userUrnList.add(userUrn);
            }
            reader.close();
            */
            
        }


        @Override
        protected void map(ImmutableBytesWritable key, Result result, Context context) throws IOException, InterruptedException {
            //TODO做一些事情 Put Delete?
            ...........................................
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            user .flushCommits();
            user .close();
        }
    }




展开阅读全文
打赏
0
1 收藏
分享
加载中
更多评论
打赏
0 评论
1 收藏
0
分享
返回顶部
顶部