前言
最近有一些临时需求需要并行一些python代码
受限于multiprocessing的进程通信限制,不能很方便的通信大矩阵…
今天偶然发现一个Dask库
api与numpy,pandas一致,但可以并行计算
真的是雪中送炭hh
Dask
接口介绍
High Level
Arrays: 并行Numpy
Bags: 并行lists
Dataframes: 并行Pandas
Machine Learning : 并行Scikit-Learn
Others from external projects, like XArray
Low Level
Delayed: 用这个装饰器可以用于封装自定义函数来搭建计算图延迟计算
Futures: real-time parallel function evaluation
计算图搭建与延迟计算
方式一
import dask
lazy_results = []
for a in A:
for b in B:
if a < b:
c = dask.delayed(f)(a, b) # 延迟任务计算,f是自定义函数
else:
c = dask.delayed(g)(a, b) # 延迟任务计算
lazy_results.append(c)
results = dask.compute(*lazy_results) # 并行计算所有任务
方式二
@dask.delayed
def inc(x):
return x + 1
@dask.delayed
def add(x, y):
return x + y
a = inc(1) # 放入计算图
b = inc(2) # 放入计算图
c = add(a, b) # 放入计算图
c = c.compute() # 运行计算图并返回结果
Dask中的pandas一览
>>> import dask.dataframe as dd
>>> df = dd.read_csv(‘2014-*.csv’)
>>> df.head()
x y
0 1 a
1 2 b
2 3 c
3 4 a
4 5 b
5 6 c
>>> df2 = df[df.y == ‘a’].x + 1
>>> df2.compute()
0 2
3 5
Name: x, dtype: int64
pandas中如下等各类函数皆被加速处理
dd.merge(df1, df2, on=’name’)
df.groupby(df.x).apply(myfunc)
df[df.x > 0]
df.x + df.y
等等