文档章节

使用Java编程方式将RDD转换成DataFrame

别寒
 别寒
发布于 2017/07/26 15:20
字数 346
阅读 18
收藏 0
package cn.hhb.spark.sql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.*;

import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

import java.util.ArrayList;
import java.util.List;

/**
 * 以编程方式动态指定元数据,将rdd转换为dataframe
 * Created by dell on 2017/7/26.
 */
public class RDD2DataFrameProgrammatically {
    public static void main(String[] args) {

        // 创建SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("filter").setMaster("local")
                .set("spark.testing.memory", "2147480000");

        // 创建javasparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        // 第一步,创建一个普通的rdd,但是必须将其转换为rdd<row>的格式
        JavaRDD<String> lines = sc.textFile("c://students.txt");

        // 往row中塞数据的时候,要注意,什么格式的数据,就用什么格式转换一下
        JavaRDD<Row> studentRDD = lines.map(new Function<String, Row>() {
            @Override
            public Row call(String line) throws Exception {
                String[] lineSplited = line.split(",");
                return RowFactory.create(
                        Integer.valueOf(lineSplited[0]),
                        lineSplited[1],
                        Integer.valueOf(lineSplited[2]));
            }
        });

        // 第二步,动态构造元数据
        /**
         * 比如说,id,name等,field的名称和类型,可能都是在程序运行过程中,动态从mysql里
         * 或者是配置文件中,加载出来的,是不固定的
         * 所以特别适合用这种编程的方式,来构造元数据
         */
        List<StructField> structFilelds = new ArrayList<StructField>();
        structFilelds.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));
        structFilelds.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        structFilelds.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
        StructType structType = DataTypes.createStructType(structFilelds);

        // 第三步,使用动态构造的元数据,将rdd转换为dataframe
        DataFrame studentDF = sqlContext.createDataFrame(studentRDD, structType);

        // 后面就可以使用dataFrame了
        studentDF.registerTempTable("students");

        DataFrame teenagerDF = sqlContext.sql("select * from students where age <= 18");

        List<Row> rows = teenagerDF.javaRDD().collect();

        for (Row row : rows){
            System.out.println(row);
        }

        sc.close();

    }
}

© 著作权归作者所有

共有 人打赏支持
别寒
粉丝 30
博文 271
码字总数 137605
作品 0
永州
程序员
Apache Spark APIs:RDDs,DataFrames,and Datasets

一.Resilient Distributed Dataset(RDD,弹性分布式数据集) RDD是过去的Spark中最主要的面向用户的API。RDD是数据元素的不可变的分布式集合,在集群中的节点上进行分区,它提供了低级的API...

阿猫阿狗Hakuna
09/04
0
0
慕课网Spark SQL日志分析 - 5.DateFrame&Dataset

5.DateFrame&Dataset 1.DateFrame产生背景 DataFrame 不是Spark Sql提出的。而是在早起的Python、R、Pandas语言中就早就有了的。 Spark诞生之初一个目标就是给大数据生态圈提供一个基于通用语...

Meet相识_bfa5
07/12
0
0
Spark Core组件:RDD、DataFrame和DataSet

1. 介绍 spark生态系统中,Spark Core,包括各种Spark的各种核心组件,它们能够对内存和硬盘进行操作,或者调用CPU进行计算。 spark core定义了RDD、DataFrame和DataSet spark最初只有RDD,D...

wsc449
01/17
0
0
基于spark1.3.1的spark-sql实战-01

sqlContext总的一个过程如下图所示: SQL语句经过SqlParse解析成UnresolvedLogicalPlan; 使用analyzer结合数据数据字典(catalog)进行绑定,生成resolvedLogicalPlan; 使用optimizer对res...

stark_summer
2015/05/19
0
0
详细解读Spark的数据分析引擎:Spark SQL

欢迎关注大数据和人工智能技术文章发布的微信公众号:清研学堂,在这里你可以学到夜白(作者笔名)精心整理的笔记,让我们每天进步一点点,让优秀成为一种习惯! 一、spark SQL:类似于Hive,...

李金泽
03/04
0
0

没有更多内容

加载失败,请刷新页面

加载更多

如何通过 J2Cache 实现分布式 session 存储

做 Java Web 开发的人多数都会需要使用到 session (会话),我们使用 session 来保存一些需要在两个不同的请求之间共享数据。一般 Java 的 Web 容器像 Tomcat、Resin、Jetty 等等,它们会在...

红薯
今天
3
0
C++ std::thread

C++11提供了std::thread类来表示一个多线程对象。 1,首先介绍一下std::this_thread命名空间: (1)std::this_thread::get_id():返回当前线程id (2)std::this_thread::yield():用户接口...

yepanl
今天
3
0
Nignx缓存文件与动态文件自动均衡的配置

下面这段nginx的配置脚本的作用是,自动判断是否存在缓存文件,如果有优先输出缓存文件,不经过php,如果没有,则回到php去处理,同时生成缓存文件。 PHP框架是ThinkPHP,最后一个rewrite有关...

swingcoder
今天
2
0
20180920 usermod命令与用户密码管理

命令 usermod usermod 命令的选项和 useradd 差不多。 一个用户可以属于多个组,但是gid只有一个;除了gid,其他的组(groups)叫做扩展组。 usermod -u 1010 username # 更改用户idusermod ...

野雪球
今天
3
0
Java网络编程基础

1. 简单了解网络通信协议TCP/IP网络模型相关名词 应用层(HTTP,FTP,DNS等) 传输层(TCP,UDP) 网络层(IP,ICMP等) 链路层(驱动程序,接口等) 链路层:用于定义物理传输通道,通常是对...

江左煤郎
今天
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部