文档章节

Hadoop,MapReduce操作Mysql

osDaniel
 osDaniel
发布于 2014/08/15 10:40
字数 1425
阅读 67
收藏 5

转自:http://www.cnblogs.com/liqizhou/archive/2012/05/16/2503458.html

前以前帖子介绍,怎样读取文本数据源和多个数据源的合并:http://www.cnblogs.com/liqizhou/archive/2012/05/15/2501835.html

这一个博客介绍一下MapReduce怎样读取关系数据库的数据,选择的关系数据库为MySql,因为它是开源的软件,所以大家用的比较多。以前上学的时候就没有用过开源的软件,直接用盗版,也相当与免费,且比开源好用,例如向oracle,windows7等等。现在工作了,由于公司考虑成本的问题,所以都用成开源的,ubuntu,mysql等,本人现在支持开源,特别像hadoop这样的东西,真的太好了,不但可以使用软件,也可以读到源代码。话不说多了。

hadoop技术推出一首曾遭到关系数据库研究者的挑衅和批评,认为MapReduce不具有关系数据库中的结构化数据存储和处理能力。为此,hadoop社区和研究人员做了多的努力,在hadoop0.19版支持MapReduce访问关系数据库,如:mysql,MySQL、PostgreSQL、Oracle 等几个数据库系统。

1. 从Mysql读出数据

Hadoop访问关系数据库主要通过一下接口实现的:DBInputFormat类,包所在位置:org.apache.hadoop.mapred.lib.db 中。DBInputFormat 在 Hadoop 应用程序中通过数据库供应商提供的 JDBC接口来与数据库进行交互,并且可以使用标准的 SQL 来读取数据库中的记录。学习DBInputFormat首先必须知道二个条件。

  1. 在使用 DBInputFormat 之前,必须将要使用的 JDBC 驱动拷贝到分布式系统各个节点的$HADOOP_HOME/lib/目录下。

  2. MapReduce访问关系数据库时,大量频繁的从MapReduce程序中查询和读取数据,这大大的增加了数据库的访问负载,因此,DBInputFormat接口仅仅适合读取小数据量的数据,而不适合处理数据仓库。要处理数据仓库的方法有:利用数据库的Dump工具将大量待分析的数据输出为文本,并上传的Hdfs中进行处理,处理的方法可参考:http://www.cnblogs.com/liqizhou/archive/2012/05/15/2501835.html

 

 DBInputFormat 类中包含以下三个内置类

  1. protected class DBRecordReader implementsRecordReader<LongWritable, T>:用来从一张数据库表中读取一条条元组记录。

  2. 2.public static class NullDBWritable implements DBWritable,Writable:主要用来实现 DBWritable 接口。DBWritable接口要实现二个函数,第一是write,第二是readFileds,这二个函数都不难理解,一个是写,一个是读出所有字段。原型如下:

    public void write(PreparedStatement statement) throwsSQLException;public void readFields(ResultSet resultSet) throws SQLException;
  3. protected static class DBInputSplit implements InputSplit:主要用来描述输入元组集合的范围,包括 start 和 end 两个属性,start 用来表示第一条记录的索引号,end 表示最后一条记录的索引号.

下面对怎样使用 DBInputFormat 读取数据库记录进行详细的介绍,具体步骤如下:

  1. DBConfiguration.configureDB (JobConf job, StringdriverClass, String dbUrl, String userName, String passwd)函数,配置JDBC 驱动,数据源,以及数据库访问的用户名和密码。MySQL 数据库的 JDBC 的驱动为“com.mysql.jdbc.Driver”,数据源为“jdbc:mysql://localhost/testDB”,其中testDB为访问的数据库。useName一般为“root”,passwd是你数据库的密码。

  2. DBInputFormat.setInput(JobConf job, Class<?extends DBWritable> inputClass, String tableName, String conditions,String orderBy, String... fieldNames),这个方法的参数很容易看懂,inputClass实现DBWritable接口。,string tableName表名, conditions表示查询的条件,orderby表示排序的条件,fieldNames是字段,这相当与把sql语句拆分的结果。当然也可以用sql语句进行重载。etInput(JobConf job, Class<?extends DBWritable> inputClass, String inputQuery, StringinputCountQuery)。

  3. 编写MapReduce函数,包括Mapper 类、Reducer 类、输入输出文件格式等,然后调用JobClient.runJob(conf)。

