【2.4.1.6】tensorflow多GPU的实现-6

  • 调整学习率?
  • 将噪声添加至梯度?
  • 调整批量大小?
  • 更改学习算法?

一、根据批量大小提高学习率

重要的是要认识到,当我们将训练扩展到多个GPU时,分给每个GPU的批量数据的大小并不是整个批量的有效大小。在处理了每个小批量数据集(mini-batch)并由单个GPU执行反向传播之后,网络连接的权重是所有ranks的权重的平均值。其结果是,对模型的权重的一次更新实际上考虑了所有GPU上的每个小批量数据集。因此,有效的批量大小是小批量的大小乘以GPU的数量。结果,当使用越来越多的GPU时,有效批量的大小会变得非常大。

正如您在实验1中可能已经看到的那样,较大的批量数据可能导致准确性下降,尤其是网络的泛化能力下降。较大批量所导致的效果还使我们更接近使用完整数据集时的梯度下降,它消除了训练过程中的某些变化或噪声。此外,如果批量数量较大,则每个训练周期所执行的更新次数会更少,这会减慢训练速度。

为了将这些变化加回到训练过程中,并弥补每个训练周期所经历的更少次数的权重更新,一个常见的方法是随着批量大小的增加而提高学习率。由于每个周期进行较少的更新,因此提高学习率会以较大的更新量对此进行补偿。

理论表明,如果按K倍增加批量大小,则应按K的平方根倍提高学习率,以保持原有的变化。在大规模训练的实践中,线性调整学习速率(K倍)也很常见。

实验

本练习大约需要15-20分钟,对不同的学习率和批量大小进行实验。我们先尝试在增加学习率的同时保持批量大小不变,然后在保持学习率不变的同时尝试增加批量大小;最后,增加批量大小的同时也提高学习率。尝试批量大小最大为128甚至256,学习率最大为0.08或更高。我们建议您使用–target-accuracy为训练选择最终要达到的验证准确度,或者设置成仅训练2-3分钟。

请定期查看一下图表,以比较各种训练配置情况下的性能。同样地,如果图表显得过于拥挤,请随时从“ training_data”文件夹中删减数据集。

练习的目的不一定是找到“最佳”组合,而是更多地了解在调整参数时训练性能的走势。请注意,由于训练过程中的随机性,无论是在初始权重还是数据集混排方面,您都不能保证在进行两次训练时得到相同的结果。

如果您觉得已充分理解了训练性能是如何受到上述因素的影响的,您就可以转到下一部分。

num_gpus = FIXME
!mpirun -np $num_gpus python fashion_mnist.py --base-lr FIXME --batch-size FIXME
# The training script needs to be defined again, now that you're in a different notebook
%matplotlib widget
import os
import numpy as np
import matplotlib.pyplot as plt
​
# By default we skip the first row, which contains the headers
# By skipping 2 rows, you can disregard the first data-point (0,0) to get a closer look
def plot_trainings(skiprows=1):
    plt.close()
    for filename in os.listdir("training_data"):
        if filename == ".ipynb_checkpoints": continue
        x, y = np.loadtxt("training_data/" + filename, delimiter=',', unpack=True, skiprows=skiprows)
        plt.plot(x,y, label=filename.split('.csv')[0])
​
    plt.xlabel('Time (s)')
    plt.ylabel('Validation Accuracy')
    plt.title('Training Comparison')
    plt.legend()
    plt.show()
# You can try running plot_trainings(2) to skip the (0,0) datapoint
plot_trainings()

二、增加学习率预热

