文档章节

分布式TensorFlow 【部署】

片刻
 片刻
发布于 2017/06/14 10:03
字数 2344
阅读 197
收藏 0

 

 

原文链接 : https://www.tensorflow.org/deploy/distributed

译文链接 : http://www.apache.wiki/pages/viewpage.action?pageId=10029526

贡献者 : 片刻 ApacheCN Apache中文网

本文档介绍如何创建TensorFlow服务器集群,以及如何在该集群中分发计算图。我们假设您熟悉编写TensorFlow程序的基本概念

你好 分布式TensorFlow!

要查看一个简单的TensorFlow集群,请执行以下操作:

# Start a TensorFlow server as a single-process "cluster".

$ python

>>> import tensorflow as tf

>>> c = tf.constant("Hello, distributed TensorFlow!")

>>> server = tf.train.Server.create_local_server()

>>> sess = tf.Session(server.target)  # Create a session on the server.

>>> sess.run(c)

'Hello, distributed TensorFlow!' 

该 tf.train.Server.create_local_server 方法创建具有进程内服务器的单进程集群。

创建一个集群

<iframe width="412" height="232" src="https://www.youtube.com/embed/la_M6bCV91M?ecver=1" frameborder="0" allowfullscreen></iframe>

TensorFlow“集群”是参与TensorFlow图形的分布式执行的一组“任务”。每个任务与TensorFlow“服务器”相关联,该服务器包含可用于创建会话的“主”,以及在图中执行操作的“工作者”。群集还可以分为一个或多个“作业”,其中每个作业包含一个或多个任务。

要创建集群,请在集群中为每个任务启动一个TensorFlow服务器。每个任务通常在不同的机器上运行,但您可以在同一台机器上运行多个任务(例如控制不同的GPU设备)。在每个任务中,执行以下操作:

  1. 创建一个tf.train.ClusterSpec描述集群中所有任务的内容。这对每个任务应该是一样的。

  2. 创建一个tf.train.Server,传递tf.train.ClusterSpec给构造函数,并使用作业名和任务索引识别本地任务。

创建一个tf.train.ClusterSpec描述集群

群集规范字典将作业名称映射到网络地址列表。将此字典传递给tf.train.ClusterSpec 构造函数。例如:

tf.train.ClusterSpec construction

Available tasks

tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})
/job:local/task:0
/job:local/task:1
tf.train.ClusterSpec({
    "worker": [
        "worker0.example.com:2222",
        "worker1.example.com:2222",
        "worker2.example.com:2222"
    ],
    "ps": [
        "ps0.example.com:2222",
        "ps1.example.com:2222"
    ]})
/job:worker/task:0
/job:worker/task:1
/job:worker/task:2
/job:ps/task:0
/job:ps/task:1

tf.train.Server在每个任务中创建一个实例

一个tf.train.Server对象包含了一组本地设备,一组到其他任务的连接 tf.train.ClusterSpec,并且 tf.Session能够使用这些来进行分布式计算。每个服务器是特定命名作业的成员,并且在该作业中具有任务索引。服务器可以与群集中的任何其他服务器进行通信。

例如,推出了集群两台服务器上运行localhost:2222 ,并localhost:2223在本地机器上的两个不同的进程运行下面的代码片段:

# In task 0:

cluster = tf.train.ClusterSpec({"local": ["localhost:2222""localhost:2223"]})

server = tf.train.Server(cluster, job_name="local", task_index=0)

 

# In task 1:

cluster = tf.train.ClusterSpec({"local": ["localhost:2222""localhost:2223"]})

