文档章节

Spark UDF变长参数的二三事儿

问津已非少年
 问津已非少年
发布于 2017/07/04 19:43
字数 1351
阅读 439
收藏 0

引子

变长参数对于我们来说并不陌生,在Java里我们这么写

public void varArgs(String... args)

在Scala里我们这么写

def varArgs(cols: String*): String

而在Spark里,很多时候我们有自己的业务逻辑,现成的functions满足不了我们的需求,而当我们需要处理同一行的多个列,将其经过我们自己的逻辑合并为一个列时,变长参数及其变种实现可以给我们提供帮助。

但是在Spark UDF里我们是 无法使用变长参数传值 的,但之所以本文以变长参数开头,是因为需求起于它,而通过对它进行变换,我们可以使用变长参数或Seq类型来接收参数。

下面通过Spark-Shell来做演示,以下三种方法都可以做到多列传参,分别是

变长参数类型的UDF

定义UDF方法

def myConcatVarargs(sep: String, cols: String*): String = cols.filter(_ != null).mkString(sep)

注册UDF函数

由于变长参数只能通过方法定义,所以这里使用部分应用函数来转换

val myConcatVarargsUDF = udf(myConcatVarargs _)

可以看到该UDF的定义如下

UserDefinedFunction(<function2>,StringType,List(StringType, ArrayType(StringType,true)))

也即变长参数转换为了ArrayType,而且函数是只包括两个参数,所以变长参数列表由此也可看出无法使用的。

变长参数列表传值

我们构造一个DataFrame如下

val df = sc.parallelize(Array(("aa", "bb", "cc"),("dd","ee","ff"))).toDF("A", "B", "C")

然后直接传入多个String类型的列到myConcatVarargsUDF

df.select(myConcatVarargsUDF(lit("-"), col("A"), col("B"), col("C"))).show

结果出现如下报错

java.lang.ClassCastException: anonfun$1 cannot be cast to scala.Function4

由此可以看出,使用变长参数列表的方式Spark是不支持的,它会被识别为四个参数的函数,而UDF确是被定义为两个参数而不是四个参数的函数!

变换:使用array()转换做第二个参数

我们使用Spark提供的array() function来转换参数为Array类型

df.select(myConcatVarargsUDF(lit("-"), array(col("A"), col("B"), col("C")))).show

结果如下

+-------------------+
|UDF(-,array(A,B,C))|
+-------------------+
|           aa-bb-cc|
|           dd-ee-ff|
+-------------------+

由此可以看出,使用变长参数构造的UDF方法,可以通过构造Array的方式传参,来达到多列合并的目的。

使用Seq类型参数的UDF

上面提到,变长参数最后被转为ArrayType,那不禁要想我们为嘛不使用Array或List类型呢?

实际上在UDF里,类型并不是我们可以随意定义的,比如使用List和Array就是不行的,我们自己定义的类型也是不行的,因为这涉及到数据的序列化和反序列化。

以Array/List为示例的错误

下面以Array类型为示例

定义函数

val myConcatArray = (cols: Array[String], sep: String) => cols.filter(_ != null).mkString(sep)

注册UDF

val myConcatArrayUDF = udf(myConcatArray)

可以看到给出的UDF签名是

UserDefinedFunction(<function2>,StringType,List())

应用UDF

df.select(myConcatArrayUDF(array(col("A"), col("B"), col("C")), lit("-"))).show

会发现报错

