Spark: Custom UDF Example


UDFs (user-defined functions) and UDAFs (user-defined aggregate functions) are key components of big data languages such as Pig and Hive. They allow you to extend the language constructs to do ad hoc processing on distributed datasets. I have previously blogged about how to write custom UDFs/UDAFs in Pig (here) and Hive (Part I & II). In this post I will focus on writing custom UDFs in Spark. UDF and UDAF support is a fairly new Spark feature, only released in Spark 1.5.1, so it is still evolving and quite limited in what you can do, especially when trying to write generic UDAFs. I will talk about its current limitations later on.

As a motivating example, assume we are given some student data containing each student's name, subject, and score, and we want to convert the numerical score into ordinal categories based on the following logic:

  • A –> if score >= 80
  • B –> if score >= 60
  • C –> if score >= 35
  • D –> otherwise

Below is the relevant Python code if you are using PySpark.

# Generate Random Data
import itertools
import random
students = ['John', 'Mike','Matt']
subjects = ['Math', 'Sci', 'Geography', 'History']
random.seed(1)
data = []
 
for (student, subject) in itertools.product(students, subjects):
    data.append((student, subject, random.randint(0, 100)))
 
# Create Schema Object
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
            StructField("student", StringType(), nullable=False),
            StructField("subject", StringType(), nullable=False),
            StructField("score", IntegerType(), nullable=False)
    ])
 
# Create DataFrame 
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
rdd = sc.parallelize(data)
df = sqlContext.createDataFrame(rdd, schema)
 
# Define udf
from pyspark.sql.functions import udf
def scoreToCategory(score):
    if score >= 80: return 'A'
    elif score >= 60: return 'B'
    elif score >= 35: return 'C'
    else: return 'D'
 
# Wrap the plain Python function as a Spark UDF, declaring its return type
udfScoreToCategory = udf(scoreToCategory, StringType())
df.withColumn("category", udfScoreToCategory("score")).show(10)

The first block is basic Python: we generate a random dataset that looks something like this:

STUDENT  SUBJECT    SCORE
John     Math       13
Mike     Sci        45
Mike     Geography  65

The next two blocks define a schema and construct a DataFrame from the generated data. The main part of the code is the UDF definition and application: we first define our function in a normal Python way, then wrap it with udf(), declaring its return type, and finally apply it with withColumn to derive the new category column.
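The wrapped function can also be registered under a name so that the same logic is callable from SQL queries. Below is a minimal sketch using the Spark 1.x PySpark API (registerFunction; newer versions expose this as spark.udf.register); the table name "students" is just an illustrative choice:

# Register the UDF by name and call it from SQL
sqlContext.registerFunction("scoreToCategory", scoreToCategory, StringType())
df.registerTempTable("students")  # arbitrary example name
sqlContext.sql(
    "SELECT student, subject, scoreToCategory(score) AS category FROM students"
).show(10)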

Below is a Scala version of the same:

// Construct Dummy Data
import scala.util.Random
import org.apache.spark.sql.Row
implicit class Crossable[X](xs: Traversable[X]) {
  def cross[Y](ys: Traversable[Y]) = for { x <- xs; y <- ys } yield (x, y)
}
val students = Seq("John", "Mike", "Matt")
val subjects = Seq("Math", "Sci", "Geography", "History")
val random = new Random(1)
val data = (students cross subjects).map { x => Row(x._1, x._2, random.nextInt(100)) }.toSeq
 
// Create Schema Object
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType}
val schema = StructType(Array(
            StructField("student", StringType, nullable=false),
            StructField("subject", StringType, nullable=false),
            StructField("score", IntegerType, nullable=false)
    ))
 
// Create DataFrame
import org.apache.spark.sql.hive.HiveContext
val sqlContext = new HiveContext(sc)
val rdd = sc.parallelize(data)
val df = sqlContext.createDataFrame(rdd, schema)
 
// Define UDF
import org.apache.spark.sql.functions.udf
val udfScoreToCategory = udf((score: Int) => score match {
  case t if t >= 80 => "A"
  case t if t >= 60 => "B"
  case t if t >= 35 => "C"
  case _            => "D"
})
df.withColumn("category", udfScoreToCategory(df("score"))).show(10)
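As a side note, branching logic this simple does not strictly require a UDF. Spark's built-in when/otherwise column functions can express the same rules natively, which keeps the computation inside the JVM and, in PySpark, avoids shipping every row to a Python worker. A minimal PySpark sketch of the same grading logic:

# Same grading logic using built-in column functions (no UDF)
from pyspark.sql.functions import when, col
category = (when(col("score") >= 80, "A")
            .when(col("score") >= 60, "B")
            .when(col("score") >= 35, "C")
            .otherwise("D"))
df.withColumn("category", category).show(10)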

 

This article is reposted from: https://ragrawal.wordpress.com/2015/10/02/spark-custom-udf-example/
