# 学习笔记TF040:多GPU并行 原

利炳根

TensorFlow并行分为模型并行和数据并行。模型并行根据不同模型设计不同的并行方式，把模型的不同计算节点放在不同的硬件资源上运算。数据并行是比较通用、简便的大规模并行实现方式：同时使用多个硬件资源计算不同batch数据的梯度，再汇总梯度进行全局参数更新。

loss从最开始的4点几，到第70万步时降到0.07。平均每个batch耗时0.021s，平均每秒训练6000个样本，约为单GPU的4倍。

import os.path
import re
import time
import numpy as np
import tensorflow as tf
import cifar10
# Per-tower batch size; train() uses it for the learning-rate decay schedule
# and for the examples/sec throughput report.
batch_size = 128
# train_dir = '/tmp/cifar10_train'
# Total number of iterations of the training loop in train().
max_steps = 1000000
# Number of GPU towers that train() builds ('/gpu:0' .. '/gpu:<num_gpus - 1>').
num_gpus = 4
# log_device_placement = False
def tower_loss(scope):
    """Calculate the total loss on a single tower running the CIFAR model.

    Args:
        scope: unique prefix string identifying the CIFAR tower,
            e.g. 'tower_0'.

    Returns:
        Tensor of shape [] containing the total loss for a batch of data.
    """
    # Get images and labels for CIFAR-10.
    images, labels = cifar10.distorted_inputs()
    # Build inference Graph.
    logits = cifar10.inference(images)
    # Build the portion of the Graph calculating the losses. The return value
    # is discarded on purpose: the individual loss terms are read back from
    # the 'losses' collection below (presumably cifar10.loss registers them
    # there -- confirm against the cifar10 module).
    _ = cifar10.loss(logits, labels)
    # Assemble all of the losses for the current tower only; filtering by
    # `scope` keeps the other towers' entries out of the sum.
    losses = tf.get_collection('losses', scope)
    # Calculate the total loss for the current tower.
    total_loss = tf.add_n(losses, name='total_loss')
    return total_loss
def average_gradients(tower_grads):
    """Calculate the average gradient for each shared variable across all towers.

    Note that this function provides a synchronization point across all towers.

    Args:
        tower_grads: List of lists of (gradient, variable) tuples. The outer
            list is over towers; each inner list is the gradient calculation
            for one tower, with variables in the same order in every tower.

    Returns:
        List of pairs of (gradient, variable) where the gradient has been
        averaged across all towers.
    """
    average_grads = []
    # zip(*tower_grads) regroups the per-tower lists by variable, so each
    # grad_and_vars looks like the following:
    #   ((grad0_gpu0, var0_gpu0), ... , (grad0_gpuN, var0_gpuN))
    for grad_and_vars in zip(*tower_grads):
        grads = []
        for g, _ in grad_and_vars:
            # Append on a 'tower' dimension which we will average over below.
            expanded_g = tf.expand_dims(g, 0)
            grads.append(expanded_g)
        # Average over the 'tower' dimension.
        grad = tf.concat(grads, axis=0)
        grad = tf.reduce_mean(grad, 0)
        # Keep in mind that the Variables are redundant because they are
        # shared across towers. So .. we will just return the first tower's
        # pointer to the Variable.
        v = grad_and_vars[0][1]
        average_grads.append((grad, v))
    return average_grads
