文档章节

TensorFlow学习系列(五):如何使用队列和多线程优化输入管道

AllenOR灵感
 AllenOR灵感
发布于 2017/09/10 01:20
字数 2934
阅读 5
收藏 0
点赞 0
评论 0

这篇教程是翻译Morgan写的TensorFlow教程,作者已经授权翻译,这是原文


目录


TensorFlow学习系列(一):初识TensorFlow

TensorFlow学习系列(二):形状和动态维度

TensorFlow学习系列(三):保存/恢复和混合多个模型

TensorFlow学习系列(四):利用神经网络实现泛逼近器(universal approximator)

TensorFlow学习系列(五):如何使用队列和多线程优化输入管道


TensorFlow 1.0版本已经出来了,随着这次更新,一些不错的指导建议出现在官网上面。其中一个我比较关心的是 feed_dict 系统,当你在调用 sess.run() 时:

导致性能低下的一个常见原因是未充分利用GPU,或者没有设置一个有效的数据通道。除非情况特殊或者只是一个示例代码,否则不要将 Python 变量传送到 session 中...

当然,到目前为止,我一直专门使用 feed_dict 系统来训练我的模型...所以,让我们一起来改变这个习惯吧。

已经有一个关于 TF 队列的官方文档和 TF 网站上的一些非常好的可视化过程(我非常建议你去看一下它们)。为了避免冗余,我们将重点介绍具有完整代码的基本案例。

我们将探索队列,QueueRunner和协调器,以提高我们的训练速度。在一个非常基本的例子中,由于多线程和优化的内存处理,我们能得到33%的训练速度。而且,我们还将密切关注我们在单GPU(nvidia GTX Titan X)上面的性能。

让我们从一个最简单的神经网络开始,使用 feed_dict 系统来训练一个朴素的任务。然后我们将修改我们的代码,以便能体现利用队列的好处,并删除这个依赖。

那么,我们就从下面的代码开始分析:

import time
import tensorflow as tf

# We simulate some raw input data 
# (think about it as fetching some data from the file system)
# let's say: batches of 128 samples, each containing 1024 data points
x_inputs_data = tf.random_normal([128, 1024], mean=0, stddev=1)
# We will try to predict this law:
# predict 1 if the sum of the elements is positive and 0 otherwise
y_inputs_data = tf.cast(tf.reduce_sum(x_inputs_data, axis=1, keep_dims=True) > 0, tf.int32)

# We build our small model: a basic two layers neural net with ReLU
with tf.variable_scope("placeholder"):
    input = tf.placeholder(tf.float32, shape=[None, 1024])
    y_true = tf.placeholder(tf.int32, shape=[None, 1])
with tf.variable_scope('FullyConnected'):
    w = tf.get_variable('w', shape=[1024, 1024], initializer=tf.random_normal_initializer(stddev=1e-1))
    b = tf.get_variable('b', shape=[1024], initializer=tf.constant_initializer(0.1))
    z = tf.matmul(input, w) + b
    y = tf.nn.relu(z)

    w2 = tf.get_variable('w2', shape=[1024, 1], initializer=tf.random_normal_initializer(stddev=1e-1))
    b2 = tf.get_variable('b2', shape=[1], initializer=tf.constant_initializer(0.1))
    z = tf.matmul(y, w2) + b2
with tf.variable_scope('Loss'):
    losses = tf.nn.sigmoid_cross_entropy_with_logits(None, tf.cast(y_true, tf.float32), z)
    loss_op = tf.reduce_mean(losses)
with tf.variable_scope('Accuracy'):
    y_pred = tf.cast(z > 0, tf.int32)
    accuracy = tf.reduce_mean(tf.cast(tf.equal(y_pred, y_true), tf.float32))
    accuracy = tf.Print(accuracy, data=[accuracy], message="accuracy:")

# We add the training operation, ...
adam = tf.train.AdamOptimizer(1e-2)
train_op = adam.minimize(loss_op, name="train_op")

startTime = time.time()
with tf.Session() as sess:
    # ... init our variables, ...
    sess.run(tf.global_variables_initializer())

    # ... check the accuracy before training, ...
    x_input, y_input = sess.run([x_inputs_data, y_inputs_data])
    sess.run(accuracy, feed_dict={
        input: x_input,
        y_true: y_input
    })

    # ... train ...
    for i in range(5000):
        #  ... by sampling some input data (fetching) ...
        x_input, y_input = sess.run([x_inputs_data, y_inputs_data])
        # ... and feeding it to our model
        _, loss = sess.run([train_op, loss_op], feed_dict={
            input: x_input,
            y_true: y_input
        })

        # We regularly check the loss
        if i % 500 == 0:
            print('iter:%d - loss:%f' % (i, loss))

    # Finally, we check our final accuracy
    x_input, y_input = sess.run([x_inputs_data, y_inputs_data])
    sess.run(accuracy, feed_dict={
        input: x_input,
        y_true: y_input
    })

