SparkSQL内置函数统计每日销售金额实例
博客专区 > 别寒 的博客 > 博客详情
SparkSQL内置函数统计每日销售金额实例
别寒 发表于5个月前
SparkSQL内置函数统计每日销售金额实例
  • 发表于 5个月前
  • 阅读 10
  • 收藏 1
  • 点赞 0
  • 评论 0

腾讯云 技术升级10大核心产品年终让利>>>   

package com.hhb.spark.sql

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{StructField, _}
import org.apache.spark.sql.functions._

/**
  * Created by dell on 2017/7/28.
  */
object DailySale {

  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("DailySale")
      .setMaster("local")
      .set("spark.testing.memory", "2147480000")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // 要使用spark sql 内置函数,就必须在这里导入sqlcontext下的隐式转换
    import sqlContext.implicits._

    // 模拟数据
    val userSalelog = Array(
      "2017-07-01,55.05,1133",
      "2017-07-01,54.05,",
      "2017-07-01,35.05,1144",
      "2017-07-02,25.05,1155",
      "2017-07-01,65.05,1123",
      "2017-07-01,25.15,",
      "2017-07-02,65.05,1123",
      "2017-07-02,25.15,"
    )
    val userSaleLogRDD = sc.parallelize(userSalelog, 5)

    // 有效销售日志的过滤(过滤掉无效的日志)
    val filterdUserSaleLogRDD = userSaleLogRDD
      .filter{ log => if (log.split(",").length == 3) true else false }

    val userSaleLogRowRDD = filterdUserSaleLogRDD
      .map{
        log => Row(log.split(",")(0),
        log.split(",")(1).toDouble)
      }

    val structType = StructType(Array(
      StructField("date", StringType, true),
      StructField("sale_amount", DoubleType, true)
    ))

    // 转成dataframe
    val userSaleLogDF = sqlContext.createDataFrame(userSaleLogRowRDD, structType)

    // 开始进行每日销售额的统计
    userSaleLogDF.groupBy("date")
      .agg('date, sum('sale_amount))
      .map{ row => Row(row(1), row(2)) }
      .collect()
      .foreach(println)

  }

}

共有 人打赏支持
粉丝 29
博文 254
码字总数 130346
×
别寒
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: