Dask (II)
Testing different schedulers
We will test different schedulers and compare the performance on a simple task calculating the mean of a random generated array.
Here is the code using NumPy:
import dask
import time
import numpy as np
def calc_mean(i, n):
data = np.mean(np.random.normal(size = n))
return(data)
Here we run the same code using different schedulers from Dask:
n = 100000
%%timeit
rs=[calc_mean(i, n) for i in range(100)]
#352 ms ± 925 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
import dask
import time
import numpy as np
def calc_mean(i, n):
data = np.mean(np.random.normal(size = n))
return(data)
n = 100000
output = [dask.delayed(calc_mean)(i, n) for i in range(100)]
%%timeit
with dask.config.set(scheduler='threads',num_workers=1):
mt_1 = dask.compute(output)
#395 ms ± 18.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%%timeit
with dask.config.set(scheduler='threads',num_workers=2):
mt_2 = dask.compute(output)
#1.28 s ± 1.46 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%%timeit
with dask.config.set(scheduler='threads',num_workers=4):
mt_4 = dask.compute(output)
#1.28 s ± 3.84 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
import dask
import time
import numpy as np
def calc_mean(i, n):
data = np.mean(np.random.normal(size = n))
return(data)
n = 100000
output = [dask.delayed(calc_mean)(i, n) for i in range(100)]
%%timeit
with dask.config.set(scheduler='processes',num_workers=1):
mp_1 = dask.compute(output)
#990 ms ± 39.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%%timeit
with dask.config.set(scheduler='processes',num_workers=2):
mp_2 = dask.compute(output)
#881 ms ± 17.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%%timeit
with dask.config.set(scheduler='processes',num_workers=4):
mp_4 = dask.compute(output)
#836 ms ± 10.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
import dask
import time
import numpy as np
from dask.distributed import Client, LocalCluster
def calc_mean(i, n):
data = np.mean(np.random.normal(size = n))
return(data)
n = 100000
output = [dask.delayed(calc_mean)(i, n) for i in range(100)]
cluster = LocalCluster(n_workers = 1,threads_per_worker=1)
c = Client(cluster)
%timeit dis_1 = dask.compute(output,n_workers = 1)
#619 ms ± 253 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
cluster.scale(2)
%timeit dis_2 = dask.compute(output,n_workers = 2)
#357 ms ± 131 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
cluster.scale(4)
%timeit dis_4 = dask.compute(output,n_workers = 4)
#265 ms ± 53.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
c.shutdown()
Testing different schedulers
Comparing profiling from mt_1, mt_2 and mt_4: Using threads
scheduler is limited by the GIL on pure Python code.
In our case, although it is not a pure Python function, it is still limited by GIL, therefore no multi-core speedup
Comparing profiling from mt_1, mp_1 and dis_1: Except for threads
, the other two schedulers copy data between processes
and this can introduce performance penalties, particularly when the data being transferred between processes is large.
Comparing profiling from serial, mt_1, mp_1 and dis_1: Creating and destroying threads and processes have overheads,
processes
have even more overhead than threads
Comparing profiling from mp_1, mp_2 and mp_4: Running multiple processes is only effective when there is enough computational work to do i.e. CPU-bound tasks. In this very example, most of the time is actually spent on transferring the data rather than computing the mean
Comparing profiling from processes
and distributed
: Using distributed
scheduler has advantages over processes
,
this is related to better handling of data copying, i.e. processes
scheduler copies data for every task, while
distributed
scheduler copies data for each worker.
SVD with large skinny matrix using distributed
scheduler
We can use dask to compute SVD of a large matrix which does not fit into the memory of a
normal laptop/desktop. While it is computing, you should switch to the Dask dashboard and
watch column “Workers” and “Graph”, so you must run this using distributed
scheduler
import dask
import dask.array as da
X = da.random.random((2000000, 100), chunks=(10000, 100))
X
u, s, v = da.linalg.svd(X)
dask.visualize(u, s, v)
s.compute()
SVD is only supported for arrays with chunking in one dimension, which requires that the matrix is either tall-and-skinny or short-and-fat. If chunking in both dimensions is needed, one should use approximate algorithm.
import dask
import dask.array as da
X = da.random.random((10000, 10000), chunks=(2000, 2000))
u, s, v = da.linalg.svd_compressed(X, k=5)
dask.visualize(u, s, v)
s.compute()
Memory management
You may observe that there are different memory categories showing on the dashboard:
process: Overall memory used by the worker process, as measured by the OS
managed: Size of data that Dask holds in RAM, but most probably inaccurate, excluding spilled data.
unmanaged: Memory that Dask is not directly aware of, this can be e.g. Python modules, temporary arrays, memory leasks, memory not yet free()’d by the Python memory manager to the OS
unmanaged recent: Unmanaged memory that has appeared within the last 30 seconds whch is not included in the “unmanaged” memory measure
spilled: Memory spilled to disk
The sum of managed + unmanaged + unmanaged recent is equal by definition to the process memory.
When the managed memory exceeds 60% of the memory limit (target threshold), the worker will begin to dump the least recently used data to disk. Above 70% of the target memory usage based on process memory measurment (spill threshold), the worker will start dumping unused data to disk.
At 80% process memory load, currently executing tasks continue to run, but no additional tasks in the worker’s queue will be started.
At 95% process memory load (terminate threshold), all workers will be terminated. Tasks will be cancelled as well and data on the worker will be lost and need to be recomputed.