文档章节

Spark UDTF 与 输出多列的UDF小笔记

问津已非少年
 问津已非少年
发布于 2017/07/21 16:56
字数 704
阅读 128
收藏 0
点赞 0
评论 0

前言

对于UDF和UDAF相比大家并不陌生,用起来也都很顺手,在此就不多做介绍了~

通常我们的UDF都是一个或多个输入,然后一个输出。如果想要使用多个输出,比如像1.6版本里的json_tuple这样 :

df.select(json_tuple(col("jsonField"), "Field1", "Field2").as(Seq("Field1", "Field2")))

这种输出方式可以直接输出多列,同时 explode 也是类似的,这一类Spark内置函数称为 UDTF 也即 User Defined Table-generation Function,函数的输入是一个或多个列值,输出是一个table

不过Spark目前并未提供我们自己实现UDTF的方法,所以目前的使用还仅限于Spark内置的几个UDTF,想要达到多列输出,我们需要略作变通

使用Seq做返回类型,输出的多列是一个数组

def test: (String => Seq[String]) = aa => Seq("aa", aa, "cc")

这时候 udf(test) 的返回类型就是 ArrayType ,通过Column的getItem方法,我们可以取数得到多列

val testUDF = udf(test)
df.select(testUDF(col("testCol")).as("testUDFCol"))
  .select(col("testUDFCol").getItem(0).as("col1"),
    col("testUDFCol").getItem(1).as("col2"))

使用元组或case class做返回类型

case class Test(id: String, name: String)
def test: (String => Test) = aa => Test("1", aa)

// 使用元组做返回类型
def test: (String => (String, String)) = aa => ("1", aa)

当使用元组或case class做函数返回类型时,UDF的返回类型就是StructType,通过Column的getField方法,我们可以取数得到多列

val testUDF = udf(test)
df.select(testUDF(col("testCol")).as("testUDFCol"))
  .select(col("testUDFCol").getField("id").as("col1"),
    col("testUDFCol").getField("name").as("col2"))

// 使用元组的时候取值
df.select(testUDF(col("testCol")).as("testUDFCol"))
  .select(col("testUDFCol").getField("_1").as("col1"),
    col("testUDFCol").getField("_2").as("col2"))

使用Map作为返回类型

def test: (String => Map[String, String]) = aa => Map("id" -> "1", "name" -> aa)

对于Map类型,UDF的返回类型是MapType,也是通过Column的getField方法,我们可以取数得到多列

val testUDF = udf(test)
df.select(testUDF(col("testCol")).as("testUDFCol"))
  .select(col("testUDFCol").getField("id").as("col1"),
    col("testUDFCol").getField("name").as("col2"))

最后

想说的是,这种变通方式好像跟输出多列的UDF这个标题扯得有点远了,其实并没有输出多列,只是换一个类型而已,但问题由此引起,也就以此为题啦~~

另外,显然使用case class这种方式更好,因为返回类型有多种,更适合实际项目需要~~

© 著作权归作者所有

共有 人打赏支持
问津已非少年
粉丝 15
博文 21
码字总数 33944
作品 0
海淀
程序员
Spark Streaming 框架 - StreamingPro

概述 Spark 是一个可扩展的可编程框架,用于数据集的大规模分布式处理, 称为弹性分布式数据集(Resilient Distributed Datasets,RDD)。 Spark Streaming 是 Spark API 核心的扩展,它支持...

匿名 ⋅ 04/29 ⋅ 0

Spark笔记整理(三):Spark WC开发与应用部署

[TOC] Spark WordCount开发 创建的是maven工程,使用的依赖如下: spark wc之Java版本 本地执行,输出结果如下: ###spark wc之Java lambda版本 本地执行,输出结果如下: spark wc之scala版...

xpleaf ⋅ 04/25 ⋅ 0

Spark2.1.0之基础知识

在阅读本文之前,读者最好已经阅读了《Spark2.1.0之初识Spark》一文,本文将对Spark的基础知识进行介绍。但在此之前,读者先跟随本人来一次简单的时光穿梭,最后还将对Java与Scala在语言上进...

beliefer ⋅ 05/24 ⋅ 0

Spark Streaming入门

欢迎大家前往腾讯云+社区,获取更多腾讯海量技术实践干货哦~ 本文将帮助您使用基于HBase的Apache Spark Streaming。Spark Streaming是Spark API核心的一个扩展,支持连续的数据流处理。 什么...

腾讯云加社区 ⋅ 05/16 ⋅ 0

你不能错过的 spark 学习资源

1. 书籍,在线文档 2. 网站 3. Databricks Blog 4. 文章,博客 5. 视频

u012608836 ⋅ 04/12 ⋅ 0

spark和hive storm mapreduce的比较

Spark Streaming与Storm都可以用于进行实时流计算。但是他们两者的区别是非常大的。其中区别之一 就是,Spank Streaming和Stom的计算模型完全不一样,Spark Streaming是基于RDD的,因此需要将...

