API

Top level user functions:

Bag(dsk, name, npartitions) Parallel collection of Python objects
Bag.all() Return True if bool(x) is True for all elements x in the bag.
Bag.any() Return True if bool(x) is True for any element x in the bag.
Bag.compute(**kwargs)
Bag.concat() Concatenate nested lists into one long list
Bag.count([split_every]) Count the number of elements
Bag.distinct() Distinct elements of collection
Bag.filter(predicate) Filter elements in collection by a predicate function
Bag.fold(binop[, combine, initial, split_every]) Parallelizable reduction
Bag.foldby(key, binop[, initial, combine, ...]) Combined reduction and groupby
Bag.frequencies([split_every]) Count number of occurrences of each distinct element
Bag.groupby(grouper[, npartitions, blocksize]) Group collection by key function
Bag.join(other, on_self[, on_other]) Join collection with another collection
Bag.map(func, **kwargs) Map a function across all elements in collection
Bag.map_partitions(func, **kwargs) Apply function to every partition within collection
Bag.max() Largest element in the bag
Bag.mean() Arithmetic mean
Bag.min() Smallest element in the bag
Bag.pluck(key[, default]) Select item from all tuples/dicts in collection
Bag.product(other) Cartesian product between two bags
Bag.reduction(perpartition, aggregate[, ...]) Reduce collection with reduction operators
Bag.remove(predicate) Remove elements in collection that match predicate
Bag.repartition(npartitions) Coalesce bag into fewer partitions
Bag.std([ddof]) Standard deviation
Bag.sum() Sum of all elements in the bag (numbers, not strings)
Bag.take(k[, compute]) Take the first k elements
Bag.to_dataframe([columns]) Convert Bag to dask.dataframe
Bag.to_delayed() Convert bag to dask Values
Bag.to_textfiles(path[, name_function, ...]) Write bag to disk, one filename per partition, one line per element
Bag.topk(k[, key, split_every]) K largest elements in collection
Bag.var([ddof]) Variance
Bag.visualize([filename, format, optimize_graph])

Create Bags

from_sequence(seq[, partition_size, npartitions]) Create a dask Bag from a Python sequence
from_delayed(values) Create bag from many dask.delayed objects
read_text(path[, blocksize, compression, ...]) Read lines from text files
from_castra(x[, columns, index]) Load a dask Bag from a Castra.
from_url(urls) Create a dask.bag from a url
range(n, npartitions) Numbers from zero up to, but not including, n
concat(bags) Concatenate many bags together, unioning all elements
zip(*bags) Partition-wise bag zip

Turn Bags into other things

Bag.to_textfiles(path[, name_function, ...]) Write bag to disk, one filename per partition, one line per element
Bag.to_dataframe([columns]) Convert Bag to dask.dataframe

Bag methods

class dask.bag.Bag(dsk, name, npartitions)

Parallel collection of Python objects

Examples

Create Bag from sequence

>>> import dask.bag as db
>>> b = db.from_sequence(range(5))
>>> list(b.filter(lambda x: x % 2 == 0).map(lambda x: x * 10))  
[0, 20, 40]

Create Bag from filename or globstring of filenames

>>> import json
>>> b = db.read_text('/path/to/mydata.*.json.gz').map(json.loads)  

Create manually (expert use)

>>> from dask.bag import Bag
>>> dsk = {('x', 0): (range, 5),
...        ('x', 1): (range, 5),
...        ('x', 2): (range, 5)}
>>> b = Bag(dsk, 'x', npartitions=3)
>>> sorted(b.map(lambda x: x * 10))  
[0, 0, 0, 10, 10, 10, 20, 20, 20, 30, 30, 30, 40, 40, 40]
>>> int(b.fold(lambda x, y: x + y))  
30
all()

Return True if bool(x) is True for every element x in the bag. If the bag is empty, return True.

any()

Return True if bool(x) is True for any element x in the bag. If the bag is empty, return False.

concat()

Concatenate nested lists into one long list

>>> b = from_sequence([[1], [2, 3]])
>>> list(b)
[[1], [2, 3]]
>>> list(b.concat())
[1, 2, 3]
count(split_every=None)

Count the number of elements

distinct()

Distinct elements of collection

Unordered without repeats.

>>> b = from_sequence(['Alice', 'Bob', 'Alice'])
>>> sorted(b.distinct())
['Alice', 'Bob']
filter(predicate)

Filter elements in collection by a predicate function

>>> def iseven(x):
...     return x % 2 == 0
>>> import dask.bag as db
>>> b = db.from_sequence(range(5))
>>> list(b.filter(iseven))  
[0, 2, 4]
fold(binop, combine=None, initial='__no__default__', split_every=None)

Parallelizable reduction

Fold is like the builtin function reduce, except that it works in parallel. Fold takes two binary operator functions: one to reduce each partition of our dataset and another to combine the results between partitions.

  1. binop: Binary operator to reduce within each partition
  2. combine: Binary operator to combine results from binop

Sequentially this would look like the following:

>>> intermediates = [reduce(binop, part) for part in partitions]  
>>> final = reduce(combine, intermediates)  

If only one function is given, it is used for both binop and combine, as in the following example, which computes the sum:

>>> def add(x, y):
...     return x + y
>>> b = from_sequence(range(5))
>>> b.fold(add).compute()  
10

In full form, we provide both binary operators as well as the initial value:

>>> b.fold(binop=add, combine=add, initial=0).compute()  
10

More complex binary operators are also possible:

>>> def add_to_set(acc, x):
...     ''' Add new element x to set acc '''
...     return acc | set([x])
>>> b.fold(add_to_set, set.union, initial=set()).compute()  
{0, 1, 2, 3, 4}
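The sequential description of fold can be made concrete with plain Python lists standing in for partitions. This is a minimal sketch, not dask's implementation: the helper fold_model is hypothetical, and the choice to seed only the per-partition reductions with initial is an assumption made for illustration.

```python
from functools import reduce

_NO_DEFAULT = object()  # sentinel so that None could still be a valid initial value


def fold_model(partitions, binop, combine=None, initial=_NO_DEFAULT):
    """Hypothetical sequential model of Bag.fold.

    Assumption: `initial` seeds each per-partition reduction only.
    """
    combine = combine or binop
    # step 1: reduce each partition independently (parallelizable)
    if initial is _NO_DEFAULT:
        intermediates = [reduce(binop, part) for part in partitions]
    else:
        intermediates = [reduce(binop, part, initial) for part in partitions]
    # step 2: combine the per-partition results into one final value
    return reduce(combine, intermediates)


partitions = [[0, 1, 2], [3, 4]]  # range(5) split into two partitions
total = fold_model(partitions, lambda x, y: x + y)
seen = fold_model(partitions, lambda acc, x: acc | {x}, set.union, initial=set())
```

With add_to_set, binop and combine must differ: binop adds one element to a set, while combine (set.union) merges two sets.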

See also

Bag.foldby

foldby(key, binop, initial='__no__default__', combine=None, combine_initial='__no__default__')

Combined reduction and groupby

Foldby provides a combined groupby and reduce for efficient parallel split-apply-combine tasks.

The computation

>>> b.foldby(key, binop, init)                        

is equivalent to the following:

>>> def reduction(group):                               
...     return reduce(binop, group, init)               
>>> b.groupby(key).map(lambda kv: (kv[0], reduction(kv[1])))

It gives the same result but uses minimal communication, and so is much faster.

>>> b = from_sequence(range(10))
>>> iseven = lambda x: x % 2 == 0
>>> add = lambda x, y: x + y
>>> dict(b.foldby(iseven, add))                         
{True: 20, False: 25}
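Foldby communicates little because each partition is first reduced to one small total per key. The sketch below models that two-step structure sequentially; foldby_model is a hypothetical helper, and requiring explicit initial values is a simplification rather than dask's actual signature handling.

```python
def foldby_model(partitions, key, binop, initial, combine, combine_initial):
    """Hypothetical sequential model of Bag.foldby (not dask's implementation)."""
    # Step 1: inside each partition, fold elements into one running total per key
    partials = []
    for part in partitions:
        totals = {}
        for x in part:
            k = key(x)
            totals[k] = binop(totals.get(k, initial), x)
        partials.append(totals)
    # Step 2: merge the small per-partition dicts key by key with combine
    result = {}
    for totals in partials:
        for k, v in totals.items():
            result[k] = combine(result.get(k, combine_initial), v)
    return result


partitions = [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]  # range(10) in two partitions
iseven = lambda x: x % 2 == 0
add = lambda x, y: x + y
sums = foldby_model(partitions, iseven, add, 0, add, 0)
```

Only the per-partition dictionaries, with one entry per key, would need to move between workers, whereas groupby moves every element.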

Key Function

The key function determines how to group the elements in your bag. In the common case where your bag holds dictionaries, the key function often selects one of the dictionary's fields.

>>> def key(x):
...     return x['name']

This case is so common that it is special-cased: if you provide a key that is not a callable function, dask.bag will turn it into one automatically. The following are equivalent:

>>> b.foldby(lambda x: x['name'], ...)  
>>> b.foldby('name', ...)  

Binops

It can be tricky to construct the right binary operators to perform analytic queries. The foldby method accepts two binary operators, binop and combine. In both, the running total passed in and the value returned must have the same type.

Binop takes a running total and a new element and produces a new total:

>>> def binop(total, x):
...     return total + x['amount']

Combine takes two totals and combines them:

>>> def combine(total1, total2):
...     return total1 + total2

Each of these binary operators may have a default first value for the total, used before any other value is seen. For addition, as above, this is 0; in general it is the identity element for your operation.

>>> b.foldby('name', binop, 0, combine, 0)  
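To see why binop and combine each need their own starting value, the sketch below applies the two operators from above by hand to one key's records spread over two partitions. The records are made up, and the manual steps only roughly mirror what foldby does per key; they are not dask's code.

```python
from functools import reduce


def binop(total, x):
    # running total (a number) plus one record's amount -> new running total
    return total + x['amount']


def combine(total1, total2):
    # two running totals -> one running total
    return total1 + total2


# Hypothetical records for a single name, split across two partitions
part1 = [{'name': 'Alice', 'amount': 100}, {'name': 'Alice', 'amount': 50}]
part2 = [{'name': 'Alice', 'amount': 25}]

t1 = reduce(binop, part1, 0)  # binop starts from its initial value, 0
t2 = reduce(binop, part2, 0)
total = reduce(combine, [t1, t2], 0)  # combine starts from combine_initial, 0
```

Note that binop could not serve as combine here: binop(150, 25) would fail, because an integer total has no 'amount' field.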

See also

toolz.reduceby, pyspark.combineByKey

frequencies(split_every=None)

Count number of occurrences of each distinct element

>>> b = from_sequence(['Alice', 'Bob', 'Alice'])
>>> dict(b.frequencies())  
{'Alice': 2, 'Bob': 1}
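Counting occurrences parallelizes naturally: each partition can be counted on its own and the partial counts added together. The sketch below shows that decomposition with collections.Counter on two hypothetical partitions; it illustrates the idea and is not necessarily how dask computes frequencies.

```python
from collections import Counter

# Two hypothetical partitions of the bag ['Alice', 'Bob', 'Alice']
partitions = [['Alice', 'Bob'], ['Alice']]

# count within each partition, then add the partial counts together
partials = [Counter(part) for part in partitions]
freqs = sum(partials, Counter())
```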
groupby(grouper, npartitions=None, blocksize=1048576)

Group collection by key function

Note that this requires a full read of the dataset, serialization, and a shuffle, which is expensive. If possible, use foldby instead.

>>> b = from_sequence(range(10))
>>> dict(b.groupby(lambda x: x % 2 == 0))  
{True: [0, 2, 4, 6, 8], False: [1, 3, 5, 7, 9]}
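Sequentially, groupby simply collects every element under its key, as in the sketch below (groupby_model is hypothetical). In parallel, every element must travel to the partition that owns its key, which is the shuffle mentioned above; when you only need one reduced value per group, Bag.foldby gives the same per-group sums while moving far less data.

```python
from collections import defaultdict


def groupby_model(elements, grouper):
    """Hypothetical sequential model: collect every element under its key."""
    groups = defaultdict(list)
    for x in elements:
        groups[grouper(x)].append(x)
    return dict(groups)


groups = groupby_model(range(10), lambda x: x % 2 == 0)
# Reducing each complete group afterwards yields what a foldby would compute
sums = {k: sum(v) for k, v in groups.items()}
```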

See also

Bag.foldby

join(other, on_self, on_other=None)

Join collection with another collection

Other collection must be an Iterable, and not a Bag.

>>> people = from_sequence(['Alice', 'Bob', 'Charlie'])
>>> fruit = ['Apple', 'Apricot', 'Banana']
>>> list(people.join(fruit, lambda x: x[0]))  
[('Apple', 'Alice'), ('Apricot', 'Alice'), ('Banana', 'Bob')]
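One natural way to join against an in-memory iterable is a hash join: index other by its key once, then stream through the bag looking up each element's key. The sketch below models that idea; join_model is hypothetical, and the (other, self) tuple order simply matches the example output above.

```python
from collections import defaultdict


def join_model(elements, other, on_self, on_other=None):
    """Hypothetical sequential model of Bag.join against an in-memory iterable."""
    on_other = on_other or on_self
    # index the in-memory collection by its join key
    index = defaultdict(list)
    for y in other:
        index[on_other(y)].append(y)
    # stream the bag's elements and emit (other, self) pairs for matching keys
    return [(y, x) for x in elements for y in index.get(on_self(x), [])]


people = ['Alice', 'Bob', 'Charlie']
fruit = ['Apple', 'Apricot', 'Banana']
pairs = join_model(people, fruit, lambda x: x[0])
```

'Charlie' produces no pairs because no fruit starts with 'C'.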
map(func, **kwargs)

Map a function across all elements in collection

>>> import dask.bag as db
>>> b = db.from_sequence(range(5))
>>> list(b.map(lambda x: x * 10))  
[0, 10, 20, 30, 40]

Keyword arguments are passed through to func. These can be either dask.bag.Item objects or normal Python objects.

Examples

>>> import dask.bag as db
>>> b = db.from_sequence(range(1, 101), npartitions=10)
>>> def div(num, den=1):
...     return num / den

Using a python object:

>>> hi = b.max().compute()
>>> hi
100
>>> b.map(div, den=hi).take(5)
(0.01, 0.02, 0.03, 0.04, 0.05)

Using an Item:

>>> b.map(div, den=b.max()).take(5)
(0.01, 0.02, 0.03, 0.04, 0.05)

Both versions give the same output, but the second forms a single graph and computes everything at once, which in some cases may be more efficient.

map_partitions(func, **kwargs)

Apply function to every partition within collection

Note that this requires you to understand how dask.bag partitions your data and so is somewhat internal.

>>> b.map_partitions(myfunc)  

Keyword arguments are passed through to func. These can be either dask.bag.Item objects or normal Python objects.

Examples

>>> import dask.bag as db
>>> b = db.from_sequence(range(1, 101), npartitions=10)
>>> def div(nums, den=1):
...     return [num / den for num in nums]

Using a python object:

>>> hi = b.max().compute()
>>> hi
100
>>> b.map_partitions(div, den=hi).take(5)
(0.01, 0.02, 0.03, 0.04, 0.05)

Using an Item:

>>> b.map_partitions(div, den=b.max()).take(5)
(0.01, 0.02, 0.03, 0.04, 0.05)

Both versions give the same output, but the second forms a single graph and computes everything at once, which in some cases may be more efficient.

max()

Return the largest element in the bag.

mean()

Arithmetic mean

min()

Return the smallest element in the bag.

pluck(key, default='__no__default__')

Select item from all tuples/dicts in collection

>>> b = from_sequence([{'name': 'Alice', 'credits': [1, 2, 3]},
...                    {'name': 'Bob',   'credits': [10, 20]}])
>>> list(b.pluck('name'))  
['Alice', 'Bob']
>>> list(b.pluck('credits').pluck(0))  
[1, 10]
product(other)

Cartesian product between two bags

reduction(perpartition, aggregate, split_every=None, out_type=<class 'dask.bag.core.Item'>, name=None)

Reduce collection with reduction operators

Parameters:

perpartition: function

reduction to apply to each partition

aggregate: function

reduction to apply to the results of all partitions

split_every: int (optional)

Group partitions into groups of this size while performing reduction. Defaults to 8.

out_type: {Bag, Item}

The type of the result: Item if the result is a single element, Bag if it is a list of elements. Defaults to Item.

Examples

>>> b = from_sequence(range(10))
>>> b.reduction(sum, sum).compute()
45
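The split_every parameter suggests a tree reduction: perpartition runs on every partition, then aggregate merges groups of intermediate results until one value remains. The sketch below models this sequentially; reduction_model is hypothetical, and it assumes aggregate is applied at every level of the tree, not just at the top.

```python
def reduction_model(partitions, perpartition, aggregate, split_every=8):
    """Hypothetical sequential model of Bag.reduction with a reduction tree."""
    if split_every < 2:
        raise ValueError("split_every must be at least 2")
    results = [perpartition(part) for part in partitions]
    # merge groups of at most split_every intermediate results per level
    while len(results) > split_every:
        results = [aggregate(results[i:i + split_every])
                   for i in range(0, len(results), split_every)]
    # final aggregation over the remaining intermediate results
    return aggregate(results)


partitions = [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]  # range(10), 5 partitions
total = reduction_model(partitions, sum, sum, split_every=2)
count = reduction_model(partitions, len, sum, split_every=2)
```

Because aggregate may be fed its own outputs, it must accept a list of intermediate results; this is why counting uses len per partition but sum to aggregate.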
remove(predicate)

Remove elements in collection that match predicate

>>> def iseven(x):
...     return x % 2 == 0
>>> import dask.bag as db
>>> b = db.from_sequence(range(5))
>>> list(b.remove(iseven))  
[1, 3]
repartition(npartitions)

Coalesce bag into fewer partitions

Examples

>>> b.repartition(5)  # set to have 5 partitions  
std(ddof=0)

Standard deviation

sum()

Return the sum of all elements in the bag. Elements should be numbers, not strings.

take(k, compute=True)

Take the first k elements

Evaluates by default; use compute=False to avoid computation. Only takes from the first partition.

>>> b = from_sequence(range(10))
>>> b.take(3)  
(0, 1, 2)
to_dataframe(columns=None)

Convert Bag to dask.dataframe

Bag should contain tuple or dict records.

Provide columns= keyword arg to specify column names.

Index will not be particularly meaningful. Use reindex afterwards if necessary.

Examples

>>> import dask.bag as db
>>> b = db.from_sequence([{'name': 'Alice',   'balance': 100},
...                       {'name': 'Bob',     'balance': 200},
...                       {'name': 'Charlie', 'balance': 300}],
...                      npartitions=2)
>>> df = b.to_dataframe()
>>> df.compute()
   balance     name
0      100    Alice
1      200      Bob
0      300  Charlie
to_delayed()

Convert bag to dask Values

Returns list of values, one value per partition.

to_textfiles(path, name_function=str, compression='infer', encoding='utf-8', compute=True)

Write bag to disk, one filename per partition, one line per element

Paths: This will create one file for each partition in your bag. You can specify the filenames in a variety of ways.

Use a globstring

>>> b.to_textfiles('/path/to/data/*.json.gz')  

The * will be replaced by the increasing sequence 0, 1, ...

/path/to/data/0.json.gz
/path/to/data/1.json.gz

Use a globstring and a name_function= keyword argument. The name_function function should expect an integer and produce a string.

>>> from datetime import date, timedelta
>>> def name(i):
...     return str(date(2015, 1, 1) + i * timedelta(days=1))
>>> name(0)
'2015-01-01'
>>> name(15)
'2015-01-16'
>>> b.to_textfiles('/path/to/data/*.json.gz', name_function=name)  
/path/to/data/2015-01-01.json.gz
/path/to/data/2015-01-02.json.gz
...
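The naming rule can be modeled as replacing the single * with name_function(i) for each partition index i. The sketch below is a hypothetical helper illustrating that rule, not a dask function; it reuses the date-based name function from the example above.

```python
from datetime import date, timedelta


def expand_globstring(path, npartitions, name_function=str):
    """Hypothetical helper: one filename per partition, '*' -> name_function(i)."""
    return [path.replace('*', name_function(i)) for i in range(npartitions)]


def name(i):
    return str(date(2015, 1, 1) + i * timedelta(days=1))


default_paths = expand_globstring('/path/to/data/*.json.gz', 2)
dated_paths = expand_globstring('/path/to/data/*.json.gz', 2, name_function=name)
```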

You can also provide an explicit list of paths.

>>> paths = ['/path/to/data/alice.json.gz', '/path/to/data/bob.json.gz', ...]  
>>> b.to_textfiles(paths) 

Compression: Filenames with extensions corresponding to known compression algorithms (gz, bz2) will be compressed accordingly.

topk(k, key=None, split_every=None)

K largest elements in collection

Optionally ordered by some key function

>>> b = from_sequence([10, 3, 5, 7, 11, 4])
>>> list(b.topk(2))  
[11, 10]
>>> list(b.topk(2, lambda x: -x))  
[3, 4]
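Top-k reduces well in parallel: the overall k largest elements must be among each partition's own k largest. The sketch below models that two-stage approach with heapq.nlargest; topk_model is hypothetical and not necessarily how dask implements topk.

```python
import heapq


def topk_model(partitions, k, key=None):
    """Hypothetical sequential model of Bag.topk."""
    # each partition contributes only its own k best candidates
    candidates = [x for part in partitions for x in heapq.nlargest(k, part, key=key)]
    # the overall top k must be among those candidates
    return heapq.nlargest(k, candidates, key=key)


partitions = [[10, 3, 5], [7, 11, 4]]
largest = topk_model(partitions, 2)
smallest = topk_model(partitions, 2, key=lambda x: -x)
```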
unzip(n)

Transform a bag of tuples to n bags of their elements.

Examples

>>> b = from_sequence([(i, i + 1, i + 2) for i in range(10)])
>>> first, second, third = b.unzip(3)
>>> isinstance(first, Bag)
True
>>> first.compute()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Note that this is equivalent to:

>>> first, second, third = (b.pluck(i) for i in range(3))
var(ddof=0)

Variance
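Variance can be computed in parallel because each partition only needs to report its count, sum, and sum of squares. The sketch below combines those per-partition statistics; var_model is hypothetical, and this one-pass formula was chosen for brevity (it can lose precision when the mean is large relative to the spread), so it is not necessarily how dask computes var or std.

```python
def var_model(partitions, ddof=0):
    """Hypothetical model of a parallel variance from per-partition statistics."""
    # per partition: (count, sum, sum of squares)
    stats = [(len(p), sum(p), sum(x * x for x in p)) for p in partitions]
    n = sum(s[0] for s in stats)
    total = sum(s[1] for s in stats)
    total_sq = sum(s[2] for s in stats)
    mean = total / n
    # sum of squared deviations divided by (n - ddof)
    return (total_sq - n * mean * mean) / (n - ddof)


partitions = [[1, 2, 3], [4, 5]]
population_var = var_model(partitions)        # ddof=0
sample_var = var_model(partitions, ddof=1)
std = population_var ** 0.5                   # standard deviation is the square root
```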

Other functions

dask.bag.from_sequence(seq, partition_size=None, npartitions=None)

Create a dask Bag from a Python sequence

This sequence should be relatively small in memory. Dask Bag works best when it handles loading your data itself. Commonly we load a sequence of filenames into a Bag and then use .map to open them.

Parameters:

seq: Iterable

A sequence of elements to put into the bag

partition_size: int (optional)

The length of each partition

npartitions: int (optional)

The number of desired partitions

It is best to provide either partition_size or npartitions, though not both.

See also

read_text
Create bag from textfiles

Examples

>>> b = from_sequence(['Alice', 'Bob', 'Chuck'], partition_size=2)
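The effect of partition_size and npartitions can be pictured as cutting the sequence into consecutive chunks. The sketch below is a hypothetical model: the consecutive chunking and the ceil(len / npartitions) chunk size are assumptions for illustration, and, unlike from_sequence, this model simply requires one of the two arguments.

```python
import math


def partition_sequence(seq, partition_size=None, npartitions=None):
    """Hypothetical model of how from_sequence could chunk a sequence."""
    seq = list(seq)
    if partition_size is None:
        if npartitions is None:
            raise ValueError("provide partition_size or npartitions")
        # assumption: pick the chunk length that yields at most npartitions chunks
        partition_size = max(1, math.ceil(len(seq) / npartitions))
    # consecutive chunks of partition_size elements; the last may be shorter
    return [seq[i:i + partition_size] for i in range(0, len(seq), partition_size)]


by_size = partition_sequence(['Alice', 'Bob', 'Chuck'], partition_size=2)
by_count = partition_sequence(range(5), npartitions=2)
```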
dask.bag.from_delayed(values)

Create bag from many dask.delayed objects

Parameters:

values: list of Values

An iterable of dask.delayed.Value objects, such as those produced by dask.do. These comprise the individual partitions of the resulting bag.

Returns:

Bag

Examples

>>> b = from_delayed([x, y, z])  
dask.bag.read_text(path, blocksize=None, compression='infer', encoding='utf-8', errors='strict', linedelimiter='\n', collection=True, **kwargs)

Read lines from text files

Parameters:

path: string or list

Path to data. Can include '*' or a protocol like 's3://'. Can also be a list of filenames.

blocksize: None or int

Number of uncompressed bytes to load into each partition when cutting up larger files. Streams by default.

compression: string

Compression format like ‘gzip’ or ‘xz’. Defaults to ‘infer’

encoding: string

errors: string

linedelimiter: string

collection: bool, optional

Return a dask.bag if True, or a list of delayed values if False

**kwargs: dict

Extra parameters to hand to the backend storage system. Often used for authentication when using remote storage like S3 or HDFS.

Returns:

dask.bag.Bag if collection is True or list of Delayed lists otherwise

See also

from_sequence
Build bag from Python sequence

Examples

>>> b = read_text('myfiles.1.txt')  
>>> b = read_text('myfiles.*.txt')  
>>> b = read_text('myfiles.*.txt.gz')  
>>> b = read_text('s3://bucket/myfiles.*.txt')  

Parallelize a large file by providing the number of uncompressed bytes to load into each partition.

>>> b = read_text('largefile.txt', blocksize=10000000)  
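For text, a byte block usually has to be extended so that no line is split across partitions. The sketch below illustrates that idea on an in-memory bytes object, assuming each block is extended to just past the next line delimiter; split_into_blocks is hypothetical and is not how dask reads files.

```python
def split_into_blocks(data, blocksize, delimiter=b"\n"):
    """Hypothetical helper: cut `data` into ~blocksize-byte chunks ending on a delimiter."""
    if blocksize < 1:
        raise ValueError("blocksize must be a positive integer")
    blocks = []
    start = 0
    while start < len(data):
        end = min(start + blocksize, len(data))
        if end < len(data):
            # extend the block to just past the next delimiter so lines stay whole
            i = data.find(delimiter, end - 1)
            end = len(data) if i == -1 else i + len(delimiter)
        blocks.append(data[start:end])
        start = end
    return blocks


data = b"alpha\nbeta\ngamma\ndelta\n"
blocks = split_into_blocks(data, 8)
```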
dask.bag.from_castra(x, columns=None, index=False)

Load a dask Bag from a Castra.

Parameters:

x : filename or Castra

columns: list or string, optional

The columns to load. Default is all columns.

index: bool, optional

If True, the index is included as the first element in each tuple. Default is False.

dask.bag.from_url(urls)

Create a dask.bag from a URL

>>> a = from_url('http://raw.githubusercontent.com/dask/dask/master/README.rst')  
>>> a.npartitions  
1
>>> a.take(8)  
('Dask\n',
 '====\n',
 '\n',
 '|Build Status| |Coverage| |Doc Status| |Gitter|\n',
 '\n',
 'Dask provides multi-core execution on larger-than-memory datasets using blocked\n',
 'algorithms and task scheduling.  It maps high-level NumPy and list operations\n',
 'on large datasets on to graphs of many operations on small in-memory datasets.\n')
>>> b = from_url(['http://github.com', 'http://google.com'])  
>>> b.npartitions  
2
dask.bag.range(n, npartitions)

Numbers from zero up to, but not including, n

Examples

>>> import dask.bag as db
>>> b = db.range(5, npartitions=2)
>>> list(b)
[0, 1, 2, 3, 4]
dask.bag.concat(bags)

Concatenate many bags together, unioning all elements

>>> import dask.bag as db
>>> a = db.from_sequence([1, 2, 3])
>>> b = db.from_sequence([4, 5, 6])
>>> c = db.concat([a, b])
>>> list(c)
[1, 2, 3, 4, 5, 6]
dask.bag.zip(*bags)

Partition-wise bag zip

All passed bags must have the same number of partitions.

NOTE: corresponding partitions should have the same length; if they do not, the “extra” elements from the longer partition(s) will be dropped. If this is your case, chances are that what you really need is a data alignment mechanism like pandas’s, and not a missing-value filler like zip_longest.

Examples

Correct usage:

>>> import dask.bag as db
>>> evens = db.from_sequence(range(0, 10, 2), partition_size=4)
>>> odds = db.from_sequence(range(1, 10, 2), partition_size=4)
>>> pairs = db.zip(evens, odds)
>>> list(pairs)
[(0, 1), (2, 3), (4, 5), (6, 7), (8, 9)]

Incorrect usage:

>>> numbers = db.range(20, npartitions=1)
>>> fizz = numbers.filter(lambda n: n % 3 == 0)
>>> buzz = numbers.filter(lambda n: n % 5 == 0)
>>> fizzbuzz = db.zip(fizz, buzz)
>>> list(fizzbuzz)
[(0, 0), (3, 5), (6, 10), (9, 15)]

When what you really wanted was more along the lines of:

[(0, 0), (3, None), (None, 5), (6, None), (9, None), (None, 10), (12, None), (15, 15), (18, None)]
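The pairing rule can be modeled with the builtin zip applied partition by partition, which is also why extra elements vanish silently. In the sketch below, zip_model is hypothetical and bags are represented as lists of partitions; it reproduces both the correct and the misaligned examples above.

```python
def zip_model(*bags):
    """Hypothetical model of db.zip on bags given as lists of partitions."""
    if len({len(bag) for bag in bags}) != 1:
        raise ValueError("all bags must have the same number of partitions")
    # builtin zip pairs elements partition by partition and silently
    # drops the extra elements of a longer partition
    return [list(zip(*parts)) for parts in zip(*bags)]


evens = [[0, 2, 4, 6], [8]]
odds = [[1, 3, 5, 7], [9]]
pairs = [pair for part in zip_model(evens, odds) for pair in part]

fizz = [[0, 3, 6, 9, 12, 15, 18]]  # one partition
buzz = [[0, 5, 10, 15]]
misaligned = zip_model(fizz, buzz)[0]
```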