Updating an HBase table with data from HDFS

2016/05/25 11:11

1. The Mapper class

public static class MyMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

    // Puts are buffered here and flushed to HBase in batches
    List<Put> putList = new LinkedList<Put>();

    HTableInterface bvuser;

    public MyMapper() {
    }

    // called once per map task, before any map() calls
    @Override
    protected void setup(final Context context) throws IOException, InterruptedException {

        bvuser = new HTable(context.getConfiguration(), "bvuser");

        // optionally load side data from HDFS (e.g. a lookup file)
        BufferedReader br = new BufferedReader(new InputStreamReader(
                FileSystem.get(context.getConfiguration()).open(new Path("hdfs file path"))));

        String line = null;
        while ((line = br.readLine()) != null) {
            // parse each side-data line here as needed
        }
        IOUtils.closeQuietly(br);
    }

    // called once for every line of the HDFS input
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        Counter counter = context.getCounter("group", "mapcount");
        counter.increment(1);

        String line = value.toString();
        if (line == null || line.isEmpty()) {
            return;
        }

        // assumed line format: userId<TAB>typeId,typeId,... -- adjust the parsing to the real data
        String[] fields = line.split("\t");
        if (fields.length < 2) {
            return;
        }
        String userId = fields[0];
        Set<String> typeidSet = new HashSet<String>(Arrays.asList(fields[1].split(",")));

        // row key: fixed prefix + userId; column info:billTypes holds the comma-joined type ids
        Put put = new Put(Bytes.toBytes("3136949-" + userId));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("billTypes"), System.currentTimeMillis(),
                Bytes.toBytes(StringUtils.join(typeidSet, ",")));

        putList.add(put);

        // flush to HBase every 1000 Puts
        if (putList.size() >= 1000) {
            bvuser.put(putList);
            putList.clear();
        }
    }

    // called once per map task, after the last map() call
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // flush any remaining Puts
        if (!putList.isEmpty()) {
            bvuser.put(putList);
            putList.clear();
        }
        bvuser.close();
    }
}
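
The mapper assumes that the bvuser table with the info column family already exists. A minimal sketch of creating it from Java with the same old-style client API used above (HBaseAdmin, HTableDescriptor, HColumnDescriptor); the table and family names come from the code above, everything else is an assumption:

Configuration conf = HBaseConfiguration.create();
HBaseAdmin admin = new HBaseAdmin(conf);
if (!admin.tableExists("bvuser")) {
    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("bvuser"));
    // the column family the mapper writes to
    desc.addFamily(new HColumnDescriptor("info"));
    admin.createTable(desc);
}
admin.close();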

 

2. The main method

Configuration conf = HBaseConfiguration.create(); // picks up hbase-site.xml from the classpath
final Job job = Job.getInstance(conf, "data-save-job");

// input format: plain text, each line is handed to map() as a string
job.setInputFormatClass(TextInputFormat.class);
job.setJarByClass(TextFile2HbaseJob.class);

// set the Mapper class
job.setMapperClass(MyMapper.class);

// map output types (nothing is actually emitted; all writes go straight to HBase)
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Text.class);

FileOutputFormat.setOutputPath(job, new Path(output));
FileSystem fs = FileSystem.get(conf);
FileStatus[] files = fs.listStatus(new Path(input), new PathFilter() {
    @Override
    public boolean accept(Path path) {
        return path.getName().startsWith("part-m");
    }
});

if (files.length == 0) {
    throw new IllegalStateException("no part-m-* input files found");
}
// add every matching file under the input directory (names like /export/part-m-0000000)
for (FileStatus file : files) {
    FileInputFormat.addInputPath(job, file.getPath());
}
// map-only job: no reducers
job.setNumReduceTasks(0);

// if the HBase client jars are not on the cluster classpath,
// TableMapReduceUtil.addDependencyJars(job) can ship them with the job

// submit the job and wait for it to finish
job.waitForCompletion(true);
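
Once the job finishes, the import can be spot-checked by reading a row back. This is just a sketch: the row key prefix "3136949-", family "info" and qualifier "billTypes" are the ones written by the mapper, while the concrete user id is a placeholder:

HTable table = new HTable(conf, "bvuser");
// replace "someUserId" with a user id that appears in the input data
Get get = new Get(Bytes.toBytes("3136949-" + "someUserId"));
Result result = table.get(get);
byte[] v = result.getValue(Bytes.toBytes("info"), Bytes.toBytes("billTypes"));
System.out.println(v == null ? "row not found" : Bytes.toString(v));
table.close();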

 