在实验过程中,您可能已经发现较高的学习率也许会导致网络永远无法收敛,验证准确性保持在0.10左右,这意味着该模型仅与随机猜测的水平相当。如果您尚未看到这种效果,可以多进行几次训练。(注意:训练可以“很幸运”并最终收敛。尝试运行多次,您就会发现发散的情况。

!mpirun -np $num_gpus python fashion_mnist.py --base-lr .06 --batch-size 32

我们经常遇到的情况是,在训练开始时较高的学习率会导致网络发散。在这种情况下,权重以很高的幅度更新,以至于它们会调整过头而永远找不到可以走向损失函数最小点的斜坡。

为了解决这个问题,我们将实施一种称为学习率预热的技术。使用这种方法,起始学习率是目标学习率的一小部分,但随着训练的进行而逐渐增大。这样一来,网络一开始时缓慢地移动,并在发现朝向最小值的斜坡时仍以谨慎的步伐前进。随着学习率增加到目标值,较大学习率的好处将产生我们期待的效果。

实现

Horovod为Keras API提供了一个方便使用的回调函数,该回调函数实现这个逻辑:horovod.tensorflow.keras.callbacks.LearningRateWarmupCallback。默认情况下,在开始的5个训练周期内,学习率将从 学习率/GPU数量 开始逐渐增加。执行下一个单元格以查看有关回调函数的信息。

import horovod.tensorflow.keras as hvd
?hvd.callbacks.LearningRateWarmupCallback

要实现回调函数,请执行以下步骤。

第1步:使用以下代码注册新的warmup-epochs参数。

parser.add_argument('--warmup-epochs', type=float, default=5,
                    help='number of warmup epochs')

第2步:使用args.warmup_epochs作为warmup_epochs参数,在Horovod回调函数的数组中实现回调函数。要想在预热完成时获得打印输出,请将verbose参数设置为verbose。

第3步(可选):更新CSV文件名以包含预热时的训练次数。

如果遇到问题,可以在solutions / add_lr_warmup.py中找到解决方案。

实验

完成后,再试一次,看看结果是否会更好。

!mpirun -np $num_gpus python fashion_mnist.py --base-lr .06 --batch-size 32

三、大规模优化

对于数量不多的GPU而言,我们在本课程中一直进行的Fashion MNIST练习也算是一个相当小的问题。这使我们能够在几分钟的时间内快速进行训练,并查看有趣的结果。随着问题变得越来越大,就数据集大小和模型复杂性而言,具有更多GPU的更大系统对于减少训练时间至关重要。

但是,随着规模的扩大以及批量大小和学习率的增加,可能会出现问题。当学习率较大时,权重的更新可能会比权重本身更大,从而导致训练出现发散现象。

为了以高学习率来近似进行大规模训练,请运行下一个单元格(请注意,该算法可能会“幸运地”收敛,但是如果您多次运行,则应该看到发散的现象。)

!mpirun -np $num_gpus python fashion_mnist.py --base-lr .16 --batch-size 512

四、NovoGrad优化器

有一系列优化器被创建来解决此问题,它们允许非常大的批量大小和学习率。 在本练习中,我们将使用NovoGrad优化器。 NovoGrad具有权重更新的以下标准形式,

$\begin{equation*} \large \Delta \mathbf{w} = -\lambda\, \mathbf{m} \end{equation*} $

不过,$\mathbf{m}$项使用类似SGD利用动量进行归一化的梯度平均方法,也对梯度进行了适当的归一化,以避免梯度消失(或梯度爆炸)问题 。 NovoGrad确保学习率在网络的每个层上都能得到适当的调整,从经验上讲,这在大批量方案中很重要。 如果您有兴趣在本课程结束后继续进行此探索,则LAMB优化器是另一种非常有前途的值得探索的新方法,它与NovoGrad非常相似,因为它将SGD的流行变体Adam以及分层学习率结合在一起了。

如果您想了解有关NovoGrad的理论方面的更多信息,可以选择扩展下面的单元格。 否则,请继续做练习。

逐层学习率控制

NovoGrad结合了我们对SGD的一些深入研究结果。首先,它认识到应该将梯度更新与梯度的绝对大小分离开来 - 更新方向比大小更重要。更新的幅度应为权重的幅度乘以学习率,而由于学习率足够小,故这意味着我们在搜索最佳值时进行相对较小的更新。不幸的是,传统的SGD无法强制执行此操作。权重$\mathbf{w}$的更新形式为:

$\begin{equation*} \large \Delta \mathbf{w} = -\lambda\, \mathbf{g} \end{equation*}$

这里$\lambda$是学习率,$\mathbf{g}$是损失函数的梯度。 梯度的大小由损失函数确定,且不必与权重的大小相对应。此外,反向传播还会加剧这个问题(即消失梯度(或爆炸梯度)的问题),该问题一直困扰着深层卷积网,直到像残差连接这样的改进算法获得开发。过去几年开发的大多数SGD算法都试图以一种或另一种方式解决此问题。

一种直观的处理方法是将每一层的梯度除以该层的梯度范数:

$\begin{equation*} \large \Delta \mathbf{w} = -\lambda\, \frac{\mathbf{g}}{|\mathbf{g}|} \end{equation*} $

范数$|\mathbf{g}|$通常是均方根运算。通常可以将其描述为随机归一化梯度下降。

换一个角度看这种方法,从某种意义上说,原来的权重更新犯了衡量单位的错误(请参阅ADADELTA论文中的3.2节),对此问题的讨论要比这里的更新且更为严格。)。也就是说,如果我们假设权重具有一个空间维度(例如米),而对它采用相对于时间的偏导数,则梯度的单位为米/秒(即速度),而把一个速度值增加到一个位置的值作为位置的更新,这是不对的,我们需要以米为单位更新距离。用梯度除以范数可以使更新无量纲,并且我们可以通过按权重的范数缩放更新来恢复正确的比例。 也就是说,我们用以下形式来更新:

