通过阅读源码,得知参数的存储情况,所有的配置信息在一个文件中,
文件名为{working.dir}/{azkaban.job.id}_props_{随机数}_tmp
而且该配置文件会在job提交前生成
package azkaban.jobExecutor;
public abstract class AbstractProcessJob extends AbstractJob {
// Excerpted from the Azkaban source code.
/**
 * Writes the flattened job properties to a temp file in the given working
 * directory. The file name is {getId()}_props_{random}_tmp, which is why the
 * Spark job below can locate it by prefix.
 *
 * @param workingDir directory in which the props file is created
 * @return the created temp file containing the flattened job properties
 */
public File createFlattenedPropsFile(final String workingDir) {
final File directory = new File(workingDir);
File tempFile = null;
try {
// The temp file prefix must be at least 3 characters.
tempFile = File.createTempFile(getId() + "_props_", "_tmp", directory);
this.jobProps.storeFlattened(tempFile);
} catch (final IOException e) {
throw new RuntimeException("Failed to create temp property file ", e);
}
return tempFile;
}
}
通过直接读取内置程序生成的该配置文件,设计如下接口
trait AzSparkJob {

  /**
   * Flow start time parsed from the Azkaban props file
   * (azkaban.flow.start.timestamp); defaults to "now" until main() runs.
   */
  var flowTime: Date = new Date()

  /**
   * Entry point. Locates the flattened props file Azkaban drops into the
   * working directory ({jobId}_props_{random}_tmp), loads it, lets the
   * concrete job tune the SparkConf, then runs the job with a guaranteed
   * SparkContext shutdown.
   *
   * NOTE(review): assumes spark.app.name equals the Azkaban job id so the
   * file-name prefix matches — confirm against the launcher configuration.
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    val appName = sparkConf.get("spark.app.name")
    println(s"appName=$appName")

    val propsNamePrefix = s"${appName}_props"
    // listFiles() returns null when the directory is unreadable; guard it,
    // and fail with a clear message instead of ArrayIndexOutOfBoundsException
    // when no props file matches.
    val filesInCwd = Option(new File(".").listFiles()).getOrElse(Array.empty[File])
    val propsFile = filesInCwd
      .find(_.getName.startsWith(propsNamePrefix))
      .getOrElse(throw new IllegalStateException(
        s"No props file starting with [$propsNamePrefix] found in [${new File(".").getAbsolutePath}]"))

    val prop = new Properties()
    // Close the file handle when done (the original leaked it).
    val source = Source.fromFile(propsFile, "UTF-8")
    try {
      source.getLines()
        // Skip comment lines and any line without a key before '='; the
        // original crashed on blank/malformed lines (indexOf == -1).
        .filter(line => !line.startsWith("#") && line.indexOf('=') > 0)
        .foreach { line =>
          val index = line.indexOf('=')
          val key = line.substring(0, index)
          // Properties.store escapes special characters with backslashes;
          // strip them back out (matches the original behavior).
          val value = line.substring(index + 1).replaceAll("\\\\", "")
          prop.put(key, value)
          println(s"jobProp [$key] = [$value]")
        }
    } finally {
      source.close()
    }

    sparkConf.getAll.map { case (k, v) => s"sparkConf [$k] = [$v]" }.foreach(println)

    val strFlowTime = prop.getProperty("azkaban.flow.start.timestamp")
    flowTime = DateUtil.toDate(strFlowTime, "yyyy-MM-dd'T'HH:mm:ss.SSSX")
    println("flowTime=" + DateUtil.format(flowTime))

    setSparkConf(sparkConf, prop)
    val sc = new SparkContext(sparkConf)
    try {
      run(sc, prop)
    } finally {
      sc.stop()
    }
  }

  /** Job body; implementations read their parameters from `prop`. */
  def run(sc: SparkContext, prop: Properties): Unit

  /** Hook to adjust the SparkConf before the SparkContext is created. */
  def setSparkConf(sparkConf: SparkConf, prop: Properties): Unit
}
WordCount实现
import java.util.Properties
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by zstorm on 2017/10/21.
*/
object WordCount extends AzSparkJob {

  /**
   * Word count job: reads text from `param.inData`, writes tab-separated
   * (word, count) lines to `param.outData`, overwriting previous output.
   */
  override def run(sc: SparkContext, prop: Properties): Unit = {
    println("flowTime=" + DateUtil.format(flowTime))
    val input = prop.getProperty("param.inData")
    val output = prop.getProperty("param.outData")
    // Fail fast with a clear message instead of an NPE deep inside Hadoop Path.
    require(input != null, "missing required job property [param.inData]")
    require(output != null, "missing required job property [param.outData]")
    // saveAsTextFile refuses to overwrite; remove leftovers from earlier runs.
    FileSystem.get(sc.hadoopConfiguration).delete(new Path(output), true)
    sc.textFile(input)
      .flatMap(_.split("\\s+"))
      .map(word => (word, 1))
      .reduceByKey(_ + _, 1) // single reducer => a single output part file
      .map { case (word, count) => s"$word\t$count" }
      .saveAsTextFile(output)
  }

  /** Appends `param.month` (when present) to the app name for the Spark UI. */
  override def setSparkConf(sparkConf: SparkConf, prop: Properties): Unit = {
    val appName = sparkConf.get("spark.app.name")
    Option(prop.getProperty("param.month"))
      .foreach(month => sparkConf.setAppName(s"$appName[$month]"))
  }
}