文档章节

聊聊flink taskmanager的jvm-exit-on-oom配置

go4it
 go4it
发布于 02/23 11:22
字数 1787
阅读 11
收藏 0

本文主要研究一下flink taskmanager的jvm-exit-on-oom配置

taskmanager.jvm-exit-on-oom

flink-1.7.2/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java

@PublicEvolving
public class TaskManagerOptions {
	//......

	/**
	 * Whether to kill the TaskManager when the task thread throws an OutOfMemoryError.
	 */
	public static final ConfigOption<Boolean> KILL_ON_OUT_OF_MEMORY =
			key("taskmanager.jvm-exit-on-oom")
			.defaultValue(false)
			.withDescription("Whether to kill the TaskManager when the task thread throws an OutOfMemoryError.");

	//......
}
  • taskmanager.jvm-exit-on-oom配置默认为false,用于指定当task线程抛出OutOfMemoryError的时候,是否需要kill掉TaskManager

TaskManagerConfiguration

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java

public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {

	private static final Logger LOG = LoggerFactory.getLogger(TaskManagerConfiguration.class);

	private final int numberSlots;

	private final String[] tmpDirectories;

	private final Time timeout;

	// null indicates an infinite duration
	@Nullable
	private final Time maxRegistrationDuration;

	private final Time initialRegistrationPause;
	private final Time maxRegistrationPause;
	private final Time refusedRegistrationPause;

	private final UnmodifiableConfiguration configuration;

	private final boolean exitJvmOnOutOfMemory;

	private final FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder;

	private final String[] alwaysParentFirstLoaderPatterns;

	@Nullable
	private final String taskManagerLogPath;

	@Nullable
	private final String taskManagerStdoutPath;

	public TaskManagerConfiguration(
		int numberSlots,
		String[] tmpDirectories,
		Time timeout,
		@Nullable Time maxRegistrationDuration,
		Time initialRegistrationPause,
		Time maxRegistrationPause,
		Time refusedRegistrationPause,
		Configuration configuration,
		boolean exitJvmOnOutOfMemory,
		FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder,
		String[] alwaysParentFirstLoaderPatterns,
		@Nullable String taskManagerLogPath,
		@Nullable String taskManagerStdoutPath) {

		this.numberSlots = numberSlots;
		this.tmpDirectories = Preconditions.checkNotNull(tmpDirectories);
		this.timeout = Preconditions.checkNotNull(timeout);
		this.maxRegistrationDuration = maxRegistrationDuration;
		this.initialRegistrationPause = Preconditions.checkNotNull(initialRegistrationPause);
		this.maxRegistrationPause = Preconditions.checkNotNull(maxRegistrationPause);
		this.refusedRegistrationPause = Preconditions.checkNotNull(refusedRegistrationPause);
		this.configuration = new UnmodifiableConfiguration(Preconditions.checkNotNull(configuration));
		this.exitJvmOnOutOfMemory = exitJvmOnOutOfMemory;
		this.classLoaderResolveOrder = classLoaderResolveOrder;
		this.alwaysParentFirstLoaderPatterns = alwaysParentFirstLoaderPatterns;
		this.taskManagerLogPath = taskManagerLogPath;
		this.taskManagerStdoutPath = taskManagerStdoutPath;
	}

	public int getNumberSlots() {
		return numberSlots;
	}

	public Time getTimeout() {
		return timeout;
	}

	@Nullable
	public Time getMaxRegistrationDuration() {
		return maxRegistrationDuration;
	}

	public Time getInitialRegistrationPause() {
		return initialRegistrationPause;
	}

	@Nullable
	public Time getMaxRegistrationPause() {
		return maxRegistrationPause;
	}

	public Time getRefusedRegistrationPause() {
		return refusedRegistrationPause;
	}

	@Override
	public Configuration getConfiguration() {
		return configuration;
	}

	@Override
	public String[] getTmpDirectories() {
		return tmpDirectories;
	}

	@Override
	public boolean shouldExitJvmOnOutOfMemoryError() {
		return exitJvmOnOutOfMemory;
	}

	public FlinkUserCodeClassLoaders.ResolveOrder getClassLoaderResolveOrder() {
		return classLoaderResolveOrder;
	}

	public String[] getAlwaysParentFirstLoaderPatterns() {
		return alwaysParentFirstLoaderPatterns;
	}

	@Nullable
	public String getTaskManagerLogPath() {
		return taskManagerLogPath;
	}

	@Nullable
	public String getTaskManagerStdoutPath() {
		return taskManagerStdoutPath;
	}

	// --------------------------------------------------------------------------------------------
	//  Static factory methods
	// --------------------------------------------------------------------------------------------

	public static TaskManagerConfiguration fromConfiguration(Configuration configuration) {
		int numberSlots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);

		if (numberSlots == -1) {
			numberSlots = 1;
		}

		final String[] tmpDirPaths = ConfigurationUtils.parseTempDirectories(configuration);

		final Time timeout;

		try {
			timeout = Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis());
		} catch (Exception e) {
			throw new IllegalArgumentException(
				"Invalid format for '" + AkkaOptions.ASK_TIMEOUT.key() +
					"'.Use formats like '50 s' or '1 min' to specify the timeout.");
		}

		LOG.info("Messages have a max timeout of " + timeout);

		final Time finiteRegistrationDuration;

		try {
			Duration maxRegistrationDuration = Duration.create(configuration.getString(TaskManagerOptions.REGISTRATION_TIMEOUT));
			if (maxRegistrationDuration.isFinite()) {
				finiteRegistrationDuration = Time.milliseconds(maxRegistrationDuration.toMillis());
			} else {
				finiteRegistrationDuration = null;
			}
		} catch (NumberFormatException e) {
			throw new IllegalArgumentException("Invalid format for parameter " +
				TaskManagerOptions.REGISTRATION_TIMEOUT.key(), e);
		}

		final Time initialRegistrationPause;
		try {
			Duration pause = Duration.create(configuration.getString(TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF));
			if (pause.isFinite()) {
				initialRegistrationPause = Time.milliseconds(pause.toMillis());
			} else {
				throw new IllegalArgumentException("The initial registration pause must be finite: " + pause);
			}
		} catch (NumberFormatException e) {
			throw new IllegalArgumentException("Invalid format for parameter " +
				TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF.key(), e);
		}

		final Time maxRegistrationPause;
		try {
			Duration pause = Duration.create(configuration.getString(
				TaskManagerOptions.REGISTRATION_MAX_BACKOFF));
			if (pause.isFinite()) {
				maxRegistrationPause = Time.milliseconds(pause.toMillis());
			} else {
				throw new IllegalArgumentException("The maximum registration pause must be finite: " + pause);
			}
		} catch (NumberFormatException e) {
			throw new IllegalArgumentException("Invalid format for parameter " +
				TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF.key(), e);
		}

		final Time refusedRegistrationPause;
		try {
			Duration pause = Duration.create(configuration.getString(TaskManagerOptions.REFUSED_REGISTRATION_BACKOFF));
			if (pause.isFinite()) {
				refusedRegistrationPause = Time.milliseconds(pause.toMillis());
			} else {
				throw new IllegalArgumentException("The refused registration pause must be finite: " + pause);
			}
		} catch (NumberFormatException e) {
			throw new IllegalArgumentException("Invalid format for parameter " +
				TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF.key(), e);
		}

		final boolean exitOnOom = configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY);

		final String classLoaderResolveOrder =
			configuration.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER);

		final String[] alwaysParentFirstLoaderPatterns = CoreOptions.getParentFirstLoaderPatterns(configuration);

		final String taskManagerLogPath = configuration.getString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, System.getProperty("log.file"));
		final String taskManagerStdoutPath;

		if (taskManagerLogPath != null) {
			final int extension = taskManagerLogPath.lastIndexOf('.');

			if (extension > 0) {
				taskManagerStdoutPath = taskManagerLogPath.substring(0, extension) + ".out";
			} else {
				taskManagerStdoutPath = null;
			}
		} else {
			taskManagerStdoutPath = null;
		}

		return new TaskManagerConfiguration(
			numberSlots,
			tmpDirPaths,
			timeout,
			finiteRegistrationDuration,
			initialRegistrationPause,
			maxRegistrationPause,
			refusedRegistrationPause,
			configuration,
			exitOnOom,
			FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder),
			alwaysParentFirstLoaderPatterns,
			taskManagerLogPath,
			taskManagerStdoutPath);
	}
}
  • TaskManagerConfiguration的静态方法fromConfiguration通过configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY)读取exitOnOom,然后传到构造器中的exitJvmOnOutOfMemory属性;同时提供了shouldExitJvmOnOutOfMemoryError方法来读取exitJvmOnOutOfMemory属性

