Deployer - Deploying flows programmatically
The Deployer
class allows you to manage production deployments programmatically. For an overview, see Deploying flows programmatically.
Metaflow supports various production orchestrators, each offering slightly different functionalities. All of them operate with the same high-level interfaces, exposed through the Deployer
API:
- Start by instantiating the top-level
Deployer
object. - Choose a production orchestrator through the functions of
Deployer
. - Deploy a flow by calling an implementation-specific
create()
function that returns aDeployedFlow
representing a flow that is deployed but not yet running. - The
DeployedFlow
is ready to execute automatically if it is scheduled with@schedule
,@trigger
, and@trigger_on_finish
decorators. - Optionally, you can trigger a run explictly by calling
DeployedFlow.trigger()
that returns aTriggeredRun
representing a run that will start executing. - Once the run has started executing,
TriggeredRun.run
returns a correspondingRun
object.
Example
import time
from metaflow import Deployer
deployed_flow = Deployer('helloflow.py').argo_workflows().create()
print('Production token', deployed_flow.production_token)
triggered_run = deployed_flow.trigger()
while triggered_run.run is None:
print(f'Waiting for the run to start')
time.sleep(1)
print('Run started', triggered_run.run)
print('Terminating the flow', triggered_run.terminate())
Note that you can replace argo_workflows()
above with step_functions()
without changing anything
else in the code.
In addition to this basic functionality, each implementation-specific object exposes additional functions for managing deployed flows and runs, as documented below.
Common Deployer
Use the Deployer
class to configure and access one of the production
orchestrators supported by Metaflow.
flow_file: str
Path to the flow file to deploy.
show_output: bool, default True
Show the 'stdout' and 'stderr' to the console by default.
profile: Optional[str], default None
Metaflow profile to use for the deployment. If not specified, the default profile is used.
env: Optional[Dict[str, str]], default None
Additional environment variables to set for the deployment.
cwd: Optional[str], default None
The directory to run the subprocess in; if not specified, the current directory is used.
**kwargs: Any
Additional arguments that you would pass to python myflow.py
before
the deployment command.
Returns a deployer specific to Argo Workflows.
ArgoWorkflowsDeployer
a deployer class specific to Argo Workflows
Returns a deployer specific to Step Functions.
StepFunctionsDeployer
a deployer class specific to Step Functions
Deploy Argo Workflows with ArgoWorkflowsDeployer
Create a deployed flow using the deployer implementation.
**kwargs: Any
Additional arguments to pass to create
corresponding to the
command line arguments of create
DeployedFlow
DeployedFlow object representing the deployed flow.
Exception
If there is an error during deployment.
Manage a flow deployed on Argo Workflows with ArgoWorkflowsDeployedFlow
Get the production token for the deployed flow.
str, optional
The production token, None if it cannot be retrieved.
Trigger a new run for the deployed flow.
**kwargs: Any
Additional arguments to pass to the trigger command, Parameters
in particular
ArgoWorkflowsTriggeredRun
The triggered run instance.
Exception
If there is an error during the trigger process.
Delete the deployed flow.
**kwargs: Any
Additional arguments to pass to the delete command.
bool
True if the command was successful, False otherwise.
Manage a run triggered on Argo Workflows with ArgoWorkflowsTriggeredRun
Retrieve the Run
object for the triggered run.
Note that Metaflow Run
becomes available only when the start
task
has started executing.
Run, optional
Metaflow Run object if the start
step has started executing, otherwise None.
Terminate the running workflow.
**kwargs: Any
Additional arguments to pass to the terminate command.
bool
True if the command was successful, False otherwise.
Suspend the running workflow.
**kwargs: Any
Additional arguments to pass to the suspend command.
bool
True if the command was successful, False otherwise.
Unsuspend the suspended workflow.
**kwargs: Any
Additional arguments to pass to the unsuspend command.
bool
True if the command was successful, False otherwise.
Get the status of the triggered run.
str, optional
The status of the workflow considering the run object, or None if the status could not be retrieved.
Deploy Step Functions with StepFunctionsDeployer
Create a deployed flow using the deployer implementation.
**kwargs: Any
Additional arguments to pass to create
corresponding to the
command line arguments of create
DeployedFlow
DeployedFlow object representing the deployed flow.
Exception
If there is an error during deployment.
Manage a flow deployed on Step Functions with StepFunctionsDeployedFlow
Get the production token for the deployed flow.
str, optional
The production token, None if it cannot be retrieved.
Trigger a new run for the deployed flow.
**kwargs: Any
Additional arguments to pass to the trigger command, Parameters
in particular
StepFunctionsTriggeredRun
The triggered run instance.
Exception
If there is an error during the trigger process.
Delete the deployed flow.
**kwargs: Any
Additional arguments to pass to the delete command.
bool
True if the command was successful, False otherwise.
List runs of the deployed flow.
states: Optional[List[str]], optional
A list of states to filter the runs by. Allowed values are: RUNNING, SUCCEEDED, FAILED, TIMED_OUT, ABORTED. If not provided, all states will be considered.
List[TriggeredRun]
A list of TriggeredRun objects representing the runs of the deployed flow.
ValueError
If any of the provided states are invalid or if there are duplicate states.