A look at flink LocalEnvironment's execute method

This article mainly takes a look at the execute method of flink's LocalEnvironment.

Example

        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<RecordDto> csvInput = env.readCsvFile(csvFilePath)
                .pojoType(RecordDto.class, "playerName", "country", "year", "game", "gold", "silver", "bronze", "total");

        DataSet<Tuple2<String, Integer>> groupedByCountry = csvInput
                .flatMap(new FlatMapFunction<RecordDto, Tuple2<String, Integer>>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public void flatMap(RecordDto record, Collector<Tuple2<String, Integer>> out) throws Exception {

                        out.collect(new Tuple2<String, Integer>(record.getCountry(), 1));
                    }
                }).groupBy(0).sum(1);
        System.out.println("===groupedByCountry===");
        groupedByCountry.print();
  • Here a DataSet is read from the CSV file, then flatMap, groupBy, and sum are applied, and finally print is called to output the result; a sketch of the assumed RecordDto POJO follows below
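
The RecordDto POJO is not shown above; a minimal sketch matching the field list passed to pojoType could look like the following (Flink's POJO rules require a public no-argument constructor plus public fields or getters/setters; only the getter used by the flatMap is spelled out here):

	public class RecordDto {
		private String playerName;
		private String country;
		private int year;
		private String game;
		private int gold;
		private int silver;
		private int bronze;
		private int total;

		// no-argument constructor required by Flink's POJO rules
		public RecordDto() {}

		public String getCountry() { return country; }
		public void setCountry(String country) { this.country = country; }
		// getters/setters for the remaining fields omitted for brevity
	}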

DataSet.print

flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/DataSet.java

	/**
	 * Prints the elements in a DataSet to the standard output stream {@link System#out} of the JVM that calls
	 * the print() method. For programs that are executed in a cluster, this method needs
	 * to gather the contents of the DataSet back to the client, to print it there.
	 *
	 * <p>The string written for each element is defined by the {@link Object#toString()} method.
	 *
	 * <p>This method immediately triggers the program execution, similar to the
	 * {@link #collect()} and {@link #count()} methods.
	 *
	 * @see #printToErr()
	 * @see #printOnTaskManager(String)
	 */
	public void print() throws Exception {
		List<T> elements = collect();
		for (T e: elements) {
			System.out.println(e);
		}
	}
  • The print method mainly calls the collect method to fetch the result and then prints the elements one by one; an equivalent collect-based snippet is sketched below
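
Since print() is just collect() plus a println loop, the last two lines of the example are equivalent to the following sketch (assuming the usual flink imports); note that either form triggers execution immediately, so a subsequent env.execute() with no new sinks defined would fail:

	List<Tuple2<String, Integer>> elements = groupedByCountry.collect(); // triggers job execution
	for (Tuple2<String, Integer> e : elements) {
		System.out.println(e);
	}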

DataSet.collect

flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/DataSet.java

	/**
	 * Convenience method to get the elements of a DataSet as a List.
	 * As DataSet can contain a lot of data, this method should be used with caution.
	 *
	 * @return A List containing the elements of the DataSet
	 */
	public List<T> collect() throws Exception {
		final String id = new AbstractID().toString();
		final TypeSerializer<T> serializer = getType().createSerializer(getExecutionEnvironment().getConfig());

		this.output(new Utils.CollectHelper<>(id, serializer)).name("collect()");
		JobExecutionResult res = getExecutionEnvironment().execute();

		ArrayList<byte[]> accResult = res.getAccumulatorResult(id);
		if (accResult != null) {
			try {
				return SerializedListAccumulator.deserializeList(accResult, serializer);
			} catch (ClassNotFoundException e) {
				throw new RuntimeException("Cannot find type class of collected data type.", e);
			} catch (IOException e) {
				throw new RuntimeException("Serialization error while deserializing collected data", e);
			}
		} else {
			throw new RuntimeException("The call to collect() could not retrieve the DataSet.");
		}
	}
  • Here getExecutionEnvironment().execute() is called to obtain the JobExecutionResult; the executionEnvironment in this case is a LocalEnvironment (the accumulator round trip collect relies on is sketched below)
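
collect() thus rides on Flink's accumulator mechanism: the CollectHelper sink serializes every element into a list accumulator registered under a random id, and the client reads that accumulator back out of the JobExecutionResult. A minimal sketch of the same round trip with a user-defined accumulator, reusing the env from the example above (the id "elementCount" and the job name are made up):

	env.fromElements("a", "b", "c")
			.map(new RichMapFunction<String, String>() {
				private final IntCounter counter = new IntCounter();

				@Override
				public void open(Configuration parameters) {
					// register the accumulator under an id, as CollectHelper does
					getRuntimeContext().addAccumulator("elementCount", counter);
				}

				@Override
				public String map(String value) {
					counter.add(1);
					return value;
				}
			})
			.output(new DiscardingOutputFormat<String>());

	JobExecutionResult res = env.execute("accumulator-demo");
	// read the accumulator back from the result, as collect() does with its list accumulator
	Integer total = res.getAccumulatorResult("elementCount");
	System.out.println(total); // 3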

ExecutionEnvironment.execute

flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/ExecutionEnvironment.java

	/**
	 * Triggers the program execution. The environment will execute all parts of the program that have
	 * resulted in a "sink" operation. Sink operations are for example printing results ({@link DataSet#print()},
	 * writing results (e.g. {@link DataSet#writeAsText(String)},
	 * {@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}, or other generic
	 * data sinks created with {@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
	 *
	 * <p>The program execution will be logged and displayed with a generated default name.
	 *
	 * @return The result of the job execution, containing elapsed time and accumulators.
	 * @throws Exception Thrown, if the program executions fails.
	 */
	public JobExecutionResult execute() throws Exception {
		return execute(getDefaultName());
	}

	/**
	 * Gets a default job name, based on the timestamp when this method is invoked.
	 *
	 * @return A default job name.
	 */
	private static String getDefaultName() {
		return "Flink Java Job at " + Calendar.getInstance().getTime();
	}

	/**
	 * Triggers the program execution. The environment will execute all parts of the program that have
	 * resulted in a "sink" operation. Sink operations are for example printing results ({@link DataSet#print()},
	 * writing results (e.g. {@link DataSet#writeAsText(String)},
	 * {@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}, or other generic
	 * data sinks created with {@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
	 *
	 * <p>The program execution will be logged and displayed with the given job name.
	 *
	 * @return The result of the job execution, containing elapsed time and accumulators.
	 * @throws Exception Thrown, if the program executions fails.
	 */
	public abstract JobExecutionResult execute(String jobName) throws Exception;
  • The concrete execute is an abstract method implemented by subclasses; here we look at LocalEnvironment's execute method (the sketch below shows how the environment type can also be chosen explicitly)
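
Which subclass you end up in depends on how the program is launched: getExecutionEnvironment() returns a LocalEnvironment when the program runs standalone (e.g. in an IDE) and a cluster-specific environment when submitted through the CLI. You can also pick one explicitly; a sketch, where the host, port, and jar path are placeholders:

	ExecutionEnvironment local = ExecutionEnvironment.createLocalEnvironment();
	ExecutionEnvironment remote = ExecutionEnvironment.createRemoteEnvironment(
			"jobmanager-host", 6123, "/path/to/your-job.jar");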

LocalEnvironment.execute

flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/LocalEnvironment.java

	@Override
	public JobExecutionResult execute(String jobName) throws Exception {
		if (executor == null) {
			startNewSession();
		}

		Plan p = createProgramPlan(jobName);

		// Session management is disabled, revert this commit to enable
		//p.setJobId(jobID);
		//p.setSessionTimeout(sessionTimeout);

		JobExecutionResult result = executor.executePlan(p);

		this.lastJobExecutionResult = result;
		return result;
	}

	@Override
	@PublicEvolving
	public void startNewSession() throws Exception {
		if (executor != null) {
			// we need to end the previous session
			executor.stop();
			// create also a new JobID
			jobID = JobID.generate();
		}

		// create a new local executor
		executor = PlanExecutor.createLocalExecutor(configuration);
		executor.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());

		// if we have a session, start the mini cluster eagerly to have it available across sessions
		if (getSessionTimeout() > 0) {
			executor.start();

			// also install the reaper that will shut it down eventually
			executorReaper = new ExecutorReaper(executor);
		}
	}
  • If executor is null, startNewSession is called; startNewSession creates the executor via PlanExecutor.createLocalExecutor(configuration); if sessionTimeout is greater than 0, executor.start() is invoked immediately, but that value defaults to 0
  • After that, the plan is created via the createProgramPlan method
  • Finally, executor.executePlan(p) is called to obtain the JobExecutionResult; a sketch of the sysout logging flag that startNewSession wires into the executor follows this list
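
startNewSession also copies ExecutionConfig's sysout logging flag into the executor via setPrintStatusDuringExecution; a small sketch of toggling it from user code:

	final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
	// startNewSession will then call executor.setPrintStatusDuringExecution(false)
	env.getConfig().disableSysoutLogging();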

PlanExecutor.createLocalExecutor

flink-core-1.6.2-sources.jar!/org/apache/flink/api/common/PlanExecutor.java

	private static final String LOCAL_EXECUTOR_CLASS = "org.apache.flink.client.LocalExecutor";

	/**
	 * Creates an executor that runs the plan locally in a multi-threaded environment.
	 * 
	 * @return A local executor.
	 */
	public static PlanExecutor createLocalExecutor(Configuration configuration) {
		Class<? extends PlanExecutor> leClass = loadExecutorClass(LOCAL_EXECUTOR_CLASS);
		
		try {
			return leClass.getConstructor(Configuration.class).newInstance(configuration);
		}
		catch (Throwable t) {
			throw new RuntimeException("An error occurred while loading the local executor ("
					+ LOCAL_EXECUTOR_CLASS + ").", t);
		}
	}

	private static Class<? extends PlanExecutor> loadExecutorClass(String className) {
		try {
			Class<?> leClass = Class.forName(className);
			return leClass.asSubclass(PlanExecutor.class);
		}
		catch (ClassNotFoundException cnfe) {
			throw new RuntimeException("Could not load the executor class (" + className
					+ "). Do you have the 'flink-clients' project in your dependencies?");
		}
		catch (Throwable t) {
			throw new RuntimeException("An error occurred while loading the executor (" + className + ").", t);
		}
	}
  • The PlanExecutor.createLocalExecutor method creates an org.apache.flink.client.LocalExecutor via reflection; a quick classpath check is sketched below
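
The indirection lets flink-core avoid a compile-time dependency on flink-clients: the class is resolved by name at runtime, which is why the error message asks about the flink-clients dependency. A quick sketch mirroring loadExecutorClass to verify that flink-clients is on the classpath:

	// throws ClassNotFoundException if flink-clients is missing
	Class<?> leClass = Class.forName("org.apache.flink.client.LocalExecutor");
	System.out.println(leClass.getName());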

LocalExecutor.executePlan

flink-clients_2.11-1.6.2-sources.jar!/org/apache/flink/client/LocalExecutor.java

	/**
	 * Executes the given program on a local runtime and waits for the job to finish.
	 *
	 * <p>If the executor has not been started before, this starts the executor and shuts it down
	 * after the job finished. If the job runs in session mode, the executor is kept alive until
	 * no more references to the executor exist.</p>
	 *
	 * @param plan The plan of the program to execute.
	 * @return The net runtime of the program, in milliseconds.
	 *
	 * @throws Exception Thrown, if either the startup of the local execution context, or the execution
	 *                   caused an exception.
	 */
	@Override
	public JobExecutionResult executePlan(Plan plan) throws Exception {
		if (plan == null) {
			throw new IllegalArgumentException("The plan may not be null.");
		}

		synchronized (this.lock) {

			// check if we start a session dedicated for this execution
			final boolean shutDownAtEnd;

			if (jobExecutorService == null) {
				shutDownAtEnd = true;

				// configure the number of local slots equal to the parallelism of the local plan
				if (this.taskManagerNumSlots == DEFAULT_TASK_MANAGER_NUM_SLOTS) {
					int maxParallelism = plan.getMaximumParallelism();
					if (maxParallelism > 0) {
						this.taskManagerNumSlots = maxParallelism;
					}
				}

				// start the cluster for us
				start();
			}
			else {
				// we use the existing session
				shutDownAtEnd = false;
			}

			try {
				// TODO: Set job's default parallelism to max number of slots
				final int slotsPerTaskManager = jobExecutorServiceConfiguration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, taskManagerNumSlots);
				final int numTaskManagers = jobExecutorServiceConfiguration.getInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
				plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers);

				Optimizer pc = new Optimizer(new DataStatistics(), jobExecutorServiceConfiguration);
				OptimizedPlan op = pc.compile(plan);

				JobGraphGenerator jgg = new JobGraphGenerator(jobExecutorServiceConfiguration);
				JobGraph jobGraph = jgg.compileJobGraph(op, plan.getJobId());

				return jobExecutorService.executeJobBlocking(jobGraph);
			}
			finally {
				if (shutDownAtEnd) {
					stop();
				}
			}
		}
	}
  • When jobExecutorService is null, the start method is called to spin up the cluster and create the jobExecutorService
  • A JobGraphGenerator is then created, and its compileJobGraph method turns the optimized plan into a JobGraph
  • Finally, jobExecutorService.executeJobBlocking(jobGraph) executes that JobGraph and returns the JobExecutionResult; the slot arithmetic is made concrete in the sketch after this list
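
To make the slot arithmetic concrete, a sketch where the option names are real but the values are made up: with two task managers of three slots each, executePlan sets the plan's default parallelism to 2 * 3 = 6.

	Configuration conf = new Configuration();
	conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); // two embedded TaskManagers
	conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);         // three slots per TaskManager
	// inside executePlan: plan.setDefaultParallelism(3 * 2) == 6
	final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);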

LocalExecutor.start

flink-clients_2.11-1.6.2-sources.jar!/org/apache/flink/client/LocalExecutor.java

	@Override
	public void start() throws Exception {
		synchronized (lock) {
			if (jobExecutorService == null) {
				// create the embedded runtime
				jobExecutorServiceConfiguration = createConfiguration();

				// start it up
				jobExecutorService = createJobExecutorService(jobExecutorServiceConfiguration);
			} else {
				throw new IllegalStateException("The local executor was already started.");
			}
		}
	}

	private Configuration createConfiguration() {
		Configuration newConfiguration = new Configuration();
		newConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, getTaskManagerNumSlots());
		newConfiguration.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, isDefaultOverwriteFiles());

		newConfiguration.addAll(baseConfiguration);

		return newConfiguration;
	}

	private JobExecutorService createJobExecutorService(Configuration configuration) throws Exception {
		final JobExecutorService newJobExecutorService;
		if (CoreOptions.NEW_MODE.equals(configuration.getString(CoreOptions.MODE))) {

			if (!configuration.contains(RestOptions.PORT)) {
				configuration.setInteger(RestOptions.PORT, 0);
			}

			final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
				.setConfiguration(configuration)
				.setNumTaskManagers(
					configuration.getInteger(
						ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
						ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER))
				.setRpcServiceSharing(RpcServiceSharing.SHARED)
				.setNumSlotsPerTaskManager(
					configuration.getInteger(
						TaskManagerOptions.NUM_TASK_SLOTS, 1))
				.build();

			final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);
			miniCluster.start();

			configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort());

			newJobExecutorService = miniCluster;
		} else {
			final LocalFlinkMiniCluster localFlinkMiniCluster = new LocalFlinkMiniCluster(configuration, true);
			localFlinkMiniCluster.start();

			newJobExecutorService = localFlinkMiniCluster;
		}

		return newJobExecutorService;
	}
  • The start method first creates a configuration via createConfiguration and then creates the JobExecutorService via createJobExecutorService
  • createConfiguration mainly sets TaskManagerOptions.NUM_TASK_SLOTS and CoreOptions.FILESYTEM_DEFAULT_OVERRIDE
  • createJobExecutorService creates a different JobExecutorService depending on the configuration.getString(CoreOptions.MODE) setting
  • The default, CoreOptions.NEW_MODE, first builds a MiniClusterConfiguration, then creates a MiniCluster (the JobExecutorService), starts it via MiniCluster.start, and returns it
  • Outside of CoreOptions.NEW_MODE, a LocalFlinkMiniCluster (the JobExecutorService) is created instead, started via LocalFlinkMiniCluster.start(), and returned; a sketch for forcing this legacy path follows this list
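
A sketch for forcing the legacy path: CoreOptions.MODE defaults to "new", and setting it to CoreOptions.LEGACY_MODE makes createJobExecutorService build a LocalFlinkMiniCluster instead of a MiniCluster.

	Configuration conf = new Configuration();
	conf.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE);
	final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);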

MiniCluster.executeJobBlocking

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/minicluster/MiniCluster.java

	/**
	 * This method runs a job in blocking mode. The method returns only after the job
	 * completed successfully, or after it failed terminally.
	 *
	 * @param job  The Flink job to execute
	 * @return The result of the job execution
	 *
	 * @throws JobExecutionException Thrown if anything went amiss during initial job launch,
	 *         or if the job terminally failed.
	 */
	@Override
	public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
		checkNotNull(job, "job is null");

		final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);

		final CompletableFuture<JobResult> jobResultFuture = submissionFuture.thenCompose(
			(JobSubmissionResult ignored) -> requestJobResult(job.getJobID()));

		final JobResult jobResult;

		try {
			jobResult = jobResultFuture.get();
		} catch (ExecutionException e) {
			throw new JobExecutionException(job.getJobID(), "Could not retrieve JobResult.", ExceptionUtils.stripExecutionException(e));
		}

		try {
			return jobResult.toJobExecutionResult(Thread.currentThread().getContextClassLoader());
		} catch (IOException | ClassNotFoundException e) {
			throw new JobExecutionException(job.getJobID(), e);
		}
	}
  • The MiniCluster.executeJobBlocking method first calls submitJob(job) to submit the JobGraph, which returns a CompletableFuture (submissionFuture)
  • That submissionFuture is chained via thenCompose to the requestJobResult method, which requests the jobResult for the given jobId (yielding jobResultFuture)
  • Finally, jobResultFuture.get() is used to obtain the JobExecutionResult; a generic illustration of this pattern follows this list
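
The shape here is plain CompletableFuture composition; a self-contained illustration of thenCompose followed by a blocking get, to be run inside a method that declares throws Exception (nothing Flink-specific, all names made up):

	CompletableFuture<String> submission = CompletableFuture.supplyAsync(() -> "job-42");
	CompletableFuture<Integer> resultFuture = submission.thenCompose(
			jobId -> CompletableFuture.supplyAsync(jobId::length));
	// blocks until both stages finish, like jobResultFuture.get() above
	System.out.println(resultFuture.get());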

Summary

  • DataSet's print method calls the collect method, and collect in turn calls getExecutionEnvironment().execute() to obtain the JobExecutionResult; the executionEnvironment here is a LocalEnvironment
  • ExecutionEnvironment.execute internally calls the abstract execute(String jobName), which is implemented by subclasses; here that is LocalEnvironment.execute, which first uses startNewSession to create a LocalExecutor via PlanExecutor.createLocalExecutor, then creates the plan via createProgramPlan, and finally calls LocalExecutor.executePlan to obtain the JobExecutionResult
  • LocalExecutor.executePlan first checks jobExecutorService: if it is null, the start method is called to create it (depending on the CoreOptions.MODE setting, CoreOptions.NEW_MODE yields a MiniCluster, otherwise a LocalFlinkMiniCluster; here the created jobExecutorService is a MiniCluster); the plan is then turned into a jobGraph via JobGraphGenerator, and finally jobExecutorService.executeJobBlocking(jobGraph) executes that jobGraph and returns the JobExecutionResult
