文档章节

在scala中使用spark sql解决特定需求(2)

九劫散仙
 九劫散仙
发布于 2017/07/21 16:05
字数 712
阅读 34
收藏 1

接着上篇文章,本篇来看下如何在scala中完成使用spark sql将不同日期的数据导入不同的es索引里面。

首下看下用到的依赖包有哪些:


elasticsearch-spark-20_2.11   5.3.2
elasticsearch                 2.3.4
spark-sql_2.11                2.1.0
spark-hive_2.11               2.1.0
spark-core_2.11               2.1.0
hadoop-client                 2.7.3
scala-library                 2.11.8

下面看相关的代码,代码可直接在跑在win上的idea中,使用的是local模式,数据是模拟造的:



import org.apache.spark.sql.types.{DataTypes, StructField}
import org.apache.spark.sql.{Row, SparkSession}//导入Row对象

/**
  * spark sql to es 本地测试例子
  */
object SparkGroupES {


  def main(args: Array[String]): Unit = {

    //构建spark session
    val spark = SparkSession
      .builder().master("local[1]")
      .appName("Spark SQL basic example")
      .config("es.nodes","192.168.10.125").config("es.port","9200")
      .getOrCreate()

    //导入es-spark的包
    import org.elasticsearch.spark.sql._
    import spark.implicits._


    //使用Seq造数据,四列数据
    val df = spark.sparkContext.parallelize(Seq(
      (0,"p1",30.9,"2017-03-04"),
      (0,"u",22.1,"2017-03-05"),
      (1,"r",19.6,"2017-03-04"),
      (2,"cat40",20.7,"2017-03-05"),
      (3,"cat187",27.9,"2017-03-04"),
      (4,"cat183",11.3,"2017-03-06"),
      (5,"cat8",35.6,"2017-03-08"))

     ).toDF("id", "name", "price","dt")//转化df的四列数据s
    //创建表明为pro
    df.createTempView("pro")

    import spark.sql //导入sql函数

    //按照id分组,统计每组数量,统计每组里面最小的价格,然后收集每组里面的数据
    val ds=sql("select dt, count(*) as c ,collect_list(struct(id,name, price)) as res  from pro   group by dt ")
    //需要多次查询的数据,可以缓存起来
    ds.cache()
    //获取查询的结果,遍历获取结果集
    ds.select("dt","c","res").collect().foreach(line=>{
      val dt=line.getAs[String]("dt") //获取日期
      val count=line.getAs[Long]("c")//获取数量
      val value=line.getAs[Seq[Row]]("res")//获取每组内的数据集合,注意是一个Row实体
      println("日期:"+dt+" 销售数量: "+count)

      //创建一个schema针对struct结构
      val schema = DataTypes
        .createStructType( Array[StructField](
          DataTypes.createStructField("id", DataTypes.IntegerType, false), //不允许为null
          DataTypes.createStructField("name", DataTypes.StringType, true),
          DataTypes.createStructField("price", DataTypes.DoubleType, true)
        ))
        //将value转化成rdd
        val rdd=spark.sparkContext.makeRDD(value)
        //将rdd注册成DataFrame
        val df =spark.createDataFrame(rdd,schema)
        //保存每一个分组的数据到es索引里面
        EsSparkSQL.saveToEs(df,"spark"+dt+"/spark",Map("es.mapping.id" -> "id"))
//      value.foreach(row=>{//遍历组内数据集合,然后打印
//        println(row.getAs[String]("name")+" "+row.getAs[Double]("price"))
//      })

    })
    println("索引成功")
    spark.stop()
  }

}



分析下,代码执行过程:

(1)首先创建了一个SparkSession对象,注意这是新版本的写法,然后加入了es相关配置

(2)导入了隐式转化的es相关的包

(3)通过Seq+Tuple创建了一个DataFrame对象,并注册成一个表

(4)导入spark sql后,执行了一个sql分组查询

