文档章节

hadoop从oracle中到数据

caolinsheng
 caolinsheng
发布于 2014/06/13 14:54
字数 650
阅读 163
收藏 1
点赞 0
评论 0

你丫的调了两天,终于调通了。

MapperOracle.java

package com.uucun.hadooporacle;

import java.io.IOException;

 

 

 

import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

 

public class MapperOracle extends Mapper<LongWritable, MyDbWriteble, Text, NullWritable>{
  final  Logger logger  =  LoggerFactory.getLogger(MapperOracle.class);

 @Override
 protected void map(LongWritable key, MyDbWriteble value,
   Context context)
   throws IOException, InterruptedException {
  
   
 context.write(new Text(value.toString()), NullWritable.get());
}
}

 

MyDbWriteble.java

package com.uucun.hadooporacle;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class MyDbWriteble implements DBWritable,WritableComparable {
 
private List list=new ArrayList();
 
class DownloadAd{
 
 private int id;

 private String session_id;

 private int resource_id;

 private Date visit_date;

  public int getId() {
  return id;
 }

 public void setId(int id) {
  this.id = id;
 }

 public String getSession_id() {
  return session_id;
 }

 public void setSession_id(String session_id) {
  this.session_id = session_id;
 }

 public int getResource_id() {
  return resource_id;
 }

 public void setResource_id(int resource_id) {
  this.resource_id = resource_id;
 }

 public Date getVisit_date() {
  return visit_date;
 }

 public void setVisit_date(Date visit_date) {
  this.visit_date = visit_date;
 }
 
}
 
 @Override
 public void write(PreparedStatement statement) throws SQLException {
  //statement.execute();
 }

 @Override
 public void readFields(ResultSet resultSet) throws SQLException {
  DownloadAd downloadAd;
  
  do {
    downloadAd=new DownloadAd();
   downloadAd.id=resultSet.getInt("id");
   downloadAd.session_id=resultSet.getString("session_id");
   downloadAd.resource_id=resultSet.getInt("resource_id");
   downloadAd.visit_date=resultSet.getDate("visit_date");
   list.add(downloadAd);
  } while (resultSet.next());

 }

 

 @Override
 public String toString() {
  
  StringBuilder builder=new StringBuilder();
  
  for (Iterator iterator = list.iterator(); iterator.hasNext();) {
   DownloadAd downloadAd = (DownloadAd) iterator.next();
   builder.append(downloadAd.session_id);
   builder.append(",");
   builder.append(downloadAd.resource_id);
   builder.append("\n");
  }
  builder.deleteCharAt(builder.lastIndexOf("\n"));
  
  return builder.toString();
 }

 @Override
 public void write(DataOutput out) throws IOException {
  // TODO Auto-generated method stub
  
  
 }

 @Override
 public void readFields(DataInput in) throws IOException {
  
 }

 @Override
 public int compareTo(Object o) {
  // TODO Auto-generated method stub
  return 0;
 }

 public List getList() {
  return list;
 }

 public void setList(List list) {
  this.list = list;
 }

}

 

 

RunTool.java

package com.uucun.hadooporacle;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DataDrivenDBInputFormat;

 

