文档章节

Hadoop读写mysql

o
 osc_ogi0qclx
发布于 2019/08/22 16:43
字数 609
阅读 0
收藏 0

精选30+云产品,助力企业轻松上云!>>>

需求

两张表,一张click表记录某广告某一天的点击量,另一张total_click表记录某广告的总点击量

 

建表

CREATE TABLE `click` (
  `id` int(20) NOT NULL AUTO_INCREMENT,
  `ad_id` int(20) DEFAULT NULL, -- 广告ID
  `click_num` int(30) DEFAULT NULL, -- 某天的点击数量
  `day` date,
  PRIMARY KEY (`id`)
);

CREATE TABLE `total_click` (
  `id` int(20) NOT NULL AUTO_INCREMENT,
  `ad_id` int(20) DEFAULT NULL, -- 广告ID
  `total_click_num` int(50) DEFAULT NULL, -- 总点击数量
  PRIMARY KEY (`id`)
)

 

pom依赖

<dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
        </dependency>

        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.45</version>
        </dependency>
    </dependencies>

 

代码

自定义类

Writable是为了与MapReduce进行对接,而DBWritable是为了与MySQL进行对接。

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;


public class MyDBWritable implements DBWritable, Writable {
    private String ad_id;
    private int click_num;
    private int total_click_num;


    public MyDBWritable(){

    }
    public MyDBWritable(String name, int age) {
        this.ad_id = name;
        this.click_num = age;
        this.total_click_num = total_click_num;
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(ad_id);
        out.writeInt(click_num);
        out.writeInt(total_click_num);
    }

    //写数据的过程
    public void write(PreparedStatement statement) throws SQLException {
        //要和SQL_Run类的DBOutputFormat.setOutput(job,"total_click","ad_id","total_click_num")语句里字段的顺序保持一致
        statement.setString(1,ad_id);
        statement.setInt(2, total_click_num);
    }

    //读数据的过程
    public void readFields(ResultSet resultSet) throws SQLException {
        ad_id =resultSet.getString(1);
        click_num =resultSet.getInt(2);
    }

    public void readFields(DataInput in) throws IOException {
        ad_id =in.readUTF();
        click_num =in.readInt();
        total_click_num =in.readInt();
    }

    public String getAd_id() {
        return ad_id;
    }

    public void setAd_id(String ad_id) {
        this.ad_id = ad_id;
    }

    public int getClick_num() {
        return click_num;
    }

    public void setClick_num(int click_num) {
        this.click_num = click_num;
    }

    public int getTotal_click_num() {
        return total_click_num;
    }

    public void setTotal_click_num(int total_click_num) {
        this.total_click_num = total_click_num;
    }

}

 

Map

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;


public class SQLMapper extends Mapper<LongWritable,MyDBWritable,Text,IntWritable> {
    @Override
    protected void map(LongWritable key, MyDBWritable value, Context context) throws IOException, InterruptedException {
        context.write(new Text(value.getAd_id()),new IntWritable(value.getClick_num()));
    }

}

 

Reduce

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;


public class SQLReducer extends Reducer<Text,IntWritable,MyDBWritable,NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int total = 0;
        for(IntWritable i :values) {
            total+= i.get();
        }
        MyDBWritable myDBWritable = new MyDBWritable();
        myDBWritable.setAd_id(key.toString());
        myDBWritable.setTotal_click_num(total);
        context.write(myDBWritable,NullWritable.get());
    }
}

 

App

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;


public class SQL_Run {
    public static void main(String[] args) throws Exception {

        Configuration conf=new Configuration();

        //假如是本地测试,需要设置fs.defaultFS
        conf.set("fs.defaultFS","file:///");

        Job job = Job.getInstance(conf);


        FileSystem fs=FileSystem.get(conf);

        job.setJobName("SQL_TEST");
        job.setJarByClass(SQL_Run.class);
        job.setMapperClass(SQLMapper.class);
        job.setReducerClass(SQLReducer.class);

        //配置数据库信息
        String driveclass="com.mysql.jdbc.Driver";
        String url="jdbc:mysql://192.168.0.8:3306/bigdata";
        String username="root";
        String password="123456";
        DBConfiguration.configureDB(job.getConfiguration(),driveclass,url,username,password);

        //设置数据库输入
        //需要通过总的记录数来计算切片
        DBInputFormat.setInput(job,MyDBWritable.class,"select ad_id,click_num from click","select count(id) from click");

        //设置数据库输出  //total_click是表名,后面参数是字段值(可以多个)
        DBOutputFormat.setOutput(job,"total_click","ad_id","total_click_num");

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(MyDBWritable.class);
        job.setOutputValueClass(NullWritable.class);

        job.waitForCompletion(true);
    }
}

 

