文档章节

使用Java编程方式将RDD转换成DataFrame

别寒
 别寒
发布于 2017/07/26 15:20
字数 346
阅读 23
收藏 0
package cn.hhb.spark.sql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.*;

import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

import java.util.ArrayList;
import java.util.List;

/**
 * 以编程方式动态指定元数据,将rdd转换为dataframe
 * Created by dell on 2017/7/26.
 */
public class RDD2DataFrameProgrammatically {
    public static void main(String[] args) {

        // 创建SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("filter").setMaster("local")
                .set("spark.testing.memory", "2147480000");

        // 创建javasparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        // 第一步,创建一个普通的rdd,但是必须将其转换为rdd<row>的格式
        JavaRDD<String> lines = sc.textFile("c://students.txt");

        // 往row中塞数据的时候,要注意,什么格式的数据,就用什么格式转换一下
        JavaRDD<Row> studentRDD = lines.map(new Function<String, Row>() {
            @Override
            public Row call(String line) throws Exception {
                String[] lineSplited = line.split(",");
                return RowFactory.create(
                        Integer.valueOf(lineSplited[0]),
                        lineSplited[1],
                        Integer.valueOf(lineSplited[2]));
            }
        });

        // 第二步,动态构造元数据
        /**
         * 比如说,id,name等,field的名称和类型,可能都是在程序运行过程中,动态从mysql里
         * 或者是配置文件中,加载出来的,是不固定的
         * 所以特别适合用这种编程的方式,来构造元数据
         */
        List<StructField> structFilelds = new ArrayList<StructField>();
        structFilelds.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));
        structFilelds.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        structFilelds.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
        StructType structType = DataTypes.createStructType(structFilelds);

        // 第三步,使用动态构造的元数据,将rdd转换为dataframe
        DataFrame studentDF = sqlContext.createDataFrame(studentRDD, structType);

        // 后面就可以使用dataFrame了
        studentDF.registerTempTable("students");

        DataFrame teenagerDF = sqlContext.sql("select * from students where age <= 18");

        List<Row> rows = teenagerDF.javaRDD().collect();

        for (Row row : rows){
            System.out.println(row);
        }

        sc.close();

    }
}

© 著作权归作者所有

共有 人打赏支持
别寒
粉丝 30
博文 271
码字总数 137605
作品 0
永州
程序员
私信 提问
Apache Spark APIs:RDDs,DataFrames,and Datasets

一.Resilient Distributed Dataset(RDD,弹性分布式数据集) RDD是过去的Spark中最主要的面向用户的API。RDD是数据元素的不可变的分布式集合,在集群中的节点上进行分区,它提供了低级的API...

阿猫阿狗Hakuna
09/04
0
0
慕课网Spark SQL日志分析 - 5.DateFrame&Dataset

5.DateFrame&Dataset 1.DateFrame产生背景 DataFrame 不是Spark Sql提出的。而是在早起的Python、R、Pandas语言中就早就有了的。 Spark诞生之初一个目标就是给大数据生态圈提供一个基于通用语...

Meet相识_bfa5
07/12
0
0
【Spark】Spark SQL, DataFrames and Datasets Guide(翻译文,持续更新)

本文主要是翻译Spark官网Spark SQL programming guide 。只能保证大概意思,尽量保证细节。英文水平有限,如果有错误的地方请指正,轻喷。目录导航在右上角 Spark SQL、DataFrames 和 Datase...

跑呀跑
09/19
0
0
【Spark】Spark Quick Start(快速入门翻译)

本文主要是翻译Spark官网Quick Start。只能保证大概意思,尽量保证细节。英文水平有限,如果有错误的地方请指正,轻喷 目录导航在右上角,感谢两个大佬(孤傲苍狼 JavaScript自动生成博文目录...

跑呀跑
09/16
0
0
Spark Core组件:RDD、DataFrame和DataSet

1. 介绍 spark生态系统中,Spark Core,包括各种Spark的各种核心组件,它们能够对内存和硬盘进行操作,或者调用CPU进行计算。 spark core定义了RDD、DataFrame和DataSet spark最初只有RDD,D...

wsc449
01/17
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Mariadb二进制包安装,Apache安装

安装mariadb 下载二进制包并解压 [root@test-a src]# wget https://downloads.mariadb.com/MariaDB/mariadb-10.2.6/bintar-linux-glibc_214-x86_64/mariadb-10.2.6-linux-glibc_214-x86_64.t......

野雪球
今天
3
0
ConcurrentHashMap 高并发性的实现机制

ConcurrentHashMap 的结构分析 为了更好的理解 ConcurrentHashMap 高并发的具体实现,让我们先探索它的结构模型。 ConcurrentHashMap 类中包含两个静态内部类 HashEntry 和 Segment。HashEnt...

TonyStarkSir
今天
3
0
大数据教程(7.4)HDFS的java客户端API(流处理方式)

博主上一篇博客分享了namenode和datanode的工作原理,本章节将继前面的HDFS的java客户端简单API后深度讲述HDFS流处理API。 场景:博主前面的文章介绍过HDFS上存的大文件会成不同的块存储在不...

em_aaron
昨天
3
0
聊聊storm的window trigger

序 本文主要研究一下storm的window trigger WindowTridentProcessor.prepare storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/WindowTridentProcessor.java public v......

go4it
昨天
7
0
CentOS 生产环境配置

初始配置 对于一般配置来说,不需要安装 epel-release 仓库,本文主要在于希望跟随 RHEL 的配置流程,紧跟红帽公司对于服务器的配置说明。 # yum update 安装 centos-release-scl # yum ins...

clin003
昨天
11
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部