文档章节

Spark: Custom UDF Example

刀锋
 刀锋
发布于 2017/11/13 20:56
字数 494
阅读 6
收藏 0

UDF (User defined functions) and UDAF (User defined aggregate functions) are key components of big data languages such as Pig and Hive. They allow to extend the language constructs to do adhoc processing on distributed dataset. Previously I have blogged about how to write custom UDF/UDAF in Pig (here) and Hive(PartI & II) . In this post I will focus on writing custom UDF in spark. UDF and UDAF is fairly new feature in spark and was just released in Spark 1.5.1. So its still in evolution stage and quite limited on things you can do, especially when trying to write generic UDAFs. I will talk about its current limitations later on. 

As a motivating example assume we are given some student data containing student’s name, subject and score and we want to convert numerical score into ordinal categories based on the following logic:

  • A –> if score >= 80
  • B –> if score >= 60
  • C –> if score >= 35
  • D –> otherwise

Below is the relevant python code if you are using pyspark.

# Generate Random Data
import itertools
import random
students = ['John', 'Mike','Matt']
subjects = ['Math', 'Sci', 'Geography', 'History']
random.seed(1)
data = []
 
for (student, subject) in itertools.product(students, subjects):
    data.append((student, subject, random.randint(0, 100)))
 
# Create Schema Object
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
            StructField("student", StringType(), nullable=False),
            StructField("subject", StringType(), nullable=False),
            StructField("score", IntegerType(), nullable=False)
    ])
 
# Create DataFrame 
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
rdd = sc.parallelize(data)
df = sqlContext.createDataFrame(rdd, schema)
 
# Define udf
from pyspark.sql.functions import udf
def scoreToCategory(score):
    if score >= 80: return 'A'
    elif score >= 60: return 'B'
    elif score >= 35: return 'C'
    else: return 'D'
 
udfScoreToCategory=udf(scoreToCategory, StringType())
df.withColumn("category", udfScoreToCategory("score")).show(10)

 2-10 is the basic python stuff. We are generating a random dataset that looks something like this:

STUDENT SUBJECT SCORE
John Math 13
Mike Sci 45
Mike Geography 65

Next line 12-24 are dealing with constructing the dataframe. The main part of the code is in line 27-34. We first define our function in a normal python way.

Below is scala example of the same:

// Construct Dummy Data
import util.Random
import org.apache.spark.sql.Row
implicit class Crossable[X](xs: Traversable[X]) {
  def cross[Y](ys: Traversable[Y]) = for { x <- xs; y <- ys } yield (x, y)
}
val students = Seq("John", "Mike","Matt")
val subjects = Seq("Math", "Sci", "Geography", "History")
val random = new Random(1)
val data =(students cross subjects).map{x  =>  Row(x._1, x._2,random.nextInt(100))}.toSeq
 
// Create Schema Object
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType}
val schema = StructType(Array(
            StructField("student", StringType, nullable=false),
            StructField("subject", StringType, nullable=false),
            StructField("score", IntegerType, nullable=false)
    ))
 
// Create DataFrame 
import org.apache.spark.sql.hive.HiveContext
val rdd = sc.parallelize(data)
val df = sqlContext.createDataFrame(rdd, schema)
 
// Define udf
import org.apache.spark.sql.functions.udf
def udfScoreToCategory=udf((score: Int) => {
        score match {
        case t if t >= 80 => "A"
        case t if t >= 60 => "B"
        case t if t >= 35 => "C"
        case _ => "D"
    }})
df.withColumn("category", udfScoreToCategory(df("score"))).show(10)

 

本文转载自:https://ragrawal.wordpress.com/2015/10/02/spark-custom-udf-example/

共有 人打赏支持
刀锋
粉丝 3
博文 403
码字总数 305764
作品 0
济南
程序员
私信 提问
Apache Spark 2.4 正式发布,重要功能详细介绍

美国时间 2018年11月08日 正式发布了。一如既往,为了继续实现 Spark 更快,更轻松,更智能的目标,Spark 2.4 带来了许多新功能,如下: 添加一种支持屏障模式(barrier mode)的调度器,以便...

Spark
11/10
0
0
Apache Spark 2.4.0 正式发布

Apache Spark 2.4 与昨天正式发布,Apache Spark 2.4 版本是 2.x 系列的第五个版本。 如果想及时了解 Spark、Hadoop或者Hbase相关的文章,欢迎关注微信公共帐号: itebloghadoop Apache Spa...

Spark
11/09
0
0
Apache Spark 1.5.0 正式发布

Spark 1.5.0 是 1.x 系列的第六个版本,收到 230+ 位贡献者和 80+ 机构的努力,总共 1400+ patches。值得关注的改进如下: APIs:RDD, DataFrame 和 SQL 后端执行:DataFrame 和 SQL 集成:数...

oschina
2015/09/09
5.4K
8
Apache Zeppelin 0.6.2 发布

Apache Zeppelin 0.6.2 发布了,更新内容如下: 改进 Spark interpreter binary is compatibile with Spark 1.6/Scala 2.10 and Spark 2.0/Scala 2.11 without rebuild Note storage aware ......

局长
2016/10/18
1K
1
组件集(MX + Spark)

【Controls】 【Additional Spark controls】 spark.components.ToggleButton spark.components.HScrollBar spark.components.VScrollBar 【Layouts/Containers】 【Additional Spark Layout......

bigYuan
2012/03/18
0
1

没有更多内容

加载失败,请刷新页面

加载更多

EOS官方钱包keosd

EOS官方钱包的名称是keosd,它负责管理你的私钥,并且帮你进行交易的签名。 不过不幸的是,keosd钱包对普通用户并不友好,它是一个命令行程序,目前还没有像以太坊的mist那样的图形化界面,而...

汇智网教程
24分钟前
1
0
ArrayList的实现原理以及实现线程安全

一、ArrayList概述 ArrayList是基于数组实现的,是一个动态的数字,可以自动扩容。 ArrayList不是线程安全的,效率比较高,只能用于单线程的环境中,在多线程环境中可以使用Collections.syn...

一看就喷亏的小猿
40分钟前
2
0
Netty 备录 (一)

入职新公司不久,修修补补1个月的bug,来了点实战性的技术---基于netty即时通信 还好之前对socket有所使用及了解,入手netty应该不是很难吧,好吧,的确有点难,刚看这玩意的时候,可能都不知道哪里...

_大侠__
昨天
4
0
Django简单介绍和用户访问流程

Python下有许多款不同的 Web 框架。Django是重量级选手中最有代表性的一位。许多成功的网站和APP都基于Django。 Django是一个开放源代码的Web应用框架,由Python写成。 Django遵守BSD版权,初...

枫叶云
昨天
8
0
Spring Cloud Stream消费失败后的处理策略(四):重新入队(RabbitMQ)

应用场景 之前我们已经通过《Spring Cloud Stream消费失败后的处理策略(一):自动重试》一文介绍了Spring Cloud Stream默认的消息重试功能。本文将介绍RabbitMQ的binder提供的另外一种重试...

程序猿DD
昨天
7
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部