o
粉丝 0
博文 500
码字总数 0
作品 0
私信 提问
加载中
请先登录后再评论。
Linux中基于hadoop安装hive(SUSE12+hadoop2.6.4+hive2.3.4)

说明:安装hive前提是要先安装hadoop集群,并且hive只需要再hadoop的namenode节点集群里安装即可(需要再所有namenode上安装),可以不在datanode节点的机器上安装。另外还需要说明的是,虽然修...

osc_s8kmhvea
2019/03/18
3
0
Difeye 1.1.5 版本发布 增加gearman读写分离调度作业

Difeye 1.1.5 版本---更新日期:2013-05-28 Difeye是一款超轻量级PHP框架,主要特点有: ◆数据库连接做自动主从读写分离配置,适合单机和分布式站点部署; ◆支持Smarty模板机制,可灵活配置...

mathcn123456
2013/05/28
332
0
轻量级PHP框架--Difeye

Difeye 1.1.4 版本---更新日期:2013-03-1 Difeye是一款超轻量级PHP框架,主要特点有: ◆数据库连接做自动主从读写分离配置,适合单机和分布式站点部署; ◆支持Smarty模板机制,可灵活配置...

匿名
2011/03/17
2.1K
0
异构数据源海量数据交换工具-Taobao DataX 下载和使用

DataX介绍 DataX是一个在异构的数据库/文件系统之间高速交换数据的工具,实现了在任意的数据处理系统(RDBMS/Hdfs/Local filesystem)之间的数据交换。 目前成熟的数据导入导出工具比较多,但...

jxpxwh
2015/06/09
3
0
为什么需要MongoDB

为什么需要MongoDB MongoDB与MySQL的区别 关于MongoDB与MySQL的区别可以参考网上关于NoSQL与MySQL的区别,以下是找到的网上的关于两者区别的截图: 总体上讲: 由于MongoDB独特的数据处理方式...

osc_5g1gl9wp
2019/10/17
1
0

没有更多内容

加载失败,请刷新页面

加载更多

浅谈对python pandas中 inplace 参数的理解

这篇文章主要介绍了对python pandas中 inplace 参数的理解,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧 pandas 中 inplace 参数在很多函数中都会有,它的作用是:是否...

Linux就该这么学
6分钟前
0
0
C++ 从基本数据类型说起

前言 int 在32位和64位操作系统,都是四个字节长度。为了能编写一个在32位和64位操作系统都能稳定运行的程序,建议采用std::int32_t 或者std::int64_t指定数据类型。*与long随操作系统子长变...

osc_sxdofc9c
6分钟前
0
0
游戏音乐的作用以及起源

游戏音乐是由特殊的音乐、语言符号、美学符号组成,在电子游戏的发展下,游戏音乐越来越成熟,游戏音乐与美术相融合,能够带给玩家视觉与声音的感官冲击,形成游戏音乐所具有的独特的审美效果...

奇亿音乐
7分钟前
0
0
2020,最新Model的设计-APP重构之路

很多的app使用MVC设计模式来将“用户交互”与“数据和逻辑”分开,而model其中一个重要作用就是持久化。下文中设计的Model可能不是一个完美的,扩展性强的model范例,但在我需要重构的app中,...

osc_mfzkzkxi
7分钟前
0
0
面对职业瓶颈,iOS 开发人员应该如何突破?

我们经常看到 iOS 开发人员(各种能力水平都有)的一些问题,咨询有关专业和财务发展方面的建议。 这些问题有一个共同点:前面都会说“我现在遇到了职业困境”,然后会问一些诸如“我是否应该...

osc_gfpedeca
8分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部