
Deploying Flows Programmatically

Besides running flows via an API, you can also programmatically deploy flows to any of the production orchestrators supported by Metaflow. For instance, you can use this feature to create a deployment script that runs as part of your CI/CD system, e.g. on GitHub Actions, and deploys a flow to production automatically after a pull request has been approved.

tip

For a practical example of Deployer in action, see the sweep example in Config-Driven Experimentation.

Deploying to production with the Deployer API

Deployments are handled through the Deployer API, which closely follows the command line interface used to push flows to production orchestrators like Argo Workflows and Step Functions.

This diagram outlines the deployment process and the objects involved:

  1. Instantiate a Deployer class pointing at the flow file you want to deploy:
from metaflow import Deployer
deployer = Deployer('helloflow.py')
  2. Choose an orchestrator, here Argo Workflows, and call create() to deploy the flow:
deployed_flow = deployer.argo_workflows().create()

The flow is now scheduled for execution! If you had annotated the flow with a @schedule decorator, it would run automatically at the desired time. Had you annotated it with @trigger, or @trigger_on_finish, it would run automatically when the specified event arrives.
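As a rough sketch of how the two steps above might be wrapped for a CI/CD job, the helper below combines them into one function. The name deploy_to_argo and the deployer_cls argument are assumptions made for illustration, not part of Metaflow. The argument exists only so the helper can be exercised with a stand-in class when Metaflow is not installed.

```python
def deploy_to_argo(flow_file, deployer_cls=None):
    """Deploy flow_file to Argo Workflows and return the deployed flow object.

    deployer_cls is a hypothetical hook for testing; by default the real
    metaflow.Deployer is used.
    """
    if deployer_cls is None:
        # Imported lazily so that the helper can be imported without Metaflow.
        from metaflow import Deployer as deployer_cls

    # Step 1: point a Deployer at the flow file.
    deployer = deployer_cls(flow_file)
    # Step 2: choose Argo Workflows and create the deployment.
    return deployer.argo_workflows().create()
```

A CI step could then call deploy_to_argo('helloflow.py') after the pull request has been approved and merged.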

Triggering a flow explicitly

You can trigger a deployed flow explicitly by calling trigger():

triggered_run = deployed_flow.trigger()

You can pass any Parameters of the flow as keyword arguments to trigger(), e.g.:

triggered_run = deployed_flow.trigger(alpha=5, algorithm='cubic')
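Because Parameters are passed as keyword arguments, one run can be triggered per parameter combination. The following is a minimal sketch of such a sweep. The helper name trigger_sweep and the grid values are hypothetical; the only Deployer call it relies on is trigger() as shown above.

```python
from itertools import product


def trigger_sweep(deployed_flow, grid):
    """Trigger one run per combination of the parameter values in grid.

    grid maps a Parameter name to a list of candidate values.
    Returns a list of (params, triggered_run) pairs.
    """
    names = list(grid)
    triggered = []
    for values in product(*(grid[name] for name in names)):
        params = dict(zip(names, values))
        # Each combination becomes the keyword arguments of one trigger() call.
        triggered.append((params, deployed_flow.trigger(**params)))
    return triggered
```

For example, trigger_sweep(deployed_flow, {'alpha': [1, 5], 'algorithm': ['cubic']}) would trigger two runs, one for each value of alpha.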

Triggering returns a TriggeredRun object, representing a run that is about to get scheduled by the orchestrator. A corresponding Run object becomes accessible only once the start step begins executing. This may take a while, for instance, if a new cloud instance needs to start before the task can execute:

import time
while triggered_run.run is None:
    print('Waiting for the run to start')
    time.sleep(1)
print('Run started', triggered_run.run)
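In an automated script, an unbounded wait can hang the job if the run never starts. The sketch below adds a deadline to the same polling idea. The function name wait_for_run and its timeout and poll_interval arguments are assumptions for illustration; it relies only on the run attribute described above.

```python
import time


def wait_for_run(triggered_run, timeout=600.0, poll_interval=1.0):
    """Poll triggered_run.run until it is set and return it.

    Raises TimeoutError if no Run object appears within timeout seconds.
    """
    deadline = time.monotonic() + timeout
    while True:
        run = triggered_run.run
        if run is not None:
            return run
        if time.monotonic() >= deadline:
            raise TimeoutError(f"run did not start within {timeout} seconds")
        time.sleep(poll_interval)
```

The default timeout of ten minutes is arbitrary; choose a value that reflects how long your orchestrator typically takes to provision an instance.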

Terminating a triggered run

You may terminate a triggered run at any time by calling terminate():

triggered_run.terminate()
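One possible use of terminate() is cleanup: if the script that triggered the run fails or is interrupted, you may not want the run to keep going. The context manager below is a sketch of that pattern. The name terminate_on_error is hypothetical, and the only Deployer call it uses is terminate().

```python
from contextlib import contextmanager


@contextmanager
def terminate_on_error(triggered_run):
    """Terminate triggered_run if the body of the with block raises."""
    try:
        yield triggered_run
    except BaseException:
        # BaseException also covers KeyboardInterrupt, e.g. a cancelled CI job.
        triggered_run.terminate()
        raise
```

It could be used as "with terminate_on_error(deployed_flow.trigger()) as triggered_run:", with any follow-up checks placed inside the block. If the block finishes normally, the run is left untouched.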

Orchestrator-specific methods

Besides the common methods highlighted above, each orchestrator exposes additional methods for managing deployments and triggered runs. For details, see the API documentation for Deployer.

note

Currently, Deployer doesn't support deployments to Apache Airflow, as Airflow doesn't expose an API for deployments. Instead, you should manually copy the resulting Airflow DAG to your Airflow server.