# Source code for mxnet.gluon.trainer
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
# coding: utf-8
# pylint: disable=line-too-long
"""Parameter optimizer."""
__all__ = ['Trainer']
from .. import optimizer as opt
from ..model import _create_kvstore, _create_sparse_kvstore
from .parameter import ParameterDict, Parameter
class Trainer(object):
    """Applies an `Optimizer` on a set of Parameters. Trainer should
    be used together with `autograd`.

    .. note::

        For the following cases, updates will always happen on kvstore,
        i.e., you cannot set update_on_kvstore=False.

        - dist kvstore with sparse weights or sparse gradients
        - dist async kvstore
        - `optimizer.lr_scheduler` is not None

    Parameters
    ----------
    params : ParameterDict, dict, list or tuple of Parameter
        The set of parameters to optimize. For a dict or ParameterDict the
        values are used.
    optimizer : str or Optimizer
        The optimizer to use. See
        `help <http://mxnet.io/api/python/optimization/optimization.html#the-mxnet-optimizer-package>`_
        on Optimizer for a list of available optimizers.
    optimizer_params : dict
        Key-word arguments to be passed to optimizer constructor. For example,
        `{'learning_rate': 0.1}`. All optimizers accept learning_rate, wd (weight decay),
        clip_gradient, and lr_scheduler. See each optimizer's
        constructor for a list of additional supported arguments.
    kvstore : str or KVStore
        kvstore type for multi-gpu and distributed training. See help on
        :any:`mxnet.kvstore.create` for more information.
    compression_params : dict
        Specifies type of gradient compression and additional arguments depending
        on the type of compression being used. For example, 2bit compression requires a threshold.
        Arguments would then be {'type':'2bit', 'threshold':0.5}
        See mxnet.KVStore.set_gradient_compression method for more details on gradient compression.
    update_on_kvstore : bool, default None
        Whether to perform parameter updates on kvstore. If None, then trainer will choose the more
        suitable option depending on the type of kvstore.

    Properties
    ----------
    learning_rate : float
        The current learning rate of the optimizer. Given an Optimizer object
        optimizer, its learning rate can be accessed as optimizer.learning_rate.
    """
    def __init__(self, params, optimizer, optimizer_params=None, kvstore='device',
                 compression_params=None, update_on_kvstore=None):
        """Register the parameters, build the optimizer and reset kvstore state.

        Raises
        ------
        ValueError
            If `params` is not a dict, ParameterDict, list or tuple, or if
            any element of it is not a `Parameter`.
        """
        # Mapping-like inputs are flattened to the list of their values.
        if isinstance(params, (dict, ParameterDict)):
            params = list(params.values())
        if not isinstance(params, (list, tuple)):
            raise ValueError(
                "First argument must be a list or dict of Parameters, "
                "got %s." % (type(params)))

        self._params = []
        # Maps parameter name -> position; used as the key on the kvstore.
        self._param2idx = {}
        self._contains_sparse_weight = False
        self._contains_sparse_grad = False
        for position, candidate in enumerate(params):
            if not isinstance(candidate, Parameter):
                raise ValueError(
                    "First argument must be a list or dict of Parameters, "
                    "got list of %s." % (type(candidate)))
            self._param2idx[candidate.name] = position
            self._params.append(candidate)
            candidate._set_trainer(self)
            # Any non-'default' storage type marks the trainer as sparse.
            if candidate._stype != 'default':
                self._contains_sparse_weight = True
            if candidate._grad_stype != 'default':
                self._contains_sparse_grad = True

        self._compression_params = compression_params
        optimizer_params = optimizer_params or {}
        # User supplied rescale_grad is kept; step()/update() divide it by
        # the batch size.
        self._scale = float(optimizer_params.get('rescale_grad', 1.0))
        self._contexts = self._check_contexts()
        self._init_optimizer(optimizer, optimizer_params)

        # The kvstore itself is created lazily by _init_kvstore(); only the
        # requested configuration is remembered here.
        self._kvstore_params = {'kvstore': kvstore, 'update_on_kvstore': update_on_kvstore}
        self._kv_initialized = False
        self._kvstore = None
        self._update_on_kvstore = None
        self._distributed = None
        self._params_to_init = []
        self._reset_kvstore()
    def _check_contexts(self):
        contexts = None
        for param in self._params:
            ctx = param.list_ctx()
            assert contexts is None or contexts == ctx, \
                "All Parameters must be initialized on the same set of contexts, " \
                "but Parameter %s is initialized on %s while previous Parameters " \
                "are initialized on %s."%(param.name, str(ctx), str(contexts))
            contexts = ctx
        return contexts
    def _init_optimizer(self, optimizer, optimizer_params):
        """Create (or adopt) the optimizer and build one updater per context.

        `optimizer` is either an `Optimizer` instance, in which case
        `optimizer_params` must be empty, or a value passed to `opt.create`
        together with `optimizer_params`.
        """
        # index -> Parameter mapping handed to the optimizer.
        indexed_params = dict(enumerate(self._params))
        if not isinstance(optimizer, opt.Optimizer):
            self._optimizer = opt.create(optimizer, param_dict=indexed_params,
                                         **optimizer_params)
        else:
            assert not optimizer_params, \
                "optimizer_params must be None if optimizer is an instance of " \
                "Optimizer instead of str"
            self._optimizer = optimizer
            # param_dict must not be deep copied, so that if the user mutates
            # the lr_mult or wd_mult of some parameters, it takes effect.
            self._optimizer.param_dict = indexed_params
        self._updaters = [opt.get_updater(self._optimizer)
                          for _ in self._contexts]
    def _init_params(self):
        """Initialize parameters in the KVStore.
        Parameters with incomplete initialization are ignored.
        """
        assert self._kv_initialized, "Cannot initialize parameters in KVStore " \
                                     "when KVStore is not initialized."
        params_to_init = []
        if self._kvstore:
            for param in self._params_to_init:
                if param._deferred_init:
                    params_to_init.append(param)
                else:
                    param_arrays = param._check_and_get(param._data, list)
                    idx = self._param2idx[param.name]
                    self._kvstore.init(idx, param_arrays[0])
                    if param._stype == 'default':
                        self._kvstore.pull(idx, param_arrays, priority=-idx)
        self._params_to_init = params_to_init
    def _reset_kvstore(self):
        """Reset kvstore."""
        if self._kvstore and 'dist' in self._kvstore.type:
            raise RuntimeError("Cannot reset distributed KVStore.")
        self._kv_initialized = False
        self._kvstore = None
        self._distributed = None
        self._update_on_kvstore = None
        self._params_to_init = [param for param in self._params]
    def _init_kvstore(self):
        """Create the kvstore and decide where parameter updates happen.

        Sets `_kvstore`, `_update_on_kvstore` and `_distributed`, then marks
        the kvstore as initialized.

        Raises
        ------
        ValueError
            If the user-requested `update_on_kvstore` is not supported for
            the chosen configuration, or if the optimizer has an
            `lr_scheduler` while updates do not happen on the kvstore.
        """
        settings = self._kvstore_params
        # Three cases: sparse weights, sparse gradients only, fully dense.
        if self._contains_sparse_weight:
            # With sparse weights a kvstore must exist and the weights must be
            # updated on it. The training loop is:
            #    - row_sparse_pull(sparse_weight)
            #    - forward()
            #    - backward()
            #    - push_and_update(grad)
            #    - pull(weight)
            kvstore, update_on_kvstore = _create_sparse_kvstore(settings['kvstore'])
            self._distributed = 'dist' in kvstore.type
            # Reject an explicit user request for local updates.
            if settings['update_on_kvstore'] is False:
                raise ValueError("Cannot set update_on_kvstore=False when sparse weights "
                                 "are present.")
        elif self._contains_sparse_grad:
            # Single node, dense weight + sparse grad: update_on_kvstore=False
            # is preferred because it is usually faster. Sparse gradients are
            # pushed and pulled and the weight is not stored on the kvstore:
            #    - forward()
            #    - backward()
            #    - push(grad)
            #    - pull(grad)
            #    - update(grad, weight)
            #
            # Multi node, dense weight + sparse grad: only
            # update_on_kvstore=True is supported because
            # kv.row_sparse_pull(grad) is not implemented. Sparse gradients
            # are pushed and dense weights pulled:
            #    - forward()
            #    - backward()
            #    - push_and_update(grad)
            #    - pull(weight)
            first_ctx_arrays = {param.name: param.data(self._contexts[0])
                                for param in self._params}
            kvstore, _ = _create_kvstore(settings['kvstore'], len(self._contexts),
                                         first_ctx_arrays)
            if kvstore:
                self._distributed = 'dist' in kvstore.type
            else:
                self._distributed = False
            update_on_kvstore = self._distributed
            requested = settings['update_on_kvstore']
            # Reject unsupported user configuration, otherwise honour it.
            if requested is False and self._distributed:
                raise ValueError("Cannot set update_on_kvstore=False on dist kvstore "
                                 "when sparse gradients are present.")
            if requested is not None:
                update_on_kvstore = requested
        else:
            # Dense weights and dense gradients. The only unsupported mode is
            # async with update_on_kvstore=False.
            first_ctx_arrays = {param.name: param.data(self._contexts[0])
                                for param in self._params}
            kvstore, update_on_kvstore = _create_kvstore(settings['kvstore'],
                                                         len(self._contexts),
                                                         first_ctx_arrays)
            if kvstore:
                self._distributed = 'dist' in kvstore.type
            else:
                self._distributed = False
            requested = settings['update_on_kvstore']
            if self._distributed and 'async' in kvstore.type:
                update_on_kvstore = True
                if requested is False:
                    raise ValueError("Please set update_on_kvstore=True "
                                     "when training in async mode.")
            if requested is not None:
                update_on_kvstore = requested

        # Apply gradient compression and hand over the optimizer.
        if not kvstore:
            self._kvstore = None
            self._update_on_kvstore = None
        else:
            if self._compression_params:
                kvstore.set_gradient_compression(self._compression_params)
            if update_on_kvstore:
                # optimizer preferably needs to be set before init for multiprecision
                kvstore.set_optimizer(self._optimizer)
            self._kvstore = kvstore
            self._update_on_kvstore = update_on_kvstore
            if self._optimizer.lr_scheduler and not self._update_on_kvstore:
                raise ValueError("update_on_kvstore=False does not support "
                                 "optimizer with LRScheduler. Please "
                                 "consider setting learning rate manually.")
        self._kv_initialized = True
    @property
    def learning_rate(self):
        """Current learning rate, read from the underlying optimizer.

        Raises UserWarning when `_optimizer` is not an `Optimizer` instance.
        """
        if isinstance(self._optimizer, opt.Optimizer):
            return self._optimizer.learning_rate
        raise UserWarning("Optimizer has to be defined before its learning "
                          "rate can be accessed.")
