Triggering Flows Based on External Events
You can configure flows deployed on Argo Workflows to start automatically when an event occurs in an external system. For instance, you could start a flow whenever new data is available in a data warehouse:
All you have to do is to add a decorator, @trigger
, with
a desired event name above the flow:
from metaflow import FlowSpec, step, trigger
@trigger(event='data_updated')
class FreshDataFlow(FlowSpec):
@step
def start(self):
# load data from the data warehouse
print('processing fresh data!')
self.next(self.end)
@step
def end(self):
pass
if __name__ == '__main__':
FreshDataFlow()
You can develop and test the flow locally as usual: @trigger
doesn't have any
effect on local runs. To test triggering, deploy the flow to Argo Workflows:
python freshdata.py argo-workflows create
The output should state something along the lines of
What will trigger execution of the workflow:
This workflow triggers automatically when the upstream
data_updated event is/are published.
indicating that the deployment has been linked to the desired event.
Defining events
In the above example, we used data_updated
as the name of the event that triggers the flow. You
can choose the name freely. By using different names, you can make flows react to different events.
If you are familiar with streaming systems like Kafka or queues like AWS SQS, you can think of the event name as a topic in these systems.
You can also define an event name through a configuration file,
e.g. @trigger(event=config.upstream_event)
instead of
specifying the name in the decorator directly.
Take a look at Configuring Flows
for more information.
Depending on multiple events
You can require that multiple events must be present before the flow gets triggered. Simply define a list of events:
@trigger(events=['data_updated', 'phase_of_the_moon'])
all the events need to occur within a configured time window for the flow to trigger.
Creating events
In order to trigger the flow deployed with @trigger
, we need an event.
Metaflow comes with a utility class, ArgoEvent
, which
makes it easy to create suitable events from any environment. You can call
it as a part of your ETL pipeline running outside Metaflow, in a microservice,
or in a notebook - wherever and whenever you want to trigger a Metaflow execution.
from metaflow.integrations import ArgoEvent
ArgoEvent(name="data_updated").publish()
This line will create an event that will trigger all flows deployed on Argo Workflows that are
waiting for the event data_updated
.
Note that publish()
only publishes an event and returns immediately. It does not guarantee that
a run will start -- it's possible that no flow is waiting for the particular event. Correspondingly,
if you call ArgoEvent
many times, you can trigger arbitrarily many runs of connected flows.
Before calling ArgoEvent
make sure that you have a valid Metaflow
configuration and a connection to the Kubernetes cluster set up in the
environment where you call .publish()
. If you call it from systems outside
Metaflow, make sure that these prerequisites are met.
Advanced case: Publishing events inside a flow
It is not common to publish events inside a Metaflow flow, since
the @trigger_on_finish
decorator
takes care of flow-to-flow
triggering conveniently. Should you have a more advanced use case that requires
publishing events inside a flow, it is recommended that you use the
ArgoEvent.safe_publish
method:
from metaflow.integrations import ArgoEvent
ArgoEvent(name="data_updated").safe_publish()
The only difference to publish()
is that events won't be created during local
runs. This means that you can include safe_publish()
in your code safely and
develop and test it locally as usual, knowing that you won't be causing
unintended side-effects in surrounding systems that may depend on the event.
Passing parameters in events
Besides simply starting runs through events, you can change their behavior on
the fly by letting the event
define Parameters
of the flow.
Consider this typical machine learning system that implements a continuously refreshing model:
- An event is created whenever new data is available in the data warehouse.
- The event contains information about the latest data available in the warehouse.
- Using the information, a model is refreshed with a training set containing the last N days of data.
The corresponding flow could look like this, ignoring details of data loading and the actual training:
from metaflow import FlowSpec, step, trigger, Parameter
from datetime import datetime, timedelta
@trigger(event="data_updated")
class ModelRefreshFlow(FlowSpec):
latest = Parameter("latest", default="2023-05-01")
window = Parameter("window", default=3)
def load_data(self):
# replace this with an actual data loader
SQL = f"select * from data where time > to_date('{self.start_date}')"
print("loading data since %s" % self.start_date)
return [1, 2, 3]
def train_model(self, df):
# replace this with actual model training
return df
@step
def start(self):
self.latest_date = datetime.fromisoformat(self.latest)
self.start_date = self.latest_date - timedelta(days=self.window)
self.next(self.train)
@step
def train(self):
df = self.load_data()
self.model = self.train_model(df)
self.next(self.end)
@step
def end(self):
pass
if __name__ == "__main__":
ModelRefreshFlow()
To pass in parameters, we can simply define them in the payload
of ArgoEvent
:
from metaflow.integrations import ArgoEvent
from datetime import datetime
ArgoEvent(name="data_updated").publish(payload={'latest': datetime.now().isoformat()})
Mapping parameter names
Above, the payload field matches the parameter name latest
exactly. In certain situations you may want
to define manually how parameters get their values. For instance, a common event may be used to trigger
various kinds of flows and it may be hard to coordinate parameter names across all consumers of the event.
In this situation, you can remap payload fields to parameter names through the parameters
argument:
@trigger(event={'name':'some_event', 'parameters': {'window': 'how_many_days'}})
Here, we define that Parameter('window')
gets its value from the event payload field how_many_days
.
Note that you need to remap all parameters
that you want to assign through the event. Default assignments
are disabled when parameters
is specified, which allows you to stay in full control of parameter mappings.
Parameter mapping comes in handy when multiple events are present:
@trigger(events=[{'name':'one_event', 'parameters': {'window': 'how_many_days'}},
{'name':'another_event', 'parameters': {'latest': 'timestamp'}}])
In this case, window
gets its value through the event one_event
and latest
through another_event
.