文档章节

SparkSQL Java

Yulong_
 Yulong_
发布于 2017/08/14 09:37
字数 1226
阅读 7
收藏 0

SparkSQL是为了结构化数据处理准备的Spark模块。可以使用SQL、DataFrames、DataSets来跟SparkSQL交互。

1、项目创建

关于Java:选用1.7或者1.8.为了通用性,本章内容使用1.7进行编写。
 

关于Scala:工程不需要增加scala nature,即不需Add Scala Nature。若增加在java代码中调用scala library会有异常。

关于Spark版本:使用1.6.3进行编写。

maven 依赖

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>1.6.3</version>
</dependency>
</dependencies>

2、SparkSQL

2.1、初始化

Spark 编程的第一步是需要创建一个JavaSparkContext对象,用来告诉 Spark 如何访问集群。在创建 JavaSparkContext之前,你需要构建一个 SparkConf对象, SparkConf 对象包含了一些你应用程序的信息。

Spark SQL的所有函数的入口是SQLContext 类, 或者它的子类. 为了创建基本的SQLContext, 需要先创建一个SparkContext.

SparkConf conf = new SparkConf().setAppName("JavaApiLearn").setMaster("local");
		@SuppressWarnings("resource")
		JavaSparkContext jsc = new JavaSparkContext(conf)
        SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

2.2、SparkSQL数据处理

可以从parquet和json文件直接生成DF,但是text文件需要进行rdd+schema的转换。

2.2.1、from parquet

从parquet文件读取生成dataframe

DataFrame df = sqlContext.read().parquet("data/user.parquet");
DataFrame df = sqlContext.read().format("parquet").load("data/user.parquet");

从parquet文件执行查询

DataFrame df = sqlContext.sql("SELECT * FROM parquet.`data/user.parquet`");

2.2.2、from json

从json文件读取生成dataframe

DataFrame df = sqlContext.read().format("json").load("data/user.json");

从list的json直接生成dataframe

List<String> jsonData = Arrays.asList("{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
JavaRDD<String> anotherPeopleRDD = sc.parallelize(jsonData);
DataFrame anotherPeople = sqlContext.read().json(anotherPeopleRDD);

2.2.3、from JDBC

从oracle、Mysql数据库生成DataFrame

Properties prop=new Properties();
prop.put("user","tarena");
prop.put("password","tarenapassword");
//oracle.jdbc.driver.OracleDriver
		
DataFrame jdbcDF = sqlContext.read().jdbc("jdbc:oracle:thin:@172.16.13.80:1521:orcl","tarena.test",prop);

2.2.4、from text

从文本文件只能生成RDD,然后通过反射或者指定schema的形式来生成DataFrame。对于DataFrame可以进行注册生成TempTable然后进行相关的SQL操作。

2.2.4.1、SchemaUsingReflection

在map函数中指定类反射

public class SchemaUsingReflection {
	public static void main(String[] args) {
		createDF();
	}
	public static void createDF() {
		// sc is an existing JavaSparkContext.
		SparkConf conf = new SparkConf().setAppName(SchemaUsingReflection.class.getSimpleName()).setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
		// Load a text file and convert each line to a JavaBean.
		JavaRDD<Person> people = sc.textFile("data/userinfo.txt").map(new Function<String, Person>() {
			public Person call(String line) throws Exception {
				String[] parts = line.split("\t");
				Person person = new Person();
				person.setName(parts[1]);
				person.setAge(Integer.parseInt(parts[2].trim()));
				return person;
			}
		});
		// Apply a schema to an RDD of JavaBeans and register it as a table.
		DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class);
		schemaPeople.registerTempTable("people");
		// SQL can be run over RDDs that have been registered as tables.
		DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 24 AND age <= 29");
		// The results of SQL queries are DataFrames and support all the normal
		// RDD operations.
		// The columns of a row in the result can be accessed by ordinal.
		List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() {
			public String call(Row row) {
				return "Name: " + row.getString(0);
			}
		}).collect();
		PrintUtilPro.printList(teenagerNames);
	}
	public static class Person implements Serializable {
		private String name;
		private int age;
		public String getName() {
			return name;
		}
		public void setName(String name) {
			this.name = name;
		}
		public int getAge() {
			return age;
		}
		public void setAge(int age) {
			this.age = age;
		}
	}
}

2.2.4.1、SpecifyingSchema

通过StructType 进行指定schema

public class SpecifyingSchema {
	public static void main(String[] args) {
		createDF();
	}
	public static void createDF() {
		// sc is an existing JavaSparkContext.
		SparkConf conf = new SparkConf().setAppName(SpecifyingSchema.class.getSimpleName()).setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
		// Load a text file and convert each line to a JavaBean.
		JavaRDD<String> people = sc.textFile("data/userinfo.txt");
		// The schema is encoded in a string
		String schemaString = "id name age";
		// Generate the schema based on the string of schema
		List<StructField> fields = new ArrayList<StructField>();
		for (String fieldName : schemaString.split(" ")) {
			fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));
		}
		StructType schema = DataTypes.createStructType(fields);
		// Convert records of the RDD (people) to Rows.
		JavaRDD<Row> rowRDD = people.map(new Function<String, Row>() {
			public Row call(String record) throws Exception {
				String[] fields = record.split("\t");
				return RowFactory.create(fields[0], fields[1].trim(), fields[2]);
			}
		});
		// Apply the schema to the RDD.
		DataFrame peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema);
		// Register the DataFrame as a table.
		peopleDataFrame.registerTempTable("people");
		// SQL can be run over RDDs that have been registered as tables.
		DataFrame results = sqlContext.sql("SELECT name FROM people");
		// The results of SQL queries are DataFrames and support all the normal
		// RDD operations.
		// The columns of a row in the result can be accessed by ordinal.
		List<String> names = results.javaRDD().map(new Function<Row, String>() {
			public String call(Row row) {
				return "Name: " + row.getString(0);
			}
		}).collect();
		PrintUtilPro.printList(names);
	}
}

