文档章节

SparkSQLJDBC数据源实例

别寒
 别寒
发布于 2017/07/27 16:56
字数 437
阅读 8
收藏 0
package cn.hhb.spark.sql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Created by dell on 2017/7/27.
 */
public class JDBCDataSource {
    public static void main(String[] args) {
        // 创建SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("HiveDataSource").setMaster("local")
                .set("spark.testing.memory", "2147480000");

        // 创建javasparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        // 分别将mysql中两张表的数据加载为dataframe
        Map<String, String> options = new HashMap<String, String>();
        options.put("url","jdbc:mysql://spark1:3306/testdb");
        options.put("dbtable","student_infos");
        DataFrame studentInfosDF = sqlContext.read().format("jdbc").options(options).load();

        options.put("dbtable","student_scores");
        DataFrame studentScoresDF = sqlContext.read().format("jdbc").options(options).load();

        // 将两个dataframe转换为javapairRDD,执行join操作
        JavaPairRDD<String, Tuple2<Integer, Integer>> studentsRDD =
                studentInfosDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Row row) throws Exception {
                return new Tuple2<String, Integer>(
                    row.getString(0),
                    Integer.valueOf(String.valueOf(row.getLong(1)))
                );
            }
        }).join(studentScoresDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Row row) throws Exception {
                return new Tuple2<String, Integer>(
                        String.valueOf(row.get(0)),
                        Integer.valueOf(String.valueOf(row.get(1)))
                );
            }
        }));

        // 将javapairRDD转换为javaRDD<Row>
        JavaRDD<Row> studentRowsRDD = studentsRDD.map(new Function<Tuple2<String, Tuple2<Integer, Integer>>, Row>() {
            @Override
            public Row call(Tuple2<String, Tuple2<Integer, Integer>> tuple) throws Exception {
                return RowFactory.create(tuple._1, tuple._2._1, tuple._2._2);
            }
        });

        // 过滤出分数大于80分的数据
        JavaRDD<Row> filteredStudentRowsRDD = studentRowsRDD.filter(new Function<Row, Boolean>() {
            @Override
            public Boolean call(Row row) throws Exception {
                if (row.getInt(2) > 80){
                    return null;
                }
                return false;
            }
        });

        // 转换为dataframe
        List<StructField> structFields = new ArrayList<StructField>();
        structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        structFields.add(DataTypes.createStructField("score", DataTypes.IntegerType, true));
        structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
        StructType structType = DataTypes.createStructType(structFields);

        // 使用动态构造的元数据,将rdd转换为dataframe
        DataFrame studentsDF = sqlContext.createDataFrame(filteredStudentRowsRDD, structType);

        Row[] rows = studentsDF.collect();

        for (Row row : rows){
            System.out.println(row);
        }

        // 将dataFrame中的数据保存到mysql表中
        studentsDF.javaRDD().foreach(new VoidFunction<Row>() {
            @Override
            public void call(Row row) throws Exception {

                String sql = "insert into good_student_infos values('"+row.getString(0)+"','"+Integer.valueOf(String.valueOf(row.getLong(1)))+"','"+Integer.valueOf(String.valueOf(row.getLong(1)))+"')";

                Class.forName("com.mysql.jdbc.Driver");
                Connection conn = null;
                Statement stmt = null;
                try {
                    conn = DriverManager.getConnection(
                            "jdbc:mysql://spark1:3306/testdb",
                            "",
                            ""
                    );
                    stmt = conn.createStatement();
                    stmt.executeUpdate(sql);
                } catch (Exception e){
                    e.printStackTrace();
                } finally {
                    if (stmt != null){
                        stmt.close();
                    }
                    if (conn != null){
                        conn.close();
                    }
                }

            }
        });


        sc.close();
    }
}

© 著作权归作者所有

共有 人打赏支持
别寒
粉丝 29
博文 271
码字总数 137605
作品 0
永州
程序员
私信 提问
jfinal-ext3最新版本来袭

介绍 jfinal-ext3,源自jfinal-ext,jfinal-ext2,基于jfinal3.x,扩展了很多特性。 使用 特性说明 配置说明 主要就是conf/jf-app-cfg.conf(以下简称”配置文件“)的配置说明 配置文件可以...

Jobsz
2018/07/18
0
0
Spring多数据源解决方案

在很多大型应用中都会对数据进行切分,并且采用多个数据库实例进行管理,这样可以有效提高系统的水平伸缩性。而这样的方案就会不同于常见的单一数据实例的 方案,这就要程序在运行时根据当时...

taote
2012/10/17
0
0
jfinal-ext3 最新版本来袭:基于 JFinal 3.x

更新内容: 基于jfinal3.x; 扩展Model; 重新定义Generator的Teamplate; MappingKit文件在在应用启动时自动加载,不需要在手动MappingKit.mapping(arp); 升级conf/jf-app-cfg.conf配置; ...

Jobsz
2018/07/18
1K
2
网易云 MySQL实例迁移的技术实现

欢迎访问网易云社区,了解更多网易技术产品运营经验。 我们把数据库里部分或全部 Schema和数据迁移到另一个实例的行为称为实例迁移,将导出数据的实例称为源实例,导入数据的实例称为目标实例...

jessicaiu
2018/11/27
0
0
解密网易MySQL实例迁移高效完成背后的黑科技

作者:温正湖,网易杭研院资深工程师,负责网易云数据库平台核心开发和运维工作,对MySQL、MongoDB等数据库和Linux存储领域具有深入研究。 网易蜂巢团队:为企业提供专业容器云平台,深度整合...

温正湖
2016/09/30
0
0

没有更多内容

加载失败,请刷新页面

加载更多

element-ui之el-collapse-transition(折叠展开动画)源码解析学习

项目中发现同事使用了element-ui的el-collapse-transition来做折叠展开效果,打开源码看了下发现挺有意思,来解析学习一番。 el-collapse-transition的引入方式 // fade/zoom 等import 'e...

学霸猫
6分钟前
0
0
解释器模式_实战

前言 解释器模式是什么?这个设计模式其实比较冷门,不太会解释,用例子说明把。解释器模式一般用在sql,xml,json解析等场景。比如说你有一个json对象,你要获取这个对象中任意一个节点的值。...

grace_233
29分钟前
1
0
告别2018

今天中午从喵喵家回来之后,倒头就睡到下午4点了。可能是之前透支的身体,在我放松下来后,开始觉得疲惫了,所以最近估计会进入嗜睡期。醒来之后,拿了包花生,开了瓶低糖菊花茶,听着网易云...

七木网络科技
35分钟前
4
0
MySql数据库分表分区实践

1. 背景 —— 公司物联网项目 海量设备通过物联网服务接入云端,设备每30s上报一次自身数据(以下称为动态数据)。 物联网服务将设备上报的数据转发给数据处理网关,由数据入库网关执行批量入...

吴伟祥
48分钟前
2
0
大表关联走hash优化

大表关联走hash? 案例: ---- 反正我执行过1个多小时,没有跑完 SELECT a.id AS order_id ,b.s_id AS bill_id, d.id AS sub_order_id, d.deal_oper_id FROM EM_ORDER PARTITION(EM_ORDER_20......

hnairdb
今天
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部