import org.apache.hadoop.mapreduce.lib.db.OracleDataDrivenDBInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import com.uucun.util.hadooputil.PluginUtil;


 class RunTool extends PluginUtil{
  
 public static void main(String[] args) throws Throwable{
     System.setProperty("HADOOP_USER_NAME","hadoop");
        
   final JobConf conf = new JobConf(); 

   System.out.println(args[0]);
  
  // System.exit(0);
   
       conf.set("fs.default.name", "hdfs://nn:9000");
     //  conf.set("hadoop.job.user", "hadoop");
       conf.set("mapred.job.tracker", "nn:9001");
 
       conf.set("dfs.permissions","false");
      
//       conf.set("mapred.map.tasks", "1");
      
       conf.setInt("mapred.map.tasks", 4);
        
     //   DBConfiguration.configureDB(conf, "oracle.jdbc.driver.OracleDriver", "url");
        conf.set(DBConfiguration.INPUT_ORDER_BY_PROPERTY, "t.id");
      //  conf.set(DBConfiguration.INPUT_CONDITIONS_PROPERTY, "visit_date>TO_TIMESTAMP('2013-05-23','YYYY-MM-DD HH24:MI:SS.FF') and visit_date<=TO_TIMESTAMP('2013-05-25','YYYY-MM-DD HH24:MI:SS.FF')");
        DBConfiguration.configureDB(conf, "oracle.jdbc.driver.OracleDriver", "jdbc:oracle:thin:@192.168.4.73:1521:XE", "oracle", "oracle");
       String jarName = runonhadoop().toString();
         conf.setJar(jarName);
                          
          Job job = new Job( conf , "hadoopOracle" ); 
   
         job.setMapperClass( MapperOracle.class );
       //  job.setReducerClass(Reducer.class);
       job.setInputFormatClass(OracleDataDrivenDBInputFormat.class);
        
         job.setMapOutputKeyClass(Text.class);
         job.setMapOutputValueClass(NullWritable.class);
        
         job.setOutputFormatClass(TextOutputFormat.class);
         job.setJarByClass(RunTool.class);
        
         String queryStr="select t.id,t.session_id,t.resource_id,t.visit_date from tbl_res_download_succeed_log t where  t.visit_date>to_date('2013-05-23','yyyy-MM-dd HH24:mi:ss') and t.visit_date<=to_date('2013-05-25','yyyy-MM-dd HH24:mi:ss') and $CONDITIONS";
     //  String queryStr="select id,session_id,resource_id,visit_date from tbl_res_download_succeed_log  where visit_date>TO_TIMESTAMP('2013-05-23','YYYY-MM-DD HH24:MI:SS.FF') and visit_date<=TO_TIMESTAMP('2013-05-25','YYYY-MM-DD HH24:MI:SS.FF')";
      //   String queryStr="select id,session_id,resource_id,visit_date from tbl_res_download_succeed_log";
          
         String boundStr="select min(id),max(id) from tbl_res_download_succeed_log  where visit_date>to_date('2013-05-23','yyyy-MM-dd HH24:mi:ss') and visit_date<=to_date('2013-05-25','yyyy-MM-dd HH24:mi:ss')";
        
         TextOutputFormat.setOutputPath(job, new Path(args[1]));
        
       OracleDataDrivenDBInputFormat.setInput(job, MyDbWriteble.class,queryStr ,boundStr);
      
      
       //  OracleDataDrivenDBInputFormat.setInput(job, MyDbWriteble.class,"tbl_res_download_succeed_log","visit_date>to_date('2013-05-23','yyyy-MM-dd HH24:mi:ss') and visit_date<to_date('2013-05-24','yyyy-MM-dd HH24:mi:ss')","visit_date","id","session_id","resource_id","visit_date");
    //  OracleDataDrivenDBInputFormat.setBoundingQuery(job.getConfiguration(), boundStr);
        
      FileSystem fs  =FileSystem.get(conf);
        if (fs.exists(new Path(args[1]))) {
    fs.delete(new Path(args[1]),true);
    fs.close();
   }
        
         job.waitForCompletion(true);
       
 }

}

 

尤其要注意 $CONDITIONS,就是这个毛毛搞得我两天才调通,还有DBWritable接口中的readFields方法中,ResultSet类型的参数已经是记录的第一条,不能按常规的去next,否则会少一条记录。

© 著作权归作者所有

共有 人打赏支持
caolinsheng
粉丝 3
博文 9
码字总数 2532
作品 0
东城
甲骨文推出 Oracle Big Data 机

甲骨文全球大会,2011年10月4日—— 美国旧金山当地时间10月3日,甲骨文公司在2011甲骨文全球大会上宣布推出Oracle Big Data机(Oracle Big Data Appliance),以帮助客户整合及最大限度挖掘...

红薯 ⋅ 2011/10/04 ⋅ 7

SQL、NewSQL和NoSQL融合研究与实践

本文根据DBAplus社群第111期线上分享整理而成。 近几年,各类大数据技术迅猛发展,企业中数据处理量呈现几十到几百倍增长,数据类型也从传统结构化数据,延伸到实时流数据,以及各类非结构化...

朱祥磊 ⋅ 2017/07/06 ⋅ 0

RMDB与hadoop的实时整合

一、MySQL的Hadoop Applier 实现原理是:把hadoop作为MYSQL 的slave,实时把数据同步到hadoop,支持apache hadoop 通过分析MYSQL的binlog日志,在hdfs产生一个目录(同表名),所有的表记录都存...

cloud-coder ⋅ 2014/03/21 ⋅ 0

2015也过去一半了,Hadoop大事件盘点

2015也快过去一半了,Hadoop在过去一年的发展究竟如何,下面小象带你盘点一下2014Hadoop大事件! 2014年2月,Hadoop 2.3.0发布,新特性包括支持HDFS的混合存储分级,可以集中管理HDFS内存里的...

dongzhumao ⋅ 2015/05/19 ⋅ 0

Sqoop数据分析引擎安装与使用

Sqoop数据分析引擎安装与使用 ==>什么是Sqoop ? Sqoop 是一个开源的数据处理引擎,主要是通过 JDBC 为媒介, 在Hadoop(Hive)与 传统的关系型数据库(Oracle, MySQL,Postgres等)间进行数据的...

菜鸟的征程 ⋅ 01/12 ⋅ 0

sqoop数据迁移

1.Sqoop1与Sqoop2的优缺点 比较 Sqoop1 Sqoop2 架构 仅仅使用一个Sqoop客户端 引入了Sqoop server集中化管理connector,以及rest api,web,UI,并引入权限安全机制 部署 部署简单,安装需要...