[docs]    def set_learning_rate(self, lr):
        """Sets a new learning rate of the optimizer.
        Parameters
        ----------
        lr : float
            The new learning rate of the optimizer.
        """
        if not isinstance(self._optimizer, opt.Optimizer):
            raise UserWarning("Optimizer has to be defined before its learning "
                              "rate is mutated.")
        else:
            self._optimizer.set_learning_rate(lr)
    def _row_sparse_pull(self, parameter, out, row_id, full_idx=False):
        """Internal method to invoke pull operations on KVStore. If `full_idx` is set to True,
        `kv.pull` is preferred instead of `kv.row_sparse_pull`.
        """
        # initialize kv and params if not already
        if not self._kv_initialized:
            self._init_kvstore()
        if self._params_to_init:
            self._init_params()
        idx = self._param2idx[parameter.name]
        if full_idx and 'dist' not in self._kvstore.type:
            assert row_id.size == out.shape[0]
            self._kvstore.pull(idx, out=out, priority=-idx, ignore_sparse=False)
        else:
            self._kvstore.row_sparse_pull(idx, out=out, row_ids=row_id, priority=-idx)
    def _check_and_rescale_grad(self, scale):
        if self._update_on_kvstore and self._distributed and self._kv_initialized:
            if self._optimizer.rescale_grad != scale:
                raise UserWarning('Possible change in the `batch_size` from previous '
                                  '`step` detected. Optimizer gradient normalizing '
                                  'factor will not change w.r.t new batch_size when '
                                  'update_on_kvstore=True and when distributed kvstore '
                                  'is used.')
        self._optimizer.rescale_grad = scale
