
A Look at Flink's AsyncWaitOperator


This article takes a look at Flink's AsyncWaitOperator.

AsyncWaitOperator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java

@Internal
public class AsyncWaitOperator<IN, OUT>
		extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
		implements OneInputStreamOperator<IN, OUT>, OperatorActions {
	private static final long serialVersionUID = 1L;

	private static final String STATE_NAME = "_async_wait_operator_state_";

	/** Capacity of the stream element queue. */
	private final int capacity;

	/** Output mode for this operator. */
	private final AsyncDataStream.OutputMode outputMode;

	/** Timeout for the async collectors. */
	private final long timeout;

	protected transient Object checkpointingLock;

	/** {@link TypeSerializer} for inputs while making snapshots. */
	private transient StreamElementSerializer<IN> inStreamElementSerializer;

	/** Recovered input stream elements. */
	private transient ListState<StreamElement> recoveredStreamElements;

	/** Queue to store the currently in-flight stream elements into. */
	private transient StreamElementQueue queue;

	/** Pending stream element which could not yet be added to the queue. */
	private transient StreamElementQueueEntry<?> pendingStreamElementQueueEntry;

	private transient ExecutorService executor;

	/** Emitter for the completed stream element queue entries. */
	private transient Emitter<OUT> emitter;

	/** Thread running the emitter. */
	private transient Thread emitterThread;

	public AsyncWaitOperator(
			AsyncFunction<IN, OUT> asyncFunction,
			long timeout,
			int capacity,
			AsyncDataStream.OutputMode outputMode) {
		super(asyncFunction);
		chainingStrategy = ChainingStrategy.ALWAYS;

		Preconditions.checkArgument(capacity > 0, "The number of concurrent async operation should be greater than 0.");
		this.capacity = capacity;

		this.outputMode = Preconditions.checkNotNull(outputMode, "outputMode");

		this.timeout = timeout;
	}

	@Override
	public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
		super.setup(containingTask, config, output);

		this.checkpointingLock = getContainingTask().getCheckpointLock();

		this.inStreamElementSerializer = new StreamElementSerializer<>(
			getOperatorConfig().<IN>getTypeSerializerIn1(getUserCodeClassloader()));

		// create the operators executor for the complete operations of the queue entries
		this.executor = Executors.newSingleThreadExecutor();

		switch (outputMode) {
			case ORDERED:
				queue = new OrderedStreamElementQueue(
					capacity,
					executor,
					this);
				break;
			case UNORDERED:
				queue = new UnorderedStreamElementQueue(
					capacity,
					executor,
					this);
				break;
			default:
				throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
		}
	}

	@Override
	public void open() throws Exception {
		super.open();

		// create the emitter
		this.emitter = new Emitter<>(checkpointingLock, output, queue, this);

		// start the emitter thread
		this.emitterThread = new Thread(emitter, "AsyncIO-Emitter-Thread (" + getOperatorName() + ')');
		emitterThread.setDaemon(true);
		emitterThread.start();

		// process stream elements from state, since the Emit thread will start as soon as all
		// elements from previous state are in the StreamElementQueue, we have to make sure that the
		// order to open all operators in the operator chain proceeds from the tail operator to the
		// head operator.
		if (recoveredStreamElements != null) {
			for (StreamElement element : recoveredStreamElements.get()) {
				if (element.isRecord()) {
					processElement(element.<IN>asRecord());
				}
				else if (element.isWatermark()) {
					processWatermark(element.asWatermark());
				}
				else if (element.isLatencyMarker()) {
					processLatencyMarker(element.asLatencyMarker());
				}
				else {
					throw new IllegalStateException("Unknown record type " + element.getClass() +
						" encountered while opening the operator.");
				}
			}
			recoveredStreamElements = null;
		}

	}

	@Override
	public void processElement(StreamRecord<IN> element) throws Exception {
		final StreamRecordQueueEntry<OUT> streamRecordBufferEntry = new StreamRecordQueueEntry<>(element);

		if (timeout > 0L) {
			// register a timeout for this AsyncStreamRecordBufferEntry
			long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();

			final ScheduledFuture<?> timerFuture = getProcessingTimeService().registerTimer(
				timeoutTimestamp,
				new ProcessingTimeCallback() {
					@Override
					public void onProcessingTime(long timestamp) throws Exception {
						userFunction.timeout(element.getValue(), streamRecordBufferEntry);
					}
				});

			// Cancel the timer once we've completed the stream record buffer entry. This will remove
			// the register trigger task
			streamRecordBufferEntry.onComplete(
				(StreamElementQueueEntry<Collection<OUT>> value) -> {
					timerFuture.cancel(true);
				},
				executor);
		}

		addAsyncBufferEntry(streamRecordBufferEntry);

		userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry);
	}

	@Override
	public void processWatermark(Watermark mark) throws Exception {
		WatermarkQueueEntry watermarkBufferEntry = new WatermarkQueueEntry(mark);

		addAsyncBufferEntry(watermarkBufferEntry);
	}

	@Override
	public void snapshotState(StateSnapshotContext context) throws Exception {
		super.snapshotState(context);

		ListState<StreamElement> partitionableState =
			getOperatorStateBackend().getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
		partitionableState.clear();

		Collection<StreamElementQueueEntry<?>> values = queue.values();

		try {
			for (StreamElementQueueEntry<?> value : values) {
				partitionableState.add(value.getStreamElement());
			}

			// add the pending stream element queue entry if the stream element queue is currently full
			if (pendingStreamElementQueueEntry != null) {
				partitionableState.add(pendingStreamElementQueueEntry.getStreamElement());
			}
		} catch (Exception e) {
			partitionableState.clear();

			throw new Exception("Could not add stream element queue entries to operator state " +
				"backend of operator " + getOperatorName() + '.', e);
		}
	}

	@Override
	public void initializeState(StateInitializationContext context) throws Exception {
		super.initializeState(context);
		recoveredStreamElements = context
			.getOperatorStateStore()
			.getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));

	}

	@Override
	public void close() throws Exception {
		try {
			assert(Thread.holdsLock(checkpointingLock));

			while (!queue.isEmpty()) {
				// wait for the emitter thread to output the remaining elements
				// for that he needs the checkpointing lock and thus we have to free it
				checkpointingLock.wait();
			}
		}
		finally {
			Exception exception = null;

			try {
				super.close();
			} catch (InterruptedException interrupted) {
				exception = interrupted;

				Thread.currentThread().interrupt();
			} catch (Exception e) {
				exception = e;
			}

			try {
				// terminate the emitter, the emitter thread and the executor
				stopResources(true);
			} catch (InterruptedException interrupted) {
				exception = ExceptionUtils.firstOrSuppressed(interrupted, exception);

				Thread.currentThread().interrupt();
			} catch (Exception e) {
				exception = ExceptionUtils.firstOrSuppressed(e, exception);
			}

			if (exception != null) {
				LOG.warn("Errors occurred while closing the AsyncWaitOperator.", exception);
			}
		}
	}

	@Override
	public void dispose() throws Exception {
		Exception exception = null;

		try {
			super.dispose();
		} catch (InterruptedException interrupted) {
			exception = interrupted;

			Thread.currentThread().interrupt();
		} catch (Exception e) {
			exception = e;
		}

		try {
			stopResources(false);
		} catch (InterruptedException interrupted) {
			exception = ExceptionUtils.firstOrSuppressed(interrupted, exception);

			Thread.currentThread().interrupt();
		} catch (Exception e) {
			exception = ExceptionUtils.firstOrSuppressed(e, exception);
		}

		if (exception != null) {
			throw exception;
		}
	}

	private void stopResources(boolean waitForShutdown) throws InterruptedException {
		emitter.stop();
		emitterThread.interrupt();

		executor.shutdown();

		if (waitForShutdown) {
			try {
				if (!executor.awaitTermination(365L, TimeUnit.DAYS)) {
					executor.shutdownNow();
				}
			} catch (InterruptedException e) {
				executor.shutdownNow();

				Thread.currentThread().interrupt();
			}

			/*
			 * FLINK-5638: If we have the checkpoint lock we might have to free it for a while so
			 * that the emitter thread can complete/react to the interrupt signal.
			 */
			if (Thread.holdsLock(checkpointingLock)) {
				while (emitterThread.isAlive()) {
					checkpointingLock.wait(100L);
				}
			}

			emitterThread.join();
		} else {
			executor.shutdownNow();
		}
	}

	private <T> void addAsyncBufferEntry(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
		assert(Thread.holdsLock(checkpointingLock));

		pendingStreamElementQueueEntry = streamElementQueueEntry;

		while (!queue.tryPut(streamElementQueueEntry)) {
			// we wait for the emitter to notify us if the queue has space left again
			checkpointingLock.wait();
		}

		pendingStreamElementQueueEntry = null;
	}

	@Override
	public void failOperator(Throwable throwable) {
		getContainingTask().getEnvironment().failExternally(throwable);
	}
}
  • AsyncWaitOperator extends AbstractUdfStreamOperator and overrides its setup, open, snapshotState, initializeState, close and dispose methods; it implements the processElement and processWatermark methods defined by the OneInputStreamOperator interface (processLatencyMarker is not overridden here) and the failOperator method defined by OperatorActions
  • The setup method creates an ExecutorService via Executors.newSingleThreadExecutor() and then, depending on the outputMode, builds the matching StreamElementQueue (OrderedStreamElementQueue or UnorderedStreamElementQueue); the open method creates an Emitter, starts the AsyncIO-Emitter-Thread, and then replays any recoveredStreamElements, dispatching each one to processElement, processWatermark or processLatencyMarker according to its type
  • The processElement method first registers a processing-time timer (when timeout > 0) whose ProcessingTimeCallback.onProcessingTime calls userFunction.timeout; it then adds a StreamRecordQueueEntry to the StreamElementQueue and finally triggers userFunction.asyncInvoke. Both close and dispose call stopResources to release resources; the only difference is the waitForShutdown argument, which close passes as true and dispose as false. A minimal sketch of the user-facing API that ends up running on this operator is shown below
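
To put the operator in context, here is a minimal, hedged usage sketch of the async I/O API that AsyncWaitOperator executes under the hood: the AsyncFunction completes the ResultFuture it is handed (a StreamRecordQueueEntry internally), and AsyncDataStream.unorderedWait/orderedWait chooses the operator's OutputMode. The lookup logic and job wiring below are illustrative only, not taken from the Flink sources quoted in this article.

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;

public class AsyncIoSketch {

	// Illustrative AsyncFunction: the "lookup" is simulated with CompletableFuture.supplyAsync;
	// a real job would call a non-blocking client (database, HTTP, ...) here.
	public static class UppercaseLookup implements AsyncFunction<String, String> {
		@Override
		public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
			CompletableFuture
				.supplyAsync(input::toUpperCase)
				// completing the ResultFuture completes the StreamRecordQueueEntry's future,
				// which makes the entry visible to the Emitter thread
				.thenAccept(result -> resultFuture.complete(Collections.singleton(result)));
		}

		@Override
		public void timeout(String input, ResultFuture<String> resultFuture) {
			// called from the timer registered in AsyncWaitOperator.processElement;
			// here we emit an empty result instead of failing the job
			resultFuture.complete(Collections.emptyList());
		}
	}

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		DataStream<String> source = env.fromElements("a", "b", "c");

		// timeout = 1000 ms, capacity = 100 in-flight elements; unorderedWait creates an
		// AsyncWaitOperator with OutputMode.UNORDERED, orderedWait would use OutputMode.ORDERED
		DataStream<String> result = AsyncDataStream.unorderedWait(
			source, new UppercaseLookup(), 1000, TimeUnit.MILLISECONDS, 100);

		result.print();
		env.execute("async-io-sketch");
	}
}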

Emitter

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/Emitter.java

@Internal
public class Emitter<OUT> implements Runnable {

	private static final Logger LOG = LoggerFactory.getLogger(Emitter.class);

	/** Lock to hold before outputting. */
	private final Object checkpointLock;

	/** Output for the watermark elements. */
	private final Output<StreamRecord<OUT>> output;

	/** Queue to consume the async results from. */
	private final StreamElementQueue streamElementQueue;

	private final OperatorActions operatorActions;

	/** Output for stream records. */
	private final TimestampedCollector<OUT> timestampedCollector;

	private volatile boolean running;

	public Emitter(
			final Object checkpointLock,
			final Output<StreamRecord<OUT>> output,
			final StreamElementQueue streamElementQueue,
			final OperatorActions operatorActions) {

		this.checkpointLock = Preconditions.checkNotNull(checkpointLock, "checkpointLock");
		this.output = Preconditions.checkNotNull(output, "output");
		this.streamElementQueue = Preconditions.checkNotNull(streamElementQueue, "streamElementQueue");
		this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions");

		this.timestampedCollector = new TimestampedCollector<>(this.output);
		this.running = true;
	}

	@Override
	public void run() {
		try {
			while (running) {
				LOG.debug("Wait for next completed async stream element result.");
				AsyncResult streamElementEntry = streamElementQueue.peekBlockingly();

				output(streamElementEntry);
			}
		} catch (InterruptedException e) {
			if (running) {
				operatorActions.failOperator(e);
			} else {
				// Thread got interrupted which means that it should shut down
				LOG.debug("Emitter thread got interrupted, shutting down.");
			}
		} catch (Throwable t) {
			operatorActions.failOperator(new Exception("AsyncWaitOperator's emitter caught an " +
				"unexpected throwable.", t));
		}
	}

	private void output(AsyncResult asyncResult) throws InterruptedException {
		if (asyncResult.isWatermark()) {
			synchronized (checkpointLock) {
				AsyncWatermarkResult asyncWatermarkResult = asyncResult.asWatermark();

				LOG.debug("Output async watermark.");
				output.emitWatermark(asyncWatermarkResult.getWatermark());

				// remove the peeked element from the async collector buffer so that it is no longer
				// checkpointed
				streamElementQueue.poll();

				// notify the main thread that there is again space left in the async collector
				// buffer
				checkpointLock.notifyAll();
			}
		} else {
			AsyncCollectionResult<OUT> streamRecordResult = asyncResult.asResultCollection();

			if (streamRecordResult.hasTimestamp()) {
				timestampedCollector.setAbsoluteTimestamp(streamRecordResult.getTimestamp());
			} else {
				timestampedCollector.eraseTimestamp();
			}

			synchronized (checkpointLock) {
				LOG.debug("Output async stream element collection result.");

				try {
					Collection<OUT> resultCollection = streamRecordResult.get();

					if (resultCollection != null) {
						for (OUT result : resultCollection) {
							timestampedCollector.collect(result);
						}
					}
				} catch (Exception e) {
					operatorActions.failOperator(
						new Exception("An async function call terminated with an exception. " +
							"Failing the AsyncWaitOperator.", e));
				}

				// remove the peeked element from the async collector buffer so that it is no longer
				// checkpointed
				streamElementQueue.poll();

				// notify the main thread that there is again space left in the async collector
				// buffer
				checkpointLock.notifyAll();
			}
		}
	}

	public void stop() {
		running = false;
	}
}
  • Emitter implements the Runnable interface; its job is to take completed elements from the StreamElementQueue and emit them through the TimestampedCollector
  • Emitter's run method loops, calling streamElementQueue.peekBlockingly() to block until a completed AsyncResult is available, and then calls output to emit it
  • Emitter's output method branches on whether the asyncResult is a watermark; if it is not, the result collection is emitted via timestampedCollector.collect, any exception is propagated via operatorActions.failOperator, and finally streamElementQueue.poll() removes the head element and notifies the main thread that the queue has space again (a stripped-down sketch of this peek-emit-poll pattern follows)
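
The important detail is the order of operations: the entry is only peeked while it is being emitted, so a snapshot taken in between still sees it in the queue, and it is polled (and the checkpoint lock notified) only after emission. The standalone sketch below illustrates that pattern with plain JDK types; for brevity it collapses the queue's internal lock and the checkpoint lock into a single monitor, which the real implementation does not do.

import java.util.ArrayDeque;

// Standalone sketch (not Flink code) of the Emitter's peek -> emit -> poll -> notifyAll loop.
// The element is removed only *after* it has been emitted, so a snapshot taken in between
// still contains it and it cannot be lost.
public class EmitterSketch<T> implements Runnable {

	private final Object checkpointLock;    // shared with the producing ("task") thread
	private final ArrayDeque<T> completed;  // stands in for the completed part of the queue
	private volatile boolean running = true;

	public EmitterSketch(Object checkpointLock, ArrayDeque<T> completed) {
		this.checkpointLock = checkpointLock;
		this.completed = completed;
	}

	@Override
	public void run() {
		try {
			while (running) {
				synchronized (checkpointLock) {
					while (completed.isEmpty()) {   // "peekBlockingly"
						checkpointLock.wait();      // the producer calls notifyAll after adding
					}
					T head = completed.peek();      // do NOT remove yet

					emit(head);                     // output under the checkpoint lock

					completed.poll();               // now it is safe to drop the element
					checkpointLock.notifyAll();     // wake a producer waiting for queue space
				}
			}
		} catch (InterruptedException e) {
			// interrupted: shutting down
		}
	}

	private void emit(T element) {
		System.out.println("emit " + element);
	}

	public void stop() {
		running = false;
	}
}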

StreamElementQueue

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java

@Internal
public interface StreamElementQueue {

	<T> void put(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException;

	<T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException;

	AsyncResult peekBlockingly() throws InterruptedException;

	AsyncResult poll() throws InterruptedException;

	Collection<StreamElementQueueEntry<?>> values() throws InterruptedException;

	boolean isEmpty();

	int size();
}
  • The StreamElementQueue interface defines the blocking stream element queue used by AsyncWaitOperator; it declares the put, tryPut, peekBlockingly, poll, values, isEmpty and size methods. It has two implementations, UnorderedStreamElementQueue and OrderedStreamElementQueue, and its element type is StreamElementQueueEntry

UnorderedStreamElementQueue

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java

@Internal
public class UnorderedStreamElementQueue implements StreamElementQueue {

	private static final Logger LOG = LoggerFactory.getLogger(UnorderedStreamElementQueue.class);

	/** Capacity of this queue. */
	private final int capacity;

	/** Executor to run the onComplete callbacks. */
	private final Executor executor;

	/** OperatorActions to signal the owning operator a failure. */
	private final OperatorActions operatorActions;

	/** Queue of uncompleted stream element queue entries segmented by watermarks. */
	private final ArrayDeque<Set<StreamElementQueueEntry<?>>> uncompletedQueue;

	/** Queue of completed stream element queue entries. */
	private final ArrayDeque<StreamElementQueueEntry<?>> completedQueue;

	/** First (chronologically oldest) uncompleted set of stream element queue entries. */
	private Set<StreamElementQueueEntry<?>> firstSet;

	// Last (chronologically youngest) uncompleted set of stream element queue entries. New
	// stream element queue entries are inserted into this set.
	private Set<StreamElementQueueEntry<?>> lastSet;
	private volatile int numberEntries;

	/** Locks and conditions for the blocking queue. */
	private final ReentrantLock lock;
	private final Condition notFull;
	private final Condition hasCompletedEntries;

	public UnorderedStreamElementQueue(
			int capacity,
			Executor executor,
			OperatorActions operatorActions) {

		Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0.");
		this.capacity = capacity;

		this.executor = Preconditions.checkNotNull(executor, "executor");

		this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions");

		this.uncompletedQueue = new ArrayDeque<>(capacity);
		this.completedQueue = new ArrayDeque<>(capacity);

		this.firstSet = new HashSet<>(capacity);
		this.lastSet = firstSet;

		this.numberEntries = 0;

		this.lock = new ReentrantLock();
		this.notFull = lock.newCondition();
		this.hasCompletedEntries = lock.newCondition();
	}

	@Override
	public <T> void put(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
		lock.lockInterruptibly();

		try {
			while (numberEntries >= capacity) {
				notFull.await();
			}

			addEntry(streamElementQueueEntry);
		} finally {
			lock.unlock();
		}
	}

	@Override
	public <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
		lock.lockInterruptibly();

		try {
			if (numberEntries < capacity) {
				addEntry(streamElementQueueEntry);

				LOG.debug("Put element into unordered stream element queue. New filling degree " +
					"({}/{}).", numberEntries, capacity);

				return true;
			} else {
				LOG.debug("Failed to put element into unordered stream element queue because it " +
					"was full ({}/{}).", numberEntries, capacity);

				return false;
			}
		} finally {
			lock.unlock();
		}
	}

	@Override
	public AsyncResult peekBlockingly() throws InterruptedException {
		lock.lockInterruptibly();

		try {
			while (completedQueue.isEmpty()) {
				hasCompletedEntries.await();
			}

			LOG.debug("Peeked head element from unordered stream element queue with filling degree " +
				"({}/{}).", numberEntries, capacity);

			return completedQueue.peek();
		} finally {
			lock.unlock();
		}
	}

	@Override
	public AsyncResult poll() throws InterruptedException {
		lock.lockInterruptibly();

		try {
			while (completedQueue.isEmpty()) {
				hasCompletedEntries.await();
			}

			numberEntries--;
			notFull.signalAll();

			LOG.debug("Polled element from unordered stream element queue. New filling degree " +
				"({}/{}).", numberEntries, capacity);

			return completedQueue.poll();
		} finally {
			lock.unlock();
		}
	}

	@Override
	public Collection<StreamElementQueueEntry<?>> values() throws InterruptedException {
		lock.lockInterruptibly();

		try {
			StreamElementQueueEntry<?>[] array = new StreamElementQueueEntry[numberEntries];

			array = completedQueue.toArray(array);

			int counter = completedQueue.size();

			for (StreamElementQueueEntry<?> entry: firstSet) {
				array[counter] = entry;
				counter++;
			}

			for (Set<StreamElementQueueEntry<?>> asyncBufferEntries : uncompletedQueue) {

				for (StreamElementQueueEntry<?> streamElementQueueEntry : asyncBufferEntries) {
					array[counter] = streamElementQueueEntry;
					counter++;
				}
			}

			return Arrays.asList(array);
		} finally {
			lock.unlock();
		}
	}

	@Override
	public boolean isEmpty() {
		return numberEntries == 0;
	}

	@Override
	public int size() {
		return numberEntries;
	}

	public void onCompleteHandler(StreamElementQueueEntry<?> streamElementQueueEntry) throws InterruptedException {
		lock.lockInterruptibly();

		try {
			if (firstSet.remove(streamElementQueueEntry)) {
				completedQueue.offer(streamElementQueueEntry);

				while (firstSet.isEmpty() && firstSet != lastSet) {
					firstSet = uncompletedQueue.poll();

					Iterator<StreamElementQueueEntry<?>> it = firstSet.iterator();

					while (it.hasNext()) {
						StreamElementQueueEntry<?> bufferEntry = it.next();

						if (bufferEntry.isDone()) {
							completedQueue.offer(bufferEntry);
							it.remove();
						}
					}
				}

				LOG.debug("Signal unordered stream element queue has completed entries.");
				hasCompletedEntries.signalAll();
			}
		} finally {
			lock.unlock();
		}
	}

	private <T> void addEntry(StreamElementQueueEntry<T> streamElementQueueEntry) {
		assert(lock.isHeldByCurrentThread());

		if (streamElementQueueEntry.isWatermark()) {
			lastSet = new HashSet<>(capacity);

			if (firstSet.isEmpty()) {
				firstSet.add(streamElementQueueEntry);
			} else {
				Set<StreamElementQueueEntry<?>> watermarkSet = new HashSet<>(1);
				watermarkSet.add(streamElementQueueEntry);
				uncompletedQueue.offer(watermarkSet);
			}
			uncompletedQueue.offer(lastSet);
		} else {
			lastSet.add(streamElementQueueEntry);
		}

		streamElementQueueEntry.onComplete(
			(StreamElementQueueEntry<T> value) -> {
				try {
					onCompleteHandler(value);
				} catch (InterruptedException e) {
					// The accept executor thread got interrupted. This is probably cause by
					// the shutdown of the executor.
					LOG.debug("AsyncBufferEntry could not be properly completed because the " +
						"executor thread has been interrupted.", e);
				} catch (Throwable t) {
					operatorActions.failOperator(new Exception("Could not complete the " +
						"stream element queue entry: " + value + '.', t));
				}
			},
			executor);

		numberEntries++;
	}
}
  • UnorderedStreamElementQueue implements the StreamElementQueue interface and emits results in no particular order; internally it uses two ArrayDeques, uncompletedQueue (sets of pending entries, segmented by watermarks) and completedQueue
  • The peekBlockingly method first checks whether completedQueue has an element: if not it waits on hasCompletedEntries.await(), otherwise it returns completedQueue.peek(); both put and tryPut call addEntry, which inserts the entry into the current (last) uncompleted set and registers an onCompleteHandler through the entry's onComplete method
  • The onCompleteHandler method moves a completed entry that belongs to firstSet into completedQueue; when firstSet drains it advances to the next watermark-delimited set, carrying over any entries that are already done. Records may therefore overtake each other within a segment, but never overtake a watermark (a standalone toy sketch of this segmentation follows)
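
The segmentation itself happens in addEntry: a watermark closes the current set and opens a fresh one. The toy sketch below (plain JDK collections, String entries instead of StreamElementQueueEntry, no completion handling) mirrors just that part of the logic.

import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Set;

// Toy sketch (not Flink code) of how UnorderedStreamElementQueue segments entries by watermark.
public class SegmentationSketch {

	private final ArrayDeque<Set<String>> uncompletedQueue = new ArrayDeque<>();
	private Set<String> firstSet = new HashSet<>();
	private Set<String> lastSet = firstSet;

	void addRecord(String record) {
		// records always join the youngest (last) set and may complete in any order within it
		lastSet.add(record);
	}

	void addWatermark(String watermark) {
		// a watermark closes the current segment: it gets a singleton set of its own,
		// and a fresh empty set is opened for the records that arrive after it
		lastSet = new HashSet<>();
		if (firstSet.isEmpty()) {
			firstSet.add(watermark);
		} else {
			Set<String> watermarkSet = new HashSet<>(1);
			watermarkSet.add(watermark);
			uncompletedQueue.offer(watermarkSet);
		}
		uncompletedQueue.offer(lastSet);
	}

	public static void main(String[] args) {
		SegmentationSketch q = new SegmentationSketch();
		q.addRecord("r1");
		q.addRecord("r2");
		q.addWatermark("W1");
		q.addRecord("r3");
		// firstSet = {r1, r2}, uncompletedQueue = [{W1}, {r3}]:
		// r1 and r2 may be emitted in either order but always before W1, and r3 always after W1
		System.out.println(q.firstSet + " " + q.uncompletedQueue);
	}
}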

OrderedStreamElementQueue

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java

@Internal
public class OrderedStreamElementQueue implements StreamElementQueue {

	private static final Logger LOG = LoggerFactory.getLogger(OrderedStreamElementQueue.class);

	/** Capacity of this queue. */
	private final int capacity;

	/** Executor to run the onCompletion callback. */
	private final Executor executor;

	/** Operator actions to signal a failure to the operator. */
	private final OperatorActions operatorActions;

	/** Lock and conditions for the blocking queue. */
	private final ReentrantLock lock;
	private final Condition notFull;
	private final Condition headIsCompleted;

	/** Queue for the inserted StreamElementQueueEntries. */
	private final ArrayDeque<StreamElementQueueEntry<?>> queue;

	public OrderedStreamElementQueue(
			int capacity,
			Executor executor,
			OperatorActions operatorActions) {

		Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0.");
		this.capacity = capacity;

		this.executor = Preconditions.checkNotNull(executor, "executor");

		this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions");

		this.lock = new ReentrantLock(false);
		this.headIsCompleted = lock.newCondition();
		this.notFull = lock.newCondition();

		this.queue = new ArrayDeque<>(capacity);
	}

	@Override
	public AsyncResult peekBlockingly() throws InterruptedException {
		lock.lockInterruptibly();

		try {
			while (queue.isEmpty() || !queue.peek().isDone()) {
				headIsCompleted.await();
			}

			LOG.debug("Peeked head element from ordered stream element queue with filling degree " +
				"({}/{}).", queue.size(), capacity);

			return queue.peek();
		} finally {
			lock.unlock();
		}
	}

	@Override
	public AsyncResult poll() throws InterruptedException {
		lock.lockInterruptibly();

		try {
			while (queue.isEmpty() || !queue.peek().isDone()) {
				headIsCompleted.await();
			}

			notFull.signalAll();

			LOG.debug("Polled head element from ordered stream element queue. New filling degree " +
				"({}/{}).", queue.size() - 1, capacity);

			return queue.poll();
		} finally {
			lock.unlock();
		}
	}

	@Override
	public Collection<StreamElementQueueEntry<?>> values() throws InterruptedException {
		lock.lockInterruptibly();

		try {
			StreamElementQueueEntry<?>[] array = new StreamElementQueueEntry[queue.size()];

			array = queue.toArray(array);

			return Arrays.asList(array);
		} finally {
			lock.unlock();
		}
	}

	@Override
	public boolean isEmpty() {
		return queue.isEmpty();
	}

	@Override
	public int size() {
		return queue.size();
	}

	@Override
	public <T> void put(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
		lock.lockInterruptibly();

		try {
			while (queue.size() >= capacity) {
				notFull.await();
			}

			addEntry(streamElementQueueEntry);
		} finally {
			lock.unlock();
		}
	}

	@Override
	public <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
		lock.lockInterruptibly();

		try {
			if (queue.size() < capacity) {
				addEntry(streamElementQueueEntry);

				LOG.debug("Put element into ordered stream element queue. New filling degree " +
					"({}/{}).", queue.size(), capacity);

				return true;
			} else {
				LOG.debug("Failed to put element into ordered stream element queue because it " +
					"was full ({}/{}).", queue.size(), capacity);

				return false;
			}
		} finally {
			lock.unlock();
		}
	}

	private <T> void addEntry(StreamElementQueueEntry<T> streamElementQueueEntry) {
		assert(lock.isHeldByCurrentThread());

		queue.addLast(streamElementQueueEntry);

		streamElementQueueEntry.onComplete(
			(StreamElementQueueEntry<T> value) -> {
				try {
					onCompleteHandler(value);
				} catch (InterruptedException e) {
					// we got interrupted. This indicates a shutdown of the executor
					LOG.debug("AsyncBufferEntry could not be properly completed because the " +
						"executor thread has been interrupted.", e);
				} catch (Throwable t) {
					operatorActions.failOperator(new Exception("Could not complete the " +
						"stream element queue entry: " + value + '.', t));
				}
			},
			executor);
	}

	private void onCompleteHandler(StreamElementQueueEntry<?> streamElementQueueEntry) throws InterruptedException {
		lock.lockInterruptibly();

		try {
			if (!queue.isEmpty() && queue.peek().isDone()) {
				LOG.debug("Signal ordered stream element queue has completed head element.");
				headIsCompleted.signalAll();
			}
		} finally {
			lock.unlock();
		}
	}
}
  • OrderedStreamElementQueue implements the StreamElementQueue interface and emits results in insertion order; internally it keeps a single ArrayDeque named queue
  • The peekBlockingly method checks whether the queue has a head element that is already done: if not it waits on headIsCompleted.await(), otherwise it returns queue.peek(); both put and tryPut call addEntry, which appends the entry with queue.addLast(streamElementQueueEntry) and registers an onCompleteHandler through the entry's onComplete method
  • The onCompleteHandler method checks whether the head of the queue is done and, if so, calls headIsCompleted.signalAll(); this head-of-line check is what enforces ordered emission (see the sketch below)
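
A tiny standalone sketch of that head-of-line behaviour, with plain CompletableFutures standing in for StreamElementQueueEntry: even when a later entry completes first, nothing can leave the queue until the head is done.

import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;

// Toy sketch (not Flink code) of OrderedStreamElementQueue's head-of-line semantics.
public class OrderedSketch {

	public static void main(String[] args) {
		ArrayDeque<CompletableFuture<String>> queue = new ArrayDeque<>();

		CompletableFuture<String> first = new CompletableFuture<>();
		CompletableFuture<String> second = new CompletableFuture<>();
		queue.addLast(first);
		queue.addLast(second);

		second.complete("second done");            // completes out of order
		System.out.println(queue.peek().isDone()); // false: head not done, the Emitter would block

		first.complete("first done");              // now the head is done
		while (!queue.isEmpty() && queue.peek().isDone()) {
			// results leave the queue strictly in insertion order: "first done", then "second done"
			System.out.println(queue.poll().join());
		}
	}
}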

AsyncResult

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/AsyncResult.java

@Internal
public interface AsyncResult {

	boolean isWatermark();

	boolean isResultCollection();

	AsyncWatermarkResult asWatermark();

	<T> AsyncCollectionResult<T> asResultCollection();
}
  • The AsyncResult interface defines the methods an asynchronously produced element of the StreamElementQueue must offer; an async result is either a watermark or an actual result collection

StreamElementQueueEntry

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java

@Internal
public abstract class StreamElementQueueEntry<T> implements AsyncResult {

	private final StreamElement streamElement;

	public StreamElementQueueEntry(StreamElement streamElement) {
		this.streamElement = Preconditions.checkNotNull(streamElement);
	}

	public StreamElement getStreamElement() {
		return streamElement;
	}

	public boolean isDone() {
		return getFuture().isDone();
	}

	public void onComplete(
			final Consumer<StreamElementQueueEntry<T>> completeFunction,
			Executor executor) {
		final StreamElementQueueEntry<T> thisReference = this;

		getFuture().whenCompleteAsync(
			// call the complete function for normal completion as well as exceptional completion
			// see FLINK-6435
			(value, throwable) -> completeFunction.accept(thisReference),
			executor);
	}

	protected abstract CompletableFuture<T> getFuture();

	@Override
	public final boolean isWatermark() {
		return AsyncWatermarkResult.class.isAssignableFrom(getClass());
	}

	@Override
	public final boolean isResultCollection() {
		return AsyncCollectionResult.class.isAssignableFrom(getClass());
	}

	@Override
	public final AsyncWatermarkResult asWatermark() {
		return (AsyncWatermarkResult) this;
	}

	@Override
	public final <T> AsyncCollectionResult<T> asResultCollection() {
		return (AsyncCollectionResult<T>) this;
	}
}
  • StreamElementQueueEntry implements the AsyncResult interface; it provides the onComplete method for registering a callback to run when the result completes and declares the abstract getFuture method for subclasses to implement; its two subclasses are WatermarkQueueEntry and StreamRecordQueueEntry (the callback registration is plain CompletableFuture.whenCompleteAsync, as sketched below)
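
The onComplete mechanism is just CompletableFuture.whenCompleteAsync on the operator's single-thread executor; per FLINK-6435 the callback fires for normal and exceptional completion alike. A minimal JDK-only sketch:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Minimal JDK-only sketch of StreamElementQueueEntry.onComplete: the callback runs on the
// given executor and fires for both normal and exceptional completion.
public class OnCompleteSketch {

	public static void main(String[] args) {
		ExecutorService executor = Executors.newSingleThreadExecutor();

		CompletableFuture<String> future = new CompletableFuture<>();
		future.whenCompleteAsync(
			(value, throwable) -> System.out.println("completed, value=" + value + ", error=" + throwable),
			executor);

		// either of the following lines triggers the callback exactly once
		future.complete("result");
		// future.completeExceptionally(new RuntimeException("boom"));

		executor.shutdown();
	}
}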

WatermarkQueueEntry

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java

@Internal
public class WatermarkQueueEntry extends StreamElementQueueEntry<Watermark> implements AsyncWatermarkResult {

	private final CompletableFuture<Watermark> future;

	public WatermarkQueueEntry(Watermark watermark) {
		super(watermark);

		this.future = CompletableFuture.completedFuture(watermark);
	}

	@Override
	public Watermark getWatermark() {
		return (Watermark) getStreamElement();
	}

	@Override
	protected CompletableFuture<Watermark> getFuture() {
		return future;
	}
}
  • WatermarkQueueEntry extends StreamElementQueueEntry with element type Watermark and implements the AsyncWatermarkResult interface; its future is already completed at construction time, as illustrated below
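
Because the entry wraps CompletableFuture.completedFuture(watermark), a watermark entry is done the moment it is enqueued: in the ordered queue it becomes emittable as soon as it reaches the head, and in the unordered queue it forms a segment of its own. A one-line JDK illustration:

import java.util.concurrent.CompletableFuture;

// A watermark entry's future is created already completed, so isDone() is true immediately.
public class CompletedFutureSketch {
	public static void main(String[] args) {
		CompletableFuture<Long> watermarkFuture = CompletableFuture.completedFuture(42L);
		System.out.println(watermarkFuture.isDone()); // prints true
	}
}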

StreamRecordQueueEntry

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java

@Internal
public class StreamRecordQueueEntry<OUT> extends StreamElementQueueEntry<Collection<OUT>>
	implements AsyncCollectionResult<OUT>, ResultFuture<OUT> {

	/** Timestamp information. */
	private final boolean hasTimestamp;
	private final long timestamp;

	/** Future containing the collection result. */
	private final CompletableFuture<Collection<OUT>> resultFuture;

	public StreamRecordQueueEntry(StreamRecord<?> streamRecord) {
		super(streamRecord);

		hasTimestamp = streamRecord.hasTimestamp();
		timestamp = streamRecord.getTimestamp();

		resultFuture = new CompletableFuture<>();
	}

	@Override
	public boolean hasTimestamp() {
		return hasTimestamp;
	}

	@Override
	public long getTimestamp() {
		return timestamp;
	}

	@Override
	public Collection<OUT> get() throws Exception {
		return resultFuture.get();
	}

	@Override
	protected CompletableFuture<Collection<OUT>> getFuture() {
		return resultFuture;
	}

	@Override
	public void complete(Collection<OUT> result) {
		resultFuture.complete(result);
	}

	@Override
	public void completeExceptionally(Throwable error) {
		resultFuture.completeExceptionally(error);
	}
}
  • StreamRecordQueueEntry extends StreamElementQueueEntry and implements the AsyncCollectionResult and ResultFuture interfaces; it is the ResultFuture handed to the user's AsyncFunction, which resolves it via complete or completeExceptionally (see the sketch below)
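
Seen from user code, the two completion paths look like this. The client class below is hypothetical and exists only to make the sketch self-contained; it is not part of Flink.

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;

// Sketch of the two completion paths as seen from user code: the ResultFuture handed to
// asyncInvoke/timeout is the StreamRecordQueueEntry itself. FakeClient is a placeholder
// used only for this sketch, not a Flink class.
public class LookupFunction implements AsyncFunction<Integer, String> {

	@Override
	public void asyncInvoke(Integer key, ResultFuture<String> resultFuture) {
		FakeClient.lookup(key).whenComplete((value, error) -> {
			if (error != null) {
				// completes the entry's future exceptionally; Emitter.output will then
				// fail the operator when it tries to read the result
				resultFuture.completeExceptionally(error);
			} else {
				resultFuture.complete(Collections.singleton(value));
			}
		});
	}

	@Override
	public void timeout(Integer key, ResultFuture<String> resultFuture) {
		// called by the processing-time timer registered in AsyncWaitOperator.processElement;
		// completing exceptionally fails the job, completing with a value lets it continue
		resultFuture.completeExceptionally(new TimeoutException("lookup timed out for key " + key));
	}

	// hypothetical async client used only to make the sketch self-contained
	static class FakeClient {
		static CompletableFuture<String> lookup(Integer key) {
			return CompletableFuture.supplyAsync(() -> "value-" + key);
		}
	}
}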

Summary

  • AsyncWaitOperator extends AbstractUdfStreamOperator and overrides its setup, open, snapshotState, initializeState, close and dispose methods; it implements the processElement and processWatermark methods defined by the OneInputStreamOperator interface (processLatencyMarker is not overridden here) and the failOperator method defined by OperatorActions; the open method creates an Emitter and starts the AsyncIO-Emitter-Thread
  • Emitter implements the Runnable interface and is responsible for taking completed elements from the StreamElementQueue and emitting them through the TimestampedCollector; its run method loops, calling streamElementQueue.peekBlockingly() to block until an AsyncResult is available and then calling output to emit it
  • The StreamElementQueue interface defines the blocking stream element queue used by AsyncWaitOperator; it declares the put, tryPut, peekBlockingly, poll, values, isEmpty and size methods and has two implementations, UnorderedStreamElementQueue and OrderedStreamElementQueue; its element type is StreamElementQueueEntry, which implements the AsyncResult interface, provides the onComplete completion callback, declares the abstract getFuture method for subclasses, and has two subclasses, WatermarkQueueEntry and StreamRecordQueueEntry

