Scheduling Metaflow Flows with Apache Airflow
Apache Airflow is a popular open-source workflow orchestrator. It has a number of limitations compared to Argo Workflows and AWS Step Functions, so we mainly recommend it if you are an existing Airflow user and you want to avoid introducing a new orchestrator in your environment.
The Metaflow-Airflow integration is a great way to modernize your Airflow deployment. It provides a more user-friendly and productive development API for data scientists and data engineers, without needing to change anything in your existing pipelines or operational playbooks, as described in its announcement blog post. To learn how to deploy and operate the integration, see Using Airflow with Metaflow.
Here are the main benefits of using Metaflow with Airflow:
- You get to use the human-friendly API of Metaflow to define and test workflows. Almost all features of Metaflow work with Airflow out of the box, except nested foreaches, which are not supported by Airflow, and `@batch`, as the current integration only supports `@kubernetes`.
- You can deploy Metaflow flows to your existing Airflow server without having to change anything operationally. From Airflow's point of view, Metaflow flows look like any other Airflow DAG.
- If you want to consider moving to another orchestrator supported by Metaflow, you can test them easily just by changing one command to deploy to Argo Workflows or AWS Step Functions.
When running on Airflow, Metaflow code works exactly as it does locally: no changes are required in the code. All data artifacts produced by steps run on Airflow are available through the Client API. All tasks are run on Kubernetes, respecting the `@resources` decorator, as if the `@kubernetes` decorator was added to all steps, as explained in Executing Tasks Remotely.
This document describes the basics of Airflow scheduling. If your project involves multiple people or multiple workflows, or it is becoming business-critical, check out the section on coordinating larger Metaflow projects.
Pushing a flow to production
Let's use the flow from the section about parameters as an example:
```python
from metaflow import FlowSpec, Parameter, step

class ParameterFlow(FlowSpec):
    alpha = Parameter('alpha',
                      help='Learning rate',
                      default=0.01)

    @step
    def start(self):
        print('alpha is %f' % self.alpha)
        self.next(self.end)

    @step
    def end(self):
        print('alpha is still %f' % self.alpha)

if __name__ == '__main__':
    ParameterFlow()
```
Save this script to a file `parameter_flow.py`. To deploy a version to Airflow, run

```
python parameter_flow.py --with retry airflow create parameter_dag.py
```
This command takes a snapshot of your code in the working directory, as well as the version of Metaflow used, and creates an Airflow DAG in `parameter_dag.py` for scheduling on Airflow. You should deploy `parameter_dag.py` to your Airflow instance as you would any other user-written DAG.
Metaflow automatically maps the Parameters of your flow to corresponding parameters on Airflow. You can execute your Metaflow flow deployed on Airflow like any other Airflow DAG, seamlessly getting all the benefits of Airflow alongside all the benefits of Metaflow.
Hardening deployments
It is highly recommended that you enable retries when deploying to Airflow, which you can do easily with `--with retry` as shown above. However, make sure that all your steps are safe to retry before you do this. If some of your steps interact with external services in ways that can't tolerate automatic retries, decorate them with `@retry` with `times` set to zero (`times=0`), as described in How to Prevent Retries.
If you want to test on Airflow without interfering with a production flow, you can change the name of your class, e.g. from `ParameterFlow` to `ParameterFlowStaging`, and run `airflow create` again to deploy the DAG under the new name, or use the `@project` decorator.
Note that `airflow create` creates a new isolated production namespace for your production flow. Read Organizing Results to learn all about namespace behavior.
Limiting the number of concurrent tasks
By default, Metaflow configures Airflow to execute at most 100 tasks concurrently within a foreach step. This should ensure that most workflows finish quickly without overwhelming your Kubernetes cluster, the execution backend.
If your workflow includes a large foreach and you need results faster, you can increase the default with the `--max-workers` option. For instance, `airflow create --max-workers 500` allows 500 tasks to be executed concurrently for every foreach step.
This option is similar to `run --max-workers`, which is used to limit concurrency outside Airflow.
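Conceptually, the cap behaves like a bounded worker pool: no more than `--max-workers` foreach tasks are in flight at once, and the rest wait for a free slot. A minimal sketch of that idea in plain Python (not Metaflow code; the squaring function is just a stand-in for a foreach step):

```python
from concurrent.futures import ThreadPoolExecutor

# Stand-in for the work a single foreach task would do.
def process(item):
    return item * item

# At most 3 items are processed concurrently, mirroring the effect of
# `--max-workers 3`; remaining items wait until a worker frees up.
with ThreadPoolExecutor(max_workers=3) as pool:
    results = list(pool.map(process, range(10)))

print(results)  # -> [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

Raising the cap finishes the fan-out faster at the cost of more simultaneous pods on your Kubernetes cluster.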
Deploy-time parameters
You can customize Airflow deployments through Parameters that are evaluated at deployment time, i.e. when `airflow create` is executed.
For instance, you can change the default value of a `Parameter` based on who deployed the workflow or what Git branch the deployment was executed in. Crucially, the function in `Parameter` is evaluated only once during `airflow create` and not during the execution of the flow.
You can run the flow locally as usual. The function inside `Parameter` is called only once when the execution starts.
```python
from metaflow import FlowSpec, Parameter, step, JSONType
from datetime import datetime
import json

def deployment_info(context):
    return json.dumps({'who': context.user_name,
                       'when': datetime.now().isoformat()})

class DeploymentInfoFlow(FlowSpec):
    info = Parameter('deployment_info',
                     type=JSONType,
                     default=deployment_info)

    @step
    def start(self):
        print('This flow was deployed at %s by %s'
              % (self.info['when'], self.info['who']))
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    DeploymentInfoFlow()
```
When `airflow create` is called, `deployment_info` is evaluated, capturing your username and the time of deployment. This information remains constant on Airflow, although the user may override the default value.
The `context` object is passed to any function defined in `Parameter`. It contains various fields related to the flow being deployed. By relying on the values passed in `context`, you can create generic deploy-time functions that can be reused by multiple flows.
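For example, a reusable deploy-time function could derive a default storage prefix from the deploying flow and user. The sketch below is hypothetical: it uses a stub in place of the real context object, `user_name` comes from the example above, while the `flow_name` field and the bucket path are assumptions for illustration.

```python
# Hypothetical generic deploy-time function: build a default output
# prefix from whichever flow is being deployed and by whom.
def default_prefix(context):
    # `user_name` appears in the example above; `flow_name` is assumed here.
    return 's3://my-bucket/%s/%s' % (context.flow_name, context.user_name)

# Stub standing in for the context object Metaflow passes at
# `airflow create` time; real deployments never construct this by hand.
class StubContext:
    flow_name = 'DeploymentInfoFlow'
    user_name = 'alice'

print(default_prefix(StubContext()))  # -> s3://my-bucket/DeploymentInfoFlow/alice
```

Because the function runs only at `airflow create` time, the prefix is fixed for the deployment rather than recomputed on every run.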
Scheduling a flow
By default, a flow on Airflow does not run automatically. You need to set up a trigger to launch the flow when an event occurs.
On Airflow, Metaflow provides built-in support for triggering flows through time-based (cron) triggers, which, as the name implies, launch the workflow at a certain time. As of today, [event-based triggering](/production/event-triggering) is not supported on Airflow.
Time-based triggers are implemented at the FlowSpec level using the `@schedule` decorator. This flow is triggered hourly:
```python
from metaflow import FlowSpec, schedule, step
from datetime import datetime

@schedule(hourly=True)
class HourlyFlow(FlowSpec):

    @step
    def start(self):
        now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        print('time is %s' % now)
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    HourlyFlow()
```
You can define the schedule with `@schedule` in one of the following ways:

- `@schedule(weekly=True)` runs the workflow on Sundays at midnight.
- `@schedule(daily=True)` runs the workflow every day at midnight.
- `@schedule(hourly=True)` runs the workflow every hour.
- `@schedule(cron='0 10 * * ? *')` runs the workflow on the given cron schedule, in this case at 10am UTC every day.
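For reference, the shorthand options map roughly onto standard five-field cron expressions. The mapping below is illustrative only (hand-written to match the behaviors listed above, not taken from Metaflow's source):

```python
# Approximate cron equivalents of the @schedule shorthands.
SHORTHAND_CRON = {
    'hourly': '0 * * * *',  # every hour, at minute 0
    'daily':  '0 0 * * *',  # every day at midnight
    'weekly': '0 0 * * 0',  # Sundays at midnight
}

for name, expr in sorted(SHORTHAND_CRON.items()):
    print('%s -> %s' % (name, expr))
```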
Reproducing failed production runs
Let's use `DebugFlow` from the debugging section as an example. The flow contains a bug in the step `b`. When you run it, the failed run will look like this in the Airflow UI:
The graph visualization shows that step b failed, as expected. First, you should inspect the logs of the failed step in the Airflow UI (or the Metaflow UI) to get an idea of why it failed.
Notice the Metaflow Run ID, `airflow-ec19e85042a1`, which is available on the Rendered Template page for the failed task in the Airflow UI (look for the `metaflow_run_id` attribute). You can use this Run ID to locate the execution in the Metaflow UI as well, if needed.
Next, we want to reproduce the above error locally. We do this by resuming the specific Airflow run that failed:
```
python debug.py resume --origin-run-id airflow-ec19e85042a1
```
This will reuse the results of the `start` and `a` steps from the Airflow run. It will try to rerun the step `b` locally, which fails with the same error as it does in production.
You can fix the error locally, as above. In the case of this simple flow, you can run the whole flow locally to confirm that the fix works. After validating the results, you would deploy a new version to production with `airflow create`.
However, this might not be a feasible approach for complex production flows. For instance, the flow might process large amounts of data that cannot be handled on your local machine. We have better approaches for staging flows for production:
Staging flows for production
The easiest approach to testing a demanding flow is to run it on Kubernetes. This works even with `resume`:
```
python debug.py resume --origin-run-id airflow-ec19e85042a1 --with kubernetes
```
This will resume your flow and run every step on Kubernetes. When you are ready to test a fixed flow end-to-end, just run it as follows:
```
python debug.py run --with kubernetes
```
Alternatively, you can change the name of the flow temporarily, e.g. from `DebugFlow` to `DebugFlowStaging`. Then you can run `airflow create` with the new name, which will create a separate staging flow on Airflow. You can also use the `@project` decorator.
You can test the staging flow freely without interfering with the production flow. Once the staging flow runs successfully, you can confidently deploy a new version to production.