【技术分享】改进官方TF源码,进行BERT文本分类的多卡训练

本文原作者:彭浩源,经授权后发布。

导语

Google-research开源的BERT代码中,微调BERT进行文本分类的demo代码是基于TPUEstimator的单卡实现,即使机器上有多块GPU,也无法并行训练,不满足大规模训练的要求。本文分析开源demo代码无法多卡训练的原因,并给出修改代码,支持多卡训练的方案。

1. 在多卡机器上单卡运行

Google-research开源的BERT官方Tensorflow代码(项目地址)中,提供了run_classifier.py作为接口,用于finetune一个文本分类模型。在run_classifier.py的代码中,使用tf.estimator进行模型的训练和评估,部分代码如下(run_classifier.py第847-880行):

  model_fn = model_fn_builder(
bert_config=bert_config,
num_labels=len(label_list),
init_checkpoint=FLAGS.init_checkpoint,
learning_rate=FLAGS.learning_rate,
num_train_steps=num_train_steps,
num_warmup_steps=num_warmup_steps,
use_tpu=FLAGS.use_tpu,
use_one_hot_embeddings=FLAGS.use_tpu)
# If TPU is not available, this will fall back to normal Estimator on CPU
# or GPU.
estimator = tf.contrib.tpu.TPUEstimator(
use_tpu=FLAGS.use_tpu,
model_fn=model_fn,
config=run_config,
train_batch_size=FLAGS.train_batch_size,
eval_batch_size=FLAGS.eval_batch_size,
predict_batch_size=FLAGS.predict_batch_size)
if FLAGS.do_train:
train_file = os.path.join(FLAGS.output_dir, "train.tf_record")
file_based_convert_examples_to_features(
train_examples, label_list, FLAGS.max_seq_length, tokenizer, 				train_file)
tf.logging.info("***** Running training *****")
tf.logging.info("  Num examples = %d", len(train_examples))
tf.logging.info("  Batch size = %d", FLAGS.train_batch_size)
tf.logging.info("  Num steps = %d", num_train_steps)
train_input_fn = file_based_input_fn_builder(
input_file=train_file,
seq_length=FLAGS.max_seq_length,
is_training=True,
drop_remainder=True)
estimator.train(input_fn=train_input_fn, max_steps=num_train_steps)

在这段代码中:

  • 首先调用先前定义的model_fn_builder函数,这个函数的返回值也是一个函数,称为model_fn。这个model_fn定义了计算图的结构,以及mode分别为TRAIN, EVAL和PREDICT时的不同操作。
  • 其次,利用得到的model_fn函数,结合其它变量,生成一个tf.contrib.tpu.TPUEstimator的实例。当没有TPU可用(即使用CPU或者GPU)的时候,TPUEstimator相当于普通的tf.estimator.Estimator。
  • 然后,调用file_based_input_fn_builder函数,这个函数的返回值也是一个函数,称为train_input_fn。这个train_input_fn的返回值是一个tf.data.Dataset。
  • 最后,使用estimator.train()方法,传入train_input_fn和max_steps,开始训练。

这是使用estimator API进行模型训练的基本流程。使用这一流程进行训练有一个很大的问题:

即使机器上有多块GPU,在默认配置下,它只能使用一块GPU,无法充分利用GPU的算力。

在一台有8块P40的机器上,使用tensorflow1.15和python3运行run_classifier.py,在开始训练后,如果执行nvidia-smi命令查看GPU的使用情况,会得到这样的结果:

启动命令为:

python run_classifier.py --data_dir /google-bert/CoLA --task_name cola --vocab_file /data/bert/uncased_L-12_H-768_A-12/vocab.txt --bert_config_file /data/bert/uncased_L-12_H-768_A-12/bert_config.json --output_dir ./bert_dir --init_checkpoint /data/bert/uncased_L-12_H-768_A-12/bert_model.ckpt --do_train --do_eval

如图,机器上有8块GPU,但是只有GPU:0正在计算,其余GPU均处于闲置状态。

CoLA数据集共有8551个训练样本,我们使用的batch_size为默认值32,训练epoch数为3.0,因此总训练步数为8551 * 3 / 32 = 801步。日志显示global_step/sec的值约为1.69,因此总训练时长约为474秒。

实验结果与预期相符,大约8分钟后,代码运行完毕,共训练801个step,验证集上的准确率为79.0%。

2. 直接加入MirroredStrategy(失败)

对于tf.estimator,常见的多卡分布式方案是使用tf.distribute.MirroredStrategy。我们尝试简单修改代码,在run_config中加入MirroredStrategy。修改后的代码如下:

# Add MirroredStrategy
strategy = tf.distribute.MirroredStrategy()
run_config = tf.contrib.tpu.RunConfig(
cluster=tpu_cluster_resolver,
master=FLAGS.master,
model_dir=FLAGS.output_dir,
save_checkpoints_steps=FLAGS.save_checkpoints_steps,
train_distribute=strategy,
tpu_config=tf.contrib.tpu.TPUConfig(
iterations_per_loop=FLAGS.iterations_per_loop,
num_shards=FLAGS.num_tpu_cores,
per_host_input_for_training=is_per_host))