necther ⋅ 04/28 ⋅ 0

Scala笔记整理(九):Actor和AKKA

[TOC] 概述 Scala的Actor有点类似于Java中的多线程编程。但是不同的是,Scala的Actor提供的模型与多线程有所不同。Scala的Actor尽可能地避免锁和共享状态,从而避免多线程并发时出现资源争用...

xpleaf ⋅ 04/24 ⋅ 0

Apache Flink和Apache Spark有什么异同?它们的发展前景分别怎样?

============================= object WordCount { def main(args: Array[String]) {val env = new SparkContext("local","wordCount")val data = List("hi","how are you","hi")val dataSe......

justlpf ⋅ 05/12 ⋅ 0

Spark笔记整理(二):RDD与spark核心概念名词

[TOC] Spark RDD 非常基本的说明,下面一张图就能够有基本的理解: Spark RDD基本说明 1、Spark的核心概念是RDD (resilient distributed dataset,弹性分布式数据集),指的是一个只读的,可分...

xpleaf ⋅ 04/25 ⋅ 0

教你如何成为Spark大数据高手

Spark目前被越来越多的企业使用,和Hadoop一样,Spark也是以作业的形式向集群提交任务,那么如何成为Spark大数据高手?下面就来个深度教程。 分享之前我还是要推荐下我自己创建的大数据学习交...

风火数据 ⋅ 05/20 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

容器之重命名镜像

使用docker tag命令来重命名镜像名称,先执行help,查看如何使用如下 mjduan@mjduandeMacBook-Pro:~/Docker % docker tag --helpUsage:docker tag SOURCE_IMAGE[:TAG] TARGET_IMAGE[:TA...

汉斯-冯-拉特 ⋅ 21分钟前 ⋅ 0

with 的高级用法

那么 上下文管理器 又是什么呢? 上下文管理器协议包含 __enter__ 和 __exit__ 两个方法。with 语句开始运行时,会在上下文管理器对象上调用 __enter__ 方法。with 语句运行结束后,会在上下...

阿豪boy ⋅ 40分钟前 ⋅ 0

使用 jsoup 模拟登录 urp 教务系统

需要的 jsoup 相关 jar包:https://www.lanzous.com/i1abckj 1、首先打开教务系统的登录页面,F12 开启浏览器调试,注意一下 Request Headers 一栏的 Cookie 选项,我们一会需要拿这个 Cook...

大灰狼时间 ⋅ 40分钟前 ⋅ 0

关于线程的创建

转自自己的笔记: http://note.youdao.com/noteshare?id=87584d4874acdeaf4aa027bdc9cb7324&sub=B49E8956E145476191C3FD1E4AB40DFA 1.创建线程的方法 Java使用Thread类代表线程,所有的线程对......

MarinJ_Shao ⋅ 52分钟前 ⋅ 0

工厂模式学习

1. 参考资料 工厂模式-伯乐在线 三种工厂-思否 深入理解工厂模式 2. 知识点理解 2.1 java三种工厂 简单工厂 工厂模式 抽象工厂 2.2 异同点 逐级复杂 简单工厂通过构造时传入的标识来生产产品...

liuyan_lc ⋅ 今天 ⋅ 0

Java NIO

1.目录 Java IO的历史 Java NIO之Channel Java NIO之Buffer Java NIO之Selector Java NIO之文件处理 Java NIO之Charset Java 可扩展IO 2.简介 “IO的历史”讲述了Java IO API从开始到现在的发...

士别三日 ⋅ 今天 ⋅ 0

[Err] ORA-24344: success with compilation error

从txt文本复制出创建function的脚本,直接执行,然后报错:[Err] ORA-24344: success with compilation error。 突然发现脚本的关键字,居然不是高亮显示。 然后我把脚本前面的空格去掉,执行...

wenzhizhon ⋅ 今天 ⋅ 0

Spring Security授权过程

前言 本文是接上一章Spring Security认证过程进一步分析Spring Security用户名密码登录授权是如何实现得; 类图 调试过程 使用debug方式启动https://github.com/longfeizheng/logback该项目,...

hutaishi ⋅ 今天 ⋅ 0

HAProxy基于KeepAlived实现Web高可用及动静分离

前言 软件负载均衡一般通过两种方式来实现: 基于操作系统的软负载实现 基于第三方应用的软负载实现 LVS是基于Linux操作系统实现的一种软负载,而HAProxy则是基于第三方应用实现的软负载。 ...

寰宇01 ⋅ 今天 ⋅ 0

微软自研处理器的小动作:已经开始移植其他平台的工具链

微软将 Windows 10 、Linux 以及工具链如 C/C++ 和 .NET Core 运行时库、Visual C++ 2017 命令行工具、RyuJIT 编辑器等移植到其自主研发的处理器架构 E2。微软还移植了广泛使用的 LLVM C/C++...

linux-tao ⋅ 今天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部