print("Time taken: %f" % (time.time() - startTime))

这个程序在我的GPU上面运行,并且能得到下面的分析结果:



一些备注:

  • “文件系统模拟”是不可信的,但我们还是会在所有测试中进行这一行为,所以我们可以忽略它的影响。
  • 我们使用 feed_dict 系统将数据提供给我们的模型,这会使得 TF 创建一个 Python 数据的副本到会话中。
  • 在一整个训练中,我们只使用了大约 31% 的GPU。
  • 训练这个神经网络大约需要 18 秒。

人们可能认为这就是我们能完成的简单任务,但是也有人不这么想,比如:

  • 在这个脚本中,一切都是同步和单线程的(你必须等待一个Python脚本调用完成,然后才会进行下一个Python脚本)。
  • 我们在Python和底层C++之间来回移动。

那么如何避免这些陷阱呢?

解决方案是使用 TF 的队列系统,你可以把它想象成设计你的数据输入管道,然后直接进入图,并且停止使用Python输入!事实上,我们将尝试从输入管道中删除任何 Python 依赖。

由于去除了 feed_dict 系统,这将会更好的使用多线程,异步性和内存优化(这是非常酷的事,因为如果你计划在分布式系统上训练你的模型,那么TF会给你意想不到的惊喜)。

但首先,让我们通过简单的例子来探索 TF 中的队列。再次,请跟着我的注释来阅读:

import tensorflow as tf

# We simulate some raw input data
# let's start with only 3 samples of 1 data point
x_input_data = tf.random_normal([3], mean=-1, stddev=4)

# We build a FIFOQueue inside the graph 
# You can see it as a waiting line that holds waiting data
# In this case, a line with only 3 positions
q = tf.FIFOQueue(capacity=3, dtypes=tf.float32)

# We need an operation that will actually fill the queue with our data
# "enqueue_many" slices "x_input_data" along the 0th dimension to make multiple queue elements
enqueue_op = q.enqueue_many(x_input_data) # <- x1 - x2 -x3 |

# We need a dequeue op to get the next elements in the queue following the FIFO policy.
input = q.dequeue() 
# The input tensor is the equivalent of a placeholder now 
# but directly connected to the data sources in the graph

# Each time we use the input tensor, we print the number of elements left
# in the queue
input = tf.Print(input, data=[q.size()], message="Nb elements left:")

# fake graph: START
y = input + 1
# fake graph: END 

# We start the session as usual
with tf.Session() as sess:
    # We first run the enqueue_op to load our data into the queue
    sess.run(enqueue_op)
    # Now, our queue holds 3 elements, it's full. 
    # We can start to consume our data
    sess.run(y)
    sess.run(y) 
    sess.run(y) 
    # Now our queue is empty, if we call it again, our program will hang right here
    # waiting for the queue to be filled by at least one more datum
    sess.run(y)

这里发生了什么呢?为什么程序不再往下执行了呢?

那么,这就是 TF 的具体实现,如果队列是空的,那么出对操作会导致真个图去等待更多的数据。但是,这种行为只有在你手动使用队列时才会发生,但这显然是非常麻烦的,甚至是完全没用的,因为我们仍然只有一个线程在调用入队和出队操作。

注意:如果要进行异步操作,那么它们必须都在自己的线程中,而不是主线程。正如我的发过奶奶曾经说过的那样:如果许多厨师去使用同一个刀具来做饭,那么他们不会比只有一个厨师要快。

为了解决这个问题,让我来介绍一下 QueueRunner 和协调器,它们的唯一目的是在自己的线程中处理队列,并确保同步(启动,排队,出队,停止等等)。

QueueRunner 需要做 2 件事情:

  • 一个队列

  • 一些入队操作(你可以对一个队列,进行多个入队操作)

协调器不需要做任何事:它是一个方便的高级API,专门用来处理 “tf.train” 命名空间下的队列。如果你像我一样创建自定义队列,并天机器一个 QueueRunner 来处理它。只要你不要忘记将 QueueRunner 添加到 TF 的 QUEUE_RUNNERS 集合中,则可以安全地使用高级 API 。

让我们先来看看以前的例子,在原来的例子中我们让每一个线程去处理自己的队列:

import tensorflow as tf

# This time, let's start with 6 samples of 1 data point
x_input_data = tf.random_normal([6], mean=-1, stddev=4)

