文档章节

hadoop从oracle中到数据

caolinsheng
 caolinsheng
发布于 2014/06/13 14:54
字数 650
阅读 163
收藏 1

你丫的调了两天,终于调通了。

MapperOracle.java

package com.uucun.hadooporacle;

import java.io.IOException;

 

 

 

import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

 

public class MapperOracle extends Mapper<LongWritable, MyDbWriteble, Text, NullWritable>{
  final  Logger logger  =  LoggerFactory.getLogger(MapperOracle.class);

 @Override
 protected void map(LongWritable key, MyDbWriteble value,
   Context context)
   throws IOException, InterruptedException {
  
   
 context.write(new Text(value.toString()), NullWritable.get());
}
}

 

MyDbWriteble.java

package com.uucun.hadooporacle;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class MyDbWriteble implements DBWritable,WritableComparable {
 
private List list=new ArrayList();
 
class DownloadAd{
 
 private int id;

 private String session_id;

 private int resource_id;

 private Date visit_date;

  public int getId() {
  return id;
 }

 public void setId(int id) {
  this.id = id;
 }

 public String getSession_id() {
  return session_id;
 }

 public void setSession_id(String session_id) {
  this.session_id = session_id;
 }

 public int getResource_id() {
  return resource_id;
 }

 public void setResource_id(int resource_id) {
  this.resource_id = resource_id;
 }

 public Date getVisit_date() {
  return visit_date;
 }

 public void setVisit_date(Date visit_date) {
  this.visit_date = visit_date;
 }
 
}
 
 @Override
 public void write(PreparedStatement statement) throws SQLException {
  //statement.execute();
 }

 @Override
 public void readFields(ResultSet resultSet) throws SQLException {
  DownloadAd downloadAd;
  
  do {
    downloadAd=new DownloadAd();
   downloadAd.id=resultSet.getInt("id");
   downloadAd.session_id=resultSet.getString("session_id");
   downloadAd.resource_id=resultSet.getInt("resource_id");
   downloadAd.visit_date=resultSet.getDate("visit_date");
   list.add(downloadAd);
  } while (resultSet.next());

 }

 

 @Override
 public String toString() {
  
  StringBuilder builder=new StringBuilder();
  
  for (Iterator iterator = list.iterator(); iterator.hasNext();) {
   DownloadAd downloadAd = (DownloadAd) iterator.next();
   builder.append(downloadAd.session_id);
   builder.append(",");
   builder.append(downloadAd.resource_id);
   builder.append("\n");
  }
  builder.deleteCharAt(builder.lastIndexOf("\n"));
  
  return builder.toString();
 }

 @Override
 public void write(DataOutput out) throws IOException {
  // TODO Auto-generated method stub
  
  
 }

 @Override
 public void readFields(DataInput in) throws IOException {
  
 }

 @Override
 public int compareTo(Object o) {
  // TODO Auto-generated method stub
  return 0;
 }

 public List getList() {
  return list;
 }

 public void setList(List list) {
  this.list = list;
 }

}

 

 

RunTool.java

package com.uucun.hadooporacle;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DataDrivenDBInputFormat;

 

import org.apache.hadoop.mapreduce.lib.db.OracleDataDrivenDBInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import com.uucun.util.hadooputil.PluginUtil;


 class RunTool extends PluginUtil{
  
 public static void main(String[] args) throws Throwable{
     System.setProperty("HADOOP_USER_NAME","hadoop");
        
   final JobConf conf = new JobConf(); 

   System.out.println(args[0]);
  
  // System.exit(0);
   
       conf.set("fs.default.name", "hdfs://nn:9000");
     //  conf.set("hadoop.job.user", "hadoop");
       conf.set("mapred.job.tracker", "nn:9001");
 
       conf.set("dfs.permissions","false");
      
//       conf.set("mapred.map.tasks", "1");
      
       conf.setInt("mapred.map.tasks", 4);
        
     //   DBConfiguration.configureDB(conf, "oracle.jdbc.driver.OracleDriver", "url");
        conf.set(DBConfiguration.INPUT_ORDER_BY_PROPERTY, "t.id");
      //  conf.set(DBConfiguration.INPUT_CONDITIONS_PROPERTY, "visit_date>TO_TIMESTAMP('2013-05-23','YYYY-MM-DD HH24:MI:SS.FF') and visit_date<=TO_TIMESTAMP('2013-05-25','YYYY-MM-DD HH24:MI:SS.FF')");
        DBConfiguration.configureDB(conf, "oracle.jdbc.driver.OracleDriver", "jdbc:oracle:thin:@192.168.4.73:1521:XE", "oracle", "oracle");
       String jarName = runonhadoop().toString();
         conf.setJar(jarName);
                          
          Job job = new Job( conf , "hadoopOracle" ); 
   
         job.setMapperClass( MapperOracle.class );
       //  job.setReducerClass(Reducer.class);
       job.setInputFormatClass(OracleDataDrivenDBInputFormat.class);
        
         job.setMapOutputKeyClass(Text.class);
         job.setMapOutputValueClass(NullWritable.class);
        
         job.setOutputFormatClass(TextOutputFormat.class);
         job.setJarByClass(RunTool.class);
        
         String queryStr="select t.id,t.session_id,t.resource_id,t.visit_date from tbl_res_download_succeed_log t where  t.visit_date>to_date('2013-05-23','yyyy-MM-dd HH24:mi:ss') and t.visit_date<=to_date('2013-05-25','yyyy-MM-dd HH24:mi:ss') and $CONDITIONS";
     //  String queryStr="select id,session_id,resource_id,visit_date from tbl_res_download_succeed_log  where visit_date>TO_TIMESTAMP('2013-05-23','YYYY-MM-DD HH24:MI:SS.FF') and visit_date<=TO_TIMESTAMP('2013-05-25','YYYY-MM-DD HH24:MI:SS.FF')";
      //   String queryStr="select id,session_id,resource_id,visit_date from tbl_res_download_succeed_log";
          
         String boundStr="select min(id),max(id) from tbl_res_download_succeed_log  where visit_date>to_date('2013-05-23','yyyy-MM-dd HH24:mi:ss') and visit_date<=to_date('2013-05-25','yyyy-MM-dd HH24:mi:ss')";
        
         TextOutputFormat.setOutputPath(job, new Path(args[1]));
        
       OracleDataDrivenDBInputFormat.setInput(job, MyDbWriteble.class,queryStr ,boundStr);
      
      
       //  OracleDataDrivenDBInputFormat.setInput(job, MyDbWriteble.class,"tbl_res_download_succeed_log","visit_date>to_date('2013-05-23','yyyy-MM-dd HH24:mi:ss') and visit_date<to_date('2013-05-24','yyyy-MM-dd HH24:mi:ss')","visit_date","id","session_id","resource_id","visit_date");
    //  OracleDataDrivenDBInputFormat.setBoundingQuery(job.getConfiguration(), boundStr);
        
      FileSystem fs  =FileSystem.get(conf);
        if (fs.exists(new Path(args[1]))) {
    fs.delete(new Path(args[1]),true);
    fs.close();
   }
        
         job.waitForCompletion(true);
       
 }

}

 

