文档章节

SparkSQL Java

Yulong_
 Yulong_
发布于 2017/08/14 09:37
字数 1157
阅读 7
收藏 0

SparkSQL是为了结构化数据处理准备的Spark模块。可以使用SQL、DataFrames、DataSets来跟SparkSQL交互。

1、项目创建

关于Java:选用1.7或者1.8.为了通用性,本章内容使用1.7进行编写。
 

关于Scala:工程不需要增加scala nature,即不需Add Scala Nature。若增加在java代码中调用scala library会有异常。

关于Spark版本:使用1.6.3进行编写。

maven 依赖

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>1.6.3</version>
</dependency>
</dependencies>

2、SparkSQL

2.1、初始化

Spark 编程的第一步是需要创建一个JavaSparkContext对象,用来告诉 Spark 如何访问集群。在创建 JavaSparkContext之前,你需要构建一个 SparkConf对象, SparkConf 对象包含了一些你应用程序的信息。

Spark SQL的所有函数的入口是SQLContext 类, 或者它的子类. 为了创建基本的SQLContext, 需要先创建一个SparkContext.

SparkConf conf = new SparkConf().setAppName("JavaApiLearn").setMaster("local");
		@SuppressWarnings("resource")
		JavaSparkContext jsc = new JavaSparkContext(conf)
        SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

2.2、SparkSQL数据处理

可以从parquet和json文件直接生成DF,但是text文件需要进行rdd+schema的转换。

2.2.1、from parquet

从parquet文件读取生成dataframe

DataFrame df = sqlContext.read().parquet("data/user.parquet");
DataFrame df = sqlContext.read().format("parquet").load("data/user.parquet");

从parquet文件执行查询

DataFrame df = sqlContext.sql("SELECT * FROM parquet.`data/user.parquet`");

2.2.2、from json

从json文件读取生成dataframe

DataFrame df = sqlContext.read().format("json").load("data/user.json");

从list的json直接生成dataframe

List<String> jsonData = Arrays.asList("{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
JavaRDD<String> anotherPeopleRDD = sc.parallelize(jsonData);
DataFrame anotherPeople = sqlContext.read().json(anotherPeopleRDD);

2.2.3、from JDBC

从oracle、Mysql数据库生成DataFrame

Properties prop=new Properties();
prop.put("user","tarena");
prop.put("password","tarenapassword");
//oracle.jdbc.driver.OracleDriver
		
DataFrame jdbcDF = sqlContext.read().jdbc("jdbc:oracle:thin:@172.16.13.80:1521:orcl","tarena.test",prop);

2.2.4、from text

从文本文件只能生成RDD,然后通过反射或者指定schema的形式来生成DataFrame。对于DataFrame可以进行注册生成TempTable然后进行相关的SQL操作。

2.2.4.1、SchemaUsingReflection

在map函数中指定类反射

public class SchemaUsingReflection {
	public static void main(String[] args) {
		createDF();
	}
	public static void createDF() {
		// sc is an existing JavaSparkContext.
		SparkConf conf = new SparkConf().setAppName(SchemaUsingReflection.class.getSimpleName()).setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
		// Load a text file and convert each line to a JavaBean.
		JavaRDD<Person> people = sc.textFile("data/userinfo.txt").map(new Function<String, Person>() {
			public Person call(String line) throws Exception {
				String[] parts = line.split("\t");
				Person person = new Person();
				person.setName(parts[1]);
				person.setAge(Integer.parseInt(parts[2].trim()));
				return person;
			}
		});
		// Apply a schema to an RDD of JavaBeans and register it as a table.
		DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class);
		schemaPeople.registerTempTable("people");
		// SQL can be run over RDDs that have been registered as tables.
		DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 24 AND age <= 29");
		// The results of SQL queries are DataFrames and support all the normal
		// RDD operations.
		// The columns of a row in the result can be accessed by ordinal.
		List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() {
			public String call(Row row) {
				return "Name: " + row.getString(0);
			}
		}).collect();
		PrintUtilPro.printList(teenagerNames);
	}
	public static class Person implements Serializable {
		private String name;
		private int age;
		public String getName() {
			return name;
		}
		public void setName(String name) {
			this.name = name;
		}
		public int getAge() {
			return age;
		}
		public void setAge(int age) {
			this.age = age;
		}
	}
}

2.2.4.1、SpecifyingSchema

通过StructType 进行指定schema

public class SpecifyingSchema {
	public static void main(String[] args) {
		createDF();
	}
	public static void createDF() {
		// sc is an existing JavaSparkContext.
		SparkConf conf = new SparkConf().setAppName(SpecifyingSchema.class.getSimpleName()).setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
		// Load a text file and convert each line to a JavaBean.
		JavaRDD<String> people = sc.textFile("data/userinfo.txt");
		// The schema is encoded in a string
		String schemaString = "id name age";
		// Generate the schema based on the string of schema
		List<StructField> fields = new ArrayList<StructField>();
		for (String fieldName : schemaString.split(" ")) {
			fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));
		}
		StructType schema = DataTypes.createStructType(fields);
		// Convert records of the RDD (people) to Rows.
		JavaRDD<Row> rowRDD = people.map(new Function<String, Row>() {
			public Row call(String record) throws Exception {
				String[] fields = record.split("\t");
				return RowFactory.create(fields[0], fields[1].trim(), fields[2]);
			}
		});
		// Apply the schema to the RDD.
		DataFrame peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema);
		// Register the DataFrame as a table.
		peopleDataFrame.registerTempTable("people");
		// SQL can be run over RDDs that have been registered as tables.
		DataFrame results = sqlContext.sql("SELECT name FROM people");
		// The results of SQL queries are DataFrames and support all the normal
		// RDD operations.
		// The columns of a row in the result can be accessed by ordinal.
		List<String> names = results.javaRDD().map(new Function<Row, String>() {
			public String call(Row row) {
				return "Name: " + row.getString(0);
			}
		}).collect();
		PrintUtilPro.printList(names);
	}
}