上面讲了理论,下面举个例子:假设 MySQL 数据库中有数据库student,假设数据库中的字段有“id”,“name”,“gender","number"。

第一步要实现DBwrite和write数据接口。代码如下:

复制代码

        public class StudentRecord implements Writable, DBWritable{            int id;
            String name;
            String gender;
            String number;
            @Override        public void readFields(DataInput in) throws IOException {            // TODO Auto-generated method stub
            this.id = in.readInt();            this.gender = Text.readString(in);            this.name = in.readString();            this.number = in.readString();
        }
            @Override        public void write(DataOutput out) throws IOException {            // TODO Auto-generated method stub
            out.writeInt(this.id);
            Text.writeString(out,this.name);            out.writeInt(this.gender);            out.writeInt(this.number);
        }
            
            @Override        public void readFields(ResultSet result) throws SQLException {            // TODO Auto-generated method stub
            this.id = result.getInt(1);            this.name = result.getString(2);            this.gender = result.getString(3);            this.number = result.getString(4);
        }
            
            @Override        public void write(PreparedStatement stmt) throws SQLException{            // TODO Auto-generated method stub
            stmt.setInt(1, this.id);
            stmt.setString(2, this.name);
            stmt.setString(3, this.gender);
            stmt.setString(4, this.number);
        }
            @Override        public String toString() {            // TODO Auto-generated method stub
            return new String(this.name + " " + this.gender + " " +this.number);
        }

复制代码

第二步,实现Map和Reduce类

复制代码

    public class DBAccessMapper extends MapReduceBase implements
            Mapper<LongWritable, TeacherRecord, LongWritable, Text> {
        @Override        public void map(LongWritable key, TeacherRecord value,
                OutputCollector<LongWritable, Text> collector, Reporter reporter)
                throws IOException {            // TODO Auto-generated method stub
            new collector.collect(new LongWritable(value.id), new Text(value
                    .toString()));
        }
    }

复制代码

第三步:主函数的实现,函数

复制代码

public class DBAccessReader {    public static void main(String[] args) throws IOException {
        JobConf conf = new JobConf(DBAccessReader.class);
        conf.setOutputKeyClass(LongWritable.class);
        conf.setOutputValueClass(Text.class);
        conf.setInputFormat(DBInputFormat.class);
        FileOutputFormat.setOutputPath(conf, new Path("dboutput"));
        DBConfiguration.configureDB(conf,"com.mysql.jdbc.Driver",        "jdbc:mysql://localhost/school","root","123456");
        String [] fields = {"id", "name", "gender", "number"};
        DBInputFormat.setInput(conf, StudentRecord.class,"Student",null "id", fields);
        conf.setMapperClass(DBAccessMapper.class);
        conf.setReducerClass(IdentityReducer.class);
        JobClient.runJob(conf);
        }

}

复制代码

2.写数据

 往往对于数据处理的结果的数据量一般不会太大,可能适合hadoop直接写入数据库中。hadoop提供了相应的数据库直接输出的计算发结果。

  1. DBOutFormat: 提供数据库写入接口。

  2. DBRecordWriter:提供向数据库中写入的数据记录的接口。

  3. DBConfiguration:提供数据库配置和创建链接的接口。

DBOutFormat提供一个静态方法setOutput(job,String table,String ...filedNames);该方法的参数很容易看懂。假设要插入一个Student的数据,其代码为

复制代码

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        JobConf conf = new JobConf();
        conf.setOutputFormat(DBOutputFormat.class);
        DBConfiguration.configureDB(conf,"com.mysql.jdbc.Driver",                "jdbc:mysql://localhost/school","root","123456");
        DBOutputFormat.setOutput(conf,"Student", 456, "liqizhou", "man", "20004154578");
        JobClient.runJob(conf);

复制代码


本文转载自:http://www.cnblogs.com/liqizhou/archive/2012/05/16/2503458.html

osDaniel
粉丝 6
博文 37
码字总数 17120
作品 0
广州
私信 提问
hadoop--Hadoop生态上几个技术的关系与区别:hive、pig、hbase 关系与区别

Pig 一种操作hadoop的轻量级脚本语言,最初又雅虎公司推出,不过现在正在走下坡路了。当初雅虎自己慢慢退出pig的维护之后将它开源贡献到开源社区由所有爱好者来维护。不过现在还是有些公司在...

寒月谷
2018/08/02
0
0
【电子书】Hadoop实战手册 (样章第一章)

Hadoop实战手册 [美] Jonathan R. Owens,Jon Lentz,Brian Femiano 著; 傅杰,赵磊,卢学裕 译 内容简介   这是一本Hadoop实用手册,主要针对实际问题给出相应的解决方案。《Hadoop实战手...

dwf07223
2018/06/28
0
0
大数据经典学习路线(及供参考)之 一

1.Linux基础和分布式集群技术 学完此阶段可掌握的核心能力: 熟练使用Linux,熟练安装Linux上的软件,了解熟悉负载均衡、高可靠等集群相关概念,搭建互联网高并发、高可靠的服务架构; 学完此...

柯西带你学编程
2018/05/22
0
0
Hadoop的mapreduce的简单用法

 Mapreduce初析   Mapreduce是一个计算框架,既然是做计算的框架,那么表现形式就是有个输入(input),mapreduce操作这个输入(input),通过本身定义好的计算模型,得到一个输出(outpu...

魔法王者安琪拉
2018/08/23
29
0
Hadoop 中利用 mapreduce 读写 mysql 数据

有时候我们在项目中会遇到输入结果集很大,但是输出结果很小,比如一些 pv、uv 数据,然后为了实时查询的需求,或者一些 OLAP 的需求,我们需要 mapreduce 与 mysql 进行数据的交互,而这些特...

大数据之路
2013/07/29
10.3K
1

没有更多内容

加载失败,请刷新页面

加载更多

反编译9.png图片还原

本文链接:https://blog.csdn.net/a1140778530/article/details/10528507 经常反编译apk文件找资源,9.png的文件处理起来很麻烦。 最近使用Ant自动编译打包app时,从别处搜罗来的9.png文件导...

shzwork
10分钟前
2
0
Shell脚本应用 – for、while循环语句

一、for循环语句 在实际工作中,经常会遇到某项任务需要多次执行的情况,而每次执行时仅仅是处理的对象不一样,其他命令相同。例如:根据通讯录中的姓名列表创建系统账号等情况。 当面对各种...

linux-tao
11分钟前
2
0
RPA风潮下企业财务工作模式的变革

RPA(机器人流程自动化)在财务领域的应用,正给企业财务带来前所未有的改变。 前RPA时代,财务领域面临的痛点 在RPA机器人应用之前,企业财务工作进程的推进,主要通过财务人员人工操作或信...

UiBot
16分钟前
3
0
Hive之命令行修改表注释

最近遇到一个需求,在不重建表的情况下,修改表的注释,hive有没有类似关系型数据库的SQL命令来修改呢,找了下,亲测有效,如下List-1 List-1 hive>use your_schemahvie>ALTER TABLE tabl...

克虏伯
16分钟前
3
0
是什么,它的作用是什么

在HTML文档的首部往往会有这么一句话<!DOCTYPE html>,许多时候我们忽视了它的存在,它实际上是一个声明,告诉浏览器用哪种HTML版本的规范来解读HTML文档。 尽管我们不给出这句声明浏览器照样...

前端老手
22分钟前
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部