文档章节

SparkSQLJDBC数据源实例

别寒
 别寒
发布于 2017/07/27 16:56
字数 437
阅读 7
收藏 0
点赞 0
评论 0
package cn.hhb.spark.sql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Created by dell on 2017/7/27.
 */
public class JDBCDataSource {
    public static void main(String[] args) {
        // 创建SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("HiveDataSource").setMaster("local")
                .set("spark.testing.memory", "2147480000");

        // 创建javasparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        // 分别将mysql中两张表的数据加载为dataframe
        Map<String, String> options = new HashMap<String, String>();
        options.put("url","jdbc:mysql://spark1:3306/testdb");
        options.put("dbtable","student_infos");
        DataFrame studentInfosDF = sqlContext.read().format("jdbc").options(options).load();

        options.put("dbtable","student_scores");
        DataFrame studentScoresDF = sqlContext.read().format("jdbc").options(options).load();

        // 将两个dataframe转换为javapairRDD,执行join操作
        JavaPairRDD<String, Tuple2<Integer, Integer>> studentsRDD =
                studentInfosDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Row row) throws Exception {
                return new Tuple2<String, Integer>(
                    row.getString(0),
                    Integer.valueOf(String.valueOf(row.getLong(1)))
                );
            }
        }).join(studentScoresDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Row row) throws Exception {
                return new Tuple2<String, Integer>(
                        String.valueOf(row.get(0)),
                        Integer.valueOf(String.valueOf(row.get(1)))
                );
            }
        }));

        // 将javapairRDD转换为javaRDD<Row>
        JavaRDD<Row> studentRowsRDD = studentsRDD.map(new Function<Tuple2<String, Tuple2<Integer, Integer>>, Row>() {
            @Override
            public Row call(Tuple2<String, Tuple2<Integer, Integer>> tuple) throws Exception {
                return RowFactory.create(tuple._1, tuple._2._1, tuple._2._2);
            }
        });

        // 过滤出分数大于80分的数据
        JavaRDD<Row> filteredStudentRowsRDD = studentRowsRDD.filter(new Function<Row, Boolean>() {
            @Override
            public Boolean call(Row row) throws Exception {
                if (row.getInt(2) > 80){
                    return null;
                }
                return false;
            }
        });

        // 转换为dataframe
        List<StructField> structFields = new ArrayList<StructField>();
        structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        structFields.add(DataTypes.createStructField("score", DataTypes.IntegerType, true));
        structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
        StructType structType = DataTypes.createStructType(structFields);

        // 使用动态构造的元数据,将rdd转换为dataframe
        DataFrame studentsDF = sqlContext.createDataFrame(filteredStudentRowsRDD, structType);

        Row[] rows = studentsDF.collect();

        for (Row row : rows){
            System.out.println(row);
        }

        // 将dataFrame中的数据保存到mysql表中
        studentsDF.javaRDD().foreach(new VoidFunction<Row>() {
            @Override
            public void call(Row row) throws Exception {

                String sql = "insert into good_student_infos values('"+row.getString(0)+"','"+Integer.valueOf(String.valueOf(row.getLong(1)))+"','"+Integer.valueOf(String.valueOf(row.getLong(1)))+"')";

                Class.forName("com.mysql.jdbc.Driver");
                Connection conn = null;
                Statement stmt = null;
                try {
                    conn = DriverManager.getConnection(
                            "jdbc:mysql://spark1:3306/testdb",
                            "",
                            ""
                    );
                    stmt = conn.createStatement();
                    stmt.executeUpdate(sql);
                } catch (Exception e){
                    e.printStackTrace();
                } finally {
                    if (stmt != null){
                        stmt.close();
                    }
                    if (conn != null){
                        conn.close();
                    }
                }

            }
        });


        sc.close();
    }
}

© 著作权归作者所有

共有 人打赏支持
别寒
粉丝 30
博文 267
码字总数 135817
作品 0
永州
程序员
Spring多数据源解决方案

在很多大型应用中都会对数据进行切分,并且采用多个数据库实例进行管理,这样可以有效提高系统的水平伸缩性。而这样的方案就会不同于常见的单一数据实例的 方案,这就要程序在运行时根据当时...

taote ⋅ 2012/10/17 ⋅ 0

InnoDB--------独立表空间平滑迁移

1. 背景 * InnoDB的表空间可以是共享的或独立的。如果是共享表空间,则所有的表空间都放在一个文件里:ibdata1,ibdata2..ibdataN,这种情况下,目前应该还没办法实现表空间的迁移,除非完全迁...

asd1123509133 ⋅ 2017/07/20 ⋅ 0

Spring 管理配置多个数据源

Spring动态配置多数据源,即在大型应用中对数据进行切分,并且采用多个数据库实例进行管理,这样可以有效提高系统的水平伸缩性。而这样的方案就会不同于常见的单一数据实例的方案,这就要程序...

鉴客 ⋅ 2011/10/28 ⋅ 0

Windows 8实例教程系列 - 数据绑定基础实例

数据绑定是WPF,Silverlight以及Windows Phone应用开发中最为常用的开发技术,在基于XAML的Windows Store应用开发中,数据绑定是其开发特性之一,本文将讨论Windows 8应用开发数据绑定的使用...

冷秋寒 ⋅ 2013/03/25 ⋅ 0

druid如何配置多数据源(多实例)访问

