Skip to main content

Distributed Computing

Metaflow's foreach construct allows you to run tasks concurrently. In the case foreach, tasks execute independently. This pattern works well when the workload is embarrassingly parallel, that is, tasks don't communicate with each other and they don't have to execute simultaneously.

There are other workloads, such as distributed training of large models, which require tasks to interact with each other. Metaflow provides another mechanism, the @parallel decorator, which orchestrates such inter-dependent tasks. In effect, the decorator launches an ephemeral compute cluster on the fly, as a part of a Metaflow flow, benefiting from Metaflow features like dependency management, versioning, and production deployments.

Typically, this pattern is used through one of the framework-specific decorators like @torchrun or @deepspeed, described below, which make it easy to use a particular framework for distributed training. If you need low-level access to the cluster, e.g. to use it with a framework that doesn't have a corresponding high-level decorator yet, see documentation for low-level access at the end of this page.

info

To use distributed computing, follow set up instructions here. If you need help getting started, contact Metaflow Slack.

High-level decorators

The easiest way to get started is to use one of the high-level decorators - see an overview in this blog post:

Decorator ImplementationUXDescriptionPyPi ReleaseExample
@torchrunUse current.torch.run to submit your torch.distributed program. No need to log into each node, call the code once in @step.A torchrun command that runs @step function code on each node. Torch distributed is used under the hood to handle communication between nodes.metaflow-torchrunMinGPT
@deepspeedExposes current.deepspeed.run
Requires OpenSSH and OpenMPI installed in the Metaflow task container.
Form MPI cluster with passwordless SSH configured at task runtime (to reduce the risk of leaking private keys). Submit the Deepspeed program and run.metaflow-deepspeedBert & Dolly
@metaflow_rayWrite a Ray program locally or call script from @step function, @metaflow_ray takes care of forming the Ray cluster.Forms a Ray cluster dynamically. Runs the @step function code on the control task as Ray’s “head node”.metaflow-rayGPT-J & Distributed XGBoost
@tensorflowPut TensorFlow code in a distributed strategy scope, and call it from step function.Run the @step function code on each node. This means the user picks the appropriate strategy in their code.metaflow-tensorflowKeras Distributed
@mpiExposes current.mpi.cc, current.mpi.broadcast_file, current.mpi.run, current.mpi.exec. Cluster SSH config is handled automatically inside the decorator. Requires OpenSSH and an MPI implementation are installed in the Metaflow task container. It was tested against OpenMPI, which you can find a sample Dockerfile for here.Forms an MPI cluster with passwordless SSH configured at task runtime. Users can submit a mpi4py program or compile, broadcast, and submit a C program.metaflow-mpiLibgrape
info

Note that these decorators are not included in the metaflow package but they are implemented as Metaflow Extensions. You need to install them separately in your development environment, but they will get packaged automatically by Metaflow, so you don't need to include them in Docker images or @conda/@pypi. Also note that the extensions are not part of the stable Metaflow API, so they are subject to change.

Low-level access

Under the hood, Metaflow guarantees that you get a desired kind and number of compute nodes running simultaneously, so that they are able to communicate and coordinate amongst each other.

You can use this compute cluster to implement any distributed computing algorithms of your own. To illustrate this, consider a simple example that sets up a cluster of tasks that communicate with each other over MPI. Technically, MPI is not required - you could communicate with any protocol you want - but MPI is a popular choice.

MPI example

Let’s create a simple Hello World MPI program based on this example. The program identifies the main node (rank == 0) that sends a message to all workers nodes which they receive and print out. We use mpi4py as a Python wrapper for the MPI protocol.

First, let's create an MPI script, mpi_hello.py:

import mpi4py
from mpi4py import MPI

if __name__ == "__main__":

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if rank == 0:
print(f"Cluster has {size} processes")
for i in range(1, size):
msg = "Main node says hi! 👋"
comm.send(msg, dest=i)
else:
msg = comm.recv()
print(f"👷 Worker node {rank} received message: {msg}")

Next, let's create a flow that launches a cluster of four nodes, thanks to num_parallel=4, and runs the MPI script we defined above in the cluster, launching two worker processes on each node.

from metaflow import FlowSpec, step, batch, mpi, current

N_CPU = 2
N_NODES = 4

class MPI4PyFlow(FlowSpec):

@step
def start(self):
self.next(self.multinode, num_parallel=N_NODES)

@batch(image="eddieob/mpi-base:1", cpu=N_CPU)
@mpi
@step
def multinode(self):
current.mpi.exec(
args=["-n", str(N_CPU * N_NODES), "--allow-run-as-root"],
program="python mpi_hello.py",
)
self.next(self.join)

@step
def join(self, inputs):
self.next(self.end)

@step
def end(self):
pass

if __name__ == "__main__":
MPI4PyFlow()

To run the flow, make sure your AWS Batch environment is configured to support multinode jobs. Then, install the MPI extension for Metaflow

pip install metaflow-mpi

and run the flow with

python mpiflow.py run

The example uses an image, eddieob/mpi-base, defined in this Dockerfile. The image includes MPI and ssh for communication. Note that Metaflow packages mpi_hello.py automatically, so it doesn't have to be included in the image.