文档章节

Spark UDTF 与 输出多列的UDF小笔记

问津已非少年
 问津已非少年
发布于 2017/07/21 16:56
字数 704
阅读 258
收藏 0

前言

对于UDF和UDAF相比大家并不陌生,用起来也都很顺手,在此就不多做介绍了~

通常我们的UDF都是一个或多个输入,然后一个输出。如果想要使用多个输出,比如像1.6版本里的json_tuple这样 :

df.select(json_tuple(col("jsonField"), "Field1", "Field2").as(Seq("Field1", "Field2")))

这种输出方式可以直接输出多列,同时 explode 也是类似的,这一类Spark内置函数称为 UDTF 也即 User Defined Table-generation Function,函数的输入是一个或多个列值,输出是一个table

不过Spark目前并未提供我们自己实现UDTF的方法,所以目前的使用还仅限于Spark内置的几个UDTF,想要达到多列输出,我们需要略作变通

使用Seq做返回类型,输出的多列是一个数组

def test: (String => Seq[String]) = aa => Seq("aa", aa, "cc")

这时候 udf(test) 的返回类型就是 ArrayType ,通过Column的getItem方法,我们可以取数得到多列

val testUDF = udf(test)
df.select(testUDF(col("testCol")).as("testUDFCol"))
  .select(col("testUDFCol").getItem(0).as("col1"),
    col("testUDFCol").getItem(1).as("col2"))

使用元组或case class做返回类型

case class Test(id: String, name: String)
def test: (String => Test) = aa => Test("1", aa)

// 使用元组做返回类型
def test: (String => (String, String)) = aa => ("1", aa)

当使用元组或case class做函数返回类型时,UDF的返回类型就是StructType,通过Column的getField方法,我们可以取数得到多列

val testUDF = udf(test)
df.select(testUDF(col("testCol")).as("testUDFCol"))
  .select(col("testUDFCol").getField("id").as("col1"),
    col("testUDFCol").getField("name").as("col2"))

// 使用元组的时候取值
df.select(testUDF(col("testCol")).as("testUDFCol"))
  .select(col("testUDFCol").getField("_1").as("col1"),
    col("testUDFCol").getField("_2").as("col2"))

使用Map作为返回类型

def test: (String => Map[String, String]) = aa => Map("id" -> "1", "name" -> aa)

对于Map类型,UDF的返回类型是MapType,也是通过Column的getField方法,我们可以取数得到多列

val testUDF = udf(test)
df.select(testUDF(col("testCol")).as("testUDFCol"))
  .select(col("testUDFCol").getField("id").as("col1"),
    col("testUDFCol").getField("name").as("col2"))

最后

想说的是,这种变通方式好像跟输出多列的UDF这个标题扯得有点远了,其实并没有输出多列,只是换一个类型而已,但问题由此引起,也就以此为题啦~~

另外,显然使用case class这种方式更好,因为返回类型有多种,更适合实际项目需要~~

© 著作权归作者所有

共有 人打赏支持
问津已非少年
粉丝 17
博文 21
码字总数 33944
作品 0
海淀
程序员
Spark2.x写入Elasticsearch的性能测试

一、Spark集成ElasticSearch的设计动机 ElasticSearch 毫秒级的查询响应时间还是很惊艳的。其优点有: 1. 优秀的全文检索能力 2. 高效的列式存储与查询能力 3. 数据分布式存储(Shard 分片) 相...

openfea
2017/10/27
0
0
Spark Streaming 框架 - StreamingPro

概述 Spark 是一个可扩展的可编程框架,用于数据集的大规模分布式处理, 称为弹性分布式数据集(Resilient Distributed Datasets,RDD)。 Spark Streaming 是 Spark API 核心的扩展,它支持...

匿名
04/29
0
0
Spark 学习资源收集【Updating】

(一)spark 相关安装部署、开发环境 1、Spark 伪分布式 & 全分布式 安装指南 http://my.oschina.net/leejun2005/blog/394928 2、Apache Spark探秘:三种分布式部署方式比较 http://dongxic...

大数据之路
2014/09/08
0
1
sparkSQL中UDF的使用

在spark中使用sql时一些功能需要自定义方法实现,这时候就可以使用UDF功能来实现 多参数支持 UDF不支持的方式输入多个参数,例如,不过可以使用array来解决这个问题。 定义udf方法,此处功能...

火力全開
2017/11/07
0
0
Spark笔记整理(三):Spark WC开发与应用部署

[TOC] Spark WordCount开发 创建的是maven工程,使用的依赖如下: spark wc之Java版本 本地执行,输出结果如下: ###spark wc之Java lambda版本 本地执行,输出结果如下: spark wc之scala版...

xpleaf
04/25
0
0

没有更多内容

加载失败,请刷新页面

加载更多

20180920 rzsz传输文件、用户和用户组相关配置文件与管理

利用rz、sz实现Linux与Windows互传文件 [root@centos01 ~]# yum install -y lrzsz # 安装工具sz test.txt # 弹出对话框,传递到选择的路径下rz # 回车后,会从对话框中选择对应的文件传递...

野雪球
今天
2
0
OSChina 周四乱弹 —— 毒蛇当辣条

Osc乱弹歌单(2018)请戳(这里) 【今日歌曲】 @ 达尔文:分享花澤香菜/前野智昭/小野大輔/井上喜久子的单曲《ミッション! 健?康?第?イチ》 《ミッション! 健?康?第?イチ》- 花澤香菜/前野智...

小小编辑
今天
10
3
java -jar运行内存设置

java -Xms64m #JVM启动时的初始堆大小 -Xmx128m #最大堆大小 -Xmn64m #年轻代的大小,其余的空间是老年代 -XX:MaxMetaspaceSize=128m # -XX:CompressedClassSpaceSize=6...

李玉长
今天
4
0
Spring | 手把手教你SSM最优雅的整合方式

HEY 本节主要内容为:基于Spring从0到1搭建一个web工程,适合初学者,Java初级开发者。欢迎与我交流。 MODULE 新建一个Maven工程。 不论你是什么工具,选这个就可以了,然后next,直至finis...

冯文议
今天
2
0
RxJS的另外四种实现方式(四)——性能最高的库(续)

接上一篇RxJS的另外四种实现方式(三)——性能最高的库 上一篇文章我展示了这个最高性能库的实现方法。下面我介绍一下这个性能提升的秘密。 首先,为了弄清楚Most库究竟为何如此快,我必须借...

一个灰
今天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部