文档章节

Spark: Custom UDF Example

刀锋
 刀锋
发布于 2017/11/13 20:56
字数 494
阅读 3
收藏 0
点赞 0
评论 0

UDF (User defined functions) and UDAF (User defined aggregate functions) are key components of big data languages such as Pig and Hive. They allow to extend the language constructs to do adhoc processing on distributed dataset. Previously I have blogged about how to write custom UDF/UDAF in Pig (here) and Hive(PartI & II) . In this post I will focus on writing custom UDF in spark. UDF and UDAF is fairly new feature in spark and was just released in Spark 1.5.1. So its still in evolution stage and quite limited on things you can do, especially when trying to write generic UDAFs. I will talk about its current limitations later on. 

As a motivating example assume we are given some student data containing student’s name, subject and score and we want to convert numerical score into ordinal categories based on the following logic:

  • A –> if score >= 80
  • B –> if score >= 60
  • C –> if score >= 35
  • D –> otherwise

Below is the relevant python code if you are using pyspark.

# Generate Random Data
import itertools
import random
students = ['John', 'Mike','Matt']
subjects = ['Math', 'Sci', 'Geography', 'History']
random.seed(1)
data = []
 
for (student, subject) in itertools.product(students, subjects):
    data.append((student, subject, random.randint(0, 100)))
 
# Create Schema Object
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
            StructField("student", StringType(), nullable=False),
            StructField("subject", StringType(), nullable=False),
            StructField("score", IntegerType(), nullable=False)
    ])
 
# Create DataFrame 
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
rdd = sc.parallelize(data)
df = sqlContext.createDataFrame(rdd, schema)
 
# Define udf
from pyspark.sql.functions import udf
def scoreToCategory(score):
    if score >= 80: return 'A'
    elif score >= 60: return 'B'
    elif score >= 35: return 'C'
    else: return 'D'
 
udfScoreToCategory=udf(scoreToCategory, StringType())
df.withColumn("category", udfScoreToCategory("score")).show(10)

 2-10 is the basic python stuff. We are generating a random dataset that looks something like this:

STUDENT SUBJECT SCORE
John Math 13
Mike Sci 45
Mike Geography 65

Next line 12-24 are dealing with constructing the dataframe. The main part of the code is in line 27-34. We first define our function in a normal python way.

Below is scala example of the same:

// Construct Dummy Data
import util.Random
import org.apache.spark.sql.Row
implicit class Crossable[X](xs: Traversable[X]) {
  def cross[Y](ys: Traversable[Y]) = for { x <- xs; y <- ys } yield (x, y)
}
val students = Seq("John", "Mike","Matt")
val subjects = Seq("Math", "Sci", "Geography", "History")
val random = new Random(1)
val data =(students cross subjects).map{x  =>  Row(x._1, x._2,random.nextInt(100))}.toSeq
 
// Create Schema Object
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType}
val schema = StructType(Array(
            StructField("student", StringType, nullable=false),
            StructField("subject", StringType, nullable=false),
            StructField("score", IntegerType, nullable=false)
    ))
 
// Create DataFrame 
import org.apache.spark.sql.hive.HiveContext
val rdd = sc.parallelize(data)
val df = sqlContext.createDataFrame(rdd, schema)
 
// Define udf
import org.apache.spark.sql.functions.udf
def udfScoreToCategory=udf((score: Int) => {
        score match {
        case t if t >= 80 => "A"
        case t if t >= 60 => "B"
        case t if t >= 35 => "C"
        case _ => "D"
    }})
df.withColumn("category", udfScoreToCategory(df("score"))).show(10)

 

本文转载自:https://ragrawal.wordpress.com/2015/10/02/spark-custom-udf-example/

共有 人打赏支持
刀锋
粉丝 2
博文 23
码字总数 289803
作品 0
济南
程序员
Spark Streaming 框架 - StreamingPro

概述 Spark 是一个可扩展的可编程框架,用于数据集的大规模分布式处理, 称为弹性分布式数据集(Resilient Distributed Datasets,RDD)。 Spark Streaming 是 Spark API 核心的扩展,它支持...

匿名 ⋅ 04/29 ⋅ 0

Spark2.1.0之基础知识

在阅读本文之前,读者最好已经阅读了《Spark2.1.0之初识Spark》一文,本文将对Spark的基础知识进行介绍。但在此之前,读者先跟随本人来一次简单的时光穿梭,最后还将对Java与Scala在语言上进...

beliefer ⋅ 05/24 ⋅ 0

你不能错过的 spark 学习资源

1. 书籍,在线文档 2. 网站 3. Databricks Blog 4. 文章,博客 5. 视频

u012608836 ⋅ 04/12 ⋅ 0

Comprehensive Introduction to Apache Spark

Introduction Industry estimates that we are creating more than 2.5 Quintillion bytes of data every year. Think of it for a moment – 1 Qunitillion = 1 Million Billion! Can you i......

grasp_D ⋅ 06/15 ⋅ 0

教你如何成为Spark大数据高手

Spark目前被越来越多的企业使用,和Hadoop一样,Spark也是以作业的形式向集群提交任务,那么如何成为Spark大数据高手?下面就来个深度教程。 分享之前我还是要推荐下我自己创建的大数据学习交...

