Bag

Dask.Bag parallelizes computations across a large collection of generic Python objects. It is particularly useful when dealing with large quantities of semi-structured data like JSON blobs or log files.

Name

Bag is an abstract collection, like list or set. It is a friendly synonym for multiset.

  • list: ordered collection with repeats, [1, 2, 3, 2]
  • set: unordered collection without repeats, {1, 2, 3}
  • bag: unordered collection with repeats, {1, 2, 2, 3}

So a bag is like a list but doesn’t guarantee an ordering among elements. There can be repeated elements but you can’t ask for a particular element.
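
Python's standard library has no bag type, but collections.Counter behaves like a multiset, so it offers a quick way to check the distinctions above. This is only an analogy for the semantics. It says nothing about how dask.bag stores its data.

```python
from collections import Counter

# A list cares about order and keeps repeats.
as_list_a = [1, 2, 3, 2]
as_list_b = [2, 1, 2, 3]

# A set ignores order and drops repeats.
as_set = {1, 2, 3, 2}

# A Counter ignores order but keeps repeats, like a bag (multiset).
as_bag_a = Counter([1, 2, 3, 2])
as_bag_b = Counter([2, 1, 2, 3])

print(as_list_a == as_list_b)  # False: same elements, different order
print(as_set == {1, 2, 3})     # True: the duplicate 2 was dropped
print(as_bag_a == as_bag_b)    # True: order does not matter
print(as_bag_a[2])             # 2: but the repeat is remembered
```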

Example

We commonly use dask.bag to process unstructured or semi-structured data.

>>> import dask.bag as db
>>> import json
>>> js = db.from_filenames('logs/2015-*.json.gz').map(json.loads)
>>> js.take(2)
({'name': 'Alice', 'location': {'city': 'LA', 'state': 'CA'}},
 {'name': 'Bob', 'location': {'city': 'NYC', 'state': 'NY'}})

>>> result = js.pluck('name').frequencies()  # just another Bag
>>> dict(result)                             # Evaluate Result
{'Alice': 10000, 'Bob': 5555, 'Charlie': ...}

Create Bags

There are several ways to create a dask.bag around your data:

db.from_sequence

You can create a bag from an existing Python sequence.

>>> import dask.bag as db
>>> b = db.from_sequence([1, 2, 3, 4, 5, 6])

You can control the number of partitions into which this data is binned.

>>> b = db.from_sequence([1, 2, 3, 4, 5, 6], npartitions=2)

This controls the granularity of the parallelism that you expose. By default dask will try to partition your data into about 100 partitions.
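
To make the idea of partitions concrete, here is a pure-Python sketch of binning a sequence into roughly equal contiguous chunks. The helper partition_sequence is hypothetical and only illustrates the concept; dask's own code may choose partition boundaries differently.

```python
import math


def partition_sequence(seq, npartitions):
    """Hypothetical helper: split seq into at most npartitions contiguous chunks."""
    seq = list(seq)
    if not seq:
        return []
    # Choose a chunk length large enough that the chunks cover the whole sequence.
    size = math.ceil(len(seq) / npartitions)
    return [seq[i:i + size] for i in range(0, len(seq), size)]


print(partition_sequence([1, 2, 3, 4, 5, 6], npartitions=2))
# [[1, 2, 3], [4, 5, 6]]
```

Each chunk becomes one unit of parallel work, so more partitions generally mean finer-grained tasks at the price of more scheduling overhead.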

Warning: you should not load your data into Python and then load that data into dask.bag. Instead, you should use dask.bag to load your data. This parallelizes the loading step and reduces inter-worker communication.

>>> b = db.from_sequence(['1.dat', '2.dat', ...]).map(load_from_filename)

db.from_filenames

Dask.bag can load data from text files directly. You can pass a single filename, a list of filenames, or a globstring. The resulting bag will have one item per line and one file per partition.

>>> b = db.from_filenames('myfile.json')
>>> b = db.from_filenames(['myfile.1.json', 'myfile.2.json', ...])
>>> b = db.from_filenames('myfile.*.json')

Dask.bag handles standard compression formats, notably gzip and bz2, based on the filename extension.

>>> b = db.from_filenames('myfile.*.json.gz')
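
Conceptually, choosing a decompressor from the filename extension can be sketched with the standard library's gzip and bz2 modules. The helpers open_by_extension and read_lines are hypothetical and not part of dask.bag; they only show the kind of extension-based dispatch described above.

```python
import bz2
import gzip


def open_by_extension(path, encoding="ascii"):
    """Hypothetical helper: open a text file, decompressing based on its extension."""
    if path.endswith(".gz"):
        return gzip.open(path, "rt", encoding=encoding)
    if path.endswith(".bz2"):
        return bz2.open(path, "rt", encoding=encoding)
    return open(path, "r", encoding=encoding)


def read_lines(path):
    """Yield one item per line, mirroring the one-item-per-line behavior of from_filenames."""
    with open_by_extension(path) as f:
        for line in f:
            yield line
```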

The resulting items in the bag are strings. You may want to parse them using functions like json.loads:

>>> import json
>>> b = db.from_filenames('myfile.*.json.gz').map(json.loads)

Alternatively, you may want to do some string munging. For convenience, bags have a string namespace, .str.methodname, that applies the named str method to every element:

>>> b = db.from_filenames('myfile.*.csv.gz').str.strip().str.split(',')

db.from_hdfs

Dask.bag can use WebHDFS to load text data from HDFS:

>>> from pywebhdfs.webhdfs import PyWebHdfsClient
>>> hdfs = PyWebHdfsClient(host='hostname', user_name='hdfs')

>>> b = db.from_hdfs('/user/username/data/2015/06/', hdfs=hdfs)

If the input is a directory then we return all data underneath that directory and all subdirectories.

This uses WebHDFS to pull data from HDFS and so only works if WebHDFS is enabled. Your computer does not need to be part of the HDFS cluster; it only needs network access to it. Data will be downloaded to memory, decompressed, used, and cleaned up as necessary.

Notably, this function does not tightly integrate dask.bag with a Hadoop cluster. Computation is not guaranteed (or likely) to be local to the node that has the data. This functionality is not the same as what you would get with Hadoop or Spark. No dask scheduler currently integrates nicely with data-local file systems like HDFS.

Execution

Execution on bags provides two benefits:

  1. Streaming: data processes lazily, allowing smooth execution of larger-than-memory data
  2. Parallel: data is split up, allowing multiple cores to execute in parallel

Trigger Evaluation

Bags have a .compute() method to trigger computation.

>>> c = b.map(func)
>>> c.compute()
[1, 2, 3, 4, ...]

You must ensure that your result will fit in memory.

Bags also support the __iter__ protocol and so work well with Python collections like list, tuple, set and dict. Converting your bag into a list or dict can look more Pythonic than calling .compute():

>>> list(b.map(lambda x: x + 1))
[1, 2, 3, 4, ...]

>>> dict(b.frequencies())
{'Alice': 100, 'Bob': 200, ...}

Default scheduler

By default dask.bag uses dask.multiprocessing for computation. The benefit is that dask bypasses the GIL and uses multiple cores on pure Python objects. The drawback is that dask.bag doesn't perform well on computations that involve a great deal of inter-worker communication. For common operations this is rarely an issue, as most dask.bag workflows are embarrassingly parallel or end in reductions that move little data between workers.

Additionally, using multiprocessing opens up potential problems with function serialization (see below).

Shuffle

Some operations, like a full groupby and a bag-to-bag join, do require substantial inter-worker communication. These are handled specially by shuffle operations that use disk and a central memory server as the hub of communication.

Shuffle operations are expensive and better handled by projects like dask.dataframe. It is best to use dask.bag to clean and process data then transform it into an array or dataframe before embarking on the more complex operations that require shuffle steps.

Dask.bag uses partd to perform efficient, parallel, spill-to-disk shuffles.
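
The core idea of a shuffle is that every record with the same key must end up in the same output partition, after which each partition can be grouped on its own. A minimal in-memory sketch, assuming hash partitioning on a key function, is shown below. The names shuffle_by_key and groupby_after_shuffle are hypothetical; partd's real shuffles also spill to disk and run in parallel, which this sketch does not attempt.

```python
from collections import defaultdict


def shuffle_by_key(partitions, key, nout):
    """Hypothetical sketch: regroup records so that equal keys share an output partition."""
    outputs = [[] for _ in range(nout)]
    for part in partitions:
        for record in part:
            # Within one process, equal keys hash equally, so they land in the same bucket.
            outputs[hash(key(record)) % nout].append(record)
    return outputs


def groupby_after_shuffle(outputs, key):
    """Once records are co-located, each output partition is grouped independently."""
    groups = {}
    for part in outputs:
        local = defaultdict(list)
        for record in part:
            local[key(record)].append(record)
        groups.update(local)  # after the shuffle, a key never appears in two partitions
    return groups


parts = [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
out = shuffle_by_key(parts, lambda x: x % 3, nout=2)
print(groupby_after_shuffle(out, lambda x: x % 3))
```

A real multi-process shuffle would also need a hash that is stable across processes, since Python randomizes str hashes per process.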

Function Serialization and Error Handling

Dask.bag uses dill to serialize functions to send to worker processes. Dill supports almost any kind of function, including lambdas, closures, partials and functions defined interactively.

When an error occurs in a remote process, the dask schedulers record the exception and its traceback and deliver them to the main process. These tracebacks cannot be navigated (i.e. you can't use pdb) but still contain valuable contextual information.

These two features are arguably the most important when comparing dask.bag to direct use of multiprocessing.
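
The general pattern of recording an exception and its traceback in a worker and re-raising it in the calling process can be sketched as follows. This is an illustrative single-process sketch with hypothetical names (run_task, gather, RemoteError), not the dask scheduler's actual code. In a real multiprocessing setup the (exception, traceback text) pair would be pickled and sent back to the main process.

```python
import traceback


class RemoteError(Exception):
    """Hypothetical wrapper carrying a worker's exception and its formatted traceback."""

    def __init__(self, exc, tb_text):
        super().__init__(f"{exc!r}\n\nRemote traceback:\n{tb_text}")
        self.exc = exc
        self.tb_text = tb_text


def run_task(func, *args):
    """Worker side: return ('ok', result) or ('error', (exception, traceback text))."""
    try:
        return "ok", func(*args)
    except Exception as exc:
        # Traceback objects cannot be pickled, so keep a formatted string instead.
        # This is also why the traceback cannot be navigated with pdb afterwards.
        return "error", (exc, traceback.format_exc())


def gather(status_and_payload):
    """Main-process side: return the result, or re-raise with the remote traceback attached."""
    status, payload = status_and_payload
    if status == "ok":
        return payload
    exc, tb_text = payload
    raise RemoteError(exc, tb_text) from exc
```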

If you would like to turn off multiprocessing, you can switch to the synchronous single-core scheduler, either for a single call:

>>> from dask.async import get_sync
>>> b.compute(get=get_sync)

or

>>> import dask
>>> dask.set_options(get=get_sync)  # set global
>>> list(b)  # uses synchronous scheduler

Known Limitations

Bags provide very general computation (any Python function). This generality comes at a cost. Bags have the following known limitations:

  1. By default they rely on the multiprocessing scheduler, which has its own set of known limitations (see shared)
  2. Bag operations tend to be slower than array/dataframe computations in the same way that Python tends to be slower than NumPy/pandas
  3. Bag.groupby is slow. You should try to use Bag.foldby if possible. Using Bag.foldby requires more thought.
  4. The implementation backing Bag.groupby is under heavy churn.

API

Create Bags

from_sequence(seq[, partition_size, npartitions]) Create dask from Python sequence
from_filenames(filenames[, chunkbytes, encoding]) Create dask by loading in lines from many files
from_hdfs(path[, hdfs, host, port, user_name]) Create dask by loading in files from HDFS
concat(bags) Concatenate many bags together, unioning all elements

Turn Bags into other things

Bag.to_textfiles(path[, name_function, encoding]) Write bag to disk, one file per partition, one line per element
Bag.to_dataframe([columns]) Convert Bag to dask.dataframe

Bag Methods

class dask.bag.core.Bag(dsk, name, npartitions)

Parallel collection of Python objects

Methods

all() Return True if every element in the collection is truthy
any() Return True if at least one element in the collection is truthy
compute(**kwargs)
concat() Concatenate nested lists into one long list
count() Count the number of elements
distinct() Distinct elements of collection
filter(predicate) Filter elements in collection by a predicate function
fold(binop[, combine, initial]) Parallelizable reduction
foldby(key, binop[, initial, combine, ...]) Combined reduction and groupby
frequencies() Count number of occurrences of each distinct element
from_filenames(*args, **kwargs)
from_sequence(*args, **kwargs)
groupby(grouper[, npartitions, blocksize]) Group collection by key function
join(other, on_self[, on_other]) Join collection with another collection
map(func) Map a function across all elements in collection
map_partitions(func) Apply function to every partition within collection
max() Largest element in the collection
mean() Arithmetic mean
min() Smallest element in the collection
pluck(key[, default]) Select item from all tuples/dicts in collection
product(other) Cartesian product between two bags
reduction(perpartition, aggregate) Reduce collection with reduction operators
remove(predicate) Remove elements in collection that match predicate
std([ddof]) Standard deviation
sum() Sum of all elements in the collection
take(k[, compute]) Take the first k elements
to_dataframe([columns]) Convert Bag to dask.dataframe
to_textfiles(path[, name_function, encoding]) Write bag to disk, one file per partition, one line per element
topk(k[, key]) K largest elements in collection
var([ddof]) Variance
visualize([filename, optimize_graph])

all()

Return True if bool(x) is True for every element x in the collection. If the collection is empty, return True.

any()

Return True if bool(x) is True for at least one element x in the collection. If the collection is empty, return False.

concat()

Concatenate nested lists into one long list

>>> b = from_sequence([[1], [2, 3]])
>>> list(b)
[[1], [2, 3]]
>>> list(b.concat())
[1, 2, 3]

count()

Count the number of elements

distinct()

Distinct elements of collection

Unordered without repeats.

>>> b = from_sequence(['Alice', 'Bob', 'Alice'])
>>> sorted(b.distinct())
['Alice', 'Bob']

filter(predicate)

Filter elements in collection by a predicate function

>>> def iseven(x):
...     return x % 2 == 0
>>> import dask.bag as db
>>> b = db.from_sequence(range(5))
>>> list(b.filter(iseven))  
[0, 2, 4]

fold(binop, combine=None, initial='__no__default__')

Parallelizable reduction

Fold is like the builtin function reduce except that it works in parallel. Fold takes two binary operator functions, one to reduce each partition of our dataset and another to combine results between partitions:

  1. binop: Binary operator to reduce within each partition
  2. combine: Binary operator to combine results from binop

Sequentially this would look like the following:

>>> intermediates = [reduce(binop, part) for part in partitions]  
>>> final = reduce(combine, intermediates)  

If only one function is given then it is used for both functions binop and combine as in the following example to compute the sum:

>>> def add(x, y):
...     return x + y
>>> b = from_sequence(range(5))
>>> b.fold(add).compute()  
10

In full form we provide both binary operators as well as the initial value:

>>> b.fold(binop=add, combine=add, initial=0).compute()  
10

More complex binary operators are also possible:

>>> def add_to_set(acc, x):
...     ''' Add new element x to set acc '''
...     return acc | set([x])
>>> b.fold(add_to_set, set.union, initial=set()).compute()  
{0, 1, 2, 3, 4}
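
The sequential description of fold given above can be made runnable with functools.reduce. Here the partitions are written out by hand as a list of lists, an assumption made for illustration; dask chooses the partitioning itself and runs the per-partition reductions in parallel. The name sequential_fold is hypothetical, and this sketch uses None to mean "no initial value".

```python
from functools import reduce


def add(x, y):
    return x + y


def add_to_set(acc, x):
    """Add new element x to set acc, returning a new set."""
    return acc | {x}


def sequential_fold(partitions, binop, combine=None, initial=None):
    """Emulate Bag.fold: reduce each partition with binop, then merge results with combine."""
    combine = combine or binop
    if initial is None:
        intermediates = [reduce(binop, part) for part in partitions]
    else:
        intermediates = [reduce(binop, part, initial) for part in partitions]
    return reduce(combine, intermediates)


partitions = [[0, 1, 2], [3, 4]]  # range(5) split into two hand-made partitions
print(sequential_fold(partitions, add))                           # 10
print(sequential_fold(partitions, add_to_set, set.union, set()))  # {0, 1, 2, 3, 4}
```

Note that add_to_set needs an initial value: without one, reduce would start by calling add_to_set(0, 1) on two plain integers.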

See also

Bag.foldby

foldby(key, binop, initial='__no__default__', combine=None, combine_initial='__no__default__')

Combined reduction and groupby

Foldby provides a combined groupby and reduce for efficient parallel split-apply-combine tasks.

The computation

>>> b.foldby(key, binop, init)                        

is equivalent to the following:

>>> from functools import reduce
>>> def reduction(group):
...     return reduce(binop, group, init)
>>> b.groupby(key).map(lambda kv: (kv[0], reduction(kv[1])))

Foldby, however, uses minimal communication and so is much faster.

>>> b = from_sequence(range(10))
>>> iseven = lambda x: x % 2 == 0
>>> add = lambda x, y: x + y
>>> dict(b.foldby(iseven, add))                         
{True: 20, False: 25}
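
One way to see why foldby needs little communication is to emulate it sequentially: each partition is reduced to a small dictionary of per-key accumulators, and only those dictionaries are merged. The function below is a hypothetical sketch in the spirit of toolz.reduceby, assuming an explicit initial value and hand-made partitions; it is not dask's implementation.

```python
def sequential_foldby(partitions, key, binop, initial, combine=None, combine_initial=None):
    """Emulate Bag.foldby: grouped reduction per partition, then merge the small dicts."""
    combine = combine or binop
    # Step 1: each partition reduces its own elements into {key: accumulator}.
    per_partition = []
    for part in partitions:
        acc = {}
        for x in part:
            k = key(x)
            acc[k] = binop(acc.get(k, initial), x)
        per_partition.append(acc)
    # Step 2: only these small dicts would have to move between workers; merge them.
    merged = {}
    for acc in per_partition:
        for k, v in acc.items():
            if k in merged:
                merged[k] = combine(merged[k], v)
            elif combine_initial is not None:
                merged[k] = combine(combine_initial, v)
            else:
                merged[k] = v
    return merged


def iseven(x):
    return x % 2 == 0


def add(x, y):
    return x + y


parts = [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]  # range(10) in two hand-made partitions
print(sequential_foldby(parts, iseven, add, 0))  # {True: 20, False: 25}
```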

See also

toolz.reduceby, pyspark.combineByKey

frequencies()

Count number of occurrences of each distinct element

>>> b = from_sequence(['Alice', 'Bob', 'Alice'])
>>> dict(b.frequencies())  
{'Alice': 2, 'Bob': 1}
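
frequencies fits the same per-partition-then-merge pattern: count each partition locally, then add the counts. A sketch with collections.Counter over hand-made partitions (an assumption for illustration; sequential_frequencies is a hypothetical name):

```python
from collections import Counter


def sequential_frequencies(partitions):
    """Count each partition locally, then add the per-partition Counters together."""
    return sum((Counter(part) for part in partitions), Counter())


parts = [["Alice", "Bob"], ["Alice"]]
print(dict(sequential_frequencies(parts)))  # {'Alice': 2, 'Bob': 1}
```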

groupby(grouper, npartitions=None, blocksize=1048576)

Group collection by key function

Note that this requires a full read of the dataset, serialization and a shuffle. This is expensive. If possible, you should use foldby.

>>> b = from_sequence(range(10))
>>> dict(b.groupby(lambda x: x % 2 == 0))  
{True: [0, 2, 4, 6, 8], False: [1, 3, 5, 7, 9]}

See also

Bag.foldby

join(other, on_self, on_other=None)

Join collection with another collection

Other collection must be an Iterable, and not a Bag.

>>> people = from_sequence(['Alice', 'Bob', 'Charlie'])
>>> fruit = ['Apple', 'Apricot', 'Banana']
>>> list(people.join(fruit, lambda x: x[0]))  
[('Apple', 'Alice'), ('Apricot', 'Alice'), ('Banana', 'Bob')]
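
Because the other collection is an ordinary in-memory iterable, a join like this can be done as a hash join: index the small iterable by key once, then stream through the bag and look up each element. Below is a sequential sketch over a single partition; hash_join is a hypothetical name, and the (other, self) pair order is chosen to match the example above.

```python
from collections import defaultdict


def hash_join(bag_items, other, on_self, on_other=None):
    """Sketch: join streamed bag_items against an in-memory iterable `other`."""
    on_other = on_other or on_self
    # Build the lookup table once from the small, in-memory collection.
    index = defaultdict(list)
    for item in other:
        index[on_other(item)].append(item)
    # Stream through the (potentially large) bag side, emitting matching pairs.
    for item in bag_items:
        for match in index.get(on_self(item), []):
            yield (match, item)


people = ["Alice", "Bob", "Charlie"]
fruit = ["Apple", "Apricot", "Banana"]
print(list(hash_join(people, fruit, lambda x: x[0])))
# [('Apple', 'Alice'), ('Apricot', 'Alice'), ('Banana', 'Bob')]
```

In a parallel setting the same lookup table would be reused for every partition, which is consistent with the requirement that the other side be a plain iterable that fits in memory.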

map(func)

Map a function across all elements in collection

>>> import dask.bag as db
>>> b = db.from_sequence(range(5))
>>> list(b.map(lambda x: x * 10))  
[0, 10, 20, 30, 40]

map_partitions(func)

Apply function to every partition within collection

Note that this requires you to understand how dask.bag partitions your data and so is somewhat internal.

>>> b.map_partitions(myfunc)  
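
The difference between map and map_partitions is what the function receives: map calls it once per element, while map_partitions calls it once per partition with that partition's elements. A sequential sketch with hand-made partitions (an assumption for illustration; emulate_map, emulate_map_partitions and running_total are hypothetical names):

```python
def emulate_map(partitions, func):
    """map: func sees one element at a time."""
    return [[func(x) for x in part] for part in partitions]


def emulate_map_partitions(partitions, func):
    """map_partitions: func sees a whole partition and returns a new sequence."""
    return [list(func(part)) for part in partitions]


def running_total(part):
    """Example partition-level function: cumulative sum within one partition."""
    total = 0
    for x in part:
        total += x
        yield total


partitions = [[1, 2, 3], [4, 5]]
print(emulate_map(partitions, lambda x: x * 10))          # [[10, 20, 30], [40, 50]]
print(emulate_map_partitions(partitions, running_total))  # [[1, 3, 6], [4, 9]]
```

running_total restarts in each partition, so its output depends on how the data was partitioned. This is exactly why map_partitions requires you to understand how dask.bag partitions your data.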

max()

Return the largest element in the collection.

mean()

Arithmetic mean

min()

Return the smallest element in the collection.

pluck(key, default='__no__default__')

Select item from all tuples/dicts in collection

>>> b = from_sequence([{'name': 'Alice', 'credits': [1, 2, 3]},
...                    {'name': 'Bob',   'credits': [10, 20]}])
>>> list(b.pluck('name'))  
['Alice', 'Bob']
>>> list(b.pluck('credits').pluck(0))  
[1, 10]

product(other)

Cartesian product between two bags

reduction(perpartition, aggregate)

Reduce collection with reduction operators

Parameters:

perpartition: function

reduction to apply to each partition

aggregate: function

reduction to apply to the results of all partitions
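
Many of the aggregations listed here can be expressed as a pair of perpartition and aggregate functions. The sequential sketch below shows that contract with hand-made partitions: perpartition receives one partition's elements, and aggregate receives the list of per-partition results. The count and max pairs are illustrative choices, not a statement about which pairs dask uses internally; sequential_reduction and count_partition are hypothetical names.

```python
def sequential_reduction(partitions, perpartition, aggregate):
    """Emulate Bag.reduction: apply perpartition to each partition, then aggregate."""
    return aggregate([perpartition(part) for part in partitions])


def count_partition(part):
    """Number of elements in one partition."""
    return sum(1 for _ in part)


partitions = [[3, 1, 4], [1, 5], [9, 2, 6]]
print(sequential_reduction(partitions, count_partition, sum))  # 8
print(sequential_reduction(partitions, max, max))              # 9
```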

remove(predicate)

Remove elements in collection that match predicate

>>> def iseven(x):
...     return x % 2 == 0
>>> import dask.bag as db
>>> b = db.from_sequence(range(5))
>>> list(b.remove(iseven))  
[1, 3]

std(ddof=0)

Standard deviation

sum()

Return the sum of all elements in the collection. As with the builtin sum, the elements should be numbers, not strings.

take(k, compute=True)

Take the first k elements

Evaluates by default; use compute=False to avoid computation. Only takes from the first partition.

>>> b = from_sequence(range(10))
>>> b.take(3)  
(0, 1, 2)

to_dataframe(columns=None)

Convert Bag to dask.dataframe

Bag should contain tuple or dict records.

Provide columns= keyword arg to specify column names.

Index will not be particularly meaningful. Use reindex afterwards if necessary.

to_textfiles(path, name_function=str, encoding='ascii')

Write bag to disk, one file per partition, one line per element

Paths: This will create one file for each partition in your bag. You can specify the filenames in a variety of ways.

Use a globstring

>>> b.to_textfiles('/path/to/data/*.json.gz')  

The * will be replaced by the increasing sequence 0, 1, ...

/path/to/data/0.json.gz
/path/to/data/1.json.gz

Use a globstring and a name_function= keyword argument. The name_function function should expect an integer and produce a string.

>>> from datetime import date, timedelta
>>> def name(i):
...     return str(date(2015, 1, 1) + i * timedelta(days=1))
>>> name(0)
'2015-01-01'
>>> name(15)
'2015-01-16'
>>> b.to_textfiles('/path/to/data/*.json.gz', name_function=name)  
/path/to/data/2015-01-01.json.gz
/path/to/data/2015-01-02.json.gz
...

You can also provide an explicit list of paths.

>>> paths = ['/path/to/data/alice.json.gz', '/path/to/data/bob.json.gz', ...]  
>>> b.to_textfiles(paths) 

Compression: Filenames with extensions corresponding to known compression algorithms (gz, bz2) will be compressed accordingly.
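
The path rules above can be sketched as a small expansion function. expand_paths is a hypothetical helper written for illustration: it follows the documented behavior (the * is replaced by name_function(i) for partition i, name_function defaults to str, and an explicit list is used as given), but its error checks are assumptions of this sketch, not dask's own behavior.

```python
from datetime import date, timedelta


def expand_paths(path, npartitions, name_function=str):
    """Hypothetical helper: produce one output path per partition."""
    if isinstance(path, (list, tuple)):
        # An explicit list of paths is used as-is (length check is this sketch's assumption).
        if len(path) != npartitions:
            raise ValueError("need exactly one path per partition")
        return list(path)
    # Requiring exactly one '*' is an assumption of this sketch.
    if path.count("*") != 1:
        raise ValueError("globstring must contain exactly one '*'")
    return [path.replace("*", name_function(i)) for i in range(npartitions)]


def name(i):
    return str(date(2015, 1, 1) + i * timedelta(days=1))


print(expand_paths("/path/to/data/*.json.gz", 2))
# ['/path/to/data/0.json.gz', '/path/to/data/1.json.gz']
print(expand_paths("/path/to/data/*.json.gz", 2, name_function=name))
# ['/path/to/data/2015-01-01.json.gz', '/path/to/data/2015-01-02.json.gz']
```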

topk(k, key=None)

K largest elements in collection

Optionally ordered by some key function

>>> b = from_sequence([10, 3, 5, 7, 11, 4])
>>> list(b.topk(2))  
[11, 10]
>>> list(b.topk(2, lambda x: -x))  
[3, 4]
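
topk also decomposes into a per-partition step and a combining step: keep the k largest elements of each partition, then take the k largest of those candidates. Because every element of the overall top k is necessarily among the top k of its own partition, this two-step approach is exact. A sketch using heapq.nlargest with hand-made partitions (sequential_topk is a hypothetical name, not dask's code):

```python
import heapq


def sequential_topk(partitions, k, key=None):
    """Top k per partition, then top k of the surviving candidates."""
    candidates = []
    for part in partitions:
        candidates.extend(heapq.nlargest(k, part, key=key))
    return heapq.nlargest(k, candidates, key=key)


partitions = [[10, 3, 5], [7, 11, 4]]
print(sequential_topk(partitions, 2))                    # [11, 10]
print(sequential_topk(partitions, 2, key=lambda x: -x))  # [3, 4]
```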

var(ddof=0)

Variance
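
mean, var and std can all be computed in one pass over the partitions if each partition reports its count, sum and sum of squares, and those totals are then combined. This sketch assumes that approach for illustration (dask's actual formula may differ) and uses the same ddof convention as var(ddof=0) and std(ddof=0). The function names are hypothetical. Note that the sum-of-squares formula can lose precision when the mean is large relative to the spread of the data.

```python
import math


def partition_moments(part):
    """Per-partition summary: (count, sum, sum of squares)."""
    n = s = ss = 0
    for x in part:
        n += 1
        s += x
        ss += x * x
    return n, s, ss


def combine_moments(partitions, ddof=0):
    """Combine per-partition summaries into (mean, variance, standard deviation)."""
    n = s = ss = 0
    for pn, ps, pss in map(partition_moments, partitions):
        n, s, ss = n + pn, s + ps, ss + pss
    mean = s / n
    var = (ss - n * mean * mean) / (n - ddof)
    return mean, var, math.sqrt(var)


mean, var, std = combine_moments([[1, 2], [3, 4]])
print(mean, var)  # 2.5 1.25
```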

Other functions

dask.bag.core.from_sequence(seq, partition_size=None, npartitions=None)

Create dask from Python sequence

This sequence should be relatively small in memory. Dask Bag works best when it handles loading your data itself. Commonly we load a sequence of filenames into a Bag and then use .map to open them.

Parameters:

seq: Iterable

A sequence of elements to put into the dask

partition_size: int (optional)

The length of each partition

npartitions: int (optional)

The number of desired partitions

It is best to provide either partition_size or npartitions (though not both).

dask.bag.core.from_filenames(filenames, chunkbytes=None, encoding='ascii')

Create dask by loading in lines from many files

Provide list of filenames

>>> b = from_filenames(['myfile.1.txt', 'myfile.2.txt'])  

Or a globstring

>>> b = from_filenames('myfiles.*.txt')  

Parallelize a large file by providing the number of uncompressed bytes to load into each partition.

>>> b = from_filenames('largefile.txt', chunkbytes=1e7)  

See also

from_sequence: A more generic bag creation function

dask.bag.core.from_hdfs(path, hdfs=None, host='localhost', port='50070', user_name=None)

Create dask by loading in files from HDFS

Provide an hdfs directory and credentials

>>> b = from_hdfs('home/username/data/', host='localhost', user_name='ubuntu')  

Alternatively provide an instance of pywebhdfs.webhdfs.PyWebHdfsClient

>>> from pywebhdfs.webhdfs import PyWebHdfsClient  
>>> hdfs = PyWebHdfsClient(host='hostname', user_name='username')  
>>> b = from_hdfs('home/username/data/', hdfs=hdfs)  

dask.bag.core.concat(bags)

Concatenate many bags together, unioning all elements

>>> import dask.bag as db
>>> a = db.from_sequence([1, 2, 3])
>>> b = db.from_sequence([4, 5, 6])
>>> c = db.concat([a, b])
>>> list(c)
[1, 2, 3, 4, 5, 6]