文档章节

SparkSQL Java

Yulong_
 Yulong_
发布于 2017/08/14 09:37
字数 1157
阅读 2
收藏 0
点赞 0
评论 0

SparkSQL是为了结构化数据处理准备的Spark模块。可以使用SQL、DataFrames、DataSets来跟SparkSQL交互。

1、项目创建

关于Java:选用1.7或者1.8.为了通用性,本章内容使用1.7进行编写。
 

关于Scala:工程不需要增加scala nature,即不需Add Scala Nature。若增加在java代码中调用scala library会有异常。

关于Spark版本:使用1.6.3进行编写。

maven 依赖

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>1.6.3</version>
</dependency>
</dependencies>

2、SparkSQL

2.1、初始化

Spark 编程的第一步是需要创建一个JavaSparkContext对象,用来告诉 Spark 如何访问集群。在创建 JavaSparkContext之前,你需要构建一个 SparkConf对象, SparkConf 对象包含了一些你应用程序的信息。

Spark SQL的所有函数的入口是SQLContext 类, 或者它的子类. 为了创建基本的SQLContext, 需要先创建一个SparkContext.

SparkConf conf = new SparkConf().setAppName("JavaApiLearn").setMaster("local");
		@SuppressWarnings("resource")
		JavaSparkContext jsc = new JavaSparkContext(conf)
        SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

2.2、SparkSQL数据处理

可以从parquet和json文件直接生成DF,但是text文件需要进行rdd+schema的转换。

2.2.1、from parquet

从parquet文件读取生成dataframe

DataFrame df = sqlContext.read().parquet("data/user.parquet");
DataFrame df = sqlContext.read().format("parquet").load("data/user.parquet");

从parquet文件执行查询

DataFrame df = sqlContext.sql("SELECT * FROM parquet.`data/user.parquet`");

2.2.2、from json

从json文件读取生成dataframe

DataFrame df = sqlContext.read().format("json").load("data/user.json");

从list的json直接生成dataframe

List<String> jsonData = Arrays.asList("{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
JavaRDD<String> anotherPeopleRDD = sc.parallelize(jsonData);
DataFrame anotherPeople = sqlContext.read().json(anotherPeopleRDD);

2.2.3、from JDBC

从oracle、Mysql数据库生成DataFrame

Properties prop=new Properties();
prop.put("user","tarena");
prop.put("password","tarenapassword");
//oracle.jdbc.driver.OracleDriver
		
DataFrame jdbcDF = sqlContext.read().jdbc("jdbc:oracle:thin:@172.16.13.80:1521:orcl","tarena.test",prop);

2.2.4、from text

从文本文件只能生成RDD,然后通过反射或者指定schema的形式来生成DataFrame。对于DataFrame可以进行注册生成TempTable然后进行相关的SQL操作。

2.2.4.1、SchemaUsingReflection

在map函数中指定类反射

public class SchemaUsingReflection {
	public static void main(String[] args) {
		createDF();
	}
	public static void createDF() {
		// sc is an existing JavaSparkContext.
		SparkConf conf = new SparkConf().setAppName(SchemaUsingReflection.class.getSimpleName()).setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
		// Load a text file and convert each line to a JavaBean.
		JavaRDD<Person> people = sc.textFile("data/userinfo.txt").map(new Function<String, Person>() {
			public Person call(String line) throws Exception {
				String[] parts = line.split("\t");
				Person person = new Person();
				person.setName(parts[1]);
				person.setAge(Integer.parseInt(parts[2].trim()));
				return person;
			}
		});
		// Apply a schema to an RDD of JavaBeans and register it as a table.
		DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class);
		schemaPeople.registerTempTable("people");
		// SQL can be run over RDDs that have been registered as tables.
		DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 24 AND age <= 29");
		// The results of SQL queries are DataFrames and support all the normal
		// RDD operations.
		// The columns of a row in the result can be accessed by ordinal.
		List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() {
			public String call(Row row) {
				return "Name: " + row.getString(0);
			}
		}).collect();
		PrintUtilPro.printList(teenagerNames);
	}
	public static class Person implements Serializable {
		private String name;
		private int age;
		public String getName() {
			return name;
		}
		public void setName(String name) {
			this.name = name;
		}
		public int getAge() {
			return age;
		}
		public void setAge(int age) {
			this.age = age;
		}
	}
}

2.2.4.1、SpecifyingSchema

通过StructType 进行指定schema

