A Look at Flink's Async I/O

go4it
Published 01/19 11:40

This article takes a look at Flink's Async I/O.

Example

// This example implements the asynchronous request and callback with Futures that have the
// interface of Java 8's futures (which is the same one followed by Flink's Future)

/**
 * An implementation of the 'AsyncFunction' that sends requests and sets the callback.
 */
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {

    /** The database specific client that can issue concurrent requests with callbacks */
    private transient DatabaseClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
        client = new DatabaseClient(host, port, credentials);
    }

    @Override
    public void close() throws Exception {
        client.close();
    }

    @Override
    public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {

        // issue the asynchronous request, receive a future for result
        final Future<String> result = client.query(key);

        // set the callback to be executed once the request by the client is complete
        // the callback simply forwards the result to the result future
        CompletableFuture.supplyAsync(new Supplier<String>() {

            @Override
            public String get() {
                try {
                    return result.get();
                } catch (InterruptedException | ExecutionException e) {
                    // Normally handled explicitly.
                    return null;
                }
            }
        }).thenAccept( (String dbResult) -> {
            resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
        });
    }
}

// create the original stream
DataStream<String> stream = ...;

// apply the async I/O transformation
DataStream<Tuple2<String, String>> resultStream =
    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
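  • This example shows the basic usage of Flink Async I/O. First, implement the AsyncFunction interface: it holds the asynchronous request logic and sets the result or the exception on the resultFuture. Then apply the AsyncFunction to a DataStream as a transformation through AsyncDataStream's unorderedWait or orderedWait method. Both methods take two parameters that control the async operation: timeout sets how long an async request may take before it counts as timed out, and capacity limits how many async requests may be in flight at the same time.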
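
The example above parks a pool thread in result.get() for every request and turns a failed lookup into a null value. When the client can return a CompletableFuture, the callback can be attached directly and the failure forwarded. The following is a minimal, JDK-only sketch of that idea, not Flink code: ResultSink is a hypothetical stand-in for Flink's ResultFuture, and lookup is an invented client call.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical stand-in for Flink's ResultFuture; not the real interface. */
interface ResultSink<T> {
    void complete(Collection<T> results);
    void completeExceptionally(Throwable error);
}

class CallbackForwardingSketch {

    /** Invented async client call: fails for the key "bad", otherwise returns a value. */
    static CompletableFuture<String> lookup(String key) {
        if ("bad".equals(key)) {
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException("lookup failed for " + key));
            return failed;
        }
        return CompletableFuture.supplyAsync(() -> "value-of-" + key);
    }

    /** Attaches a callback instead of blocking on get(); forwards either the value or the error. */
    static CompletableFuture<Void> invoke(String key, ResultSink<String> sink) {
        return lookup(key).handle((value, error) -> {
            if (error != null) {
                sink.completeExceptionally(error);
            } else {
                sink.complete(Collections.singleton(key + "=" + value));
            }
            return null;
        });
    }

    /** Sink that records what it receives, for demonstration only. */
    static class RecordingSink implements ResultSink<String> {
        final List<String> values = new CopyOnWriteArrayList<>();
        final List<Throwable> errors = new CopyOnWriteArrayList<>();

        @Override
        public void complete(Collection<String> results) {
            values.addAll(results);
        }

        @Override
        public void completeExceptionally(Throwable error) {
            errors.add(error);
        }
    }

    public static void main(String[] args) {
        RecordingSink sink = new RecordingSink();
        invoke("k1", sink).join();   // success path
        invoke("bad", sink).join();  // failure path
        System.out.println(sink.values + " / errors: " + sink.errors.size());
    }
}
```

In a real AsyncFunction the same handle callback would call resultFuture.complete or resultFuture.completeExceptionally, so a failed request surfaces as an error instead of a null field.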

AsyncFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/async/AsyncFunction.java

/**
 * A function to trigger Async I/O operation.
 *
 * <p>For each #asyncInvoke, an async io operation can be triggered, and once it has been done,
 * the result can be collected by calling {@link ResultFuture#complete}. For each async
 * operation, its context is stored in the operator immediately after invoking
 * #asyncInvoke, avoiding blocking for each stream input as long as the internal buffer is not full.
 *
 * <p>{@link ResultFuture} can be passed into callbacks or futures to collect the result data.
 * An error can also be propagate to the async IO operator by
 * {@link ResultFuture#completeExceptionally(Throwable)}.
 *
 * <p>Callback example usage:
 *
 * <pre>{@code
 * public class HBaseAsyncFunc implements AsyncFunction<String, String> {
 *
 *   public void asyncInvoke(String row, ResultFuture<String> result) throws Exception {
 *     HBaseCallback cb = new HBaseCallback(result);
 *     Get get = new Get(Bytes.toBytes(row));
 *     hbase.asyncGet(get, cb);
 *   }
 * }
 * }</pre>
 *
 * <p>Future example usage:
 *
 * <pre>{@code
 * public class HBaseAsyncFunc implements AsyncFunction<String, String> {
 *
 *   public void asyncInvoke(String row, final ResultFuture<String> result) throws Exception {
 *     Get get = new Get(Bytes.toBytes(row));
 *     ListenableFuture<Result> future = hbase.asyncGet(get);
 *     Futures.addCallback(future, new FutureCallback<Result>() {
 *       public void onSuccess(Result result) {
 *         List<String> ret = process(result);
 *         result.complete(ret);
 *       }
 *       public void onFailure(Throwable thrown) {
 *         result.completeExceptionally(thrown);
 *       }
 *     });
 *   }
 * }
 * }</pre>
 *
 * @param <IN> The type of the input elements.
 * @param <OUT> The type of the returned elements.
 */
@PublicEvolving
public interface AsyncFunction<IN, OUT> extends Function, Serializable {

	/**
	 * Trigger async operation for each stream input.
	 *
	 * @param input element coming from an upstream task
	 * @param resultFuture to be completed with the result data
	 * @exception Exception in case of a user code error. An exception will make the task fail and
	 * trigger fail-over process.
	 */
	void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;

	/**
	 * {@link AsyncFunction#asyncInvoke} timeout occurred.
	 * By default, the result future is exceptionally completed with a timeout exception.
	 *
	 * @param input element coming from an upstream task
	 * @param resultFuture to be completed with the result data
	 */
	default void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception {
		resultFuture.completeExceptionally(
			new TimeoutException("Async function call has timed out."));
	}

}
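  • The AsyncFunction interface extends Function and defines the asyncInvoke method plus a default timeout method. asyncInvoke runs the asynchronous logic and hands the result to the ResultFuture via ResultFuture.complete; on failure, the exception is passed to the ResultFuture via ResultFuture.completeExceptionally(Throwable). The default timeout implementation completes the result future exceptionally with a TimeoutException.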
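
Because timeout is a default method, an implementation can override it, for instance to emit a fallback record instead of failing. The sketch below models that with JDK-only types; ModelAsyncFunction and ModelResultFuture are simplified, hypothetical copies of the contract shown above, not the Flink interfaces, and outcomeOfTimeout simply calls timeout() directly the way a timer would.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

/** Simplified, hypothetical model of the result future. */
interface ModelResultFuture<OUT> {
    void complete(Collection<OUT> result);
    void completeExceptionally(Throwable error);
}

/** Simplified, hypothetical model of the AsyncFunction contract. */
interface ModelAsyncFunction<IN, OUT> {
    void asyncInvoke(IN input, ModelResultFuture<OUT> resultFuture) throws Exception;

    // Same default behaviour as in the source above: fail with a TimeoutException.
    default void timeout(IN input, ModelResultFuture<OUT> resultFuture) throws Exception {
        resultFuture.completeExceptionally(
            new TimeoutException("Async function call has timed out."));
    }
}

/** Overrides timeout() to emit a fallback record instead of failing. */
class FallbackOnTimeout implements ModelAsyncFunction<String, String> {
    @Override
    public void asyncInvoke(String input, ModelResultFuture<String> resultFuture) {
        // Deliberately never completes, so only the timeout path can produce a result.
    }

    @Override
    public void timeout(String input, ModelResultFuture<String> resultFuture) {
        resultFuture.complete(Collections.singleton(input + "=<timed out>"));
    }
}

class TimeoutOverrideSketch {

    /** Calls timeout() directly and reports what the result future received. */
    static <IN> String outcomeOfTimeout(ModelAsyncFunction<IN, String> fn, IN input) {
        AtomicReference<String> outcome = new AtomicReference<>("nothing");
        try {
            fn.timeout(input, new ModelResultFuture<String>() {
                @Override
                public void complete(Collection<String> result) {
                    outcome.set("completed " + result);
                }

                @Override
                public void completeExceptionally(Throwable error) {
                    outcome.set("failed " + error.getClass().getSimpleName());
                }
            });
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return outcome.get();
    }

    public static void main(String[] args) {
        ModelAsyncFunction<String, String> defaults = (in, rf) -> { };
        System.out.println(outcomeOfTimeout(defaults, "a"));                 // default timeout
        System.out.println(outcomeOfTimeout(new FallbackOnTimeout(), "a")); // overridden timeout
    }
}
```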

RichAsyncFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java

@PublicEvolving
public abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction implements AsyncFunction<IN, OUT> {

	private static final long serialVersionUID = 3858030061138121840L;

	@Override
	public void setRuntimeContext(RuntimeContext runtimeContext) {
		Preconditions.checkNotNull(runtimeContext);

		if (runtimeContext instanceof IterationRuntimeContext) {
			super.setRuntimeContext(
				new RichAsyncFunctionIterationRuntimeContext(
					(IterationRuntimeContext) runtimeContext));
		} else {
			super.setRuntimeContext(new RichAsyncFunctionRuntimeContext(runtimeContext));
		}
	}

	@Override
	public abstract void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;

	//......
}
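  • RichAsyncFunction extends AbstractRichFunction and declares that it implements the AsyncFunction interface. It does not implement asyncInvoke, leaving that to subclasses. It overrides setRuntimeContext, wrapping the given context in either a RichAsyncFunctionRuntimeContext or a RichAsyncFunctionIterationRuntimeContext.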

RichAsyncFunctionRuntimeContext

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java

	/**
	 * A wrapper class for async function's {@link RuntimeContext}. The async function runtime
	 * context only supports basic operations which are thread safe. Consequently, state access,
	 * accumulators, broadcast variables and the distributed cache are disabled.
	 */
	private static class RichAsyncFunctionRuntimeContext implements RuntimeContext {
		private final RuntimeContext runtimeContext;

		RichAsyncFunctionRuntimeContext(RuntimeContext context) {
			runtimeContext = Preconditions.checkNotNull(context);
		}

		@Override
		public String getTaskName() {
			return runtimeContext.getTaskName();
		}

		@Override
		public MetricGroup getMetricGroup() {
			return runtimeContext.getMetricGroup();
		}

		@Override
		public int getNumberOfParallelSubtasks() {
			return runtimeContext.getNumberOfParallelSubtasks();
		}

		@Override
		public int getMaxNumberOfParallelSubtasks() {
			return runtimeContext.getMaxNumberOfParallelSubtasks();
		}

		@Override
		public int getIndexOfThisSubtask() {
			return runtimeContext.getIndexOfThisSubtask();
		}

		@Override
		public int getAttemptNumber() {
			return runtimeContext.getAttemptNumber();
		}

		@Override
		public String getTaskNameWithSubtasks() {
			return runtimeContext.getTaskNameWithSubtasks();
		}

		@Override
		public ExecutionConfig getExecutionConfig() {
			return runtimeContext.getExecutionConfig();
		}

		@Override
		public ClassLoader getUserCodeClassLoader() {
			return runtimeContext.getUserCodeClassLoader();
		}

		// -----------------------------------------------------------------------------------
		// Unsupported operations
		// -----------------------------------------------------------------------------------

		@Override
		public DistributedCache getDistributedCache() {
			throw new UnsupportedOperationException("Distributed cache is not supported in rich async functions.");
		}

		@Override
		public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
			throw new UnsupportedOperationException("State is not supported in rich async functions.");
		}

		@Override
		public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
			throw new UnsupportedOperationException("State is not supported in rich async functions.");
		}

		@Override
		public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
			throw new UnsupportedOperationException("State is not supported in rich async functions.");
		}

		@Override
		public <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) {
			throw new UnsupportedOperationException("State is not supported in rich async functions.");
		}

		@Override
		public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
			throw new UnsupportedOperationException("State is not supported in rich async functions.");
		}

		@Override
		public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
			throw new UnsupportedOperationException("State is not supported in rich async functions.");
		}

		@Override
		public <V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> accumulator) {
			throw new UnsupportedOperationException("Accumulators are not supported in rich async functions.");
		}

		@Override
		public <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name) {
			throw new UnsupportedOperationException("Accumulators are not supported in rich async functions.");
		}

		@Override
		public Map<String, Accumulator<?, ?>> getAllAccumulators() {
			throw new UnsupportedOperationException("Accumulators are not supported in rich async functions.");
		}

		@Override
		public IntCounter getIntCounter(String name) {
			throw new UnsupportedOperationException("Int counters are not supported in rich async functions.");
		}

		@Override
		public LongCounter getLongCounter(String name) {
			throw new UnsupportedOperationException("Long counters are not supported in rich async functions.");
		}

		@Override
		public DoubleCounter getDoubleCounter(String name) {
			throw new UnsupportedOperationException("Long counters are not supported in rich async functions.");
		}

		@Override
		public Histogram getHistogram(String name) {
			throw new UnsupportedOperationException("Histograms are not supported in rich async functions.");
		}

		@Override
		public boolean hasBroadcastVariable(String name) {
			throw new UnsupportedOperationException("Broadcast variables are not supported in rich async functions.");
		}

		@Override
		public <RT> List<RT> getBroadcastVariable(String name) {
			throw new UnsupportedOperationException("Broadcast variables are not supported in rich async functions.");
		}

		@Override
		public <T, C> C getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer<T, C> initializer) {
			throw new UnsupportedOperationException("Broadcast variables are not supported in rich async functions.");
		}
	}
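  • RichAsyncFunctionRuntimeContext implements the RuntimeContext interface. It delegates a handful of methods to the wrapped RuntimeContext (task name, metric group, parallelism, subtask index, attempt number, execution config, and user code class loader). Every other method is overridden to throw UnsupportedOperationException, which disables state access, accumulators, broadcast variables, and the distributed cache.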
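
The wrapper is a plain delegation pattern: forward what is safe, reject the rest. A heavily reduced sketch of the same shape follows; MiniContext, PlainContext, and RestrictedContext are hypothetical names, and the real RuntimeContext interface is far larger.

```java
import java.util.Objects;

/** Hypothetical, heavily reduced context interface. */
interface MiniContext {
    String getTaskName();
    int getIndexOfThisSubtask();
    Object getState(String name);
}

/** An unrestricted context that supports every operation. */
class PlainContext implements MiniContext {
    private final String taskName;
    private final int subtaskIndex;

    PlainContext(String taskName, int subtaskIndex) {
        this.taskName = taskName;
        this.subtaskIndex = subtaskIndex;
    }

    @Override
    public String getTaskName() {
        return taskName;
    }

    @Override
    public int getIndexOfThisSubtask() {
        return subtaskIndex;
    }

    @Override
    public Object getState(String name) {
        return "state:" + name;
    }
}

/** Forwards the read-only calls and rejects state access, mirroring the wrapper above. */
class RestrictedContext implements MiniContext {
    private final MiniContext delegate;

    RestrictedContext(MiniContext delegate) {
        this.delegate = Objects.requireNonNull(delegate);
    }

    @Override
    public String getTaskName() {
        return delegate.getTaskName();
    }

    @Override
    public int getIndexOfThisSubtask() {
        return delegate.getIndexOfThisSubtask();
    }

    @Override
    public Object getState(String name) {
        throw new UnsupportedOperationException("State is not supported in rich async functions.");
    }
}

class RestrictedContextSketch {
    public static void main(String[] args) {
        MiniContext restricted = new RestrictedContext(new PlainContext("async-lookup", 3));
        System.out.println(restricted.getTaskName() + " #" + restricted.getIndexOfThisSubtask());
        try {
            restricted.getState("counter");
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```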

RichAsyncFunctionIterationRuntimeContext

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java

	private static class RichAsyncFunctionIterationRuntimeContext extends RichAsyncFunctionRuntimeContext implements IterationRuntimeContext {

		private final IterationRuntimeContext iterationRuntimeContext;

		RichAsyncFunctionIterationRuntimeContext(IterationRuntimeContext iterationRuntimeContext) {
			super(iterationRuntimeContext);

			this.iterationRuntimeContext = Preconditions.checkNotNull(iterationRuntimeContext);
		}

		@Override
		public int getSuperstepNumber() {
			return iterationRuntimeContext.getSuperstepNumber();
		}

		// -----------------------------------------------------------------------------------
		// Unsupported operations
		// -----------------------------------------------------------------------------------

		@Override
		public <T extends Aggregator<?>> T getIterationAggregator(String name) {
			throw new UnsupportedOperationException("Iteration aggregators are not supported in rich async functions.");
		}

		@Override
		public <T extends Value> T getPreviousIterationAggregate(String name) {
			throw new UnsupportedOperationException("Iteration aggregators are not supported in rich async functions.");
		}
	}
  • RichAsyncFunctionIterationRuntimeContext extends RichAsyncFunctionRuntimeContext and implements the IterationRuntimeContext interface. It delegates getSuperstepNumber to the wrapped IterationRuntimeContext and overrides getIterationAggregator and getPreviousIterationAggregate to throw UnsupportedOperationException.

AsyncDataStream

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/AsyncDataStream.java

@PublicEvolving
public class AsyncDataStream {

	/**
	 * Output mode for asynchronous operations.
	 */
	public enum OutputMode { ORDERED, UNORDERED }

	private static final int DEFAULT_QUEUE_CAPACITY = 100;

	private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator(
			DataStream<IN> in,
			AsyncFunction<IN, OUT> func,
			long timeout,
			int bufSize,
			OutputMode mode) {

		TypeInformation<OUT> outTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
			func,
			AsyncFunction.class,
			0,
			1,
			new int[]{1, 0},
			in.getType(),
			Utils.getCallLocationName(),
			true);

		// create transform
		AsyncWaitOperator<IN, OUT> operator = new AsyncWaitOperator<>(
			in.getExecutionEnvironment().clean(func),
			timeout,
			bufSize,
			mode);

		return in.transform("async wait operator", outTypeInfo, operator);
	}

	public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
			DataStream<IN> in,
			AsyncFunction<IN, OUT> func,
			long timeout,
			TimeUnit timeUnit,
			int capacity) {
		return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.UNORDERED);
	}

	public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
			DataStream<IN> in,
			AsyncFunction<IN, OUT> func,
			long timeout,
			TimeUnit timeUnit) {
		return addOperator(
			in,
			func,
			timeUnit.toMillis(timeout),
			DEFAULT_QUEUE_CAPACITY,
			OutputMode.UNORDERED);
	}

	public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
			DataStream<IN> in,
			AsyncFunction<IN, OUT> func,
			long timeout,
			TimeUnit timeUnit,
			int capacity) {
		return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.ORDERED);
	}

	public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
			DataStream<IN> in,
			AsyncFunction<IN, OUT> func,
			long timeout,
			TimeUnit timeUnit) {
		return addOperator(
			in,
			func,
			timeUnit.toMillis(timeout),
			DEFAULT_QUEUE_CAPACITY,
			OutputMode.ORDERED);
	}
}
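  • AsyncDataStream provides two families of methods, unorderedWait and orderedWait, for applying an AsyncFunction to a DataStream.
  • Both come in a variant with a capacity parameter and a variant without one; the variant without it uses DEFAULT_QUEUE_CAPACITY, which is 100. All of them end up calling the private addOperator method, which builds an AsyncWaitOperator. Both also take a timeout parameter that specifies how long to wait for an async operation to complete.
  • AsyncDataStream offers two OutputModes. UNORDERED emits each result as soon as its async operation completes; with TimeCharacteristic.ProcessingTime this mode has the lowest latency and the lowest overhead. ORDERED emits results in the order in which the elements arrived; to preserve that order the operator has to buffer results, which adds some latency and overhead.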
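
The difference between the two output modes can be made concrete with a small single-threaded model. This is only a sketch built on CompletableFuture; it is not how AsyncWaitOperator is implemented, and it ignores watermarks. completionOrder lists the input indexes in the order in which their requests finish.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class OutputModeSketch {

    /** UNORDERED model: each result is emitted as soon as its future completes. */
    static List<String> unordered(List<String> inputs, List<Integer> completionOrder) {
        List<String> emitted = new ArrayList<>();
        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (int i = 0; i < inputs.size(); i++) {
            CompletableFuture<String> future = new CompletableFuture<>();
            future.thenAccept(emitted::add); // runs in the completing thread
            pending.add(future);
        }
        for (int index : completionOrder) {
            pending.get(index).complete(inputs.get(index));
        }
        return emitted;
    }

    /** ORDERED model: finished results wait in a queue until everything before them is done. */
    static List<String> ordered(List<String> inputs, List<Integer> completionOrder) {
        List<String> emitted = new ArrayList<>();
        Deque<CompletableFuture<String>> queue = new ArrayDeque<>();
        List<CompletableFuture<String>> byIndex = new ArrayList<>();
        for (int i = 0; i < inputs.size(); i++) {
            CompletableFuture<String> future = new CompletableFuture<>();
            queue.add(future);
            byIndex.add(future);
        }
        for (int index : completionOrder) {
            byIndex.get(index).complete(inputs.get(index));
            // Drain only from the head, so input order is preserved.
            while (!queue.isEmpty() && queue.peek().isDone()) {
                emitted.add(queue.poll().join());
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> inputs = Arrays.asList("a", "b", "c");
        List<Integer> completionOrder = Arrays.asList(2, 0, 1); // c finishes first, then a, then b
        System.out.println(unordered(inputs, completionOrder)); // [c, a, b]
        System.out.println(ordered(inputs, completionOrder));   // [a, b, c]
    }
}
```

In the ordered run, "c" is finished first but stays buffered until "a" and "b" are done, which is the extra latency the ORDERED mode pays for.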

Summary

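  • Flink provides an Asynchronous I/O API for accessing external data, intended to raise streaming throughput. Basic usage is to define a function that implements the AsyncFunction interface and then apply it to a DataStream as a transformation through AsyncDataStream's unorderedWait or orderedWait method.
  • The AsyncFunction interface extends Function and defines the asyncInvoke method plus a default timeout method. asyncInvoke runs the asynchronous logic and hands the result to the ResultFuture via ResultFuture.complete; on failure, the exception is passed to the ResultFuture via ResultFuture.completeExceptionally(Throwable). RichAsyncFunction extends AbstractRichFunction and declares that it implements AsyncFunction; it does not implement asyncInvoke, leaving that to subclasses. It overrides setRuntimeContext, wrapping the given context in either a RichAsyncFunctionRuntimeContext or a RichAsyncFunctionIterationRuntimeContext.
  • AsyncDataStream's unorderedWait and orderedWait take two parameters that control the async operation: timeout sets how long an async request may take before it counts as timed out, and capacity limits how many async requests may be in flight at the same time. AsyncDataStream offers two OutputModes. UNORDERED emits each result as soon as its async operation completes; with TimeCharacteristic.ProcessingTime this mode has the lowest latency and the lowest overhead. ORDERED emits results in the order in which the elements arrived; to preserve that order the operator has to buffer results, which adds some latency and overhead.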
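
The effect of the capacity parameter can be illustrated with a counting semaphore: a request takes a slot when it is issued and returns it when it completes. This is a JDK-only sketch with hypothetical names and not the operator's actual mechanism. In particular, the sketch reports a full buffer by returning false so the limit is observable, whereas the description of asyncInvoke above implies that the operator stops accepting input while its internal buffer is full.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;

class CapacityLimitSketch {
    private final Semaphore slots;

    CapacityLimitSketch(int capacity) {
        this.slots = new Semaphore(capacity);
    }

    /** Registers an in-flight request; returns false when all slots are taken. */
    boolean tryRegister(CompletableFuture<?> request) {
        if (!slots.tryAcquire()) {
            return false;
        }
        // Give the slot back once the request finishes, successfully or not.
        request.whenComplete((value, error) -> slots.release());
        return true;
    }

    /** Number of further requests that could be issued right now. */
    int freeSlots() {
        return slots.availablePermits();
    }

    public static void main(String[] args) {
        CapacityLimitSketch limiter = new CapacityLimitSketch(2);
        CompletableFuture<String> first = new CompletableFuture<>();
        System.out.println(limiter.tryRegister(first));
        System.out.println(limiter.tryRegister(new CompletableFuture<String>()));
        System.out.println(limiter.tryRegister(new CompletableFuture<String>())); // capacity reached
        first.complete("done"); // frees one slot
        System.out.println(limiter.tryRegister(new CompletableFuture<String>()));
    }
}
```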
