Basics of Metaflow
This document introduces the basic concepts of Metaflow. If you are eager to try out Metaflow in practice, you can start with the tutorial. After the tutorial, you can return to this document to learn more about how Metaflow works.
The Structure of Metaflow Code
Metaflow follows the dataflow paradigm which models a program as a directed graph of operations. This is a natural paradigm for expressing data processing pipelines, machine learning in particular.
We call the graph of operations a flow. You define the operations, called steps, which are nodes of the graph and contain transitions to the next steps, which serve as edges.
Metaflow sets some constraints on the structure of the graph. For starters, every flow needs a step called start and a step called end. An execution of the flow, which we call a run, starts at start. The run is successful if the final end step finishes successfully.

What happens between start and end is up to you. You can construct the graph in between using an arbitrary combination of the following three types of transitions supported by Metaflow:
Linear
The most basic type of transition is a linear transition, which moves from one step to the next.
Here is a graph with two linear transitions:
The corresponding Metaflow script looks like this:
library(metaflow)

start <- function(self){
    self$my_var <- "hello world"
}

a <- function(self){
    print(paste("the data artifact is", self$my_var))
}

end <- function(self){
    print("End of the linear flow")
}

metaflow("LinearFlow") %>%
    step(step = "start",
         r_function = start,
         next_step = "a") %>%
    step(step = "a",
         r_function = a,
         next_step = "end") %>%
    step(step = "end",
         r_function = end) %>%
    run()
You can execute this directly in your RStudio IDE or via the terminal:
- Terminal: Rscript myflow.R run
- RStudio: execute the script as is
Besides executing the steps start, a, and end in order, this flow creates a data artifact called my_var. In Metaflow, data artifacts are created simply by assigning values to $-indexed variables under the self object, such as self$my_var.
Data artifacts are available in all steps after they have been created, so they behave like any normal instance variables. An exception to this rule is branching, as explained below.
Branch
You can express parallel steps with a branch. In the figure below, start transitions to two parallel steps, a and b. Any number of parallel steps is allowed. A benefit of a branch like this is performance: Metaflow can execute a and b over multiple CPU cores or over multiple instances in the cloud.
library(metaflow)

a <- function(self){
    self$var <- 1
}

b <- function(self){
    self$var <- 2
}

join <- function(self, inputs){
    print(paste("var in step a is", inputs$a$var))
    print(paste("var in step b is", inputs$b$var))
}

metaflow("BranchFlow") %>%
    step(step = "start",
         next_step = c("a", "b")) %>%
    step(step = "a",
         r_function = a,
         next_step = "join") %>%
    step(step = "b",
         r_function = b,
         next_step = "join") %>%
    step(step = "join",
         r_function = join,
         next_step = "end",
         join = TRUE) %>%
    step(step = "end") %>%
    run()
Every branch must be joined. The join step does not need to be called join as above, but it must take an extra argument, like inputs above.
In the example above, the value of var is ambiguous: a sets it to 1 and b sets it to 2. To disambiguate the branches, the join step can refer to a specific step in the branch, like inputs$a$var above. For convenience, you can also iterate over all steps in the branch using inputs, which is simply a list. For more details, see the section about data flow through the graph.
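For example, instead of referring to branches by name, the join step of BranchFlow could loop over the inputs list directly. This is a sketch, assuming the BranchFlow definition above is in scope:

```r
# Hypothetical join step for BranchFlow: iterate over the inputs
# list instead of referring to branches by name.
join <- function(self, inputs){
    for (inp in inputs){
        print(paste("var in this branch is", inp$var))
    }
}
```

This form keeps the join step working even if you later add or rename branches.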
Note that you can nest branches arbitrarily, that is, you can branch inside a branch. Just remember that all branches must have a corresponding join.
Foreach
Static branches, as shown in the previous section, are useful when you know the branches at development time. Alternatively, you may want to branch based on data dynamically. This is the use case for a foreach branch.

Foreach works like the branch construct above, but instead of creating named step methods, many parallel copies of the steps inside a foreach loop are executed.

A foreach loop can iterate over any list, like params below.
library(metaflow)

start <- function(self){
    self$params <- c("param1", "param2", "param3")
}

a <- function(self){
    self$result <- paste(self$input, "processed")
}

join <- function(self, inputs){
    results <- gather_inputs(inputs, "result")
    print(results)
}

metaflow("ForeachFlow") %>%
    step(step = "start",
         r_function = start,
         next_step = "a",
         foreach = "params") %>%
    step(step = "a",
         r_function = a,
         next_step = "join") %>%
    step(step = "join",
         r_function = join,
         next_step = "end",
         join = TRUE) %>%
    step(step = "end") %>%
    run()
The foreach loop is initialized by specifying the keyword argument foreach in step. The foreach argument takes a string that is the name of a list stored in a self$ variable, like params above.

Steps inside a foreach loop create separate tasks to process each item of the list. Here, Metaflow creates three parallel tasks for the step a to process the three items of the params list in parallel. You can access the specific item assigned to a task with a self$ variable called input.
Foreach loops must be joined like static branches. Note that tasks inside a foreach loop are not named, so you can only iterate over them with inputs. If you want, you can assign a value to a self$ variable in a foreach step, which helps you to identify the task.
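One way to do this, sketched below under the assumption that the ForeachFlow above is in scope, is to copy self$input into a named artifact in the foreach step and read it back in the join:

```r
# Hypothetical variant of step "a": tag each task with the item it processed.
a <- function(self){
    self$item <- self$input   # remember which item this task handled
    self$result <- paste(self$input, "processed")
}

join <- function(self, inputs){
    for (inp in inputs){
        print(paste(inp$item, "->", inp$result))
    }
}
```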
You can nest foreaches and combine them with branches and linear steps arbitrarily.
What should be a step?
There is not a single right way of structuring code as a graph of steps, but here are some best practices that you can follow.
Metaflow treats steps as indivisible units of execution; a step either succeeds or fails as a whole. After a step has finished successfully, Metaflow persists all instance variables that were created in the step code, so the step does not have to be executed again even if a subsequent step fails. In other words, you can inspect data artifacts as they were when the step finished, but you cannot inspect intermediate values that existed only while the step was running.
This makes a step a checkpoint. The more granular your steps are, the more control you have over inspecting results and resuming failed runs.
A downside of making steps too granular is that checkpointing adds some overhead. It would not make sense to execute each line of code as a separate step. Keep your steps small but not too small. A good rule of thumb is that a single step should not take more than an hour to run, preferably much less than that.
Another important consideration is the readability of your code. Try running
- Terminal: Rscript myflow.R show
- RStudio: replace run() in myflow.R with run(show = TRUE) and execute
which prints out the steps of your flow. Does the overview give you a good idea of your code? If the steps are too broad, it might make sense to split them up just to make the overall flow more descriptive.
How to define parameters for flows?
Here is an example of a flow that defines a parameter, alpha:
library(metaflow)

start <- function(self){
    print(paste("alpha is", self$alpha))
}

end <- function(self){
    print(paste("alpha still is", self$alpha))
}

metaflow("ParameterFlow") %>%
    parameter("alpha",
              help = "learning rate",
              required = TRUE) %>%
    step(step = "start",
         r_function = start,
         next_step = "end") %>%
    step(step = "end",
         r_function = end) %>%
    run()
Parameters are defined using the parameter function. Parameter variables are automatically available in all steps, like alpha above.
You can set the parameter values on the command line as follows:
- Terminal: Rscript parameter_flow.R run --alpha 0.6
- RStudio: replace run() in parameter_flow.R with run(alpha = 0.6) and execute
You can see available parameters with:
- Terminal: Rscript parameter_flow.R run --help
- RStudio: replace run() in parameter_flow.R with run(help = TRUE) and execute
Parameters are typed based on the type of their default value. If there is no meaningful default for a parameter, you can define it as follows:
parameter("num_components",
          help = "Number of components",
          required = TRUE,
          type = "int")
Now the flow cannot be run without setting num_components to an integer value. You can set the type to int, float, double, or bool.
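Conversely, when a default is given, no explicit type is needed; the parameter's type is inferred from the default value. A minimal sketch (the parameter name ratio is made up for illustration):

```r
# Hypothetical parameter with a default value: its type (float)
# is inferred from the default, so no "type" argument is needed.
parameter("ratio",
          help = "fraction of the data to sample",
          default = 0.5)
```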
Data flow through the graph
As previously mentioned, for linear steps, data artifacts are propagated and any linear step can access data artifacts created by previous steps using instance variables. In this case, Metaflow can easily determine the value of each artifact by simply taking the value of that artifact at the end of the previous step.
In a join step, however, the value of artifacts can potentially be set to different values on the incoming branches; the value of the artifact is said to be ambiguous.
To make it easier to implement a join step after a foreach or branch, Metaflow provides a utility function, merge_artifacts, to aid in propagating unambiguous values.
library(metaflow)

start <- function(self){
    self$pass_down <- "non-modified"
}

a <- function(self){
    self$common <- "common in a and b"
    self$x <- "x in a"
    self$y <- "y in a"
    self$from_a <- "only in a"
}

b <- function(self){
    self$common <- "common in a and b"
    self$x <- "x in b"
    self$y <- "y in b"
}

join <- function(self, inputs){
    # manually propagate a variable that has different values in different branches
    self$x <- inputs$a$x
    merge_artifacts(self, inputs, exclude = list("y"))
    # Without merge_artifacts, the following artifact accesses wouldn't work.
    print(paste("pass_down is", self$pass_down))
    print(paste("from_a is", self$from_a))
    print(paste("common is", self$common))
}

metaflow("BranchFlow") %>%
    step(step = "start",
         r_function = start,
         next_step = c("a", "b")) %>%
    step(step = "a",
         r_function = a,
         next_step = "join") %>%
    step(step = "b",
         r_function = b,
         next_step = "join") %>%
    step(step = "join",
         r_function = join,
         next_step = "end",
         join = TRUE) %>%
    step(step = "end") %>%
    run()
In the example above, the merge_artifacts function behaves as follows:

- pass_down is propagated because it is unmodified in both a and b.
- common is also propagated because it is set to the same value in both branches. Remember that it is the value of the artifact that matters when determining whether an artifact is ambiguous; Metaflow uses content-based deduplication to store artifacts and can therefore determine whether the values of two artifacts are the same.
- x is handled by the code explicitly prior to the call to merge_artifacts, which causes merge_artifacts to ignore x when propagating artifacts. This pattern allows you to manually resolve any ambiguity in artifacts you would like to see propagated.
- y is not propagated because it is listed in the exclude list. This pattern allows you to prevent the propagation of artifacts that are no longer relevant. Remember that the default behavior of merge_artifacts is to propagate all incoming artifacts.
- from_a is propagated because it is only set in one branch and is therefore unambiguous. merge_artifacts will propagate values even if they are present on only one incoming branch.
The merge_artifacts function will raise an exception if an artifact that it should merge has an ambiguous value. Remember that merge_artifacts will attempt to merge all incoming artifacts unless they are already present in the step or have been explicitly excluded via the exclude list.
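For instance, if the join step in the flow above neither assigned x manually nor excluded it, merge_artifacts would hit the ambiguity between the two branches. A sketch of such a failing join, assuming the flow definition above:

```r
# Hypothetical join step that raises an exception: x is set to
# different values in branches a and b, and it is neither resolved
# manually nor listed in exclude.
join <- function(self, inputs){
    merge_artifacts(self, inputs)   # fails: artifact "x" is ambiguous
}
```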