重新运行后,运行失败,Traceback显示:

TypeError: _call_input_fn() takes 3 positional arguments but 4 were given

Traceback中的信息比较有限,难以排查具体错误原因,但我们可以初步得出结论,无法直接将MirroredStrategy应用到TPUEstimator中。

3. 改用普通Estimator和MirroredStrategy (失败)

由于我们是在GPU机器上训练,不使用TPU,因此我们尝试将TPUEstimator改为普通tf.estimator.Estimator。

首先我们修改run_config,将run_classifier.py中828行定义的run_config改为:

# Add MirroredStrategy
strategy = tf.distribute.MirroredStrategy()
run_config = tf.estimator.RunConfig(
model_dir=FLAGS.output_dir,
train_distribute=strategy,
save_checkpoints_steps=FLAGS.save_checkpoints_steps)

其次,修改estimator,将run_classifier.py中859行定义的estimator改为:

estimator = tf.estimator.Estimator(model_fn=model_fn, config=run_config)

然后,我们修改file_based_input_fn_builder函数。由于原有的file_based_input_fn_builder中返回的input_fn函数签名中包含参数params,并从params中读取batch_size,但我们定义的普通estimator的默认params中并没有这一参数,因此,我们将【从params中读取batch_size】改为【将batch_size作为参数传入file_based_input_fn_builder】。修改后的file_based_input_fn_builder函数如下:

def file_based_input_fn_builder(input_file, seq_length, is_training,
drop_remainder, batch_size):
"""Creates an `input_fn` closure to be passed to TPUEstimator."""
name_to_features = {
"input_ids": tf.FixedLenFeature([seq_length], tf.int64),
"input_mask": tf.FixedLenFeature([seq_length], tf.int64),
"segment_ids": tf.FixedLenFeature([seq_length], tf.int64),
"label_ids": tf.FixedLenFeature([], tf.int64),
"is_real_example": tf.FixedLenFeature([], tf.int64),
}
def _decode_record(record, name_to_features):
"""Decodes a record to a TensorFlow example."""
example = tf.parse_single_example(record, name_to_features)
# tf.Example only supports tf.int64, but the TPU only supports tf.int32.
# So cast all int64 to int32.
for name in list(example.keys()):
t = example[name]
if t.dtype == tf.int64:
t = tf.to_int32(t)
example[name] = t
return example
def input_fn():
"""The actual input function."""
# For training, we want a lot of parallel reading and shuffling.
# For eval, we want no shuffling and parallel reading doesn't matter.
d = tf.data.TFRecordDataset(input_file)
if is_training:
d = d.repeat()
d = d.shuffle(buffer_size=100)
d = d.apply(
tf.contrib.data.map_and_batch(
lambda record: _decode_record(record, name_to_features),
batch_size=batch_size,
drop_remainder=drop_remainder))
return d
return input_fn

在调用时,我们需要多传一个参数batch_size:

train_input_fn = file_based_input_fn_builder(
input_file=train_file,
seq_length=FLAGS.max_seq_length,
is_training=True,
drop_remainder=True,
batch_size=FLAGS.train_batch_size)

生成eval_input_fn和predict_input_fn时,应分别传入对应的batch_size。

最后,我们要修改model_fn本身,将不同mode下的返回值类型从tf.contrib.tpu.TPUEstimatorSpec改为tf.estimator.EstimatorSpec。对于train和predict两个mode,直接修改返回值类型即可,对于eval这个mode,我们需要改变eval_metrics的类型,修改后的具体代码如下,即将eval_metrics变为了eval_metric_ops:

eval_metric_ops = metric_fn(per_example_loss, label_ids,
logits, is_real_example)
output_spec = tf.estimator.EstimatorSpec(
mode=mode,
loss=total_loss,
eval_metric_ops=eval_metric_ops)

完成以上修改后,运行代码,出现了一个ValueError,内容如下:

ValueError: You must specify an aggregation method to update a MirroredVariable in Replica Context. You can do so by passing an explicit value for argument `aggregation` to tf.Variable(..).e.g. `tf.Variable(..., aggregation=tf.VariableAggregation.SUM)``tf.VariableAggregation` lists the possible aggregation methods.This is required because MirroredVariable should always be kept in sync. When updating them or assigning to them in a replica context, we automatically try to aggregate the values before updating the variable. For this aggregation, we need to know the aggregation method. Another alternative is to not try to update such MirroredVariable in replica context, but in cross replica context. You can enter cross replica context by calling `tf.distribute.get_replica_context().merge_call(merge_fn, ..)`.Inside `merge_fn`, you can then update the MirroredVariable using `tf.distribute.StrategyExtended.update()`.

观察Traceback,可以发现,这个ValueError是optimizer在apply_gradients时产生的,具体错误位于optimization.py的154行。

