Create Dask Bags
There are several ways to create dask bags from your data:
db.from_sequence
You can create a bag from an existing Python sequence:
>>> import dask.bag as db
>>> b = db.from_sequence([1, 2, 3, 4, 5, 6])
You can control the number of partitions into which this data is binned:
>>> b = db.from_sequence([1, 2, 3, 4, 5, 6], npartitions=2)
This controls the granularity of the parallelism that you expose. By default dask will try to partition your data into about 100 partitions.
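The following plain-Python sketch shows what binning a sequence into partitions means. The helper partition_sequence is hypothetical and only approximates the idea. It picks a chunk size of ceil(len / npartitions), so it can return fewer partitions than requested.

```python
import math


def partition_sequence(seq, npartitions):
    """Split seq into at most npartitions contiguous, near-equal chunks.

    Hypothetical helper for illustration; not dask's implementation.
    """
    if npartitions < 1:
        raise ValueError("npartitions must be >= 1")
    seq = list(seq)
    size = max(1, math.ceil(len(seq) / npartitions))  # items per partition
    return [seq[i:i + size] for i in range(0, len(seq), size)]


parts = partition_sequence([1, 2, 3, 4, 5, 6], npartitions=2)
print(parts)  # [[1, 2, 3], [4, 5, 6]]

# Asking for 4 partitions gives chunks of 2, so only 3 partitions result.
print(partition_sequence([1, 2, 3, 4, 5, 6], npartitions=4))  # [[1, 2], [3, 4], [5, 6]]
```

Fewer, larger partitions mean less scheduling overhead. More, smaller partitions expose more parallelism.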
IMPORTANT: do not load your data into Python and then pass that data to dask.bag. Instead, use dask.bag to load your data. This parallelizes the loading step and reduces inter-worker communication:
>>> b = db.from_sequence(['1.dat', '2.dat', ...]).map(load_from_filename)
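The example above does not define load_from_filename. The sketch below supplies a hypothetical version and uses a standard-library thread pool as a rough stand-in for dask workers. The pattern is the point: only the short list of filenames is handled centrally, and each file is opened and parsed by whichever worker processes it.

```python
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor


def load_from_filename(path):
    """Hypothetical loader: read one file and return its non-empty lines."""
    with open(path, encoding="utf-8") as f:
        return [line.strip() for line in f if line.strip()]


# Create two small example files so the sketch runs on its own.
tmpdir = tempfile.mkdtemp()
filenames = []
for i, content in enumerate(["a\nb\n", "c\n"], start=1):
    path = os.path.join(tmpdir, f"{i}.dat")
    with open(path, "w", encoding="utf-8") as f:
        f.write(content)
    filenames.append(path)

# Each worker (here, a thread) loads its own file; pool.map keeps input order.
with ThreadPoolExecutor() as pool:
    loaded = list(pool.map(load_from_filename, filenames))

print(loaded)  # [['a', 'b'], ['c']]
```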
db.read_text
dask.bag can load data directly from text files. You can pass a single filename, a list of filenames, or a globstring. The resulting bag will have one item per line and one file per partition:
>>> b = db.read_text('myfile.json')
>>> b = db.read_text(['myfile.1.json', 'myfile.2.json', ...])
>>> b = db.read_text('myfile.*.json')
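The resulting layout, one partition per matching file and one string item per line, can be mimicked in plain Python. read_text_partitions is a hypothetical helper, not part of dask. In this sketch each line keeps its trailing newline, which is one reason to strip or parse the items afterwards.

```python
import glob
import os
import tempfile


def read_text_partitions(pattern):
    """Return one list (partition) per file matching pattern, one line per item.

    Hypothetical helper for illustration; not dask's implementation.
    """
    partitions = []
    for path in sorted(glob.glob(pattern)):
        with open(path, encoding="utf-8") as f:
            partitions.append(list(f))  # lines keep their trailing '\n'
    return partitions


# Write two small files: myfile.1.json (two lines) and myfile.2.json (one line).
tmpdir = tempfile.mkdtemp()
for i, lines in enumerate([['{"x": 1}', '{"x": 2}'], ['{"x": 3}']], start=1):
    with open(os.path.join(tmpdir, f"myfile.{i}.json"), "w", encoding="utf-8") as f:
        f.write("\n".join(lines) + "\n")

parts = read_text_partitions(os.path.join(tmpdir, "myfile.*.json"))
print(len(parts))  # 2
print(parts[0])    # ['{"x": 1}\n', '{"x": 2}\n']
```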
This handles standard compression libraries like gzip, bz2, and xz, or any easily installed compression library that provides a file-like object. Compression is inferred from the filename extension, or can be set explicitly with the compression='gzip' keyword:
>>> b = db.read_text('myfile.*.json.gz')
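Inferring compression from the extension amounts to a lookup from suffix to decompressing opener. The sketch below uses the standard-library gzip, bz2, and lzma modules. The tables and the helpers infer_compression and open_text are hypothetical, and dask's own mapping may differ.

```python
import bz2
import gzip
import lzma
import os
import tempfile

# Hypothetical tables for this sketch; dask's internal mapping may differ.
EXT_TO_COMPRESSION = {"gz": "gzip", "bz2": "bz2", "xz": "xz"}
OPENERS = {"gzip": gzip.open, "bz2": bz2.open, "xz": lzma.open}


def infer_compression(path):
    """Return a compression name based on the filename extension, or None."""
    name = os.path.basename(path)
    ext = name.rsplit(".", 1)[1] if "." in name else ""
    return EXT_TO_COMPRESSION.get(ext)


def open_text(path, compression="infer"):
    """Open path as text, decompressing by extension or by an explicit name."""
    if compression == "infer":
        compression = infer_compression(path)
    if compression is None:
        return open(path, "rt", encoding="utf-8")
    return OPENERS[compression](path, "rt", encoding="utf-8")


# Usage: write a small gzip file, then read it back without naming the codec.
path = os.path.join(tempfile.mkdtemp(), "myfile.1.json.gz")
with gzip.open(path, "wt", encoding="utf-8") as f:
    f.write('{"x": 1}\n')

print(infer_compression(path))  # gzip
with open_text(path) as f:
    print(f.read(), end="")     # {"x": 1}
```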
The resulting items in the bag are strings. You may want to parse them using functions like json.loads:
>>> import json
>>> b = db.read_text('myfile.*.json.gz', compression='gzip').map(json.loads)
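Here is the same line-by-line parsing in plain Python, without dask, for a gzip file that holds one JSON document per line. The file name and records are made up for illustration. json.loads accepts the trailing newline on each line, so no stripping is needed first.

```python
import gzip
import json
import os
import tempfile

path = os.path.join(tempfile.mkdtemp(), "myfile.1.json.gz")
records = [{"name": "Alice", "balance": 100}, {"name": "Bob", "balance": 200}]

# Write one JSON document per line, gzip-compressed.
with gzip.open(path, "wt", encoding="utf-8") as f:
    for rec in records:
        f.write(json.dumps(rec) + "\n")

# Read it back and parse each line, as .map(json.loads) would per element.
with gzip.open(path, "rt", encoding="utf-8") as f:
    parsed = [json.loads(line) for line in f]

print(parsed == records)  # True
```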
Or you can perform string-munging tasks. For convenience, a string namespace is attached directly to bags as .str.methodname:
>>> b = db.read_text('myfile.*.csv.gz').str.strip().str.split(',')
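Each .str call applies the Python str method of the same name to every element. The chain above therefore behaves roughly like b.map(lambda s: s.strip().split(',')). The plain-Python sketch below shows that per-element effect on a few made-up lines.

```python
lines = ["  alice,100\n", "bob,200\n"]

# .str.strip() then .str.split(',') corresponds to calling these
# str methods on each element in turn.
rows = [line.strip().split(",") for line in lines]
print(rows)  # [['alice', '100'], ['bob', '200']]
```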
db.from_delayed
You can construct a dask bag from dask.delayed values using the db.from_delayed function. See the documentation on using dask.delayed with collections for more information.
Store Dask Bags
In Memory
You can convert a dask bag to a list or Python iterable by calling compute() or by converting the object into a list:
>>> result = b.compute()
or
>>> result = list(b)
To Textfiles
You can convert a dask bag into a sequence of files on disk by calling the .to_textfiles() method:
dask.bag.core.to_textfiles(b, path, name_function=str, compression='infer', encoding='utf-8', compute=True)
Write bag to disk, one file per partition, one line per element.
Paths: This will create one file for each partition in your bag. You can specify the filenames in a variety of ways.
Use a globstring
>>> b.to_textfiles('/path/to/data/*.json.gz')
The * will be replaced by the increasing sequence 0, 1, ...
/path/to/data/0.json.gz
/path/to/data/1.json.gz
...
Use a globstring and a name_function= keyword argument. The name_function function should expect an integer and produce a string.
>>> from datetime import date, timedelta
>>> def name(i):
...     return str(date(2015, 1, 1) + i * timedelta(days=1))
>>> name(0)
'2015-01-01'
>>> name(15)
'2015-01-16'
>>> b.to_textfiles('/path/to/data/*.json.gz', name_function=name)
/path/to/data/2015-01-01.json.gz
/path/to/data/2015-01-02.json.gz
...
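The substitution rule for a globstring can be sketched in a few lines. expand_paths is a hypothetical helper, not part of dask. It assumes the globstring contains exactly one * and that one path is needed for each partition index, counting from 0.

```python
from datetime import date, timedelta


def expand_paths(globstring, npartitions, name_function=str):
    """Replace the single '*' with name_function(i) for i = 0 .. npartitions - 1.

    Hypothetical helper for illustration; not dask's implementation.
    """
    if globstring.count("*") != 1:
        raise ValueError("globstring must contain exactly one '*'")
    return [globstring.replace("*", name_function(i)) for i in range(npartitions)]


def name(i):
    """Map partition index i to a date string, starting at 2015-01-01."""
    return str(date(2015, 1, 1) + i * timedelta(days=1))


print(expand_paths("/path/to/data/*.json.gz", 2))
# ['/path/to/data/0.json.gz', '/path/to/data/1.json.gz']
print(expand_paths("/path/to/data/*.json.gz", 2, name_function=name))
# ['/path/to/data/2015-01-01.json.gz', '/path/to/data/2015-01-02.json.gz']
```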
You can also provide an explicit list of paths:
>>> paths = ['/path/to/data/alice.json.gz', '/path/to/data/bob.json.gz', ...]
>>> b.to_textfiles(paths)
Compression: Filenames with extensions corresponding to known compression algorithms (gz, bz2) will be compressed accordingly.
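The overall write pattern (one file per partition, one line per element, compression chosen by extension) can be sketched with the standard library. write_partitions is a hypothetical helper that handles only the .gz case and writes everything else as plain text. bz2 would follow the same shape with bz2.open.

```python
import gzip
import os
import tempfile


def write_partitions(partitions, paths, encoding="utf-8"):
    """Write each partition to its own file, one line per element.

    Paths ending in '.gz' are gzip-compressed; anything else is plain text.
    Hypothetical helper for illustration; not dask's implementation.
    """
    if len(partitions) != len(paths):
        raise ValueError("need exactly one path per partition")
    for part, path in zip(partitions, paths):
        opener = gzip.open if path.endswith(".gz") else open
        with opener(path, "wt", encoding=encoding) as f:
            for element in part:
                f.write(str(element) + "\n")


tmpdir = tempfile.mkdtemp()
paths = [os.path.join(tmpdir, "0.json.gz"), os.path.join(tmpdir, "1.json")]
write_partitions([['{"x": 1}', '{"x": 2}'], ['{"x": 3}']], paths)

with gzip.open(paths[0], "rt", encoding="utf-8") as f:
    print(f.read().splitlines())  # ['{"x": 1}', '{"x": 2}']
```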
To DataFrames
You can convert a dask bag into a dask dataframe and use the dataframe's storage solutions.
Bag.to_dataframe(columns=None)
Convert Bag to dask.dataframe.
Bag should contain tuple or dict records.
Provide the columns= keyword arg to specify column names.
Index will not be particularly meaningful. Use reindex afterwards if necessary.
Examples
>>> import dask.bag as db
>>> b = db.from_sequence([{'name': 'Alice',   'balance': 100},
...                       {'name': 'Bob',     'balance': 200},
...                       {'name': 'Charlie', 'balance': 300}],
...                      npartitions=2)
>>> df = b.to_dataframe()
>>> df.compute()
   balance     name
0      100    Alice
1      200      Bob
0      300  Charlie
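The repeated 0 in the index above comes from each partition numbering its rows on its own. The plain-Python sketch below, which uses neither pandas nor dask, reproduces that numbering. It assumes the three records land in partitions of two and one, as the example output suggests. It then shows one hypothetical way to derive a unique 0..n-1 index by offsetting each partition.

```python
# Assumed split of the three records into two partitions (sizes 2 and 1).
partitions = [
    [{"name": "Alice", "balance": 100}, {"name": "Bob", "balance": 200}],
    [{"name": "Charlie", "balance": 300}],
]

rows = []
for part in partitions:
    for local_index, record in enumerate(part):  # numbering restarts per partition
        rows.append((local_index, record["balance"], record["name"]))

for row in rows:
    print(*row)
# 0 100 Alice
# 1 200 Bob
# 0 300 Charlie

# A unique index: shift each partition by the number of records before it.
offsets, total = [], 0
for part in partitions:
    offsets.append(total)
    total += len(part)
global_index = [offset + i for offset, part in zip(offsets, partitions) for i in range(len(part))]
print(global_index)  # [0, 1, 2]
```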
To Delayed Values
You can convert a dask bag into a list of dask delayed values and build custom storage solutions from there.
Bag.to_delayed()
Convert bag to dask delayed values.
Returns a list of values, one value per partition.