Task

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java

public class Task implements Runnable, TaskActions, CheckpointListener {
	//......

	@Override
	public void run() {

		// ----------------------------
		//  Initial State transition
		// ----------------------------
		//......

		// all resource acquisitions and registrations from here on
		// need to be undone in the end
		Map<String, Future<Path>> distributedCacheEntries = new HashMap<>();
		AbstractInvokable invokable = null;

		try {
			//......

			// ----------------------------------------------------------------
			//  call the user code initialization methods
			// ----------------------------------------------------------------

			TaskKvStateRegistry kvStateRegistry = network.createKvStateTaskRegistry(jobId, getJobVertexId());

			Environment env = new RuntimeEnvironment(
				jobId,
				vertexId,
				executionId,
				executionConfig,
				taskInfo,
				jobConfiguration,
				taskConfiguration,
				userCodeClassLoader,
				memoryManager,
				ioManager,
				broadcastVariableManager,
				taskStateManager,
				accumulatorRegistry,
				kvStateRegistry,
				inputSplitProvider,
				distributedCacheEntries,
				producedPartitions,
				inputGates,
				network.getTaskEventDispatcher(),
				checkpointResponder,
				taskManagerConfig,
				metrics,
				this);

			// now load and instantiate the task's invokable code
			invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

			// ----------------------------------------------------------------
			//  actual task core work
			// ----------------------------------------------------------------

			// we must make strictly sure that the invokable is accessible to the cancel() call
			// by the time we switched to running.
			this.invokable = invokable;

			// switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
			if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
				throw new CancelTaskException();
			}

			// notify everyone that we switched to running
			taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

			// make sure the user code classloader is accessible thread-locally
			executingThread.setContextClassLoader(userCodeClassLoader);

			// run the invokable
			invokable.invoke();

			// make sure, we enter the catch block if the task leaves the invoke() method due
			// to the fact that it has been canceled
			if (isCanceledOrFailed()) {
				throw new CancelTaskException();
			}

			// ----------------------------------------------------------------
			//  finalization of a successful execution
			// ----------------------------------------------------------------

			// finish the produced partitions. if this fails, we consider the execution failed.
			for (ResultPartition partition : producedPartitions) {
				if (partition != null) {
					partition.finish();
				}
			}

			// try to mark the task as finished
			// if that fails, the task was canceled/failed in the meantime
			if (!transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) {
				throw new CancelTaskException();
			}
		}
		catch (Throwable t) {

			// unwrap wrapped exceptions to make stack traces more compact
			if (t instanceof WrappingRuntimeException) {
				t = ((WrappingRuntimeException) t).unwrap();
			}

			// ----------------------------------------------------------------
			// the execution failed. either the invokable code properly failed, or
			// an exception was thrown as a side effect of cancelling
			// ----------------------------------------------------------------

			try {
				// check if the exception is unrecoverable
				if (ExceptionUtils.isJvmFatalError(t) ||
						(t instanceof OutOfMemoryError && taskManagerConfig.shouldExitJvmOnOutOfMemoryError())) {

					// terminate the JVM immediately
					// don't attempt a clean shutdown, because we cannot expect the clean shutdown to complete
					try {
						LOG.error("Encountered fatal error {} - terminating the JVM", t.getClass().getName(), t);
					} finally {
						Runtime.getRuntime().halt(-1);
					}
				}

				// transition into our final state. we should be either in DEPLOYING, RUNNING, CANCELING, or FAILED
				// loop for multiple retries during concurrent state changes via calls to cancel() or
				// to failExternally()
				while (true) {
					ExecutionState current = this.executionState;

					if (current == ExecutionState.RUNNING || current == ExecutionState.DEPLOYING) {
						if (t instanceof CancelTaskException) {
							if (transitionState(current, ExecutionState.CANCELED)) {
								cancelInvokable(invokable);
								break;
							}
						}
						else {
							if (transitionState(current, ExecutionState.FAILED, t)) {
								// proper failure of the task. record the exception as the root cause
								failureCause = t;
								cancelInvokable(invokable);

								break;
							}
						}
					}
					else if (current == ExecutionState.CANCELING) {
						if (transitionState(current, ExecutionState.CANCELED)) {
							break;
						}
					}
					else if (current == ExecutionState.FAILED) {
						// in state failed already, no transition necessary any more
						break;
					}
					// unexpected state, go to failed
					else if (transitionState(current, ExecutionState.FAILED, t)) {
						LOG.error("Unexpected state in task {} ({}) during an exception: {}.", taskNameWithSubtask, executionId, current);
						break;
					}
					// else fall through the loop and
				}
			}
			catch (Throwable tt) {
				String message = String.format("FATAL - exception in exception handler of task %s (%s).", taskNameWithSubtask, executionId);
				LOG.error(message, tt);
				notifyFatalError(message, tt);
			}
		}
		finally {
			//......
		}
	}

	//......
}
  • Task实现了Runnable接口,其run方法对invokable.invoke()进行了try catch,在catch的时候会判断,如果是ExceptionUtils.isJvmFatalError(t)或者(t instanceof OutOfMemoryError && taskManagerConfig.shouldExitJvmOnOutOfMemoryError()),则会调用Runtime.getRuntime().halt(-1)来停止JVM