$\begin{equation*} \large \Delta \mathbf{w} = -\lambda\, |\mathbf{w}| \frac{\mathbf{g}}{|\mathbf{g}|} \end{equation*}$

它具有所需的比例和单位,但仍指向损失函数的梯度方向。

这两种方式都在很大程度上防止了梯度消失/梯度爆炸导致优化过程的发散,因为现在更新的幅度与梯度的绝对比例脱开了,后者可能比该层的权重大得多或小得多。

第二种方法是使用LARS优化器,该方法在每一层上的权重更新定义为:

$\begin{equation*} \large \Delta \mathbf{w} = -\lambda^{\mathrm{global}}\, \lambda^{\mathrm{local}}\, \mathbf{g} \end{equation*}$

其中“全局”(global)学习率是您熟悉的常规学习率策略(是一些小的数值,如0.01,且可能会随着时间而衰减),而“本地的”(local)每层学习率定义为

$\begin{equation*} \large \lambda^{\mathrm{local}} = \eta\, \frac{|\mathbf{w}|}{|\mathbf{g}|} \end{equation*}$

$\eta$是一个“信任系数”,应小于1,它决定了在更新过程中我们要更新多大值的权重。请注意,该方案基本上等同于先前的公式。之所以现在SGD可以使用大的批量,LARS和与它相关的方法产生了重要的影响。

请注意,LARS与LARC(分层自适应速率控制)密切相关,这两个术语有时可以互换使用。 LARC是LARS的一个细微变体,它“裁剪”了本地学习率,使得它不高于全局学习率。也就是说,权重更新形式为:

$\begin{equation*} \large \Delta \mathbf{w} = -\lambda\, \mathbf{g} \end{equation*}$

所使用的学习率由下式决定:

$\begin{equation*} \large \lambda = \mathrm{min}(\lambda^{\mathrm{global}}, \lambda^{\mathrm{local}}) \end{equation*}$

附带说明一下,在此讨论中,为简单起见,我们忽略了权重衰减,但可以很容易地将其添加到这些优化器中。

梯度平均和动量

同时进行的另一些工作则使用了梯度平均的概念:我们使用的梯度应该是当前步的梯度和上一步的梯度的平均值。作为说明,我们已经讨论了如何使用动量来避免陷入局部最小值并更有效地避开损失函数的鞍点,而具有动量的SGD实际上可以看作是梯度平均的一种形式 - 有效梯度是当前步的梯度和最后一步的梯度的线性组合。诸如RMSprop之类的优化器均实现了这一想法;除了分母中使用的梯度范数是当前步的梯度和最后一步的梯度的线性组合之外,它的更新看起来与LARS更新非常相似。

此概念最流行的实现是Adam优化器,其工作原理如下。假设我们的权重更新具有如下形式:

$\begin{equation*} \large \Delta \mathbf{w} = -\lambda\, \frac{\mathbf{m}}{\sqrt{v}} \end{equation*}$

其中$\mathbf{m}$是类似梯度的项,而$v$是类似于梯度平方的范数的项(因此,如果$\mathbf{m} = \mathbf{g}$ 并且 $v = |\mathbf{g}^2|$,我们就可以恢复较早的形式)。 我们可以通过以下方式实现梯度平均:

$\begin{equation*} \large \mathbf{m} = \beta_{1}\, \mathbf{m}_{\mathrm{prev}} + (1 - \beta_{1})\, \mathbf{g} \end{equation*}$

其中$\mathbf{m}_{\mathrm{prev}}$是上一步中的“梯度”项,并且

$\begin{equation*} \large v = \beta_{2}\, v_{\mathrm{prev}} + (1 - \beta_{2})\, |\mathbf{g}^2| \end{equation*}$

其中$v_{\mathrm{prev}}$是上一步中的“梯度平方”项。这意味着我们要计算梯度的移动平均值,而不是简单地应用当前步的梯度。因此,与传统的SGD等较简单的优化器相比,Adam作为训练优化器,性能是非常稳定的。但是,与SGD相比,它的泛化能力却可能更差。

逐层学习率控制和梯度平均的结合

NovoGrad结合了这两个概念。其更新的形式返回到

