文档章节

从HDFS文件导入HBase

超人学院
 超人学院
发布于 2015/03/17 15:33
字数 399
阅读 73
收藏 1
点赞 0
评论 0

从HDFS文件导入HBase,继承自Mapper,代码如下:

  1. package hbase.mr;  

  2.   

  3. import java.io.IOException;  

  4.   

  5. import hbase.curd.HTableUtil;  

  6.   

  7. import org.apache.commons.cli.CommandLine;  

  8. import org.apache.commons.cli.CommandLineParser;  

  9. import org.apache.commons.cli.HelpFormatter;  

  10. import org.apache.commons.cli.Option;  

  11. import org.apache.commons.cli.Options;  

  12. import org.apache.commons.cli.PosixParser;  

  13. import org.apache.commons.codec.digest.DigestUtils;  

  14. import org.apache.hadoop.conf.Configuration;  

  15. import org.apache.hadoop.fs.Path;  

  16. import org.apache.hadoop.hbase.KeyValue;  

  17. import org.apache.hadoop.hbase.client.Put;  

  18. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;  

  19. import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;  

  20. import org.apache.hadoop.hbase.util.Bytes;  

  21. import org.apache.hadoop.io.LongWritable;  

  22. import org.apache.hadoop.io.Text;  

  23. import org.apache.hadoop.io.Writable;  

  24. import org.apache.hadoop.mapreduce.Job;  

  25. import org.apache.hadoop.mapreduce.Mapper;  

  26. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  

  27. import org.apache.hadoop.util.GenericOptionsParser;  

  28.   

  29. public class ImportFromFile {  

  30.   

  31.     /** 

  32.      * 从文件导入到HBase 

  33.      * @param args 

  34.      */  

  35.     public static final String NAME="ImportFromFile";  

  36.     public enum Counters{LINES}  

  37.       

  38.     static class ImportMapper extends Mapper<LongWritable,Text,  

  39.         ImmutableBytesWritable,Writable>{  

  40.         private byte[] family =null;  

  41.         private byte[] qualifier = null;  

  42.         @Override  

  43.         protected void setup(Context cxt){  

  44.             String column = cxt.getConfiguration().get("conf.column");  

  45.             byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column));  

  46.             family = colkey[0];  

  47.             if(colkey.length>1){  

  48.                 qualifier = colkey[1];  

  49.             }  

  50.         }  

  51.         @Override  

  52.         public void map(LongWritable offset,Text line,Context cxt){  

  53.             try{  

  54.                 String lineString= line.toString();  

  55.                 byte[] rowkey= DigestUtils.md5(lineString);  

  56.                 Put put = new Put(rowkey);  

  57.                 put.add(family,qualifier,Bytes.toBytes(lineString));  

  58.                 cxt.write(new ImmutableBytesWritable(rowkey), put);  

  59.                 cxt.getCounter(Counters.LINES).increment(1);  

  60.             }catch(Exception e){  

  61.                 e.printStackTrace();  

  62.             }  

  63.         }  

  64.     }  

  65.     private static CommandLine parseArgs(String[] args){  

  66.         Options options = new Options();  

  67.         Option o = new Option("t" ,"table",true,"table to import into (must exist)");  

  68.         o.setArgName("table-name");  

  69.         o.setRequired(true);  

  70.         options.addOption(o);  

  71.           

  72.         o= new Option("c","column",true,"column to store row data into");  

  73.         o.setArgName("family:qualifier");  

  74.         o.setRequired(true);  

  75.         options.addOption(o);  

  76.           

  77.         o = new Option("i""input"true,  

  78.         "the directory or file to read from");  

  79.         o.setArgName("path-in-HDFS");  

  80.         o.setRequired(true);  

  81.         options.addOption(o);  

  82.         options.addOption("d""debug"false"switch on DEBUG log level");  

  83.         CommandLineParser parser = new PosixParser();  

  84.         CommandLine cmd = null;  

  85.         try {  

  86.             cmd = parser.parse(options, args);  

  87.         } catch (Exception e) {  

  88.             System.err.println("ERROR: " + e.getMessage() + "\n");  

  89.             HelpFormatter formatter = new HelpFormatter();  

  90.             formatter.printHelp(NAME + " ", options, true);  

  91.             System.exit(-1);  

  92.         }  

  93.         return cmd;  

  94.     }  

  95.     public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {  

  96.           

  97.         Configuration conf = HTableUtil.getConf();  

  98.         String[] otherArgs = new GenericOptionsParser(conf, initialArg()).getRemainingArgs();   

  99.         CommandLine cmd = parseArgs(otherArgs);  

  100.         String table = cmd.getOptionValue("t");  

  101.         String input = cmd.getOptionValue("i");  

  102.         String column = cmd.getOptionValue("c");  

  103.         conf.set("conf.column", column);  

  104.         Job job = new Job(conf, "Import from file " + input + " into table " + table);  

  105.         job.setJarByClass(ImportFromFile.class);  

  106.         job.setMapperClass(ImportMapper.class);  

  107.         job.setOutputFormatClass(TableOutputFormat.class);  

  108.         job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);  

  109.         job.setOutputKeyClass(ImmutableBytesWritable.class);  

  110.         job.setOutputValueClass(Writable.class);  

  111.         job.setNumReduceTasks(0);   

  112.         FileInputFormat.addInputPath(job, new Path(input));  

  113.         System.exit(job.waitForCompletion(true) ? 0 : 1);  

  114.     }  

  115.       

  116.     private static String[] initialArg(){  

  117.         String []args = new String[6];  

  118.         args[0]="-c";  

  119.         args[1]="fam:data";  

  120.         args[2]="-i";  

  121.         args[3]="/user/hadoop/input/picdata";  

  122.         args[4]="-t";  

  123.         args[5]="testtable";  

  124.         return args;  

  125.     }  

  126. }  