@wenshao 你好,想跟你请教个问题:druid如何配置数据源多实例访问? 即数据库的多个实例,类似weblogic的多数据源。再者这个问题能配置JNDI吗?也是类似weblogic的多数据源...

书风 ⋅ 2015/03/31 ⋅ 2

oracle数据导入sql server 2008方法

随着社会的发展,各被审计单位的数据量也不断增加。过去常用的小型数据库已经不能适应时代的发展,大型数据库逐渐普遍起来,尤其以oracle数据为代表的大型数据库更是被众多单位、公司和企业采...

donny945 ⋅ 2014/02/14 ⋅ 0

了解rxjs

rxjs rxjs(Reactive Extensions for JavaScript)是Javascript的响应式扩展, 响应式的思路是把随时间不断变化的数据、状态、事件等转成可被观察的序列(Observable Sequence),然后订阅序列中对...

bestvist ⋅ 2017/11/19 ⋅ 0

springboot mybatis 读写分离集成

背景 在实际开发中,我们一个项目可能会用到多个数据库,通常一个数据库对应一个数据源。本示例,通过两种方式实现多数据源切换 1)手动切换 2)使用注解进行切换 本文使用的是第二种方式,使...

文心丶雕龙 ⋅ 03/02 ⋅ 0

web工作流管理系统开发之十 数据库连接及事务设定

为了方便设置数据库连接和事务的一致,将所有数据库连接信息统一设置在fcconfig.xml文件中; fcconfig.xml的内容:

长平狐 ⋅ 2012/10/11 ⋅ 0

PropertyPlaceholderConfigurer 无效的问题

问题描述: 这两天自己配置SPring+MyBatis遇到了个问题,搞了一天才搞定。就是PropertyPlaceholderConfigurer加载配置之后在DatasSource中的使用无效的问题。 以下是配置 <bean id="property...

Lubby ⋅ 2015/12/08 ⋅ 1

没有更多内容

加载失败,请刷新页面

加载更多

下一页

从 Confluence 5.3 及其早期版本中恢复空间

如果你需要从 Confluence 5.3 及其早期版本中的导出文件恢复到晚于 Confluence 5.3 的 Confluence 中的话。你可以使用临时的 Confluence 空间安装,然后将这个 Confluence 安装实例升级到你现...

honeymose ⋅ 今天 ⋅ 0

用ZBLOG2.3博客写读书笔记网站能创造今日头条的辉煌吗?

最近两年,著名的自媒体网站今日头条可以说是火得一塌糊涂,虽然从目前来看也遇到了一点瓶颈,毕竟发展到了一定的规模,继续增长就更加难了,但如今的今日头条规模和流量已经非常大了。 我们...

原创小博客 ⋅ 今天 ⋅ 0

MyBatis四大核心概念

本文讲解 MyBatis 四大核心概念(SqlSessionFactoryBuilder、SqlSessionFactory、SqlSession、Mapper)。 MyBatis 作为互联网数据库映射工具界的“上古神器”,训有四大“神兽”,谓之:Sql...

waylau ⋅ 今天 ⋅ 0

以太坊java开发包web3j简介

web3j(org.web3j)是Java版本的以太坊JSON RPC接口协议封装实现,如果需要将你的Java应用或安卓应用接入以太坊,或者希望用java开发一个钱包应用,那么用web3j就对了。 web3j的功能相当完整...

汇智网教程 ⋅ 今天 ⋅ 0

2个线程交替打印100以内的数字

重点提示: 线程的本质上只是一个壳子,真正的逻辑其实在“竞态条件”中。 举个例子,比如本题中的打印,那么在竞态条件中,我只需要一个方法即可; 假如我的需求是2个线程,一个+1,一个-1,...

Germmy ⋅ 今天 ⋅ 0

Springboot2 之 Spring Data Redis 实现消息队列——发布/订阅模式

一般来说,消息队列有两种场景,一种是发布者订阅者模式,一种是生产者消费者模式,这里利用redis消息“发布/订阅”来简单实现订阅者模式。 实现之前先过过 redis 发布订阅的一些基础概念和操...

Simonton ⋅ 今天 ⋅ 0

error:Could not find gradle

一.更新Android Studio后打开Project,报如下错误: Error: Could not find com.android.tools.build:gradle:2.2.1. Searched in the following locations: file:/D:/software/android/andro......

Yao--靠自己 ⋅ 昨天 ⋅ 0

Spring boot 项目打包及引入本地jar包

Spring Boot 项目打包以及引入本地Jar包 [TOC] 上篇文章提到 Maven 项目添加本地jar包的三种方式 ,本篇文章记录下在实际项目中的应用。 spring boot 打包方式 我们知道,传统应用可以将程序...

Os_yxguang ⋅ 昨天 ⋅ 0

常见数据结构(二)-树(二叉树,红黑树,B树)

本文介绍数据结构中几种常见的树:二分查找树,2-3树,红黑树,B树 写在前面 本文所有图片均截图自coursera上普林斯顿的课程《Algorithms, Part I》中的Slides 相关命题的证明可参考《算法(第...

浮躁的码农 ⋅ 昨天 ⋅ 0

android -------- 混淆打包报错 (warning - InnerClass ...)

最近做Android混淆打包遇到一些问题,Android Sdutio 3.1 版本打包的 错误如下: Android studio warning - InnerClass annotations are missing corresponding EnclosingMember annotation......

切切歆语 ⋅ 昨天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部