脸大的都是胖纸 ⋅ 2015/04/22 ⋅ 0

将现有的SQL工作负载迁移至hadoop竟然如此简单!

想迁移现有的数据仓库到Hadoop平台?想在Hadoop上重用其他RDMBS的SQL技能?有何方案能帮助您解决这类问题,答案是IBM Big SQL。 Big SQL是IBM的SQL on Hadoop解决方案,它充分利用了IBM在RDB...

勿忘初心321 ⋅ 2016/08/25 ⋅ 0

Big Data Strategy & Big SQL

12月8日,IBM软件工程师胡泽远老师,在DBA+社群DB2用户群进行了一次主题为“Big Data Strategy & Big SQL”的线上分享。小编特别整理出其中精华内容,供大家学习交流。同时,也非常感谢胡泽远...

胡泽远 ⋅ 2015/12/10 ⋅ 0

Oracle 发布 NoSQL 数据库

Oracle 作为全球最大的关系型数据库提供商,在其产品链条中,也加入了NoSQL数据库这一环,而且这个新的数据库名字很霸气,就叫NoSQL Database,想起了当年新浪微博更换weibo.com 域名之时的一...

红薯 ⋅ 2011/10/06 ⋅ 14

从2015硅谷Strata大会看:数据库的发展现状与前景

Strata+Hadoop World(SHW)大会是全世界最大的大数据大会之一。 SHW大会为各种技术提供了深度交流的机会,还会看到最领先的大数据技术、最广泛的应用场景、最有趣的用例教学以及最全面的大数...

oschina ⋅ 2015/03/12 ⋅ 5

没有更多内容

加载失败,请刷新页面

加载更多

下一页

zblog2.3版本的asp系统是否可以超越卢松松博客的流量[图]

最近访问zblog官网,发现zlbog-asp2.3版本已经进入测试阶段了,虽然正式版还没有发布,想必也不久了。那么作为aps纵横江湖十多年的今天,blog2.2版本应该已经成熟了,为什么还要发布这个2.3...

原创小博客 ⋅ 38分钟前 ⋅ 0

聊聊spring cloud的HystrixCircuitBreakerConfiguration

序 本文主要研究一下spring cloud的HystrixCircuitBreakerConfiguration HystrixCircuitBreakerConfiguration spring-cloud-netflix-core-2.0.0.RELEASE-sources.jar!/org/springframework/......

go4it ⋅ 今天 ⋅ 0

二分查找

二分查找,也称折半查找、二分搜索,是一种在有序数组中查找某一特定元素的搜索算法。搜素过程从数组的中间元素开始,如果中间元素正好是要查找的元素,则搜素过程结束;如果某一特定元素大于...

人觉非常君 ⋅ 今天 ⋅ 0

VS中使用X64汇编

需要注意的是,在X86项目中,可以使用__asm{}来嵌入汇编代码,但是在X64项目中,再也不能使用__asm{}来编写嵌入式汇编程序了,必须使用专门的.asm汇编文件来编写相应的汇编代码,然后在其它地...

simpower ⋅ 今天 ⋅ 0

ThreadPoolExecutor

ThreadPoolExecutor public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, ......

4rnold ⋅ 昨天 ⋅ 0

Java正无穷大、负无穷大以及NaN

问题来源:用Java代码写了一个计算公式,包含除法和对数和取反,在页面上出现了-infinity,不知道这是什么问题,网上找答案才明白意思是负的无穷大。 思考:为什么会出现这种情况呢?这是哪里...

young_chen ⋅ 昨天 ⋅ 0

前台对中文编码,后台解码

前台:encodeURI(sbzt) 后台:String param = URLDecoder.decode(sbzt,"UTF-8");

west_coast ⋅ 昨天 ⋅ 0

实验楼—MySQL基础课程-挑战3实验报告

按照文档要求创建数据库 sudo sercice mysql startwget http://labfile.oss.aliyuncs.com/courses/9/createdb2.sqlvim /home/shiyanlou/createdb2.sql#查看下数据库代码 代码创建了grade......

zhangjin7 ⋅ 昨天 ⋅ 0

一起读书《深入浅出nodejs》-node模块机制

node 模块机制 前言 说到node,就不免得提到JavaScript。JavaScript自诞生以来,经历了工具类库、组件库、前端框架、前端应用的变迁。通过无数开发人员的努力,JavaScript不断被类聚和抽象,...

小草先森 ⋅ 昨天 ⋅ 0

Java桌球小游戏

其实算不上一个游戏,就是两张图片,不停的重画,改变ball图片的位置。一个左右直线碰撞的,一个有角度碰撞的。 左右直线碰撞 package com.bjsxt.test;import javax.swing.*;import j...

森林之下 ⋅ 昨天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部