A look at Flink's InternalTimeServiceManager

go4it
Published 01/18 13:37

This post takes a look at Flink's InternalTimeServiceManager.

InternalTimeServiceManager

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java

@Internal
public class InternalTimeServiceManager<K> {

	@VisibleForTesting
	static final String TIMER_STATE_PREFIX = "_timer_state";
	@VisibleForTesting
	static final String PROCESSING_TIMER_PREFIX = TIMER_STATE_PREFIX + "/processing_";
	@VisibleForTesting
	static final String EVENT_TIMER_PREFIX = TIMER_STATE_PREFIX + "/event_";

	private final KeyGroupRange localKeyGroupRange;
	private final KeyContext keyContext;

	private final PriorityQueueSetFactory priorityQueueSetFactory;
	private final ProcessingTimeService processingTimeService;

	private final Map<String, InternalTimerServiceImpl<K, ?>> timerServices;

	private final boolean useLegacySynchronousSnapshots;

	InternalTimeServiceManager(
		KeyGroupRange localKeyGroupRange,
		KeyContext keyContext,
		PriorityQueueSetFactory priorityQueueSetFactory,
		ProcessingTimeService processingTimeService, boolean useLegacySynchronousSnapshots) {

		this.localKeyGroupRange = Preconditions.checkNotNull(localKeyGroupRange);
		this.priorityQueueSetFactory = Preconditions.checkNotNull(priorityQueueSetFactory);
		this.keyContext = Preconditions.checkNotNull(keyContext);
		this.processingTimeService = Preconditions.checkNotNull(processingTimeService);
		this.useLegacySynchronousSnapshots = useLegacySynchronousSnapshots;

		this.timerServices = new HashMap<>();
	}

	@SuppressWarnings("unchecked")
	public <N> InternalTimerService<N> getInternalTimerService(
		String name,
		TimerSerializer<K, N> timerSerializer,
		Triggerable<K, N> triggerable) {

		InternalTimerServiceImpl<K, N> timerService = registerOrGetTimerService(name, timerSerializer);

		timerService.startTimerService(
			timerSerializer.getKeySerializer(),
			timerSerializer.getNamespaceSerializer(),
			triggerable);

		return timerService;
	}

	@SuppressWarnings("unchecked")
	<N> InternalTimerServiceImpl<K, N> registerOrGetTimerService(String name, TimerSerializer<K, N> timerSerializer) {
		InternalTimerServiceImpl<K, N> timerService = (InternalTimerServiceImpl<K, N>) timerServices.get(name);
		if (timerService == null) {

			timerService = new InternalTimerServiceImpl<>(
				localKeyGroupRange,
				keyContext,
				processingTimeService,
				createTimerPriorityQueue(PROCESSING_TIMER_PREFIX + name, timerSerializer),
				createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer));

			timerServices.put(name, timerService);
		}
		return timerService;
	}

	Map<String, InternalTimerServiceImpl<K, ?>> getRegisteredTimerServices() {
		return Collections.unmodifiableMap(timerServices);
	}

	private <N> KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> createTimerPriorityQueue(
		String name,
		TimerSerializer<K, N> timerSerializer) {
		return priorityQueueSetFactory.create(
			name,
			timerSerializer);
	}

	public void advanceWatermark(Watermark watermark) throws Exception {
		for (InternalTimerServiceImpl<?, ?> service : timerServices.values()) {
			service.advanceWatermark(watermark.getTimestamp());
		}
	}

	//////////////////				Fault Tolerance Methods				///////////////////

	public void snapshotStateForKeyGroup(DataOutputView stream, int keyGroupIdx) throws IOException {
		Preconditions.checkState(useLegacySynchronousSnapshots);
		InternalTimerServiceSerializationProxy<K> serializationProxy =
			new InternalTimerServiceSerializationProxy<>(this, keyGroupIdx);

		serializationProxy.write(stream);
	}

	public void restoreStateForKeyGroup(
			InputStream stream,
			int keyGroupIdx,
			ClassLoader userCodeClassLoader) throws IOException {

		InternalTimerServiceSerializationProxy<K> serializationProxy =
			new InternalTimerServiceSerializationProxy<>(
				this,
				userCodeClassLoader,
				keyGroupIdx);

		serializationProxy.read(stream);
	}

	////////////////////			Methods used ONLY IN TESTS				////////////////////

	@VisibleForTesting
	public int numProcessingTimeTimers() {
		int count = 0;
		for (InternalTimerServiceImpl<?, ?> timerService : timerServices.values()) {
			count += timerService.numProcessingTimeTimers();
		}
		return count;
	}

	@VisibleForTesting
	public int numEventTimeTimers() {
		int count = 0;
		for (InternalTimerServiceImpl<?, ?> timerService : timerServices.values()) {
			count += timerService.numEventTimeTimers();
		}
		return count;
	}
}
  • InternalTimeServiceManager manages the timer services used by all keyed operators; it keeps an in-memory map from each timer service's name to its InternalTimerServiceImpl
  • getInternalTimerService first calls registerOrGetTimerService to get or create the InternalTimerServiceImpl for the given name, then calls timerService.startTimerService to initialize it, and returns it
  • registerOrGetTimerService first looks up the InternalTimerServiceImpl for the given name in the timerServices map; if none exists, it creates one and puts it into the map. When creating the InternalTimerServiceImpl, it calls createTimerPriorityQueue twice to build the processingTimeTimersQueue and the eventTimeTimersQueue, both of type KeyGroupedInternalPriorityQueue; createTimerPriorityQueue in turn delegates to priorityQueueSetFactory
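
The get-or-create flow described above can be sketched without any Flink dependency. The snippet below is only an illustration: SketchTimerService, SketchTimeServiceManager and the queuesByStateName map are hypothetical stand-ins, and java.util.PriorityQueue<Long> replaces Flink's KeyGroupedInternalPriorityQueue. Only the three prefix constants are copied from the source above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical stand-in for InternalTimerServiceImpl: it only owns two timer queues.
class SketchTimerService {
    final PriorityQueue<Long> processingTimeTimers;
    final PriorityQueue<Long> eventTimeTimers;

    SketchTimerService(PriorityQueue<Long> processingTimeTimers, PriorityQueue<Long> eventTimeTimers) {
        this.processingTimeTimers = processingTimeTimers;
        this.eventTimeTimers = eventTimeTimers;
    }
}

// Hypothetical stand-in for InternalTimeServiceManager.
class SketchTimeServiceManager {
    static final String TIMER_STATE_PREFIX = "_timer_state";
    static final String PROCESSING_TIMER_PREFIX = TIMER_STATE_PREFIX + "/processing_";
    static final String EVENT_TIMER_PREFIX = TIMER_STATE_PREFIX + "/event_";

    private final Map<String, SketchTimerService> timerServices = new HashMap<>();

    // Assumption for illustration: records every state name passed to the queue factory.
    final Map<String, PriorityQueue<Long>> queuesByStateName = new HashMap<>();

    // Returns the service registered under name, creating it on first use.
    SketchTimerService registerOrGetTimerService(String name) {
        SketchTimerService service = timerServices.get(name);
        if (service == null) {
            service = new SketchTimerService(
                createTimerPriorityQueue(PROCESSING_TIMER_PREFIX + name),
                createTimerPriorityQueue(EVENT_TIMER_PREFIX + name));
            timerServices.put(name, service);
        }
        return service;
    }

    // Plays the role of priorityQueueSetFactory.create(stateName, serializer).
    private PriorityQueue<Long> createTimerPriorityQueue(String stateName) {
        PriorityQueue<Long> queue = new PriorityQueue<>();
        queuesByStateName.put(stateName, queue);
        return queue;
    }
}

class RegisterOrGetDemo {
    public static void main(String[] args) {
        SketchTimeServiceManager manager = new SketchTimeServiceManager();
        SketchTimerService first = manager.registerOrGetTimerService("window-timers");
        SketchTimerService second = manager.registerOrGetTimerService("window-timers");
        System.out.println(first == second); // prints true: the second call reuses the instance
        System.out.println(manager.queuesByStateName.keySet());
    }
}
```

Asking twice for the same name yields the same service, and each service contributes exactly two state names, one with the processing-time prefix and one with the event-time prefix.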

PriorityQueueSetFactory

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/PriorityQueueSetFactory.java

public interface PriorityQueueSetFactory {

	@Nonnull
	<T extends HeapPriorityQueueElement & PriorityComparable & Keyed> KeyGroupedInternalPriorityQueue<T> create(
		@Nonnull String stateName,
		@Nonnull TypeSerializer<T> byteOrderedElementSerializer);
}
  • PriorityQueueSetFactory defines the create method, which returns a KeyGroupedInternalPriorityQueue; the type parameter T must implement all three of the HeapPriorityQueueElement, PriorityComparable and Keyed interfaces
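
For readers less familiar with intersection bounds such as T extends A & B & C, here is a minimal standalone sketch. All type names (SketchHeapElement, SketchPriorityComparable, SketchKeyed, SketchElement) are hypothetical stand-ins for the Flink interfaces, and unlike the Flink signature the sketch uses parameterized rather than raw types in the bound.

```java
// Hypothetical stand-ins for the three interfaces named in the bound.
interface SketchHeapElement {
    int getInternalIndex();
}

interface SketchPriorityComparable<T> {
    int comparePriorityTo(T other);
}

interface SketchKeyed<K> {
    K getKey();
}

// One concrete type that satisfies all three bounds at once.
class SketchElement
        implements SketchHeapElement, SketchPriorityComparable<SketchElement>, SketchKeyed<String> {
    private final String key;
    private final long priority;

    SketchElement(String key, long priority) {
        this.key = key;
        this.priority = priority;
    }

    @Override
    public int getInternalIndex() {
        return Integer.MIN_VALUE; // not held by any heap in this sketch
    }

    @Override
    public int comparePriorityTo(SketchElement other) {
        return Long.compare(priority, other.priority);
    }

    @Override
    public String getKey() {
        return key;
    }
}

class IntersectionBoundDemo {
    // Because T satisfies all three interfaces, the body may call methods from each of them.
    static <T extends SketchHeapElement & SketchPriorityComparable<T> & SketchKeyed<?>>
            Object keyOfHigherPriority(T a, T b) {
        T winner = a.comparePriorityTo(b) <= 0 ? a : b;
        return winner.getKey();
    }

    public static void main(String[] args) {
        Object key = keyOfHigherPriority(new SketchElement("a", 5L), new SketchElement("b", 2L));
        System.out.println(key); // prints b: the smaller value wins
    }
}
```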

HeapPriorityQueueElement

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/heap/HeapPriorityQueueElement.java

@Internal
public interface HeapPriorityQueueElement {

	/**
	 * The index that indicates that a {@link HeapPriorityQueueElement} object is not contained in and managed by any
	 * {@link HeapPriorityQueue}. We do not strictly enforce that internal indexes must be reset to this value when
	 * elements are removed from a {@link HeapPriorityQueue}.
	 */
	int NOT_CONTAINED = Integer.MIN_VALUE;

	/**
	 * Returns the current index of this object in the internal array of {@link HeapPriorityQueue}.
	 */
	int getInternalIndex();

	/**
	 * Sets the current index of this object in the {@link HeapPriorityQueue} and should only be called by the owning
	 * {@link HeapPriorityQueue}.
	 *
	 * @param newIndex the new index in the timer heap.
	 */
	void setInternalIndex(int newIndex);
}
  • The HeapPriorityQueueElement interface defines the element type that HeapPriorityQueue requires; it declares the getInternalIndex and setInternalIndex methods
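
Why would an element store its own index? A plausible reason is that a heap can then remove an arbitrary element without scanning its array. The sketch below illustrates that idea with a small 0-based binary min-heap; IndexedElement and IndexedMinHeap are hypothetical names, and this is not Flink's HeapPriorityQueue implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical element type: carries its own position in the heap array.
class IndexedElement {
    static final int NOT_CONTAINED = Integer.MIN_VALUE;
    final long priority;
    int internalIndex = NOT_CONTAINED;

    IndexedElement(long priority) {
        this.priority = priority;
    }
}

// Minimal binary min-heap that keeps every element's internalIndex up to date.
class IndexedMinHeap {
    private final List<IndexedElement> heap = new ArrayList<>();

    int size() {
        return heap.size();
    }

    IndexedElement peek() {
        return heap.isEmpty() ? null : heap.get(0);
    }

    void add(IndexedElement e) {
        heap.add(e);
        e.internalIndex = heap.size() - 1;
        siftUp(e.internalIndex);
    }

    // No linear search is needed: the element already knows where it sits.
    boolean remove(IndexedElement e) {
        int idx = e.internalIndex;
        if (idx < 0 || idx >= heap.size() || heap.get(idx) != e) {
            return false;
        }
        int last = heap.size() - 1;
        swap(idx, last);
        heap.remove(last);
        e.internalIndex = IndexedElement.NOT_CONTAINED;
        if (idx < heap.size()) {
            // The element moved into idx may violate the heap order in either direction.
            siftUp(idx);
            siftDown(idx);
        }
        return true;
    }

    private void siftUp(int i) {
        while (i > 0) {
            int parent = (i - 1) / 2;
            if (heap.get(parent).priority <= heap.get(i).priority) {
                break;
            }
            swap(i, parent);
            i = parent;
        }
    }

    private void siftDown(int i) {
        int n = heap.size();
        while (true) {
            int left = 2 * i + 1;
            int right = left + 1;
            int smallest = i;
            if (left < n && heap.get(left).priority < heap.get(smallest).priority) {
                smallest = left;
            }
            if (right < n && heap.get(right).priority < heap.get(smallest).priority) {
                smallest = right;
            }
            if (smallest == i) {
                return;
            }
            swap(i, smallest);
            i = smallest;
        }
    }

    private void swap(int a, int b) {
        IndexedElement x = heap.get(a);
        IndexedElement y = heap.get(b);
        heap.set(a, y);
        y.internalIndex = a;
        heap.set(b, x);
        x.internalIndex = b;
    }
}
```

With the index stored on the element, remove costs O(log n) for the sift operations only; a plain java.util.PriorityQueue would first need an O(n) scan to locate the element.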

PriorityComparable

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/PriorityComparable.java

public interface PriorityComparable<T> {

	int comparePriorityTo(@Nonnull T other);
}
  • PriorityComparable defines the comparePriorityTo method, which compares two objects by priority

Keyed

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/Keyed.java

public interface Keyed<K> {

	K getKey();
}
  • The Keyed interface defines the getKey method, which returns the object's key

InternalTimer

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/InternalTimer.java

@Internal
public interface InternalTimer<K, N> extends PriorityComparable<InternalTimer<?, ?>>, Keyed<K> {

	/** Function to extract the key from a {@link InternalTimer}. */
	KeyExtractorFunction<InternalTimer<?, ?>> KEY_EXTRACTOR_FUNCTION = InternalTimer::getKey;

	/** Function to compare instances of {@link InternalTimer}. */
	PriorityComparator<InternalTimer<?, ?>> TIMER_COMPARATOR =
		(left, right) -> Long.compare(left.getTimestamp(), right.getTimestamp());
	/**
	 * Returns the timestamp of the timer. This value determines the point in time when the timer will fire.
	 */
	long getTimestamp();

	/**
	 * Returns the key that is bound to this timer.
	 */
	@Nonnull
	@Override
	K getKey();

	/**
	 * Returns the namespace that is bound to this timer.
	 */
	@Nonnull
	N getNamespace();
}
  • InternalTimer extends the PriorityComparable and Keyed interfaces; it declares the getTimestamp, getKey and getNamespace methods and also provides the built-in constants KEY_EXTRACTOR_FUNCTION and TIMER_COMPARATOR
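
Since TIMER_COMPARATOR orders timers purely by timestamp, a queue built on it always exposes the earliest timer at its head. The sketch below shows how such an ordering lets a caller drain every timer that is due at a given watermark. SimpleTimer and fireUpTo are hypothetical, java.util.PriorityQueue stands in for Flink's queue, and the drain loop is an assumption about how a consumer might use the ordering rather than a quote of InternalTimerServiceImpl.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical minimal timer: a timestamp plus the key and namespace it is scoped to.
class SimpleTimer {
    private final long timestamp;
    private final String key;
    private final String namespace;

    SimpleTimer(long timestamp, String key, String namespace) {
        this.timestamp = timestamp;
        this.key = key;
        this.namespace = namespace;
    }

    long getTimestamp() {
        return timestamp;
    }

    String getKey() {
        return key;
    }

    String getNamespace() {
        return namespace;
    }
}

class TimerOrderingDemo {
    // Same rule as InternalTimer.TIMER_COMPARATOR: order by timestamp only.
    static final Comparator<SimpleTimer> TIMER_COMPARATOR =
        (left, right) -> Long.compare(left.getTimestamp(), right.getTimestamp());

    // Removes and returns every timer whose timestamp is <= watermark, earliest first.
    static List<SimpleTimer> fireUpTo(PriorityQueue<SimpleTimer> queue, long watermark) {
        List<SimpleTimer> fired = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek().getTimestamp() <= watermark) {
            fired.add(queue.poll());
        }
        return fired;
    }

    public static void main(String[] args) {
        PriorityQueue<SimpleTimer> queue = new PriorityQueue<>(TIMER_COMPARATOR);
        queue.add(new SimpleTimer(30L, "user-1", "window-a"));
        queue.add(new SimpleTimer(10L, "user-2", "window-a"));
        queue.add(new SimpleTimer(20L, "user-1", "window-b"));

        for (SimpleTimer timer : fireUpTo(queue, 20L)) {
            System.out.println(timer.getTimestamp() + " " + timer.getKey());
        }
        System.out.println("remaining: " + queue.size()); // prints remaining: 1
    }
}
```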

TimerHeapInternalTimer

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java

@Internal
public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N>, HeapPriorityQueueElement {

	/** The key for which the timer is scoped. */
	@Nonnull
	private final K key;

	/** The namespace for which the timer is scoped. */
	@Nonnull
	private final N namespace;

	/** The expiration timestamp. */
	private final long timestamp;

	private transient int timerHeapIndex;

	TimerHeapInternalTimer(long timestamp, @Nonnull K key, @Nonnull N namespace) {
		this.timestamp = timestamp;
		this.key = key;
		this.namespace = namespace;
		this.timerHeapIndex = NOT_CONTAINED;
	}

	@Override
	public long getTimestamp() {
		return timestamp;
	}

	@Nonnull
	@Override
	public K getKey() {
		return key;
	}

	@Nonnull
	@Override
	public N getNamespace() {
		return namespace;
	}

	@Override
	public boolean equals(Object o) {
		if (this == o) {
			return true;
		}

		if (o instanceof InternalTimer) {
			InternalTimer<?, ?> timer = (InternalTimer<?, ?>) o;
			return timestamp == timer.getTimestamp()
				&& key.equals(timer.getKey())
				&& namespace.equals(timer.getNamespace());
		}

		return false;
	}

	@Override
	public int getInternalIndex() {
		return timerHeapIndex;
	}

	@Override
	public void setInternalIndex(int newIndex) {
		this.timerHeapIndex = newIndex;
	}

	void removedFromTimerQueue() {
		setInternalIndex(NOT_CONTAINED);
	}

	@Override
	public int hashCode() {
		int result = (int) (timestamp ^ (timestamp >>> 32));
		result = 31 * result + key.hashCode();
		result = 31 * result + namespace.hashCode();
		return result;
	}

	@Override
	public String toString() {
		return "Timer{" +
				"timestamp=" + timestamp +
				", key=" + key +
				", namespace=" + namespace +
				'}';
	}

	@Override
	public int comparePriorityTo(@Nonnull InternalTimer<?, ?> other) {
		return Long.compare(timestamp, other.getTimestamp());
	}
}
  • TimerHeapInternalTimer implements the InternalTimer and HeapPriorityQueueElement interfaces; its removedFromTimerQueue method calls setInternalIndex(NOT_CONTAINED), i.e. it resets the index to NOT_CONTAINED, which marks the timer as logically removed
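
One detail worth noting in the source above is that equals and hashCode use only timestamp, key and namespace, while the heap index is left out. The sketch below mirrors that contract with a hypothetical DedupTimer class to show the consequence: two registrations of the same (timestamp, key, namespace) triple collapse into one entry in a hash-based set, regardless of their heap index.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical timer mirroring the equals/hashCode contract of TimerHeapInternalTimer.
class DedupTimer {
    final long timestamp;
    final String key;
    final String namespace;

    // Bookkeeping only: deliberately ignored by equals and hashCode.
    int heapIndex = Integer.MIN_VALUE;

    DedupTimer(long timestamp, String key, String namespace) {
        this.timestamp = timestamp;
        this.key = key;
        this.namespace = namespace;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o instanceof DedupTimer) {
            DedupTimer other = (DedupTimer) o;
            return timestamp == other.timestamp
                && key.equals(other.key)
                && namespace.equals(other.namespace);
        }
        return false;
    }

    @Override
    public int hashCode() {
        // Same value as Long.hashCode(timestamp), written out as in the source above.
        int result = (int) (timestamp ^ (timestamp >>> 32));
        result = 31 * result + key.hashCode();
        result = 31 * result + namespace.hashCode();
        return result;
    }
}

class DedupDemo {
    public static void main(String[] args) {
        DedupTimer first = new DedupTimer(1000L, "user-1", "window-a");
        DedupTimer second = new DedupTimer(1000L, "user-1", "window-a");
        second.heapIndex = 7; // a different heap position does not affect equality

        Set<DedupTimer> timers = new HashSet<>();
        timers.add(first);
        timers.add(second);
        System.out.println(timers.size()); // prints 1
    }
}
```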

HeapPriorityQueueSetFactory

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java

public class HeapPriorityQueueSetFactory implements PriorityQueueSetFactory {

	@Nonnull
	private final KeyGroupRange keyGroupRange;

	@Nonnegative
	private final int totalKeyGroups;

	@Nonnegative
	private final int minimumCapacity;

	public HeapPriorityQueueSetFactory(
		@Nonnull KeyGroupRange keyGroupRange,
		@Nonnegative int totalKeyGroups,
		@Nonnegative int minimumCapacity) {

		this.keyGroupRange = keyGroupRange;
		this.totalKeyGroups = totalKeyGroups;
		this.minimumCapacity = minimumCapacity;
	}

	@Nonnull
	@Override
	public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> HeapPriorityQueueSet<T> create(
		@Nonnull String stateName,
		@Nonnull TypeSerializer<T> byteOrderedElementSerializer) {

		return new HeapPriorityQueueSet<>(
			PriorityComparator.forPriorityComparableObjects(),
			KeyExtractorFunction.forKeyedObjects(),
			minimumCapacity,
			keyGroupRange,
			totalKeyGroups);
	}
}
  • HeapPriorityQueueSetFactory implements the PriorityQueueSetFactory interface; its create method returns a HeapPriorityQueueSet
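
The source of HeapPriorityQueueSet itself is not shown here, so the following is only a rough sketch of what a "priority queue set" partitioned by key group could look like: a heap for ordering combined with one deduplication set per key group. QTimer, SketchQueueSet and all of their methods are hypothetical; in particular, assigning a key group with Math.floorMod(key.hashCode(), totalKeyGroups) is a simplifying assumption, not Flink's key-group assignment, and the sketch keeps sets for all key groups instead of only a local KeyGroupRange.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;

// Hypothetical queue element: a timestamp and the key it belongs to.
final class QTimer {
    final long timestamp;
    final String key;

    QTimer(long timestamp, String key) {
        this.timestamp = timestamp;
        this.key = key;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof QTimer
            && ((QTimer) o).timestamp == timestamp
            && ((QTimer) o).key.equals(key);
    }

    @Override
    public int hashCode() {
        return 31 * Long.hashCode(timestamp) + key.hashCode();
    }
}

// Sketch of a priority queue with set semantics, partitioned by key group.
class SketchQueueSet {
    private final PriorityQueue<QTimer> heap =
        new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));
    private final List<Set<QTimer>> dedupSetsByKeyGroup = new ArrayList<>();
    private final int totalKeyGroups;

    SketchQueueSet(int totalKeyGroups) {
        this.totalKeyGroups = totalKeyGroups;
        for (int i = 0; i < totalKeyGroups; i++) {
            dedupSetsByKeyGroup.add(new HashSet<>());
        }
    }

    // ASSUMPTION: simplified key-group assignment, for illustration only.
    int keyGroupFor(String key) {
        return Math.floorMod(key.hashCode(), totalKeyGroups);
    }

    // Returns true only if the timer was not already present.
    boolean add(QTimer timer) {
        if (!dedupSetsByKeyGroup.get(keyGroupFor(timer.key)).add(timer)) {
            return false;
        }
        heap.add(timer);
        return true;
    }

    // Removes and returns the earliest timer, or null if the queue is empty.
    QTimer poll() {
        QTimer head = heap.poll();
        if (head != null) {
            dedupSetsByKeyGroup.get(keyGroupFor(head.key)).remove(head);
        }
        return head;
    }

    // Copy of the timers currently held for one key group.
    Set<QTimer> timersInKeyGroup(int keyGroup) {
        return new HashSet<>(dedupSetsByKeyGroup.get(keyGroup));
    }

    int size() {
        return heap.size();
    }
}
```

Keeping one set per key group means the timers of a single key group can be enumerated on their own, which is the kind of access a per-key-group snapshot such as snapshotStateForKeyGroup would need.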

Summary

  • InternalTimeServiceManager manages the timer services used by all keyed operators; it keeps an in-memory map from each timer service's name to its InternalTimerServiceImpl. getInternalTimerService first calls registerOrGetTimerService to get or create the InternalTimerServiceImpl for the given name, then calls timerService.startTimerService to initialize it, and returns it
  • registerOrGetTimerService first looks up the InternalTimerServiceImpl for the given name in the timerServices map; if none exists, it creates one and puts it into the map. When creating the InternalTimerServiceImpl, it calls createTimerPriorityQueue twice to build the processingTimeTimersQueue and the eventTimeTimersQueue, both of type KeyGroupedInternalPriorityQueue; createTimerPriorityQueue in turn delegates to priorityQueueSetFactory
  • PriorityQueueSetFactory defines the create method, which returns a KeyGroupedInternalPriorityQueue; the type parameter T must implement all three of the HeapPriorityQueueElement, PriorityComparable and Keyed interfaces (InternalTimer extends PriorityComparable and Keyed, and TimerHeapInternalTimer implements InternalTimer and HeapPriorityQueueElement). HeapPriorityQueueSetFactory implements PriorityQueueSetFactory, and its create method returns a HeapPriorityQueueSet
