文档章节

SparkSQL Java

Yulong_
 Yulong_
发布于 2017/08/14 09:37
字数 1226
阅读 158
收藏 0

#程序员薪资揭榜#你做程序员几年了?月薪多少?发量还在么?>>>

SparkSQL是为了结构化数据处理准备的Spark模块。可以使用SQL、DataFrames、DataSets来跟SparkSQL交互。

1、项目创建

关于Java:选用1.7或者1.8.为了通用性,本章内容使用1.7进行编写。
 

关于Scala:工程不需要增加scala nature,即不需Add Scala Nature。若增加在java代码中调用scala library会有异常。

关于Spark版本:使用1.6.3进行编写。

maven 依赖

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>1.6.3</version>
</dependency>
</dependencies>

2、SparkSQL

2.1、初始化

Spark 编程的第一步是需要创建一个JavaSparkContext对象,用来告诉 Spark 如何访问集群。在创建 JavaSparkContext之前,你需要构建一个 SparkConf对象, SparkConf 对象包含了一些你应用程序的信息。

Spark SQL的所有函数的入口是SQLContext 类, 或者它的子类. 为了创建基本的SQLContext, 需要先创建一个SparkContext.

SparkConf conf = new SparkConf().setAppName("JavaApiLearn").setMaster("local");
		@SuppressWarnings("resource")
		JavaSparkContext jsc = new JavaSparkContext(conf)
        SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

2.2、SparkSQL数据处理

可以从parquet和json文件直接生成DF,但是text文件需要进行rdd+schema的转换。

2.2.1、from parquet

从parquet文件读取生成dataframe

DataFrame df = sqlContext.read().parquet("data/user.parquet");
DataFrame df = sqlContext.read().format("parquet").load("data/user.parquet");

从parquet文件执行查询

DataFrame df = sqlContext.sql("SELECT * FROM parquet.`data/user.parquet`");

2.2.2、from json

从json文件读取生成dataframe

DataFrame df = sqlContext.read().format("json").load("data/user.json");

从list的json直接生成dataframe

List<String> jsonData = Arrays.asList("{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
JavaRDD<String> anotherPeopleRDD = sc.parallelize(jsonData);
DataFrame anotherPeople = sqlContext.read().json(anotherPeopleRDD);

2.2.3、from JDBC

从oracle、Mysql数据库生成DataFrame

Properties prop=new Properties();
prop.put("user","tarena");
prop.put("password","tarenapassword");
//oracle.jdbc.driver.OracleDriver
		
DataFrame jdbcDF = sqlContext.read().jdbc("jdbc:oracle:thin:@172.16.13.80:1521:orcl","tarena.test",prop);

2.2.4、from text

从文本文件只能生成RDD,然后通过反射或者指定schema的形式来生成DataFrame。对于DataFrame可以进行注册生成TempTable然后进行相关的SQL操作。

2.2.4.1、SchemaUsingReflection

在map函数中指定类反射

public class SchemaUsingReflection {
	public static void main(String[] args) {
		createDF();
	}
	public static void createDF() {
		// sc is an existing JavaSparkContext.
		SparkConf conf = new SparkConf().setAppName(SchemaUsingReflection.class.getSimpleName()).setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
		// Load a text file and convert each line to a JavaBean.
		JavaRDD<Person> people = sc.textFile("data/userinfo.txt").map(new Function<String, Person>() {
			public Person call(String line) throws Exception {
				String[] parts = line.split("\t");
				Person person = new Person();
				person.setName(parts[1]);
				person.setAge(Integer.parseInt(parts[2].trim()));
				return person;
			}
		});
		// Apply a schema to an RDD of JavaBeans and register it as a table.
		DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class);
		schemaPeople.registerTempTable("people");
		// SQL can be run over RDDs that have been registered as tables.
		DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 24 AND age <= 29");
		// The results of SQL queries are DataFrames and support all the normal
		// RDD operations.
		// The columns of a row in the result can be accessed by ordinal.
		List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() {
			public String call(Row row) {
				return "Name: " + row.getString(0);
			}
		}).collect();
		PrintUtilPro.printList(teenagerNames);
	}
	public static class Person implements Serializable {
		private String name;
		private int age;
		public String getName() {
			return name;
		}
		public void setName(String name) {
			this.name = name;
		}
		public int getAge() {
			return age;
		}
		public void setAge(int age) {
			this.age = age;
		}
	}
}

