文档章节

SparkSQLJDBC数据源实例

别寒
 别寒
发布于 2017/07/27 16:56
字数 437
阅读 7
收藏 0
package cn.hhb.spark.sql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Created by dell on 2017/7/27.
 */
public class JDBCDataSource {
    public static void main(String[] args) {
        // 创建SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("HiveDataSource").setMaster("local")
                .set("spark.testing.memory", "2147480000");

        // 创建javasparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        // 分别将mysql中两张表的数据加载为dataframe
        Map<String, String> options = new HashMap<String, String>();
        options.put("url","jdbc:mysql://spark1:3306/testdb");
        options.put("dbtable","student_infos");
        DataFrame studentInfosDF = sqlContext.read().format("jdbc").options(options).load();

        options.put("dbtable","student_scores");
        DataFrame studentScoresDF = sqlContext.read().format("jdbc").options(options).load();

        // 将两个dataframe转换为javapairRDD,执行join操作
        JavaPairRDD<String, Tuple2<Integer, Integer>> studentsRDD =
                studentInfosDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Row row) throws Exception {
                return new Tuple2<String, Integer>(
                    row.getString(0),
                    Integer.valueOf(String.valueOf(row.getLong(1)))
                );
            }
        }).join(studentScoresDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Row row) throws Exception {
                return new Tuple2<String, Integer>(
                        String.valueOf(row.get(0)),
                        Integer.valueOf(String.valueOf(row.get(1)))
                );
            }
        }));

        // 将javapairRDD转换为javaRDD<Row>
        JavaRDD<Row> studentRowsRDD = studentsRDD.map(new Function<Tuple2<String, Tuple2<Integer, Integer>>, Row>() {
            @Override
            public Row call(Tuple2<String, Tuple2<Integer, Integer>> tuple) throws Exception {
                return RowFactory.create(tuple._1, tuple._2._1, tuple._2._2);
            }
        });

        // 过滤出分数大于80分的数据
        JavaRDD<Row> filteredStudentRowsRDD = studentRowsRDD.filter(new Function<Row, Boolean>() {
            @Override
            public Boolean call(Row row) throws Exception {
                if (row.getInt(2) > 80){
                    return null;
                }
                return false;
            }
        });

        // 转换为dataframe
        List<StructField> structFields = new ArrayList<StructField>();
        structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        structFields.add(DataTypes.createStructField("score", DataTypes.IntegerType, true));
        structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
        StructType structType = DataTypes.createStructType(structFields);

        // 使用动态构造的元数据,将rdd转换为dataframe
        DataFrame studentsDF = sqlContext.createDataFrame(filteredStudentRowsRDD, structType);

        Row[] rows = studentsDF.collect();

        for (Row row : rows){
            System.out.println(row);
        }

        // 将dataFrame中的数据保存到mysql表中
        studentsDF.javaRDD().foreach(new VoidFunction<Row>() {
            @Override
            public void call(Row row) throws Exception {

                String sql = "insert into good_student_infos values('"+row.getString(0)+"','"+Integer.valueOf(String.valueOf(row.getLong(1)))+"','"+Integer.valueOf(String.valueOf(row.getLong(1)))+"')";

                Class.forName("com.mysql.jdbc.Driver");
                Connection conn = null;
                Statement stmt = null;
                try {
                    conn = DriverManager.getConnection(
                            "jdbc:mysql://spark1:3306/testdb",
                            "",
                            ""
                    );
                    stmt = conn.createStatement();
                    stmt.executeUpdate(sql);
                } catch (Exception e){
                    e.printStackTrace();
                } finally {
                    if (stmt != null){
                        stmt.close();
                    }
                    if (conn != null){
                        conn.close();
                    }
                }

            }
        });


        sc.close();
    }
}

© 著作权归作者所有

共有 人打赏支持
别寒
粉丝 30
博文 271
码字总数 137605
作品 0
永州
程序员
jfinal-ext3 最新版本来袭:基于 JFinal 3.x

更新内容: 基于jfinal3.x; 扩展Model; 重新定义Generator的Teamplate; MappingKit文件在在应用启动时自动加载,不需要在手动MappingKit.mapping(arp); 升级conf/jf-app-cfg.conf配置; ...

Jobsz
07/18
0
0
Spring多数据源解决方案

在很多大型应用中都会对数据进行切分,并且采用多个数据库实例进行管理,这样可以有效提高系统的水平伸缩性。而这样的方案就会不同于常见的单一数据实例的 方案,这就要程序在运行时根据当时...

taote
2012/10/17
0
0
解密网易MySQL实例迁移高效完成背后的黑科技

作者:温正湖,网易杭研院资深工程师,负责网易云数据库平台核心开发和运维工作,对MySQL、MongoDB等数据库和Linux存储领域具有深入研究。 网易蜂巢团队:为企业提供专业容器云平台,深度整合...

温正湖
2016/09/30
0
0
InnoDB--------独立表空间平滑迁移

1. 背景 * InnoDB的表空间可以是共享的或独立的。如果是共享表空间,则所有的表空间都放在一个文件里:ibdata1,ibdata2..ibdataN,这种情况下,目前应该还没办法实现表空间的迁移,除非完全迁...

asd1123509133
2017/07/20
0
0
Spring 管理配置多个数据源

Spring动态配置多数据源,即在大型应用中对数据进行切分,并且采用多个数据库实例进行管理,这样可以有效提高系统的水平伸缩性。而这样的方案就会不同于常见的单一数据实例的方案,这就要程序...

鉴客
2011/10/28
26.8K
0

没有更多内容

加载失败,请刷新页面

加载更多

实战讲解高并发和秒杀抢购系统设计

互联网特别是电商平台,阿里双11秒杀、还有12306春运抢票、以及平时各种节假日抢购活动等,都是典型的高并发场景。 这类场景最大的特征就是活动周期短,瞬间流量大(高并发),大量的人短期涌...

xtof
21分钟前
0
0
代码质量管理平台-sonarqube

在工作中,往往开发的时候会不怎么注重代码质量的人很多,存在着很多的漏洞和隐患等问题,sonarqube可以进行代码质量的审核,而且十分的残酷。。。。。接下来我们说下怎么安装 进入官网下载:...

落叶清风
24分钟前
4
0
在Ubuntu安装和配置Sphinx

Ubuntu系统默认是配置有sphinx的,先检查一下,别多此一举。。。。。 在开始本指南之前,您需要: 一个Ubuntu 16.04服务器。 sudo的一个非root用户,您可以通过以下设置本教程 。 安装在服务...

阿锋zxf
33分钟前
0
0
Qt编写输入法V2018超级终结版

对于qt嵌入式linux开发人员来说,输入法一直是个鸡肋问题,要么不支持实体键盘同步,要么不能汉字输入,要么不支持网页输入等,这几年通过陆续接触大量的各种输入法应用场景客户,得到真实需...

飞扬青云
44分钟前
1
0
TypeScript基础入门之高级类型的多态的 this类型

转发 TypeScript基础入门之高级类型的多态的 this类型 高级类型 多态的this类型 多态的this类型表示的是某个包含类或接口的子类型。 这被称做F-bounded多态性。 它能很容易的表现连贯接口间的...

durban
50分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部