$\begin{equation*} \large \Delta \mathbf{w} = -\lambda\, \mathbf{m} \end{equation*}$

但是$\mathbf{m}$项对梯度进行了适当的归一化:

$\begin{equation*} \large \mathbf{m} = \beta_{1}\, \mathbf{m}_{\mathrm{prev}} + (1 - \beta_{1})\, \frac{\mathbf{g}}{\sqrt{v}} \end{equation*}$

(并且我们首先计算对$v$的更新,以便可以在对$\mathbf{m}$的更新中使用它)。

五、NovoGrad的实现

您可以在novograd.py中查看NovoGrad优化器的Keras实现。下一步将用NovoGrad优化器替换fashion_mnist.py中的SGD优化器。

第1步:导入NovoGrad优化器:

from tensorflow_addons.optimizers import NovoGrad

第2步:用NovoGrad替换SGD优化器:

opt = NovoGrad(lr=args.base_lr, grad_averaging=True)

grad_averaging参数使用当前步和前一步的加权平均动量(如同Adam那样),经验证明它对我们要解决的问题有帮助。

如果遇到任何问题,可以在solutions / add_novograd.py中找到解决方案。实施优化程序后,请在下面的单元格中再次运行训练过程。

!mpirun -np $num_gpus python fashion_mnist.py --base-lr .16 --batch-size 256

六、结果

希望现在的训练比用标准的随机梯度下降法能更成功地收敛。验证准确性在训练早期可能很差,但是在等待至少10-20个训练次数之后,您可能会看到验证准确性开始稳步提高并最终收敛到理想的结果。NovoGrad以及其它可以获得的优化器是构建训练流水线时要考虑的重要工具。

七、结论和后续步骤

对于工程师和数据科学家而言,多GPU训练正迅速成为关键的工具。加速的训练使以前不可能的AI挑战(如复杂的图像分类)变得可以解决。同样,我们通过BERT模型在自然语言处理方面取得了惊人的进步,该模型通常在数十到数百个GPU上进行训练。但是,多GPU训练在各个层次上都有使深度学习工作改观的能力,而不仅仅是针对高端的产品级的实现。如果您的训练要花费数小时甚至数天,那么实验可能会变得乏味甚至不切实际。使用多个GPU加快训练速度,您可以花费更多时间来完善和改进模型。它使您在开发的初始阶段以及随着时间的推移更新和改进模型时,都变得更加灵活和响应迅速。

展望未来,您可能会使用多GPU系统,例如DGX-1或DGX-2,或计划使用云服务器,例如本课程提供的服务器。无论哪种情况,NVIDIA GPU云容器站(NGC)都可以为您提供免费访问的容器,提供您在本课程中使用过的所有框架和库(实际上,此课程是在NGC容器上运行的!)。除了可以避开实际安装这些框架可能遇到的挑战之外,NGC容器还针对NVIDIA GPU的训练和推理进行了高度优化。GTC 2018大会提供了一个在云中部署NGC容器的分步操作指南;您还可以在NVIDIA docs中找到最新文档,以及Azure、AWS和GCP的设置步骤。NGC容器也可以在此类平台上直接使用作为DGX-1和DGX-2。

感谢您今天加入我们的课程。我们希望您为将多GPU训练应用到您的下一个AI挑战做好了准备,并为此感到兴奋。

八、脚本

add_novograd.py

import argparse
import tensorflow as tf
from tensorflow.keras import backend as K
from tensorflow.keras.preprocessing import image
from tensorflow.keras.datasets import fashion_mnist
from tensorflow.keras.layers import Input, Conv2D, BatchNormalization, Dense, \
                                    Add, Activation, Dropout, MaxPooling2D, GlobalAveragePooling2D
import numpy as np
import os
from time import time
import horovod.tensorflow.keras as hvd
import csv
from tensorflow_addons.optimizers import NovoGrad

# Initialize Horovod
hvd.init()

# Pin to a GPU
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    tf.config.experimental.set_memory_growth(gpus[hvd.local_rank()], True)
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')

# Parse input arguments

parser = argparse.ArgumentParser(description='Fashion MNIST Example',
                                 formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--log-dir', default='./logs',
                    help='tensorboard log directory')
parser.add_argument('--batch-size', type=int, default=32,
                    help='input batch size for training')
parser.add_argument('--val-batch-size', type=int, default=32,
                    help='input batch size for validation')
parser.add_argument('--epochs', type=int, default=40,
                    help='number of epochs to train')
parser.add_argument('--base-lr', type=float, default=0.01,
                    help='learning rate for a single GPU')