(5)获取每一组的数据

(6)处理组内的Struct结构

(7)将组内的Seq[Row]转换为rdd,最终转化为df

(8)执行导入es的方法,按天插入不同的索引里面

(9)结束

需要注意的是必须在执行collect方法后,才能在循环内使用sparkContext,否则会报错的,在服务端是不能使用sparkContext的,只有在Driver端才可以。

© 著作权归作者所有

九劫散仙
粉丝 268
博文 175
码字总数 189625
作品 0
海淀
私信 提问
Spark SQL 用户自定义函数UDF、用户自定义聚合函数UDAF 教程(Java踩坑教学版)

在Spark中,也支持Hive中的自定义函数。自定义函数大致可以分为三种: UDF(User-Defined-Function),即最基本的自定义函数,类似tochar,todate等 UDAF(User- Defined Aggregation Funcation...

青夜之衫
2017/12/04
0
0
sparkSql catalyst优化器

---title: sparkSql catalyst优化器subtitle: 自定义catalyst优化器description: catalyst优化器解析keywords: [sparkSql,catalyst,解析]author: liyzdate: 2019-01-10tags: [sparkSql,优化]......

freeli
01/14
45
0
Spark(三) -- Shark与SparkSQL

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq1010885678/article/details/45726665 首先介绍一下Shark的概念 Shark简单的说就是Spark上的Hive,其底层依...

jchubby
2015/05/14
0
0
sparkSQL的整体实现框架

这篇博客的目的是让那些初次接触sparkSQL框架的童鞋们,希望他们对sparkSQL整体框架有一个大致的了解,降低他们进入spark世界的门槛,避免他们在刚刚接触sparkSQL时,不知所措,不知道该学习...

hffzkl
2018/06/26
0
0
hive,shark,sparkSQL,hive on spark,impala,drill比较

Hive on Mapreduce Hive的原理大家可以参考这篇大数据时代的技术hive:hive介绍,实际的一些操作可以看这篇笔记:新手的Hive指南,至于还有兴趣看Hive优化方法可以看看我总结的这篇Hive性能优...

hblt-j
2018/08/13
207
0

没有更多内容

加载失败,请刷新页面

加载更多

java数据类型

基本类型: 整型:Byte,short,int,long 浮点型:float,double 字符型:char 布尔型:boolean 引用类型: 类类型: 接口类型: 数组类型: Byte 1字节 八位 -128 -------- 127 short 2字节...

audience_1
34分钟前
5
0
太全了|万字详解Docker架构原理、功能及使用

一、简介 1、了解Docker的前生LXC LXC为Linux Container的简写。可以提供轻量级的虚拟化,以便隔离进程和资源,而且不需要提供指令解释机制以及全虚拟化的其他复杂性。相当于C++中的NameSpa...

Java技术剑
35分钟前
8
0
Wifiphisher —— 非常非常非常流氓的 WIFI 网络钓鱼框架

编者注:这是一个非常流氓的 WIFI 网络钓鱼工具,甚至可能是非法的工具(取决于你的使用场景)。在没有事先获得许可的情况下使用 Wifiphisher 攻击基础网络设施将被视为非法活动。使用时请遵...

红薯
今天
46
1
MongoDB 4 on CentOS 7安装指南

本教程为CentOS x86_64 7.x操作系统下,MongoDB Community x86_64 4.2(GA)安装指南。 安装方式一:yum repo在线安装 [此方式较为简单,官方推荐] Step1:新建MongDB社区版Yum镜像源。 # vim ...

王焱君
今天
7
0
go-micro 入门教程1.搭建 go-micro环境

微服务的本质是让专业的人做专业的事情,做出更好的东西。 golang具备高并发,静态编译等特性,在性能、安全等方面具备非常大的优势。go-micro是基于golang的微服务编程框架,go-micro操作简单...

非正式解决方案
今天
11
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部