2.2.5、HiveContext

Spark SQL也支持从Apache Hive中读出和写入数据。
当和Hive一起工作是,开发者需要提供HiveContext。HiveContext从SQLContext继承而来,它增加了在MetaStore中发现表以及利用HiveSql写查询的功能。没有Hive部署的用户也 可以创建HiveContext。

当没有通过 hive-site.xml 配置,上下文将会在当前目录自动地创建 metastore_db 和 warehouse 

public class HiveContextOpts {
	public static void main(String[] args) {
		hiveRead();
	}
	public static void hiveRead() {
		SparkConf conf = new SparkConf().setAppName(HiveContextOpts.class.getSimpleName()).setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		// sc is an existing JavaSparkContext.
		HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());
		sqlContext.setConf("fs.default.name", "hdfs://breath:9000");
		sqlContext.setConf("hive.metastore.uris", "thrift://breath:9083");
		sqlContext.setConf("javax.jdo.option.ConnectionURL",
				"jdbc:mysql://breath:3306/hive?createDatabaseIfNotExist=true&amp;characterEncoding=UTF-8");
		sqlContext.setConf("javax.jdo.option.ConnectionDriverName", "com.mysql.jdbc.Driver");
		sqlContext.sql(
				"CREATE TABLE IF NOT EXISTS userinfo (id INT, name STRING, age INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE");
		sqlContext.sql("LOAD DATA LOCAL INPATH 'data/userinfo.txt' OVERWRITE INTO TABLE userinfo ");
		// Queries are expressed in HiveQL.
		Row[] results = sqlContext.sql("FROM userinfo SELECT name, age").collect();
		for (Row row : results) {
			System.out.println(row.getString(0) + ":" + row.getInt(1));
		}
		sqlContext.sql("CREATE TABLE IF NOT EXISTS userinfobak AS SELECT name,age FROM userinfo");
	}
}

© 著作权归作者所有

共有 人打赏支持
Yulong_
粉丝 9
博文 106
码字总数 191972
作品 0
朝阳
部门经理
私信 提问
如何使用Spark SQL 的JDBC server

简介 Spark SQL provides JDBC connectivity, which is useful for connecting business intelligence (BI) tools to a Spark cluster and for sharing a cluster across multipleusers. The......

cloud-coder
2015/06/17
0
0
Spark sql RDD 问题

Sparksql RDD 执行aciton 时( ps:toJavaRDD().map(new Funcation).collect() ) 太慢了。差不多5,6秒才能出结果。不知道是什么问题。现在等大神。 java 1.7 Spark 1.6...

天巧星-浪子燕青
2016/12/05
159
1
慕课网Spark SQL日志分析 - 4.从Hive平滑过渡到Spark SQL

4.1 SQLContext/HiveContext/SparkSesson 1.SQLContext 老版本文档:http://spark.apache.org/docs/1.6.1/ SQLContext示例文件: 打包: 提交Spark Application到环境中运行 文档: http://spa......

Meet相识_bfa5
07/11
0
0
阿里年薪50WJAVA工程师转大数据学习路线!

大数据有两个方向,一个是偏计算机的,另一个是偏经济的。你学过Java,所以你可以偏将计算机的。 Java程序员想转大数据可行吗?Java是全世界使用人数最多的编程语言。不少程序员选择Java做为...

JAVA丶学习
04/25
0
0
Spark数据统计(java版)

Java数据统计 spark版本2.1.2,包含Dateset使用,SparkStreaming数据统计 项目地址为https://github.com/baifanwudi/big-data-analysis 代码示例 SparkSql demo: 读取json文件写入hive import...

baifanwudi
04/19
0
0

没有更多内容

加载失败,请刷新页面

加载更多

《Maven官方文档》-Maven依赖机制简介

《Maven官方文档》-Maven依赖机制简介 原文地址 译者:Tyrian 依赖机制是Maven最为用户熟知的特性之一,同时也是Maven所擅长的领域之一。单个项目的依赖管理并不难, 但是当你面对包含数百个...

tantexian
8分钟前
0
0
基于 Docker 快速部署多需求 Spark 自动化测试环境

引言 在进行数据分析时,Spark 越来越广泛的被使用。在测试需求越来越多、测试用例数量越来越大的情况下,能够根据需求快速自动化部署 Spark 环境、快速完成所有测试越来越重要。 本文基于 ...

呐呐丶嘿
26分钟前
2
0
支付宝APP支付之查看支付宝商户ID

1、登录支付宝蚂蚁金服开放平台 2、查看账号详情,选择合作伙伴管理,账户管理,查看角色身份,此处的PID就是商户ID 3、点击秘钥管理,可查看绑定的相关应用及其APPID等信息

Code辉
29分钟前
2
0
崛起于Springboot2.X之通讯WebSocket(40)

技术简介:Springboot2.0.3+freemaker+websocket 1、添加pom依赖 <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-bo......

木九天
37分钟前
1
0
Java常用四大线程池用法以及ThreadPoolExecutor详解

为什么用线程池? 1.创建/销毁线程伴随着系统开销,过于频繁的创建/销毁线程,会很大程度上影响处-理效率 2.线程并发数量过多,抢占系统资源从而导致阻塞 3.对线程进行一些简单的管理 在Java中...

孟飞阳
39分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部