parser.add_argument('--momentum', type=float, default=0.9,
                    help='SGD momentum')
parser.add_argument('--wd', type=float, default=0.000005,
                    help='weight decay')
parser.add_argument('--warmup-epochs', type=float, default=5,
                    help='number of warmup epochs')
parser.add_argument('--target-accuracy', type=float, default=.85,
                    help='Target accuracy to stop training')
parser.add_argument('--patience', type=float, default=2,
                    help='Number of epochs that meet target before stopping')

args = parser.parse_args()

# Define a function for a simple learning rate decay over time

def lr_schedule(epoch):
    
    if epoch < 15:
        return args.base_lr
    if epoch < 25:
        return 1e-1 * args.base_lr
    if epoch < 35:
        return 1e-2 * args.base_lr
    return 1e-3 * args.base_lr

# Define the function that creates the model

def cbr(x, conv_size):
    channel_axis = 1 if K.image_data_format() == 'channels_first' else -1

    x = Conv2D(conv_size, (3,3), padding='same')(x)
    x = BatchNormalization(axis=channel_axis)(x)
    x = Activation('relu')(x)

    return x

def conv_block(x, conv_size, scale_input = False):
    x_0 = x
    if scale_input:
        x_0 = Conv2D(conv_size, (1, 1), activation='linear', padding='same')(x_0)

    x = cbr(x, conv_size)
    x = Dropout(0.01)(x)
    x = cbr(x, conv_size)
    x = Add()([x_0, x])

    return x

def create_model():

    # Implementation of WideResNet (depth = 16, width = 10) based on keras_contrib
    # https://github.com/keras-team/keras-contrib/blob/master/keras_contrib/applications/wide_resnet.py

    inputs = Input(shape=(28, 28, 1))

    x = cbr(inputs, 16)

    x = conv_block(x, 160, True)
    x = conv_block(x, 160)
    x = MaxPooling2D((2, 2))(x)
    x = conv_block(x, 320, True)
    x = conv_block(x, 320)
    x = MaxPooling2D((2, 2))(x)
    x = conv_block(x, 640, True)
    x = conv_block(x, 640)
    x = GlobalAveragePooling2D()(x)

    outputs = Dense(num_classes, activation='softmax')(x)

    model = tf.keras.models.Model(inputs, outputs)

    opt = NovoGrad(lr=args.base_lr, grad_averaging=True)

    # Wrap the optimizer in a Horovod distributed optimizer
    opt = hvd.DistributedOptimizer(opt)

    model.compile(loss=tf.keras.losses.categorical_crossentropy,
                  optimizer=opt,
                  metrics=['accuracy'])
        
    return model

if hvd.rank() == 0:
    verbose = 1
else:
    verbose = 0

# Input image dimensions
img_rows, img_cols = 28, 28
num_classes = 10

# Load Fashion MNIST data.
(x_train, y_train), (x_test, y_test) = fashion_mnist.load_data()

if K.image_data_format() == 'channels_first':
    x_train = x_train.reshape(x_train.shape[0], 1, img_rows, img_cols)
    x_test = x_test.reshape(x_test.shape[0], 1, img_rows, img_cols)
    input_shape = (1, img_rows, img_cols)
else:
    x_train = x_train.reshape(x_train.shape[0], img_rows, img_cols, 1)
    x_test = x_test.reshape(x_test.shape[0], img_rows, img_cols, 1)
    input_shape = (img_rows, img_cols, 1)

# Convert class vectors to binary class matrices
y_train = tf.keras.utils.to_categorical(y_train, num_classes)
y_test = tf.keras.utils.to_categorical(y_test, num_classes)

# Training data iterator.
train_gen = image.ImageDataGenerator(featurewise_center=True, featurewise_std_normalization=True,
                                     horizontal_flip=True, width_shift_range=0.2, height_shift_range=0.2)
train_gen.fit(x_train)
train_iter = train_gen.flow(x_train, y_train, batch_size=args.batch_size)

# Validation data iterator.
test_gen = image.ImageDataGenerator(featurewise_center=True, featurewise_std_normalization=True)
test_gen.mean = train_gen.mean
test_gen.std = train_gen.std
test_iter = test_gen.flow(x_test, y_test, batch_size=args.val_batch_size)


callbacks = []
callbacks.append(tf.keras.callbacks.LearningRateScheduler(lr_schedule))

# Add learning rate warmup callback.
callbacks.append(hvd.callbacks.LearningRateWarmupCallback(initial_lr=args.base_lr, warmup_epochs=args.warmup_epochs, verbose=verbose))