风火数据 ⋅ 05/20 ⋅ 0

利用Knime建立Spark Machine learning 模型 1:开发环境搭建

1、Knime Analytics 安装 从官方网站下载合适的版本 https://www.knime.com/downloads 将下载的安装包在安装路径解压 https://www.knime.com/installation-0 下图是knime启动后的欢迎页面...

forestwater ⋅ 05/09 ⋅ 0

Spark 的Core深入(二)

Spark 的 Core 深入(二) 标签(空格分隔): Spark的部分 一、日志清洗的优化: 1.1 日志清洗有脏数据问题 rdd.partitions.length rdd.cacherdd.count 一个分区默认一个task 分区去处理默认...

flyfish225 ⋅ 05/08 ⋅ 0

Spark2.1.0之剖析spark-shell

通过在spark-shell中执行word count的过程,让读者了解到可以使用spark-shell提交Spark作业。现在读者应该很想知道spark-shell究竟做了什么呢? 脚本分析 在Spark安装目录的bin文件夹下可以找...

beliefer ⋅ 04/20 ⋅ 0

利用KNIME建立Spark Machine learning模型 2:泰坦尼克幸存预测

本文利用KNIME基于Spark决策树模型算法,通过对泰坦尼克的包含乘客及船员的特征属性的训练数据集进行训练,得出决策树幸存模型,并利用测试数据集对模型进行测试。 1、从Kaggle网站下载训练...

forestwater ⋅ 05/09 ⋅ 0

【DataMagic】如何在万亿级别规模的数据量上使用Spark

欢迎大家前往腾讯云+社区,获取更多腾讯海量技术实践干货哦~ 本文首发在云+社区,未经许可,不得转载。 作者:张国鹏 | 腾讯 运营开发工程师 一、前言 Spark作为大数据计算引擎,凭借其快速、...

⋅ 04/18 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

一篇文章学懂Shell脚本

Shell脚本,就是利用Shell的命令解释的功能,对一个纯文本的文件进行解析,然后执行这些功能,也可以说Shell脚本就是一系列命令的集合。 Shell可以直接使用在win/Unix/Linux上面,并且可以调用...

Jake_xun ⋅ 30分钟前 ⋅ 0

大数据工程师需要精通算法吗,要达到一个什么程度呢?

机器学习是人工智能的一个重要分支,而机器学习下最重要的就是算法,本文讲述归纳了入门级的几个机器学习算法,加大数据学习群:716581014一起加入AI技术大本营。 1、监督学习算法 这个算法由...

董黎明 ⋅ 今天 ⋅ 0

Kylin 对维度表的的要求

1.要具有数据一致性,主键值必须是唯一的;Kylin 会进行检查,如果有两行的主键值相同则会报错。 2.维度表越小越好,因为 Kylin 会将维度表加载到内存中供查询;过大的表不适合作为维度表,默...

无精疯 ⋅ 今天 ⋅ 0

58到家数据库30条军规解读

军规适用场景:并发量大、数据量大的互联网业务 军规:介绍内容 解读:讲解原因,解读比军规更重要 一、基础规范 (1)必须使用InnoDB存储引擎 解读:支持事务、行级锁、并发性能更好、CPU及...

kim_o ⋅ 今天 ⋅ 0

代码注释中顺序更改 文件读写换行

`package ssh; import com.xxx.common.log.LogFactory; import com.xxx.common.log.LoggerUtil; import org.apache.commons.lang3.StringUtils; import java.io.*; public class DirErgodic ......

林伟琨 ⋅ 今天 ⋅ 0

linux实用操作命令

参考 http://blog.csdn.net/qwe6112071/article/details/50806734 ls [选项] [目录名 | 列出相关目录下的所有目录和文件 -a 列出包括.a开头的隐藏文件的所有文件-A 同-a,但不列出"."和"...

简心 ⋅ 今天 ⋅ 0

preg_match处理中文符号 url编码方法

之前想过直接用符号来替换,但失败了,或者用其他方式,但有有些复杂,这个是一个新的思路,亲测可用 <?php$str='637朗逸·超速新风王(300)(白光)'; $str=iconv("UTF-8","GBK",$s...

大灰狼wow ⋅ 今天 ⋅ 0

DevOps 资讯 | PostgreSQL 的时代到来了吗 ?

PostgreSQL是对象-关系型数据库,BSD 许可证。拼读为"post-gress-Q-L"。 作者: Tony Baer 原文: Has the time finally come for PostgreSQL?(有删节) 近30年来 PostgreSQL 无疑是您从未听...

RiboseYim ⋅ 今天 ⋅ 0

github太慢

1:用浏览器访问 IPAddress.com or http://tool.chinaz.com 使用 IP Lookup 工具获得github.com和github.global.ssl.fastly.net域名的ip地址 2:/etc/hosts文件中添加如下格式(IP最好自己查一...

whoisliang ⋅ 今天 ⋅ 0

非阻塞同步之 CAS

为解决线程安全问题,互斥同步相当于以时间换空间。多线程情况下,只有一个线程可以访问同步代码。这种同步也叫阻塞同步(Blocking Synchronization). 这种同步属于一种悲观并发策略。认为只...

长安一梦 ⋅ 今天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部