2.2.4.1、SpecifyingSchema

通过StructType 进行指定schema

public class SpecifyingSchema {
	public static void main(String[] args) {
		createDF();
	}
	public static void createDF() {
		// sc is an existing JavaSparkContext.
		SparkConf conf = new SparkConf().setAppName(SpecifyingSchema.class.getSimpleName()).setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
		// Load a text file and convert each line to a JavaBean.
		JavaRDD<String> people = sc.textFile("data/userinfo.txt");
		// The schema is encoded in a string
		String schemaString = "id name age";
		// Generate the schema based on the string of schema
		List<StructField> fields = new ArrayList<StructField>();
		for (String fieldName : schemaString.split(" ")) {
			fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));
		}
		StructType schema = DataTypes.createStructType(fields);
		// Convert records of the RDD (people) to Rows.
		JavaRDD<Row> rowRDD = people.map(new Function<String, Row>() {
			public Row call(String record) throws Exception {
				String[] fields = record.split("\t");
				return RowFactory.create(fields[0], fields[1].trim(), fields[2]);
			}
		});
		// Apply the schema to the RDD.
		DataFrame peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema);
		// Register the DataFrame as a table.
		peopleDataFrame.registerTempTable("people");
		// SQL can be run over RDDs that have been registered as tables.
		DataFrame results = sqlContext.sql("SELECT name FROM people");
		// The results of SQL queries are DataFrames and support all the normal
		// RDD operations.
		// The columns of a row in the result can be accessed by ordinal.
		List<String> names = results.javaRDD().map(new Function<Row, String>() {
			public String call(Row row) {
				return "Name: " + row.getString(0);
			}
		}).collect();
		PrintUtilPro.printList(names);
	}
}

2.2.5、HiveContext

Spark SQL也支持从Apache Hive中读出和写入数据。
当和Hive一起工作是,开发者需要提供HiveContext。HiveContext从SQLContext继承而来,它增加了在MetaStore中发现表以及利用HiveSql写查询的功能。没有Hive部署的用户也 可以创建HiveContext。

当没有通过 hive-site.xml 配置,上下文将会在当前目录自动地创建 metastore_db 和 warehouse 

public class HiveContextOpts {
	public static void main(String[] args) {
		hiveRead();
	}
	public static void hiveRead() {
		SparkConf conf = new SparkConf().setAppName(HiveContextOpts.class.getSimpleName()).setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		// sc is an existing JavaSparkContext.
		HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());
		sqlContext.setConf("fs.default.name", "hdfs://breath:9000");
		sqlContext.setConf("hive.metastore.uris", "thrift://breath:9083");
		sqlContext.setConf("javax.jdo.option.ConnectionURL",
				"jdbc:mysql://breath:3306/hive?createDatabaseIfNotExist=true&amp;characterEncoding=UTF-8");
		sqlContext.setConf("javax.jdo.option.ConnectionDriverName", "com.mysql.jdbc.Driver");
		sqlContext.sql(
				"CREATE TABLE IF NOT EXISTS userinfo (id INT, name STRING, age INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE");
		sqlContext.sql("LOAD DATA LOCAL INPATH 'data/userinfo.txt' OVERWRITE INTO TABLE userinfo ");
		// Queries are expressed in HiveQL.
		Row[] results = sqlContext.sql("FROM userinfo SELECT name, age").collect();
		for (Row row : results) {
			System.out.println(row.getString(0) + ":" + row.getInt(1));
		}
		sqlContext.sql("CREATE TABLE IF NOT EXISTS userinfobak AS SELECT name,age FROM userinfo");
	}
}