© 著作权归作者所有

共有 人打赏支持
超人学院
粉丝 106
博文 335
码字总数 388917
作品 0
昌平
CTO(技术副总裁)
记一次测试环境Hbase数据备份恢复以及恢复后部分表无法删除的问题处理

一、Hbase数据备份恢复 说明: 因为测试环境要修改hadoop配置文件hdfs-site.xml的参数hdfs.rootdir 修改前的配置 hbase.rootdir hdfs://masters/hbase1 修改后的配置 hbase.rootdir hdfs://m...

断臂人
06/15
0
0
bulk-load装载hdfs数据到hbase小结

bulk-load的作用是用mapreduce的方式将hdfs上的文件装载到hbase中,对于海量数据装载入hbase非常有用,参考http://hbase.apache.org/docs/r0.89.20100621/bulk-loads.html: hbase提供了现成...

超人学院
2015/06/01
0
0
HBase 数据导入 ImportTsv

ImportTsv 工具是通过map reduce 完成的。所以要启动yarn. 工具要使用jar包,所以注意配置classpath。ImportTsv默认是通过hbase api 插入数据的 [hadoop-user@rhel work]$ cat /home/hadoop-...

JUN_LJ
06/28
0
0
利用BulkLoad导入Hbase表

1、插入HBase表传统方法具有什么问题? 我们先看下 HBase 的写流程: 通常 MapReduce 在写HBase时使用的是TableOutputFormat方式,在map/reduce中直接生成put对象写入HBase,该方式在大量数据...

混绅士
06/28
0
0
HBase 写优化之 BulkLoad 实现数据快速入库

1、为何要 BulkLoad 导入?传统的 HTableOutputFormat 写 HBase 有什么问题? 我们先看下 HBase 的写流程: 通常 MapReduce 在写HBase时使用的是 TableOutputFormat 方式,在reduce中直接生成...

大数据之路
2013/12/25
0
1
Hive Hbase区别 整理

Hive是为了简化编写MapReduce程序而生的,使用MapReduce做过数据分析的人都知道,很多分析程序除业务逻辑不同外,程序流程基本一样。在这种情况下,就需要Hive这样的用戶编程接口。Hive本身不...

李超
2015/04/17
0
0
sqoop 完成与关系型数据库的互导

一.安装SQOOP后可使用如下命令列出mysql数据库中的所有数据库,与检验是否安装成功。 # sqoop list-databases --connect jdbc:mysql://localhost:3306/ --username root --password 123456 ...