# Note that the FIFO queue has still a capacity of 3
q = tf.FIFOQueue(capacity=3, dtypes=tf.float32)

# To check what is happening in this case:
# we will print a message each time "x_input_data" is actually computed
# to be used in the "enqueue_many" operation
x_input_data = tf.Print(x_input_data, data=[x_input_data], message="Raw inputs data generated:", summarize=6)
enqueue_op = q.enqueue_many(x_input_data)

# To leverage multi-threading we create a "QueueRunner"
# that will handle the "enqueue_op" outside of the main thread
# We don't need much parallelism here, so we will use only 1 thread
numberOfThreads = 1 
qr = tf.train.QueueRunner(q, [enqueue_op] * numberOfThreads)
# Don't forget to add your "QueueRunner" to the QUEUE_RUNNERS collection
tf.train.add_queue_runner(qr) 

input = q.dequeue() 
input = tf.Print(input, data=[q.size(), input], message="Nb elements left, input:")

# fake graph: START
y = input + 1
# fake graph: END 

# We start the session as usual ...
with tf.Session() as sess:
    # But now we build our coordinator to coordinate our child threads with
    # the main thread
    coord = tf.train.Coordinator()
    # Beware, if you don't start all your queues before runnig anything
    # The main threads will wait for them to start and you will hang again
    # This helper start all queues in tf.GraphKeys.QUEUE_RUNNERS
    threads = tf.train.start_queue_runners(coord=coord)

    # The QueueRunner will automatically call the enqueue operation
    # asynchronously in its own thread ensuring that the queue is always full
    # No more hanging for the main process, no more waiting for the GPU
    sess.run(y)
    sess.run(y) 
    sess.run(y)
    sess.run(y)
    sess.run(y)
    sess.run(y)
    sess.run(y)
    sess.run(y)
    sess.run(y)
    sess.run(y)

    # We request our child threads to stop ...
    coord.request_stop()
    # ... and we wait for them to do so before releasing the main thread
    coord.join(threads)

小练习:
在查看日志之前,思考一下 tf.random_normal 被调用了多少次?

具体日志结果如下:


Logs of the queue exercise

正如你所看到的,x_input_data 被调用了 3 次。并且每次我们尝试推送更多的元素而不是队列容量,额外的元素不会像预期的那样被丢弃,他们会等待别的线程(或队列)来调用。

所以我们只需要在第四和第十个调用中填满队列中的空位,在队列中只剩下 2 个元素。(因为我们现在是异步的,所以打印语句的顺序可能会有点混乱。)

注意:我不会深入的探讨队列和TF的生态系统,因为它太让人心动了。你一定要更加熟悉它,你可以多阅读以下最后面的链接。

感谢所有这些新知识,我们终于可以利用这个队列系统来更新我们的第一个脚本,看看是否有任何改进!

import time
import tensorflow as tf

# We simulate some raw input data 
# (think about it as fetching some data from the file system)
# let's say: batches of 128 samples, each containing 1024 data points
x_input_data = tf.random_normal([128, 1024], mean=0, stddev=1)

# We build our small model: a basic two layers neural net with ReLU
with tf.variable_scope("queue"):
    q = tf.FIFOQueue(capacity=5, dtypes=tf.float32) # enqueue 5 batches
    # We use the "enqueue" operation so 1 element of the queue is the full batch
    enqueue_op = q.enqueue(x_input_data)
    numberOfThreads = 1
    qr = tf.train.QueueRunner(q, [enqueue_op] * numberOfThreads)
    tf.train.add_queue_runner(qr)
    input = q.dequeue() # It replaces our input placeholder
    # We can also compute y_true right into the graph now
    y_true = tf.cast(tf.reduce_sum(input, axis=1, keep_dims=True) > 0, tf.int32)

with tf.variable_scope('FullyConnected'):
    w = tf.get_variable('w', shape=[1024, 1024], initializer=tf.random_normal_initializer(stddev=1e-1))
    b = tf.get_variable('b', shape=[1024], initializer=tf.constant_initializer(0.1))
    z = tf.matmul(input, w) + b
    y = tf.nn.relu(z)

    w2 = tf.get_variable('w2', shape=[1024, 1], initializer=tf.random_normal_initializer(stddev=1e-1))
    b2 = tf.get_variable('b2', shape=[1], initializer=tf.constant_initializer(0.1))
    z = tf.matmul(y, w2) + b2

with tf.variable_scope('Loss'):
    losses = tf.nn.sigmoid_cross_entropy_with_logits(None, tf.cast(y_true, tf.float32), z)
    loss_op = tf.reduce_mean(losses)

