import argparse
import os
import time
from threading import Lock

import torch
import torch.distributed.autograd as dist_autograd
import torch.distributed.rpc as rpc
import torch.multiprocessing as mp
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
from torch.distributed.optim import DistributedOptimizer
from torchvision import datasets, transforms
# --------- MNIST Network to train, from pytorch/examples -----
class Net(nn.Module):
    def __init__(self, num_gpus=0):
        super(Net, self).__init__()
        print(f"Using {num_gpus} GPUs to train")
        self.num_gpus = num_gpus
        device = torch.device(
            "cuda:0" if torch.cuda.is_available() and self.num_gpus > 0 else "cpu")
        print(f"Putting first 2 convs on {str(device)}")
        # Put conv layers on the first cuda device, or CPU if no cuda device
        self.conv1 = nn.Conv2d(1, 32, 3, 1).to(device)
        self.conv2 = nn.Conv2d(32, 64, 3, 1).to(device)
        # Put rest of the network on the 2nd cuda device, if there is one
        if "cuda" in str(device) and num_gpus > 1:
            device = torch.device("cuda:1")

        print(f"Putting rest of layers on {str(device)}")
        self.dropout1 = nn.Dropout2d(0.25).to(device)
        self.dropout2 = nn.Dropout2d(0.5).to(device)
        self.fc1 = nn.Linear(9216, 128).to(device)
        self.fc2 = nn.Linear(128, 10).to(device)
    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.max_pool2d(x, 2)

        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        # Move tensor to next device if necessary
        next_device = next(self.fc1.parameters()).device
        x = x.to(next_device)

        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)
        output = F.log_softmax(x, dim=1)
        return output
# On the local node, call a method with first arg as the value held by the
# RRef. Other args are passed in as arguments to the function called.
# Useful for calling instance methods. method could be any matching function, including
# class methods.
def call_method(method, rref, *args, **kwargs):
    return method(rref.local_value(), *args, **kwargs)
# Given an RRef, return the result of calling the passed in method on the value
# held by the RRef. This call is done on the remote node that owns
# the RRef and passes along the given argument.
# Example: If the value held by the RRef is of type Foo, then
# remote_method(Foo.bar, rref, arg1, arg2) is equivalent to calling
# <foo_instance>.bar(arg1, arg2) on the remote node and getting the result
# back.
def remote_method(method, rref, *args, **kwargs):
    args = [method, rref] + list(args)
    return rpc.rpc_sync(rref.owner(), call_method, args=args, kwargs=kwargs)
# --------- Parameter Server --------------------
class ParameterServer(nn.Module):
    def __init__(self, num_gpus=0):
        super().__init__()
        model = Net(num_gpus=num_gpus)
        self.model = model
        self.input_device = torch.device(
            "cuda:0" if torch.cuda.is_available() and num_gpus > 0 else "cpu")
Next, we will define the forward pass. Note that regardless of the device of the model output, we move the output to CPU, as the Distributed RPC Framework currently only supports sending CPU tensors over RPC. We have intentionally disabled sending CUDA tensors over RPC due to the possibility of different devices (CPU/GPU) on the caller/callee, but this may be supported in future releases.
class ParameterServer(nn.Module):
    ...
    def forward(self, inp):
        inp = inp.to(self.input_device)
        out = self.model(inp)
        # This output is forwarded over RPC, which as of 1.5.0 only accepts CPU tensors.
        # Tensors must be moved in and out of GPU memory due to this.
        out = out.to("cpu")
        return out
    # Use dist autograd to retrieve gradients accumulated for this model.
    # Primarily used for verification.
    def get_dist_gradients(self, cid):
        grads = dist_autograd.get_gradients(cid)
        # This output is forwarded over RPC, which as of 1.5.0 only accepts CPU tensors.
        # Tensors must be moved in and out of GPU memory due to this.
        cpu_grads = {}
        for k, v in grads.items():
            k_cpu, v_cpu = k.to("cpu"), v.to("cpu")
            cpu_grads[k_cpu] = v_cpu
        return cpu_grads
    # Wrap local parameters in an RRef. Needed for building the
    # DistributedOptimizer which optimizes parameters remotely.
    def get_param_rrefs(self):
        param_rrefs = [rpc.RRef(param) for param in self.model.parameters()]
        return param_rrefs
# The global parameter server instance.
param_server = None
# A lock to ensure we only have one parameter server.
global_lock = Lock()
def get_parameter_server(num_gpus=0):
    """
    Returns a singleton parameter server to all trainer processes
    """
    global param_server
    # Ensure that we get only one handle to the ParameterServer.
    with global_lock:
        if not param_server:
            # construct it once
            param_server = ParameterServer(num_gpus=num_gpus)
        return param_server
def run_parameter_server(rank, world_size):
    # The parameter server just acts as a host for the model and responds to
    # requests from trainers.
    # rpc.shutdown() will wait for all workers to complete by default, which
    # in this case means that the parameter server will wait for all trainers
    # to complete, and then exit.
    print("PS master initializing RPC")
    rpc.init_rpc(name="parameter_server", rank=rank, world_size=world_size)
    print("RPC initialized! Running parameter server...")
    rpc.shutdown()
    print("RPC shutdown on parameter server.")
Next, we will define the TrainerNet class. This will also be a subclass of nn.Module, and our __init__ method will use the rpc.remote API to obtain an RRef, or remote reference, to our parameter server. Note that we are not copying the parameter server into the local process here; instead, we can think of self.param_server_rref as a distributed shared pointer to the parameter server that lives in a separate process.
# --------- Trainers --------------------
# nn.Module corresponding to the network trained by this trainer. The
# forward() method simply invokes the network on the given parameter
# server.
class TrainerNet(nn.Module):
    def __init__(self, num_gpus=0):
        super().__init__()
        self.num_gpus = num_gpus
        self.param_server_rref = rpc.remote(
            "parameter_server", get_parameter_server, args=(num_gpus,))
Next, we will define a method called get_global_param_rrefs. To motivate the need for this method, it is worth reading the documentation on DistributedOptimizer, specifically the API signature. The optimizer must be passed a list of RRefs corresponding to the remote parameters to be optimized, so here we obtain the necessary RRefs. Since the only remote worker a given TrainerNet interacts with is the ParameterServer, we simply invoke remote_method on the ParameterServer, using the get_param_rrefs method we defined in the ParameterServer class. That method returns a list of RRefs to the parameters that need to be optimized. Note that in this case our TrainerNet does not define any parameters of its own; if it did, we would also need to wrap each of those parameters in an RRef and include them in the input to DistributedOptimizer.
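A minimal sketch of this method, reusing the remote_method helper and the ParameterServer.get_param_rrefs method shown earlier (the ellipsis stands for the __init__ defined above):

class TrainerNet(nn.Module):
    ...
    def get_global_param_rrefs(self):
        # Ask the remote ParameterServer for RRefs to its parameters so they
        # can be passed into the DistributedOptimizer.
        remote_params = remote_method(
            ParameterServer.get_param_rrefs,
            self.param_server_rref)
        return remote_params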
def run_training_loop(rank, num_gpus, train_loader, test_loader):
    ...
    for i, (data, target) in enumerate(train_loader):
        with dist_autograd.context() as cid:
            model_output = net(data)
            target = target.to(model_output.device)
            loss = F.nll_loss(model_output, target)
            if i % 5 == 0:
                print(f"Rank {rank} training batch {i} loss {loss.item()}")
            dist_autograd.backward(cid, [loss])
            # Ensure that dist autograd ran successfully and gradients were
            # returned.
            assert remote_method(
                ParameterServer.get_dist_gradients,
                net.param_server_rref,
                cid) != {}
            opt.step(cid)
def get_accuracy(test_loader, model):
    model.eval()
    correct_sum = 0
    # Use GPU to evaluate if possible
    device = torch.device("cuda:0" if model.num_gpus > 0
        and torch.cuda.is_available() else "cpu")
    with torch.no_grad():
        for i, (data, target) in enumerate(test_loader):
            out = model(data, -1)
            pred = out.argmax(dim=1, keepdim=True)
            pred, target = pred.to(device), target.to(device)
            correct = pred.eq(target.view_as(pred)).sum().item()
            correct_sum += correct

    print(f"Accuracy {correct_sum / len(test_loader.dataset)}")
if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description="Parameter-Server RPC based training")
    parser.add_argument(
        "--world_size",
        type=int,
        default=4,
        help="""Total number of participating processes. Should be the sum of
        master node and all training nodes.""")
    parser.add_argument(
        "--rank",
        type=int,
        default=None,
        help="Global rank of this process. Pass in 0 for master.")
    parser.add_argument(
        "--num_gpus",
        type=int,
        default=0,
        help="""Number of GPUs to use for training. Currently supports between 0
        and 2 GPUs. Note that this argument will be passed to the parameter servers.""")
    parser.add_argument(
        "--master_addr",
        type=str,
        default="localhost",
        help="""Address of master, will default to localhost if not provided.
        Master must be able to accept network traffic on the address + port.""")
    parser.add_argument(
        "--master_port",
        type=str,
        default="29500",
        help="""Port that master is listening on, will default to 29500 if not
        provided. Master must be able to accept network traffic on the host and port.""")
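The excerpt stops at the argument definitions. As a rough sketch of how the rest of the main block might wire these arguments up, the snippet below launches either the parameter server (rank 0) or a trainer process; the run_worker helper and the MNIST data loaders here are illustrative assumptions rather than part of the excerpt above.

# Sketch only: a possible trainer entry point plus process-launch logic.
# run_worker and the MNIST loaders below are assumptions, not part of the excerpt.
def run_worker(rank, world_size, num_gpus, train_loader, test_loader):
    print(f"Worker rank {rank} initializing RPC")
    rpc.init_rpc(name=f"trainer_{rank}", rank=rank, world_size=world_size)
    run_training_loop(rank, num_gpus, train_loader, test_loader)
    # Block until all RPC participants (including the parameter server) finish.
    rpc.shutdown()

if __name__ == '__main__':
    # ... argument definitions as above ...
    args = parser.parse_args()
    assert args.rank is not None, "must provide rank argument."
    os.environ["MASTER_ADDR"] = args.master_addr
    os.environ["MASTER_PORT"] = args.master_port

    processes = []
    if args.rank == 0:
        # Rank 0 hosts the parameter server.
        p = mp.Process(target=run_parameter_server, args=(0, args.world_size))
        p.start()
        processes.append(p)
    else:
        # Every other rank is a trainer with its own MNIST data loaders.
        transform = transforms.Compose(
            [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))])
        train_loader = torch.utils.data.DataLoader(
            datasets.MNIST("../data", train=True, download=True, transform=transform),
            batch_size=32, shuffle=True)
        test_loader = torch.utils.data.DataLoader(
            datasets.MNIST("../data", train=False, transform=transform),
            batch_size=32, shuffle=True)
        p = mp.Process(
            target=run_worker,
            args=(args.rank, args.world_size, args.num_gpus,
                  train_loader, test_loader))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()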