Running Flows Programmatically
The Runner API allows you to start and manage Metaflow runs and other operations programmatically, for instance, to run flows in a script.
The Runner class exposes a blocking API, which waits for operations to complete, as well as non-blocking (asynchronous) APIs, prefixed with async_, which execute operations in the background. This document provides an overview of common patterns. For detailed API documentation, see the Runner API reference.
The Runner API orchestrates flows locally, equivalent to local runs started on the command line. To trigger runs in production, orchestrated by a production scheduler, take a look at event triggering.
To execute the examples below, save this flow in slowflow.py:
import time
from metaflow import FlowSpec, Parameter, step

class SlowFlow(FlowSpec):

    secs = Parameter('seconds', default=3)

    @step
    def start(self):
        for i in range(self.secs):
            print(f"{i} seconds passed")
            time.sleep(1)
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    SlowFlow()
Blocking runs
Here's a simple example that runs the above flow and waits for it to complete:
from metaflow import Runner
with Runner('slowflow.py').run() as running:
    print(f'{running.run} finished')
You can access the results of the run through the Client API, via the Run object found in the ExecutingRun.run attribute.
Runner is best used as a context manager so that it can automatically clean up any temporary files created during execution. Otherwise, call runner.cleanup() once you are done with the object to make sure no temporary files are left behind.
All methods that start a run return an ExecutingRun object, which contains the Run started as well as metadata about the corresponding subprocess, such as its status, and its output in stdout and stderr:
from metaflow import Runner
SECONDS = 5
with Runner('slowflow.py', show_output=False).run(seconds=SECONDS) as running:
    if running.status == 'failed':
        print(f'❌ {running.run} failed:')
    elif running.status == 'successful':
        print(f'✅ {running.run} succeeded:')
    print(f'-- stdout --\n{running.stdout}')
    print(f'-- stderr --\n{running.stderr}')
    print(f'⏰ The flow waited for {running.run.data.secs} seconds')
Note that we set show_output=False to hide the output of the run while it executes.
Passing parameters
You can pass parameters to the run through keyword arguments in run, such as seconds above. Note that the parameter values are type-checked automatically, ensuring that the types match the actual types expected by each Parameter.
Setting options
You can set top-level options directly in the Runner constructor. The names map to the corresponding command-line options, with flags like --no-pylint mapping to a corresponding boolean, pylint=False. As a special case, the option --with maps to decospecs, since with is a reserved keyword in Python.
For instance, you can run a flow remotely in the cloud as follows:
from metaflow import Runner
with Runner('slowflow.py', pylint=False, decospecs=['kubernetes', 'retry']).run() as running:
    print(f'{running.run} completed on Kubernetes!')
which is equivalent to running
python slowflow.py --no-pylint --with kubernetes --with retry run
on the command line. You can also pass options to the run command, such as max_workers or tags:
from metaflow import Runner
with Runner('slowflow.py').run(max_workers=1, tags=['myrun', 'another_tag']) as running:
    print(f'{running.run} has tags {running.run.user_tags}')
Non-blocking API
The non-blocking Runner API leverages Python's asynchronous APIs to manage operations running in the background. This allows you to inspect and interact with runs and other operations while they are executing, and handle many concurrent operations.
The API is nearly equivalent to the blocking API, except that you must await all asynchronous operations. For instance, you can run a flow, check its status once a second, and access its outputs as follows:
import asyncio
from metaflow import Runner
async def main():
    with await Runner('slowflow.py').async_run() as running:
        while running.status == 'running':
            print(f"Run status is {running.status}")
            await asyncio.sleep(1)
        print(f'{running.run} finished')

asyncio.run(main())
You can also accomplish the above using the wait() method, which waits for a run to complete:
import asyncio
from metaflow import Runner
async def main():
    with await Runner('slowflow.py').async_run() as running:
        await running.wait()
        print(f'{running.run} finished')

asyncio.run(main())
Streaming logs
Use the stream_log asynchronous iterator to stream logs in real time. This snippet prints logs from stdout line by line until the run completes:
import asyncio
from metaflow import Runner
async def main():
    with await Runner('slowflow.py').async_run() as running:
        async for _, line in running.stream_log('stdout'):
            print(f'OUT: {line}')
        print(f'{running.run} finished')

asyncio.run(main())
If you want to perform other operations while ingesting logs periodically, you can pull lines from the iterator without a for loop. This snippet outputs one line at a time from stdout until the run finishes, and then flushes any remaining logs to the output, leveraging the position argument of stream_log, which allows streaming to continue from a known location:
import asyncio
from metaflow import Runner
async def main():
    with await Runner('slowflow.py', quiet=True).async_run(seconds=7) as running:
        stdout = aiter(running.stream_log('stdout'))
        while running.status == 'running':
            out_pos, out_line = await anext(stdout)
            print(f'STILL RUNNING: {out_line}')
            await asyncio.sleep(1)
        # output remaining lines, if any
        async for _, out_line in running.stream_log('stdout', position=out_pos):
            print(out_line)
        print(f'{running.run} finished')

asyncio.run(main())
Managing concurrent runs
A key benefit of the asynchronous API is that it allows multiple operations to run at the same time. For example, you can execute many runs in parallel, each with its own set of parameters.
This snippet demonstrates the pattern: it starts five concurrent runs, each with a different value of the seconds parameter, and shows the values one by one as the runs complete.
import asyncio
from metaflow import Runner
async def main():
    # start five concurrent runs
    runs = [await Runner('slowflow.py').async_run(seconds=i, tags=['manyruns']) for i in range(5)]
    while runs:
        print(f'{len(runs)} runs are still running')
        still_running = []
        for running in runs:
            if running.status == 'running':
                still_running.append(running)
            else:
                print(f'{running.run} ran for {running.run.data.secs} seconds')
        runs = still_running
        await asyncio.sleep(1)

asyncio.run(main())
You can observe the five runs executing in parallel in the Metaflow UI.
Programmatic resume
Besides managing runs, you can resume past executions, either in a blocking or non-blocking manner.
This snippet finds the oldest run of SlowFlow and resumes it using a blocking call:
from metaflow import Runner, Flow
old_id = list(Flow('SlowFlow'))[-1].id
print(f'Resuming the first run of SlowFlow, {old_id}')

with Runner('slowflow.py').resume(origin_run_id=old_id) as running:
    print(f"Resumed run {running.run} completed")
This snippet does the same using the asynchronous API:
import asyncio
from metaflow import Runner, Flow
async def main():
    old_id = list(Flow('SlowFlow'))[-1].id
    print(f'Resuming the first run of SlowFlow, {old_id}')
    with await Runner('slowflow.py').async_resume(origin_run_id=old_id) as running:
        await running.wait()
        print(f"Resumed run {running.run} completed")

asyncio.run(main())