[docs]    def step(self, batch_size, ignore_stale_grad=False):
        """Makes one step of parameter update. Should be called after
        `autograd.backward()` and outside of `record()` scope.
        For normal parameter updates, `step()` should be used, which internally calls
        `allreduce_grads()` and then `update()`. However, if you need to get the reduced
        gradients to perform certain transformation, such as in gradient clipping, then
        you may want to manually call `allreduce_grads()` and `update()` separately.
        Parameters
        ----------
        batch_size : int
            Batch size of data processed. Gradient will be normalized by `1/batch_size`.
            Set this to 1 if you normalized loss manually with `loss = mean(loss)`.
        ignore_stale_grad : bool, optional, default=False
            If true, ignores Parameters with stale gradient (gradient that has not
            been updated by `backward` after last step) and skip update.
        """
        rescale_grad = self._scale / batch_size
        self._check_and_rescale_grad(rescale_grad)
        if not self._kv_initialized:
            self._init_kvstore()
        if self._params_to_init:
            self._init_params()
        self._allreduce_grads()
        self._update(ignore_stale_grad)
[docs]    def allreduce_grads(self):
        """For each parameter, reduce the gradients from different contexts.
        Should be called after `autograd.backward()`, outside of `record()` scope,
        and before `trainer.update()`.
        For normal parameter updates, `step()` should be used, which internally calls
        `allreduce_grads()` and then `update()`. However, if you need to get the reduced
        gradients to perform certain transformation, such as in gradient clipping, then
        you may want to manually call `allreduce_grads()` and `update()` separately.
        """
        if not self._kv_initialized:
            self._init_kvstore()
        if self._params_to_init:
            self._init_params()
        assert not (self._kvstore and self._update_on_kvstore), \
                'allreduce_grads() when parameters are updated on kvstore ' \
                'is not supported. Try setting `update_on_kvstore` ' \
                'to False when creating trainer.'
        self._allreduce_grads()
    def _allreduce_grads(self):
        if self._kvstore:
            for i, param in enumerate(self._params):
                if param.grad_req != 'null':
                    self._kvstore.push(i, param.list_grad(), priority=-i)
                    if not self._update_on_kvstore:
                        self._kvstore.pull(i, param.list_grad(), priority=-i,
                                           ignore_sparse=self._distributed)
