Parallelize
Objectives
Learn what approaches exist to parallelize applications
Showcase Dask
Instructor note
5 min teaching/demo
Different ways to do parallelization
Data parallelization:
split an array / dataset into chunks
do the computation on each chunk in separate processes / threads
combine the results
Libraries: mpi4py, dask, cython, numba, cupy, pythran
Task parallelization:
construct a task or a function
feed it different sets of input data
collect the results (e.g. via a queue)
Libraries:
Standard library: multiprocessing, threading, queue, asyncio, concurrent.futures
Third party: trio, dask, ray
Irrespective of the approach, one common consideration is that a parallelized part of the code should be free from race-conditions.
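Both patterns can be sketched with the standard library alone. The snippet below is a minimal sketch (the input data and chunk count are made up for illustration): it splits a list of numbers into chunks, sums each chunk in a separate process, and combines the partial results. Mapping the same function over different inputs is the task-parallel pattern; the chunking and recombination is the data-parallel one.

from concurrent.futures import ProcessPoolExecutor

def partial_sum(chunk):
    """Work that can be done independently on one chunk of the data."""
    return sum(chunk)

if __name__ == "__main__":
    data = list(range(1_000_000))   # made-up input data
    n_chunks = 4
    chunks = [data[i::n_chunks] for i in range(n_chunks)]   # split the data

    # process each chunk in a separate process
    with ProcessPoolExecutor(max_workers=n_chunks) as pool:
        partial_results = list(pool.map(partial_sum, chunks))

    total = sum(partial_results)    # combine the results
    print(total)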
In this episode we will showcase parallelizing using Dask.
Dask
Dask is composed of two parts:
Dynamic task scheduling optimized for computation. Similar to other workflow management systems, but optimized for interactive computational workloads.
“Big Data” collections like parallel arrays, dataframes, and lists that extend common interfaces like NumPy, Pandas, or Python iterators to larger-than-memory or distributed environments. These parallel collections run on top of dynamic task schedulers.
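For example (a small sketch, with made-up array sizes), a Dask array looks and feels like a NumPy array but is split into chunks that can be computed in parallel and need not fit in memory at once:

import dask.array as da

x = da.random.random((10000, 10000), chunks=(1000, 1000))   # 100 chunks
result = x.mean(axis=0)   # only builds a task graph, nothing is computed yet
result.compute()          # triggers the actual (parallel) computation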
Dask distributed and dashboard
Dask has a companion package called distributed which adds the capability to tap into a variety of computing setups, ranging from local machines to HPC clusters and Kubernetes. It also provides an integrated web application, the dashboard, for monitoring a running computation.
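For example (a hedged sketch, not part of the lesson code; the worker counts are arbitrary and the HPC variant relies on the separate dask-jobqueue package), the same computation can be pointed at different resources simply by creating a different kind of cluster and client:

from dask.distributed import Client, LocalCluster

# run on the local machine with an explicit number of worker processes
cluster = LocalCluster(n_workers=4, threads_per_worker=1)
client = Client(cluster)

# on an HPC system, a cluster object from the separate dask-jobqueue
# package (e.g. SLURMCluster) could be used here instead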
Dask Bag
A Dask bag enables processing data that can be represented as a sequence of arbitrary inputs (“messy data”), much like a Python list. Dask bags are often used for preprocessing log files, JSON records, or other user-defined Python objects.
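To get a feel for the API, here is a tiny, self-contained sketch (the records below are made up) of a bag built from an in-memory sequence:

import dask.bag as db

records = db.from_sequence(
    [{"name": "alice", "score": 7}, {"name": "bob", "score": 3}],
    npartitions=2,
)
# filter and transform the records in parallel, then gather the result
records.filter(lambda r: r["score"] > 5).pluck("name").compute()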
We will limit ourselves to implementing a Dask version of the word-count problem, specifically the step where we count words in a text.
Demo: Dask version of word-count
First navigate to the word-count-hpda directory. The serial version (wrapped in multiple functions in the source/wordcount.py code) looks like this:
filename = './data/concat.txt'
DELIMITERS = ". , ; : ? $ @ ^ < > # % ` ! * - = ( ) [ ] { } / \" '".split()

with open(filename, "r") as input_fd:
    lines = input_fd.read().splitlines()

counts = {}
for line in lines:
    for purge in DELIMITERS:
        line = line.replace(purge, " ")
    words = line.split()
    for word in words:
        word = word.lower().strip()
        if word in counts:
            counts[word] += 1
        else:
            counts[word] = 1

sorted_counts = sorted(
    list(counts.items()),
    key=lambda key_value: key_value[1],
    reverse=True
)

sorted_counts[:10]
A very compact dask.bag version of this code is as follows:
import dask.bag as db

filename = './data/concat.txt'
DELIMITERS = ". , ; : ? $ @ ^ < > # % ` ! * - = ( ) [ ] { } / \" '".split()

# read the file as a bag of lines, split into ~1 MiB partitions
text = db.read_text(filename, blocksize='1MiB')

sorted_counts = (
    text
    .filter(lambda word: word not in DELIMITERS)  # drop elements that are bare delimiters
    .str.lower()
    .str.strip()
    .str.split()        # split each line into words
    .flatten()          # flatten the lists of words into one bag of words
    .frequencies()      # count occurrences of each word
    .topk(10, key=1)    # keep the 10 most frequent (key=1 orders by the count)
    .compute()
)

sorted_counts
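Note that everything before .compute() only builds a task graph; the file is split into partitions of roughly the requested blocksize, which are then processed in parallel. A quick way to see how many partitions were created (a small sketch, reusing the text bag from above):

print(text.npartitions)   # number of ~1 MiB partitions the file was split into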
The last two steps of the pipeline could also have been done with a Dask dataframe (which is the Dask equivalent of a Pandas dataframe):
text = db.read_text(filename, blocksize='1MiB')

filtered = (
    text
    .filter(lambda word: word not in DELIMITERS)
    .str.lower()
    .str.strip()
    .str.split()
    .flatten()
)

ddf = filtered.to_dataframe(columns=['words'])
ddf['words'].value_counts().compute()[:10]
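Note that .compute() here returns an ordinary pandas Series, so the final [:10] slicing happens in memory on the combined result (a small sketch making that explicit):

counts = ddf['words'].value_counts().compute()   # an in-memory pandas Series
print(type(counts))   # <class 'pandas.core.series.Series'>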
Dashboard
Try adding the following snippet and visualizing the run in the dashboard:
from dask.distributed import Client
client = Client() # start distributed scheduler locally.
client
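By default the dashboard is served on port 8787 of the local machine; the exact address can be read from the client object:

print(client.dashboard_link)   # e.g. http://127.0.0.1:8787/status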
When to use Dask
There is no benefit to using Dask on small datasets. But imagine we were analysing a very large text file (all tweets in a year? a genome?). Dask provides both parallelisation and the ability to utilize the memory of multiple machines.
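As an illustration of such a larger-than-memory workflow (a hedged sketch: the file pattern and column names are made up), a directory of CSV files could be processed lazily with a Dask dataframe:

import dask.dataframe as dd

# hypothetical glob pattern: many CSV files that together exceed RAM
ddf = dd.read_csv("data/tweets-*.csv")

# aggregate across all files; only a few partitions are in memory at a time
ddf.groupby("user")["retweets"].sum().nlargest(10).compute()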