文档章节

在不同版本hdfs集群之间转移数据

超人学院
 超人学院
发布于 2015/06/05 17:06
字数 1020
阅读 16
收藏 0

在不同版本hdfs集群之间转移数据

    最简单的办法就是把src集群的数据导到本地,然后起另一个进程将本地数据传到des集群上去。 
    不过这有几个问题:


  • 效率降低 

  • 占用本地磁盘空间 

  • 不能应付实时导数据需求     

  • 两个进程需要协调,复杂度增加 


    
更好的办法是在同一个进程内一边读src数据,一边写des集群。不过这相当于在同一个进程空间内加载两个版本的hadoop jar包,这就需要在程序中使用两个classloader来实现。 
    以下代码可以实现classloader加载自定义的jar包,并生成需要的Configuration对象:

Java代码


  • URL[] jarUrls = new URL[1];   

  • jarUrls[0]=new File(des_jar_path).toURI().toURL();       

  • ClassLoader jarloader = new URLClassLoader(jarUrls, null);       

  • Class Proxy = Class.forName("yourclass", true, jarloader);       

  • Configuration conf = (Configuration)Proxy.newInstance();  

URL[] jarUrls = new URL[1];

jarUrls[0]=newFile(des_jar_path).toURI().toURL();

ClassLoader jarloader = newURLClassLoader(jarUrls, null);

Class Proxy =Class.forName("yourclass", true, jarloader);

Configuration conf =(Configuration)Proxy.newInstance();



    
但是由于在生成HTable对象时,需要使用这个conf对象,而加载这个conf对象的代码本身是由默认的classloader加载的,也就是0.19.2的jar包。所以在以上代码最后一行所强制转换的Configuration对象仍然是0.19.2版本的。那怎么办呢? 
    琢磨了一会,发现如果要实现以上功能,必须将生成HTable对象,以及以后的所有hbase操作都使用这个新的classloader,因此这个新的classloader必须加载除了0.19.2的jar包外所有需要用到的jar包,然后把所有操作都封装进去。在外面用反射来调用。
    这样的话,通常构造函数都不为空了,因此需要用到Constructor来构造一个自定义的构造函数 
    代码段如下:

Java代码


  • main.java       

  • void init(){       

  •     ClassLoader jarloader = generateJarLoader();       

  •     Class Proxy = Class.forName("test.writer.hbasewriter.HBaseProxy", true, jarloader);       

  •     Constructor con = Proxy.getConstructor(new Class[]{String.class, String.class, boolean.class});       

  •     Boolean autoflush = param.getBoolValue(ParamsKey.HbaseWriter.autoFlush, true);       

  •     proxy = con.newInstance(new Object[]{path, tablename, autoflush});       

  • }       

  • void put(){       

  • ...       

  •     while((line = getLine()) != null) {       

  •         proxy.getClass().getMethod("generatePut",String.class).invoke(proxy, line.getField(rowkey));       

  •         Method addPut = proxy.getClass().getMethod("addPut",       

  •                 new Class[]{String.class, String.class, String.class});       

  •         addPut.invoke(proxy, new Object[]{field, column, encode});       

  •         proxy.getClass().getMethod("putLine").invoke(proxy);       

  •     }       

  • }       

  •   

  • ClassLoader generateJarLoader() throws IOException {       

  •       String libPath = System.getProperty("java.ext.dirs");       

  •       FileFilter filter = new FileFilter() {       

  •       @Override  

  •       public boolean accept(File pathname) {       

  •         if(pathname.getName().startsWith("hadoop-0.19.2"))       

  •           return false;       

  •         else  

  •             return pathname.getName().endsWith(".jar");       

  •       }       

  •       };       

  •       File[] jars = new File(libPath).listFiles(filter);       

  •       URL[] jarUrls = new URL[jars.length+1];   

  •                

  •       int k = 0;   

  •       for (int i = 0; i < jars.length; i++) {       

  •         jarUrls[k++] = jars.toURI().toURL();       

  •       }       

  •       jarUrls[k] = new File("hadoop-0.20.205.jar")       

  •       ClassLoader jarloader = new URLClassLoader(jarUrls, null);       

  •       return jarloader;       

  • }  