scala.collection.mutable.WrappedArray$ofRef cannot be cast to [Ljava.lang.String

同样List作为参数类型也会报错,因为反序列化的时候无法构建对象,所以List和Array是无法直接作为UDF的参数类型的

以Seq做参数类型

定义调用如下

val myConcatSeq = (cols: Seq[Any], sep: String) => cols.filter(_ != null).mkString(sep)

val myConcatSeqUDF = udf(myConcatSeq)

df.select(myConcatSeqUDF(array(col("A"), col("B"), col("C")), lit("-"))).show

结果如下

+-------------------+
|UDF(array(A,B,C),-)|
+-------------------+
|           aa-bb-cc|
|           dd-ee-ff|
+-------------------+

使用Row类型参数的UDF

我们可以使用Spark functions里struct方法构造结构体类型传参,然后用Row类型接UDF的参数,以达到多列传值的目的。

def myConcatRow: ((Row, String) => String) = (row, sep) => row.toSeq.filter(_ != null).mkString(sep)

val myConcatRowUDF = udf(myConcatRow)

df.select(myConcatRowUDF(struct(col("A"), col("B"), col("C")), lit("-"))).show

可以看到UDF的签名如下

UserDefinedFunction(<function2>,StringType,List())

结果如下

+--------------------+
|UDF(struct(A,B,C),-)|
+--------------------+
|            aa-bb-cc|
|            dd-ee-ff|
+--------------------+

使用Row类型还可以使用模式提取,用起来会更方便

row match {
  case Row(aa:String, bb:Int) =>
}

最后

对于上面三种方法,变长参数和Seq类型参数都需要array的函数包装为ArrayType,而使用Row类型的话,则需要struct函数构建结构体类型,其实都是为了数据的序列化和反序列化。三种方法中,Row的方式更灵活可靠,而且支持不同类型并且可以明确使用模式提取,用起来相当方便。

而由此我们也可以看出,UDF不支持List和Array类型的参数,同时 自定义参数类型 如果没有混合Spark的特质实现序列化和反序列化,那么在UDF里也是 无法用作参数类型 的。当然,Seq类型是可以 的,可以接多列的数组传值。

此外,我们也可以使用柯里化来达到多列传参的目的,只是不同参数个数需要定义不同的UDF了。

欢迎阅读转载,转载请注明出处:https://my.oschina.net/u/2539801/blog/1154536

© 著作权归作者所有

共有 人打赏支持
问津已非少年
粉丝 17
博文 21
码字总数 33944
作品 0
海淀
程序员
私信 提问
Apache Spark 2.4 正式发布,重要功能详细介绍

美国时间 2018年11月08日 正式发布了。一如既往,为了继续实现 Spark 更快,更轻松,更智能的目标,Spark 2.4 带来了许多新功能,如下: 添加一种支持屏障模式(barrier mode)的调度器,以便...

Spark
2018/11/10
0
0
Apache Spark 2.4.0 正式发布

Apache Spark 2.4 与昨天正式发布,Apache Spark 2.4 版本是 2.x 系列的第五个版本。 如果想及时了解 Spark、Hadoop或者Hbase相关的文章,欢迎关注微信公共帐号: itebloghadoop Apache Spa...

Spark
2018/11/09
0
0
Spark2.x写入Elasticsearch的性能测试

一、Spark集成ElasticSearch的设计动机 ElasticSearch 毫秒级的查询响应时间还是很惊艳的。其优点有: 1. 优秀的全文检索能力 2. 高效的列式存储与查询能力 3. 数据分布式存储(Shard 分片) 相...

openfea
2017/10/27
0
0
sparkSQL中UDF的使用

在spark中使用sql时一些功能需要自定义方法实现,这时候就可以使用UDF功能来实现 多参数支持 UDF不支持的方式输入多个参数,例如,不过可以使用array来解决这个问题。 定义udf方法,此处功能...

火力全開
2017/11/07
0
0
Apache Zeppelin 0.6.2 发布

Apache Zeppelin 0.6.2 发布了,更新内容如下: 改进 Spark interpreter binary is compatibile with Spark 1.6/Scala 2.10 and Spark 2.0/Scala 2.11 without rebuild Note storage aware ......

局长
2016/10/18
1K
1

没有更多内容

加载失败,请刷新页面

加载更多

sql根据日期查询,本周,本月,本年,今日相关统计

sql根据日期查询,本周,本月,本年,今日相关统计 昨天 select * from tb where datediff(day, 时间字段 ,getdate()) = 1 今天 select * from tb where datediff(day, 时间字段 ,getdate()) = ...

BraveLN
26分钟前
2
0
Delphi 折叠代码编译变量$REGION

编译变量$REGION,用于在delphi2006以后版本的折叠代码显示,非常方便。 procedure TForm1.Button1Click(Sender: TObject); var uStr: UnicodeString; begin {$REGION '显示uStr变量内容'} ......

dillonxiao
27分钟前
1
0
【更新】SyntaxEditor发布v2018.1,可共享相同代码库

SyntaxEditor最新版本下载 SyntaxEditor是一款强大的代码语法检验控件,采用了当今最前沿的代码编辑的技术,可以为你代码编辑提供强大的管理功能。最新版支持Visual Studio 2013和Windows 8...

电池盒
27分钟前
3
0
如何在基于Bytom开发过程中集成IPFS

本文介绍了基于Bytom开发过程中集成IPFS。 step1: 搭建bytom节点 比原相关资料:https://github.com/Bytom-Community/Bytom_Docs 搭建bytom节点有很多方式,然后开启RPC调用模式。这里推荐用...

比原链Bytom
31分钟前
0
0
sqlyog注册码

sqlyog注册码 1.方式一 用户名: 随意填写 秘钥: ccbfc13e-c31d-42ce-8939-3c7e63ed5417 a56ea5da-f30b-4fb1-8a05-95f346a9b20b a0fe8645-3916-45d4-9976-cb6b88fecc6c b70d7f66-dac2-4462-......

dragon_tech
34分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部