[docs]    def update(self, batch_size, ignore_stale_grad=False):
        """Makes one step of parameter update.
        Should be called after `autograd.backward()` and outside of `record()` scope,
        and after `trainer.update()`.
        For normal parameter updates, `step()` should be used, which internally calls
        `allreduce_grads()` and then `update()`. However, if you need to get the reduced
        gradients to perform certain transformation, such as in gradient clipping, then
        you may want to manually call `allreduce_grads()` and `update()` separately.
        Parameters
        ----------
        batch_size : int
            Batch size of data processed. Gradient will be normalized by `1/batch_size`.
            Set this to 1 if you normalized loss manually with `loss = mean(loss)`.
        ignore_stale_grad : bool, optional, default=False
            If true, ignores Parameters with stale gradient (gradient that has not
            been updated by `backward` after last step) and skip update.
        """
        if not self._kv_initialized:
            self._init_kvstore()
        if self._params_to_init:
            self._init_params()
        assert not (self._kvstore and self._update_on_kvstore), \
                'update() when parameters are updated on kvstore ' \
                'is not supported. Try setting `update_on_kvstore` ' \
                'to False when creating trainer.'
        self._check_and_rescale_grad(self._scale / batch_size)
        self._update(ignore_stale_grad)
    def _update(self, ignore_stale_grad=False):
        for i, param in enumerate(self._params):
            if param.grad_req == 'null':
                continue
            if not ignore_stale_grad:
                for data in param._check_and_get(param._data, list):
                    if not data._fresh_grad:
                        raise UserWarning(
                            "Gradient of Parameter `%s` on context %s has not been updated "
                            "by backward since last `step`. This could mean a bug in your "
                            "model that made it only use a subset of the Parameters (Blocks) "
                            "for this iteration. If you are intentionally only using a subset, "
                            "call step with ignore_stale_grad=True to suppress this "
                            "warning and skip updating of Parameters with stale gradient" \
                            %(param.name, str(data.context)))
            if self._kvstore and self._update_on_kvstore:
                if param._stype == 'default':
                    # 'row_sparse' parameters are not pulled immediately - they're pulled
                    # in `Block.forward`
                    self._kvstore.pull(i, param.list_data(), priority=-i)
                continue
            for upd, arr, grad in zip(self._updaters, param.list_data(), param.list_grad()):
                if not ignore_stale_grad or arr._fresh_grad:
                    upd(i, grad, arr)
                    arr._fresh_grad = False
[docs]    def save_states(self, fname):
        """Saves trainer states (e.g. optimizer, momentum) to a file.
        Parameters
        ----------
        fname : str
            Path to output states file.
        Note
        ----
        `optimizer.param_dict`, which contains Parameter information (such as
        `lr_mult` and `wd_mult`) will not be saved.
        """
        assert self._optimizer is not None
        if not self._kv_initialized:
            self._init_kvstore()
        if self._params_to_init:
            self._init_params()
        if self._update_on_kvstore:
            assert not self._params_to_init, "Cannot save trainer states when some " \
                                             "parameters are not yet initialized in kvstore."
            self._kvstore.save_optimizer_states(fname, dump_optimizer=True)
        else:
            with open(fname, 'wb') as fout:
                fout.write(self._updaters[0].get_states(dump_optimizer=True))
[docs]    def load_states(self, fname):
        """Loads trainer states (e.g. optimizer, momentum) from a file.
        Parameters
        ----------
        fname : str
            Path to input states file.
        Note
        ----
        `optimizer.param_dict`, which contains Parameter information (such as
        `lr_mult` and `wd_mult`) will not be loaded from the file, but rather set
        based on current Trainer's parameters.
        """
        if not self._kv_initialized:
            self._init_kvstore()
        if self._params_to_init:
            self._init_params()
        if self._update_on_kvstore:
            self._kvstore.load_optimizer_states(fname)
            self._optimizer = self._kvstore._updater.optimizer
        else:
            with open(fname, 'rb') as f:
                states = f.read()
            for updater in self._updaters:
                updater.set_states(states)
                updater.optimizer = self._updaters[0].optimizer
            self._optimizer = self._updaters[0].optimizer
        param_dict = {i: param for i, param in enumerate(self._params)}
        self._optimizer.param_dict = param_dict