Runner - Running flows programmatically

The Runner class and its notebook-focused counterpart NBRunner allow you to start runs and resume them programmatically. For an overview, see Managing Flows in Notebooks and Scripts.

Runner

The Runner object is typically used as a context manager:

with Runner(...) as runner:
    ...

Note that the ExecutingRun returned by run and async_run also works as a context manager, so you can write

with Runner(...).run() as running:
    ...

or

with await Runner(...).async_run() as running:
    ...

If you don't use Runner as a context manager, remember to call Runner.cleanup() to remove any leftover temp files.
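
As a minimal sketch of that pattern, the helper below wraps a blocking run in try/finally so that cleanup() is called even if the run raises. The name run_then_cleanup is hypothetical; the helper only assumes that the object it receives provides the run() and cleanup() methods described in this section.

```python
def run_then_cleanup(runner, **run_kwargs):
    """Run a flow with an explicitly managed runner (hypothetical helper).

    `runner` is expected to behave like a Runner instance: it must provide
    run(**kwargs) and cleanup().
    """
    try:
        # Blocks until the run has completed.
        return runner.run(**run_kwargs)
    finally:
        # Executed on success and on error, so temp files never linger.
        runner.cleanup()


# Usage sketch (requires Metaflow; 'myflow.py' is a placeholder path):
#   from metaflow import Runner
#   result = run_then_cleanup(Runner('myflow.py'), alpha=5)
```

Using the context manager form is usually simpler; this variant is only useful when the runner's lifetime cannot be tied to a single with block.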

Runner(flow_file, show_output=True, profile=None, env=None, cwd=None, **kwargs)

Metaflow's Runner API presents a programmatic interface for running flows and performing other operations, either synchronously or asynchronously. The class expects a path to the flow file along with optional arguments that match top-level options on the command line.

This class works as a context manager, calling cleanup() to remove temporary files at exit.

Example:

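from metaflow import Runner

# pylint=False is a top-level option; alpha, tags, and max_workers go to the run command
with Runner('slowflow.py', pylint=False) as runner:
    result = runner.run(alpha=5, tags=["abc", "def"], max_workers=5)
    print(result.run.finished)

Parameters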

flow_file: str

Path to the flow file to run

show_output: bool, default True

Show stdout and stderr on the console by default. Only applicable to the synchronous run and resume functions.

profile: Optional[str], default None

Metaflow profile to use for this run. If not specified, the default profile is used (or the one already set using METAFLOW_PROFILE).

env: Optional[Dict], default None

Additional environment variables to set for the Run. This overrides the environment set for this process.

cwd: Optional[str], default None

The directory to run the subprocess in; if not specified, the current directory is used.

**kwargs: Any

Additional arguments that you would pass to python myflow.py before the run command.
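
To make the split between constructor arguments and run() arguments concrete, the sketch below renders the rough shape of the equivalent command line. The helper sketch_cli is hypothetical and is not how Metaflow builds its command. It handles only simple name=value options and assumes that underscores in keyword names become dashes in option names.

```python
def sketch_cli(flow_file, command, top_level, command_level):
    """Render an approximate command line (hypothetical, illustration only).

    top_level:     keyword arguments given to Runner(...)
    command_level: keyword arguments given to run(...) or resume(...)
    Only plain scalar values are handled; booleans and lists are out of scope.
    """
    def render(options):
        parts = []
        for name, value in options.items():
            # Assumption: option names use dashes where Python uses underscores.
            parts += ["--" + name.replace("_", "-"), str(value)]
        return parts

    return " ".join(
        ["python", flow_file] + render(top_level) + [command] + render(command_level)
    )


# Runner('myflow.py', datastore='local').run(alpha=5, max_workers=2) roughly maps to:
print(sketch_cli("myflow.py", "run", {"datastore": "local"}, {"alpha": 5, "max_workers": 2}))
# python myflow.py --datastore local run --alpha 5 --max-workers 2
```

Options given to the constructor land before the command name, while options given to run() or resume() land after it.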

Runner.cleanup(self)

Delete any temporary files created during execution.

Blocking API

These calls block until the command completes.

Runner.run(self, **kwargs)

Blocking execution of the run. This method will wait until the run has completed execution.

Parameters 

**kwargs: Any

Additional arguments that you would pass to python myflow.py after the run command, in particular, any parameters accepted by the flow.

Returns 

ExecutingRun

ExecutingRun containing the results of the run.
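
A common next step is to check the outcome before using the results. The sketch below uses only the status, stderr, and run attributes of ExecutingRun documented on this page; the helper name require_success is hypothetical.

```python
def require_success(executing_run):
    """Return the underlying Run, or raise if the run did not succeed (hypothetical helper)."""
    if executing_run.status != "successful":
        # stderr holds whatever the subprocess has written to stderr so far.
        raise RuntimeError(
            f"run ended with status {executing_run.status!r}:\n{executing_run.stderr}"
        )
    # .run is the metaflow.Run object that this ExecutingRun refers to.
    return executing_run.run


# Usage sketch (requires Metaflow; 'myflow.py' is a placeholder path):
#   from metaflow import Runner
#   with Runner('myflow.py') as runner:
#       run = require_success(runner.run(alpha=5))
```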

Runner.resume(self, **kwargs)

Blocking resume execution of the run. This method will wait until the resumed run has completed execution.

Parameters 

**kwargs: Any

Additional arguments that you would pass to python myflow.py after the resume command.

Returns 

ExecutingRun

ExecutingRun containing the results of the resumed run.
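
One way run and resume might be combined is sketched below. It rests on two assumptions that are not stated on this page: that run() returns an ExecutingRun with status failed rather than raising when the flow fails, and that resume() without arguments picks up the run that just failed. The helper name run_with_one_resume is hypothetical.

```python
def run_with_one_resume(runner, **run_kwargs):
    """Run a flow and resume it once if it fails (hypothetical helper)."""
    result = runner.run(**run_kwargs)
    if result.status == "failed":
        # Assumption: resume() with no arguments continues the failed run.
        result = runner.resume()
    return result


# Usage sketch (requires Metaflow; 'myflow.py' is a placeholder path):
#   from metaflow import Runner
#   with Runner('myflow.py') as runner:
#       final = run_with_one_resume(runner, alpha=5)
#       print(final.status)
```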

Non-Blocking API

Runner.async_run(self, **kwargs)

Non-blocking execution of the run. This method will return as soon as the run has launched.

Note that this method is asynchronous and needs to be awaited.

Parameters 

**kwargs: Any

Additional arguments that you would pass to python myflow.py after the run command, in particular, any parameters accepted by the flow.

Returns 

ExecutingRun

ExecutingRun representing the run that was started.
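
The following sketch shows the awaiting pattern: launch the run, do something while it executes, then wait for it to finish. The helper name launch_and_wait is hypothetical; it relies only on async_run, ExecutingRun.wait, and ExecutingRun.status as documented on this page.

```python
import asyncio


async def launch_and_wait(runner, **run_kwargs):
    """Start a run without blocking, then wait for it (hypothetical helper)."""
    # async_run returns as soon as the run has launched.
    running = await runner.async_run(**run_kwargs)
    print("launched, status:", running.status)
    # wait() must be awaited too; it returns once the run has finished.
    await running.wait()
    return running.status


# Usage sketch (requires Metaflow; 'myflow.py' is a placeholder path):
#   from metaflow import Runner
#   with Runner('myflow.py') as runner:
#       print(asyncio.run(launch_and_wait(runner, alpha=5)))
```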

Runner.async_resume(self, **kwargs)

Non-blocking resume execution of the run. This method will return as soon as the resume has launched.

Note that this method is asynchronous and needs to be awaited.

Parameters 

**kwargs: Any

Additional arguments that you would pass to python myflow.py after the resume command.

Returns 

ExecutingRun

ExecutingRun representing the resumed run that was started.

NBRunner

NBRunner is a wrapper over Runner which allows you to refer to a flow defined in a notebook cell instead of a file. For examples, see Running flows in a notebook.

NBRunner(flow, show_output=True, profile=None, env=None, base_dir=None, **kwargs)

A wrapper over Runner for executing flows defined in a Jupyter notebook cell.

Instantiate this class on the last line of a notebook cell where a flow is defined. In contrast to Runner, this class is not meant to be used as a context manager. Instead, use a blocking helper function like nbrun (which calls cleanup() internally) or call cleanup() explicitly when using non-blocking APIs.

run = NBRunner(FlowName).nbrun()

Parameters

flow: FlowSpec

Flow defined in the same cell

show_output: bool, default True

Show stdout and stderr on the console by default. Only applicable to the synchronous run and resume functions.

profile: Optional[str], default None

Metaflow profile to use for this run. If not specified, the default profile is used (or the one already set using METAFLOW_PROFILE).

env: Optional[Dict], default None

Additional environment variables to set for the Run. This overrides the environment set for this process.

base_dir: Optional[str], default None

The directory to run the subprocess in; if not specified, a temporary directory is used.

**kwargs: Any

Additional arguments that you would pass to python myflow.py before the run command.

Blocking API

NBRunner.nbrun(self, **kwargs)

Blocking execution of the run. This method will wait until the run has completed execution.

Note that in contrast to run, this method returns a metaflow.Run object directly and calls cleanup() internally to support a common notebook pattern of executing a flow and retrieving its results immediately.

Parameters 

**kwargs: Any

Additional arguments that you would pass to python myflow.py after the run command, in particular, any parameters accepted by the flow.

Returns 

Run

A metaflow.Run object representing the finished run.

NBRunner.nbresume(self, **kwargs)

Blocking resume of a run. This method will wait until the resumed run has completed execution.

Note that in contrast to resume, this method returns a metaflow.Run object directly and calls cleanup() internally to support a common notebook pattern of executing a flow and retrieving its results immediately.

Parameters 

**kwargs: Any

Additional arguments that you would pass to python myflow.py after the resume command.

Returns 

Run

A metaflow.Run object representing the resumed run.

Non-Blocking API

NBRunner.async_run(self, **kwargs)

Non-blocking execution of the run. This method will return as soon as the run has launched. This method is equivalent to Runner.async_run.

Note that this method is asynchronous and needs to be awaited.

Parameters 

**kwargs: Any

Additional arguments that you would pass to python myflow.py after the run command, in particular, any parameters accepted by the flow.

Returns 

ExecutingRun

ExecutingRun representing the run that was started.

NBRunner.async_resume(self, **kwargs)

Non-blocking resume execution of the run. This method will return as soon as the resume has launched. This method is equivalent to Runner.async_resume.

Note that this method is asynchronous and needs to be awaited.

Parameters

**kwargs: Any

Additional arguments that you would pass to python myflow.py after the resume command.

Returns

ExecutingRun

ExecutingRun representing the resumed run that was started.

NBRunner.cleanup(self)

Delete any temporary files created during execution.

Call this method after using async_run or async_resume. You don't have to call this after nbrun or nbresume.
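
A minimal sketch of that rule for the non-blocking case: await the run, wait for it to finish, and call cleanup() in a finally clause. The helper name nb_async_run is hypothetical; it assumes only the async_run, wait, run, and cleanup members documented on this page.

```python
import asyncio


async def nb_async_run(nb_runner, **run_kwargs):
    """Non-blocking notebook run with explicit cleanup (hypothetical helper)."""
    try:
        running = await nb_runner.async_run(**run_kwargs)
        await running.wait()
        # Hand back the underlying metaflow.Run object.
        return running.run
    finally:
        # nbrun and nbresume do this internally; the async APIs do not.
        nb_runner.cleanup()
```

In a notebook cell, which supports top-level await, this could be called as run = await nb_async_run(NBRunner(FlowName)), where FlowName is the flow defined in that cell.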

ExecutingRun

ExecutingRun()

This class contains a reference to a metaflow.Run object representing the currently executing or finished run, as well as metadata related to the process.

ExecutingRun is returned by methods in Runner and NBRunner. It is not meant to be instantiated directly.

This class works as a context manager, allowing you to use a pattern like

with Runner(...).run() as running:
    ...

Note that you should use either this object as the context manager or Runner, not both in a nested manner.

ExecutingRun.returncode

Gets the return code of the underlying subprocess. A non-zero code indicates a failure, None a currently executing run.

Returns 

Optional[int]

The return code of the underlying subprocess.

ExecutingRun.status

Returns the status of the underlying subprocess that is responsible for executing the run.

The return value is one of the following strings:

  • running indicates a currently executing run.
  • failed indicates a failed run.
  • successful indicates a successful run.

Returns

str

The current status of the run.
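
The relationship between returncode and status described above can be sketched as a small function. This illustrates the documented semantics and is not Metaflow's implementation; the name status_from_returncode is hypothetical, and the sketch assumes that a return code of 0 means success.

```python
from typing import Optional


def status_from_returncode(returncode: Optional[int]) -> str:
    """Map a subprocess return code to a status string (illustrative only)."""
    if returncode is None:
        return "running"      # the subprocess has not exited yet
    if returncode == 0:
        return "successful"   # assumption: zero exit code means success
    return "failed"           # any non-zero exit code


for code in (None, 0, 1):
    print(code, "->", status_from_returncode(code))
```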

ExecutingRun.stdout

Returns the current stdout of the run. If the run is finished, this will contain the entire stdout output. Otherwise, it will contain the stdout up until this point.

Returns 

str

The current snapshot of stdout.

ExecutingRun.stderr

Returns the current stderr of the run. If the run is finished, this will contain the entire stderr output. Otherwise, it will contain the stderr up until this point.

Returns 

str

The current snapshot of stderr.

Non-Blocking API

ExecutingRun.wait(self, timeout, stream)

Wait for this run to finish, optionally with a timeout and optionally streaming its output.

Note that this method is asynchronous and needs to be awaited.

Parameters 

timeout: Optional[float], default None

The maximum time, in seconds, to wait for the run to finish. If the timeout is reached, the run is terminated.

stream: Optional[str], default None

If specified, that stream is printed to stdout. stream can be either stdout or stderr.

Returns 

ExecutingRun

This object, allowing you to chain calls.
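
A short sketch of wait with both optional arguments, relying on the fact that it returns the same object so that attribute access can follow directly. The helper name wait_with_deadline is hypothetical.

```python
import asyncio


async def wait_with_deadline(running, seconds):
    """Wait for a run while echoing its stdout (hypothetical helper).

    `running` is expected to behave like an ExecutingRun.
    """
    # wait() returns the object it was called on, so the result can be used directly.
    finished = await running.wait(timeout=seconds, stream="stdout")
    return finished.status


# Usage sketch (requires Metaflow; 'myflow.py' is a placeholder path):
#   from metaflow import Runner
#   async def main():
#       with Runner('myflow.py') as runner:
#           running = await runner.async_run()
#           print(await wait_with_deadline(running, 600))
#   asyncio.run(main())
```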

ExecutingRun.stream_log(self, stream, position)

Asynchronous iterator to stream logs from the subprocess line by line.

Note that this method is an asynchronous iterator and needs to be consumed with async for.

Parameters 

stream: str

The stream to stream logs from. Can be one of stdout or stderr.

position: Optional[int], default None

The position in the log file to start streaming from. If None, it starts from the beginning of the log file. This allows resuming streaming from a previously known position.

Yields 

Tuple[int, str]

A tuple containing the position in the log file and the line read. The position returned can, for example, be fed into another stream_log call.
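
The sketch below consumes the iterator with async for and returns the last position seen, so that a later call can pick up from there. The helper name collect_lines is hypothetical; it assumes only the stream_log signature and yielded tuple described above.

```python
import asyncio


async def collect_lines(running, stream="stdout", position=None):
    """Read log lines and remember where reading stopped (hypothetical helper)."""
    lines = []
    # stream_log is an asynchronous iterator, so it is consumed with `async for`.
    async for position, line in running.stream_log(stream, position):
        lines.append(line)
    # If nothing was yielded, the position passed in is returned unchanged.
    return position, lines


# Usage sketch with an ExecutingRun called `running`, inside a coroutine:
#   position, first_batch = await collect_lines(running)
#   position, later_batch = await collect_lines(running, position=position)
```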

Deployer