# Broadcast initial variable states from the first worker to all others.
callbacks.append(hvd.callbacks.BroadcastGlobalVariablesCallback(0))

# Average the metrics among workers at the end of every epoch.
callbacks.append(hvd.callbacks.MetricAverageCallback())

class PrintThroughput(tf.keras.callbacks.Callback):
    def __init__(self, total_images=0):
        self.total_images = total_images

    def on_epoch_begin(self, epoch, logs=None):
        self.epoch_start_time = time()

    def on_epoch_end(self, epoch, logs={}):
        epoch_time = time() - self.epoch_start_time
        images_per_sec = round(self.total_images / epoch_time, 2)
        print('Images/sec: {}'.format(images_per_sec))

if verbose:
    callbacks.append(PrintThroughput(total_images=len(y_train)))

class StopAtAccuracy(tf.keras.callbacks.Callback):
    def __init__(self, target=0.85, patience=2, verbose=0):
        self.target = target
        self.patience = patience
        self.verbose = verbose
        self.stopped_epoch = 0
        self.met_target = 0

    def on_epoch_end(self, epoch, logs=None):
        if logs.get('val_accuracy') > self.target:
            self.met_target += 1
        else:
            self.met_target = 0

        if self.met_target >= self.patience:
            self.stopped_epoch = epoch
            self.model.stop_training = True

    def on_train_end(self, logs=None):
        if self.stopped_epoch > 0 and self.verbose == 1:
            print('Early stopping after epoch {}'.format(self.stopped_epoch + 1))

callbacks.append(StopAtAccuracy(target=args.target_accuracy, patience=args.patience, verbose=verbose))

class PrintTotalTime(tf.keras.callbacks.Callback):
    def on_train_begin(self, logs=None):
        self.start_time = time()

    def on_epoch_end(self, epoch, logs=None):
        total_time = round(time() - self.start_time, 2)
        print("Cumulative training time after epoch {}: {}".format(epoch + 1, total_time))

    def on_train_end(self, logs=None):
        total_time = round(time() - self.start_time, 2)
        print("Cumulative training time: {}".format(total_time))

if verbose:
    callbacks.append(PrintTotalTime())

class SaveTrainingData(tf.keras.callbacks.Callback):
    def __init__(self, data_filepath=''):
        self.data_filepath = data_filepath

    def on_train_begin(self, logs=None):       
        file = open(self.data_filepath, 'w', newline='')
        writer = csv.writer(file)
        writer.writerow(['time', 'val_accuracy'])
        writer.writerow([0.0, 0.0])
        file.close()  

        self.train_start_time = time()

    def on_epoch_end(self, epoch, logs={}):
        total_time = time() - self.train_start_time
        file = open(self.data_filepath, 'a')
        writer = csv.writer(file)
        writer.writerow([round(total_time,1), round(logs['val_accuracy'], 4)])
        file.close()

# Save the training data.
if hvd.rank() == 0:
    data_filepath = "training_data/{}ranks-{}bs-{}lr-{}m-{}w.csv".format(hvd.size(), args.batch_size,
                                                                         args.base_lr, args.momentum, args.warmup_epochs)
    callbacks.append(SaveTrainingData(data_filepath=data_filepath))

# Create the model.
model = create_model()

