使用Java编程方式将RDD转换成DataFrame
博客专区 > 别寒 的博客 > 博客详情
使用Java编程方式将RDD转换成DataFrame
别寒 发表于6个月前
使用Java编程方式将RDD转换成DataFrame
  • 发表于 6个月前
  • 阅读 7
  • 收藏 0
  • 点赞 0
  • 评论 0

标题:腾讯云 新注册用户域名抢购1元起>>>   

package cn.hhb.spark.sql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.*;

import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

import java.util.ArrayList;
import java.util.List;

/**
 * 以编程方式动态指定元数据,将rdd转换为dataframe
 * Created by dell on 2017/7/26.
 */
public class RDD2DataFrameProgrammatically {
    public static void main(String[] args) {

        // 创建SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("filter").setMaster("local")
                .set("spark.testing.memory", "2147480000");

        // 创建javasparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        // 第一步,创建一个普通的rdd,但是必须将其转换为rdd<row>的格式
        JavaRDD<String> lines = sc.textFile("c://students.txt");

        // 往row中塞数据的时候,要注意,什么格式的数据,就用什么格式转换一下
        JavaRDD<Row> studentRDD = lines.map(new Function<String, Row>() {
            @Override
            public Row call(String line) throws Exception {
                String[] lineSplited = line.split(",");
                return RowFactory.create(
                        Integer.valueOf(lineSplited[0]),
                        lineSplited[1],
                        Integer.valueOf(lineSplited[2]));
            }
        });

        // 第二步,动态构造元数据
        /**
         * 比如说,id,name等,field的名称和类型,可能都是在程序运行过程中,动态从mysql里
         * 或者是配置文件中,加载出来的,是不固定的
         * 所以特别适合用这种编程的方式,来构造元数据
         */
        List<StructField> structFilelds = new ArrayList<StructField>();
        structFilelds.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));
        structFilelds.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        structFilelds.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
        StructType structType = DataTypes.createStructType(structFilelds);

        // 第三步,使用动态构造的元数据,将rdd转换为dataframe
        DataFrame studentDF = sqlContext.createDataFrame(studentRDD, structType);

        // 后面就可以使用dataFrame了
        studentDF.registerTempTable("students");

        DataFrame teenagerDF = sqlContext.sql("select * from students where age <= 18");

        List<Row> rows = teenagerDF.javaRDD().collect();

        for (Row row : rows){
            System.out.println(row);
        }

        sc.close();

    }
}

共有 人打赏支持
粉丝 28
博文 254
码字总数 130346
×
别寒
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: