A Look at Flink's PrintSinkFunction

Original article
2018/12/01 11:00

This article takes a look at Flink's PrintSinkFunction.

DataStream.print

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

	/**
	 * Writes a DataStream to the standard output stream (stdout).
	 *
	 * <p>For each element of the DataStream the result of {@link Object#toString()} is written.
	 *
	 * <p>NOTE: This will print to stdout on the machine where the code is executed, i.e. the Flink
	 * worker.
	 *
	 * @return The closed DataStream.
	 */
	@PublicEvolving
	public DataStreamSink<T> print() {
		PrintSinkFunction<T> printFunction = new PrintSinkFunction<>();
		return addSink(printFunction).name("Print to Std. Out");
	}

	/**
	 * Writes a DataStream to the standard output stream (stderr).
	 *
	 * <p>For each element of the DataStream the result of {@link Object#toString()} is written.
	 *
	 * <p>NOTE: This will print to stderr on the machine where the code is executed, i.e. the Flink
	 * worker.
	 *
	 * @return The closed DataStream.
	 */
	@PublicEvolving
	public DataStreamSink<T> printToErr() {
		PrintSinkFunction<T> printFunction = new PrintSinkFunction<>(true);
		return addSink(printFunction).name("Print to Std. Err");
	}

	/**
	 * Writes a DataStream to the standard output stream (stdout).
	 *
	 * <p>For each element of the DataStream the result of {@link Object#toString()} is written.
	 *
	 * <p>NOTE: This will print to stdout on the machine where the code is executed, i.e. the Flink
	 * worker.
	 *
	 * @param sinkIdentifier The string to prefix the output with.
	 * @return The closed DataStream.
	 */
	@PublicEvolving
	public DataStreamSink<T> print(String sinkIdentifier) {
		PrintSinkFunction<T> printFunction = new PrintSinkFunction<>(sinkIdentifier, false);
		return addSink(printFunction).name("Print to Std. Out");
	}

	/**
	 * Writes a DataStream to the standard output stream (stderr).
	 *
	 * <p>For each element of the DataStream the result of {@link Object#toString()} is written.
	 *
	 * <p>NOTE: This will print to stderr on the machine where the code is executed, i.e. the Flink
	 * worker.
	 *
	 * @param sinkIdentifier The string to prefix the output with.
	 * @return The closed DataStream.
	 */
	@PublicEvolving
	public DataStreamSink<T> printToErr(String sinkIdentifier) {
		PrintSinkFunction<T> printFunction = new PrintSinkFunction<>(sinkIdentifier, true);
		return addSink(printFunction).name("Print to Std. Err");
	}

	/**
	 * Adds the given sink to this DataStream. Only streams with sinks added
	 * will be executed once the {@link StreamExecutionEnvironment#execute()}
	 * method is called.
	 *
	 * @param sinkFunction
	 *            The object containing the sink's invoke function.
	 * @return The closed DataStream.
	 */
	public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {

		// read the output type of the input Transform to coax out errors about MissingTypeInfo
		transformation.getOutputType();

		// configure the type if needed
		if (sinkFunction instanceof InputTypeConfigurable) {
			((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
		}

		StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));

		DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);

		getExecutionEnvironment().addOperator(sink.getTransformation());
		return sink;
	}
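  • DataStream provides several methods whose names start with print: print() and print(sinkIdentifier) write to stdout, and printToErr() and printToErr(sinkIdentifier) write to stderr. Each one creates a PrintSinkFunction and registers it through addSink
  • The Javadoc of addSink states that only streams with sinks added are executed when StreamExecutionEnvironment.execute() is called
  • Inside addSink, the SinkFunction is first wrapped in a StreamSink operator, which is then wrapped in a DataStreamSink. Finally, the transformation returned by DataStreamSink.getTransformation() is added to the StreamExecutionEnvironment via addOperator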
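The "register now, run on execute()" behavior described above can be illustrated with a toy model. The sketch below is deliberately simplified and assumes nothing about Flink's internals beyond what the quoted addSink shows. SketchEnv and SketchStream are hypothetical stand-ins, not Flink classes, and a Runnable plays the role of the StreamSink/DataStreamSink wrapping.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, heavily simplified stand-ins; these are NOT Flink's classes.
class SketchEnv {
    // Registered sink "transformations"; nothing runs until execute() is called.
    private final List<Runnable> transformations = new ArrayList<>();

    <T> SketchStream<T> fromElements(List<T> elements) {
        return new SketchStream<>(this, elements);
    }

    // Plays the role of StreamExecutionEnvironment.addOperator.
    void addOperator(Runnable transformation) {
        transformations.add(transformation);
    }

    int registeredSinks() {
        return transformations.size();
    }

    // Like execute(): only what was registered through a sink runs.
    void execute() {
        for (Runnable t : transformations) {
            t.run();
        }
    }
}

class SketchStream<T> {
    private final SketchEnv env;
    private final List<T> elements;

    SketchStream(SketchEnv env, List<T> elements) {
        this.env = env;
        this.elements = elements;
    }

    // Mirrors the shape of DataStream.addSink: wrap the sink function in an
    // operator-like object and register it with the environment.
    void addSink(Consumer<T> sinkFunction) {
        Runnable sinkOperator = () -> elements.forEach(sinkFunction);
        env.addOperator(sinkOperator);
    }

    // Mirrors DataStream.print(): a sink that prints each element's toString().
    void print() {
        addSink(e -> System.out.println(e.toString()));
    }
}
```

A stream created without a sink is never touched by execute(). A stream with a sink only produces output once execute() runs the registered operator.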

SinkFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/sink/SinkFunction.java

/**
 * Interface for implementing user defined sink functionality.
 *
 * @param <IN> Input type parameter.
 */
@Public
public interface SinkFunction<IN> extends Function, Serializable {

	/**
	 * @deprecated Use {@link #invoke(Object, Context)}.
	 */
	@Deprecated
	default void invoke(IN value) throws Exception {}

	/**
	 * Writes the given value to the sink. This function is called for every record.
	 *
	 * <p>You have to override this method when implementing a {@code SinkFunction}, this is a
	 * {@code default} method for backward compatibility with the old-style method only.
	 *
	 * @param value The input record.
	 * @param context Additional context about the input record.
	 *
	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
	 *                   to fail and may trigger recovery.
	 */
	default void invoke(IN value, Context context) throws Exception {
		invoke(value);
	}

	/**
	 * Context that {@link SinkFunction SinkFunctions } can use for getting additional data about
	 * an input record.
	 *
	 * <p>The context is only valid for the duration of a
	 * {@link SinkFunction#invoke(Object, Context)} call. Do not store the context and use
	 * afterwards!
	 *
	 * @param <T> The type of elements accepted by the sink.
	 */
	@Public // Interface might be extended in the future with additional methods.
	interface Context<T> {

		/** Returns the current processing time. */
		long currentProcessingTime();

		/** Returns the current event-time watermark. */
		long currentWatermark();

		/**
		 * Returns the timestamp of the current input record or {@code null} if the element does not
		 * have an assigned timestamp.
		 */
		Long timestamp();
	}
}
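  • The SinkFunction interface defines the invoke method that triggers the sink logic. The single-argument invoke(IN) is deprecated; invoke(IN, Context) is the method to override, and its default implementation simply delegates to invoke(IN) for backward compatibility. The Context it receives defines three methods: currentProcessingTime, currentWatermark and timestamp (which returns null when the record has no assigned timestamp)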
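The default-method delegation between the two invoke overloads is what lets old-style sinks keep working. The following minimal sketch reproduces that pattern with a hypothetical interface, SketchSinkFunction, which is not Flink's SinkFunction. It is simplified in two ways: Context is not generic, and the methods do not declare `throws Exception` as Flink's do. LegacyStyleSink, ContextAwareSink and FixedContext are illustrative names only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified stand-in for SinkFunction's two invoke overloads (not Flink's interface).
interface SketchSinkFunction<IN> {
    /** Old-style entry point, kept only for backward compatibility. */
    @Deprecated
    default void invoke(IN value) {}

    /** New-style entry point; by default it forwards to the old one. */
    default void invoke(IN value, Context context) {
        invoke(value);
    }

    interface Context {
        long currentProcessingTime();
        long currentWatermark();
        Long timestamp(); // null when the record carries no timestamp
    }
}

// Overrides only the old method, as PrintSinkFunction does with invoke(IN).
class LegacyStyleSink implements SketchSinkFunction<String> {
    final List<String> seen = new ArrayList<>();

    @Override
    public void invoke(String value) {
        seen.add(value);
    }
}

// Overrides the new method and reads the per-record timestamp from the context.
class ContextAwareSink implements SketchSinkFunction<String> {
    final List<String> seen = new ArrayList<>();

    @Override
    public void invoke(String value, Context context) {
        Long ts = context.timestamp();
        seen.add(value + "@" + (ts == null ? "none" : ts));
    }
}

// A fixed context for demonstration purposes.
class FixedContext implements SketchSinkFunction.Context {
    private final Long ts;

    FixedContext(Long ts) {
        this.ts = ts;
    }

    public long currentProcessingTime() { return System.currentTimeMillis(); }
    public long currentWatermark() { return Long.MIN_VALUE; }
    public Long timestamp() { return ts; }
}
```

The runtime always calls the two-argument method. A sink that only overrides invoke(value) is still reached through the default forwarding.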

RichSinkFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java

/**
 * A {@link org.apache.flink.api.common.functions.RichFunction} version of {@link SinkFunction}.
 */
@Public
public abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN> {

	private static final long serialVersionUID = 1L;
}
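  • The abstract class RichSinkFunction extends AbstractRichFunction and implements the SinkFunction interface; most built-in sink functions extend RichSinkFunction. AbstractRichFunction mainly contributes the RuntimeContext (together with the open/close lifecycle hooks), which gives the function access to its runtime context, such as its subtask index and parallelism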
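The lifecycle that a rich function relies on can be modeled as follows. The runtime context is set first, then open is called once, invoke is called per record, and close is called at the end. This is a sketch under stated assumptions: SketchRuntimeContext, SketchRichSink, SketchTaskRunner and LoggingSink are hypothetical stand-ins, and open takes no Configuration argument here, unlike Flink's open(Configuration).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the subtask information a RuntimeContext exposes.
class SketchRuntimeContext {
    private final int indexOfThisSubtask;
    private final int numberOfParallelSubtasks;

    SketchRuntimeContext(int indexOfThisSubtask, int numberOfParallelSubtasks) {
        this.indexOfThisSubtask = indexOfThisSubtask;
        this.numberOfParallelSubtasks = numberOfParallelSubtasks;
    }

    int getIndexOfThisSubtask() { return indexOfThisSubtask; }
    int getNumberOfParallelSubtasks() { return numberOfParallelSubtasks; }
}

// Hypothetical stand-in for a RichSinkFunction: holds the context and has lifecycle hooks.
abstract class SketchRichSink<IN> {
    private SketchRuntimeContext runtimeContext;

    void setRuntimeContext(SketchRuntimeContext ctx) { this.runtimeContext = ctx; }

    SketchRuntimeContext getRuntimeContext() {
        if (runtimeContext == null) {
            throw new IllegalStateException("The runtime context has not been initialized.");
        }
        return runtimeContext;
    }

    void open() {}   // called once before any record
    void close() {}  // called once after the last record

    abstract void invoke(IN value);
}

// Hypothetical driver showing the order in which the hooks are called.
final class SketchTaskRunner {
    private SketchTaskRunner() {}

    static <IN> void run(SketchRichSink<IN> sink, SketchRuntimeContext ctx, List<IN> records) {
        sink.setRuntimeContext(ctx);
        sink.open();
        try {
            for (IN record : records) {
                sink.invoke(record);
            }
        } finally {
            sink.close();
        }
    }
}

// Records lifecycle events so the call order is visible.
class LoggingSink extends SketchRichSink<String> {
    final List<String> events = new ArrayList<>();

    @Override
    void open() {
        SketchRuntimeContext ctx = getRuntimeContext();
        events.add("open subtask " + (ctx.getIndexOfThisSubtask() + 1) + "/" + ctx.getNumberOfParallelSubtasks());
    }

    @Override
    void invoke(String value) { events.add("invoke " + value); }

    @Override
    void close() { events.add("close"); }
}
```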

PrintSinkFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java

/**
 * Implementation of the SinkFunction writing every tuple to the standard
 * output or standard error stream.
 *
 * <p>
 * Four possible format options:
 *	{@code sinkIdentifier}:taskId> output  <- {@code sinkIdentifier} provided, parallelism > 1
 *	{@code sinkIdentifier}> output         <- {@code sinkIdentifier} provided, parallelism == 1
 *  taskId> output         				   <- no {@code sinkIdentifier} provided, parallelism > 1
 *  output                 				   <- no {@code sinkIdentifier} provided, parallelism == 1
 * </p>
 *
 * @param <IN> Input record type
 */
@PublicEvolving
public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {

	private static final long serialVersionUID = 1L;

	private final PrintSinkOutputWriter<IN> writer;

	/**
	 * Instantiates a print sink function that prints to standard out.
	 */
	public PrintSinkFunction() {
		writer = new PrintSinkOutputWriter<>(false);
	}

	/**
	 * Instantiates a print sink function that prints to standard out.
	 *
	 * @param stdErr True, if the format should print to standard error instead of standard out.
	 */
	public PrintSinkFunction(final boolean stdErr) {
		writer = new PrintSinkOutputWriter<>(stdErr);
	}

	/**
	 * Instantiates a print sink function that prints to standard out and gives a sink identifier.
	 *
	 * @param stdErr True, if the format should print to standard error instead of standard out.
	 * @param sinkIdentifier Message that identify sink and is prefixed to the output of the value
	 */
	public PrintSinkFunction(final String sinkIdentifier, final boolean stdErr) {
		writer = new PrintSinkOutputWriter<>(sinkIdentifier, stdErr);
	}

	@Override
	public void open(Configuration parameters) throws Exception {
		super.open(parameters);
		StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
		writer.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());
	}

	@Override
	public void invoke(IN record) {
		writer.write(record);
	}

	@Override
	public String toString() {
		return writer.toString();
	}
}
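  • PrintSinkFunction extends RichSinkFunction and delegates to a PrintSinkOutputWriter. Its open method passes the subtask index and the number of parallel subtasks from the RuntimeContext to writer.open, and invoke calls writer.write to print each record. It overrides the single-argument invoke(IN), which is reached through the default invoke(IN, Context)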
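The writer is created in the constructor, but its stream is only resolved in open. This matters because user functions are serialized with Java serialization and shipped to the workers, and PrintStream is not Serializable. That is why PrintSinkOutputWriter keeps stream and completedPrefix transient.

The sketch below reproduces that split and uses a serialization round trip as a rough stand-in for shipping the function. SketchWriter, SketchPrintSink and SerializationRoundTrip are hypothetical names, and the output prefix is simplified.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.PrintStream;
import java.io.Serializable;

// Simplified stand-in for PrintSinkOutputWriter (not Flink's class).
class SketchWriter implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String sinkIdentifier;   // plain configuration: travels with the function
    private transient PrintStream stream;  // PrintStream is not Serializable, so it is transient

    SketchWriter(String sinkIdentifier) { this.sinkIdentifier = sinkIdentifier; }

    // Resolved on the worker, like PrintSinkOutputWriter.open.
    void open() { stream = System.out; }

    void write(Object record) {
        if (stream == null) {
            throw new IllegalStateException("open() has not been called");
        }
        stream.println(sinkIdentifier + "> " + record); // simplified prefix
    }

    boolean isOpen() { return stream != null; }
    String sinkIdentifier() { return sinkIdentifier; }
}

// Simplified stand-in for PrintSinkFunction: the writer is created eagerly in the constructor.
class SketchPrintSink implements Serializable {
    private static final long serialVersionUID = 1L;

    private final SketchWriter writer;

    SketchPrintSink(String sinkIdentifier) { writer = new SketchWriter(sinkIdentifier); }

    void open() { writer.open(); }
    void invoke(Object record) { writer.write(record); }
    SketchWriter writer() { return writer; }
}

final class SerializationRoundTrip {
    private SerializationRoundTrip() {}

    // Serializes and deserializes obj, a rough model of shipping a function to a worker.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}
```

If the stream field were not transient, writeObject would fail with a NotSerializableException.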

PrintSinkOutputWriter

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/util/PrintSinkOutputWriter.java

/**
 * Print sink output writer for DataStream and DataSet print API.
 */
@Internal
public class PrintSinkOutputWriter<IN> implements Serializable {

	private static final long serialVersionUID = 1L;

	private static final boolean STD_OUT = false;
	private static final boolean STD_ERR = true;

	private final boolean target;
	private transient PrintStream stream;
	private final String sinkIdentifier;
	private transient String completedPrefix;

	public PrintSinkOutputWriter() {
		this("", STD_OUT);
	}

	public PrintSinkOutputWriter(final boolean stdErr) {
		this("", stdErr);
	}

	public PrintSinkOutputWriter(final String sinkIdentifier, final boolean stdErr) {
		this.target = stdErr;
		this.sinkIdentifier = (sinkIdentifier == null ? "" : sinkIdentifier);
	}

	public void open(int subtaskIndex, int numParallelSubtasks) {
		// get the target stream
		stream = target == STD_OUT ? System.out : System.err;

		completedPrefix = sinkIdentifier;

		if (numParallelSubtasks > 1) {
			if (!completedPrefix.isEmpty()) {
				completedPrefix += ":";
			}
			completedPrefix += (subtaskIndex + 1);
		}

		if (!completedPrefix.isEmpty()) {
			completedPrefix += "> ";
		}
	}

	public void write(IN record) {
		stream.println(completedPrefix + record.toString());
	}

	@Override
	public String toString() {
		return "Print to " + (target == STD_OUT ? "System.out" : "System.err");
	}
}
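  • The PrintSinkOutputWriter constructor takes at most two arguments, sinkIdentifier and stdErr. sinkIdentifier is the output prefix (null is treated as an empty string), and stdErr selects System.err instead of System.out
  • open does the preparatory work and is called from PrintSinkFunction.open, which reads the subtask index and the number of parallel subtasks from the RuntimeContext provided by AbstractRichFunction and passes them in. open then builds completedPrefix from sinkIdentifier, subtaskIndex and numParallelSubtasks. The 1-based subtask number (subtaskIndex + 1) is appended only when the parallelism is greater than 1, and it is joined to a non-empty sinkIdentifier with ":". A non-empty prefix is terminated with "> "
  • write simply calls println on System.out or System.err, printing completedPrefix followed by record.toString()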
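The prefix rules in open are easiest to check in isolation. The sketch below re-implements them as a pure function, so the four formats listed in the PrintSinkFunction Javadoc can be verified directly. PrintPrefix is a hypothetical helper. One assumption differs from Flink: the target PrintStream is passed in rather than fixed to System.out/System.err, so the output can be captured.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

// Re-implements the prefix rules of PrintSinkOutputWriter.open as a pure function.
final class PrintPrefix {
    private PrintPrefix() {}

    static String completedPrefix(String sinkIdentifier, int subtaskIndex, int numParallelSubtasks) {
        StringBuilder prefix = new StringBuilder(sinkIdentifier == null ? "" : sinkIdentifier);
        if (numParallelSubtasks > 1) {
            if (prefix.length() > 0) {
                prefix.append(':');          // separate identifier and subtask number
            }
            prefix.append(subtaskIndex + 1); // 1-based subtask number
        }
        if (prefix.length() > 0) {
            prefix.append("> ");
        }
        return prefix.toString();
    }

    // Same shape as write(): prefix + record.toString(), followed by a line separator.
    static void write(PrintStream stream, String prefix, Object record) {
        stream.println(prefix + record.toString());
    }
}
```

For example, completedPrefix("sink", 0, 2) yields "sink:1> ", completedPrefix("", 2, 4) yields "3> ", and with parallelism 1 and no identifier the prefix is empty.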

Summary

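  • The print-prefixed methods of DataStream create a PrintSinkFunction internally and register it with the StreamExecutionEnvironment via addSink. The function is wrapped first in a StreamSink and then in a DataStreamSink, and finally DataStreamSink.getTransformation() is added to the environment as an operator
  • SinkFunction is the base interface for sink functions; it mainly defines the invoke method, which receives a Context. Most built-in sink functions extend RichSinkFunction, which extends AbstractRichFunction and therefore gives the function access to its RuntimeContext at run time
  • PrintSinkFunction extends RichSinkFunction and delegates to PrintSinkOutputWriter, whose write method performs the actual output on each invoke call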

