Azkaban support for submitting Spark jobs


Reading the Azkaban source code shows how job parameters are stored: all of a job's configuration is flattened into a single file named

{working.dir}/{azkaban.job.id}_props_{random number}_tmp

and this file is generated before the job process is launched.

// Excerpt from the Azkaban source code
package azkaban.jobExecutor;

public abstract class AbstractProcessJob extends AbstractJob {

  // Flattens all job properties into a temp file in the working directory;
  // this runs before the job process is launched.
  public File createFlattenedPropsFile(final String workingDir) {
    final File directory = new File(workingDir);
    File tempFile = null;
    try {
      // The temp file prefix must be at least 3 characters.
      tempFile = File.createTempFile(getId() + "_props_", "_tmp", directory);
      this.jobProps.storeFlattened(tempFile);
    } catch (final IOException e) {
      throw new RuntimeException("Failed to create temp property file ", e);
    }

    return tempFile;
  }
}
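
Because storeFlattened ultimately writes the properties through java.util.Properties (an assumption based on the code above), special characters such as ':' in values come out escaped with backslashes, which is why the parser below strips them back out. A minimal sketch of the effect (the PropsEscapeDemo object and the sample value are illustrative, not from Azkaban):

import java.io.StringWriter
import java.util.Properties

// Minimal sketch (not from the original article): shows how Properties.store
// escapes ':' in values, which is why AzSparkJob below strips backslashes.
object PropsEscapeDemo {
  def main(args: Array[String]): Unit = {
    val p = new Properties()
    p.put("azkaban.flow.start.timestamp", "2017-11-22T18:09:00.000+08")
    val out = new StringWriter()
    p.store(out, null)
    // Writes a '#'-prefixed timestamp comment line followed by:
    // azkaban.flow.start.timestamp=2017-11-22T18\:09\:00.000+08
    print(out)
  }
}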

By reading this Azkaban-generated config file directly from the Spark program, we can design the following interface:

import java.io.File
import java.util.{Date, Properties}

import org.apache.spark.{SparkConf, SparkContext}

import scala.io.Source

trait AzSparkJob {

  // Start time of the Azkaban flow, parsed from the job properties.
  var flowTime: Date = new Date()

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    val appName = sparkConf.get("spark.app.name")
    println(s"appName=$appName")
    // The props file is named {job.id}_props_{random}_tmp, so the Spark
    // application name must match the Azkaban job id for the prefix to match.
    val propsNamePrefix = s"${appName}_props"
    val propsFile = new File(".").listFiles()
      .filter(file => file.getName.startsWith(propsNamePrefix))(0)
    val prop = new Properties()
    Source.fromFile(propsFile, "UTF-8").getLines().filter(!_.startsWith("#")).foreach { line =>
      val index = line.indexOf('=')
      val key = line.substring(0, index)
      // Properties.store escapes characters such as ':' with backslashes; strip them out.
      val value = line.substring(index + 1).replaceAll("\\\\", "")
      prop.put(key, value)
      println(s"jobProp [$key] = [$value]")
    }
    sparkConf.getAll.map(x => s"sparkConf [${x._1}] = [${x._2}]").foreach(println)
    val strFlowTime = prop.getProperty("azkaban.flow.start.timestamp")
    flowTime = DateUtil.toDate(strFlowTime, "yyyy-MM-dd'T'HH:mm:ss.SSSX")
    println("flowTime=" + DateUtil.format(flowTime))
    setSparkConf(sparkConf, prop)
    val sc = new SparkContext(sparkConf)
    try {
      run(sc, prop)
    } finally {
      sc.stop()
    }
  }

  // Job body: implement the actual Spark logic here.
  def run(sc: SparkContext, prop: Properties): Unit

  // Hook for adjusting the SparkConf from job properties before the SparkContext is created.
  def setSparkConf(sparkConf: SparkConf, prop: Properties): Unit

}
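
DateUtil here is a project-local helper that the article does not show; a minimal stand-in based on SimpleDateFormat, matching the two calls made above, might look like this (hypothetical code, not the original implementation):

import java.text.SimpleDateFormat
import java.util.Date

// Hypothetical stand-in for the project-local DateUtil used by AzSparkJob;
// not part of the original article.
object DateUtil {
  def toDate(s: String, pattern: String): Date =
    new SimpleDateFormat(pattern).parse(s)

  def format(d: Date, pattern: String = "yyyy-MM-dd HH:mm:ss"): String =
    new SimpleDateFormat(pattern).format(d)
}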

A WordCount implementation:

import java.util.Properties

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by zstorm on 2017/10/21.
  */
object WordCount extends AzSparkJob {

  override def run(sc: SparkContext, prop: Properties): Unit = {
    println("flowTime=" + DateUtil.format(flowTime))
    // Input and output paths are passed in as Azkaban job parameters.
    val input = prop.getProperty("param.inData")
    val output = prop.getProperty("param.outData")
    // Remove any previous output so saveAsTextFile does not fail.
    FileSystem.get(sc.hadoopConfiguration).delete(new Path(output), true)
    sc.textFile(input).flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _, 1).
      map(_.productIterator.mkString("\t")).saveAsTextFile(output)
  }

  override def setSparkConf(sparkConf: SparkConf, prop: Properties): Unit = {
    // Append the month parameter to the application name, if present.
    val appName = sparkConf.get("spark.app.name")
    val month = prop.getProperty("param.month")
    if (month != null) sparkConf.setAppName(s"$appName[$month]")
  }
}
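
To actually submit this from Azkaban, one simple option is the plain command job type running spark-submit in client deploy mode, so that the driver starts in the job's working directory where the _props_ temp file is written, with --name set to the Azkaban job id so the file-name prefix matches. A hypothetical wordcount.job (the job type choice, paths, and parameter values are assumptions, not from the article):

# wordcount.job -- hypothetical example
type=command
command=spark-submit --master yarn --deploy-mode client --name wordcount --class WordCount wordcount.jar
param.inData=/data/wordcount/input
param.outData=/data/wordcount/output
param.month=201711

Note that with cluster deploy mode the driver would run on a cluster node instead of the Azkaban executor, and new File(".") would no longer see the props file.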
