Deployer - Deploying flows programmatically

The Deployer class allows you to manage production deployments programmatically. For an overview, see Deploying flows programmatically.

Metaflow supports various production orchestrators, each offering slightly different functionality. All of them share the same high-level interface, exposed through the Deployer API:

  1. Start by instantiating the top-level Deployer object.
  2. Choose a production orchestrator through the functions of Deployer.
  3. Deploy a flow by calling an implementation-specific create() function that returns a DeployedFlow representing a flow that is deployed but not yet running.
  4. The DeployedFlow is ready to execute automatically if it is scheduled with the @schedule, @trigger, or @trigger_on_finish decorators.
  5. Optionally, you can trigger a run explicitly by calling DeployedFlow.trigger(), which returns a TriggeredRun representing a run that will start executing.
  6. Once the run has started executing, TriggeredRun.run returns a corresponding Run object.

Example

import time
from metaflow import Deployer

# Deploy the flow to Argo Workflows; it is deployed but not yet running.
deployed_flow = Deployer('helloflow.py').argo_workflows().create()
print('Production token', deployed_flow.production_token)

# Trigger a run explicitly and wait until the Run object becomes available.
triggered_run = deployed_flow.trigger()
while triggered_run.run is None:
    print('Waiting for the run to start')
    time.sleep(1)

print('Run started', triggered_run.run)
print('Terminating the flow', triggered_run.terminate())

Note that you can replace argo_workflows() above with step_functions() without changing anything else in the code.
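
Because both orchestrators are reached through a method on Deployer, the choice can be made at runtime. The following is a minimal sketch, not part of Metaflow: select_orchestrator and SUPPORTED_ORCHESTRATORS are hypothetical names, and the helper only looks up the two methods documented on this page.

```python
from typing import Any

# Orchestrator selector methods documented on Deployer in this page.
SUPPORTED_ORCHESTRATORS = ("argo_workflows", "step_functions")


def select_orchestrator(deployer: Any, name: str) -> Any:
    """Return the orchestrator-specific deployer for `name`.

    `deployer` is expected to be a metaflow.Deployer instance. This helper
    is hypothetical; it simply calls the documented method with that name.
    """
    if name not in SUPPORTED_ORCHESTRATORS:
        raise ValueError(
            f"Unknown orchestrator {name!r}; expected one of {SUPPORTED_ORCHESTRATORS}"
        )
    return getattr(deployer, name)()


# Usage (requires Metaflow and a configured orchestrator):
#   from metaflow import Deployer
#   orchestrator = select_orchestrator(Deployer("helloflow.py"), "step_functions")
#   deployed_flow = orchestrator.create()
```

Rejecting unknown names up front keeps a typo in a configuration value from surfacing as an AttributeError later.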

In addition to this basic functionality, each implementation-specific object exposes additional functions for managing deployed flows and runs, as documented below.

Common Deployer

Deployer(flow_file, show_output=True, profile=None, env=None, cwd=None, **kwargs)

Use the Deployer class to configure and access one of the production orchestrators supported by Metaflow.

Parameters 

flow_file: str

Path to the flow file to deploy.

show_output: bool, default True

Whether to show stdout and stderr on the console. Enabled by default.

profile: Optional[str], default None

Metaflow profile to use for the deployment. If not specified, the default profile is used.

env: Optional[Dict[str, str]], default None

Additional environment variables to set for the deployment.

cwd: Optional[str], default None

The directory to run the subprocess in; if not specified, the current directory is used.

**kwargs: Any

Additional arguments that you would pass to python myflow.py before the deployment command.
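
As an illustration of the constructor parameters above, here is a minimal sketch that runs the deployment from the flow file's own directory with console output turned off. The make_deployer helper and its deployer_cls argument are hypothetical; deployer_cls exists only so the helper can be exercised without Metaflow installed.

```python
import os
from typing import Any, Callable, Dict, Optional


def make_deployer(
    flow_file: str,
    extra_env: Optional[Dict[str, str]] = None,
    deployer_cls: Optional[Callable[..., Any]] = None,
) -> Any:
    """Build a Deployer that runs from the flow file's own directory."""
    if deployer_cls is None:
        # Imported lazily so the helper can be tested without Metaflow.
        from metaflow import Deployer as deployer_cls

    flow_path = os.path.abspath(flow_file)
    return deployer_cls(
        flow_path,
        show_output=False,               # keep stdout/stderr off the console
        env=extra_env,                   # additional environment variables, or None
        cwd=os.path.dirname(flow_path),  # run the subprocess next to the flow file
    )


# Usage (requires Metaflow); the variable name below is only an example:
#   deployer = make_deployer("helloflow.py", {"MY_SETTING": "1"})
```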

Deployer.argo_workflows(self)

Returns a deployer specific to Argo Workflows.

Returns 

ArgoWorkflowsDeployer

A deployer class specific to Argo Workflows.

Deployer.step_functions(self)

Returns a deployer specific to Step Functions.

Returns 

StepFunctionsDeployer

A deployer class specific to Step Functions.

Deploy Argo Workflows with ArgoWorkflowsDeployer

ArgoWorkflowsDeployer.create(self, **kwargs)

Create a deployed flow using the deployer implementation.

Parameters 

**kwargs: Any

Additional arguments to pass to create, corresponding to the command-line arguments of the create command.

Returns 

DeployedFlow

DeployedFlow object representing the deployed flow.

Raises 

Exception

If there is an error during deployment.

Manage a flow deployed on Argo Workflows with ArgoWorkflowsDeployedFlow

ArgoWorkflowsDeployedFlow.production_token

Get the production token for the deployed flow.

Returns 

str, optional

The production token, or None if it cannot be retrieved.

ArgoWorkflowsDeployedFlow.trigger(self, **kwargs)

Trigger a new run for the deployed flow.

Parameters 

**kwargs: Any

Additional arguments to pass to the trigger command, in particular the flow's Parameters.

Returns 

ArgoWorkflowsTriggeredRun

The triggered run instance.

Raises 

Exception

If there is an error during the trigger process.
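
Since Parameters are passed to trigger as keyword arguments, a dictionary of values can be forwarded directly. The sketch below is a hypothetical wrapper, not part of Metaflow; the parameter name alpha in the usage comment is an assumption about the flow being deployed.

```python
from typing import Any, Dict


def trigger_with_parameters(deployed_flow: Any, parameters: Dict[str, Any]) -> Any:
    """Trigger a run, forwarding flow Parameters as keyword arguments."""
    try:
        return deployed_flow.trigger(**parameters)
    except Exception as exc:
        # trigger() is documented to raise Exception on failure; re-raise
        # with the parameter values attached to make the failure easier to debug.
        raise RuntimeError(
            f"Triggering failed for parameters {parameters!r}"
        ) from exc


# Usage, assuming the flow defines a Parameter named `alpha`:
#   triggered_run = trigger_with_parameters(deployed_flow, {"alpha": 0.5})
```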

ArgoWorkflowsDeployedFlow.delete(self, **kwargs)

Delete the deployed flow.

Parameters 

**kwargs: Any

Additional arguments to pass to the delete command.

Returns 

bool

True if the command was successful, False otherwise.
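
For short-lived deployments, for example in an integration test, create and delete can be paired so that the deployment is removed even if the code in between fails. This is a minimal sketch under that assumption; temporary_deployment is a hypothetical helper that relies only on create() and delete() as documented here.

```python
import warnings
from contextlib import contextmanager
from typing import Any, Iterator


@contextmanager
def temporary_deployment(orchestrator_deployer: Any, **create_kwargs: Any) -> Iterator[Any]:
    """Deploy a flow, yield the DeployedFlow, and delete it on exit."""
    deployed_flow = orchestrator_deployer.create(**create_kwargs)
    try:
        yield deployed_flow
    finally:
        # delete() returns True on success and False otherwise.
        if not deployed_flow.delete():
            warnings.warn("Failed to delete the deployed flow; manual cleanup may be needed")


# Usage (requires Metaflow and a configured orchestrator):
#   from metaflow import Deployer
#   with temporary_deployment(Deployer("helloflow.py").argo_workflows()) as flow:
#       triggered_run = flow.trigger()
```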

Manage a run triggered on Argo Workflows with ArgoWorkflowsTriggeredRun

ArgoWorkflowsTriggeredRun.run

Retrieve the Run object for the triggered run.

Note that the Metaflow Run becomes available only once the start task has started executing.

Returns 

Run, optional

Metaflow Run object if the start step has started executing, otherwise None.
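
Because run is None until the start step begins, callers typically poll it. A bare loop can wait forever if the run never starts, so the following sketch adds a timeout. wait_for_run and its default values are hypothetical choices, not part of Metaflow.

```python
import time
from typing import Any


def wait_for_run(triggered_run: Any, timeout: float = 300.0, poll_interval: float = 1.0) -> Any:
    """Poll `triggered_run.run` until it is available or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while True:
        run = triggered_run.run
        if run is not None:
            return run
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Run did not start within {timeout} seconds")
        time.sleep(poll_interval)


# Usage:
#   run = wait_for_run(deployed_flow.trigger(), timeout=600)
```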

ArgoWorkflowsTriggeredRun.terminate(self, **kwargs)

Terminate the running workflow.

Parameters 

**kwargs: Any

Additional arguments to pass to the terminate command.

Returns 

bool

True if the command was successful, False otherwise.

ArgoWorkflowsTriggeredRun.suspend(self, **kwargs)

Suspend the running workflow.

Parameters 

**kwargs: Any

Additional arguments to pass to the suspend command.

Returns 

bool

True if the command was successful, False otherwise.

ArgoWorkflowsTriggeredRun.unsuspend(self, **kwargs)

Unsuspend the suspended workflow.

Parameters 

**kwargs: Any

Additional arguments to pass to the unsuspend command.

Returns 

bool

True if the command was successful, False otherwise.
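
suspend and unsuspend are naturally used as a pair, for example to pause a workflow while some maintenance step runs. One way to make sure the workflow is always resumed is a context manager. The sketch below is hypothetical (suspended is not a Metaflow function) and relies only on the boolean return values documented above.

```python
import warnings
from contextlib import contextmanager
from typing import Any, Iterator


@contextmanager
def suspended(triggered_run: Any) -> Iterator[Any]:
    """Suspend the workflow for the duration of the `with` block."""
    if not triggered_run.suspend():
        raise RuntimeError("The suspend command was not successful")
    try:
        yield triggered_run
    finally:
        # Always attempt to resume, even if the block raised.
        if not triggered_run.unsuspend():
            warnings.warn("The unsuspend command was not successful")


# Usage, where do_maintenance is a placeholder for your own code:
#   with suspended(triggered_run):
#       do_maintenance()
```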

ArgoWorkflowsTriggeredRun.status(self)

Get the status of the triggered run.

Returns 

str, optional

The status of the workflow, taking the Run object into account, or None if the status could not be retrieved.

Deploy Step Functions with StepFunctionsDeployer

StepFunctionsDeployer.create(self, **kwargs)

Create a deployed flow using the deployer implementation.

Parameters 

**kwargs: Any

Additional arguments to pass to create, corresponding to the command-line arguments of the create command.

Returns 

DeployedFlow

DeployedFlow object representing the deployed flow.

Raises 

Exception

If there is an error during deployment.

Manage a flow deployed on Step Functions with StepFunctionsDeployedFlow

StepFunctionsDeployedFlow.production_token

Get the production token for the deployed flow.

Returns 

str, optional

The production token, or None if it cannot be retrieved.

StepFunctionsDeployedFlow.trigger(self, **kwargs)

Trigger a new run for the deployed flow.

Parameters 

**kwargs: Any

Additional arguments to pass to the trigger command, in particular the flow's Parameters.

Returns 

StepFunctionsTriggeredRun

The triggered run instance.

Raises 

Exception

If there is an error during the trigger process.

StepFunctionsDeployedFlow.delete(self, **kwargs)

Delete the deployed flow.

Parameters 

**kwargs: Any

Additional arguments to pass to the delete command.

Returns 

bool

True if the command was successful, False otherwise.

StepFunctionsDeployedFlow.list_runs(self, states=None)

List runs of the deployed flow.

Parameters 

states: Optional[List[str]], optional

A list of states to filter the runs by. Allowed values are: RUNNING, SUCCEEDED, FAILED, TIMED_OUT, ABORTED. If not provided, all states will be considered.

Returns 

List[TriggeredRun]

A list of TriggeredRun objects representing the runs of the deployed flow.

Raises 

ValueError

If any of the provided states are invalid or if there are duplicate states.
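
As an example of filtering, the sketch below lists only the runs that ended unsuccessfully. The check_states helper mirrors the validation rules described above so that a bad filter fails before any call is made; both function names are hypothetical, and the grouping of FAILED, TIMED_OUT, and ABORTED as "unsuccessful" is an assumption of this example.

```python
from typing import Any, List, Sequence

# Allowed values as listed in the list_runs documentation.
ALLOWED_STATES = ("RUNNING", "SUCCEEDED", "FAILED", "TIMED_OUT", "ABORTED")
# Grouping chosen for this example only.
UNSUCCESSFUL_STATES = ["FAILED", "TIMED_OUT", "ABORTED"]


def check_states(states: Sequence[str]) -> List[str]:
    """Reject invalid or duplicate states, mirroring the documented ValueError."""
    invalid = [s for s in states if s not in ALLOWED_STATES]
    if invalid:
        raise ValueError(f"Invalid states: {invalid}")
    if len(set(states)) != len(states):
        raise ValueError("Duplicate states are not allowed")
    return list(states)


def list_unsuccessful_runs(deployed_flow: Any) -> List[Any]:
    """Return the TriggeredRun objects whose state is FAILED, TIMED_OUT, or ABORTED."""
    return deployed_flow.list_runs(states=check_states(UNSUCCESSFUL_STATES))


# Usage (requires a flow deployed on Step Functions):
#   for triggered_run in list_unsuccessful_runs(deployed_flow):
#       print(triggered_run.run)
```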

Manage a run triggered on Step Functions with StepFunctionsTriggeredRun

StepFunctionsTriggeredRun.run

Retrieve the Run object for the triggered run.

Note that the Metaflow Run becomes available only once the start task has started executing.

Returns 

Run, optional

Metaflow Run object if the start step has started executing, otherwise None.

StepFunctionsTriggeredRun.terminate(self, **kwargs)

Terminate the running workflow.

Parameters 

**kwargs: Any

Additional arguments to pass to the terminate command.

Returns 

bool

True if the command was successful, False otherwise.