with tf.variable_scope('Accuracy'):
    y_pred = tf.cast(z > 0, tf.int32)
    accuracy = tf.reduce_mean(tf.cast(tf.equal(y_pred, y_true), tf.float32))
    accuracy = tf.Print(accuracy, data=[accuracy], message="accuracy:")

# We add the training op ...
adam = tf.train.AdamOptimizer(1e-2)
train_op = adam.minimize(loss_op, name="train_op")

startTime = time.time()
with tf.Session() as sess:
    # ... init our variables, ...
    sess.run(tf.global_variables_initializer())

    # ... add the coordinator, ...
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)

    # ... check the accuracy before training (without feed_dict!), ...
    sess.run(accuracy)

    # ... train ...
    for i in range(5000):
        #  ... without sampling from Python and without a feed_dict !
        _, loss = sess.run([train_op, loss_op])

        # We regularly check the loss
        if i % 500 == 0:
            print('iter:%d - loss:%f' % (i, loss))

    # Finally, we check our final accuracy
    sess.run(accuracy)

    coord.request_stop()
    coord.join(threads)

print("Time taken: %f" % (time.time() - startTime))

Training phase monitoring of the second example with logs and nvidia-smi

最后总结:

  • 在队列系统之外,我们使用了与以前完全相同的代码。

  • y_true 是在图内部进行计算的,您可以比较当人们必须将其输入数据分割为输入数据和标签时的情况。

  • 不需要任何 feed_dict,不再浪费内存。

  • 我们现在对GPU的使用率 ~43%,优于 31% 。这意味着我们的程序至少多使用了12%的GPU资源。在这种情况下,这意味着你可以增加 batch_size 的值,但是你要小心,batch_size 将会影响你的收敛速度。

  • 训练时间需要 ~11.5秒,这大约是以前的33%的训练时间,太棒了!


Reference:

https://github.com/metaflow-ai/blog/blob/master/tf-queues/enqueue.py

https://github.com/metaflow-ai/blog/blob/master/tf-queues/test_batching.py

https://github.com/metaflow-ai/blog/blob/master/tf-queues/reader_test.py

https://www.tensorflow.org/programmers_guide/reading_data

本文转载自:http://www.jianshu.com/p/f07f28448313

共有 人打赏支持
AllenOR灵感
粉丝 10
博文 2634
码字总数 82983
作品 0
程序员
第3章 TensorFlow入门

第3章 TensorFlow入门 TensorFlow是Google创造的数值运算库,作为深度学习的底层使用。本章包括: TensorFlow介绍 如何用TensorFlow定义、编译并运算表达式 如何寻求帮助 注意:TensorFlow暂...

优惠券发放
05/31
0
0
TensorFlow 教程预告 & #01 - 简单线性模型

本文首发于微信公众号 AIMaster。 在这一系列文章中,你将学到深度学习的一些基本概念以及TensorFlow的使用,并完成手写体数字识别、图像分类、迁移学习、Deep Dream、风格迁移和强化学习等项...

AIMaster
2017/07/09
0
0
PaddleFluid和TensorFlow基本使用概念对比 | PaddlePaddle专栏

深度学习平台的演化 时至今日,深度学习已成为事实上最流行的机器学习技术。学术界多年研究加上工业界的长期实践提出了若干有效的基本建模单元:全连接,卷积,循环神经网络等;设计各类训练...

技术小能手
06/21
0
0
资源 | 概率编程工具:TensorFlow Probability官方简介

  选自Medium   作者:Josh Dillon、Mike Shwe、Dustin Tran   机器之心编译   参与:白妤昕、李泽南      在 2018 年 TensorFlow 开发者峰会上,谷歌发布了 TensorFlow Probabi...

机器之心
04/22
0
0
TensorFlow常用的函数

