Advanced Custom Decorators
In addition to running logic before and after user code (as shown on the previous page), a decorator can override the @step code entirely, executing alternative logic in its place. Or, a decorator can take action if the user code fails.
Catching failures in the user code
A decorator can catch failures in the user code by wrapping yield in a try-except block. The following example shows the pattern in action: it captures any exception raised by the user code and asks ChatGPT for advice on how to fix it. Save the module to ai_debug.py:
import os
import inspect
import traceback

from metaflow import user_step_decorator

PROMPT = """
I have a Metaflow step that is defined as follows:
{source}
It raised the following exception:
{stack_trace}
Provide suggestions how to fix it.
"""


@user_step_decorator
def ai_debug(step_name, flow, inputs=None, attributes=None):
    # Grab the step's source up front so it can be included in the prompt.
    source = inspect.getsource(getattr(flow, step_name))
    try:
        yield  # the user code runs here
    except Exception:
        print("❌ Step failed:")
        stack_trace = traceback.format_exc()
        prompt_gpt(PROMPT.format(source=source, stack_trace=stack_trace))
        raise  # re-raise so that the task still fails


def prompt_gpt(prompt):
    import requests

    OPENAI_API_KEY = os.environ.get('OPENAI_API_KEY')
    if OPENAI_API_KEY:
        print("🧠 Asking AI for help..")
        url = "https://api.openai.com/v1/chat/completions"
        headers = {
            "Authorization": f"Bearer {OPENAI_API_KEY}",
            "Content-Type": "application/json"
        }
        data = {
            "model": "gpt-4",
            "messages": [{"role": "user", "content": prompt}]
        }
        response = requests.post(url, headers=headers, json=data)
        resp = response.json()["choices"][0]["message"]["content"]
        print(f"🧠💡 AI suggestion:\n\n{resp}")
    else:
        print("Specify OPENAI_API_KEY for debugging help")
You can test the decorator with, for example, the following flow:
import math

from metaflow import FlowSpec, step
from ai_debug import ai_debug


class FailFlow(FlowSpec):

    @ai_debug
    @step
    def start(self):
        x = 3
        for i in range(5):
            math.sqrt(x - i)
        self.next(self.end)

    @step
    def end(self):
        pass


if __name__ == '__main__':
    FailFlow()
Set your OpenAI API key in the OPENAI_API_KEY environment variable and run the flow. When the start step fails, the decorator prints the model's suggestions and then re-raises the original exception.
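To see why a try-except around yield can observe a failure in the user code, it helps to look at plain Python generators. The sketch below is a toy model only: `drive_catching` and `ai_debug_like` are hypothetical names, and the driver is an assumption about the general mechanism (`generator.throw`), not Metaflow's actual implementation.

```python
import traceback


def ai_debug_like(step_name, log):
    """Generator shaped like the decorator above; it records the
    stack trace in a list instead of calling an external API."""
    try:
        yield
    except Exception:
        log.append((step_name, traceback.format_exc()))
        raise


def drive_catching(decorator_gen, user_code):
    """Hypothetical driver: runs the decorator up to its yield, runs the
    user code, and re-raises any failure inside the generator."""
    next(decorator_gen)  # advance the decorator to its yield
    try:
        user_code()
    except Exception as exc:
        try:
            # The exception surfaces at the yield inside the generator,
            # so the decorator's except block gets to handle it.
            decorator_gen.throw(exc)
        except StopIteration:
            return  # the decorator swallowed the error and finished
    else:
        next(decorator_gen, None)  # let the decorator finish normally
```

Because `ai_debug_like` re-raises, the exception still propagates out of `drive_catching`, which mirrors how the task above still fails after the suggestion is printed.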
Skipping the user code
A decorator can decide to skip execution of the user code by yielding an empty dictionary, i.e. yield {}. Even when the user code is skipped, a task is still started to execute the custom decorator, but the task finishes right after the decorator does.
The following example leverages the feature to implement a @memoize decorator that reuses past results, skipping redundant recomputation:
from metaflow import Flow, user_step_decorator, current


@user_step_decorator
def memoize(step_name, flow, inputs=None, attributes=None):
    artifact = attributes['artifact']
    reset = attributes.get('reset')
    if reset and getattr(flow, reset, False):
        print("⚙️ memoized results disabled - running the step")
        yield
    else:
        try:
            # Look up the artifact in the latest successful run of this flow.
            run = Flow(current.flow_name).latest_successful_run
            previous_value = run[step_name].task[artifact].data
        except Exception:
            print("⚙️ previous results not found - running the step")
            yield
        else:
            print(f"✅ reusing results from a previous run {run.id}")
            setattr(flow, artifact, previous_value)
            yield {}  # an empty dictionary skips the user code
Note that Flow adheres to Metaflow namespaces, so @memoize can be used safely by many concurrent users and production runs, without intermixing results.
The following flow utilizes @memoize to skip downloading of data and recomputation of taxi fares in the compute_fare step:
from metaflow import FlowSpec, step, Parameter, pypi
from memoize import memoize

URL = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2020-01.parquet'


class ComputeTotalFare(FlowSpec):

    reset = Parameter('reset', default=False, is_flag=True)
    url = Parameter('url', default=URL)

    @step
    def start(self):
        self.next(self.compute_fare)

    @memoize(artifact='total_fare', reset='reset')
    @pypi(packages={'duckdb': '1.3.2'})
    @step
    def compute_fare(self):
        import duckdb
        SQL = f"SELECT SUM(fare_amount) AS total_fare FROM '{self.url}'"
        self.total_fare = duckdb.query(SQL).fetchone()[0]
        self.next(self.end)

    @step
    def end(self):
        print(f"Total taxi fares: ${self.total_fare}")


if __name__ == '__main__':
    ComputeTotalFare()
You can use the --reset flag to force recomputation of results.
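The run-or-skip decision in @memoize can be tried out without Metaflow. The following is a minimal sketch under stated assumptions: a plain dict (`store`) stands in for the lookup of the latest successful run, and `memoize_like` and `drive_skippable` are hypothetical names for a toy decorator and a toy driver, not part of Metaflow.

```python
def memoize_like(flow, artifact, store, reset=False):
    """Same branching as @memoize above, with a dict (an assumption)
    replacing the Metaflow client lookup."""
    if reset or artifact not in store:
        yield  # no usable previous result: run the user code
    else:
        setattr(flow, artifact, store[artifact])
        yield {}  # previous result restored: skip the user code


def drive_skippable(decorator_gen, flow, user_step):
    """Toy driver: runs the user code unless the decorator yields {}.
    Returns True if the user code was skipped."""
    yielded = next(decorator_gen)
    skipped = yielded == {}
    if not skipped:
        user_step(flow)
    next(decorator_gen, None)  # let the decorator finish
    return skipped


class ToyFlow:
    """Stand-in for a FlowSpec instance; artifacts are plain attributes."""
```

In this model, a first run with an empty `store` executes the step, while a later run with the artifact present in `store` gets the value assigned by the decorator and never calls the step function. Passing `reset=True` forces the step to run again, which is the role of the --reset flag above.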
Replacing the user code
A decorator may decide to execute another function instead of the step function defined in the flow: just yield a callable that takes a FlowSpec object (self in steps) as an argument.
The following example implements a @fallback decorator that first attempts to run the user code and, if it fails (that is, when current.retry_count > 0 on a later attempt), executes a fallback function instead of re-executing the user code:
from metaflow import user_step_decorator, current


@user_step_decorator
def fallback(step_name, flow, inputs=None, attributes=None):
    def _fallback_step(self):
        print("🛟 step failed: executing a fallback")
        var = attributes.get('indicator')
        if var:
            setattr(self, var, True)

    if current.retry_count == 0:
        # First attempt: run the user code as usual.
        yield
    else:
        # A retry: run the fallback function instead.
        yield _fallback_step
If you pass an attribute indicator to the decorator, it stores a corresponding artifact indicating that the step failed. You can test the decorator with the FailFlow above. Note that you need to apply the @retry decorator to enable retries:

python failflow.py run --with retry --with fallback.fallback:indicator=failed
The fallback function cannot modify the flow’s control logic: it cannot change the target of a self.next call. The overall flow structure remains unchanged, even when a fallback function is used.
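The interplay between retries and the fallback can be traced with a small simulation. This is a sketch under assumptions, not Metaflow's scheduler: `run_with_retries` is a hypothetical loop that plays the role of @retry, `max_retries` is an illustrative parameter, and `fallback_like` takes the retry count as an argument instead of reading current.retry_count.

```python
def fallback_like(retry_count, indicator=None):
    """Same branching as @fallback above, with retry_count passed in
    explicitly (an assumption made so the sketch runs without Metaflow)."""
    def _fallback_step(self):
        if indicator:
            setattr(self, indicator, True)

    if retry_count == 0:
        yield  # first attempt: run the user code
    else:
        yield _fallback_step  # later attempts: run the fallback


def run_with_retries(flow, user_step, indicator=None, max_retries=1):
    """Toy retry loop: returns the attempt number that succeeded."""
    for retry_count in range(max_retries + 1):
        gen = fallback_like(retry_count, indicator)
        # A plain yield produces None, which means "run the user code";
        # a yielded callable replaces the user code for this attempt.
        target = next(gen) or user_step
        try:
            target(flow)
        except Exception:
            gen.close()
            continue  # start the next attempt
        next(gen, None)  # let the decorator finish
        return retry_count
    raise RuntimeError("all attempts failed")


class ToyFlow:
    """Stand-in for a FlowSpec instance; artifacts are plain attributes."""
```

With a step function that always raises, attempt 0 fails, attempt 1 runs `_fallback_step`, and the indicator attribute ends up set to True. With a step that succeeds, the fallback never runs and the indicator is never set.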