在Google公开的BERT代码中,从optimization.py可以看出,模型训练时没有用tensorflow内置的优化器,而是通过继承tf.train.Optimizer,并重写apply_gradients等方法自行实现了一个AdamWeightDecayOptimizer类作为优化器。Google-research的源代码中,实现优化器时没有考虑到优化器和分布式训练的兼容,没有定义优化器中的变量在多卡训练时的聚合(Aggregation)方式,因而在多卡训练时会报错。

4. 改用普通Estimator和MirroredStrategy,同时更换优化器(成功)

最简单的更换优化器的方式就是修改optimization.py。在optimization.py第60行中定义optimizer变量为同一个py文件中自行实现的AdamWeightDecayOptimizer,我们先尝试将其替换为普通的AdamOptimizer:

optimizer = tf.train.AdamOptimizer(
learning_rate=learning_rate,
epsilon=1e-6)

注意,由于Google-research实现的AdamWeightDecayOptimizer在apply_gradient方法中没有调整global_step,他们在optimization.py的第82-83行手动增加了global_step。如果我们换成普通的optimizer,需要删除这两行的代码,否则global_step的值会变成实际值的两倍!

重新运行后,运行成功,nvidia-smi命令显示,机器上的8张GPU全部处于运作状态,这也进一步验证了原有代码运行失败的原因出在优化器上。

但是,使用tensorflow内置的AdamOptimizer代替AdamWeightDecayOptimizer的话,就失去了weight decay的功能。如果想要使用AdamWeightDecayOptimizer,在Github上,可以找到支持多卡的实现,如JayYip的实现,我们可以用这些多卡版本的AdamWeightDecayOptimizer代替BERT源码中的AdamWeightDecayOptimizer,从而满足对多卡训练的要求。

5. 根据GPU数量调整训练步数

在Google-research提供的源代码中是通过num_epochs控制训练步数的,如run_classifier.py第842-845行所示,代码中根据训练集样本个数,训练epoch数和训练时的batch_size计算出训练步数,具体的计算方式是:训练步数=训练集样本个数 * 训练epoch数 / 训练时batch_size。训练步数被用于控制estimator的训练。

当我们成功使用多卡并行训练时,每张卡上均有一个batch在训练,因此实际的batch_size是设定的batch_size与GPU数目的乘积,我们可以通过以下实验来简单证明这一点:

我们将file_based_input_fn_builder中d = d.repeat()一行去掉,同时将main函数中的estimator.train(input_fn=train_input_fn, max_steps=num_train_steps)改为estimator.train(input_fn=train_input_fn)。这一步的目的是让dataset只有一份训练集的样本(即8551个训练样本各出现一次),且以遍历完dataset来作为训练结束的条件。同时,我们在启动命令中加入–train_batch_size 1,将batch_size改为1。

完成上述改动后,我们使用8卡并行训练。我们发现,训练结束时global_step的值是1068,正好是8551 / 8,这说明我们的训练策略是每张卡上的batch_size为1,由于有8张卡,实际每个step训练了8个batch。

因此,当多卡并行训练时,如果我们还是以epoch来控制训练步数,那么计算实际的max_steps时,要除以GPU的数量

我们使用8块GPU,32的train_batch_size,在CoLA数据集上训练3.0个epoch,则实际的训练步数是8551 * 3 / 8/ 32 = 100步。经过了3分32秒的准备时间和1分50秒的实际训练时间后,训练成功完成,验证集准确率与单卡训练相当,训练时间减少了约3分钟。这是由于多卡训练较长的准备时间导致的。CoLA数据集上的实验表明,8卡并行训练时,global_step/sec的值约为0.91,相比于单卡的1.69,训练速度是单卡的0.91 * 8 / 1.69 = 4.31倍。因此当训练数据量很大时,多卡训练能显著节省训练时间。

6. 其它注意事项

使用上述改动进行多卡训练时,要注意:

  • 多卡并行的调度和同步等操作本身需要一定的时间。以前面CoLA数据集的实验为例,当使用8块P40GPU并行训练时,在执行训练命令大约3-4分钟后,实际的训练才开始。因此,是否使用多卡并行训练需要考虑训练量的大小。如果训练量不大的话,可能单卡训练的实际耗时反而更短。

7. 总结

综上所述,改动BERT的run_classifier.py以进行多卡训练需要以下步骤:

  1. 将tf.contrib.tpu.TPUEstimator改为tf.estimator.Estimator。此处需要修改的代码较多,包括model_fn_builder和file_based_input_fn_builder,以及file_based_input_fn_builder的调用代码。
  2. 在run_config中加入MirroredStrategy。
  3. 修改optimization.py中的优化器,使用tensorflow内置的优化器或者支持多卡训练的AdamWeightDecayOptimizer的实现,此处需要注意优化器的apply_gradients方法是否增加了global_step,以决定是否需要手动增加global_step。
  4. 如果按照epoch数控制训练,需要根据GPU的数量,调整传给estimator.train的max_steps参数。