main.java

void init(){

  ClassLoader jarloader = generateJarLoader();

  Class Proxy =Class.forName("test.writer.hbasewriter.HBaseProxy", true, jarloader);

  Constructor con = Proxy.getConstructor(new Class[]{String.class,String.class, boolean.class});

  Boolean autoflush =param.getBoolValue(ParamsKey.HbaseWriter.autoFlush, true);

  proxy = con.newInstance(new Object[]{path, tablename, autoflush});

}

void put(){

...

  while((line = getLine()) != null) {

   proxy.getClass().getMethod("generatePut",String.class).invoke(proxy,line.getField(rowkey));

   Method addPut = proxy.getClass().getMethod("addPut",

      new Class[]{String.class, String.class, String.class});

   addPut.invoke(proxy, new Object[]{field, column, encode});

   proxy.getClass().getMethod("putLine").invoke(proxy);

  }

}

ClassLoader generateJarLoader()throws IOException {

      String libPath =System.getProperty("java.ext.dirs");

      FileFilter filter = new FileFilter() {

      @Override

      public boolean accept(File pathname) {

        if(pathname.getName().startsWith("hadoop-0.19.2"))

          return false;

        else

         returnpathname.getName().endsWith(".jar");

      }

      };

      File[] jars = newFile(libPath).listFiles(filter);

      URL[] jarUrls = new URL[jars.length+1];

   

      int k = 0;

      for (int i = 0; i < jars.length; i++){

        jarUrls[k++] = jars.toURI().toURL();

      }

      jarUrls[k] = newFile("hadoop-0.20.205.jar")

      ClassLoader jarloader = newURLClassLoader(jarUrls, null);

     return jarloader;

}

Java代码


  • HBaseProxy.java       

  • public HBaseProxy(String hbase_conf, String tableName, boolean autoflush)       

  •      throws IOException{       

  •         Configuration conf = new Configuration();       

  •         conf.addResource(new Path(hbase_conf));       

  •         config = new Configuration(conf);       

  •         htable = new HTable(config, tableName);       

  •         admin = new HBaseAdmin(config);       

  •         htable.setAutoFlush(autoflush);       

  •     }       

  • public void addPut(String field, String column, String encode) throws IOException {       

  •     try {       

  •             p.add(column.split(":")[0].getBytes(), column.split(":")[1].getBytes(),   

  •                     field.getBytes(encode));       

  •         } catch (UnsupportedEncodingException e) {       

  •             p.add(column.split(":")[0].getBytes(), column.split(":")[1].getBytes(),   

  •                     field.getBytes());       

  •         }       

  •                

  •     }       

  •     public void generatePut(String rowkey){       

  •         p = new Put(rowkey.getBytes());       

  •     }       

  •            

  •     public void putLine() throws IOException{       

  •         htable.put(p);       

  •     }  

HBaseProxy.java

public HBaseProxy(Stringhbase_conf, String tableName, boolean autoflush)

     throws IOException{

   Configuration conf = new Configuration();

   conf.addResource(new Path(hbase_conf));

   config = new Configuration(conf);

   htable = new HTable(config, tableName);

   admin = new HBaseAdmin(config);

   htable.setAutoFlush(autoflush);

  }

public void addPut(Stringfield, String column, String encode) throws IOException {

    try {

     p.add(column.split(":")[0].getBytes(), column.split(":")[1].getBytes(),

        field.getBytes(encode));

   } catch (UnsupportedEncodingException e) {

     p.add(column.split(":")[0].getBytes(),column.split(":")[1].getBytes(),

        field.getBytes());

   }

   

  }

    public void generatePut(String rowkey){

   p = new Put(rowkey.getBytes());

  }

  

    public void putLine() throws IOException{

   htable.put(p);

  }


    总之,在同一个进程中加载多个classloader时一定要注意,classloader A所加载的对象是不能转换成classloader B的对象的,当然也不能使用。两个空间的相互调用只能用java的基本类型或是反射。


