文档章节

从HDFS文件导入HBase

超人学院
 超人学院
发布于 2015/03/17 15:33
字数 399
阅读 103
收藏 1

精选30+云产品,助力企业轻松上云!>>>

从HDFS文件导入HBase,继承自Mapper,代码如下:

  1. package hbase.mr;  

  2.   

  3. import java.io.IOException;  

  4.   

  5. import hbase.curd.HTableUtil;  

  6.   

  7. import org.apache.commons.cli.CommandLine;  

  8. import org.apache.commons.cli.CommandLineParser;  

  9. import org.apache.commons.cli.HelpFormatter;  

  10. import org.apache.commons.cli.Option;  

  11. import org.apache.commons.cli.Options;  

  12. import org.apache.commons.cli.PosixParser;  

  13. import org.apache.commons.codec.digest.DigestUtils;  

  14. import org.apache.hadoop.conf.Configuration;  

  15. import org.apache.hadoop.fs.Path;  

  16. import org.apache.hadoop.hbase.KeyValue;  

  17. import org.apache.hadoop.hbase.client.Put;  

  18. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;  

  19. import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;  

  20. import org.apache.hadoop.hbase.util.Bytes;  

  21. import org.apache.hadoop.io.LongWritable;  

  22. import org.apache.hadoop.io.Text;  

  23. import org.apache.hadoop.io.Writable;  

  24. import org.apache.hadoop.mapreduce.Job;  

  25. import org.apache.hadoop.mapreduce.Mapper;  

  26. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  

  27. import org.apache.hadoop.util.GenericOptionsParser;  

  28.   

  29. public class ImportFromFile {  

  30.   

  31.     /** 

  32.      * 从文件导入到HBase 

  33.      * @param args 

  34.      */  

  35.     public static final String NAME="ImportFromFile";  

  36.     public enum Counters{LINES}  

  37.       

  38.     static class ImportMapper extends Mapper<LongWritable,Text,  

  39.         ImmutableBytesWritable,Writable>{  

  40.         private byte[] family =null;  

  41.         private byte[] qualifier = null;  

  42.         @Override  

  43.         protected void setup(Context cxt){  

  44.             String column = cxt.getConfiguration().get("conf.column");  

  45.             byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column));  

  46.             family = colkey[0];  

  47.             if(colkey.length>1){  

  48.                 qualifier = colkey[1];  

  49.             }  

  50.         }  

  51.         @Override  

  52.         public void map(LongWritable offset,Text line,Context cxt){  

  53.             try{  

  54.                 String lineString= line.toString();  

  55.                 byte[] rowkey= DigestUtils.md5(lineString);  

  56.                 Put put = new Put(rowkey);  

  57.                 put.add(family,qualifier,Bytes.toBytes(lineString));  

  58.                 cxt.write(new ImmutableBytesWritable(rowkey), put);  

  59.                 cxt.getCounter(Counters.LINES).increment(1);  

  60.             }catch(Exception e){  

  61.                 e.printStackTrace();  

  62.             }  

  63.         }  

  64.     }  

  65.     private static CommandLine parseArgs(String[] args){  

  66.         Options options = new Options();  

  67.         Option o = new Option("t" ,"table",true,"table to import into (must exist)");  

  68.         o.setArgName("table-name");  

  69.         o.setRequired(true);  

  70.         options.addOption(o);  

  71.           

  72.         o= new Option("c","column",true,"column to store row data into");  

  73.         o.setArgName("family:qualifier");  

  74.         o.setRequired(true);  

  75.         options.addOption(o);  

  76.           

  77.         o = new Option("i""input"true,  

  78.         "the directory or file to read from");  

  79.         o.setArgName("path-in-HDFS");  

  80.         o.setRequired(true);  

  81.         options.addOption(o);  

  82.         options.addOption("d""debug"false"switch on DEBUG log level");  

  83.         CommandLineParser parser = new PosixParser();  

  84.         CommandLine cmd = null;  

  85.         try {  

  86.             cmd = parser.parse(options, args);  

  87.         } catch (Exception e) {  

  88.             System.err.println("ERROR: " + e.getMessage() + "\n");  

  89.             HelpFormatter formatter = new HelpFormatter();  

  90.             formatter.printHelp(NAME + " ", options, true);  

  91.             System.exit(-1);  

  92.         }  

  93.         return cmd;  

  94.     }  

  95.     public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {  

  96.           

  97.         Configuration conf = HTableUtil.getConf();  

  98.         String[] otherArgs = new GenericOptionsParser(conf, initialArg()).getRemainingArgs();   

  99.         CommandLine cmd = parseArgs(otherArgs);  

  100.         String table = cmd.getOptionValue("t");  

  101.         String input = cmd.getOptionValue("i");  

  102.         String column = cmd.getOptionValue("c");  

  103.         conf.set("conf.column", column);  

  104.         Job job = new Job(conf, "Import from file " + input + " into table " + table);  

  105.         job.setJarByClass(ImportFromFile.class);  

  106.         job.setMapperClass(ImportMapper.class);  

  107.         job.setOutputFormatClass(TableOutputFormat.class);  

  108.         job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);  

  109.         job.setOutputKeyClass(ImmutableBytesWritable.class);  

  110.         job.setOutputValueClass(Writable.class);  

  111.         job.setNumReduceTasks(0);   

  112.         FileInputFormat.addInputPath(job, new Path(input));  

  113.         System.exit(job.waitForCompletion(true) ? 0 : 1);  

  114.     }  

  115.       

  116.     private static String[] initialArg(){  

  117.         String []args = new String[6];  

  118.         args[0]="-c";  

  119.         args[1]="fam:data";  

  120.         args[2]="-i";  

  121.         args[3]="/user/hadoop/input/picdata";  

  122.         args[4]="-t";  

  123.         args[5]="testtable";  

  124.         return args;  

  125.     }  

  126. }  