ExceptionUtils.isJvmFatalError

flink-1.7.2/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java

@Internal
public final class ExceptionUtils {
	//......

	/**
	 * Checks whether the given exception indicates a situation that may leave the
	 * JVM in a corrupted state, meaning a state where continued normal operation can only be
	 * guaranteed via clean process restart.
	 *
	 * <p>Currently considered fatal exceptions are Virtual Machine errors indicating
	 * that the JVM is corrupted, like {@link InternalError}, {@link UnknownError},
	 * and {@link java.util.zip.ZipError} (a special case of InternalError).
	 * The {@link ThreadDeath} exception is also treated as a fatal error, because when
	 * a thread is forcefully stopped, there is a high chance that parts of the system
	 * are in an inconsistent state.
	 *
	 * @param t The exception to check.
	 * @return True, if the exception is considered fatal to the JVM, false otherwise.
	 */
	public static boolean isJvmFatalError(Throwable t) {
		return (t instanceof InternalError) || (t instanceof UnknownError) || (t instanceof ThreadDeath);
	}

	//......
}
  • isJvmFatalError方法判断Throwable是否是InternalError或者UnknownError或者ThreadDeath,如果是则返回true

Runtime.getRuntime().halt

java.base/java/lang/Runtime.java

public class Runtime {
	//......