# Train the model.
model.fit(train_iter,
          steps_per_epoch=len(train_iter) // hvd.size(),
          callbacks=callbacks,
          epochs=args.epochs,
          verbose=verbose,
          workers=4,
          initial_epoch=0,
          validation_data=test_iter,
          validation_steps=3 * len(test_iter) // hvd.size())

# Evaluate the model on the full data set.
score = model.evaluate(test_iter, steps=len(test_iter), workers=4, verbose=verbose)
if verbose:
    print('Test loss:', score[0])
    print('Test accuracy:', score[1])

add_lr_warmup.py:

import argparse
import tensorflow as tf
from tensorflow.keras import backend as K
from tensorflow.keras.preprocessing import image
from tensorflow.keras.datasets import fashion_mnist
from tensorflow.keras.layers import Input, Conv2D, BatchNormalization, Dense, \
                                    Add, Activation, Dropout, MaxPooling2D, GlobalAveragePooling2D
import numpy as np
import os
from time import time
import horovod.tensorflow.keras as hvd
import csv

# Initialize Horovod
hvd.init()

# Pin to a GPU
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    tf.config.experimental.set_memory_growth(gpus[hvd.local_rank()], True)
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')

# Parse input arguments

parser = argparse.ArgumentParser(description='Fashion MNIST Example',
                                 formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--log-dir', default='./logs',
                    help='tensorboard log directory')
parser.add_argument('--batch-size', type=int, default=32,
                    help='input batch size for training')
parser.add_argument('--val-batch-size', type=int, default=32,
                    help='input batch size for validation')
parser.add_argument('--epochs', type=int, default=40,
                    help='number of epochs to train')
parser.add_argument('--base-lr', type=float, default=0.01,
                    help='learning rate for a single GPU')
parser.add_argument('--momentum', type=float, default=0.9,
                    help='SGD momentum')
parser.add_argument('--wd', type=float, default=0.000005,
                    help='weight decay')
parser.add_argument('--warmup-epochs', type=float, default=5,
                    help='number of warmup epochs')
parser.add_argument('--target-accuracy', type=float, default=.85,
                    help='Target accuracy to stop training')
parser.add_argument('--patience', type=float, default=2,
                    help='Number of epochs that meet target before stopping')

args = parser.parse_args()

# Define a function for a simple learning rate decay over time

def lr_schedule(epoch):
    
    if epoch < 15:
        return args.base_lr
    if epoch < 25:
        return 1e-1 * args.base_lr
    if epoch < 35:
        return 1e-2 * args.base_lr
    return 1e-3 * args.base_lr

# Define the function that creates the model

def cbr(x, conv_size):
    channel_axis = 1 if K.image_data_format() == 'channels_first' else -1

    x = Conv2D(conv_size, (3,3), padding='same')(x)
    x = BatchNormalization(axis=channel_axis)(x)
    x = Activation('relu')(x)

    return x

def conv_block(x, conv_size, scale_input = False):
    x_0 = x
    if scale_input:
        x_0 = Conv2D(conv_size, (1, 1), activation='linear', padding='same')(x_0)

    x = cbr(x, conv_size)
    x = Dropout(0.01)(x)
    x = cbr(x, conv_size)
    x = Add()([x_0, x])

    return x

def create_model():

    # Implementation of WideResNet (depth = 16, width = 10) based on keras_contrib
    # https://github.com/keras-team/keras-contrib/blob/master/keras_contrib/applications/wide_resnet.py

    inputs = Input(shape=(28, 28, 1))

    x = cbr(inputs, 16)

    x = conv_block(x, 160, True)
    x = conv_block(x, 160)
    x = MaxPooling2D((2, 2))(x)
    x = conv_block(x, 320, True)
    x = conv_block(x, 320)
    x = MaxPooling2D((2, 2))(x)
    x = conv_block(x, 640, True)
    x = conv_block(x, 640)
    x = GlobalAveragePooling2D()(x)

    outputs = Dense(num_classes, activation='softmax')(x)

    model = tf.keras.models.Model(inputs, outputs)

    opt = tf.keras.optimizers.SGD(lr=args.base_lr, momentum=args.momentum)

    # Wrap the optimizer in a Horovod distributed optimizer
    opt = hvd.DistributedOptimizer(opt)

    model.compile(loss=tf.keras.losses.categorical_crossentropy,
                  optimizer=opt,
                  metrics=['accuracy'])
        
    return model

if hvd.rank() == 0:
    verbose = 1
else:
    verbose = 0

# Input image dimensions
img_rows, img_cols = 28, 28
num_classes = 10

# Load Fashion MNIST data.
(x_train, y_train), (x_test, y_test) = fashion_mnist.load_data()

if K.image_data_format() == 'channels_first':
    x_train = x_train.reshape(x_train.shape[0], 1, img_rows, img_cols)
    x_test = x_test.reshape(x_test.shape[0], 1, img_rows, img_cols)
    input_shape = (1, img_rows, img_cols)
else:
    x_train = x_train.reshape(x_train.shape[0], img_rows, img_cols, 1)
    x_test = x_test.reshape(x_test.shape[0], img_rows, img_cols, 1)
    input_shape = (img_rows, img_cols, 1)

# Convert class vectors to binary class matrices
y_train = tf.keras.utils.to_categorical(y_train, num_classes)
y_test = tf.keras.utils.to_categorical(y_test, num_classes)

# Training data iterator.
train_gen = image.ImageDataGenerator(featurewise_center=True, featurewise_std_normalization=True,
                                     horizontal_flip=True, width_shift_range=0.2, height_shift_range=0.2)
train_gen.fit(x_train)
train_iter = train_gen.flow(x_train, y_train, batch_size=args.batch_size)

# Validation data iterator.
test_gen = image.ImageDataGenerator(featurewise_center=True, featurewise_std_normalization=True)
test_gen.mean = train_gen.mean
test_gen.std = train_gen.std
test_iter = test_gen.flow(x_test, y_test, batch_size=args.val_batch_size)


callbacks = []
callbacks.append(tf.keras.callbacks.LearningRateScheduler(lr_schedule))

# Add learning rate warmup callback.
callbacks.append(hvd.callbacks.LearningRateWarmupCallback(initial_lr=args.base_lr, warmup_epochs=args.warmup_epochs, verbose=verbose))

# Broadcast initial variable states from the first worker to all others.
callbacks.append(hvd.callbacks.BroadcastGlobalVariablesCallback(0))

# Average the metrics among workers at the end of every epoch.
callbacks.append(hvd.callbacks.MetricAverageCallback())

class PrintThroughput(tf.keras.callbacks.Callback):
    def __init__(self, total_images=0):
        self.total_images = total_images

    def on_epoch_begin(self, epoch, logs=None):
        self.epoch_start_time = time()

    def on_epoch_end(self, epoch, logs={}):
        epoch_time = time() - self.epoch_start_time
        images_per_sec = round(self.total_images / epoch_time, 2)
        print('Images/sec: {}'.format(images_per_sec))

if verbose:
    callbacks.append(PrintThroughput(total_images=len(y_train)))

class StopAtAccuracy(tf.keras.callbacks.Callback):
    def __init__(self, target=0.85, patience=2, verbose=0):
        self.target = target
        self.patience = patience
        self.verbose = verbose
        self.stopped_epoch = 0
        self.met_target = 0

    def on_epoch_end(self, epoch, logs=None):
        if logs.get('val_accuracy') > self.target:
            self.met_target += 1
        else:
            self.met_target = 0

        if self.met_target >= self.patience:
            self.stopped_epoch = epoch
            self.model.stop_training = True

    def on_train_end(self, logs=None):
        if self.stopped_epoch > 0 and self.verbose == 1:
            print('Early stopping after epoch {}'.format(self.stopped_epoch + 1))

callbacks.append(StopAtAccuracy(target=args.target_accuracy, patience=args.patience, verbose=verbose))

class PrintTotalTime(tf.keras.callbacks.Callback):
    def on_train_begin(self, logs=None):
        self.start_time = time()

    def on_epoch_end(self, epoch, logs=None):
        total_time = round(time() - self.start_time, 2)
        print("Cumulative training time after epoch {}: {}".format(epoch + 1, total_time))

    def on_train_end(self, logs=None):
        total_time = round(time() - self.start_time, 2)
        print("Cumulative training time: {}".format(total_time))

if verbose:
    callbacks.append(PrintTotalTime())

class SaveTrainingData(tf.keras.callbacks.Callback):
    def __init__(self, data_filepath=''):
        self.data_filepath = data_filepath

    def on_train_begin(self, logs=None):       
        file = open(self.data_filepath, 'w', newline='')
        writer = csv.writer(file)
        writer.writerow(['time', 'val_accuracy'])
        writer.writerow([0.0, 0.0])
        file.close()  

        self.train_start_time = time()

    def on_epoch_end(self, epoch, logs={}):
        total_time = time() - self.train_start_time
        file = open(self.data_filepath, 'a')
        writer = csv.writer(file)
        writer.writerow([round(total_time,1), round(logs['val_accuracy'], 4)])
        file.close()

# Save the training data.
if hvd.rank() == 0:
    data_filepath = "training_data/{}ranks-{}bs-{}lr-{}m-{}w.csv".format(hvd.size(), args.batch_size,
                                                                         args.base_lr, args.momentum, args.warmup_epochs)
    callbacks.append(SaveTrainingData(data_filepath=data_filepath))

# Create the model.
model = create_model()

# Train the model.
model.fit(train_iter,
          steps_per_epoch=len(train_iter) // hvd.size(),
          callbacks=callbacks,
          epochs=args.epochs,
          verbose=verbose,
          workers=4,
          initial_epoch=0,
          validation_data=test_iter,
          validation_steps=3 * len(test_iter) // hvd.size())

# Evaluate the model on the full data set.
score = model.evaluate(test_iter, steps=len(test_iter), workers=4, verbose=verbose)
if verbose:
    print('Test loss:', score[0])
    print('Test accuracy:', score[1])

参考资料

  • Nvidia的课件《用多 GPU 训练神经网络》
药企,独角兽,苏州。团队长期招人,感兴趣的都可以发邮件聊聊:tiehan@sina.cn
个人公众号,比较懒,很少更新,可以在上面提问题,如果回复不及时,可发邮件给我: tiehan@sina.cn