上一篇: hive hbase区别
下一篇: Docker基础命令
超人学院
粉丝 116
博文 335
码字总数 388917
作品 0
昌平
CTO(技术副总裁)
私信 提问
加载中
请先登录后再评论。
HBase BulkLoad批量写入数据实战

1.概述 在进行数据传输中,批量加载数据到HBase集群有多种方式,比如通过HBase API进行批量写入数据、使用Sqoop工具批量导数到HBase集群、使用MapReduce批量导入等。这些方式,在导入数据的过...

哥不是小萝莉
2018/08/19
0
0
HBase BulkLoad批量写入数据实战

1.概述 在进行数据传输中,批量加载数据到HBase集群有多种方式,比如通过HBase API进行批量写入数据、使用Sqoop工具批量导数到HBase集群、使用MapReduce批量导入等。这些方式,在导入数据的过...

osc_nqztydej
2018/08/19
3
0
hbase命令使用记录

使用自带MR将一个集群上hbase的表导入另一个集群上hbase表中: 1、使用 hbase org.apache.hadoop.hbase.mapreduce.Driver export tcan20190205 hdfs:///tmp/tcan20190205 命令将hbase表导入到...

osc_izu6drnv
2019/05/30
0
0
Hbase 整合 Hadoop 的数据迁移

上篇文章说了 Hbase 的基础架构,都是比较理论的知识,最近我也一直在搞 Hbase 的数据迁移, 今天就来一篇实战型的,把最近一段时间的 Hbase 整合 Hadoop 的基础知识在梳理一遍,毕竟当初搞得...

osc_5apgdxem
04/16
3
0
Hbase 整合 Hadoop 的数据迁移

上篇文章说了 Hbase 的基础架构,都是比较理论的知识,最近我也一直在搞 Hbase 的数据迁移, 今天就来一篇实战型的,把最近一段时间的 Hbase 整合 Hadoop 的基础知识在梳理一遍,毕竟当初搞得...

大数据江湖
04/03
0
0

没有更多内容

加载失败,请刷新页面

加载更多

将字符串拆分为具有多个单词边界定界符的单词 - Split Strings into words with multiple word boundary delimiters

问题: I think what I want to do is a fairly common task but I've found no reference on the web. 我认为我想做的是一项相当普通的任务,但是我在网络上找不到任何参考。 I have text ...

fyin1314
50分钟前
9
0
jQuery选择器中的通配符 - Wildcards in jQuery selectors

问题: I'm trying to use a wildcard to get the id of all the elements whose id begin with "jander". 我正在尝试使用通配符来获取id以“jander”开头的所有元素的id。 I tried $('#jand......

法国红酒甜
今天
19
0
唐山5.1级地震 百度人工智能技术帮上忙:成都减灾所提前30秒让北京、天津市民收到预警

本文作者:y****n 7月12日6时38分在河北唐山市古冶区发生5.1级地震,成都高新减灾研究所与应急管理部门联合建成的大陆地震预警网成功预警该地震,给唐山市提前3秒预警,给天津市提前33秒预警...

百度开发者中心
昨天
30
0
如何使用PHP发送POST请求? - How do I send a POST request with PHP?

问题: Actually I want to read the contents that come after the search query, when it is done. 实际上,我想阅读搜索查询之后的内容,完成之后。 The problem is that the URL only a......

javail
今天
14
0
如何从Java读取文件夹中的所有文件? - How to read all files in a folder from Java?

问题: 如何通过Java读取文件夹中的所有文件? 解决方案: 参考一: https://stackoom.com/question/7jt2/如何从Java读取文件夹中的所有文件 参考二: https://oldbug.net/q/7jt2/How-to-rea...

富含淀粉
今天
25
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部