TensorFlow Learning Series (5): How to Optimize the Input Pipeline with Queues and Multithreading
AllenOR灵感 · Posted 5 months ago

# Contents

TensorFlow Learning Series (1): Getting to Know TensorFlow

TensorFlow Learning Series (2): Shapes and Dynamic Dimensions

TensorFlow Learning Series (3): Saving/Restoring and Mixing Multiple Models

TensorFlow Learning Series (4): Implementing a Universal Approximator with a Neural Network

TensorFlow Learning Series (5): How to Optimize the Input Pipeline with Queues and Multithreading

TensorFlow 1.0 has been released, and with this update some good guidance has appeared on the official site. One thing I care about is the `feed_dict` system used when you call sess.run(). As a baseline, here is a full training script that feeds its data through `feed_dict`:

```python
import time
import tensorflow as tf

# We simulate some raw input data
# (think about it as fetching some data from the file system)
# let's say: batches of 128 samples, each containing 1024 data points
x_inputs_data = tf.random_normal([128, 1024], mean=0, stddev=1)
# We will try to predict this law:
# predict 1 if the sum of the elements is positive and 0 otherwise
y_inputs_data = tf.cast(tf.reduce_sum(x_inputs_data, axis=1, keep_dims=True) > 0, tf.int32)

# We build our small model: a basic two layers neural net with ReLU
with tf.variable_scope("placeholder"):
    input = tf.placeholder(tf.float32, shape=[None, 1024])
    y_true = tf.placeholder(tf.int32, shape=[None, 1])
with tf.variable_scope('FullyConnected'):
    w = tf.get_variable('w', shape=[1024, 1024], initializer=tf.random_normal_initializer(stddev=1e-1))
    b = tf.get_variable('b', shape=[1024], initializer=tf.constant_initializer(0.1))
    z = tf.matmul(input, w) + b
    y = tf.nn.relu(z)

    w2 = tf.get_variable('w2', shape=[1024, 1], initializer=tf.random_normal_initializer(stddev=1e-1))
    b2 = tf.get_variable('b2', shape=[1], initializer=tf.constant_initializer(0.1))
    z = tf.matmul(y, w2) + b2
with tf.variable_scope('Loss'):
    losses = tf.nn.sigmoid_cross_entropy_with_logits(None, tf.cast(y_true, tf.float32), z)
    loss_op = tf.reduce_mean(losses)
with tf.variable_scope('Accuracy'):
    y_pred = tf.cast(z > 0, tf.int32)
    accuracy = tf.reduce_mean(tf.cast(tf.equal(y_pred, y_true), tf.float32))
    accuracy = tf.Print(accuracy, data=[accuracy], message="accuracy:")

# We add the training operation (any optimizer works; Adam is used here as an example)
train_op = tf.train.AdamOptimizer(1e-2).minimize(loss_op)

startTime = time.time()
with tf.Session() as sess:
    # ... init our variables, ...
    sess.run(tf.global_variables_initializer())

    # ... check the accuracy before training, ...
    x_input, y_input = sess.run([x_inputs_data, y_inputs_data])
    sess.run(accuracy, feed_dict={
        input: x_input,
        y_true: y_input
    })

    # ... train ...
    for i in range(5000):
        # ... by sampling some input data (fetching) ...
        x_input, y_input = sess.run([x_inputs_data, y_inputs_data])
        # ... and feeding it to our model
        _, loss = sess.run([train_op, loss_op], feed_dict={
            input: x_input,
            y_true: y_input
        })

        # We regularly check the loss
        if i % 500 == 0:
            print('iter:%d - loss:%f' % (i, loss))

    # Finally, we check our final accuracy
    x_input, y_input = sess.run([x_inputs_data, y_inputs_data])
    sess.run(accuracy, feed_dict={
        input: x_input,
        y_true: y_input
    })

print("Time taken: %f" % (time.time() - startTime))
```

• The "file system simulation" is not realistic, but we keep this cost in all the tests, so we can ignore its impact.
• We use the `feed_dict` system to feed data to our model, which makes TF create a copy of the Python data inside the session.
• Over the whole training run, we only use about 31% of the GPU.
• Training this neural network takes about 18 seconds.

Why so little GPU usage? Two reasons:

• In this script, everything is synchronous and single-threaded: you must wait for one Python call to complete before the next one can start.
• We keep moving data back and forth between Python and the underlying C++ runtime.
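The benefit of breaking this pattern can be shown with a plain-Python sketch (no TensorFlow; fetch and compute costs are simulated with `time.sleep`, and the numbers are arbitrary). A background producer thread keeping a small buffer full, which is essentially what the QueueRunner introduced below does, lets fetching overlap with computing:

```python
import queue
import threading
import time

FETCH, COMPUTE, STEPS = 0.01, 0.01, 20   # simulated per-step costs (seconds)

def fetch_data():
    # Stands in for sampling input data from Python: slow I/O
    time.sleep(FETCH)
    return 1.0

def train_step(x):
    # Stands in for one training step: the GPU work
    time.sleep(COMPUTE)
    return x + 1

# 1) Synchronous, single-threaded: fetch, then compute, repeatedly
t0 = time.time()
for _ in range(STEPS):
    train_step(fetch_data())
sequential = time.time() - t0

# 2) Pipelined: a producer thread keeps a small buffer full,
#    so fetching overlaps with computing
buf = queue.Queue(maxsize=3)             # like a FIFO queue of capacity 3

def producer():
    for _ in range(STEPS):
        buf.put(fetch_data())            # blocks while the buffer is full

worker = threading.Thread(target=producer)
worker.start()
t0 = time.time()
for _ in range(STEPS):
    train_step(buf.get())                # blocks only if the buffer is empty
worker.join()
pipelined = time.time() - t0

print("sequential: %.2fs, pipelined: %.2fs" % (sequential, pipelined))
```

With equal fetch and compute costs, the pipelined loop finishes in roughly half the time, because the consumer almost never waits for data.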

```python
import tensorflow as tf

# We simulate some raw input data
x_input_data = tf.random_normal([3], mean=-1, stddev=4)

# We build a FIFOQueue inside the graph
# You can see it as a waiting line that holds waiting data
# In this case, a line with only 3 positions
q = tf.FIFOQueue(capacity=3, dtypes=tf.float32)

# We need an operation that will actually fill the queue with our data
# "enqueue_many" slices "x_input_data" along the 0th dimension to make multiple queue elements
enqueue_op = q.enqueue_many(x_input_data) # <- x1 - x2 - x3 |

# We need a dequeue op to get the next elements in the queue following the FIFO policy.
input = q.dequeue()
# The input tensor is the equivalent of a placeholder now
# but directly connected to the data sources in the graph

# Each time we use the input tensor, we print the number of elements left
# in the queue
input = tf.Print(input, data=[q.size()], message="Nb elements left:")

# fake graph: START
y = input + 1
# fake graph: END

# We start the session as usual
with tf.Session() as sess:
    # We first run the enqueue_op to load our data into the queue
    sess.run(enqueue_op)
    # Now, our queue holds 3 elements, it's full.
    # We can start to consume our data
    sess.run(y)
    sess.run(y)
    sess.run(y)
    # Now our queue is empty, if we call it again, our program will hang right here
    # waiting for the queue to be filled by at least one more datum
    sess.run(y)
```

A QueueRunner needs two things:

• a queue

• some enqueue operations (you can have multiple enqueue operations for the same queue)

```python
import tensorflow as tf

# This time, let's start with 6 samples of 1 data point
x_input_data = tf.random_normal([6], mean=-1, stddev=4)

# Note that the FIFO queue has still a capacity of 3
q = tf.FIFOQueue(capacity=3, dtypes=tf.float32)

# To check what is happening in this case:
# we will print a message each time "x_input_data" is actually computed
# to be used in the "enqueue_many" operation
x_input_data = tf.Print(x_input_data, data=[x_input_data], message="Raw inputs data generated:", summarize=6)
enqueue_op = q.enqueue_many(x_input_data)

# To leverage multi-threading we create a "QueueRunner"
# that will handle the "enqueue_op" outside of the main thread
# We don't need much parallelism here, so we will use only 1 thread
numberOfThreads = 1
qr = tf.train.QueueRunner(q, [enqueue_op] * numberOfThreads)
# Register the QueueRunner in the default collection
# so that "start_queue_runners" can find it
tf.train.add_queue_runner(qr)

input = q.dequeue()
input = tf.Print(input, data=[q.size(), input], message="Nb elements left, input:")

# fake graph: START
y = input + 1
# fake graph: END

# We start the session as usual ...
with tf.Session() as sess:
    # But now we build our coordinator to coordinate our child threads with
    coord = tf.train.Coordinator()
    # Beware, if you don't start all your queues before running anything
    # The main threads will wait for them to start and you will hang again
    # This helper starts all queues in tf.GraphKeys.QUEUE_RUNNERS
    threads = tf.train.start_queue_runners(coord=coord)

    # The QueueRunner will automatically call the enqueue operation
    # asynchronously in its own thread ensuring that the queue is always full
    # No more hanging for the main process, no more waiting for the GPU
    sess.run(y)
    sess.run(y)
    sess.run(y)
    sess.run(y)
    sess.run(y)
    sess.run(y)
    sess.run(y)
    sess.run(y)
    sess.run(y)
    sess.run(y)

    # We request our child threads to stop ...
    coord.request_stop()
    # ... and we wait for them to do so before releasing the main thread
    coord.join(threads)
```

Logs of the queue exercise

```python
import time
import tensorflow as tf

# We simulate some raw input data
# (think about it as fetching some data from the file system)
# let's say: batches of 128 samples, each containing 1024 data points
x_input_data = tf.random_normal([128, 1024], mean=0, stddev=1)

# We build our small model: a basic two layers neural net with ReLU
with tf.variable_scope("queue"):
    q = tf.FIFOQueue(capacity=5, dtypes=tf.float32) # enqueue 5 batches
    # We use the "enqueue" operation so 1 element of the queue is the full batch
    enqueue_op = q.enqueue(x_input_data)
    numberOfThreads = 1
    qr = tf.train.QueueRunner(q, [enqueue_op] * numberOfThreads)
    tf.train.add_queue_runner(qr)
    input = q.dequeue() # It replaces our input placeholder
    # We can also compute y_true right into the graph now
    y_true = tf.cast(tf.reduce_sum(input, axis=1, keep_dims=True) > 0, tf.int32)

with tf.variable_scope('FullyConnected'):
    w = tf.get_variable('w', shape=[1024, 1024], initializer=tf.random_normal_initializer(stddev=1e-1))
    b = tf.get_variable('b', shape=[1024], initializer=tf.constant_initializer(0.1))
    z = tf.matmul(input, w) + b
    y = tf.nn.relu(z)

    w2 = tf.get_variable('w2', shape=[1024, 1], initializer=tf.random_normal_initializer(stddev=1e-1))
    b2 = tf.get_variable('b2', shape=[1], initializer=tf.constant_initializer(0.1))
    z = tf.matmul(y, w2) + b2

with tf.variable_scope('Loss'):
    losses = tf.nn.sigmoid_cross_entropy_with_logits(None, tf.cast(y_true, tf.float32), z)
    loss_op = tf.reduce_mean(losses)

with tf.variable_scope('Accuracy'):
    y_pred = tf.cast(z > 0, tf.int32)
    accuracy = tf.reduce_mean(tf.cast(tf.equal(y_pred, y_true), tf.float32))
    accuracy = tf.Print(accuracy, data=[accuracy], message="accuracy:")

# We add the training op (any optimizer works; Adam is used here as an example)
train_op = tf.train.AdamOptimizer(1e-2).minimize(loss_op)

startTime = time.time()
with tf.Session() as sess:
    # ... init our variables, ...
    sess.run(tf.global_variables_initializer())

    # ... add the coordinator and start the queue threads, ...
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)

    # ... check the accuracy before training (without feed_dict!), ...
    sess.run(accuracy)

    # ... train ...
    for i in range(5000):
        # ... without sampling from Python and without a feed_dict !
        _, loss = sess.run([train_op, loss_op])

        # We regularly check the loss
        if i % 500 == 0:
            print('iter:%d - loss:%f' % (i, loss))

    # Finally, we check our final accuracy
    sess.run(accuracy)

    coord.request_stop()
    coord.join(threads)

print("Time taken: %f" % (time.time() - startTime))
```

Training phase monitoring of the second example with logs and nvidia-smi

• Outside of the queue system, we use exactly the same code as before.

• `y_true` is computed inside the graph; compare this with the case where you have to split your data into inputs and labels yourself.

• No `feed_dict` is needed any more, and no memory is wasted on copies.

• We now use ~43% of the GPU, up from 31%, so our program uses at least 12 percentage points more of the GPU's resources. In a case like this, it means you could increase the `batch_size`; but be careful, `batch_size` influences your convergence speed.

• Training takes ~11.5 seconds instead of ~18, roughly a third less time than before. Great!
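A side note on that `batch_size` remark: with queues, batches can also be assembled at dequeue time, with a queue of single samples feeding a dequeue-many-style op (in TF 1.x, `tf.train.batch` wraps this pattern; the `test_batching.py` link below demonstrates it). Here is a minimal plain-Python sketch of the idea; the helper `dequeue_many` is my own name for illustration, not a TF API:

```python
import queue

def dequeue_many(q, n):
    # Mimics a dequeue_many(n) op: block until n single samples
    # have been collected, then return them as one batch
    return [q.get() for _ in range(n)]

q = queue.Queue(maxsize=8)    # the sample queue
for sample in range(6):       # a producer thread would normally do this
    q.put(sample)

batch = dequeue_many(q, 4)    # one training step consumes a whole batch
print(batch)                  # [0, 1, 2, 3]
```

The consumer never sees individual samples, only batches, which is why changing `batch_size` requires no change to the producer side.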

# References

https://github.com/metaflow-ai/blog/blob/master/tf-queues/enqueue.py

https://github.com/metaflow-ai/blog/blob/master/tf-queues/test_batching.py