public class SpecifyingSchema {
	public static void main(String[] args) {
		createDF();
	}
	public static void createDF() {
		// sc is an existing JavaSparkContext.
		SparkConf conf = new SparkConf().setAppName(SpecifyingSchema.class.getSimpleName()).setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
		// Load a text file and convert each line to a JavaBean.
		JavaRDD<String> people = sc.textFile("data/userinfo.txt");
		// The schema is encoded in a string
		String schemaString = "id name age";
		// Generate the schema based on the string of schema
		List<StructField> fields = new ArrayList<StructField>();
		for (String fieldName : schemaString.split(" ")) {
			fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));
		}
		StructType schema = DataTypes.createStructType(fields);
		// Convert records of the RDD (people) to Rows.
		JavaRDD<Row> rowRDD = people.map(new Function<String, Row>() {
			public Row call(String record) throws Exception {
				String[] fields = record.split("\t");
				return RowFactory.create(fields[0], fields[1].trim(), fields[2]);
			}
		});
		// Apply the schema to the RDD.
		DataFrame peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema);
		// Register the DataFrame as a table.
		peopleDataFrame.registerTempTable("people");
		// SQL can be run over RDDs that have been registered as tables.
		DataFrame results = sqlContext.sql("SELECT name FROM people");
		// The results of SQL queries are DataFrames and support all the normal
		// RDD operations.
		// The columns of a row in the result can be accessed by ordinal.
		List<String> names = results.javaRDD().map(new Function<Row, String>() {
			public String call(Row row) {
				return "Name: " + row.getString(0);
			}
		}).collect();
		PrintUtilPro.printList(names);
	}
}

2.2.5、HiveContext

Spark SQL也支持从Apache Hive中读出和写入数据。
当和Hive一起工作是,开发者需要提供HiveContext。HiveContext从SQLContext继承而来,它增加了在MetaStore中发现表以及利用HiveSql写查询的功能。没有Hive部署的用户也 可以创建HiveContext。

当没有通过 hive-site.xml 配置,上下文将会在当前目录自动地创建 metastore_db 和 warehouse 

public class HiveContextOpts {
	public static void main(String[] args) {
		hiveRead();
	}
	public static void hiveRead() {
		SparkConf conf = new SparkConf().setAppName(HiveContextOpts.class.getSimpleName()).setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		// sc is an existing JavaSparkContext.
		HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());
		sqlContext.setConf("fs.default.name", "hdfs://breath:9000");
		sqlContext.setConf("hive.metastore.uris", "thrift://breath:9083");
		sqlContext.setConf("javax.jdo.option.ConnectionURL",
				"jdbc:mysql://breath:3306/hive?createDatabaseIfNotExist=true&amp;characterEncoding=UTF-8");
		sqlContext.setConf("javax.jdo.option.ConnectionDriverName", "com.mysql.jdbc.Driver");
		sqlContext.sql(
				"CREATE TABLE IF NOT EXISTS userinfo (id INT, name STRING, age INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE");
		sqlContext.sql("LOAD DATA LOCAL INPATH 'data/userinfo.txt' OVERWRITE INTO TABLE userinfo ");
		// Queries are expressed in HiveQL.
		Row[] results = sqlContext.sql("FROM userinfo SELECT name, age").collect();
		for (Row row : results) {
			System.out.println(row.getString(0) + ":" + row.getInt(1));
		}
		sqlContext.sql("CREATE TABLE IF NOT EXISTS userinfobak AS SELECT name,age FROM userinfo");
	}
}

© 著作权归作者所有

共有 人打赏支持
Yulong_
粉丝 8
博文 79
码字总数 169760
作品 0
朝阳
部门经理
阿里年薪50WJAVA工程师转大数据学习路线!

大数据有两个方向,一个是偏计算机的,另一个是偏经济的。你学过Java,所以你可以偏将计算机的。 Java程序员想转大数据可行吗?Java是全世界使用人数最多的编程语言。不少程序员选择Java做为...

JAVA丶学习 ⋅ 04/25 ⋅ 0

Spark数据统计(java版)

Java数据统计 spark版本2.1.2,包含Dateset使用,SparkStreaming数据统计 项目地址为https://github.com/baifanwudi/big-data-analysis 代码示例 SparkSql demo: 读取json文件写入hive import...

baifanwudi ⋅ 04/19 ⋅ 0

【目录导航】JAVA零基础进阶之路

【JAVA零基础入门系列】(已完结)导航目录 Day1 开发环境搭建 Day2 Java集成开发环境IDEA Day3 Java基本数据类型 Day4 变量与常量 Day5 Java中的运算符 Day6 Java字符串 Day7 Java输入与输出...

MFrank ⋅ 06/21 ⋅ 0

sharding-jdbc源码分析—准备工作

原文作者:阿飞Javaer 原文链接:https://www.jianshu.com/p/7831817c1da8 接下来对sharding-jdbc源码的分析基于tag为源码,根据sharding-jdbc Features深入学习sharding-jdbc的几个主要特性...

飞哥-Javaer ⋅ 05/03 ⋅ 0

培训云计算学校,虚拟机基本结构讲解

我们要对JVM虚拟机的结构有一个感性的认知。毕竟我们不是编程人员,认知程度达不到那么深入。一个运行时的Java虚拟机实例的天职是:负责运行一个java程序。当启动一个Java程序时,一个虚拟机...

长沙千锋 ⋅ 05/17 ⋅ 0

14、Java并发性和多线程-Java ThreadLocal

