文档章节

从HDFS文件导入HBase

超人学院
 超人学院
发布于 2015/03/17 15:33
字数 399
阅读 77
收藏 1

从HDFS文件导入HBase,继承自Mapper,代码如下:

  1. package hbase.mr;  

  2.   

  3. import java.io.IOException;  

  4.   

  5. import hbase.curd.HTableUtil;  

  6.   

  7. import org.apache.commons.cli.CommandLine;  

  8. import org.apache.commons.cli.CommandLineParser;  

  9. import org.apache.commons.cli.HelpFormatter;  

  10. import org.apache.commons.cli.Option;  

  11. import org.apache.commons.cli.Options;  

  12. import org.apache.commons.cli.PosixParser;  

  13. import org.apache.commons.codec.digest.DigestUtils;  

  14. import org.apache.hadoop.conf.Configuration;  

  15. import org.apache.hadoop.fs.Path;  

  16. import org.apache.hadoop.hbase.KeyValue;  

  17. import org.apache.hadoop.hbase.client.Put;  

  18. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;  

  19. import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;  

  20. import org.apache.hadoop.hbase.util.Bytes;  

  21. import org.apache.hadoop.io.LongWritable;  

  22. import org.apache.hadoop.io.Text;  

  23. import org.apache.hadoop.io.Writable;  

  24. import org.apache.hadoop.mapreduce.Job;  

  25. import org.apache.hadoop.mapreduce.Mapper;  

  26. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  

  27. import org.apache.hadoop.util.GenericOptionsParser;  

  28.   

  29. public class ImportFromFile {  

  30.   

  31.     /** 

  32.      * 从文件导入到HBase 

  33.      * @param args 

  34.      */  

  35.     public static final String NAME="ImportFromFile";  

  36.     public enum Counters{LINES}  

  37.       

  38.     static class ImportMapper extends Mapper<LongWritable,Text,  

  39.         ImmutableBytesWritable,Writable>{  

  40.         private byte[] family =null;  

  41.         private byte[] qualifier = null;  

  42.         @Override  

  43.         protected void setup(Context cxt){  

  44.             String column = cxt.getConfiguration().get("conf.column");  

  45.             byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column));  

  46.             family = colkey[0];  

  47.             if(colkey.length>1){  

  48.                 qualifier = colkey[1];  

  49.             }  

  50.         }  

  51.         @Override  

  52.         public void map(LongWritable offset,Text line,Context cxt){  

  53.             try{  

  54.                 String lineString= line.toString();  

  55.                 byte[] rowkey= DigestUtils.md5(lineString);  

  56.                 Put put = new Put(rowkey);  

  57.                 put.add(family,qualifier,Bytes.toBytes(lineString));  

  58.                 cxt.write(new ImmutableBytesWritable(rowkey), put);  

  59.                 cxt.getCounter(Counters.LINES).increment(1);  

  60.             }catch(Exception e){  

  61.                 e.printStackTrace();  

  62.             }  

  63.         }  

  64.     }  

  65.     private static CommandLine parseArgs(String[] args){  

  66.         Options options = new Options();  

  67.         Option o = new Option("t" ,"table",true,"table to import into (must exist)");  

  68.         o.setArgName("table-name");  

  69.         o.setRequired(true);  

  70.         options.addOption(o);  

  71.           

  72.         o= new Option("c","column",true,"column to store row data into");  

  73.         o.setArgName("family:qualifier");  

  74.         o.setRequired(true);  

  75.         options.addOption(o);  

  76.           

  77.         o = new Option("i""input"true,  

  78.         "the directory or file to read from");  

  79.         o.setArgName("path-in-HDFS");  

  80.         o.setRequired(true);  

  81.         options.addOption(o);  

  82.         options.addOption("d""debug"false"switch on DEBUG log level");  

  83.         CommandLineParser parser = new PosixParser();  

  84.         CommandLine cmd = null;  

  85.         try {  

  86.             cmd = parser.parse(options, args);  

  87.         } catch (Exception e) {  

  88.             System.err.println("ERROR: " + e.getMessage() + "\n");  

  89.             HelpFormatter formatter = new HelpFormatter();  

  90.             formatter.printHelp(NAME + " ", options, true);  

  91.             System.exit(-1);  

  92.         }  

  93.         return cmd;  

  94.     }  

  95.     public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {  

  96.           

  97.         Configuration conf = HTableUtil.getConf();  

  98.         String[] otherArgs = new GenericOptionsParser(conf, initialArg()).getRemainingArgs();   

  99.         CommandLine cmd = parseArgs(otherArgs);  

  100.         String table = cmd.getOptionValue("t");  

  101.         String input = cmd.getOptionValue("i");  

  102.         String column = cmd.getOptionValue("c");  

  103.         conf.set("conf.column", column);  

  104.         Job job = new Job(conf, "Import from file " + input + " into table " + table);  

  105.         job.setJarByClass(ImportFromFile.class);  

  106.         job.setMapperClass(ImportMapper.class);  

  107.         job.setOutputFormatClass(TableOutputFormat.class);  

  108.         job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);  

  109.         job.setOutputKeyClass(ImmutableBytesWritable.class);  

  110.         job.setOutputValueClass(Writable.class);  

  111.         job.setNumReduceTasks(0);   

  112.         FileInputFormat.addInputPath(job, new Path(input));  

  113.         System.exit(job.waitForCompletion(true) ? 0 : 1);  

  114.     }  

  115.       

  116.     private static String[] initialArg(){  

  117.         String []args = new String[6];  

  118.         args[0]="-c";  

  119.         args[1]="fam:data";  

  120.         args[2]="-i";  

  121.         args[3]="/user/hadoop/input/picdata";  

  122.         args[4]="-t";  

  123.         args[5]="testtable";  

  124.         return args;  

  125.     }  

  126. }  

