Параллельные вычисления в Python с Dask

Структуры данных в NumPy и Pandas основополагающие, но они ограничены размером ОЗУ и одним ядром процессора. Dask позволяет производить параллельные вычисления на данных, размер которых превышает доступный объем памяти, на нескольких ядрах или нескольких машинах. Можно даже сконфигурировать Dask для использования ресурсов тысячи машин — каждой с несколькими ядрами.

Интерфейс Dask имитирует известные библиотеки:

  • dask.array = numpy + threading
  • dask.dataframe = pandas + threading
  • dask.bag = map, filter, … + multiprocessing

Какие задачи эффективно распараллеливать? На примере Pandas:

  • арифметические операции (умножение или сложение, примененные к объекту Series);
  • агрегирование (среднее, минимум, максимум, сумма и т. д.);
  • вызов apply() (если производится по индексу);
  • вызов value_counts(), drop_duplicates() или corr();
  • фильтрация с помощью loc, isin и построчная выборка.

Пример вычисления коэффициента асимметрии:

import numpy
import dask
from dask import array as darray

arr = dask.from_array(numpy.array(my_data), chunks=(1000,))
mean = darray.mean()
stddev = darray.std(arr)
unnormalized_moment = darry.mean(arr * arr * arr)

skewness = ((unnormalized_moment - (3 * mean * stddev ** 2) - mean ** 3) /
            stddev ** 3)

Каждая операция будет использовать столько ядер, сколько понадобится. Будут задействованы все ядра, даже если элементов будет миллиарды.

Пример подсчета количества палиндромов:

import dask

def is_palindrome(s):
    return s == s[::-1]

palindromes = [dask.delayed(is_palindrome)(s) for s in string_list]
total = dask.delayed(sum)(palindromes)
result = total.compute()

С dask.delayed каждый вызов функции ставится в очередь, добавляется в график выполнения и планируется. Это похоже на Luigi, Airflow, Celery или Makefiles.

Всякий раз, когда понадобится быстро распараллелить задачи в Python, можно обратиться к Dask.