FlowSpec - Constructing flows
Metaflow flows are defined by inheriting from the FlowSpec class:
from metaflow import FlowSpec, step

class MyFlow(FlowSpec):

    @step
    def start(self):
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    MyFlow()
This class has no other uses. It can't be instantiated directly.
FlowSpec exposes a few methods and attributes that you can use to construct a flow, which are listed below. You can add more functionality in your flows through step-level decorators and flow-level decorators.
You can parametrize flows through Parameter objects, which are defined as class variables inside a flow. You can also include files as parameters through the IncludeFile object.
To query and manipulate the currently executing run inside your flow, see the current object. To access results produced by a flow, see the Client API.
Defining a workflow
Annotate methods that are a part of your Metaflow workflow with the @step decorator. Use FlowSpec.next to define transitions between steps:
Indicates the next step to execute after this step has completed.
This statement should appear as the last statement of each step, except the end step.
There are several valid formats to specify the next step:
- Straight-line connection:
  self.next(self.next_step)
  where next_step is a method in the current class decorated with the @step decorator.
- Static fan-out connection:
  self.next(self.step1, self.step2, ...)
  where stepX are methods in the current class decorated with the @step decorator.
- Foreach branch:
  self.next(self.foreach_step, foreach='foreach_iterator')
  In this situation, foreach_step is a method in the current class decorated with the @step decorator and foreach_iterator is a variable name in the current class that evaluates to an iterator. A task will be launched for each value in the iterator and each task will execute the code specified by the step foreach_step.
dsts: Callable[..., None]
One or more methods annotated with @step.
InvalidNextException
Raised if the format of the arguments does not match one of the ones given above.
Working with foreaches
Use the operations below, FlowSpec.input, FlowSpec.index, and FlowSpec.foreach_stack, to query the status of the currently executing foreach branch. Use FlowSpec.merge_artifacts() to handle incoming artifacts in a join step.
The value of the foreach artifact in this foreach branch.
In a foreach step, multiple instances of this step (tasks) will be executed, one for each element in the foreach. This property returns the element passed to the current task. If this is not a foreach step, this returns None.
If you need to know the values of the parent tasks in a nested foreach, use FlowSpec.foreach_stack.
object, optional
Input passed to the foreach task.
The index of this foreach branch.
In a foreach step, multiple instances of this step (tasks) will be executed, one for each element in the foreach. This property returns the zero-based index of the current task. If this is not a foreach step, this returns None.
If you need to know the indices of the parent tasks in a nested foreach, use FlowSpec.foreach_stack.
int, optional
Index of the task in a foreach step.
Returns the current stack of foreach indexes and values for the current step.
Use this information to understand what data is being processed in the current foreach branch. For example, consider the following code:
@step
def root(self):
    self.split_1 = ['a', 'b', 'c']
    self.next(self.nest_1, foreach='split_1')

@step
def nest_1(self):
    self.split_2 = ['d', 'e', 'f', 'g']
    self.next(self.nest_2, foreach='split_2')

@step
def nest_2(self):
    foo = self.foreach_stack()

foo will take the following values in the various tasks for nest_2:
[(0, 3, 'a'), (0, 4, 'd')]
[(0, 3, 'a'), (1, 4, 'e')]
...
[(0, 3, 'a'), (3, 4, 'g')]
[(1, 3, 'b'), (0, 4, 'd')]
...
where each tuple corresponds to:
- The index of the task for that level of the loop.
- The number of splits for that level of the loop.
- The value for that level of the loop.
Note that the last tuple returned in a task corresponds to:
- 1st element: value returned by self.index.
- 3rd element: value returned by self.input.
List[Tuple[int, int, Any]]
An array describing the current stack of foreach steps.
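The tuples listed for the nested example can be reproduced outside Metaflow with a small plain-Python model. This is only a sketch of the documented (index, size, value) layout, not Metaflow's implementation: the helper name model_foreach_stacks is hypothetical, and the model assumes that every branch at a given level iterates over the same list.

```python
from itertools import product


def model_foreach_stacks(*levels):
    """Enumerate the (index, size, value) stacks for nested foreach lists.

    Hypothetical helper: a plain-Python model, not part of Metaflow.
    """
    # For each nesting level, build one (index, size, value) tuple per element.
    per_level = [
        [(i, len(level), value) for i, value in enumerate(level)]
        for level in levels
    ]
    # One stack per combination of outer and inner elements, outermost first.
    return [list(stack) for stack in product(*per_level)]


stacks = model_foreach_stacks(['a', 'b', 'c'], ['d', 'e', 'f', 'g'])
for stack in stacks[:5]:
    print(stack)

# The last tuple of a stack models what self.index and self.input would hold.
index, _, value = stacks[4][-1]
```

In a real flow you would call self.foreach_stack() inside the innermost step instead of building the list yourself.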
Helper function for merging artifacts in a join step.
This function takes all the artifacts coming from the branches of a join point and assigns them to self in the calling step. Only artifacts not set in the current step are considered. If, for a given artifact, different values are present on the incoming edges, an error will be thrown and the artifacts that conflict will be reported.
As an example, consider a simple graph in which A splits into B and C, which then join in D:
A:
    self.x = 5
    self.y = 6
B:
    self.b_var = 1
    self.x = from_b
C:
    self.x = from_c
D:
    merge_artifacts(inputs)
In D, the following artifacts are set:
- y (value: 6), b_var (value: 1)
- if from_b and from_c are the same, x will be accessible and have value from_b
- if from_b and from_c are different, an error will be thrown. To prevent this error, you need to manually set self.x in D to a merged value (for example the max) prior to calling merge_artifacts.
inputs: Inputs
Incoming steps to the join point.
exclude: List[str], optional, default None
If specified, do not consider merging artifacts with a name in exclude. Cannot be specified if include is also specified.
include: List[str], optional, default None
If specified, only merge the artifacts named in include. Cannot be specified if exclude is also specified.
MetaflowException
This exception is thrown if this is not called in a join step.
UnhandledInMergeArtifactsException
This exception is thrown in case of unresolved conflicts.
MissingInMergeArtifactsException
This exception is thrown in case an artifact specified in include cannot be found.
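The merge rules described above can be sketched as a plain-Python model over dictionaries. This is not Metaflow's implementation: model_merge and ModelMergeConflict are hypothetical names, artifacts are compared with ==, which is an assumption about how equality is decided, and the check for include names that cannot be found is left out.

```python
class ModelMergeConflict(Exception):
    """Stand-in for UnhandledInMergeArtifactsException in this model."""


def model_merge(current, branches, include=None, exclude=None):
    """Merge branch artifacts into the join step's artifacts (model only).

    current:  dict of artifacts already set in the join step
    branches: list of dicts, one per incoming branch
    """
    if include is not None and exclude is not None:
        raise ValueError("include and exclude cannot both be specified")
    merged = dict(current)
    conflicts = []
    for name in sorted({n for branch in branches for n in branch}):
        if name in current:
            continue  # artifacts set in the join step are left alone
        if include is not None and name not in include:
            continue
        if exclude is not None and name in exclude:
            continue
        values = [branch[name] for branch in branches if name in branch]
        if all(v == values[0] for v in values):
            merged[name] = values[0]
        else:
            conflicts.append(name)
    if conflicts:
        raise ModelMergeConflict(conflicts)
    return merged


# Branches B and C from the A/B/C/D example, with from_b=1 and from_c=2.
branch_b = {"x": 1, "y": 6, "b_var": 1}
branch_c = {"x": 2, "y": 6}

# Setting x in D first (here to the max) avoids the conflict on x.
resolved = model_merge({"x": max(1, 2)}, [branch_b, branch_c])
print(resolved)
```

Calling model_merge({}, [branch_b, branch_c]) without setting x first raises ModelMergeConflict, which mirrors the conflicting case in the example.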
Parameters
The Parameter class is used to define parameters for a flow.
Parameter objects must be defined as class variables inside a flow. The parameter values are available as read-only artifacts in all steps of the flow. For instructions, see How to define parameters for flows.
Defines a parameter for a flow.
Parameters must be instantiated as class variables in flow classes, e.g.
class MyFlow(FlowSpec):
    param = Parameter('myparam')
in this case, the parameter is specified on the command line as
python myflow.py run --myparam=5
and its value is accessible through a read-only artifact like this:
print(self.param == 5)
Note that the user-visible parameter name, myparam above, can be different from the artifact name, param above.
The parameter value is converted to a Python type based on the type argument or to match the type of default, if it is set.
name: str
User-visible parameter name.
default: str or float or int or bool or JSONType or a function
Default value for the parameter. Use a special JSONType class to indicate that the value must be a valid JSON object. A function implies that the parameter corresponds to a deploy-time parameter. The type of the default value is used as the parameter type.
type: Type, default None
If default is not specified, define the parameter type. Specify one of str, float, int, bool, or JSONType. If None, defaults to the type of default or str if none specified.
help: str, optional
Help text to show in run --help.
required: bool, default False
Require that the user specifies a value for the parameter. required=True implies that the default is not used.
show_default: bool, default True
If True, show the default value in the help text.
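How type and default interact can be sketched with a plain-Python model. The helper model_param_type is hypothetical and is not Metaflow code. It assumes that an explicit type takes precedence over the type of default and that a function default contributes no type information, which is one reading of the descriptions above.

```python
def model_param_type(default=None, type_=None):
    """Return the Python type a parameter would be converted to (model only)."""
    if type_ is not None:
        return type_  # assumption: an explicit type takes precedence
    if default is not None and not callable(default):
        return type(default)  # otherwise infer from a plain default value
    return str  # no usable default: fall back to str


def time_now(context):
    return 0  # placeholder deploy-time default used only by this model


print(model_param_type(default=5).__name__)
print(model_param_type().__name__)
print(model_param_type(default=time_now).__name__)
print(model_param_type(default=time_now, type_=int).__name__)
```

The last two calls show why a function default that returns a non-string value needs an explicit type: in this model the function itself gives nothing to infer from.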
Deploy-time parameters
It is possible to define the default value programmatically, through a user-defined function, before a run or a deployment is executed. For more information, see the documentation for Deploy Time Parameters.
For instance, the following deploy-time parameter, time, uses the current time as its default value:
import time
from metaflow import FlowSpec, Parameter

def time_now(context):
    return int(time.time())

class MyFlow(FlowSpec):
    myparam = Parameter("time", type=int, default=time_now)
Note that if the function returns a non-string value, you must specify the parameter type when using deploy-time parameters, as the type of default can't be inferred automatically.
The function receives a single argument, context, that contains attributes about the current parameter, which you can use to customize the value returned:
Information about the parameter being evaluated.
flow_name: str
Flow name
user_name: str
User name
parameter_name: str
Parameter name
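As a minimal sketch, a default function can read these three attributes to build its value. FakeContext below is a hypothetical stand-in that exists only to exercise the function outside Metaflow, and default_output_prefix is an illustrative name rather than anything Metaflow provides.

```python
from collections import namedtuple

# Hypothetical stand-in for the context object; it carries only the three
# attributes documented above.
FakeContext = namedtuple(
    "FakeContext", ["flow_name", "user_name", "parameter_name"]
)


def default_output_prefix(context):
    """Build a per-flow, per-user default string from the context attributes."""
    return f"{context.flow_name}/{context.user_name}/{context.parameter_name}"


ctx = FakeContext(flow_name="MyFlow", user_name="alice", parameter_name="prefix")
print(default_output_prefix(ctx))
```

In a flow, such a function would be passed as default to a Parameter, in the same way as time_now in the earlier example.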
IncludeFile
The IncludeFile object is a special Parameter that reads its value from a local file. For an example, see Data in Local Files.
Includes a local file as a parameter for the flow.
IncludeFile behaves like Parameter except that it reads its value from a file instead of the command line. The user provides a path to a file on the command line. The file contents are saved as a read-only artifact which is available in all steps of the flow.
name: str
User-visible parameter name.
default: Union[str, Callable[ParameterContext, str]]
Default path to a local file. A function implies that the parameter corresponds to a deploy-time parameter.
is_text: bool, default True
Convert the file contents to a string using the provided encoding. If False, the artifact is stored in bytes.
encoding: str, optional, default 'utf-8'
Use this encoding to decode the file contents if is_text=True.
required: bool, default False
Require that the user specifies a value for the parameter. required=True implies that the default is not used.
help: str, optional
Help text to show in run --help.
show_default: bool, default True
If True, show the default value in the help text.
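The effect of is_text and encoding can be illustrated with a plain-Python model of reading a local file. The function model_include_file is hypothetical and only mirrors the behavior described above. It is not how Metaflow stores or loads the artifact.

```python
import os
import tempfile


def model_include_file(path, is_text=True, encoding="utf-8"):
    """Read a local file as the is_text/encoding options describe (model only)."""
    with open(path, "rb") as f:
        raw = f.read()
    # Text mode decodes with the given encoding; otherwise keep the raw bytes.
    return raw.decode(encoding) if is_text else raw


# Write a small sample file to read back.
with tempfile.NamedTemporaryFile("wb", suffix=".txt", delete=False) as tmp:
    tmp.write("h\u00e9llo".encode("utf-8"))
    sample_path = tmp.name

as_text = model_include_file(sample_path)
as_bytes = model_include_file(sample_path, is_text=False)
os.remove(sample_path)

print(type(as_text).__name__, type(as_bytes).__name__)
```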