FlowSpec - Constructing flows

Metaflow flows are defined by inheriting from the FlowSpec class:

from metaflow import FlowSpec, step

class MyFlow(FlowSpec):

    @step
    def start(self):
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    MyFlow()

The FlowSpec class has no other uses, and it can't be instantiated directly.

FlowSpec exposes a few methods and attributes that you can use to construct a flow, which are listed below. You can add more functionality in your flows through step-level decorators and flow-level decorators.

You can parametrize flows through Parameter objects, which are defined as class variables inside a flow. You can also include files as parameters through the IncludeFile object.

To query and manipulate the currently executing run inside your flow, see the current object. To access results produced by a flow, see the Client API.

Defining a workflow

Annotate methods that are a part of your Metaflow workflow with the @step decorator. Use FlowSpec.next to define transitions between steps:

FlowSpec.next(*dsts, foreach=None)

Indicates the next step to execute after this step has completed.

This statement should appear as the last statement of each step, except the end step.

There are several valid formats to specify the next step:

  • Straight-line connection: self.next(self.next_step) where next_step is a method in the current class decorated with the @step decorator.

  • Static fan-out connection: self.next(self.step1, self.step2, ...) where stepX are methods in the current class decorated with the @step decorator.

  • Foreach branch:

    self.next(self.foreach_step, foreach='foreach_iterator')
    

    In this situation, foreach_step is a method in the current class decorated with the @step decorator, and foreach_iterator is the name of a variable in the current class that evaluates to an iterator. A task is launched for each value in the iterator, and each task executes the code specified by the step foreach_step.

Parameters 

dsts: Method

One or more methods annotated with @step.

Raises 

InvalidNextException

Raised if the format of the arguments does not match one of the ones given above.
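
To make the three accepted call shapes concrete, here is a minimal pure-Python sketch that classifies the arguments of a self.next(...) call. It is not Metaflow's validation code: the function classify_next is hypothetical, ValueError stands in for InvalidNextException, and step_a and step_b are plain placeholder functions rather than real steps.

```python
def classify_next(*dsts, foreach=None):
    """Name the transition implied by the arguments of a next() call."""
    if not dsts:
        raise ValueError("specify at least one destination step")
    if not all(callable(dst) for dst in dsts):
        raise ValueError("destinations must be step methods")
    if foreach is not None:
        # A foreach fans out over a single step, and the iterable is
        # referenced by attribute name (a string), not by value.
        if len(dsts) != 1 or not isinstance(foreach, str):
            raise ValueError("foreach takes one step and an attribute name")
        return "foreach"
    return "linear" if len(dsts) == 1 else "static fan-out"


# Placeholders standing in for methods decorated with @step.
def step_a():
    pass


def step_b():
    pass


print(classify_next(step_a))                   # linear
print(classify_next(step_a, step_b))           # static fan-out
print(classify_next(step_a, foreach="items"))  # foreach
```

Any other combination, such as several destinations together with foreach, falls outside the formats listed above and is rejected in this sketch.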

Working with foreaches

Use the operations below (FlowSpec.input, FlowSpec.index, and FlowSpec.foreach_stack) to query the status of the currently executing foreach branch. Use FlowSpec.merge_artifacts() to handle incoming artifacts in a join step.

FlowSpec.input

The value of the foreach artifact in this foreach branch.

In a foreach step, multiple instances of this step (tasks) will be executed, one for each element in the foreach. This property returns the element passed to the current task. If this is not a foreach step, this returns None.

If you need to know the values of the parent tasks in a nested foreach, use FlowSpec.foreach_stack.

Returns 

object

Input passed to the foreach task.

FlowSpec.index

The index of this foreach branch.

In a foreach step, multiple instances of this step (tasks) will be executed, one for each element in the foreach. This property returns the zero-based index of the current task. If this is not a foreach step, this returns None.

If you need to know the indices of the parent tasks in a nested foreach, use FlowSpec.foreach_stack.

Returns 

int

Index of the task in a foreach step.

FlowSpec.foreach_stack(self)

Returns the current stack of foreach indexes and values for the current step.

Use this information to understand what data is being processed in the current foreach branch. For example, consider the following code:

@step
def root(self):
    self.split_1 = ['a', 'b', 'c']
    self.next(self.nest_1, foreach='split_1')

@step
def nest_1(self):
    self.split_2 = ['d', 'e', 'f', 'g']
    self.next(self.nest_2, foreach='split_2')

@step
def nest_2(self):
    foo = self.foreach_stack()

foo will take the following values in the various tasks for nest_2:

    [(0, 3, 'a'), (0, 4, 'd')]
    [(0, 3, 'a'), (1, 4, 'e')]
    ...
    [(0, 3, 'a'), (3, 4, 'g')]
    [(1, 3, 'b'), (0, 4, 'd')]
    ...

where each tuple corresponds to:

  • The index of the task for that level of the loop.
  • The number of splits for that level of the loop.
  • The value for that level of the loop.

Note that the last tuple returned in a task corresponds to:

  • 1st element: value returned by self.index.
  • 3rd element: value returned by self.input.

Returns 

List[Tuple[int, int, object]]

An array describing the current stack of foreach steps.
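
The enumeration above can be reproduced without running a flow. The following is a pure-Python sketch, not Metaflow code: the helper foreach_stacks is hypothetical, and it assumes every level iterates over a fixed list (in a real flow, an inner list may differ from task to task).

```python
from itertools import product


def foreach_stacks(*levels):
    """List the foreach stack seen by each innermost task.

    Each argument is the list one foreach level iterates over,
    outermost level first.
    """
    framed = [
        [(index, len(level), value) for index, value in enumerate(level)]
        for level in levels
    ]
    # product() varies the last (innermost) level fastest.
    return [list(stack) for stack in product(*framed)]


stacks = foreach_stacks(['a', 'b', 'c'], ['d', 'e', 'f', 'g'])
print(len(stacks))  # 12 tasks for nest_2
print(stacks[0])    # [(0, 3, 'a'), (0, 4, 'd')]
print(stacks[4])    # [(1, 3, 'b'), (0, 4, 'd')]

# The last tuple of a stack carries what self.index and self.input return.
index, _, value = stacks[-1][-1]
print(index, value)  # 3 g
```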

FlowSpec.merge_artifacts(self, inputs, exclude, include)

Helper function for merging artifacts in a join step.

This function takes all the artifacts coming from the branches of a join point and assigns them to self in the calling step. Only artifacts not set in the current step are considered. If, for a given artifact, different values are present on the incoming edges, an error will be thrown and the artifacts that conflict will be reported.

For example, consider a simple graph in which A splits into B and C, which then join in D:

A:
  self.x = 5
  self.y = 6
B:
  self.b_var = 1
  self.x = from_b
C:
  self.x = from_c

D:
  merge_artifacts(inputs)

In D, the following artifacts are set:

  • y (value: 6), b_var (value: 1)
  • if from_b and from_c are the same, x will be accessible and have value from_b
  • if from_b and from_c are different, an error will be thrown. To prevent this error, you need to manually set self.x in D to a merged value (for example the max) prior to calling merge_artifacts.

Parameters 

inputs: List[Steps]

Incoming steps to the join point.

exclude: List[str], optional

If specified, do not merge artifacts whose names are in exclude. Cannot be specified together with include.

include: List[str], optional

If specified, only merge the artifacts named in include. Cannot be specified together with exclude.

Raises 

MetaflowException

This exception is thrown if this is not called in a join step.

UnhandledInMergeArtifactsException

This exception is thrown in case of unresolved conflicts.

MissingInMergeArtifactsException

This exception is thrown in case an artifact specified in include cannot be found.
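
The merge rules can be sketched in plain Python using dictionaries in place of steps. This illustrates the semantics described above and is not Metaflow's implementation: merge_sketch is a hypothetical helper, values are compared with ==, and ValueError and KeyError stand in for the Metaflow exceptions. The values 10 and 20 play the roles of from_b and from_c in the A/B/C/D graph.

```python
def merge_sketch(current, branches, exclude=(), include=()):
    """Return `current` plus the non-conflicting artifacts of `branches`."""
    if exclude and include:
        raise ValueError("pass either exclude or include, not both")
    seen = {}
    for artifacts in branches:
        for name, value in artifacts.items():
            # Artifacts already set in the join step are never overwritten.
            skip = (
                name in current
                or name in exclude
                or (include and name not in include)
            )
            if not skip:
                seen.setdefault(name, []).append(value)
    missing = [n for n in include if n not in seen and n not in current]
    if missing:
        raise KeyError(f"artifacts not found: {missing}")
    conflicts = sorted(
        name for name, values in seen.items()
        if any(value != values[0] for value in values)
    )
    if conflicts:
        raise ValueError(f"conflicting values for: {conflicts}")
    merged = dict(current)
    merged.update((name, values[0]) for name, values in seen.items())
    return merged


# Artifacts visible at the end of B and C (y is inherited from A).
branch_b = {"x": 10, "y": 6, "b_var": 1}
branch_c = {"x": 20, "y": 6}

try:
    merge_sketch({}, [branch_b, branch_c])
except ValueError as err:
    print(err)  # x differs between the branches

# Resolving x in the join step first makes the merge succeed.
resolved = {"x": max(branch_b["x"], branch_c["x"])}
print(merge_sketch(resolved, [branch_b, branch_c]))
```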

Parameters

The Parameter class is used to define parameters for a flow.

The Parameter objects must be defined as class variables inside a flow. The parameter values are available as read-only artifacts in all steps of the flow. For instructions, see How to define parameters for flows.

Parameter(name, **kwargs)

Defines a parameter for a flow.

Parameters must be instantiated as class variables in flow classes, e.g.

class MyFlow(FlowSpec):
    param = Parameter('myparam')

In this case, the parameter is specified on the command line as

python myflow.py run --myparam=5

and its value is accessible through a read-only artifact like this:

print(self.param == 5)

Note that the user-visible parameter name (myparam above) can differ from the artifact name (param above).

The parameter value is converted to a Python type based on the type argument or to match the type of default, if it is set.

Parameters 

name: str

User-visible parameter name.

default: str or float or int or bool or JSONType or a function.

Default value for the parameter. Use a special JSONType class to indicate that the value must be a valid JSON object. A function implies that the parameter corresponds to a deploy-time parameter. The type of the default value is used as the parameter type.

type: type

If default is not specified, define the parameter type. Specify one of str, float, int, bool, or JSONType (Default: str).

help: str

Help text to show in run --help.

required: bool

Require that the user specifies a value for the parameter. required=True implies that the default is not used.

show_default: bool

If True, show the default value in the help text (Default: True).

Deploy-time parameters

It is possible to define the default value programmatically before a run or a deployment is executed through a user-defined function. For more information, see documentation for Deploy Time Parameters.

For instance, the following deploy-time parameter, time, uses the current time as its default value:

import time
from metaflow import FlowSpec, Parameter

def time_now(context):
    return int(time.time())

class MyFlow(FlowSpec):
    myparam = Parameter("time", type=int, default=time_now)

Note that if the function returns a non-string value, you must specify the parameter type when using deploy-time parameters, as the type of default can't be inferred automatically.

The function called gets a parameter context that contains attributes about the current parameter which you can use to customize the value returned:

ParameterContext()

Information about the parameter being evaluated.

Attributes 

flow_name: str

Flow name

user_name: str

User name

parameter_name: str

Parameter name
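
As an illustration of using these attributes, the sketch below defines a default function that builds a value from the context. The function name default_output_prefix and the FakeContext stand-in are hypothetical: FakeContext only mirrors the three documented attributes so the function can be tried outside a flow, and it is not the object Metaflow passes in.

```python
from collections import namedtuple

# Hypothetical stand-in for the context object; it mirrors only the
# documented attributes flow_name, user_name, and parameter_name.
FakeContext = namedtuple(
    "FakeContext", ["flow_name", "user_name", "parameter_name"]
)


def default_output_prefix(context):
    """Deploy-time default: a per-user, per-flow prefix."""
    return f"{context.user_name}/{context.flow_name}/{context.parameter_name}"


ctx = FakeContext(flow_name="MyFlow", user_name="alice", parameter_name="prefix")
print(default_output_prefix(ctx))  # alice/MyFlow/prefix
```

Inside a flow, such a function would be passed as the default argument of a Parameter. Because it returns a string, no explicit type is needed.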

IncludeFile

The IncludeFile object is a special Parameter that reads its value from a local file. For an example, see Data in Local Files.

IncludeFile(name, **kwargs)

Includes a local file as a parameter for the flow.

IncludeFile behaves like Parameter except that it reads its value from a file instead of the command line. The user provides a path to a file on the command line. The file contents are saved as a read-only artifact which is available in all steps of the flow.

Parameters 

name: str

User-visible parameter name.

default: str

Default path to a local file.

is_text: bool

Convert the file contents to a string using the provided encoding (Default: True). If False, the artifact is stored as bytes.

encoding: str

Use this encoding to decode the file contents if is_text=True (Default: utf-8).

required: bool

Require that the user specifies a value for the parameter. required=True implies that the default is not used.

help: str

Help text to show in run --help.

show_default: bool

If True, show the default value in the help text (Default: True).
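
The effect of is_text and encoding on the stored value can be sketched with the standard library. This is an illustration of the described semantics only, not how IncludeFile is implemented, and read_included is a hypothetical helper.

```python
import os
import tempfile


def read_included(path, is_text=True, encoding="utf-8"):
    """Return file contents as str (decoded) or as raw bytes."""
    with open(path, "rb") as f:
        data = f.read()
    return data.decode(encoding) if is_text else data


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "greeting.txt")
    with open(path, "wb") as f:
        # One non-ASCII character, so text and byte lengths differ.
        f.write("h\u00e9llo".encode("utf-8"))
    as_text = read_included(path)
    as_bytes = read_included(path, is_text=False)

print(type(as_text).__name__, len(as_text))    # str 5
print(type(as_bytes).__name__, len(as_bytes))  # bytes 6
```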