以下内容转自http://ifeve.com/java-theadlocal/: Java中的ThreadLocal类可以让你创建的变量只被同一个线程进行读和写操作。因此,尽管有两个线程同时执行一段相同的代码,而且这段代码又有...

easonjim ⋅ 2017/06/16 ⋅ 0

CentOS 6.5 安装JDK(包含卸载原有默认JDK)

卸载原有1.7 JDK 查看是否安装了JDK 若有内容就进一步查看JDK信息 卸载 安装jdk ===================================== 安装wget 新建目录 进入目录 下载JDK 安装JDK 配置环境变量 往文件内...

阿白 ⋅ 05/23 ⋅ 0

Java学习---Java简单认识

前言 小编在学习Java方面的基础知识,发现里面有很多是结合之前的语言的特点发展过来的,不同的地方是,Java有它自己的发展和特点。下面小编先简单地做一下总结,结合看过的1-2章的J2SE视频,...

m18633778874 ⋅ 04/01 ⋅ 0

sharding-jdbc源码解析全集

sharding-jdbc源码解析之词法解析 sharding源码解析之api分析 sharding-jdbc源码解析之spring集成 sharding-jdbc源码解析之spring集成分片构造实现 sharding-jdbc源码解析之jdbc规范重写 sh...

天河2018 ⋅ 05/03 ⋅ 0

深入理解JVM学习笔记(一、总览)

1、JVM历史 2、JVM内存结构 3、JVM垃圾回收机制 4、JVM性能监控工具 5、JVM性能调优案例时间 6、JVM类文件结构 7、JVM类加载机制 8、JVM字节码执行引擎 9、JVM虚拟机编译及其运行时优化 10、...

jintaohahahaha ⋅ 05/28 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

个人博客的运营模式能否学习TMALL天猫质量为上?

心情随笔|个人博客的运营模式能否学习TMALL天猫质量为上? 中国的互联网已经发展了很多年了,记得在十年前,个人博客十分流行,大量的人都在写博客,而且质量还不错,很多高质量的文章都是在...

原创小博客 ⋅ 今天 ⋅ 0

JavaScript零基础入门——(十一)JavaScript的DOM操作

JavaScript零基础入门——(十一)JavaScript的DOM操作 大家好,欢迎回到我们的JavaScript零基础入门。最近有些同学问我说,我讲的的比书上的精简不少。其实呢,我主要讲的是我在开发中经常会...

JandenMa ⋅ 今天 ⋅ 0

volatile和synchronized的区别

volatile和synchronized的区别 在讲这个之前需要先了解下JMM(Java memory Model :java内存模型):并发过程中如何处理可见性、原子性、有序性的问题--建立JMM模型 详情请看:https://baike.b...

MarinJ_Shao ⋅ 今天 ⋅ 0

深入分析Kubernetes Critical Pod(一)

Author: xidianwangtao@gmail.com 摘要:大家在部署Kubernetes集群AddOn组件的时候,经常会看到Annotation scheduler.alpha.kubernetes.io/critical-pod"="",以表示这是一个关键服务,那你知...

WaltonWang ⋅ 今天 ⋅ 0

原子性 - synchronized关键词

原子性概念 原子性提供了程序的互斥操作,同一时刻只能有一个线程能对某块代码进行操作。 原子性的实现方式 在jdk中,原子性的实现方式主要分为: synchronized:关键词,它依赖于JVM,保证了同...

dotleo ⋅ 今天 ⋅ 0

【2018.06.22学习笔记】【linux高级知识 14.4-15.3】

14.4 exportfs命令 14.5 NFS客户端问题 15.1 FTP介绍 15.2/15.3 使用vsftpd搭建ftp

lgsxp ⋅ 今天 ⋅ 0

JeeSite 4.0 功能权限管理基础(Shiro)

Shiro是Apache的一个开源框架,是一个权限管理的框架,实现用户认证、用户授权等。 只要有用户参与一般都要有权限管理,权限管理实现对用户访问系统的控制,按照安全规则或者安全策略控制用户...

ThinkGem ⋅ 昨天 ⋅ 0

python f-string 字符串格式化

主要内容 从Python 3.6开始,f-string是格式化字符串的一种很好的新方法。与其他格式化方式相比,它们不仅更易读,更简洁,不易出错,而且速度更快! 在本文的最后,您将了解如何以及为什么今...

阿豪boy ⋅ 昨天 ⋅ 0

Python实现自动登录站点

如果我们想要实现自动登录,那么我们就需要能够驱动浏览器(比如谷歌浏览器)来实现操作,ChromeDriver 刚好能够帮助我们这一点(非谷歌浏览器的驱动有所不同)。 一、确认软件版本 首先我们...

blackfoxya ⋅ 昨天 ⋅ 0

线性回归原理和实现基本认识

一:介绍 定义:线性回归在假设特证满足线性关系,根据给定的训练数据训练一个模型,并用此模型进行预测。为了了解这个定义,我们先举个简单的例子;我们假设一个线性方程 Y=2x+1, x变量为商...

wangxuwei ⋅ 昨天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部