文档章节

从HDFS文件导入HBase

超人学院
 超人学院
发布于 2015/03/17 15:33
字数 399
阅读 78
收藏 1

从HDFS文件导入HBase,继承自Mapper,代码如下:

  1. package hbase.mr;  

  2.   

  3. import java.io.IOException;  

  4.   

  5. import hbase.curd.HTableUtil;  

  6.   

  7. import org.apache.commons.cli.CommandLine;  

  8. import org.apache.commons.cli.CommandLineParser;  

  9. import org.apache.commons.cli.HelpFormatter;  

  10. import org.apache.commons.cli.Option;  

  11. import org.apache.commons.cli.Options;  

  12. import org.apache.commons.cli.PosixParser;  

  13. import org.apache.commons.codec.digest.DigestUtils;  

  14. import org.apache.hadoop.conf.Configuration;  

  15. import org.apache.hadoop.fs.Path;  

  16. import org.apache.hadoop.hbase.KeyValue;  

  17. import org.apache.hadoop.hbase.client.Put;  

  18. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;  

  19. import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;  

  20. import org.apache.hadoop.hbase.util.Bytes;  

  21. import org.apache.hadoop.io.LongWritable;  

  22. import org.apache.hadoop.io.Text;  

  23. import org.apache.hadoop.io.Writable;  

  24. import org.apache.hadoop.mapreduce.Job;  

  25. import org.apache.hadoop.mapreduce.Mapper;  

  26. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  

  27. import org.apache.hadoop.util.GenericOptionsParser;  

  28.   

  29. public class ImportFromFile {  

  30.   

  31.     /** 

  32.      * 从文件导入到HBase 

  33.      * @param args 

  34.      */  

  35.     public static final String NAME="ImportFromFile";  

  36.     public enum Counters{LINES}  

  37.       

  38.     static class ImportMapper extends Mapper<LongWritable,Text,  

  39.         ImmutableBytesWritable,Writable>{  

  40.         private byte[] family =null;  

  41.         private byte[] qualifier = null;  

  42.         @Override  

  43.         protected void setup(Context cxt){  

  44.             String column = cxt.getConfiguration().get("conf.column");  

  45.             byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column));  

  46.             family = colkey[0];  

  47.             if(colkey.length>1){  

  48.                 qualifier = colkey[1];  

  49.             }  

  50.         }  

  51.         @Override  

  52.         public void map(LongWritable offset,Text line,Context cxt){  

  53.             try{  

  54.                 String lineString= line.toString();  

  55.                 byte[] rowkey= DigestUtils.md5(lineString);  

  56.                 Put put = new Put(rowkey);  

  57.                 put.add(family,qualifier,Bytes.toBytes(lineString));  

  58.                 cxt.write(new ImmutableBytesWritable(rowkey), put);  

  59.                 cxt.getCounter(Counters.LINES).increment(1);  

  60.             }catch(Exception e){  

  61.                 e.printStackTrace();  

  62.             }  

  63.         }  

  64.     }  

  65.     private static CommandLine parseArgs(String[] args){  

  66.         Options options = new Options();  

  67.         Option o = new Option("t" ,"table",true,"table to import into (must exist)");  

  68.         o.setArgName("table-name");  

  69.         o.setRequired(true);  

  70.         options.addOption(o);  

  71.           

  72.         o= new Option("c","column",true,"column to store row data into");  

  73.         o.setArgName("family:qualifier");  

  74.         o.setRequired(true);  

  75.         options.addOption(o);  

  76.           

  77.         o = new Option("i""input"true,  

  78.         "the directory or file to read from");  

  79.         o.setArgName("path-in-HDFS");  

  80.         o.setRequired(true);  

  81.         options.addOption(o);  

  82.         options.addOption("d""debug"false"switch on DEBUG log level");  

  83.         CommandLineParser parser = new PosixParser();  

  84.         CommandLine cmd = null;  

  85.         try {  

  86.             cmd = parser.parse(options, args);  

  87.         } catch (Exception e) {  

  88.             System.err.println("ERROR: " + e.getMessage() + "\n");  

  89.             HelpFormatter formatter = new HelpFormatter();  

  90.             formatter.printHelp(NAME + " ", options, true);  

  91.             System.exit(-1);  

  92.         }  

  93.         return cmd;  

  94.     }  

  95.     public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {  

  96.           

  97.         Configuration conf = HTableUtil.getConf();  

  98.         String[] otherArgs = new GenericOptionsParser(conf, initialArg()).getRemainingArgs();   

  99.         CommandLine cmd = parseArgs(otherArgs);  

  100.         String table = cmd.getOptionValue("t");  

  101.         String input = cmd.getOptionValue("i");  

  102.         String column = cmd.getOptionValue("c");  

  103.         conf.set("conf.column", column);  

  104.         Job job = new Job(conf, "Import from file " + input + " into table " + table);  

  105.         job.setJarByClass(ImportFromFile.class);  

  106.         job.setMapperClass(ImportMapper.class);  

  107.         job.setOutputFormatClass(TableOutputFormat.class);  

  108.         job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);  

  109.         job.setOutputKeyClass(ImmutableBytesWritable.class);  

  110.         job.setOutputValueClass(Writable.class);  

  111.         job.setNumReduceTasks(0);   

  112.         FileInputFormat.addInputPath(job, new Path(input));  

  113.         System.exit(job.waitForCompletion(true) ? 0 : 1);  

  114.     }  

  115.       

  116.     private static String[] initialArg(){  

  117.         String []args = new String[6];  

  118.         args[0]="-c";  

  119.         args[1]="fam:data";  

  120.         args[2]="-i";  

  121.         args[3]="/user/hadoop/input/picdata";  

  122.         args[4]="-t";  

  123.         args[5]="testtable";  

  124.         return args;  

  125.     }  

  126. }  

© 著作权归作者所有

共有 人打赏支持
上一篇: hive hbase区别
下一篇: Docker基础命令
超人学院
粉丝 110
博文 335
码字总数 388917
作品 0
昌平
CTO(技术副总裁)
私信 提问
HBase BulkLoad批量写入数据实战

1.概述 在进行数据传输中,批量加载数据到HBase集群有多种方式,比如通过HBase API进行批量写入数据、使用Sqoop工具批量导数到HBase集群、使用MapReduce批量导入等。这些方式,在导入数据的过...

哥不是小萝莉
08/19
0
0
HBase实战之HBase BulkLoad批量写入数据

1.概述 在进行数据传输时,批量加载数据到HBase集群有多种方式,比如通过HBase API进行批量写入数据、使用Sqoop工具批量导数到HBase集群、使用MapReduce批量导入等。这些方式,导入数据的过程...

HBase技术社区
10/02
0
0
HBase实战 | Hive数据导入云HBase

网络环境 专线:用户需要把hbase集群的VPC相关网络信息配置到专线里面,可直通hbase环境 公有云虚拟机VPC环境:选择和hbase通VPC 其他:需要开hbase公网 注意:默认导入hbase数据,依赖的hba...

hbase小能手
11/14
0
0
记一次测试环境Hbase数据备份恢复以及恢复后部分表无法删除的问题处理

一、Hbase数据备份恢复 说明: 因为测试环境要修改hadoop配置文件hdfs-site.xml的参数hdfs.rootdir 修改前的配置 hbase.rootdir hdfs://masters/hbase1 修改后的配置 hbase.rootdir hdfs://m...

断臂人
06/15
0
0
sqoop导入数据到Base并同步hive与impala

使用Sqoop从MySQL导入数据到Hive和HBase 及近期感悟 基础环境 Sqool和Hive、HBase简介 测试Sqoop 使用Sqoop从MySQL导入数据到Hive 使用Sqoop从MySQL导入数据到HBase 关于Sqoop2 综上所述 其他...

hblt-j
07/20
0
0

没有更多内容

加载失败,请刷新页面

加载更多

linux中常用标识---不定期更新

LINUX常用标识符: 1 & && | || &: 表示进程在后台运行 例如 redis-server & 不是所有后台运行都是& 比如es ./bin/elasticsearch -d es后台运行&&: 第一个命令执行成功后 才执行后面的命令...

geek土拨鼠
37分钟前
1
0
Mybatis 中$与#的区别,预防SQL注入

一直没注意Mybatis 中$与#的区别,当然也是更习惯使用#,没想到避免了SQL注入,但是由于要处理项目中安全渗透的问题,不可避免的又遇到了这个问题,特此记录一下。 首先是共同点: 在mybatis...

大雁南飞了
53分钟前
0
0
Cydia的基石:MobileSubstrate

在MAC与IOS平台上,动态库的后缀一般是dylid,而加载这些动态库的程序叫做dynamic linker(dyld)。这个程序有很多的环境变量来设置程序的一些行为,最为常用的一个环境变量叫做"DYLD_INSERT_...

HeroHY
56分钟前
1
0
Spring Clould负载均衡重要组件:Ribbon中重要类的用法

Ribbon是Spring Cloud Netflix全家桶中负责负载均衡的组件,它是一组类库的集合。通过Ribbon,程序员能在不涉及到具体实现细节的基础上“透明”地用到负载均衡,而不必在项目里过多地编写实现...

Ala6
今天
0
0
让 linux 删除能够进入回收站

可以参考这个贴子 https://blog.csdn.net/F8qG7f9YD02Pe/article/details/79543316 从那个git地址 把saferm.sh下载下来 把saferm.sh复制到 /usr/bin 目录下 在用~/目下 的.bashrc 下加一句这...

shzwork
今天
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部