TensorFlow中维护的集合列表 在一个计算图中,可以通过集合()来管理不同类别的资源。比如通过 函数可以将资源加入一个 或多个集合中,然后通过 获取一个集合里面的所有资源(如张量,变量,或...

xinet
2017/11/09
0
0
教程 | 如何使用Docker、TensorFlow目标检测API和OpenCV实现实时目标检测和视频处理

  选自TowardsDataScience   作者:Léo Beaucourt   机器之心编译   参与:李诗萌、路雪      本文展示了如何使用 Docker 容器中的 TensorFlow 目标检测 API,通过网络摄像头执...

机器之心
04/20
0
0
TensorFlow基本原理,入门教程网址

TensorFlow 进阶 Python代码的目的是用来 构建这个可以在外部运行的计算图,以及 安排计算图的哪一部分应该被运行。 http://tensorfly.cn/ github 地址 : https://github.com/tensorflow/te...

寒月谷
06/05
0
0
《Scikit-Learn与TensorFlow机器学习实用指南》第9章 启动并运行TensorFlow

第9章 启动并运行TensorFlow 来源:ApacheCN《Sklearn 与 TensorFlow 机器学习实用指南》翻译项目 译者:@akonwang @WilsonQu 校对:@Lisanaaa @飞龙 TensorFlow 是一款用于数值计算的强大的...

apachecn_飞龙
04/23
0
0
Google 和 Nvidia 强强联手,带来优化版 TensorFlow 1.7

雷锋网(公众号:雷锋网) AI 研习社按,日前,谷歌和英伟达宣布将 NVIDIA TensorRT 集成到 TensorFlow 1.7 中。在谷歌开发者博客中,他们介绍了此次合作的详细信息以及整合之后的性能,雷锋网...

高云河
03/29
0
0
使用PaddleFluid和TensorFlow实现图像分类网络SE_ResNeXt | 文末超大福利

视觉(vision)、自然语言处理(Nature Language Processing, NLP)、语音(Speech)是深度学习研究的三大方向。三大领域各自都诞生了若干经典的模块,用来建模该领域数据所蕴含的不同特性的...

技术小能手
06/27
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

OSChina 周五乱弹 —— 我们是食物链的最底层

Osc乱弹歌单(2018)请戳(这里) 【今日歌曲】 @温家成 :分享谢安琪的单曲《姿色份子》 《姿色份子》- 谢安琪 手机党少年们想听歌,请使劲儿戳(这里) @贪吃飒:最近p2p怎么了、半个月爆了...

小小编辑
15分钟前
3
1
Android Studio 3.0 之后打包apk出现应用未安装问题

1、废话 出现这个问题的原因,并不是只有一个,而是有多个原因,不懂的估计会被搞得一头雾水,下面我列举的是我遇到的几种问题和网友遇到的几种问题,但不一定是全部,也有可能有些莫名其妙的...

她叫我小渝
35分钟前
0
0
前端基础

1. get请求传参长度的误区 误区:我们经常说get请求参数的大小存在限制,而post请求的参数大小是无限制的。 实际上HTTP 协议从未规定 GET/POST 的请求长度限制是多少。对get请求参数的限制是...

wenxingjun
今天
0
0
拦截SQLSERVER的SSL加密通道替换传输过程中的用户名密码实现运维审计(一)

工作准备 •一台SQLSERVER 2005/SQLSERVER 2008服务 •SQLSERVER jdbc驱动程序 •Java开发环境eclipse + jdk1.8 •java反编译工具JD-Core 反编译JDBC分析SQLSERVER客户端与服务器通信原理 SQ...

紅顏為君笑
今天
8
0
jQuery零基础入门——(六)修改DOM结构

《jQuery零基础入门》系列博文是在廖雪峰老师的博文基础上,可能补充了个人的理解和日常遇到的点,用我的理解表述出来,主干出处来自廖雪峰老师的技术分享。 在《零基础入门JavaScript》的时...

JandenMa
今天
0
0
linux mint 1.9 qq 安装

转: https://www.jianshu.com/p/cdc3d03c144d 1. 下载 qq 轻聊版,可在百度搜索后下载 QQ7.9Light.exe 2. 去wine的官网(https://wiki.winehq.org/Ubuntu) 安装 wine . 提醒网页可以切换成中...

Canaan_
今天
0
0
PHP后台运行命令并管理运行程序

php后台运行命令并管理后台运行程序 class ProcessModel{ private $pid; private $command; private $resultToFile = ''; public function __construct($cl=false){......

colin_86
今天
1
0
数据结构与算法4

在此程序中,HighArray类中的find()方法用数据项的值作为参数传递,它的返回值决定是否找到此数据项。 insert()方法向数组下一个空位置放置一个新的数据项。一个名为nElems的字段跟踪记录着...

沉迷于编程的小菜菜
今天
1
1
fiddler安装和基本使用以及代理设置

项目需求 由于开发过程中客户端和服务器数据交互非常频繁,有时候服务端需要知道客户端调用接口传了哪些参数过来,这个时候就需要一个工具可以监听这些接口请求参数,已经接口的响应的数据,这种...

银装素裹
今天
0
0
Python分析《我不是药神》豆瓣评论

读取 Mongo 中的短评数据,进行中文分词 对分词结果取 Top50 生成词云 生成词云效果 看来网上关于 我不是药神 vs 达拉斯 的争论很热啊。关于词频统计就这些,代码中也会完成一些其它的分析任...

猫咪编程
今天
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部