尤其要注意 $CONDITIONS,就是这个毛毛搞得我两天才调通,还有DBWritable接口中的readFields方法中,ResultSet类型的参数已经是记录的第一条,不能按常规的去next,否则会少一条记录。

© 著作权归作者所有

共有 人打赏支持
caolinsheng
粉丝 3
博文 9
码字总数 2532
作品 0
东城
甲骨文推出 Oracle Big Data 机

甲骨文全球大会,2011年10月4日—— 美国旧金山当地时间10月3日,甲骨文公司在2011甲骨文全球大会上宣布推出Oracle Big Data机(Oracle Big Data Appliance),以帮助客户整合及最大限度挖掘...

红薯
2011/10/04
2.9K
7
SQL、NewSQL和NoSQL融合研究与实践

本文根据DBAplus社群第111期线上分享整理而成。 近几年,各类大数据技术迅猛发展,企业中数据处理量呈现几十到几百倍增长,数据类型也从传统结构化数据,延伸到实时流数据,以及各类非结构化...

朱祥磊
2017/07/06
0
0
2015也过去一半了,Hadoop大事件盘点

2015也快过去一半了,Hadoop在过去一年的发展究竟如何,下面小象带你盘点一下2014Hadoop大事件! 2014年2月,Hadoop 2.3.0发布,新特性包括支持HDFS的混合存储分级,可以集中管理HDFS内存里的...

dongzhumao
2015/05/19
0
0
Sqoop数据分析引擎安装与使用

Sqoop数据分析引擎安装与使用 ==>什么是Sqoop ? Sqoop 是一个开源的数据处理引擎,主要是通过 JDBC 为媒介, 在Hadoop(Hive)与 传统的关系型数据库(Oracle, MySQL,Postgres等)间进行数据的...

菜鸟的征程
01/12
0
0
RMDB与hadoop的实时整合

一、MySQL的Hadoop Applier 实现原理是:把hadoop作为MYSQL 的slave,实时把数据同步到hadoop,支持apache hadoop 通过分析MYSQL的binlog日志,在hdfs产生一个目录(同表名),所有的表记录都存...

cloud-coder
2014/03/21
0
0

没有更多内容

加载失败,请刷新页面

加载更多

win32截屏并rgb24转yuv420

//最终f的内存布局为BGRA格式,需要保证buf长度足够(>w*h*4)void ScreenCap(void* buf, int w, int h){ HWND hDesk = GetDesktopWindow(); HDC hScreen = GetDC(hDesk); ......

styleman
今天
1
0
php输出mysql取出的中文为??的问题

解决方法: @ $db=new mysqli(DB_HOST,DB_USER,DB_PASSWORD,DB_DB); $db->query("set names utf8");//添加此语句,可以解决问题...

Aomo
今天
1
2
白话SpringCloud | 第五章:服务容错保护(Hystrix)

前言 前一章节,我们知道了如何利用RestTemplate+Ribbon和Feign的方式进行服务的调用。在微服务架构中,一个服务可能会调用很多的其他微服务应用,虽然做了多集群部署,但可能还会存在诸如网...

oKong
今天
2
0
【解惑】领略Java内部类的“内部”

内部类有两种情况: (1) 在类中定义一个类(私有内部类,静态内部类) (2) 在方法中定义一个类(局部内部类,匿名内部类) 1、私有内部类 —— 在方法之间定义的内部类,非静态 我们首先看看类中...

偶尔诗文
今天
1
0
sqlserver 2008 r2 直接下载地址(百度云)

之前下载的sqlserver2008发现不能附加,就卸载了,重新找到了sqlserver2008R2的百度云资源 卸载sqlserver2008还是有点麻烦,不过就是需要删除注册表中的信息 自己来回卸载了3次终于重装sqlse...

dillonxiao
今天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部