	private static final Runtime currentRuntime = new Runtime();

    /**
     * Returns the runtime object associated with the current Java application.
     * Most of the methods of class {@code Runtime} are instance
     * methods and must be invoked with respect to the current runtime object.
     *
     * @return  the {@code Runtime} object associated with the current
     *          Java application.
     */
    public static Runtime getRuntime() {
        return currentRuntime;
    }

    /**
     * Forcibly terminates the currently running Java virtual machine.  This
     * method never returns normally.
     *
     * <p> This method should be used with extreme caution.  Unlike the
     * {@link #exit exit} method, this method does not cause shutdown
     * hooks to be started.  If the shutdown sequence has already been
     * initiated then this method does not wait for any running
     * shutdown hooks to finish their work.
     *
     * @param  status
     *         Termination status. By convention, a nonzero status code
     *         indicates abnormal termination. If the {@link Runtime#exit exit}
     *         (equivalently, {@link System#exit(int) System.exit}) method
     *         has already been invoked then this status code
     *         will override the status code passed to that method.
     *
     * @throws SecurityException
     *         If a security manager is present and its
     *         {@link SecurityManager#checkExit checkExit} method
     *         does not permit an exit with the specified status
     *
     * @see #exit
     * @see #addShutdownHook
     * @see #removeShutdownHook
     * @since 1.3
     */
    public void halt(int status) {
        SecurityManager sm = System.getSecurityManager();
        if (sm != null) {
            sm.checkExit(status);
        }
        Shutdown.beforeHalt();
        Shutdown.halt(status);
    }

