oozie 调度pyspark

原创
2018/11/21 15:40
阅读数 417

http://www.learn4master.com/big-data/pyspark/run-pyspark-on-oozie

使用Oozie在YARN上安排PySpark程序 考虑一个用Spark Python API编写的简单字数统计应用程序。以下步骤说明如何使用Oozie在YARN上安排和启动此PySpark作业。完整的程序列表显示在该部分的末尾。

首先,这里有一些关于在多节点集群上运行带有纱线集群模式的PySpark的先决条件的注释:

提交Spark作业时,Spark代码会检查PYSPARK_ARCHIVES_PATH环境变量。如果找不到PYSPARK_ARCHIVES_PATH,Spark会查找SPARK_HOME。您可以使用oozie.launcher.yarn.app.mapreduce.am.env属性设置PYSPARK_ARCHIVES_PATH。 py4j-0.10.4-src.zip和pyspark.zip文件(版本可能因Spark版本而异)是在Spark中运行Python脚本所必需的。因此,在脚本运行时,两个文件都必须存在于类路径中。只需将它们放在工作流的lib /目录下即可。 必须配置-py-files选项并在<spark-opts>选项中传递。 创建工作流定义(workflow.xml)。以下简单工作流定义执行一个Spark作业: <workflow-app xmlns='uri:oozie:workflow:0.5' name='PySpark'> <global> <configuration> <property> <name>oozie.launcher.yarn.app.mapreduce.am.env</name> <value>PYSPARK_ARCHIVES_PATH=pyspark.zip</value> </property> </configuration> </global> <start to='spark-node' /> <action name='spark-node'> <spark xmlns="uri:oozie:spark-action:0.1"> <job-tracker>${jobTracker}</job-tracker> <name-node>${nameNode}</name-node> <prepare> <delete path="${nameNode}/user/${wf:user()}/${examplesRoot}/output-data"/> </prepare> <master>${master}</master> <name>MyApp</name> <jar>${nameNode}/user/${wf:user()}/${examplesRoot}/MyPython.py</jar> <spark-opts>--conf spark.driver.extraJavaOptions=-Diop.version=4.2.5.0 --conf spark.yarn.archive=hdfs://nn:8020/iop/apps/4.2.5.0-0000/spark2/spark2-iop-yarn-archive.tar.gz --py-files pyspark.zip,py4j-0.10.4-src.zip</spark-opts> <arg>${nameNode}/user/${wf:user()}/${examplesRoot}/input-data</arg> <arg>${nameNode}/user/${wf:user()}/${examplesRoot}/output-data</arg> </spark> <ok to="end" /> <error to="fail" /> </action> <kill name="fail"> <message>Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}] </message> </kill> <end name='end' /> </workflow-app> 创建Oozie作业配置(job.properties)。 nameNode=hdfs://nn:8020 jobTracker=rm:8050 master=yarn-cluster queueName=default examplesRoot=spark-example oozie.use.system.libpath=true oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot} 创建一个Oozie应用程序目录。使用工作流定义和资源创建应用程序目录结构,如以下示例所示: +-~/spark-example/ +-job.properties +-workflow.xml +-MyPython.py +-WordCount.txt +-lib 将应用程序复制到HDFS。将spark-example /目录复制到HDFS中的用户HOME目录。确保HDFS中的spark-example位置与job.properties中的oozie.wf.application.path值相匹配。 $ hadoop fs -put spark-example /user/ambari-qa/ 将py4j-0.10.4-src.zip和pyspark.zip复制到HDFS。 $ hadoop fs -put /usr/iop/current/spark-client/python/lib/pyspark.zip /user/ambari-qa/spark-example/lib $ hadoop fs -put /usr/iop/current/spark-client/python/lib/py4j-0.10.4-src.zip /user/ambari-qa/spark-example/lib 运行示例作业。 通过运行以下命令提交Oozie作业: $ cd ~/spark-example

$ oozie job -oozie http://oozie-host:11000/oozie -config ./job.properties –run job: 0000031-161115185001062-oozie-oozi-W 检查工作流作业状态: $ oozie job -oozie http://oozie-host:11000/oozie -info 0000031-161115185001062-oozie-oozi-W

Job ID : 0000031-161115185001062-oozie-oozi-W

Workflow Name : PySpark App Path : hdfs://oozie-host:8020/user/ambari-qa/spark-example Status : SUCCEEDED Run : 0 User : ambari-qa Group : - Created : 2016-11-16 08:21 GMT Started : 2016-11-16 08:21 GMT Last Modified : 2016-11-16 08:22 GMT Ended : 2016-11-16 08:22 GMT CoordAction ID: -

Actions

ID Status Ext ID Ext Status Err Code

0000031-161115185001062-oozie-oozi-W@:start: OK - OK -

0000031-161115185001062-oozie-oozi-W@spark-node OK job_1479264601071_0068 SUCCEEDED -

0000031-161115185001062-oozie-oozi-W@end OK - OK -

完整的PySpark计划 from pyspark import SparkConf, SparkContext from operator import add def main(): conf = SparkConf().setAppName("MyApp") sc = SparkContext(conf=conf) lines = sc.textFile("/user/ambari-qa/spark-example/WordCount.txt") words = lines.flatMap(lambda line: line.split(' ')) wc = words.map(lambda x:(x,1)) counts = wc.reduceByKey(add) counts.saveAsTextFile("wcres12") if name == 'main': main()

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部