更多精彩内容请关注:http://bbs.superwu.cn

关注超人学院微信二维码:

关注超人学院java免费学习交流群:

© 著作权归作者所有

超人学院
粉丝 114
博文 335
码字总数 388917
作品 0
昌平
CTO(技术副总裁)
私信 提问
HDFS的Shell访问和Java API访问

Shell访问HDFS常用命令 1.shell 操作单个 HDFS 集群 下面列举出几个常用场景下的命令。 1、创建文件夹 HDFS 上的文件目录结构类似 Linux,根目录使用 "/" 表示。下面的命令将在 /middle 目录...

等待救赎
2015/10/19
249
0
Hadoop HA 是什么?架构?

Hadoop HA 是什么? Hadoop HA架构详解 1.1 HDFS HA背景 HDFS集群中NameNode 存在单点故障(SPOF)。对于只有一个NameNode的集群,如果NameNode机器出现意外情况,将导致整个集群无法使用,直...

weixin_39915358
2018/05/06
0
0
Hadoop Introduction

1 简介 HDFS分布式文件系统,即Hadoop Distributed Filesystem,是一个分布式文件系统,被设计部署在廉价硬件上。HDFS是一个高容错,被设计部署在廉价硬件上。HDFS提供高吞吐量访问数据,并且...

Yulong_
2017/08/09
20
0
Hadoop-2.6.0集群搭建(多机环境HDFS HA+YARN HA,推荐)

搭建 HDFS HA 和 ResourceManager HA 集群 下载Hadoop: http://hadoop.apache.org/releases.html 集群规划 HDFS HA说明 Hadoop 2.x中通常由两个NameNode组成,一个处于Active状态,另一个处...

翻船全靠浪
2016/04/25
66
0
hbase--在不同版本hdfs集群之间转移数据

很多人会有这样一个需求:将一个hdfs集群上的数据写入另一个hdfs集群所在的hbase数据库。通常情况下两个hdfs集群的版本差距并不大,这样的程序会很容易写。但有时会跨大版本。比如作者所在的...

超人学院
2016/01/18
129
0

没有更多内容

加载失败,请刷新页面

加载更多

川普给埃尔多安和内堪尼亚胡的信

任性 https://twitter.com/netanyahu/status/1186647558401253377 https://edition.cnn.com/2019/10/16/politics/trump-erdogan-letter/index.htm...

Iridium
16分钟前
7
0
golang-mysql-原生

db.go package mainimport ("database/sql""time"_ "github.com/go-sql-driver/mysql")var (db *sql.DBdsn = "root:123456@tcp(127.0.0.1:3306)/test?charset=u......

李琼涛
44分钟前
4
0
编程作业20191021092341

1编写一个程序,把用分钟表示的时间转换成用小时和分钟表示的时 间。使用#define或const创建一个表示60的符号常量或const变量。通过while 循环让用户重复输入值,直到用户输入小于或等于0的值...

1李嘉焘1
44分钟前
6
0
Netty整合Protobuffer

现在我们都知道,rpc的三要素:IO模型,线程模型,然后就是数据交互模型,即我们说的序列化和反序列化,现在我们来看一下压缩比率最大的二进制序列化方式——Protobuffer,而且该方式是可以跨...

算法之名
49分钟前
18
0
如何用C++实现栈

栈的定义 栈(stack)又名堆栈,它是一种运算受限的线性表。限定仅在表尾进行插入和删除操作的线性表。这一端被称为栈顶,相对地,把另一端称为栈底。向一个栈插入新元素又称作进栈、入栈或压...

BWH_Steven
今天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部