Spark SQL is the Spark module for structured data processing. You can interact with it through SQL, DataFrames, and Datasets.
1. Project Setup
Java: use 1.7 or 1.8. For broader compatibility, the examples in this chapter are written against 1.7.
Scala: the project does not need the Scala nature added (i.e., no "Add Scala Nature" in Eclipse). If it is added, calling the Scala library from Java code can throw exceptions.
Spark version: the examples use 1.6.3.
Maven dependencies:
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.6.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.6.3</version>
    </dependency>
</dependencies>
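The HiveContext used in section 2.2.5 lives in a separate artifact, so if you follow that section, also add the spark-hive dependency (same version and Scala suffix as above):

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.10</artifactId>
    <version>1.6.3</version>
</dependency>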
2. Spark SQL
2.1. Initialization
The first step in a Spark program is to create a JavaSparkContext object, which tells Spark how to access the cluster. Before creating the JavaSparkContext, you build a SparkConf object that holds information about your application.
The entry point for all Spark SQL functionality is the SQLContext class, or one of its subclasses. To create a basic SQLContext, you first need a SparkContext.
SparkConf conf = new SparkConf().setAppName("JavaApiLearn").setMaster("local");
@SuppressWarnings("resource")
JavaSparkContext jsc = new JavaSparkContext(conf);
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(jsc);
2.2. Data Processing with Spark SQL
A DataFrame can be created directly from Parquet and JSON files; a text file, however, must first be loaded as an RDD and then converted by attaching a schema.
2.2.1. From Parquet
Creating a DataFrame from a Parquet file (the two forms are equivalent):
DataFrame df = sqlContext.read().parquet("data/user.parquet");
// Equivalent long form:
// DataFrame df = sqlContext.read().format("parquet").load("data/user.parquet");
Running SQL directly against a Parquet file:
DataFrame df = sqlContext.sql("SELECT * FROM parquet.`data/user.parquet`");
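The reverse direction works the same way through the DataFrameWriter; a minimal sketch (the output path is illustrative):

// Write the DataFrame back out as Parquet (path is a placeholder).
df.write().parquet("data/user_out.parquet");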
2.2.2. From JSON
Creating a DataFrame from a JSON file:
DataFrame df = sqlContext.read().format("json").load("data/user.json");
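Note that the JSON reader expects one complete, self-contained JSON object per line (JSON Lines format); a pretty-printed multi-line document will not parse. For example, data/user.json could look like this (field names illustrative):

{"name":"Yin","age":25}
{"name":"Michael","age":30}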
Creating a DataFrame directly from a List of JSON strings:
List<String> jsonData = Arrays.asList(
        "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
JavaRDD<String> anotherPeopleRDD = sc.parallelize(jsonData);
DataFrame anotherPeople = sqlContext.read().json(anotherPeopleRDD);
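To verify the result, the schema that Spark infers from the nested JSON can be printed directly:

// Inspect the inferred schema (address becomes a nested struct) and the data.
anotherPeople.printSchema();
anotherPeople.show();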
2.2.3. From JDBC
Creating a DataFrame from an Oracle or MySQL database:
Properties prop = new Properties();
prop.put("user", "tarena");
prop.put("password", "tarenapassword");
// Driver class: oracle.jdbc.driver.OracleDriver
DataFrame jdbcDF = sqlContext.read().jdbc(
        "jdbc:oracle:thin:@172.16.13.80:1521:orcl", "tarena.test", prop);
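For MySQL the call is the same apart from the URL and driver; a minimal sketch (host, credentials, database, and table name are placeholders, and the MySQL connector jar must be on the classpath):

Properties mysqlProp = new Properties();
mysqlProp.put("user", "root");
mysqlProp.put("password", "rootpassword");
// Driver class: com.mysql.jdbc.Driver
DataFrame mysqlDF = sqlContext.read().jdbc(
        "jdbc:mysql://localhost:3306/test", "userinfo", mysqlProp);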
2.2.4. From Text
A text file can only be loaded as an RDD; the RDD is then converted to a DataFrame either by reflection on a JavaBean class or by specifying a schema explicitly. The resulting DataFrame can be registered as a temp table and queried with SQL.
2.2.4.1. SchemaUsingReflection
Map each line to a JavaBean and let Spark infer the schema from the class by reflection:
import java.io.Serializable;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

public class SchemaUsingReflection {

    public static void main(String[] args) {
        createDF();
    }

    public static void createDF() {
        SparkConf conf = new SparkConf().setAppName(SchemaUsingReflection.class.getSimpleName()).setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

        // Load a text file and convert each line to a JavaBean.
        JavaRDD<Person> people = sc.textFile("data/userinfo.txt").map(new Function<String, Person>() {
            public Person call(String line) throws Exception {
                String[] parts = line.split("\t");
                Person person = new Person();
                person.setName(parts[1]);
                person.setAge(Integer.parseInt(parts[2].trim()));
                return person;
            }
        });

        // Apply a schema to an RDD of JavaBeans and register it as a table.
        DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class);
        schemaPeople.registerTempTable("people");

        // SQL can be run over RDDs that have been registered as tables.
        DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 24 AND age <= 29");

        // The results of SQL queries are DataFrames and support all the normal RDD operations.
        // The columns of a row in the result can be accessed by ordinal.
        List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() {
            public String call(Row row) {
                return "Name: " + row.getString(0);
            }
        }).collect();
        // PrintUtilPro is a project-local print helper.
        PrintUtilPro.printList(teenagerNames);
    }

    public static class Person implements Serializable {
        private String name;
        private int age;

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public int getAge() {
            return age;
        }

        public void setAge(int age) {
            this.age = age;
        }
    }
}
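The same query can also be expressed with the DataFrame DSL instead of going through a temp table; a sketch that would sit inside createDF after schemaPeople is built:

// Equivalent to the SQL above, using Column operators directly.
DataFrame teens = schemaPeople.filter(
        schemaPeople.col("age").geq(24).and(schemaPeople.col("age").leq(29)))
        .select("name");
teens.show();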
2.2.4.2. SpecifyingSchema
Specify the schema explicitly with a StructType:
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class SpecifyingSchema {

    public static void main(String[] args) {
        createDF();
    }

    public static void createDF() {
        SparkConf conf = new SparkConf().setAppName(SpecifyingSchema.class.getSimpleName()).setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

        // Load a text file; each line holds one tab-separated record.
        JavaRDD<String> people = sc.textFile("data/userinfo.txt");

        // The schema is encoded in a string.
        String schemaString = "id name age";

        // Generate the schema based on the string of schema.
        List<StructField> fields = new ArrayList<StructField>();
        for (String fieldName : schemaString.split(" ")) {
            fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));
        }
        StructType schema = DataTypes.createStructType(fields);

        // Convert records of the RDD (people) to Rows.
        JavaRDD<Row> rowRDD = people.map(new Function<String, Row>() {
            public Row call(String record) throws Exception {
                String[] parts = record.split("\t");
                return RowFactory.create(parts[0], parts[1].trim(), parts[2]);
            }
        });

        // Apply the schema to the RDD.
        DataFrame peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema);

        // Register the DataFrame as a table.
        peopleDataFrame.registerTempTable("people");

        // SQL can be run over RDDs that have been registered as tables.
        DataFrame results = sqlContext.sql("SELECT name FROM people");

        // The columns of a row in the result can be accessed by ordinal.
        List<String> names = results.javaRDD().map(new Function<Row, String>() {
            public String call(Row row) {
                return "Name: " + row.getString(0);
            }
        }).collect();
        // PrintUtilPro is a project-local print helper.
        PrintUtilPro.printList(names);
    }
}
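Here every column is declared as StringType for simplicity. If, say, age should be numeric, the StructField type and the Row value must agree; a sketch of the two lines that would change (shown in isolation):

// Declare age as an integer instead of a string...
fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
// ...and convert the raw value when building each Row.
return RowFactory.create(parts[0], parts[1].trim(), Integer.parseInt(parts[2].trim()));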
2.2.5. HiveContext
Spark SQL also supports reading data from and writing data to Apache Hive.
When working with Hive, you need a HiveContext. HiveContext extends SQLContext and adds the ability to find tables in the MetaStore and to write queries in HiveQL. Users without an existing Hive deployment can still create a HiveContext.
When no hive-site.xml is configured, the context automatically creates metastore_db and warehouse in the current directory.
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;

public class HiveContextOpts {

    public static void main(String[] args) {
        hiveRead();
    }

    public static void hiveRead() {
        SparkConf conf = new SparkConf().setAppName(HiveContextOpts.class.getSimpleName()).setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // sc is an existing JavaSparkContext; HiveContext wraps the underlying SparkContext.
        HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());
        sqlContext.setConf("fs.default.name", "hdfs://breath:9000");
        sqlContext.setConf("hive.metastore.uris", "thrift://breath:9083");
        sqlContext.setConf("javax.jdo.option.ConnectionURL",
                "jdbc:mysql://breath:3306/hive?createDatabaseIfNotExist=true&characterEncoding=UTF-8");
        sqlContext.setConf("javax.jdo.option.ConnectionDriverName", "com.mysql.jdbc.Driver");

        sqlContext.sql("CREATE TABLE IF NOT EXISTS userinfo (id INT, name STRING, age INT) "
                + "ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE");
        sqlContext.sql("LOAD DATA LOCAL INPATH 'data/userinfo.txt' OVERWRITE INTO TABLE userinfo");

        // Queries are expressed in HiveQL.
        Row[] results = sqlContext.sql("FROM userinfo SELECT name, age").collect();
        for (Row row : results) {
            System.out.println(row.getString(0) + ":" + row.getInt(1));
        }

        // Create a backup table from the query result (CTAS).
        sqlContext.sql("CREATE TABLE IF NOT EXISTS userinfobak AS SELECT name, age FROM userinfo");
    }
}
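Once the table exists in the metastore, it can also be pulled back as a DataFrame without writing any SQL:

// Load the Hive table as a DataFrame for further API-level processing.
DataFrame userinfoDF = sqlContext.table("userinfo");
userinfoDF.show();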