© 著作权归作者所有

Yulong_
粉丝 10
博文 145
码字总数 253510
作品 0
朝阳
部门经理
私信 提问
加载中

评论(0)

Spark SQL JAVA和Scala编写Spark SQL程序实现RDD转换成DataFrame+操作HiveContext+操作Mysql

一、 以编程方式执行Spark SQL查询 1. 编写Spark SQL程序实现RDD转换成DataFrame 前面我们学习了如何在Spark Shell中使用SQL完成查询,现在我们通过IDEA编写Spark SQL查询程序。 Spark官网提...

osc_ihb4vmpt
2019/08/16
3
0
基于Spark Mllib, 使用java api操作的电影推荐系统(spark1.5.2 jdk1.7)

最近在学习Spark Mllib,看了一些它的算法,但不知道算法怎么去应用,网上的实例大部分都是使用Scala语言写的,没有java的代码,从网上找到了一篇基于Spark Mllib,SparkSQL的电影推荐系统 ...

osc_7g5sy6xk
2019/12/13
2
0
Spark SQL大数据处理并写入Elasticsearch

SparkSQL(Spark用于处理结构化数据的模块) 通过SparkSQL导入的数据可以来自MySQL数据库、Json数据、Csv数据等,通过load这些数据可以对其做一系列计算 下面通过程序代码来详细查看SparkSQL导...

osc_i05nmotv
2018/10/16
8
0
Spark操作外部数据源--MySQL

操作MySQL的数据: spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/sparksql").option("dbtable", "sparksql.TBLS").option("user", "root").option("password", "r......

osc_8mj3ztvg
2019/03/30
2
0
执行sparksql出现OOM问题

一开始使用yarn-client模式提交作业时一切正常,但换成cluster模式下 使用sparksql方法执行hive查询语句时,却出现了如下的OOM问题: 出现这个错误原主要原因是太多的类或者太大的类都被加载...

osc_2koyq9mo
2019/02/16
2
0

没有更多内容

加载失败,请刷新页面

加载更多

时间片轮询法

时间片轮询法 时间片轮询法是一种比较简单易用的系统架构之一,它对于系统中的任务调度算法是分时处理。核心思路是把 CPU 的时间分时给各个任务使用。我们常用的定时方法是定时器,把调度器放...

osc_j7rfhwi0
40分钟前
15
0
二项堆(三)之 Java的实现

概要 前面分别通过C和C++实现了二项堆,本章给出二项堆的Java版本。还是那句老话,三种实现的原理一样,择其一了解即可。 目录 1. 二项树的介绍 2. 二项堆的介绍 3. 二项堆的基本操作 4. 二项...

osc_ct8a6sdg
41分钟前
9
0
Windows 10 安装 Hadoop 2.10

  1. 配置JAVA_HOME环境和Hadoop环境:      2. 配置bin   3. 进入Hadoop中/etc/hadoop下     配置hdfs-site.xml增加:    <property> <!-- 单节点,所以配置成1 -->...

osc_jmtenr3d
41分钟前
32
0
如何让你在众多二手车中挑中满意的?python帮你实现(附源码)

前言 老司机带你去看车,网上的几千条的二手车数据,只需几十行代码,就可以统统获取,保存数据到我们本地电脑上 知识点: 1.python基础知识 2.函数 3.requests库 4.xpath适合零基础的同学 ...

osc_rmqoxylv
42分钟前
12
0
Selenium IDE使用指南三(控制流)

Selenium IDE附带的命令使您可以添加条件逻辑和循环到测试中。 这使您仅在满足应用程序中的某些条件时才执行命令(或一组命令),或根据预定义的标准重复执行命令。 JavaScript表达式 通过使...

分布式编程
42分钟前
17
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部