并行数据处理库Dask介绍

前言

最近有一些临时需求需要并行一些python代码
受限于multiprocessing的进程通信限制,不能很方便的通信大矩阵…

今天偶然发现一个Dask库
api与numpy,pandas一致,但可以并行计算
真的是雪中送炭hh

Dask

接口介绍

High Level

Arrays: 并行Numpy
Bags: 并行lists
Dataframes: 并行Pandas
Machine Learning : 并行Scikit-Learn
Others from external projects, like XArray

Low Level

Delayed: 用这个装饰器可以用于封装自定义函数来搭建计算图延迟计算
Futures: real-time parallel function evaluation

计算图搭建与延迟计算

方式一

import dask

lazy_results = []
for a in A:
  for b in B:
    if a < b:
      c = dask.delayed(f)(a, b) # 延迟任务计算,f是自定义函数
    else:
      c = dask.delayed(g)(a, b) # 延迟任务计算
    lazy_results.append(c)

results = dask.compute(*lazy_results) # 并行计算所有任务

方式二

@dask.delayed
def inc(x):
  return x + 1

@dask.delayed
def add(x, y):
  return x + y

a = inc(1) # 放入计算图
b = inc(2) # 放入计算图
c = add(a, b) # 放入计算图

c = c.compute() # 运行计算图并返回结果

Dask中的pandas一览

>>> import dask.dataframe as dd
>>> df = dd.read_csv(‘2014-*.csv’)
>>> df.head()
   x y
 0 1 a
 1 2 b
 2 3 c
 3 4 a
 4 5 b
 5 6 c

>>> df2 = df[df.y == ‘a’].x + 1

>>> df2.compute()
0 2
3 5
Name: x, dtype: int64

pandas中如下等各类函数皆被加速处理

dd.merge(df1, df2, on=’name’)
df.groupby(df.x).apply(myfunc)
df[df.x > 0]
df.x + df.y
等等