Dask (II)

Testing different schedulers

We will test different schedulers and compare their performance on a simple task: calculating the mean of a randomly generated array.

Here is the code using NumPy:

import dask
import time
import numpy as np

def calc_mean(i, n):
    # i is unused; it only gives each repeated call its own argument
    return np.mean(np.random.normal(size=n))

As a baseline, we first time 100 serial calls without Dask, before running the same code with Dask's different schedulers:

n = 100000
%timeit rs = [calc_mean(i, n) for i in range(100)]
#352 ms ± 925 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
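A minimal sketch of the Dask side of the comparison, assuming the same calc_mean and n as above: each call is wrapped with dask.delayed, and the scheduler is chosen per call through the scheduler= keyword of dask.compute. Only the "synchronous" and "threads" single-machine schedulers are run here; timings depend on the machine, so no numbers are claimed.

```python
import time

import dask
import numpy as np


def calc_mean(i, n):
    # i is unused; it only gives each repeated call its own argument
    return np.mean(np.random.normal(size=n))


n = 100000

# dask.delayed wraps each call lazily; nothing runs until dask.compute
tasks = [dask.delayed(calc_mean)(i, n) for i in range(100)]

results = {}
for scheduler in ["synchronous", "threads"]:
    start = time.perf_counter()
    # scheduler= selects the scheduler for this call only. "processes" is a
    # third option, but in a plain script it may need an
    # `if __name__ == "__main__":` guard, so it is left out of this sketch.
    results[scheduler] = dask.compute(*tasks, scheduler=scheduler)
    elapsed = time.perf_counter() - start
    print(f"{scheduler:>11}: {elapsed:.3f} s")
```

The "synchronous" scheduler runs everything in the calling thread, which makes it a useful reference point and the easiest one to debug.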

SVD of a large, skinny matrix using the distributed scheduler

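We can use Dask to compute the SVD of a large matrix that does not fit into the memory of a normal laptop or desktop: Dask works through it chunk by chunk (here 10,000 rows at a time), so the whole array does not have to be held in memory at once. While it is computing, switch to the Dask dashboard and watch the “Workers” and “Graph” tabs. The dashboard is only available with the distributed scheduler, so this must be run with a distributed Client.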
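If no distributed Client is running yet, a minimal way to start one locally is sketched below (assuming the distributed package is installed). processes=False is a choice made for this sketch: it keeps the workers as threads inside the current process so it also works in a plain script; omitting it gives the default of one process per worker. Once a Client exists it becomes the default scheduler, so a plain s.compute() runs on it.

```python
from dask.distributed import Client

# Assumption: no Client has been created earlier in this session.
# processes=False runs workers as threads in this process (handy for a quick
# test); drop it to get separate worker processes, the default.
client = Client(processes=False)

# URL of the dashboard with the "Workers", "Graph", ... tabs
# (may be empty if the dashboard dependencies are not installed)
print(client.dashboard_link)
```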

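import dask
import dask.array as da
# 2,000,000 x 100 random array, chunked along the rows only (10,000 rows per chunk)
X = da.random.random((2000000, 100), chunks=(10000, 100))
X  # in a notebook, shows the array's shape, chunks and size
u, s, v = da.linalg.svd(X)  # lazy; v is V transposed, so X = u @ diag(s) @ v
dask.visualize(u, s, v)  # draw the task graph (requires graphviz)
s.compute()  # compute the singular values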
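To see what the three factors are, here is a small check on a hypothetical 20,000 × 50 matrix that does fit in memory: the factors should reconstruct the matrix, and the singular values should match NumPy's dense SVD. The sizes and the seed are arbitrary choices for this sketch.

```python
import dask
import dask.array as da
import numpy as np

# Hypothetical small tall-and-skinny matrix, chunked along the rows only
A = np.random.default_rng(0).random((20000, 50))
X = da.from_array(A, chunks=(2000, 50))

u, s, v = da.linalg.svd(X)
u, s, v = dask.compute(u, s, v)  # one pass over the shared task graph

# v holds V transposed, so u @ diag(s) @ v reconstructs X
reconstructs = np.allclose((u * s) @ v, A)
# singular values agree with NumPy's in-memory SVD
matches_numpy = np.allclose(s, np.linalg.svd(A, compute_uv=False))
print(reconstructs, matches_numpy)
```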

da.linalg.svd is only supported for arrays chunked in one dimension, which requires the matrix to be either tall-and-skinny or short-and-fat. If chunking in both dimensions is needed, use the approximate (randomized) algorithm da.linalg.svd_compressed, which computes only the leading k singular values and vectors.

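import dask
import dask.array as da
# 10,000 x 10,000 array chunked in both dimensions (2,000 x 2,000 blocks)
X = da.random.random((10000, 10000), chunks=(2000, 2000))
# approximate the 5 leading singular values and vectors
u, s, v = da.linalg.svd_compressed(X, k=5)
dask.visualize(u, s, v)
s.compute()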
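Because svd_compressed is approximate, a quick sanity check is to use a matrix whose rank is exactly k: the randomized method should then recover its singular values essentially exactly. The sketch below builds a hypothetical 1,000 × 1,000 matrix of rank 5 (product of two random factors), chunks it in both dimensions, and compares against NumPy. For a full-rank matrix like the uniform random one above, the approximation of the smaller singular values can be noticeably less accurate.

```python
import dask
import dask.array as da
import numpy as np

rng = np.random.default_rng(0)
# Hypothetical matrix of exact rank 5: (1000 x 5) @ (5 x 1000)
A = rng.standard_normal((1000, 5)) @ rng.standard_normal((5, 1000))
X = da.from_array(A, chunks=(250, 250))  # chunked in both dimensions

# Randomized approximation of the 5 leading singular triplets
u, s, v = da.linalg.svd_compressed(X, k=5)
u, s, v = dask.compute(u, s, v)

exact = np.linalg.svd(A, compute_uv=False)[:5]
print(np.allclose(s, exact))
```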

Memory management

The dashboard shows several categories of worker memory:

  • process: Overall memory used by the worker process, as measured by the OS

  • managed: Size of data that Dask holds in RAM, excluding spilled data. This is Dask's own estimate and is often inaccurate.

  • unmanaged: Memory that Dask is not directly aware of, e.g. Python modules, temporary arrays, memory leaks, or memory not yet released to the OS by the Python memory manager

  • unmanaged recent: Unmanaged memory that has appeared within the last 30 seconds; it is not included in the “unmanaged” measure

  • spilled: Memory spilled to disk

The sum of managed + unmanaged + unmanaged recent is equal by definition to the process memory.
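Since the three in-RAM categories add up to the process memory, unmanaged memory is simply the remainder. A worked example with hypothetical readings for one worker (the helper name is illustrative, not a Dask API):

```python
GiB = 2**30


def unmanaged_memory(process, managed, unmanaged_recent):
    # By definition process = managed + unmanaged + unmanaged_recent,
    # so unmanaged is what is left over (spilled data is on disk, not in process)
    return process - managed - unmanaged_recent


# Hypothetical worker readings, in bytes
unmanaged = unmanaged_memory(3.0 * GiB, 1.8 * GiB, 0.4 * GiB)
print(f"unmanaged: {unmanaged / GiB:.1f} GiB")  # prints "unmanaged: 0.8 GiB"
```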

When managed memory exceeds 60% of the worker's memory limit (target threshold), the worker begins to spill the least recently used data to disk. When process memory, as measured by the OS, exceeds 70% of the memory limit (spill threshold), the worker also spills least recently used data to disk, regardless of what the managed memory estimate says.

At 80% process memory load (pause threshold), the worker pauses: currently executing tasks continue to run, but no additional tasks in the worker's queue are started.

At 95% process memory load (terminate threshold), the worker's nanny terminates the worker and restarts it in a fresh state. Tasks running on it are cancelled and rescheduled, and the data it held is lost and must be recomputed.
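The four thresholds can be summarised in a simplified, hypothetical model: target is compared with managed memory, while spill, pause and terminate are compared with process memory, all as fractions of the worker's memory limit. This is only a sketch of the rules described above, not Dask's implementation; in Dask the fractions are set through the distributed.worker.memory.target, .spill, .pause and .terminate configuration keys, whose defaults are the values used here.

```python
GiB = 2**30

# Default fractions of the worker memory limit, as described above
DEFAULT_FRACTIONS = {"target": 0.60, "spill": 0.70, "pause": 0.80, "terminate": 0.95}


def crossed_thresholds(managed, process, limit, fractions=DEFAULT_FRACTIONS):
    """Hypothetical helper: list the thresholds a worker has reached.

    Simplified model: target is checked against managed memory (Dask's own
    estimate); spill, pause and terminate against process memory (OS view).
    """
    crossed = []
    if managed >= fractions["target"] * limit:
        crossed.append("target")
    for name in ("spill", "pause", "terminate"):
        if process >= fractions[name] * limit:
            crossed.append(name)
    return crossed


limit = 4 * GiB  # hypothetical per-worker memory limit
print(crossed_thresholds(1 * GiB, 2 * GiB, limit))      # []
print(crossed_thresholds(2.6 * GiB, 3.0 * GiB, limit))  # ['target', 'spill']
print(crossed_thresholds(3.0 * GiB, 3.9 * GiB, limit))  # ['target', 'spill', 'pause', 'terminate']
```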