Loading and Storing Data
This chapter describes tools and patterns for moving data in and out of your Metaflow flows.
Besides the mundane concern of loading data, there is also the question of how to organize code related to model-specific data transformations, such as feature engineering. Short answer: keep data access separate from feature engineering.
In a perfect world, the data scientist could design and test features without having to concern themselves with the underlying mechanics of data transfer and processing. Unfortunately, the larger the dataset, the more intermingled the two concerns become.
Metaflow cannot make the world perfect yet. However, we recommend that data science workflows try to keep the two concerns as separate as possible. In practice, you should use the solutions presented in this chapter purely to load a clean dataset into your workflow. Then, you should perform any model-specific data transformations in your Python code. In particular, we recommend that you use SQL only for data access, not for model-specific data manipulation.
There are multiple benefits in keeping data access separate from model-specific data manipulation:
- It is easier to keep a model and its features in sync when they are computed together. Metaflow's built-in versioning makes it easy to iterate on multiple concurrent versions of the model safely. However, Metaflow can't protect you against stale input data. It is frustrating to troubleshoot bad model results that are caused by out-of-sync features.
- It is quicker to iterate on your model. Testing and debugging Python is easier than testing and debugging SQL.
- You can request an arbitrary amount of resources for your data manipulation needs.
- Instead of having data manipulation code in two places (SQL and Python), all code can be clearly laid out in a single place, in a single language, for maximum readability.
- It is easier to optimize your code for performance when IO bottlenecks can be profiled separately from CPU bottlenecks.
Keep this guideline in mind when choosing the right data access method below.
Data in Tables
Accessing data in tables (most often Hive) is by far the most common way to load input data to Metaflow workflows. A common paradigm is to issue arbitrary SQL queries against the data warehouse to fetch data.
See Accessing Secrets if your database or query engine requires authentication.
However, depending on the data volume and the complexity of the query, queries can be slow to execute and can potentially congest the query engine. It is not uncommon for a data science workflow to hit these limitations. Even if your dataset is not huge, you may want to build multiple models in parallel, e.g. one per country. In this case, each model needs to load a shard of data. If you used SQL to load the shards, you would very quickly overload your query engine.
As a solution, metaflow.S3
provides a way to load
data directly from S3, bypassing any query engines such as Spark. Combined with a
metadata catalog, it is easy to write shims on top
of metaflow.S3
to directly interface with data files on S3 backing your tables. Since
data is loaded directly from S3, there is no limitation to the number of parallel
processes. The size of data is only limited by the size of your instance, which can be
easily controlled with the @resources
decorator.
The best part is that this approach is blazingly fast compared to executing SQL.
The main downside of this approach is that the table needs to have partitions that match your access pattern. For small and medium-sized tables, this isn't necessarily an issue, as you can afford to load extra data. Additional filtering can be performed in your Python code. With larger tables this approach is not feasible, so you may need to run an extra SQL query to repartition the data properly.
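As a rough sketch of this pattern, the flow below loads the Parquet files of one partition per country directly from S3 and reads them with pandas (assuming pyarrow is available). The bucket, prefix, and partition layout are hypothetical; in practice, a metadata catalog would tell you which S3 files back each partition.
from metaflow import FlowSpec, step, S3, resources

class ShardLoaderFlow(FlowSpec):

    @step
    def start(self):
        # Hypothetical sharding: one shard (and one model) per country.
        self.countries = ['FI', 'US', 'JP']
        self.next(self.load_shard, foreach='countries')

    @resources(memory=16000)
    @step
    def load_shard(self):
        import pandas as pd
        # Hypothetical table layout: one S3 prefix per partition.
        prefix = 's3://my-bucket/warehouse/events/country=%s' % self.input
        with S3() as s3:
            # Download all files of the partition in parallel,
            # bypassing the query engine entirely.
            objs = s3.get_recursive([prefix])
            self.num_rows = sum(len(pd.read_parquet(obj.path)) for obj in objs)
        self.next(self.join)

    @step
    def join(self, inputs):
        self.total_rows = sum(inp.num_rows for inp in inputs)
        self.next(self.end)

    @step
    def end(self):
        print('total rows', self.total_rows)

if __name__ == '__main__':
    ShardLoaderFlow()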
Use cases
- Workflows that need to process large amounts of data.
- Workflows that build many models in parallel.
- Performance-oriented workflows.
Data in S3: metaflow.S3
This section contains an overview of metaflow.S3. For a complete API, see the API reference for the S3 class.
It is not always appropriate to store data in a table. For instance, Netflix has many
systems that communicate via JSON files in S3. Or, there is little benefit in storing a
large Keras model serialized with
model.save()
in a table.
When you assign anything to self
in your Metaflow flow, the object gets automatically
persisted in S3 as a Metaflow artifact. Hence, in most
cases you do not need to worry about saving data or models to S3 explicitly. We
recommend that you use Metaflow artifacts whenever possible, since they are easily
accessible through the Client API by you, by other people, and by
other workflows.
However, there are valid reasons for interacting with S3 directly. For instance, you may need to consume data from or produce data to a 3rd-party system that knows nothing about Metaflow. For use cases like this, we provide a high-performance S3 client, metaflow.S3.
The sole benefit of metaflow.S3 over Metaflow artifacts is that you get to see and control the S3 locations of the data. In exchange, you must take care of object serialization yourself: metaflow.S3 only deals with objects of type str, unicode, and bytes.
Compared to other S3 clients, metaflow.S3 provides two key benefits. First, when used in Metaflow flows, it can piggyback on Metaflow versioning, which makes it easy to track the lineage of an object back to the Metaflow run that produced it. Second, metaflow.S3 provides better throughput than any other S3 client that we are aware of. In other words, it is very fast at loading and storing large amounts of data in S3.
Pros
- Load and store data to/from arbitrary S3 locations.
- Built-in support for lineage and versioning.
- Maximum throughput between S3 and a compute instance.
Cons
- Don't use metaflow.S3 if you can use Metaflow artifacts instead. In contrast to Metaflow artifacts, metaflow.S3 is more tedious to use, uses space more wastefully, and is less suitable for moving data between Metaflow steps reliably.
Use cases
- Communication with external systems through files in S3.
- Special corner cases where you need more control over object serialization than what Metaflow artifacts provide by default.
We recommend that you use metaflow.S3
in a with
scope in Python. Objects retrieved
from S3 are stored in local temporary files for the lifetime of the with
scope, not in
memory. You can use metaflow.S3
without with
but in this case you need to call
s3.close()
to get rid of the temporary files. See examples of this below.
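For example, outside a with block you would clean up the temporary files yourself. A minimal sketch, using the same hypothetical s3root as the examples below:
from metaflow import S3

s3 = S3(s3root='s3://my-bucket/savin/tmp/s3demo/')
try:
    print(s3.get('fruit').text)
finally:
    # Without a `with` scope, close() removes the local temporary files.
    s3.close()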
Note that in order to get the maximum performance out of metaflow.S3
, you need to set
your @resources
properly. However, don't request more resources than what your
workload actually needs.
Choosing the context
To benefit from the built-in support for versioning, first you need to tell metaflow.S3 whether it is being used in the context of a Metaflow run. A run can refer to a currently running flow (run=self) or a past run (run=Run(...)). If run is not specified, metaflow.S3 can be used to access data without versioning in arbitrary S3 locations.
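In code, the three contexts look roughly like this (the run=self form works only inside a step of a running flow; the run id below is hypothetical):
from metaflow import S3, Run

# Inside a step of a running flow: objects are versioned under the current run.
#   with S3(run=self) as s3:
#       ...

# Attached to a past run, e.g. in a notebook or a separate script.
with S3(run=Run('S3DemoFlow/3')) as s3:
    pass  # gets and puts use this run's S3 location

# No run: access arbitrary, unversioned S3 locations by full URL.
with S3() as s3:
    pass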
Store and load objects in a Metaflow flow
We expect that the most common use case for metaflow.S3
is to store auxiliary data in
a Metaflow flow. Here is an example:
from metaflow import FlowSpec, step, S3
import json

class S3DemoFlow(FlowSpec):

    @step
    def start(self):
        with S3(run=self) as s3:
            message = json.dumps({'message': 'hello world!'})
            url = s3.put('example_object', message)
            print("Message saved at", url)
        self.next(self.end)

    @step
    def end(self):
        with S3(run=self) as s3:
            s3obj = s3.get('example_object')
            print("Object found at", s3obj.url)
            print("Message:", json.loads(s3obj.text))

if __name__ == '__main__':
    S3DemoFlow()
Running the flow produced the following output:
Workflow starting (run-id 3):
[3/start/646436 (pid 30559)] Task is starting.
[3/start/646436 (pid 30559)] Message saved at s3://my-bucket/metaflow/userdata/v1/S3DemoFlow/3/example_object
[3/start/646436 (pid 30559)] Task finished successfully.
[3/end/646437 (pid 30619)] Task is starting.
[3/end/646437 (pid 30619)] Object found at s3://my-bucket/metaflow/userdata/v1/S3DemoFlow/3/example_object
[3/end/646437 (pid 30619)] Message: {'message': 'hello world!'}
[3/end/646437 (pid 30619)] Task finished successfully.
Now you could share the URL, s3://my-bucket/metaflow/userdata/v1/S3DemoFlow/3/example_object, with external systems. Note that the URL includes both the flow name, S3DemoFlow, and its unique run id, 3, which allow us to track the lineage of the object back to the run that produced it.
Note that metaflow.S3 provides a default S3 location for storing data. You can change the location by passing S3(bucket='my-bucket', prefix='/my/prefix') to the constructor. Metaflow versioning information is concatenated to the prefix.
Load external objects produced by a Metaflow run
What if you want to inspect S3 data produced by a flow afterwards? Just use the Client
API as usual to locate the desired Run
and use it to initialize
an S3
object:
from metaflow import Flow, S3

with S3(run=Flow('S3DemoFlow').latest_run) as s3:
    print(s3.get('example_object').text)
{"message": "hello world!"}
This pattern is particularly convenient for notebooks.
Store and load objects to/from a known S3 location
The above examples inferred the S3 location based on the current or an existing Metaflow run. What if you want to load data that has nothing to do with Metaflow? Easy:
from metaflow import S3

with S3() as s3:
    res = s3.get('s3://my-bucket/savin/tmp/external_data')
    print('an alien message: %s' % res.text)
an alien message: I know nothing about Metaflow
If S3
is initialized without any arguments, all operations require a full S3 URL.
If you need to operate on multiple files, it may be more convenient to specify a custom
S3 prefix with the s3root
argument:
from metaflow import S3

with S3(s3root='s3://my-bucket/savin/tmp/s3demo/') as s3:
    s3.put('fruit', 'pineapple')
    s3.put('animal', 'mongoose')

with S3() as s3:
    print(s3.get('s3://my-bucket/savin/tmp/s3demo/fruit').text)
pineapple
If the requested URL does not exist, the get
call will raise an exception. You can
call get
with return_missing=True
if you want to return a missing URL as an ordinary
result object, as described in the section below.
By default, put_* calls will overwrite existing keys in S3. To avoid this behavior, you can invoke your put_* calls with overwrite=False. Refer to the Caution: Overwriting data in S3 section below for some of the pitfalls involved with overwriting keys in S3.
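For example, to write a key only if it does not already exist (assuming put accepts the same overwrite flag as the other put_* calls):
from metaflow import S3

with S3(s3root='s3://my-bucket/savin/tmp/s3demo/') as s3:
    # If 'fruit' already exists, this call leaves the existing value untouched.
    s3.put('fruit', 'pineapple', overwrite=False)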
Instead of hardcoding an S3 URL in your code, you can read s3root
from a Parameter
or a Config
file. Learn more in Configuring Flows.
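A minimal sketch of this approach, with a hypothetical parameter name and default:
from metaflow import FlowSpec, Parameter, S3, step

class ExternalDataFlow(FlowSpec):
    # Hypothetical parameter: override with --s3root on the command line
    # instead of hardcoding the location in the code.
    s3root = Parameter('s3root', default='s3://my-bucket/savin/tmp/s3demo/')

    @step
    def start(self):
        with S3(s3root=self.s3root) as s3:
            print(s3.get('fruit').text)
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    ExternalDataFlow()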
The S3 result object
All get operations return an S3Object, backed by a temporary file on local disk, which exposes a number of attributes about the object:
from metaflow import S3

with S3(s3root='s3://my-bucket/savin/tmp/s3demo/') as s3:
    s3obj = s3.get('fruit')
    print('location', s3obj.url)
    print('key', s3obj.key)
    print('size', s3obj.size)
    print('local path', s3obj.path)
    print('bytes', s3obj.blob)
    print('unicode', s3obj.text)
    print('metadata', s3obj.metadata)
    print('content-type', s3obj.content_type)
    print('downloaded', s3obj.downloaded)
location s3://my-bucket/savin/tmp/s3demo/fruit
key fruit
size 9
local path /data/metaflow/metaflow.s3.5agi129m/metaflow.s3.one_file.pih_iseg
bytes b'pineapple'
unicode pineapple
metadata None
content-type application/octet-stream
downloaded True
Note that you cannot access data behind s3obj outside the with scope, as the temporary file pointed to by s3obj.path is deleted when the scope exits.
The S3Object may also refer to an S3 URL that does not correspond to an object in S3. These objects have the exists property set to False. Non-existent objects may be returned by a list_paths call if the result refers to an S3 prefix, not an object. Listing operations also set the downloaded property to False, to distinguish them from operations that download data locally. Also, get and get_many may return non-existent objects if you call these methods with the argument return_missing=True.
Querying objects without downloading them
The above information about an object, like size and metadata, can be useful even without downloading the file itself. To get just the metadata, use the info and info_many calls, which work like get and get_many but avoid the potentially expensive download. The info calls set downloaded=False in the result object.
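For example, you could check the size of an object without downloading it:
from metaflow import S3

with S3(s3root='s3://my-bucket/savin/tmp/s3demo/') as s3:
    obj = s3.info('fruit')
    # Only metadata is fetched; no data is downloaded to a local file.
    print(obj.size, obj.content_type, obj.downloaded)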
Operations on multiple objects
After you have instantiated the S3 object with the right context information, all get and put operations work the same way. The context is only used to construct an appropriate S3 URL.
Besides loading and storing individual objects with .get() and .put() as shown above, metaflow.S3 really shines at operating on many files at once.
It is guaranteed that the list of S3Objects returned is always in the same order, as long as the underlying data does not change. This can be important, e.g., if you use metaflow.S3 to feed data to a model. The input data will be in a deterministic order, so results should be easily reproducible.
Load multiple objects in parallel
Use get_many()
to load arbitrarily many objects at once:
from metaflow import S3

with S3(s3root='s3://my-bucket/savin/tmp/s3demo/') as s3:
    s3.get_many(['fruit', 'animal'])
[<S3Object s3://my-bucket/savin/tmp/s3demo/fruit (9 bytes)>,
<S3Object s3://my-bucket/savin/tmp/s3demo/animal (8 bytes)>]
Here, get_many()
loads objects in parallel, which is much faster than loading
individual objects sequentially. You can achieve the optimal throughput with S3 only
when you operate on many files in parallel.
If one of the requested URLs doesn't exist, the get_many call will raise an exception. If you don't want to fail all objects because of missing URLs, call get_many with return_missing=True. This will make get_many return missing URLs amongst the other results. You can distinguish between found and missing URLs using the exists property of S3Object.
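For example, you could separate found and missing objects like this ('mineral' stands in for a key that does not exist):
from metaflow import S3

with S3(s3root='s3://my-bucket/savin/tmp/s3demo/') as s3:
    objs = s3.get_many(['fruit', 'animal', 'mineral'], return_missing=True)
    found = [obj for obj in objs if obj.exists]
    missing = [obj.url for obj in objs if not obj.exists]
    print('found %d objects, %d missing' % (len(found), len(missing)))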
Load all objects recursively under a prefix
We can load all objects under a given prefix:
from metaflow import S3

with S3() as s3:
    s3.get_recursive(['s3://my-bucket/savin/tmp/s3demo'])
[<S3Object s3://my-bucket/savin/tmp/s3demo/animal (8 bytes)>,
<S3Object s3://my-bucket/savin/tmp/s3demo/fruit (9 bytes)>]
Note that get_recursive takes a list of prefixes. This is useful for achieving the maximum level of parallelism when retrieving data under multiple prefixes.
If you have specified a custom s3root, you can use get_all() to get all files recursively under the given prefix.
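For example, with the s3root used above:
from metaflow import S3

with S3(s3root='s3://my-bucket/savin/tmp/s3demo/') as s3:
    # Downloads everything under the s3root prefix in parallel.
    for obj in s3.get_all():
        print(obj.key, obj.size)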
Loading parts of files
A performance-sensitive application may want to read only a part of a large file. Instead of a string, the get and get_many calls also accept an object with key, offset, and length attributes that specify the part of the file to download. You can use an object called S3GetObject, provided by Metaflow, for this purpose.
This example loads two 1KB chunks of a file in S3:
from metaflow import S3
from metaflow.datatools.s3 import S3GetObject

URL = 's3://ursa-labs-taxi-data/2014/12/data.parquet'

with S3() as s3:
    res = s3.get_many([S3GetObject(key=URL, offset=0, length=1024),
                       S3GetObject(key=URL, offset=1024, length=1024)])
    for obj in res:
        print(obj.path, obj.size)
Store multiple objects or files
If you need to store multiple objects, use put_many:
from metaflow import S3

many = {'first_key': 'foo', 'second_key': 'bar'}

with S3(s3root='s3://my-bucket/savin/tmp/s3demo_put/') as s3:
    s3.put_many(many.items())
[('first_key', 's3://my-bucket/savin/tmp/s3demo_put/first_key'),
('second_key', 's3://my-bucket/savin/tmp/s3demo_put/second_key')]
You may want to store more data to S3 than you can fit in memory at once. This is a good use case for put_files:
from metaflow import S3

with open('/tmp/1', 'w') as f:
    f.write('first datum')
with open('/tmp/2', 'w') as f:
    f.write('second datum')

with S3(s3root='s3://my-bucket/savin/tmp/s3demo_put/') as s3:
    s3.put_files([('first_file', '/tmp/1'), ('second_file', '/tmp/2')])
[('first_file', 's3://my-bucket/savin/tmp/s3demo_put/first_file'),
('second_file', 's3://my-bucket/savin/tmp/s3demo_put/second_file')]
Objects are stored in S3 in parallel for maximum throughput.
Listing objects in S3
To get objects with get and get_many, you need to know the exact names of the objects to download. S3 is optimized for looking up specific names, so it is preferable to structure your code around known names. However, sometimes this is not possible and you need to check first what is available in S3.
Metaflow provides two ways to list objects in S3: list_paths and list_recursive. The former returns the next level of prefixes (directories) in S3, directly under the given prefix. The latter returns all objects under the given prefix. Since list_paths returns a subset of the prefixes returned by list_recursive, it is typically a much faster operation.
Here's an example: First, let's create files in S3 in a hierarchy like this:
first/a/object1
first/b/x/object2
second/c/object3
from metaflow import S3

many = {'first/a/object1': 'data',
        'first/b/x/object2': 'data',
        'second/c/object3': 'data'}

with S3(s3root='s3://my-bucket/savin/tmp/s3demo_list/') as s3:
    s3.put_many(many.items())
Next, let's list all directories using list_paths:
from metaflow import S3

with S3(s3root='s3://my-bucket/savin/tmp/s3demo_list/') as s3:
    for key in s3.list_paths():
        print(key.key)
first
second
You can list multiple prefixes in parallel by giving list_paths
a list of prefixes:
from metaflow import S3

with S3(s3root='s3://my-bucket/savin/tmp/s3demo_list/') as s3:
    for key in s3.list_paths(['first', 'second']):
        print(key.key)
a
b
c
Listing may return either prefixes (directories) or objects. To distinguish between the two, use the .exists property of the returned S3Object:
from metaflow import S3

with S3(s3root='s3://my-bucket/savin/tmp/s3demo_list/') as s3:
    for key in s3.list_paths(['first/a', 'first/b']):
        print(key.key, 'object' if key.exists else 'prefix')
object1 object
x prefix
If you want all objects under the given prefix, use the list_recursive
method:
from metaflow import S3

with S3(s3root='s3://my-bucket/savin/tmp/s3demo_list/') as s3:
    for key in s3.list_recursive():
        print(key.key)
first/a/object1
first/b/x/object2
second/c/object3
Similar to list_paths, list_recursive can take a list of prefixes to process in parallel.
A common pattern is to list objects using either list_paths or list_recursive, filter out some keys from the listing, and provide the pruned list to get_many for fast, parallelized downloading.
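A minimal sketch of this pattern, using the hierarchy created above and a purely illustrative filter:
from metaflow import S3

with S3(s3root='s3://my-bucket/savin/tmp/s3demo_list/') as s3:
    # List everything, keep only the keys under 'first/', then download
    # the pruned list in parallel.
    keys = [obj.key for obj in s3.list_recursive() if obj.key.startswith('first/')]
    for obj in s3.get_many(keys):
        print(obj.key, obj.size)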
Caution: Overwriting data in S3
You should avoid overwriting data in the same key (URL) in S3. S3 guarantees that new keys always reflect the latest data. In contrast, when you overwrite data in an existing key, there is a short period of time when a reader may see either the old version or the new version of the data.
In particular, when you use metaflow.S3
in your Metaflow flows, make sure that every
task and step writes to a unique key. Otherwise you may find results unpredictable and
inconsistent.
Note that specifying overwrite=False in your put_* calls changes the behavior of S3 slightly compared to the default mode of overwrite=True. There may be a small delay (typically on the order of milliseconds) before the key becomes available for reading. This is an important reason to rely on Metaflow artifacts, which handle this complication for you, whenever possible. If you absolutely need to handle this yourself, one way to guarantee uniqueness is to use current.task_id from the current module as a part of your S3 keys.
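For example, inside a step you could make the key unique to the current task. A minimal sketch:
from metaflow import FlowSpec, S3, current, step

class UniqueKeyFlow(FlowSpec):

    @step
    def start(self):
        with S3(run=self) as s3:
            # Every task writes to its own key, so no key is ever overwritten.
            url = s3.put('result_%s' % current.task_id, 'some data')
            print('stored at', url)
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    UniqueKeyFlow()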
Maximizing S3 performance
S3 can provide massive download speeds, tens of gigabits per second on large instances, when using metaflow.S3. In order to achieve the maximum throughput, pay attention to the following dimensions:
- Same region: Make sure the EC2 instances hosting the tasks are located in the same region as the S3 bucket you are loading data from.
- File layout: You need to download multiple files in parallel, e.g. using metaflow.S3.get_many. The files should be around 0.1-1GB each. Fortunately, it is easy to produce partitioned outputs like this with many query engines.
- Instance size: Larger EC2 instances boast more CPU cores, higher network throughput, and more memory.
- Data fits in RAM: Crucially, loading data from S3 directly to memory is faster than loading data from S3 to an instance volume. If data doesn't fit in memory, performance can suffer badly due to slow local disk IO.
Read more about fast data processing with metaflow.S3
in this blog post.
Using metaflow.S3 for in-memory processing
For maximum performance, ensure that the @resources(memory=) setting is higher than the amount of data you are downloading with metaflow.S3.
If the amount of data is larger than the available disk space, you can use the use_tmpfs=True option with @batch and @kubernetes to create an in-memory filesystem which metaflow.S3 will use automatically.
These options are available for tmpfs:
- use_tmpfs=True enables a tmpfs mountpoint and instructs metaflow.S3 to use it as a destination for downloads. Note that you must ensure that the tmpfs size is large enough for all data downloaded.
- tmpfs_tempdir=False instructs metaflow.S3 not to use the tmpfs. Use this option if you want to reserve the tmpfs mount for your own use only.
- tmpfs_size=N allocates at most N megabytes for tmpfs. Note that unused space doesn't count towards actual memory usage, so you can safely overallocate space. By default, 50% of the available memory is made available for tmpfs.
- tmpfs_path=P allows you to use an alternative mount point for tmpfs.
You can access the current tmpfs mountpoint in your tasks with current.tempdir. You can use it as fast temporary disk space for your own needs as well.
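For example, a @batch step that downloads a dataset to the in-memory filesystem might look roughly like this (the memory and tmpfs sizes, as well as the S3 prefix, are placeholders):
from metaflow import FlowSpec, S3, batch, current, step

class TmpfsDemoFlow(FlowSpec):

    @batch(memory=32000, use_tmpfs=True, tmpfs_size=16000)
    @step
    def start(self):
        with S3() as s3:
            # metaflow.S3 downloads to the tmpfs mount automatically; the same
            # mount is also available as scratch space via current.tempdir.
            objs = s3.get_recursive(['s3://my-bucket/savin/tmp/s3demo/'])
            print('downloaded %d objects under %s' % (len(objs), current.tempdir))
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    TmpfsDemoFlow()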
Data in Local Files
Similarly to Parameters, you
can define a data file to include as input for your flow. Metaflow will version the file
and make it accessible to all the steps directly through the self
object in your flow.
This example allows the user to include a data file and compute its hash:
from metaflow import FlowSpec, step, IncludeFile

class HashFileFlow(FlowSpec):
    myfile = IncludeFile(
        'myfile',
        is_text=False,
        help='My input',
        default='/Users/bob/myinput.bin')

    @step
    def start(self):
        import hashlib
        print('Hello from start')
        print('Hash of file is %s' %
              str(hashlib.sha1(self.myfile).hexdigest()))
        self.next(self.end)

    @step
    def end(self):
        print('Goodbye')

if __name__ == '__main__':
    HashFileFlow()
You can specify the file to include on the command line:
python hash_flow.py run --myfile '/path/to/input/file'