Executing Tasks Remotely
There are two ways to handle larger amounts of data and compute:
- Scale up by running your code on a larger machine with more memory, CPU cores, and GPUs, or
- Scale out by using more machines in parallel.
As described below, Metaflow supports the former through the @resources decorator and the latter through foreach when flows are run on Kubernetes or AWS Batch.
Everything described on this page applies to all compute platforms supported by Metaflow. The data scientist can write their flows using foreaches and the @resources decorator, knowing that the code will execute on any supported platform. For additional tips and tricks related to specific systems, see Using AWS Batch and Using Kubernetes.
Requesting resources with the @resources decorator
Consider the following example:
from metaflow import FlowSpec, step, resources

class BigSum(FlowSpec):

    @resources(memory=60000, cpu=1)
    @step
    def start(self):
        import numpy
        import time
        big_matrix = numpy.random.ranf((80000, 80000))
        t = time.time()
        self.sum = numpy.sum(big_matrix)
        self.took = time.time() - t
        self.next(self.end)

    @step
    def end(self):
        print("The sum is %f." % self.sum)
        print("Computing it took %dms." % (self.took * 1000))

if __name__ == '__main__':
    BigSum()
This example creates a huge 80000x80000 random matrix, big_matrix. The matrix requires about 80000^2 * 8 bytes = 48GB of memory.
If you attempt to run this on your local machine, it is likely that the following will happen:
$ python BigSum.py run
2018-11-29 02:43:39.689 [5/start/21975 (pid 83812)] File "BigSum.py", line 11, in start
2018-11-29 02:43:39.689 [5/start/21975 (pid 83812)] big_matrix = numpy.random.ranf((80000, 80000))
2018-11-29 02:43:39.689 [5/start/21975 (pid 83812)] File "mtrand.pyx", line 856, in mtrand.RandomState.random_sample
2018-11-29 02:43:39.689 [5/start/21975 (pid 83812)] File "mtrand.pyx", line 167, in mtrand.cont0_array
2018-11-29 02:43:39.689 [5/start/21975 (pid 83812)] MemoryError
2018-11-29 02:43:39.689 [5/start/21975 (pid 83812)]
2018-11-29 02:43:39.844 [5/start/21975 (pid 83812)] Task failed.
2018-11-29 02:43:39.844 Workflow failed.
Step failure:
Step start (task-id 21975) failed.
This fails quickly due to a MemoryError on most laptops, as we are unable to allocate 48GB of memory.
The @resources decorator suggests resource requirements for a step. The memory argument specifies the amount of RAM in megabytes and cpu the number of CPU cores requested. The decorator does not produce the resources magically, which is why the run above failed. The @resources decorator takes effect only when combined with another decorator that specifies which compute platform to use, such as Kubernetes or AWS Batch.
Let's use the --with option to attach the desired decorator to all steps on the command line. Choose one of the commands below, corresponding to whichever system you use: Kubernetes or AWS Batch. This assumes that you have configured one of these systems to work with Metaflow.
Kubernetes:

$ python BigSum.py run --with kubernetes

AWS Batch:

$ python BigSum.py run --with batch
The --with batch or --with kubernetes option instructs Metaflow to run all tasks as separate jobs on the chosen compute platform, instead of using a local process for each task. It has the same effect as adding the corresponding decorator above all steps in the source code.
This time the run should succeed, assuming a large enough instance is available in your compute environment. In this case, the @resources decorator serves as a prescription for the size of the instance that the job should run on. Make sure that this resource requirement can be met; if a large enough instance is not available, the task won't start executing.
You should see an output like this:
The sum is 3200003911.795288.
Computing it took 4497ms.
In addition to cpu and memory, you can specify gpu=N to request N GPUs for the instance.
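For example, a hypothetical GPU-accelerated step could combine all three arguments as in the sketch below; the step name and values are illustrative only:

@resources(memory=32000, cpu=4, gpu=1)  # 32GB of RAM, 4 cores, 1 GPU
@step
def train_model(self):
    ...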
Running only specific steps remotely
The @resources decorator is an annotation that signals what resources a step requires. By itself, it does not force the step to be executed on any particular platform. This is convenient, as you can make the choice later, executing the same flow in different environments without changes.
Sometimes it is useful to make sure that a step always executes on a certain compute platform, maybe using a platform-specific configuration. You can achieve this by adding either @batch or @kubernetes above the steps that should be executed remotely. These decorators accept the same keyword arguments as @resources, as well as platform-specific arguments that you can find listed in the API reference.
For instance, in the example above, replace @resources with @batch or @kubernetes.
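Using @batch as an example, the modified start step would look like the following sketch; the keyword arguments carry over unchanged, and batch (or kubernetes) must be imported from metaflow:

@batch(memory=60000, cpu=1)  # was: @resources(memory=60000, cpu=1)
@step
def start(self):
    ...

Then run the flow without any --with option: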
$ python BigSum.py run
You will see that the start step gets executed on a remote instance, but the end step, which does not need special resources, is executed locally. You could even mix decorators so that some steps execute on @kubernetes, some on @batch, and some locally.
Parallelization over multiple cores
When running locally, tasks are executed as separate processes. The operating system takes care of allocating them to separate CPU cores, so they will actually execute in parallel assuming that enough CPU cores are available. Hence, your flow can utilize multiple cores without you having to do anything special besides defining branches in the flow.
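For illustration, here is a minimal sketch of such a flow; it is not from the original example, but it defines two branches, a and b, that execute as separate processes (and hence on separate cores, when available) in a local run:

from metaflow import FlowSpec, step

class BranchFlow(FlowSpec):

    @step
    def start(self):
        # Fan out into two branches that run in parallel.
        self.next(self.a, self.b)

    @step
    def a(self):
        self.x = 1
        self.next(self.join)

    @step
    def b(self):
        self.x = 2
        self.next(self.join)

    @step
    def join(self, inputs):
        # Combine the results from both branches.
        self.total = sum(inp.x for inp in inputs)
        self.next(self.end)

    @step
    def end(self):
        print("total: %d" % self.total)

if __name__ == '__main__':
    BranchFlow()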
When running remotely on @batch or @kubernetes, branches are mapped to separate jobs that are executed in parallel, allowing you to scale horizontally to any number of parallel tasks. In addition, you may take advantage of multiple CPU cores inside a task. This may happen automatically if you use a modern ML library like PyTorch or scikit-learn, or you may parallelize functions explicitly, as explained below.
Parallel map
Metaflow provides a utility function called parallel_map that helps take advantage of multiple CPU cores. This function is almost equivalent to Pool().map in Python's built-in multiprocessing library. The main differences are the following:
- parallel_map supports lambdas and any other Python callables.
- parallel_map does not suffer from bugs present in multiprocessing.
- parallel_map can handle larger amounts of data.
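For a quick sense of the call signature, here is a minimal sketch; the squaring workload is only a placeholder:

from metaflow import parallel_map

# Apply the callable to every item using multiple processes and
# collect the results in a list, analogous to Pool().map.
squares = parallel_map(lambda x: x * x, range(10))
print(squares)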
You may also use parallel_map to parallelize simple operations that might be too cumbersome to implement as separate steps.
Here is an extension of our previous example that implements a multicore sum() by partitioning the matrix by row:
from metaflow import FlowSpec, step, resources, parallel_map

class BigSum(FlowSpec):

    @resources(memory=60000, cpu=8)
    @step
    def start(self):
        import numpy
        import time
        big_matrix = numpy.random.ranf((80000, 80000))
        t = time.time()
        # Sum 10000-row slices of the matrix in parallel, one slice per process.
        parts = parallel_map(lambda i: big_matrix[i:i+10000].sum(),
                             range(0, 80000, 10000))
        self.sum = sum(parts)
        self.took = time.time() - t
        self.next(self.end)

    @step
    def end(self):
        print("The sum is %f." % self.sum)
        print("Computing it took %dms." % (self.took * 1000))

if __name__ == '__main__':
    BigSum()
Note that we use cpu=8 to request enough CPU cores, so our parallel_map can benefit from optimal parallelism. Disappointingly, in this case the parallel sum is not faster than the original simple implementation, due to the overhead of launching separate processes in parallel_map. A less trivial operation might see a much larger performance boost.
Safeguard flags
It is almost too easy to execute tasks remotely using Metaflow. Consider a foreach loop defined as follows:
self.params = range(1000)
self.next(self.fanned_out, foreach='params')
When run with --with batch or --with kubernetes, this code would launch up to 1000 parallel instances, which may turn out to be quite expensive.
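For context, a complete flow around that fragment might look like this sketch; the step and artifact names are illustrative:

from metaflow import FlowSpec, step

class FanOutFlow(FlowSpec):

    @step
    def start(self):
        # One task is launched for each element of self.params.
        self.params = range(1000)
        self.next(self.fanned_out, foreach='params')

    @step
    def fanned_out(self):
        # self.input holds this task's element of the foreach.
        self.result = self.input * 2
        self.next(self.join)

    @step
    def join(self, inputs):
        self.results = [inp.result for inp in inputs]
        self.next(self.end)

    @step
    def end(self):
        print("%d results" % len(self.results))

if __name__ == '__main__':
    FanOutFlow()

Running this flow as-is would trip the default split limit, which is exactly the safeguard described next.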
To safeguard against inadvertently launching many parallel jobs, the run and resume commands have a flag, --max-num-splits, which fails the task if it attempts to launch more than 100 splits (the default limit). Use the flag to increase the limit if you actually need more tasks.
$ python myflow.py run --max-num-splits 200
Another flag, --max-workers, limits the number of tasks run in parallel. Even if a foreach launches 100 splits, --max-workers allows only 16 of them (by default) to run in parallel at any point in time. If you want more parallelism, increase the value of --max-workers.
$ python myflow.py run --max-workers 32
Big Data
Thus far, we have focused on CPU and memory-bound steps. Loading and processing big data is often an IO-bound operation which requires a different approach. Read Loading and Storing Data for more details about how to build efficient data pipelines in Metaflow.