1, The Mapper class
public static class MyMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

    List<Put> putList = new LinkedList<Put>();
    HTableInterface bvuser;

    public MyMapper() {
    }

    // Called once per map task, before any map() calls.
    @Override
    protected void setup(final Context context) throws IOException, InterruptedException {
        bvuser = new HTable(context.getConfiguration(), "bvuser");
        BufferedReader br = new BufferedReader(new InputStreamReader(
                FileSystem.get(context.getConfiguration()).open(new Path("hdfs file path"))));
        String line = null;
        while ((line = br.readLine()) != null) {
            // Side data from HDFS is loaded here (loop body omitted in the original notes).
        }
        IOUtils.closeQuietly(br);
    }

    // Called once for every line of the HDFS input.
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        Counter counter = context.getCounter("group", "mapcount");
        counter.increment(1);
        String line = value.toString();
        if (line == null || line.isEmpty()) {
            return;
        }
        // userId and typeidSet are expected to be derived from the input line / the side data
        // loaded in setup(); that parsing logic was omitted in the original.
        Put put = new Put(Bytes.toBytes("3136949-" + userId));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("billTypes"), System.currentTimeMillis(),
                Bytes.toBytes(StringUtils.join(typeidSet, ",")));
        putList.add(put);
        // Flush to HBase in batches of 1000 Puts instead of one RPC per record.
        if (putList.size() == 1000) {
            bvuser.put(putList);
            putList.clear();
        }
    }

    // Called once per map task, after all map() calls.
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        if (!putList.isEmpty()) {
            bvuser.put(putList);
            putList.clear();
        }
        bvuser.close();
    }
}
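Note that HTable/HTableInterface belong to the old HBase client API and were removed in later releases. As a minimal sketch (not part of the original code), the same buffered-write pattern on the HBase 1.x+ client would use Connection and BufferedMutator, which handle the batching that putList does by hand; the class name MyMapperV2 and the placeholder row key/value are illustrative only:

import java.io.IOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Sketch only: same write path as MyMapper, but on the HBase 1.x+ client API.
public class MyMapperV2 extends Mapper<LongWritable, Text, NullWritable, Text> {

    private Connection connection;
    private BufferedMutator mutator;   // buffers Puts and flushes them in batches

    @Override
    protected void setup(Context context) throws IOException {
        connection = ConnectionFactory.createConnection(context.getConfiguration());
        mutator = connection.getBufferedMutator(TableName.valueOf("bvuser"));
    }

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException {
        String line = value.toString();
        if (line.isEmpty()) {
            return;
        }
        // As in the original, the real row key and cell value would come from parsing the line.
        Put put = new Put(Bytes.toBytes(line));                    // placeholder row key
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("billTypes"),
                System.currentTimeMillis(), Bytes.toBytes(line));  // placeholder value
        mutator.mutate(put);   // buffered; flushed automatically when the write buffer fills
    }

    @Override
    protected void cleanup(Context context) throws IOException {
        mutator.flush();
        mutator.close();
        connection.close();
    }
}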
2, The main method
Configuration conf = HBaseConfiguration.create();
final Job job = Job.getInstance(conf, "data-save-job");
// Input is plain text; each line reaches the mapper as a string.
job.setInputFormatClass(TextInputFormat.class);
job.setJarByClass(TextFile2HbaseJob.class);
// Set the mapper class.
job.setMapperClass(MyMapper.class);
// Declare the mapper output types (nothing is actually emitted; writes go straight to HBase).
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Text.class);
// 'output' and 'input' are the HDFS output/input paths, defined elsewhere.
FileOutputFormat.setOutputPath(job, new Path(output));
FileSystem fs = FileSystem.get(conf);
FileStatus[] files = fs.listStatus(new Path(input), new PathFilter() {
    @Override
    public boolean accept(Path path) {
        return path.getName().startsWith("part-m");
    }
});
if (files.length == 0) {
    throw new IllegalStateException("no file");
}
// Add every matching file under the input directory; names follow the /export/part-m-0000000 format.
for (FileStatus file : files) {
    FileInputFormat.addInputPath(job, file.getPath());
}
// Map-only job: no reducers.
job.setNumReduceTasks(0);
// Submit the job and wait for it to finish.
System.exit(job.waitForCompletion(true) ? 0 : 1);
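Because the mapper writes straight to HBase and never calls context.write(), the MapReduce output directory only ends up holding empty part files. One possible variation (a sketch, not the original author's code) is to drop the output path and use NullOutputFormat; the driver skeleton below assumes MyMapper from section 1 is a nested class of TextFile2HbaseJob and that the input directory is passed as the first argument:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class TextFile2HbaseJob {

    public static void main(String[] args) throws Exception {
        String input = args[0];  // directory containing the part-m-* files

        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "data-save-job");
        job.setJarByClass(TextFile2HbaseJob.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
        // The mapper writes directly to HBase, so discard the (empty) MapReduce output.
        job.setOutputFormatClass(NullOutputFormat.class);

        FileSystem fs = FileSystem.get(conf);
        FileStatus[] files = fs.listStatus(new Path(input), new PathFilter() {
            @Override
            public boolean accept(Path path) {
                return path.getName().startsWith("part-m");
            }
        });
        if (files.length == 0) {
            throw new IllegalStateException("no file");
        }
        for (FileStatus file : files) {
            FileInputFormat.addInputPath(job, file.getPath());
        }

        job.setNumReduceTasks(0);  // map-only
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Packaged into a jar, it could then be launched with something like: hadoop jar data-save-job.jar TextFile2HbaseJob /export/input-dir (jar and path names are placeholders).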