
Applying Decorators with Mutators

The previous pages showed how to create various kinds of custom decorators. Mutators operate at a higher level: they let you programmatically control which decorators and parameters are applied to your flow.

For instance, you can use mutators to:

  • Apply stacks of decorators automatically, e.g. @retry and @fallback.

  • Create template flows that apply the right decorators automatically, optionally based on configs.

  • Build the foundation of the BaseFlow pattern, which lets you define shared, domain-specific tooling for all flows in your project, ensuring that everyone follows the same path consistently.

Mutators look like decorators, but instead of being functions annotated with @user_step_decorator, they are classes derived from FlowMutator or StepMutator that use the Mutator API.


Unlike decorators, mutators are applied at deploy time, before a run or deployment begins. As a result, they cannot modify the flow during execution.

Defining a flow mutator

A flow mutator can manipulate parameters of the flow and decorators attached to its steps through a MutableFlow object, passed to the mutator's mutate method.

The following example defines a @robust_flow mutator which applies Metaflow's built-in @retry decorator and the custom @fallback decorator to all steps of the flow.

from metaflow import FlowMutator
from fallback import fallback

class robust_flow(FlowMutator):
    def init(self, *args, **kwargs):
        self.disable_fallback = bool(kwargs.get("disable_fallback"))
        self.fallback_attributes = {}
        fallback_indicator = kwargs.get("fallback_indicator")
        if fallback_indicator:
            self.fallback_attributes["indicator"] = fallback_indicator

    def mutate(self, mutable_flow):
        for step_name, step in mutable_flow.steps:
            step.add_decorator("retry", duplicates=step.IGNORE)
            if not self.disable_fallback:
                step.add_decorator(
                    fallback,
                    deco_kwargs=self.fallback_attributes,
                    duplicates=step.IGNORE,
                )

Note the following details:

  • A flow-level mutator is defined in a class derived from FlowMutator.
  • You can capture and process attributes in the init method, not in Python's default __init__ constructor.
  • Use the mutate method to mutate the flow through the mutable_flow handle.
  • When using mutators to add decorators, consider whether they should override or defer to the same decorators added by the user. This behavior is controlled by the duplicates argument, which is explained in detail in the next section.

You can test the mutator with our previously defined FailFlow:

import math
from metaflow import FlowSpec, step

from robust_flow import robust_flow

@robust_flow(fallback_indicator='failed')
class FailFlow(FlowSpec):

    @step
    def start(self):
        x = 3
        for i in range(5):
            math.sqrt(x - i)
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    FailFlow()

Execute the flow without specifying any --with options. Thanks to the decorators added by @robust_flow, the run will behave exactly as if it were run with:

python failflow.py run --with retry --with fallback.fallback:indicator=failed
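Each --with option names a decorator and, optionally, its attributes: the same two pieces that robust_flow passes to step.add_decorator (the decorator and deco_kwargs). The hypothetical helper below, parse_with_spec, splits such an option into those two parts to make the correspondence explicit. It assumes attributes are written as comma-separated key=value pairs after a colon, keeps every value as a string, and is not Metaflow's own parser.

```python
def parse_with_spec(spec):
    """Split a --with spec into (decorator path, attributes dict).

    Hypothetical helper, assuming the "name:key=value,key=value" syntax.
    """
    # Everything before the first ':' names the decorator, e.g. "retry" or
    # "fallback.fallback" (decorator fallback defined in module fallback)
    name, _, attr_str = spec.partition(":")
    attributes = {}
    if attr_str:
        for pair in attr_str.split(","):
            key, _, value = pair.partition("=")
            attributes[key] = value
    return name, attributes


# The two --with options that @robust_flow(fallback_indicator='failed') replaces
specs = ["retry", "fallback.fallback:indicator=failed"]
print([parse_with_spec(s) for s in specs])
# [('retry', {}), ('fallback.fallback', {'indicator': 'failed'})]
```

The first element corresponds to the decorator argument of add_decorator, the second to deco_kwargs.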

How to handle duplicate decorators

What should happen if you run the above flow, decorated with @robust_flow(fallback_indicator='failed'), as follows:

python failflow.py run --with fallback.fallback:indicator=step_failed

Should the indicator be failed, as defined in the mutator attributes, or step_failed, as defined on the command line?

The choice depends on the policy you want to implement: sometimes the mutator should override the user's choice, sometimes the opposite. You can control the behavior through the duplicates argument, which takes one of three options:

  • IGNORE - the decorator added by the mutator is ignored if a user-defined decorator exists.
  • OVERRIDE - the decorator added by the mutator overrides a user-defined decorator.
  • ERROR - adding duplicate decorators raises an error.
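To make the three policies concrete, here is a minimal model of the behavior described above, written in plain Python without Metaflow. It is a hypothetical sketch, not Metaflow's implementation: a step's decorators are reduced to a dict that maps decorator names to attributes, and Duplicates and add_decorator are illustrative names that only mirror step.IGNORE, step.OVERRIDE, step.ERROR and step.add_decorator.

```python
from enum import Enum


class Duplicates(Enum):
    """Hypothetical stand-ins for step.IGNORE, step.OVERRIDE and step.ERROR."""
    IGNORE = "ignore"
    OVERRIDE = "override"
    ERROR = "error"


def add_decorator(decorators, name, attributes, duplicates):
    """Return a new {decorator name: attributes} dict with `name` added."""
    if name in decorators:
        if duplicates is Duplicates.IGNORE:
            # Keep the user-defined decorator, drop the mutator's version
            return dict(decorators)
        if duplicates is Duplicates.ERROR:
            raise ValueError(f"duplicate decorator @{name}")
    # No duplicate exists, or OVERRIDE replaces the user's version
    return {**decorators, name: dict(attributes)}


# The user ran: --with fallback.fallback:indicator=step_failed
user_step = {"fallback": {"indicator": "step_failed"}}
# @robust_flow(fallback_indicator='failed') wants to add this instead
mutator_attrs = {"indicator": "failed"}

print(add_decorator(user_step, "fallback", mutator_attrs, Duplicates.IGNORE))
# {'fallback': {'indicator': 'step_failed'}}
print(add_decorator(user_step, "fallback", mutator_attrs, Duplicates.OVERRIDE))
# {'fallback': {'indicator': 'failed'}}
```

With IGNORE the command-line value step_failed wins, with OVERRIDE the mutator's failed wins, and with ERROR the conflict is reported instead of being resolved silently.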

You can test the effect of each option by changing the duplicates argument in @robust_flow above. To see the artifacts produced, run:

python failflow.py dump RUN_ID/start

Note that the same options apply to adding flow-level decorators as well.

Introspecting a flow and applying configs

Let's walk through a more advanced mutator that shows how you can use Configs and flow introspection in mutators. We'll develop a flow linter that ensures that the resource requests in a flow, e.g. in @resources, adhere to limits set in a config.

First, let's define a configuration, limits.json, that specifies limits for compute resources:

{
    "cpu": 2,
    "memory": 16000,
    "disk": 10000
}

This mutator reads the limits through a Config, mutable_flow.limits, iterates through all decorators of each step using step.decorator_specs, finds the ones where resource limits apply, and enforces the limits by overwriting offending decorators.

from metaflow import FlowMutator

class flow_linter(FlowMutator):
    def mutate(self, mutable_flow):
        # Limits come from the Config named 'limits' (limits.json)
        limits = mutable_flow.limits
        for step_name, step in mutable_flow.steps:
            for deco_name, _module, _args, attributes in step.decorator_specs:
                # Only these decorators carry resource requests
                if deco_name in ("kubernetes", "batch", "resources"):
                    for key, limit in limits.items():
                        val = attributes.get(key)
                        if val and float(val) > limit:
                            print(
                                f"⚠️ Step[{step_name}] @{deco_name}({key}={val}) "
                                f"is higher than the limit of {limit} - fixed"
                            )
                            attributes[key] = limit
                    # Replace the original decorator with the clamped attributes
                    step.add_decorator(
                        deco_name,
                        deco_kwargs=attributes,
                        duplicates=step.OVERRIDE,
                    )

Try it with this flow:

from metaflow import FlowSpec, step, resources, Config

from flow_linter import flow_linter

@flow_linter
class HungryFlow(FlowSpec):

    limits = Config('limits', default='limits.json')

    @resources(cpu=16)
    @step
    def start(self):
        print(self._graph_info)
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    HungryFlow()

Run the flow, for example, as follows:

python hungryflow.py run --with resources:memory=64000

and notice the warnings.
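The clamping rule inside flow_linter is plain Python, so it can be checked without running a flow. The sketch below is a hypothetical, Metaflow-free version: lint_specs and RESOURCE_DECORATORS are illustrative names, the (name, module, args, attributes) tuples only mimic the shape that step.decorator_specs is unpacked into above, and the None and () values for module and args are placeholders. Unlike the mutator, it works on a copy of the attributes instead of modifying them in place.

```python
import json

# Same contents as limits.json above
LIMITS_JSON = '{"cpu": 2, "memory": 16000, "disk": 10000}'

# Decorators whose attributes carry resource requests, as in flow_linter
RESOURCE_DECORATORS = ("kubernetes", "batch", "resources")


def lint_specs(step_name, decorator_specs, limits):
    """Return (deco_name, clamped attributes) for every resource decorator."""
    fixed = []
    for deco_name, _module, _args, attributes in decorator_specs:
        if deco_name not in RESOURCE_DECORATORS:
            continue
        attributes = dict(attributes)  # copy, so the input is left untouched
        for key, limit in limits.items():
            val = attributes.get(key)
            # float() accepts both numbers and numeric strings
            if val and float(val) > limit:
                print(f"Step[{step_name}] @{deco_name}({key}={val}) "
                      f"is higher than the limit of {limit} - fixed")
                attributes[key] = limit
        fixed.append((deco_name, attributes))
    return fixed


limits = json.loads(LIMITS_JSON)
# Hypothetical specs: a @resources request above two limits, plus @retry
specs = [
    ("resources", None, (), {"cpu": 16, "memory": "64000"}),
    ("retry", None, (), {}),
]
print(lint_specs("start", specs, limits))
# [('resources', {'cpu': 2, 'memory': 16000})]
```

Two warnings are printed before the result: one for cpu and one for memory. The disk limit is skipped because the step does not request disk.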

Applying multiple decorators with a step mutator

Imagine you’ve built a custom decorator that depends on third-party libraries. You could use Metaflow’s built-in dependency management, e.g. a @pypi decorator, to install those libraries. However, this requires users to remember to apply both your custom decorator and the appropriate @pypi decorator, which is error-prone.

A better solution is to create a step mutator that adds the decorators automatically. As an example, let's create a custom data access decorator that fetches a dataset, preprocesses it, and exposes the result to the user step as an Arrow table, while making sure that all necessary dependencies are installed automatically.

We can define a step mutator @dataset and a decorator @process_dataset in the same module, as they are tightly coupled:

from metaflow import StepMutator, user_step_decorator

# Pinned dependencies that @dataset installs via @pypi
DEPS = {"duckdb": "1.3.2", "pyarrow": "20.0.0"}

@user_step_decorator
def process_dataset(step_name, flow, inputs=None, attr=None):
    # Safe to import: @dataset also adds @pypi, which installs duckdb
    import duckdb

    sql = f"""SELECT * FROM '{attr["url"]}'"""
    fltr = attr.get("filter")
    if fltr:
        # The leading space separates WHERE from the quoted path
        sql += f" WHERE {fltr}"
    con = duckdb.connect()
    print("🔄 Preparing data")
    flow.table = con.execute(sql).fetch_arrow_table()
    print("✅ Data prepared")
    yield
    # Remove the temporary artifact so that it is not persisted
    del flow.table

class dataset(StepMutator):
    def init(self, *args, **kwargs):
        self.url = kwargs["url"]
        self.filter = kwargs.get("filter")

    def mutate(self, mutable_step):
        mutable_step.add_decorator(
            "pypi", deco_kwargs={"packages": DEPS}, duplicates=mutable_step.ERROR
        )
        mutable_step.add_decorator(
            process_dataset,
            deco_kwargs={"filter": self.filter, "url": self.url},
            duplicates=mutable_step.ERROR,
        )
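The query construction inside process_dataset does not depend on DuckDB, so it can be pulled out and checked on its own. In this sketch build_query is a hypothetical helper, not part of the module above, and trips.parquet is a placeholder path.

```python
def build_query(url, filter_expr=None):
    """Build the SQL that process_dataset sends to DuckDB (hypothetical helper)."""
    sql = f"SELECT * FROM '{url}'"
    if filter_expr:
        # The leading space keeps WHERE separate from the quoted path
        sql += f" WHERE {filter_expr}"
    return sql


print(build_query("trips.parquet"))
# SELECT * FROM 'trips.parquet'
print(build_query("trips.parquet", "tip_amount > fare_amount"))
# SELECT * FROM 'trips.parquet' WHERE tip_amount > fare_amount
```

Because the filter is pasted into the SQL string verbatim, it should come from a trusted source, such as a config file under your control, rather than from arbitrary user input.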

From the user’s perspective, the step mutator @dataset behaves like a regular decorator. Its role is to capture attributes such as url and filter, and automatically apply two additional decorators — @pypi and @process_dataset — to the step where it is used.

After this, the @process_dataset decorator can import duckdb, knowing that the library is available. Note that we use the temporary artifact pattern to expose an Arrow table, flow.table, to the user code without persisting it as an artifact.
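The effect of @dataset on a step can be modeled in plain Python: one user-facing set of attributes expands into two decorators, and the ERROR policy turns a conflicting user-defined @pypi into an explicit failure rather than silently keeping one of the two. This is a hypothetical sketch, not Metaflow's API; apply_dataset and the example URL are illustrative, and a step's decorators are reduced to a dict of names and attributes.

```python
# Pinned dependencies, as in the dataset module above
DEPS = {"duckdb": "1.3.2", "pyarrow": "20.0.0"}


def apply_dataset(decorators, url, filter_expr=None):
    """Model of what @dataset(url=..., filter=...) adds to a step.

    decorators maps decorator names to attributes. Any existing decorator with
    the same name raises, mirroring duplicates=mutable_step.ERROR.
    """
    additions = {
        "pypi": {"packages": DEPS},
        "process_dataset": {"filter": filter_expr, "url": url},
    }
    result = dict(decorators)
    for name, attributes in additions.items():
        if name in result:
            raise ValueError(f"@dataset conflicts with an existing @{name}")
        result[name] = attributes
    return result


step = apply_dataset({}, url="https://example.com/data.parquet")
print(sorted(step))
# ['process_dataset', 'pypi']
```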

Let's try @dataset in a flow. To demonstrate another useful pattern, we load attributes from a config file, dataset.json, which could look like this:

{
    "url": "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2020-01.parquet",
    "filter": "tip_amount > fare_amount"
}

We then pass them to the @dataset mutator:

from metaflow import FlowSpec, step, Config

from dataset import dataset

class DatasetFlow(FlowSpec):

    data_config = Config('dataset', default='dataset.json')

    @dataset(url=data_config.url, filter=data_config.filter)
    @step
    def start(self):
        print(self.table)
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    DatasetFlow()

Note that you can apply step mutators with --with, similar to decorators:

python datasetflow.py --environment=pypi run --with 'dataset.dataset:url=SOME_URL'

The internals of @dataset are fully encapsulated: users don’t need to worry about installing duckdb or pyarrow themselves, or even know that duckdb is used.