	//......
}
  • halt方法在SecurityManager不为null是会调用SecurityManager.checkExit;然后调用Shutdown.beforeHalt()以及Shutdown.halt(status)来停止JVM

小结

  • taskmanager.jvm-exit-on-oom配置默认为false,用于指定当task线程抛出OutOfMemoryError的时候,是否需要kill掉TaskManager
  • TaskManagerConfiguration的静态方法fromConfiguration通过configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY)读取exitOnOom,然后传到构造器中的exitJvmOnOutOfMemory属性;同时提供了shouldExitJvmOnOutOfMemoryError方法来读取exitJvmOnOutOfMemory属性
  • Task实现了Runnable接口,其run方法对invokable.invoke()进行了try catch,在catch的时候会判断,如果是ExceptionUtils.isJvmFatalError(t)或者(t instanceof OutOfMemoryError && taskManagerConfig.shouldExitJvmOnOutOfMemoryError()),则会调用Runtime.getRuntime().halt(-1)来停止JVM;isJvmFatalError方法判断Throwable是否是InternalError或者UnknownError或者ThreadDeath,如果是则返回true;halt方法在SecurityManager不为null是会调用SecurityManager.checkExit;然后调用Shutdown.beforeHalt()以及Shutdown.halt(status)来停止JVM

doc

© 著作权归作者所有

共有 人打赏支持
go4it
粉丝 78
博文 919
码字总数 848808
作品 0
深圳
私信 提问
聊聊flink TaskManager的memory大小设置

序 本文主要研究一下flink TaskManager的memory大小设置 flink-conf.yaml flink-release-1.7.2/flink-dist/src/main/resources/flink-conf.yaml flink-conf.yaml提供了taskmanager.heap.size......

go4it
02/19
0
0
聊聊flink JobManager的heap大小设置

序 本文主要研究一下flink JobManager的heap大小设置 JobManagerOptions flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/JobManagerOptions.java jobmanager.heap.size配置用......

go4it
02/18
0
0
聊聊flink的logback配置

序 本文主要研究一下flink的logback配置 client端pom文件配置 添加logback-core、logback-classic及log4j-over-slf4j依赖,之后对flink-java、flink-streaming-java2.11、flink-clients2.11...

go4it
02/14
0
0
Flink技术源码解析(一):Flink概述与源码研读准备

一、前言 Apache Flink作为一款高吞吐量、低延迟的针对流数据和批数据的分布式实时处理引擎,是当前实时处理领域的一颗炙手可热的新星。关于Flink与其它主流实时大数据处理引擎Storm、Spark...

binggozhan
2018/06/06
0
0
Flink Quickstart

Quickstart# Get a Flink example program up and running in a few simple steps. Setup: Download and Start Flink# Flink runs on Linux, Mac OS X, and Windows. To be able to run Flin......

群星纪元
03/19
0
0

没有更多内容

加载失败,请刷新页面

加载更多

如果让你写一个消息队列,该如何进行架构设计?

面试题 如果让你写一个消息队列,该如何进行架构设计?说一下你的思路。 面试官心理分析 其实聊到这个问题,一般面试官要考察两块: 你有没有对某一个消息队列做过较为深入的原理的了解,或者...

李红欧巴
今天
4
0
错题

无知的小狼
今天
2
0
PowerShell因为在此系统中禁止执行脚本的解决方法

参考:window系统包管理工具--chocolatey 报错提示: & : 无法加载文件 C:\Users\liuzidong\AppData\Local\Temp\chocolatey\chocInstall\tools\chocolateyInstall.ps1,因为在此系统上禁止运...

近在咫尺远在天涯
今天
3
0
TP5 跨域请求处理

https://blog.csdn.net/a593706205/article/details/81774987 https://blog.csdn.net/wyk9916/article/details/82315700...

15834278076
今天
3
0
深入理解java虚拟机-Java内存区域与内存溢出异常

深入理解java虚拟机 Java内存区域与内存溢出异常 运行时数据区域 程序计数器 线程私有,内存小,是当前线程执行的字节码行号指示器,字节码解释器通过改变这个计数器的值来选取下一条需要执行...

须臾之余
今天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部