server = tf.train.Server(cluster, job_name="local", task_index=1

注意:手动指定这些集群规范可能很乏味,特别是对于大型集群。我们正在开发以编程方式启动任务的工具,例如使用像Kubernetes这样的集群管理器 。如果您想要查看支持的特定集群管理器,请提出一个 GitHub问题

 

在模型中指定分布式设备

要对特定进程进行操作,您可以使用与tf.device 用于指定运行在CPU或GPU上的操作相同的 功能。例如:

with tf.device("/job:ps/task:0"):

  weights_1 = tf.Variable(...)

  biases_1 = tf.Variable(...)

 

with tf.device("/job:ps/task:1"):

  weights_2 = tf.Variable(...)

  biases_2 = tf.Variable(...)

 

with tf.device("/job:worker/task:7"):

  input, labels = ...

  layer_1 = tf.nn.relu(tf.matmul(input, weights_1) + biases_1)

  logits = tf.nn.relu(tf.matmul(layer_1, weights_2) + biases_2)

  # ...

  train_op = ...

 

with tf.Session("grpc://worker7.example.com:2222") as sess:

  for in range(10000):

    sess.run(train_op) 

在上述示例中,在作业中的两个任务上创建变量ps,并且在作业中创建模型的计算密集型部分worker 。TensorFlow将插入作业之间的适当的数据传输(从psworker用于直传,以及从workerps用于施加梯度)。

 

复印训练

一种称为“数据并行性”的常见培训配置涉及到worker在不同小批量数据上对同一模型进行职业培训的多个任务,更新托管在ps 作业中的一个或多个任务中的共享参数。所有任务通常在不同的机器上运行。有很多方法可以在TensorFlow中指定此结构,并且我们正在构建库,以简化指定复制模型的工作。可能的方法包括:

  • 图形内复制。在这种方法中,客户端构建一个 tf.Graph包含一组参数(在tf.Variable固定到节点中/job:ps)的参数; 和模型的计算密集型部分的多个副本,每个副本都固定在不同的任务中/job:worker

  • 图形间复制。在这种方法中,每个/job:worker任务都有一个单独的客户端,通常与worker任务相同。每个客户端构建一个包含参数的类似图(固定为 /job:ps使用前 tf.train.replica_device_setter 将其确定性地映射到相同的任务); 和模型的计算密集型部分的单个副本,固定到本地任务 /job:worker

  • 异步训练 在这种方法中,图的每个副本都有一个独立的训练循环,无需协调地执行。它与上述两种复制形式兼容。

  • 同步训练 在这种方法中,所有的副本都会读取当前参数的相同值,并行计算梯度,然后将它们应用在一起。它与图形内复制(例如使用CIFAR-10多GPU培训师中的梯度平均 )和图形间复制(例如使用tf.train.SyncReplicasOptimizer)兼容 。

把它们放在一起:示例教练程序

以下代码显示了分布式教练程序的框架,实现了图形间复制异步训练。它包括参数服务器和工作任务的代码。

import argparse

import sys

 

import tensorflow as tf

 

FLAGS = None

 

def main(_):

  ps_hosts = FLAGS.ps_hosts.split(",")

  worker_hosts = FLAGS.worker_hosts.split(",")

 

  # Create a cluster from the parameter server and worker hosts.

  cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

 

  # Create and start a server for the local task.

  server = tf.train.Server(cluster,

                           job_name=FLAGS.job_name,

                           task_index=FLAGS.task_index)

 

  if FLAGS.job_name == "ps":

    server.join()

  elif FLAGS.job_name == "worker":

 

    # Assigns ops to the local worker by default.

    with tf.device(tf.train.replica_device_setter(

        worker_device="/job:worker/task:%d" % FLAGS.task_index,

        cluster=cluster)):

 

      # Build model...

      loss = ...

      global_step = tf.contrib.framework.get_or_create_global_step()

 

      train_op = tf.train.AdagradOptimizer(0.01).minimize(

          loss, global_step=global_step)

 

    # The StopAtStepHook handles stopping after running given steps.

    hooks=[tf.train.StopAtStepHook(last_step=1000000)]

 

    # The MonitoredTrainingSession takes care of session initialization,

    # restoring from a checkpoint, saving to a checkpoint, and closing when done

    # or an error occurs.

    with tf.train.MonitoredTrainingSession(master=server.target,

                                           is_chief=(FLAGS.task_index == 0),

                                           checkpoint_dir="/tmp/train_logs",

                                           hooks=hooks) as mon_sess:

      while not mon_sess.should_stop():

        # Run a training step asynchronously.

        # See `tf.train.SyncReplicasOptimizer` for additional details on how to

        # perform *synchronous* training.

        # mon_sess.run handles AbortedError in case of preempted PS.

        mon_sess.run(train_op)

 

if __name__ == "__main__":

  parser = argparse.ArgumentParser()

  parser.register("type""bool"lambda v: v.lower() == "true")

  # Flags for defining the tf.train.ClusterSpec

  parser.add_argument(

      "--ps_hosts",

      type=str,

      default="",

      help="Comma-separated list of hostname:port pairs"

  )

  parser.add_argument(

      "--worker_hosts",

      type=str,

      default="",

      help="Comma-separated list of hostname:port pairs"

  )

  parser.add_argument(

      "--job_name",

      type=str,

      default="",

      help="One of 'ps', 'worker'"

  )

  # Flags for defining the tf.train.Server

  parser.add_argument(

      "--task_index",

      type=int,

      default=0,

      help="Index of task within the job"

  )

  FLAGS, unparsed = parser.parse_known_args()

  tf.app.run(main=main, argv=[sys.argv[0]] + unparsed) 

要启动具有两个参数服务器和两个工作人员的培训师,请使用以下命令行(假设调用脚本trainer.py):

# On ps0.example.com:

$ python trainer.py \

     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \

     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \

     --job_name=ps --task_index=0

# On ps1.example.com:

$ python trainer.py \

     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \

     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \

     --job_name=ps --task_index=1

# On worker0.example.com:

$ python trainer.py \

     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \

     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \

     --job_name=worker --task_index=0

# On worker1.example.com:

$ python trainer.py \

     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \

     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \

     --job_name=worker --task_index=1 

词汇表

Client

客户端通常是一个构建TensorFlow图并构建一个 tensorflow::Session与集群进行交互的程序。客户端通常用Python或C ++编写。单个客户端进程可以直接与多个TensorFlow服务器进行交互(请参见上面的“复制培训”),单个服务器可以为多个客户端提供服务。

Cluster

TensorFlow集群包括一个或多个“作业”,每个“作业”分为一个或多个“任务”的列表。集群通常专用于特定的高级目标,例如训练神经网络,并行使用许多机器。集群由tf.train.ClusterSpec对象定义。

Job

一份工作包括一份通常用于共同目的的“任务”清单。例如,名为ps(对于“参数服务器”)的作业通常会托管存储和更新变量的节点; 而名为“ workerjob” 的作业通常会承载执行计算密集型任务的无状态节点。作业中的任务通常在不同的机器上运行。一组工作角色是灵活的:例如,a worker可能会保持一些状态。

Master service

提供远程访问一组分布式设备并充当会话目标的RPC服务。主服务实现 tensorflow::Session接口,负责协调一个或多个“工作服务”的工作。所有TensorFlow服务器都实现主服务。

Task

任务对应于特定的TensorFlow服务器,通常对应于单个进程。任务属于特定的“作业”,并由该作业的任务列表中的索引识别。

TensorFlow服务器运行tf.train.Server实例的进程,该实例是集群的成员,并导出“主服务”和“工作服务”。

Worker service

使用本地设备执行TensorFlow图形的一部分的RPC服务。worker服务实现了worker_service.proto。所有TensorFlow服务器都实现了工作服务。

© 著作权归作者所有

片刻
粉丝 107
博文 269
码字总数 306754
作品 0
海淀
高级程序员
私信 提问
Kubeflow实战系列: 利用TFJob运行分布式TensorFlow

介绍 本系列将介绍如何在阿里云容器服务上运行Kubeflow, 本文介绍如何使用运行分布式模型训练。 第一篇:阿里云上使用JupyterHub 第二篇:阿里云上小试TFJob 第三篇:利用TFJob运行分布式Ten...

Mr_zebra
2018/06/14
94
0
分布式TensorFlow编程模型演进

引言 TensorFlow从15年10月开源至今,可谓是发展迅猛,从v0.5到如今的v2.0.0-alpha,经历了无数个功能特性的升级,性能、可用性、易用性等都在稳步提升。相对来说,对于我们工业界,大家可能...

腾讯Bugly
04/11
0
0
TensorFlow 2.0.0-beta0 发布,分布式策略改进、API 冻结

TensorFlow 2.0.0-beta0 发布了。2.0 专注于简单性和易用性,具有以下更新: 使用 Keras 和热切的执行轻松建模 在任何平台上进行生产中的健壮模型部署 强大的研究实验 通过减少重复和删除已弃...

h4cd
06/08
1K
6
深度学习框架大战正在进行,谁将夺取“深度学习工业标准”的荣耀?

本文最早发表于本人博客:博客 随着深度学习研究热潮的持续发酵,各种开源深度学习框架层出不穷,其中包括:Caffe、CNTK、MXNet、Neon、TensorFlow、Theano 和 Torch (著名开源深度学习框架 ...

algorithmdog
2016/03/06
0
0
Kubeflow实战系列:利用TensorFlow Serving进行模型预测

介绍 本系列将介绍如何在阿里云容器服务上运行Kubeflow, 本文介绍如何使用加载训练模型并且进行模型预测。 第一篇:阿里云上使用JupyterHub 第二篇:阿里云上小试TFJob 第三篇:利用TFJob运行...

必嘫
2018/06/23
0
0

没有更多内容

加载失败,请刷新页面

加载更多

3_数组

3_数组

行者终成事
今天
7
0
经典系统设计面试题解析:如何设计TinyURL(二)

原文链接:https://www.educative.io/courses/grokking-the-system-design-interview/m2ygV4E81AR 编者注:本文以一道经典的系统设计面试题:《如何设计TinyURL》的参考答案和解析为例,帮助...

APEMESH
今天
7
0
使用logstash同步MySQL数据到ES

概述   在生成业务常有将MySQL数据同步到ES的需求,如果需要很高的定制化,往往需要开发同步程序用于处理数据。但没有特殊业务需求,官方提供的logstash就很有优势了。   在使用logstas...

zxiaofan666
今天
10
0
X-MSG-IM-分布式信令跟踪能力

经过一周多的鏖战, X-MSG-IM的分布式信令跟踪能力已基本具备, 特点是: 实时. 只有要RX/TX就会实时产生信令跟踪事件, 先入kafka, 再入influxdb待查. 同时提供实时sub/pub接口. 完备. 可以完整...

dev5
今天
7
0
OpenJDK之CyclicBarrier

OpenJDK8,本人看的是openJDK。以前就看过,只是经常忘记,所以记录下 图1 CyclicBarrier是Doug Lea在JDK1.5中引入的,作用就不详细描述了,主要有如下俩个方法使用: await()方法,如果当前线...

克虏伯
今天
8
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部