Imperative
As discussed in the custom graphs section, some problems don’t fit into one of the collections such as dask.bag or dask.array. Instead of writing a dask graph by hand as a dictionary, you can use the dask.imperative interface, which builds the graph from normal Python code with only light annotation.
Example
Rebuilding the example from custom graphs:
from dask.imperative import do, value

@do
def load(filename):
    ...

@do
def clean(data):
    ...

@do
def analyze(sequence_of_data):
    ...

@do
def store(result):
    with open(..., 'w') as f:
        f.write(result)

files = ['myfile.a.data', 'myfile.b.data', 'myfile.c.data']
loaded = [load(i) for i in files]
cleaned = [clean(i) for i in loaded]
analyzed = analyze(cleaned)
stored = store(analyzed)
stored.compute()
This builds the same graph as before, but with normal Python syntax. In fact, the only differences between the serial Python code and the parallel dask version are the do decorators on the functions and the call to compute at the end.
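To make the comparison with a hand-written graph concrete, here is a rough sketch of what a dictionary for this pipeline might look like, together with a tiny serial evaluator. The key names ('load-1' and so on), the stub function bodies, and toy_get are all assumptions made for illustration. They are not part of dask, and the keys that dask.imperative generates will differ.

```python
# Stub stages (assumed): the real bodies are elided in the example above.
def load(filename):
    return "raw:" + filename

def clean(data):
    return data.replace("raw:", "clean:")

def analyze(sequence_of_data):
    return len(sequence_of_data)

def store(result):
    return "stored %d" % result

# Hand-written graph: each key maps to a task tuple (function, *arguments).
# An argument that is itself a key refers to the output of that task.
dsk = {
    'load-1': (load, 'myfile.a.data'),
    'load-2': (load, 'myfile.b.data'),
    'load-3': (load, 'myfile.c.data'),
    'clean-1': (clean, 'load-1'),
    'clean-2': (clean, 'load-2'),
    'clean-3': (clean, 'load-3'),
    'analyze': (analyze, ['clean-1', 'clean-2', 'clean-3']),
    'store': (store, 'analyze'),
}

def toy_get(graph, key):
    """Hypothetical serial evaluator for illustration, not dask's scheduler."""
    def resolve(arg):
        if isinstance(arg, list):
            return [resolve(a) for a in arg]
        if isinstance(arg, str) and arg in graph:
            return toy_get(graph, arg)   # argument names another task
        return arg                       # plain literal argument
    func, *args = graph[key]
    return func(*[resolve(a) for a in args])

result = toy_get(dsk, 'store')
```

With the do decorators, this bookkeeping (inventing keys and wiring arguments to them) is done automatically as the functions are called.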
How it works
The dask.imperative interface consists of two functions:
do
Wraps functions. Can be used as a decorator, or applied to a function at call time (e.g. do(foo)(a, b, c)). Calling a function wrapped in do returns a proxy object of type Value, which holds the graph of all operations needed to produce that result.
value
Wraps objects. Used to create Value proxies directly.
Value objects can be thought of as representing a key in the dask. A Value supports most python operations, each of which creates another Value representing the result:
- Most operators (*, -, etc...)
- Item access and slicing (a[0])
- Attribute access (a.size)
- Method calls (a.index(0))
Operations that aren’t supported include:
- Mutating operators (a += 1)
- Mutating magics such as __setitem__/__setattr__ (a[0] = 1, a.foo = 1)
- Iteration. (for i in a: ...)
- Use as a predicate (if a: ...)
The last two in particular mean that Value objects cannot be used for control flow: a Value cannot be iterated over in a for loop or used as the condition of an if statement. Even with this limitation, many workflows can be easily parallelized.
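The behaviour described above can be imitated with a small proxy class. The following is only a toy sketch, assuming a much simpler design than dask's: ToyValue and toy_value are hypothetical names, and each node stores a deferred call rather than a real graph with keys. It shows why operators, item access, attribute access and method calls can be deferred, while iteration and truth-testing cannot.

```python
import operator

class ToyValue:
    """Toy stand-in for a Value (hypothetical): stores a deferred call."""

    def __init__(self, func, *args):
        self._func = func
        self._args = args

    def compute(self):
        # Evaluate any deferred arguments first, then this node.
        args = [a.compute() if isinstance(a, ToyValue) else a
                for a in self._args]
        return self._func(*args)

    # Each supported operation returns a new deferred node.
    def __add__(self, other):
        return ToyValue(operator.add, self, other)

    def __getitem__(self, index):
        return ToyValue(operator.getitem, self, index)

    def __getattr__(self, name):
        if name.startswith('_'):
            raise AttributeError(name)   # keep private lookups honest
        return ToyValue(getattr, self, name)

    def __call__(self, *args):
        return ToyValue(lambda f, *a: f(*a), self, *args)

    # Iteration and truth-testing need a concrete answer immediately,
    # which a deferred node cannot give, so they are refused.
    def __iter__(self):
        raise TypeError("ToyValue objects are not iterable")

    def __bool__(self):
        raise TypeError("truth value is unknown until compute()")


def toy_value(obj):
    """Wrap a plain object as a leaf node."""
    return ToyValue(lambda: obj)


a = toy_value([1, 2, 3])
total = a[0] + a[2]             # item access, then an operator
position = a.index(2)           # attribute access, then a method call
bad = a.not_a_real_method()     # builds fine; fails only in compute()
```

Nothing is evaluated until compute is called on total or position, and the missing method on bad is only discovered at that point.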
Example
Here we have a serial blocked computation for computing the mean of all positive elements in a large, on-disk array.
x = h5py.File('myfile.hdf5')['/x']          # Trillion element array on disk
sums = []
counts = []
for i in range(1000000):                    # One million times
    chunk = x[1000000*i:1000000*(i + 1)]    # Pull out chunk
    positive = chunk[chunk > 0]             # Keep only positive elements
    sums.append(positive.sum())             # Sum chunk
    counts.append(positive.size)            # Count chunk
result = sum(sums) / sum(counts)            # Aggregate results
Below is the same code, parallelized using dask.imperative:
x = value(h5py.File('myfile.hdf5')['/x'])   # Trillion element array on disk
sums = []
counts = []
for i in range(1000000):                    # One million times
    chunk = x[1000000*i:1000000*(i + 1)]    # Pull out chunk
    positive = chunk[chunk > 0]             # Keep only positive elements
    sums.append(positive.sum())             # Sum chunk
    counts.append(positive.size)            # Count chunk
result = do(sum)(sums) / do(sum)(counts)    # Aggregate results
result.compute()                            # Perform the computation
Only three lines had to change to make this computation parallel instead of serial:
- Wrap the original array in value. This makes all the slices on it return Value objects.
- Wrap both calls to sum with do.
- Call the compute method on the result.
The for loop above still runs through all one million iterations, but each iteration only adds to the graph of the computation that needs to happen. No actual computing is done until compute is called.
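The aggregation used in both versions (sum of the per-chunk sums divided by the sum of the per-chunk counts) can be checked on a small in-memory list. This is a scaled-down sketch: the data values and chunk size are made up for illustration, and a plain Python list stands in for the on-disk array.

```python
# Made-up stand-in for the on-disk array.
data = [4, -1, 0, 7, -3, 2, 9, -8, 5, 1]
chunk_size = 3

sums = []
counts = []
for start in range(0, len(data), chunk_size):
    chunk = data[start:start + chunk_size]    # Pull out chunk
    positive = [v for v in chunk if v > 0]    # Keep only positive elements
    sums.append(sum(positive))                # Sum chunk
    counts.append(len(positive))              # Count chunk

blocked_mean = sum(sums) / sum(counts)        # Aggregate results

# Reference answer computed in one pass over the whole list.
all_positive = [v for v in data if v > 0]
direct_mean = sum(all_positive) / len(all_positive)
```

Keeping sums and counts separate until the end is what makes the chunks independent of one another. Averaging the per-chunk means instead would give a different answer whenever chunks contain different numbers of positive elements.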
Definitions
- dask.imperative.value(val, name=None)
Create a Value from a python object.
Parameters:
    val : object
        Object to be wrapped.
    name : string, optional
        Name to be used in the resulting dask.
Examples
>>> a = value([1, 2, 3])
>>> a.compute()
[1, 2, 3]
Values can act as a proxy to the underlying object. Many operators are supported:
>>> (a + [1, 2]).compute()
[1, 2, 3, 1, 2]
>>> a[1].compute()
2
Method and attribute access also works:
>>> a.count(2).compute()
1
Note that if a method doesn’t exist, no error will be thrown until runtime:
>>> res = a.not_a_real_method()
>>> res.compute()
AttributeError("'list' object has no attribute 'not_a_real_method'")
- dask.imperative.do()
Wraps a function so that it outputs a Value.
Examples
Can be used as a decorator:
>>> @do
... def add(a, b):
...     return a + b
>>> res = add(1, 2)
>>> type(res) == Value
True
>>> res.compute()
3
For other cases, it may be cleaner to call do on a function at call time:
>>> res2 = do(sum)([res, 2, 3])
>>> res2.compute()
8
do also accepts an optional keyword pure. If False (default), then each call produces a Value with a different key, even when the arguments are identical. This is useful for non-pure functions (such as time or random).
>>> from random import random
>>> out1 = do(random)()
>>> out2 = do(random)()
>>> out1.key == out2.key
False
If you know a function is pure (its output depends only on its input, with no global state), then you can set pure=True. This will attempt to apply a consistent name to the output, but will fall back on the behavior of pure=False if this fails.
>>> @do(pure=True)
... def add(a, b):
...     return a + b
>>> out1 = add(1, 2)
>>> out2 = add(1, 2)
>>> out1.key == out2.key
True
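One way to picture the difference between the two settings is a key function that hashes the function name and arguments for pure calls and draws a random identifier otherwise. The sketch below is an assumption made for illustration: toy_key is a hypothetical helper, and the scheme dask actually uses to name outputs may differ.

```python
import hashlib
import uuid

def toy_key(func, args, pure=False):
    """Hypothetical naming scheme for illustration; not dask's own."""
    name = getattr(func, '__name__', 'func')
    if pure:
        # Same function name and arguments always give the same token.
        token = hashlib.sha256(repr((name, args)).encode()).hexdigest()[:16]
    else:
        # A fresh random token on every call.
        token = uuid.uuid4().hex[:16]
    return name + '-' + token

def add(a, b):
    return a + b

pure_1 = toy_key(add, (1, 2), pure=True)
pure_2 = toy_key(add, (1, 2), pure=True)
impure_1 = toy_key(add, (1, 2))
impure_2 = toy_key(add, (1, 2))
```

Here pure_1 and pure_2 are equal, so a graph containing both would hold a single task for them, while impure_1 and impure_2 differ and would be run separately.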
- dask.imperative.compute(*args, **kwargs)
Evaluate several Values at once.
Note that the only difference between this function and dask.base.compute is that this implicitly converts python objects to Values, allowing for collections of dask objects to be computed.
Examples
>>> a = value(1)
>>> b = a + 2
>>> c = a + 3
>>> compute(b, c)  # Compute both simultaneously
(3, 4)
>>> compute(a, [b, c])  # Works for lists of Values
(1, [3, 4])
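The following toy sketch illustrates the two ideas in this function: descending into lists to find deferred objects, and evaluating several results in one pass so that a shared dependency is only run once. Deferred, toy_compute and load_one are hypothetical names introduced here, and the per-call cache is a simplification assumed for illustration rather than a description of dask's scheduler.

```python
calls = []   # records how often the shared input is actually evaluated

class Deferred:
    """Toy lazy node with a name (hypothetical); not dask's Value."""
    def __init__(self, name, func, *deps):
        self.name = name
        self.func = func
        self.deps = deps

def toy_compute(*args):
    cache = {}   # results shared by everything computed in this call

    def run(node):
        if node.name not in cache:
            cache[node.name] = node.func(*[run(d) for d in node.deps])
        return cache[node.name]

    def resolve(obj):
        if isinstance(obj, Deferred):
            return run(obj)
        if isinstance(obj, list):
            return [resolve(o) for o in obj]   # descend into lists
        return obj                             # plain objects pass through

    return tuple(resolve(a) for a in args)

def load_one():
    calls.append('a')
    return 1

a = Deferred('a', load_one)
b = Deferred('b', lambda x: x + 2, a)
c = Deferred('c', lambda x: x + 3, a)

results = toy_compute(a, [b, c])
```

After this call, calls contains a single entry even though a is needed three times, which is the benefit of computing related results together rather than one at a time.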