gulf
06/26
0
0
Apache HBase 2.0.0 发布,Hadoop 数据库

Apache HBase 2.0.0 发布了,HBase 2.0.0 是 HBase 的第二个主要版本。 此次更新信息如下: 一个新的区域分配管理器(“AMv2”), 用于配置读取和/或写入路径以运行堆外的装置,以及可选的内...

雨田桑
05/03
0
0
Ubuntu10.04LTS配置Hadoop1.0.1+HBase 0.92.0

(关于Hadoop的单机和伪分布式配置参见:http://my.oschina.net/unclegeek/blog/40042)当写这篇文章的时候,最新版的hadoop是1.0.1版本,支持了许多新的特性,使得hbase持久化不易丢失数据。...

怪蜀黍
2012/03/05
0
0
hive 与 hbase 结合

一、hive与hbase的结合 Hive会经常和Hbase结合使用,把Hbase作为Hive的存储路径,所以Hive整合Hbase尤其重要。使用Hive读取Hbase中的数据,可以使用HQL语句在HBase表上进行查询、插入操作;甚...

meteor_hy
06/26
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

Tomcat中JAVA JVM内存溢出及合理配置

一、Java JVM内存介绍 JVM管理两种类型的内存,堆和非堆。按照官方的说法:“Java 虚拟机具有一个堆,堆是运行时数据区域,所有类实例和数组的内存均从此处分配。堆是在 Java 虚拟机启动时创...

學無止境
5分钟前
0
0
centOS7.4+nginx 1.12.2负载均衡

centOS7.4+nginx 1.12.2负载均衡 2018年04月10日 09:24:51 阅读数:58 1:参数信息 三台 centOS7.4 A,B,C A作为主服务器,B C作为分流的服务器 都搭建 nginx 1.12.2 一:安装 nginx 1:下载...

linjin200
11分钟前
0
0
分布式之抉择分布式锁

前言: 目前网上大部分的基于zookpeer,和redis的分布式锁的文章都不够全面。要么就是特意避开集群的情况,要么就是考虑不全,读者看着还是一脸迷茫。坦白说,这种老题材,很难写出新创意,博...

Java大蜗牛
16分钟前
0
0
rm: cannot remove `xxx’: Operation not permitted

rm: cannot remove `xxx': Operation not permitted可以先用lsattr xxx查看文件的隐藏属性。如果看到-----a-------的情况,可以用chattr -a xxx去除a属性,然后再进行删除就可以了....

殘留回憶
16分钟前
0
0
oracle 如何查看当前用户的表空间名称

如何查询当前用户的表空间名称?因为oracle建立索引,需要知道当前用户的表空间,查找了一下资料 --查询语法-- select default_tablespace from dba_users where username='登录用户' 如,...

youfen
20分钟前
0
0
MicroPython-TPYBoard开发板DIY小型家庭气象站

对于喜欢登山的人来说,都会非常关心自己所处的高度跟温度,海拔高度的测量方法,海拔测量一般常用的有两种方式,一是通过GPS全球定位系统,二是通过测出大气压,根据气压值算出海拔高度。 ...

bodasisiter
20分钟前
0
0
抓取沪A股票资金流向数据

library(rvest)mydata<-list()day1<-Sys.Date()day2<-Sys.Date()-7stock<-c("600695","600734","603693","601990","603650","603045","603895","600735","601999","603970","600619"......

cuyi
20分钟前
0
0
Java中mqtt消息队列发送和订阅消息

1.首先本地建立mqtt协议的服务器 2.直接上代码 package io.powerx.test;import java.util.Date;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;import org.eclipse.p......

江湖鱼大虾
22分钟前
0
0
数据结构-树的学习

1. 相关连接 维基-二叉搜索树 维基-红黑树 思否-红黑树

liuyan_lc
24分钟前
0
0
Dubbo 源码解读——自定义 Classloader 之 ExtensionLoader

众所周知,Dubbo 是阿里巴巴公司自主研发开源的一个高性能的服务框架(现已捐献给 Apache 基金会组织),应用之间可以通过 RPC 的方式来互相调用并返回结果。主要基于 Java 语言开发,它提供...

Ryan-瑞恩
33分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部