Bag¶
Dask.Bag parallelizes computations across a large collection of generic Python objects. It is particularly useful when dealing with large quantities of semi-structured data like JSON blobs or log files.
Name¶
Bag is an abstract collection, like list or set. It is a friendly synonym for multiset.
- list: ordered collection with repeats, [1, 2, 3, 2]
- set: unordered collection without repeats, {1, 2, 3}
- bag: unordered collection with repeats, {1, 2, 2, 3}
So a bag is like a list but doesn’t guarantee an ordering among elements. There can be repeated elements but you can’t ask for a particular element.
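The distinction can be sketched with the standard library's collections.Counter, which behaves like a multiset. This is only an illustration of the concept; it is not how dask.bag stores its data, and the variable names are made up for the example.

```python
from collections import Counter

# Two sequences with the same elements in a different order
as_list_a = [1, 2, 3, 2]
as_list_b = [2, 1, 2, 3]

# Lists are ordered, so these two compare as different
lists_equal = as_list_a == as_list_b

# Sets drop repeats, so the second 2 is lost
as_set = set(as_list_a)

# A multiset (bag) ignores order but keeps the repeat counts
bags_equal = Counter(as_list_a) == Counter(as_list_b)
```

Here lists_equal is False while bags_equal is True, and the set has forgotten that 2 appeared twice.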
Example¶
We commonly use dask.bag to process unstructured or semi-structured data.
>>> import dask.bag as db
>>> import json
>>> js = db.from_filenames('logs/2015-*.json.gz').map(json.loads)
>>> js.take(2)
({'name': 'Alice', 'location': {'city': 'LA', 'state': 'CA'}},
{'name': 'Bob', 'location': {'city': 'NYC', 'state': 'NY'}})
>>> result = js.pluck('name').frequencies() # just another Bag
>>> dict(result) # Evaluate Result
{'Alice': 10000, 'Bob': 5555, 'Charlie': ...}
Create Bags¶
There are several ways to create dask.bags around your data.
db.from_sequence¶
You can create a bag from an existing Python sequence.
>>> import dask.bag as db
>>> b = db.from_sequence([1, 2, 3, 4, 5, 6])
You can control the number of partitions into which this data is binned.
>>> b = db.from_sequence([1, 2, 3, 4, 5, 6], npartitions=2)
This controls the granularity of the parallelism that you expose. By default dask will try to partition your data into about 100 partitions.
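To make the idea of partitions concrete, here is a plain-Python sketch that bins a sequence into contiguous chunks. The helper split_into_partitions is hypothetical and is not dask's partitioning code; it only shows what "npartitions=2" means for a six-element sequence.

```python
def split_into_partitions(seq, npartitions):
    """Split seq into at most npartitions contiguous chunks of near-equal size."""
    seq = list(seq)
    # Ceiling division, so that every element lands in some chunk
    size = max(1, -(-len(seq) // npartitions))
    return [seq[i:i + size] for i in range(0, len(seq), size)]

# Two partitions of three elements each; each chunk can be processed independently
parts = split_into_partitions([1, 2, 3, 4, 5, 6], npartitions=2)
```

More partitions expose more parallelism but also add more per-task overhead.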
Warning: you should not load your data into Python and then load that data into dask.bag. Instead, you should use dask.bag to load your data. This parallelizes the loading step and reduces inter-worker communication.
>>> b = db.from_sequence(['1.dat', '2.dat', ...]).map(load_from_filename)
db.from_filenames¶
Dask.bag can load data from text files directly. You can pass either a single filename, a list of filenames, or a globstring. The resulting bag will have one item per line, one file per partition.
>>> b = db.from_filenames('myfile.json')
>>> b = db.from_filenames(['myfile.1.json', 'myfile.2.json', ...])
>>> b = db.from_filenames('myfile.*.json')
Dask.bag handles standard compression libraries, notably gzip and bz2, based on the filename extension.
>>> b = db.from_filenames('myfile.*.json.gz')
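Choosing a decompressor from the filename extension can be sketched with the standard library alone. The OPENERS table and open_by_extension helper below are assumptions made for illustration, not dask.bag's internals, and the file is written to a temporary directory so the example is self-contained.

```python
import bz2
import gzip
import os
import tempfile

# Hypothetical lookup table: extension -> function that opens such a file
OPENERS = {".gz": gzip.open, ".bz2": bz2.open}

def open_by_extension(path, mode="rt"):
    """Open path in text mode, decompressing if the extension is recognized."""
    ext = os.path.splitext(path)[1]
    return OPENERS.get(ext, open)(path, mode)

with tempfile.TemporaryDirectory() as tmpdir:
    path = os.path.join(tmpdir, "myfile.1.json.gz")
    # Write one gzip-compressed line, then read it back
    with open_by_extension(path, "wt") as f:
        f.write('{"name": "Alice"}\n')
    with open_by_extension(path) as f:
        lines = f.readlines()
```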
The resulting items in the bag are strings. You may want to parse them using functions like json.loads.
>>> import json
>>> b = db.from_filenames('myfile.*.json.gz').map(json.loads)
Or do string munging tasks. For convenience there is a string namespace attached directly to bags with .str.methodname.
>>> b = db.from_filenames('myfile.*.csv.gz').str.strip().str.split(',')
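In plain Python, the chained .str calls above correspond to applying the same string methods to every element. The sample lines below are hypothetical.

```python
# Hypothetical lines, as they might come out of a CSV file
lines = ["Alice,LA,CA\n", "Bob,NYC,NY\n"]

# Equivalent of .str.strip().str.split(','): call the methods on each element
records = [line.strip().split(",") for line in lines]
```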
db.from_hdfs¶
Dask.bag can use WebHDFS to load text data from HDFS.
>>> from pywebhdfs.webhdfs import PyWebHdfsClient
>>> hdfs = PyWebHdfsClient(host='hostname', user_name='hdfs')
>>> b = db.from_hdfs('/user/username/data/2015/06/', hdfs=hdfs)
If the input is a directory then we return all data underneath that directory and all subdirectories.
This uses WebHDFS to pull data from HDFS and so only works if that is enabled. It does not require your computer to actually be on HDFS, merely that you have network access. Data will be downloaded to memory, decompressed, used, and cleaned up as necessary.
Notably, this function does not tightly integrate dask.bag with a Hadoop cluster. Computation is not guaranteed (or likely) to be local to the node that has the data. This functionality is not the same as what you would get with Hadoop or Spark. No dask scheduler currently integrates nicely with data-local file systems like HDFS.
Execution¶
Execution on bags provides two benefits:
- Streaming: data is processed lazily, allowing smooth execution of larger-than-memory data
- Parallel: data is split up, allowing multiple cores to execute in parallel
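The streaming half of this can be sketched with ordinary generators: nothing is parsed until a result is requested, and only one record is held at a time. This is an analogy in plain Python, not dask's execution engine, and it does not show the parallel half. The sample records are made up.

```python
import json

def read_lines(blobs):
    # Generator: yields one line at a time instead of building a full list
    for blob in blobs:
        yield blob

raw = ['{"name": "Alice"}', '{"name": "Bob"}', '{"name": "Alice"}']

parsed = map(json.loads, read_lines(raw))       # lazy: nothing parsed yet
names = (record["name"] for record in parsed)   # still lazy

# Only now is the first line read and parsed
first = next(names)
```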
Trigger Evaluation¶
Bags have a .compute() method to trigger computation.
>>> c = b.map(func)
>>> c.compute()
[1, 2, 3, 4, ...]
You must ensure that your result will fit in memory.
Bags also support the __iter__ protocol and so work well with Python collections like list, tuple, set, and dict. Converting your object into a list or dict can look more Pythonic than calling .compute().
>>> list(b.map(lambda x: x + 1))
[1, 2, 3, 4, ...]
>>> dict(b.frequencies())
{'Alice': 100, 'Bob': 200, ...}
Default scheduler¶
By default dask.bag uses dask.multiprocessing for computation. As a benefit, dask bypasses the GIL and uses multiple cores on pure Python objects. As a drawback, dask.bag doesn’t perform well on computations that include a great deal of inter-worker communication. For common operations this is rarely an issue, as most dask.bag workflows are embarrassingly parallel or result in reductions with little data moving between workers.
Additionally, using multiprocessing opens up potential problems with function serialization (see below).
Shuffle¶
Some operations, like full groupby and bag-to-bag join, do require substantial inter-worker communication. These are handled specially by shuffle operations that use disk and a central memory server as a central point of communication.
Shuffle operations are expensive and better handled by projects like dask.dataframe. It is best to use dask.bag to clean and process data then transform it into an array or dataframe before embarking on the more complex operations that require shuffle steps.
Dask.bag uses partd to perform efficient, parallel, spill-to-disk shuffles.
Function Serialization and Error Handling¶
Dask.bag uses dill to serialize functions to send to worker processes. Dill supports almost any kind of function, including lambdas, closures, partials and functions defined interactively.
When an error occurs in a remote process, the dask schedulers record the Exception and the traceback and deliver them to the main process. These tracebacks cannot be navigated (i.e. you can’t use pdb) but still contain valuable contextual information.
These two features are arguably the most important when comparing dask.bag to direct use of multiprocessing.
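The error-handling idea can be sketched as a wrapper that catches the exception where the function runs and ships back the formatted traceback as text. The run_and_capture helper is hypothetical and is not the dask scheduler's code; it only shows why the traceback remains readable but can no longer be stepped through.

```python
import traceback

def run_and_capture(func, *args):
    """Run func; return ('ok', result) or ('error', exception, traceback_text)."""
    try:
        return ("ok", func(*args))
    except Exception as exc:
        # The formatted traceback is a plain string, so it can be sent
        # between processes, but it cannot be inspected with pdb
        return ("error", exc, traceback.format_exc())

# Division by zero inside the wrapped function is reported, not raised
outcome = run_and_capture(lambda x: 1 / x, 0)
```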
If you would like to turn off multiprocessing, you can do so by setting the default get function to the synchronous single-core scheduler:
>>> from dask.async import get_sync
>>> b.compute(get=get_sync)
or
>>> import dask
>>> dask.set_options(get=get_sync) # set global
>>> list(b) # uses synchronous scheduler
Known Limitations¶
Bags provide very general computation (any Python function). This generality comes at a cost. Bags have the following known limitations:
- By default they rely on the multiprocessing scheduler, which has its own set of known limitations (see shared)
- Bag operations tend to be slower than array/dataframe computations in the same way that Python tends to be slower than NumPy/pandas
- Bag.groupby is slow. You should try to use Bag.foldby if possible. Using Bag.foldby requires more thought.
- The implementation backing Bag.groupby is under heavy churn.
API¶
Create Bags¶
| from_sequence(seq[, partition_size, npartitions]) | Create dask from Python sequence |
| from_filenames(filenames[, chunkbytes, encoding]) | Create dask by loading in lines from many files |
| from_hdfs(path[, hdfs, host, port, user_name]) | Create dask by loading in files from HDFS |
| concat(bags) | Concatenate many bags together, unioning all elements |
Turn Bags into other things¶
| Bag.to_textfiles(path[, name_function, encoding]) | Write bag to disk, one filename per partition, one line per element |
| Bag.to_dataframe([columns]) | Convert Bag to dask.dataframe |
Bag Methods¶
- class dask.bag.core.Bag(dsk, name, npartitions)¶
Parallel collection of Python objects
Methods
| all(iterable) | Return True if bool(x) is True for all values x in the iterable. |
| any(iterable) | Return True if bool(x) is True for any x in the iterable. |
| compute(**kwargs) | |
| concat() | Concatenate nested lists into one long list |
| count() | Count the number of elements |
| distinct() | Distinct elements of collection |
| filter(predicate) | Filter elements in collection by a predicate function |
| fold(binop[, combine, initial]) | Parallelizable reduction |
| foldby(key, binop[, initial, combine, ...]) | Combined reduction and groupby |
| frequencies() | Count number of occurrences of each distinct element |
| from_filenames(*args, **kwargs) | |
| from_sequence(*args, **kwargs) | |
| groupby(grouper[, npartitions, blocksize]) | Group collection by key function |
| join(other, on_self[, on_other]) | Join collection with another collection |
| map(func) | Map a function across all elements in collection |
| map_partitions(func) | Apply function to every partition within collection |
| max(iterable[, key]) | max(a, b, c, ...[, key=func]) -> value |
| mean() | Arithmetic mean |
| min(iterable[, key]) | min(a, b, c, ...[, key=func]) -> value |
| pluck(key[, default]) | Select item from all tuples/dicts in collection |
| product(other) | Cartesian product between two bags |
| reduction(perpartition, aggregate) | Reduce collection with reduction operators |
| remove(predicate) | Remove elements in collection that match predicate |
| std([ddof]) | Standard deviation |
| sum(sequence[, start]) | Return the sum of a sequence of numbers (NOT strings) plus the value of parameter ‘start’ (which defaults to 0). |
| take(k[, compute]) | Take the first k elements |
| to_dataframe([columns]) | Convert Bag to dask.dataframe |
| to_textfiles(path[, name_function, encoding]) | Write bag to disk, one filename per partition, one line per element |
| topk(k[, key]) | K largest elements in collection |
| var([ddof]) | Variance |
| visualize([filename, optimize_graph]) | |
- all(iterable) → bool¶
Return True if bool(x) is True for all values x in the iterable. If the iterable is empty, return True.
- any(iterable) → bool¶
Return True if bool(x) is True for any x in the iterable. If the iterable is empty, return False.
- concat()¶
Concatenate nested lists into one long list
>>> b = from_sequence([[1], [2, 3]])
>>> list(b)
[[1], [2, 3]]
>>> list(b.concat())
[1, 2, 3]
- count()¶
Count the number of elements
- distinct()¶
Distinct elements of collection
Unordered without repeats.
>>> b = from_sequence(['Alice', 'Bob', 'Alice'])
>>> sorted(b.distinct())
['Alice', 'Bob']
- filter(predicate)¶
Filter elements in collection by a predicate function
>>> def iseven(x):
...     return x % 2 == 0
>>> import dask.bag as db
>>> b = db.from_sequence(range(5))
>>> list(b.filter(iseven))
[0, 2, 4]
- fold(binop, combine=None, initial='__no__default__')¶
Parallelizable reduction
Fold is like the builtin function reduce except that it works in parallel. Fold takes two binary operator functions, one to reduce each partition of our dataset and another to combine results between partitions
- binop: Binary operator to reduce within each partition
- combine: Binary operator to combine results from binop
Sequentially this would look like the following:
>>> intermediates = [reduce(binop, part) for part in partitions]
>>> final = reduce(combine, intermediates)
If only one function is given then it is used for both functions binop and combine as in the following example to compute the sum:
>>> def add(x, y):
...     return x + y
>>> b = from_sequence(range(5))
>>> b.fold(add).compute()
10
In full form we provide both binary operators as well as their default arguments:
>>> b.fold(binop=add, combine=add, initial=0).compute()
10
More complex binary operators are also doable
>>> def add_to_set(acc, x):
...     ''' Add new element x to set acc '''
...     return acc | set([x])
>>> b.fold(add_to_set, set.union, initial=set()).compute()
{0, 1, 2, 3, 4}
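The two-stage structure of fold can be written out sequentially in plain Python. This is a sketch of the semantics described above, not dask's implementation: the partitions are a hypothetical list of lists, and None is used here as the "no initial value" sentinel instead of dask's '__no__default__'.

```python
from functools import reduce

def fold(partitions, binop, combine=None, initial=None):
    """Reduce each partition with binop, then merge partial results with combine."""
    if combine is None:
        combine = binop  # one function serves both roles
    if initial is None:
        intermediates = [reduce(binop, part) for part in partitions]
    else:
        intermediates = [reduce(binop, part, initial) for part in partitions]
    return reduce(combine, intermediates)

def add_to_set(acc, x):
    # Returns a new set, so the shared initial value is never mutated
    return acc | {x}

partitions = [[0, 1, 2], [3, 4]]  # range(5) split into two partitions
total = fold(partitions, lambda x, y: x + y)
unique = fold(partitions, add_to_set, set.union, initial=set())
```

Note that binop and combine differ in the second call: binop adds one element to a set, while combine merges two sets.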
- foldby(key, binop, initial='__no__default__', combine=None, combine_initial='__no__default__')¶
Combined reduction and groupby
Foldby provides a combined groupby and reduce for efficient parallel split-apply-combine tasks.
The computation
>>> b.foldby(key, binop, init)
is equivalent to the following:
>>> from functools import reduce
>>> def reduction(group):
...     return reduce(binop, group, init)
>>> b.groupby(key).map(lambda kv: (kv[0], reduction(kv[1])))
But uses minimal communication and so is much faster.
>>> b = from_sequence(range(10))
>>> iseven = lambda x: x % 2 == 0
>>> add = lambda x, y: x + y
>>> dict(b.foldby(iseven, add))
{True: 20, False: 25}
See also
toolz.reduceby, pyspark.combineByKey
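Why foldby needs so little communication can be seen in a sequential sketch: each partition is reduced to one accumulator per key, and only those small accumulators are merged. The function below is an illustration under that assumption, not dask's code, and it omits the initial arguments for brevity.

```python
def foldby(partitions, key, binop, combine=None):
    """Group-and-reduce each partition locally, then merge the per-key results."""
    if combine is None:
        combine = binop
    merged = {}
    for part in partitions:
        # Step 1: reduce within the partition, one accumulator per key
        local = {}
        for item in part:
            k = key(item)
            local[k] = binop(local[k], item) if k in local else item
        # Step 2: only the per-key accumulators cross partition boundaries
        for k, acc in local.items():
            merged[k] = combine(merged[k], acc) if k in merged else acc
    return merged

partitions = [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]  # range(10) in two partitions
result = foldby(partitions, lambda x: x % 2 == 0, lambda x, y: x + y)
```

A full groupby, by contrast, has to move every element to the worker that owns its key.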
- frequencies()¶
Count number of occurrences of each distinct element
>>> b = from_sequence(['Alice', 'Bob', 'Alice'])
>>> dict(b.frequencies())
{'Alice': 2, 'Bob': 1}
- groupby(grouper, npartitions=None, blocksize=1048576)¶
Group collection by key function
Note that this requires full dataset read, serialization and shuffle. This is expensive. If possible you should use foldby.
>>> b = from_sequence(range(10))
>>> dict(b.groupby(lambda x: x % 2 == 0))
{True: [0, 2, 4, 6, 8], False: [1, 3, 5, 7, 9]}
- join(other, on_self, on_other=None)¶
Join collection with another collection
Other collection must be an Iterable, and not a Bag.
>>> people = from_sequence(['Alice', 'Bob', 'Charlie'])
>>> fruit = ['Apple', 'Apricot', 'Banana']
>>> list(people.join(fruit, lambda x: x[0]))
[('Apple', 'Alice'), ('Apricot', 'Alice'), ('Banana', 'Bob')]
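One way to read this join is as a hash join: the in-memory iterable is indexed by its key, and the bag's elements are streamed past that index. The sketch below is an assumption about the semantics, written to reproduce the output above; it is not taken from dask's source.

```python
def join(bag_items, other, on_self, on_other=None):
    """Pair each element of bag_items with the elements of other sharing its key."""
    if on_other is None:
        on_other = on_self
    # Index the small, in-memory collection by key
    index = {}
    for item in other:
        index.setdefault(on_other(item), []).append(item)
    # Stream through the bag's elements and emit (other, self) pairs
    return [(match, item)
            for item in bag_items
            for match in index.get(on_self(item), [])]

people = ['Alice', 'Bob', 'Charlie']
fruit = ['Apple', 'Apricot', 'Banana']
pairs = join(people, fruit, lambda x: x[0])
```

'Charlie' produces no pairs because no fruit starts with 'C'.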
- map(func)¶
Map a function across all elements in collection
>>> import dask.bag as db
>>> b = db.from_sequence(range(5))
>>> list(b.map(lambda x: x * 10))
[0, 10, 20, 30, 40]
- map_partitions(func)¶
Apply function to every partition within collection
Note that this requires you to understand how dask.bag partitions your data and so is somewhat internal.
>>> b.map_partitions(myfunc)
- max(iterable[, key=func]) → value¶
max(a, b, c, ...[, key=func]) -> value
With a single iterable argument, return its largest item. With two or more arguments, return the largest argument.
- mean()¶
Arithmetic mean
- min(iterable[, key=func]) → value¶
min(a, b, c, ...[, key=func]) -> value
With a single iterable argument, return its smallest item. With two or more arguments, return the smallest argument.
- pluck(key, default='__no__default__')¶
Select item from all tuples/dicts in collection
>>> b = from_sequence([{'name': 'Alice', 'credits': [1, 2, 3]},
...                    {'name': 'Bob', 'credits': [10, 20]}])
>>> list(b.pluck('name'))
['Alice', 'Bob']
>>> list(b.pluck('credits').pluck(0))
[1, 10]
- product(other)¶
Cartesian product between two bags
- reduction(perpartition, aggregate)¶
Reduce collection with reduction operators
Parameters: perpartition: function
reduction to apply to each partition
aggregate: function
reduction to apply to the results of all partitions
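Sequentially, the two parameters compose as follows. This is a plain-Python sketch over a hypothetical list of partitions, not dask's implementation; note that each function receives a whole collection, unlike the binary operators passed to fold.

```python
def reduction(partitions, perpartition, aggregate):
    """Apply perpartition to each partition, then aggregate the partial results."""
    return aggregate([perpartition(part) for part in partitions])

partitions = [[1, 2, 3], [4, 5]]

# Count: the length of each partition, then the sum of those lengths
count = reduction(partitions, len, sum)

# Maximum: the max of each partition, then the max of those maxima
largest = reduction(partitions, max, max)
```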
- remove(predicate)¶
Remove elements in collection that match predicate
>>> def iseven(x):
...     return x % 2 == 0
>>> import dask.bag as db
>>> b = db.from_sequence(range(5))
>>> list(b.remove(iseven))
[1, 3]
- std(ddof=0)¶
Standard deviation
- sum(sequence[, start]) → value¶
Return the sum of a sequence of numbers (NOT strings) plus the value of parameter ‘start’ (which defaults to 0). When the sequence is empty, return start.
- take(k, compute=True)¶
Take the first k elements
Evaluates by default; use compute=False to avoid computation. Only takes from the first partition.
>>> b = from_sequence(range(10))
>>> b.take(3)
(0, 1, 2)
- to_dataframe(columns=None)¶
Convert Bag to dask.dataframe
Bag should contain tuple or dict records.
Provide columns= keyword arg to specify column names.
Index will not be particularly meaningful. Use reindex afterwards if necessary.
- to_textfiles(path, name_function=str, encoding='ascii')¶
Write bag to disk, one filename per partition, one line per element
Paths: This will create one file for each partition in your bag. You can specify the filenames in a variety of ways.
Use a globstring
>>> b.to_textfiles('/path/to/data/*.json.gz')
The * will be replaced by the increasing sequence 0, 1, ...
/path/to/data/0.json.gz
/path/to/data/1.json.gz
Use a globstring and a name_function= keyword argument. The name_function function should expect an integer and produce a string.
>>> from datetime import date, timedelta
>>> def name(i):
...     return str(date(2015, 1, 1) + i * timedelta(days=1))
>>> name(0)
'2015-01-01'
>>> name(15)
'2015-01-16'
>>> b.to_textfiles('/path/to/data/*.json.gz', name_function=name)
/path/to/data/2015-01-01.json.gz
/path/to/data/2015-01-02.json.gz
...
You can also provide an explicit list of paths.
>>> paths = ['/path/to/data/alice.json.gz', '/path/to/data/bob.json.gz', ...]
>>> b.to_textfiles(paths)
Compression: Filenames with extensions corresponding to known compression algorithms (gz, bz2) will be compressed accordingly.
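The globstring expansion can be sketched as a simple substitution of name_function(i) for the * in the path, one path per partition. The expand_paths helper is hypothetical, is not part of dask, and writes no files; it only shows how the filenames listed above come about.

```python
from datetime import date, timedelta

def expand_paths(globstring, npartitions, name_function=str):
    """Return one path per partition, replacing '*' with name_function(i)."""
    return [globstring.replace("*", name_function(i)) for i in range(npartitions)]

def name(i):
    # Partition 0 maps to 2015-01-01, partition 1 to 2015-01-02, and so on
    return str(date(2015, 1, 1) + i * timedelta(days=1))

default_paths = expand_paths("/path/to/data/*.json.gz", 2)
dated_paths = expand_paths("/path/to/data/*.json.gz", 2, name_function=name)
```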
- topk(k, key=None)¶
K largest elements in collection
Optionally ordered by some key function
>>> b = from_sequence([10, 3, 5, 7, 11, 4])
>>> list(b.topk(2))
[11, 10]
>>> list(b.topk(2, lambda x: -x))
[3, 4]
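A top-k query parallelizes well because each partition only needs to report its own k best candidates. The sketch below uses heapq.nlargest from the standard library over a hypothetical list of partitions; it illustrates the idea and is not necessarily how dask.bag implements topk.

```python
import heapq

def topk(partitions, k, key=None):
    """Return the k largest elements across all partitions, largest first."""
    # Keep only the k best candidates from each partition ...
    candidates = [x for part in partitions
                  for x in heapq.nlargest(k, part, key=key)]
    # ... then pick the k best among those candidates
    return heapq.nlargest(k, candidates, key=key)

partitions = [[10, 3, 5], [7, 11, 4]]
largest_two = topk(partitions, 2)
smallest_two = topk(partitions, 2, key=lambda x: -x)
```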
- var(ddof=0)¶
Variance
Other functions¶
- dask.bag.core.from_sequence(seq, partition_size=None, npartitions=None)¶
Create dask from Python sequence
This sequence should be relatively small in memory. Dask Bag works best when it handles loading your data itself. Commonly we load a sequence of filenames into a Bag and then use .map to open them.
Parameters: seq: Iterable
A sequence of elements to put into the dask
partition_size: int (optional)
The length of each partition
npartitions: int (optional)
The number of desired partitions
It is best to provide either partition_size or npartitions (though not both).
- dask.bag.core.from_filenames(filenames, chunkbytes=None, encoding='ascii')¶
Create dask by loading in lines from many files
Provide list of filenames
>>> b = from_filenames(['myfile.1.txt', 'myfile.2.txt'])
Or a globstring
>>> b = from_filenames('myfiles.*.txt')
Parallelize a large file by providing the number of uncompressed bytes to load into each partition.
>>> b = from_filenames('largefile.txt', chunkbytes=1e7)
- See also:
- from_sequence: A more generic bag creation function
- dask.bag.core.from_hdfs(path, hdfs=None, host='localhost', port='50070', user_name=None)¶
Create dask by loading in files from HDFS
Provide an hdfs directory and credentials
>>> b = from_hdfs('home/username/data/', host='localhost', user_name='ubuntu')
Alternatively provide an instance of pywebhdfs.webhdfs.PyWebHdfsClient
>>> from pywebhdfs.webhdfs import PyWebHdfsClient
>>> hdfs = PyWebHdfsClient(host='hostname', user_name='username')
>>> b = from_hdfs('home/username/data/', hdfs=hdfs)
- dask.bag.core.concat(bags)¶
Concatenate many bags together, unioning all elements
>>> import dask.bag as db
>>> a = db.from_sequence([1, 2, 3])
>>> b = db.from_sequence([4, 5, 6])
>>> c = db.concat([a, b])
>>> list(c)
[1, 2, 3, 4, 5, 6]