def train():
    """Train CIFAR-10 on `num_gpus` GPU towers for `max_steps` steps.

    Builds one copy of the model per GPU with shared variables, averages the
    per-tower gradients, and applies them with plain gradient descent. The
    loss is printed every 10 steps and a checkpoint is written to
    '/tmp/cifar10_train/model.ckpt' every 1000 steps and on the final step.

    Raises:
        AssertionError: if the loss becomes NaN.
    """
    with tf.Graph().as_default(), tf.device('/cpu:0'):
        # Step counter; it is incremented once per apply_gradients() run.
        global_step = tf.get_variable(
            'global_step', [],
            initializer=tf.constant_initializer(0), trainable=False)

        # Calculate the learning rate schedule.
        num_batches_per_epoch = (cifar10.NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN /
                                 batch_size)
        decay_steps = int(num_batches_per_epoch * cifar10.NUM_EPOCHS_PER_DECAY)

        # Decay the learning rate exponentially based on the number of steps.
        lr = tf.train.exponential_decay(cifar10.INITIAL_LEARNING_RATE,
                                        global_step,
                                        decay_steps,
                                        cifar10.LEARNING_RATE_DECAY_FACTOR,
                                        staircase=True)

        # Create an optimizer that performs gradient descent.
        opt = tf.train.GradientDescentOptimizer(lr)

        # Calculate the gradients for each model tower.
        tower_grads = []
        for i in range(num_gpus):
            with tf.device('/gpu:%d' % i):
                with tf.name_scope('%s_%d' % (cifar10.TOWER_NAME, i)) as scope:
                    # Calculate the loss for one tower of the CIFAR model.
                    # This function constructs the entire CIFAR model but
                    # shares the variables across all towers.
                    loss = tower_loss(scope)
                    # Reuse variables for the next tower.
                    tf.get_variable_scope().reuse_variables()
                    # Calculate the gradients for the batch of data on this
                    # CIFAR tower.
                    grads = opt.compute_gradients(loss)
                    # Keep track of the gradients across all towers.
                    tower_grads.append(grads)

        # We must calculate the mean of each gradient. Note that this is the
        # synchronization point across all towers.
        grads = average_gradients(tower_grads)

        # Apply the averaged gradients to adjust the shared variables.
        apply_gradient_op = opt.apply_gradients(grads, global_step=global_step)

        # Create a saver.
        saver = tf.train.Saver(tf.global_variables())

        # Build an initialization operation to run below.
        init = tf.global_variables_initializer()

        # Start running operations on the Graph. allow_soft_placement must be
        # set to True to build towers on GPU, as some of the ops do not have
        # GPU implementations.
        sess = tf.Session(config=tf.ConfigProto(allow_soft_placement=True))
        sess.run(init)

        # Start the queue runners.
        tf.train.start_queue_runners(sess=sess)

        # Make sure the checkpoint directory exists before the first save.
        checkpoint_path = '/tmp/cifar10_train/model.ckpt'
        tf.gfile.MakeDirs(os.path.dirname(checkpoint_path))

        for step in range(max_steps):
            start_time = time.time()
            # `loss` is the last tower's loss; it is fetched only for
            # reporting and divergence detection.
            _, loss_value = sess.run([apply_gradient_op, loss])
            duration = time.time() - start_time

            assert not np.isnan(loss_value), 'Model diverged with loss = NaN'

            if step % 10 == 0:
                num_examples_per_step = batch_size * num_gpus
                examples_per_sec = num_examples_per_step / duration
                sec_per_batch = duration / num_gpus

                format_str = ('step %d, loss = %.2f (%.1f examples/sec; %.3f '
                              'sec/batch)')
                print(format_str % (step, loss_value,
                                    examples_per_sec, sec_per_batch))

            # Save the model checkpoint periodically.
            if step % 1000 == 0 or (step + 1) == max_steps:
                saver.save(sess, checkpoint_path, global_step=step)
# Script entry point: training starts as soon as this file is executed.
# The original train_dir reset below is disabled, so existing files in the
# checkpoint directory are not cleared before training.
#if tf.gfile.Exists(train_dir):
#  tf.gfile.DeleteRecursively(train_dir)
#tf.gfile.MakeDirs(train_dir)
train()

### 利炳根

Golang学习笔记目录

Golang 介绍 Go语言是谷歌2009发布的第二款开源编程语言。 Go语言专门针对多处理器系统应用程序的编程进行了优化，使用Go编译的程序可以媲美C或C++代码的速度，而且更加安全、支持并行进程。...

ChainZhang
2017/12/26
0
0

2016/06/02
4.5K
0

CUDA™ 工具包是一种针对支持CUDA功能的GPU（图形处理器）的C语言开发环境。CUDA开发环境包括: · nvcc C语言编译器 · 适用于GPU（图形处理器）的CUDA FFT和BLAS库 · 分析器 · 适用于GPU...

2008/11/21
14.1K
0

2017/11/25
0
0

2017/05/12
0
0

PHP生成CSV之内部换行

1
0

4
0

3
0
JAVA-四元数类

public class Quaternion { private final double x0, x1, x2, x3; // 四元数构造函数 public Quaternion(double x0, double x1, double x2, double x3) { this.x0 = ......

Pulsar-V

17
0
Xshell利用Xftp传输文件,使用pure-ftpd搭建ftp服务

Xftp传输文件 如果已经通过Xshell登录到服务器，此时可以使用快捷键ctrl+alt+f 打开Xftp并展示Xshell当前的目录，之后直接拖拽传输文件即可。 pure-ftpd搭建ftp服务 pure-ftpd要比vsftp简单，...

2
0