2.2.5、HiveContext

Spark SQL也支持从Apache Hive中读出和写入数据。
当和Hive一起工作是,开发者需要提供HiveContext。HiveContext从SQLContext继承而来,它增加了在MetaStore中发现表以及利用HiveSql写查询的功能。没有Hive部署的用户也 可以创建HiveContext。

当没有通过 hive-site.xml 配置,上下文将会在当前目录自动地创建 metastore_db 和 warehouse 

public class HiveContextOpts {
	public static void main(String[] args) {
		hiveRead();
	}
	public static void hiveRead() {
		SparkConf conf = new SparkConf().setAppName(HiveContextOpts.class.getSimpleName()).setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		// sc is an existing JavaSparkContext.
		HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());
		sqlContext.setConf("fs.default.name", "hdfs://breath:9000");
		sqlContext.setConf("hive.metastore.uris", "thrift://breath:9083");
		sqlContext.setConf("javax.jdo.option.ConnectionURL",
				"jdbc:mysql://breath:3306/hive?createDatabaseIfNotExist=true&amp;characterEncoding=UTF-8");
		sqlContext.setConf("javax.jdo.option.ConnectionDriverName", "com.mysql.jdbc.Driver");
		sqlContext.sql(
				"CREATE TABLE IF NOT EXISTS userinfo (id INT, name STRING, age INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE");
		sqlContext.sql("LOAD DATA LOCAL INPATH 'data/userinfo.txt' OVERWRITE INTO TABLE userinfo ");
		// Queries are expressed in HiveQL.
		Row[] results = sqlContext.sql("FROM userinfo SELECT name, age").collect();
		for (Row row : results) {
			System.out.println(row.getString(0) + ":" + row.getInt(1));
		}
		sqlContext.sql("CREATE TABLE IF NOT EXISTS userinfobak AS SELECT name,age FROM userinfo");
	}
}

© 著作权归作者所有

共有 人打赏支持
Yulong_
粉丝 8
博文 93
码字总数 169760
作品 0
朝阳
部门经理
如何使用Spark SQL 的JDBC server

简介 Spark SQL provides JDBC connectivity, which is useful for connecting business intelligence (BI) tools to a Spark cluster and for sharing a cluster across multipleusers. The......

cloud-coder
2015/06/17
0
0
Spark数据统计(java版)

Java数据统计 spark版本2.1.2,包含Dateset使用,SparkStreaming数据统计 项目地址为https://github.com/baifanwudi/big-data-analysis 代码示例 SparkSql demo: 读取json文件写入hive import...

baifanwudi
04/19
0
0
Intellij idea配置Spark开发环境,统计哈姆雷特词频(2)

idea 新建maven 项目 输入maven坐标 编辑maven文件 中间层Spark,即核心模块Spark Core,必须在maven中引用。 编译Spark还要声明java8编译工具。 idea自动加载引用,在窗口左侧Project导航栏...

白头雁
07/26
0
0
慕课网Spark SQL日志分析 - 4.从Hive平滑过渡到Spark SQL

4.1 SQLContext/HiveContext/SparkSesson 1.SQLContext 老版本文档:http://spark.apache.org/docs/1.6.1/ SQLContext示例文件: 打包: 提交Spark Application到环境中运行 文档: http://spa......

Meet相识_bfa5
07/11
0
0
阿里年薪50WJAVA工程师转大数据学习路线!

大数据有两个方向,一个是偏计算机的,另一个是偏经济的。你学过Java,所以你可以偏将计算机的。 Java程序员想转大数据可行吗?Java是全世界使用人数最多的编程语言。不少程序员选择Java做为...

JAVA丶学习
04/25
0
0

没有更多内容

加载失败,请刷新页面

加载更多

通过ajax访问远程天气预报服务

http://www.webxml.com.cn/zh_cn/index.aspx 更改wsdl文件 打开文件将15行,51行,101行去掉 然后把文件复制到c盘 然后在桌面上面就生成了文件 将文件打成jar包 package cn.it.ws.weather;...

江戸川
11分钟前
0
0
聊聊storm的tickTuple

序 本文主要研究一下storm的tickTuple 实例 TickWordCountBolt public class TickWordCountBolt extends BaseBasicBolt { private static final Logger LOGGER = LoggerFactory.getLogg......

go4it
15分钟前
0
0
自动装箱和自动拆箱

自动装箱和自动拆箱 Java 提供了 8 种基本数据类型,每种数据类型都有其对应的包装类型,包装类是面向对象的类,是一种高级的数据类型,可以进行一些比较复杂的操作,它们是引用类型而不再基...

tsmyk0715
35分钟前
1
0
简易审计系统

1、有时候我们需要对线上用户的操作进行记录,可以进行追踪,出现问题追究责任,但是linux自带的history并不会实时的记录(仅仅在内存中,当用户正常退出(exit logout )时才会记录到history文件里...

芬野de博客
39分钟前
2
0
Qt那些事0.0.6

QML中使用Image,在设置source的后,通过Qt Quick2 Preview(qmlscene)遇到了图片找不到的问题: Image { id: success_img anchors.centerIn: parent ...

Ev4n
40分钟前
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部