© 著作权归作者所有

共有 人打赏支持
超人学院
粉丝 107
博文 335
码字总数 388917
作品 0
昌平
CTO(技术副总裁)
HBase BulkLoad批量写入数据实战

1.概述 在进行数据传输中,批量加载数据到HBase集群有多种方式,比如通过HBase API进行批量写入数据、使用Sqoop工具批量导数到HBase集群、使用MapReduce批量导入等。这些方式,在导入数据的过...

哥不是小萝莉
08/19
0
0
记一次测试环境Hbase数据备份恢复以及恢复后部分表无法删除的问题处理

一、Hbase数据备份恢复 说明: 因为测试环境要修改hadoop配置文件hdfs-site.xml的参数hdfs.rootdir 修改前的配置 hbase.rootdir hdfs://masters/hbase1 修改后的配置 hbase.rootdir hdfs://m...

断臂人
06/15
0
0
bulk-load装载hdfs数据到hbase小结

bulk-load的作用是用mapreduce的方式将hdfs上的文件装载到hbase中,对于海量数据装载入hbase非常有用,参考http://hbase.apache.org/docs/r0.89.20100621/bulk-loads.html: hbase提供了现成...

超人学院
2015/06/01
0
0
sqoop导入数据到Base并同步hive与impala

使用Sqoop从MySQL导入数据到Hive和HBase 及近期感悟 基础环境 Sqool和Hive、HBase简介 测试Sqoop 使用Sqoop从MySQL导入数据到Hive 使用Sqoop从MySQL导入数据到HBase 关于Sqoop2 综上所述 其他...

hblt-j
07/20
0
0
HBase 数据导入 ImportTsv

ImportTsv 工具是通过map reduce 完成的。所以要启动yarn. 工具要使用jar包,所以注意配置classpath。ImportTsv默认是通过hbase api 插入数据的 [hadoop-user@rhel work]$ cat /home/hadoop-...

JUN_LJ
06/28
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Web系统大规模并发:电商秒杀与抢购

一、大规模并发带来的挑战 在过去的工作中,我曾经面对过5w每秒的高并发秒杀功能,在这个过程中,整个Web系统遇到了很多的问题和挑战。如果Web系统不做针对性的优化,会轻而易举地陷入到异常...

xtof
今天
2
0
代码质量管理平台-sonarqube

在工作中,往往开发的时候会不怎么注重代码质量的人很多,存在着很多的漏洞和隐患等问题,sonarqube可以进行代码质量的审核,而且十分的残酷。。。。。接下来我们说下怎么安装 进入官网下载:...

落叶清风
今天
7
0
在Ubuntu安装和配置Sphinx

Ubuntu系统默认是配置有sphinx的,先检查一下,别多此一举。。。。。 在开始本指南之前,您需要: 一个Ubuntu 16.04服务器。 sudo的一个非root用户,您可以通过以下设置本教程 。 安装在服务...

阿锋zxf
今天
1
0
Qt编写输入法V2018超级终结版

对于qt嵌入式linux开发人员来说,输入法一直是个鸡肋问题,要么不支持实体键盘同步,要么不能汉字输入,要么不支持网页输入等,这几年通过陆续接触大量的各种输入法应用场景客户,得到真实需...

飞扬青云
今天
2
0
TypeScript基础入门之高级类型的多态的 this类型

转发 TypeScript基础入门之高级类型的多态的 this类型 高级类型 多态的this类型 多态的this类型表示的是某个包含类或接口的子类型。 这被称做F-bounded多态性。 它能很容易的表现连贯接口间的...

durban
今天
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部