SparkTask未序列化问题

2017/02/07 10:40
阅读数 225

为了执行作业,Spark将RDD操作的处理分解为tasks,每个task由Executor执行。在执行之前,Spark会计算task的闭包。闭包是Executor在RDD上进行计算的时候必须可见的那些变量和方法(在这种情况下是foreach())。闭包会被序列化并发送给每个Executor。

如果在涉及到的所有的变量中有任何不支持序列化或没有指明如何序列化自己时,你就会遇到这样的错误:

org.apache.spark.SparkException: Task not serializable

在编写Spark程序中,由于在map等算子内部使用了外部定义的变量和函数,从而引发Task未序列化问题。然而,Spark算子在计算过程中使用外部变量在许多情形下确实在所难免,比如在filter算子根据外部指定的条件进行过滤,map根据相应的配置进行变换等。为了解决上述Task未序列化问题,这里对其进行了研究和总结。

  出现“org.apache.spark.SparkException: Task not serializable”这个错误,一般是因为在map、filter等的参数使用了外部的变量,但是这个变量不能序列化(不是说不可以引用外部变量,只是要做好序列化工作,具体后面详述)。其中最普遍的情形是:当引用了某个类(经常是当前类)的成员函数或变量时,会导致这个类的所有成员(整个类)都需要支持序列化。虽然许多情形下,当前类使用了“extends Serializable”声明支持序列化,但是由于某些字段不支持序列化,仍然会导致整个类序列化时出现问题,最终导致出现Task未序列化问题。

引用成员变量的实例分析
  如上所述,由于Spark程序中的map、filter等算子内部引用了类成员函数或变量导致需要该类所有成员都需要支持序列化,又由于该类某些成员变量不支持序列化,最终引发Task无法序列化问题。解决方法,将不需要序列化的的成员变量使用关键字“@transent”标注。

引用成员函数的实例分析
  成员变量与成员函数的对序列化的影响相同,即引用了某类的成员函数,会导致该类所有成员都支持序列化。

如上所述,引用了某类的成员函数,会导致该类及所有成员都需要支持序列化。因此,对于使用了某类成员变量或函数的情形,首先该类需要序列化(extends Serializable),同时需要对某些不需要序列化的成员变量标记以避免为序列化造成影响。
map等算子内部可以引用外部变量和某类的成员变量,但是要做好该类的序列化处理。首先是该类需要继承Serializable类,此外,对于类中某些序列化会出错的成员变量做好处理,这也是Task未序列化问题的主要原因。对于出现这类问题,首先查看未能序列化的成员变量是哪个,对于可以不需要序列化的成员变量可使用“@transent”标注。 
此外,也不是map操作所在的类必须序列化不可(继承Serializable类),对于不需要引用某类成员变量或函数的情形,就不会要求相应的类必须实现序列化。

解决办法与编程建议
  承上所述,这个问题主要是引用了某类的成员变量或函数,并且相应的类没有做好序列化处理导致的。因此解决这个问题无非以下两种方法:
不在(或不直接在)map等闭包内部直接引用某类(通常是当前类)的成员函数或成员变量
如果引用了某类的成员函数或变量,则需对相应的类做好序列化处理
(一)不在(或不直接在)map等闭包内部直接引用某类成员函数或成员变量
如果程序依赖的值相对固定,可取固定的值,或定义在map、filter等操作内部,或定义在scala 
object对象中(类似于Java中的static变量),把它声明为一个全局静态的变量就可以绕过序列化
如果依赖值需要程序调用时动态指定(以函数参数形式),则在map、filter等操作时,可不直接引用该成员变量,而是在函数中根据成员变量的值重新定义一个局部变量,这样map等算子就无需引用类的成员变量。


(二)如果引用了某类的成员函数或变量,则需对相应的类做好序列化处理
  对于这种情况,则需对该类做好序列化处理,首先该类继承序列化类,然后对于不能序列化的成员变量使用“@transent”标注,告诉编译器不需要序列化。 
此外如果可以,可将依赖的变量独立放到一个小的class中,让这个class支持序列化,这样做可以减少网络传输量,提高效率。

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部