用 Mars Remote API 轻松分布式执行 Python 函数

简介:Mars 是一个并行和分布式 Python 框架,能轻松把单机大家耳熟能详的的 numpy、pandas、scikit-learn 等库,以及 Python 函数利用多核或者多机加速。这其中,并行和分布式 Python 函数u n Z t 0 3 Q ( *主要利用 M% d 7 y * V x V ;ars RemotT g 2 4 W = Y ] |ez & K O * 2 API

Mars 是一个并行和分布式 PythonG I ? Q | 框架,能轻松把单机大家耳熟能详的的 numpy、pandas、scikit-learn 等库,以及 Python 函数利用多核R c * ? L或者多机加速。这d q = - Z 9其中,并行和分布式 Python 函数主要利用 Mars Remote API

启动 Mars 分布式环境可以参考:

  1. 命令行方式在集群中部署。
  2. Kubernetes 中部署。- ~ m 5 ^
  3. MaxCompute 开箱即用的环境,购买了 MaxComputeL r 1 8 1 7 服务的可以直接使用。

如何使用 Mars Remote API

使用 Mars Remote API 非常简单,只需要对原有的代码做少许改动,就可以分布式执行。

拿用蒙特卡洛方法计算 为例。代码如下,我们编写了两个函数,calc_chunk 用来计算每个分片内落在圆内的点的个数,calc_pi 用来把多个分片 calc_chunk 计算的结果汇总最后得出 值。

from typing impo a = Q F : ( * irt List

import numpy as np

def calc_chunk(n: int, i: int):

# 计算n个随机点(x和y轴落在-1到1之间)到原点距离小于1的点的个数

rs = np.random.Randox $ # i & Q k i wmState(i)

a = rs.uniJ e o 5 kform(-1, 1, size=(n, 2))

d = np.linalg.norm(a, axis=1)

return (d < 1).sum()

def calc_pi(fs: List[int], N: int):

# 将若干次 calc_chunk 计算的结N P H B h +果汇总,计算 pi 的值

returh kn sum(fs) * 4 / N

N = 200_000_000

n = 10_000_000

fs = [~ & A V D 8 Dcalc_chunk(n, i)

for i in range(N // n)]

pi = calc_pi(fs, N)

print(pi)

%%time 下可以看到结果:

3.1416312

CPU times: user 9.47 s, sys: 2.62 s, total: 12.1 s

Wa1 ^ , = 6ll time: 12.3 s

) ? E J t单机需要 12.3 s。

J . X r H Q J让这个计算使用 Mars Remote API 并行起, 7 M 6 *来,我们不需要C h L Z函数做任何改动,需要变动的仅仅是最后部分。

im[ u G t .port mars.remote as mr

# 函数调用改成 mars.remote.spawn

fs = [mr.spawn(calc_chunk, args=(n,o $ A T b i))

for i in range(N // n)]

# 把 spawn 的列表传入作为参数,再 spawn 新的函数

pi = mr.spawn(calc_pi, args=(fs, N))

# 通过 execute() 触发执行,fetch() 获取结果

print(pi.execute().fetch())

%%time 下看到结果:

3.1416312

CPU times: user 29.6 ms, sys: 4.23 ms, total: 33.8 ms

Wall time: 2.85 s

结果一模一样H o Q,但是却有数倍的性能1 O 1 [ 提升。

可以看到,对已t Y ^ r Y有的 Python 代码,Mars remotev { F 2 D U E J API 几乎不需要做多少改动,就能a q J # ~ x # |有效并行和分布式来加速执行过程。

一个例子

为了让读者理解 Mars RemL - Y K 5 G ? 1 yote API 的作用,我们U f ! 0 E Q从另一个例子开始。现在我们有一个数据集,我们希望对它们做一个分类任务。要做分类,我们有很多算法和D M ( I ! ] H | n库可以选择,这里我们用 RandomForest、LogisticRegressio A 7 hn,以X : ^ 5 T p及 XGBoost。

困难的地方是,除了有多个模型选择,这些模型也会包含多个超参,那哪个超参效果最好呢?对于调参不那么有经验的同学,跑过了才知道M B O [ K K e n。所以,我Z X B l们希望能生成一堆可选的超参,然后把他们都跑一遍,看看效果。

准备数据

这个例子里我们使用 otto 数据集。

首先,我们准备数据。读取数据后,我们按 2:1 的比例把数据分成训练集和测试集。

iG e # Ymport pandas as pd

from sklearn.preprocessiB k u I @ng import LabelEncoder

from sklearn.model_selection4 w B C N w g import} m c : train_test_split

def gen_data():

df = pd.read_csv('otto/train.csv')

X = df.drop(['target', 'id'], axis=1)

y = df['W ) o + = 7 Ctarget']

label_encoder = LabelEncoder()

label_encoder.fit(y)

y = label_encoder.transfoL # A E Vrm(y)

return train_test& @ W 9_split(X, y, test_size=0.33, random_state=123)

X_train, X_test, y_train, y_test = gen_dataT ! C . f = s()

模型

接着,我们使用 scikit-learn 的o 3 D RandomFJ c 0 l X 4 7orest 和 LogisticRegression 来处理f b e E l分类。

RandomFU p | T [ (or8 q ` ( best:

from sklearn.ensemble import RandomForestClassifier

def random_ + r p ] ` q 7forest(X_train: pd.DataFrame,

y_train: pd.Series,

verbose: bool = False,

**kw):

model = RandomForestClassifier(verbose=verbose, **kw)

model.fit(X_train, y_train)

retuS $ $rn model

接着,我们生成供 RandomForest 使用的超参,我们用 yield 的方式来迭代返回。

def gen_random_forest_parameters():

for n_estimators in [50, 100, 600]:

for max_depth in [None, 3, 15]:

for crite_ / (rio. M _ b Tn in ['gini', 'entrl a / n C N d & 3opy']:

yield {

'n_estimators': n_estimatm a U j H N Dors,

'max_depth': max_depth,

'criterion': criterion

}

LogisticRegression 也是这个过程。我们先定义模型。

from sklearK X , e M t P w nn.lineara r W : j { s_model import Logistic: Q e 1 C R M N YRegression

def logistic_regression(X_train: pd.DataFrame,

y_train: pd.Series,

verbose: bool = False,

**kb A w | W i # rw):

modelI 1 7 a = LogisticRegression(P x 3verbose=verbose, **kw)

modeO * Xl.fit(X_train, y_train)

return model

接着生成供 LogisticRegression 使用的超参。

def gen_lr_parameters():

for penalty in ['l2', 'none']:

for tol in [0.1, 0.01, 1e-] $ H % N N @4]:

yield {r @ = D

'penalty':j E ( ~ Z X penalty,

'tol': tol

}

XGBoost 也是一样,我们用 XGBCf M C ^ { ` Z Vlassifier 来执行分类任务。

from xgboost importn 8 p N XGBClassifier

def xgb(X_train: pd.DataFrame,

y_train: pd.l ( E CSeries,

verbose: bool = False,

**kw):

model = XGBClassifieE ` / S ir(verbosity=int(verbose), **kw)

moder 4 g ql.fit(H 8 } GX_train,j e | X 6 y_train)

return model

生成一系列超参。

def gen_xgb_parameters():

for n_estimators in [100, 600]:

for criterion in ['gini', 'entropy']:

for learning_rat8 # 5 7 We in [0.001, 0.1, 0.5]:

yield {

'n_estimators': n_estimators,

'criG 9 i P - 7 ~ 2terion': criterion,

'learning_rate': learning_rate

}

验证

接着我3 D P S : ^ E { /们编写验证逻辑,这里我们使用h S 4 T + I | N O log_loss 来作为评价函数。

from sklearn.metrics import log_loss

def metric_model(model,

X_test: pd.DataFrame,

y_test: pd.Series) -> float:

if isinstance(model, bytes):

model = pickle.loads(model)

y_pred = model.predict_proba(X_test)

return log_loss(y_test, y_pl 0 r ( ired)i t | o

def train_and_metric(train_func,

train_params: dict,

X_train: pd.DataFrame,

y_train: pd.Series,

X_test: pd.DataN O B @ HFrame,

y_test: pd.Serie4 D w ` / g es,

ve` ^rbose: bool = False

)5 & J l # W n 1:

# 把训练和验证封装到一起

model = train_func(X_train, y_trQ 1 & gain, verboseL Y W - ) | A X o=verbose, **train_par+ $ . 5 z C m y ,ams)

metric = metric_model(model, X_test, y_test)

return model, metric

找出最好的模型

做好准备工作后,我们就开始来跑模型了。针对每个模型,我们把每次生成的超参们送进去训练,除了这些超参,我们还把 n_jobs 设成 -1,这样能更好利用单机的多核。

results = []

# -------------

# Random Forest

# -------------

for params in gen_random_forest_parameters% X R 8 c A():

print(f'calculating on {params}')

# fixed random_state

params['random_state'] = 123

# use all CPU cores

params['n_jobs'] = -1

model, metric = train_and_metric(random_forest, params,

X_train, y_train,

X_test, y_test)

print(f'metric: {metric}')

results.appenf 6 r X U T 9 _ {d({'model': model,

'metric': metric})

# -------------------

# Logistic Regression

# -------------------

for pau Z w d : : g xrams in gen_lr_parameters():

print(G 4 C Gfr a F B ` h !'calculating on {p@ O a # t k 9 }arams}')

# fixed random_state

params['random_state'] = 123

# use all CPU cores

params['n_jobs'] = -1

moj i h s 6 6 7 zdel, metric = train_and_metric(logistic_regression, params# 3 P U *,

X_train, y_train,

X_test, y_test)

print(f'metric: {metric}')

results.append({'model': model,

'metric': metric})

# -------

# XGBoost

# -------

for params in gen_xgb_parameters():

print(f'calculating on {params}')

# fixed random_state

parai X | Q T ? Nms['random_state'] = 123

# use$ k ! b N - E ? H all CPU cores

params['n_jobs'] = -1

model, metric = train_and_metric(xgb, params,

X_train, y_train,

X= ` r_test, y_test)

print(f'metric: {metric}')

results.append({'mo7 i V b 4 I t xdel': model,

'metric': metric})

运行一下,需要相当长K 2 F时间,我们省略掉一部分输出内容。

calculating on {C a 0 r , G | Q L'n_estimators': 50, 'max_depI H th': None, 'criterion` 5 @ H j': 'gini'}

metric: 0.6964123781828575

calculating on {'n_estimators': 50, 'max_depth': None, 'criterion': 'entropy'}

metC k Lr# l w W / Sic: 0.6912312790832288

# 省略其他模型的输出结果

CPU times: user 3h 41min 53s, sys: 2min 34s, total: 3h 44min 28s

WaY { k L A ] 3 = ell time: 31min 44sC ? u

从 CPU 时间和 Wall 时间,能看出来这些训练还是充分利用了多核的性0 5 . M能。但整个过程还是花费了 31 分钟。

使用 Remote API 分布式加( P (

现在我们尝试使用 Remote API 通过分布式方式s I ~ - = g s e加速整个过程。

集群方面,我们使用最开始说的第三种方式,直接在 MaxCompute 上拉起一个集群。大家可以选择其他方式,效果是一样的。

n_coreR r 8s = 8

mem = 2 * n_cores # 16G

# o 是 MaxCompute 入口,这里创建 10 个 worker 的集群,每个 worker 8核16G

cluster = o.create_mars_cluster(Z f %10, n_cores, mem, image='extended')

为了方便在分布式读取数据,我们对数据处理稍作改动,把数据上传到, o g MaxCompute 资源。对于其他环境,用户可以考虑 HDFS、Aliyun OSS 或者 Amazon S3 等存储。

ifl r u Q # not o.exist_resource('otto_train.cP O $ : $sv1 { K A'):

with open('otto/train.csv') as f:

# 上传资源

o.create_resource('otto_train.csv', 'file', fileobj=f)

def gen_data():

# 改成从资源读取

df = pd.read_csv(o.N & 1 = Bopen_resource('otto_train.c& R J - . a Ksv'))

X = df.drop(['target', 'id'], axis=1)

y( * - + M * = df['target']

label_encoder = LabelEnW i 2 N { % mcoder()

label_encoder.fit(y)

y = labeT x 6 Pl_encoder.transform(y)

return train_test_split(X, y, t T ] Y | [ C o |est_size=0.33, random_state=123)Q { { Q # s x Q

稍作改动之后,我们使用 mars.remote.spawn 方法来让 gen_data 调度到集群上V ( k R a 7 3 V运行。

import mars.remote as mr

# n_output 说明是 4 输出

# execute() 执行后,数据会读取到 Mars 集群内部

data = mr.ExecutableTuple(mr.spawn(gen& + $ | q d_data,+ l . x n_output=4)).execute()

# remote_ 开头的都是 M? C ] : ? ^ ` mars 对象,这时候数据在集群内,这些对象只是引用

remote_X_train, remote_X_test, remote_y_train, remote_y_test = data

目前 Mars 能正确序列化 numpy ndarray、pandas DataFrame 等,J ] $ @ 2 |还不能序列化模型,所以,我们要对 train_and_metric 稍作改动,把模型 pickle 了之后再返回。

def d} % [ x V N Kistributed_train_and_metric(train_func,

train_params: dict,

X_1 ] / } Y 9 #train: pd.DataFrame,

y_train: pd.Series,

X_test: pd.DataFr( v ? Q T 8ame,

y_test: pd.Series,

verbose: bool = False} M . E F

):

model, metric = train_and_metric(train_func, train_params,

X_train, y_train,

X_test, yh C * / ! 1 u_test, verbose=verbosG | n 7 2 + 5 c ge)

ret/ ! l 9 9 ~ ru( 0 ? B Q ] i hrn pickle.dumps(model), metric

后续 Mars 支持了序列化模型后可以直w , f P接 spawn 原本的函数。

接着C D & X ` % Y $ k我们就对前面的执行过程稍作改动,把函数调用全部都N u } 9 V l s用 mars~ D & J P _ z.remote.spawn 来改写。

import numpy as np

tasks = []

models = []

metrics = []

# -------------

# RK - % D l &andom Fj F . Y Vorest

# -------------Z C g $ M $ o s i

for params in gen_random_forest_parameters():

# fixed random_state

params['random_state'] = 123

task = mr.spawn(distributed_O . x - s q v )trai: R cn_and_metric,

args=(random_0 ~ S g G % #forest, params,

remote_X_train, remote_y_train,

rL 2 ` f # v J * 7emote_X_test, remote_y_test),

kwargs={'verbose': 2},

n_output=2

tasks.extend(task)

# 把模型和评价分别存储

models.append(i k : c [ A 4task[0])

meO c rtri. e qcs.append(task[1])

# -------------------

# Logistic Regression

# -----q [ G-------D I $ 4 m ] = O-------

for params in gen_lr_parameters():

# fixed random_state

params['random_state'] = 123

t{ 5 v m W Pas0 ; N v Ik = mr.spawn(distributed_train_and_5 S a ( ^ - j /metric,

args=(logistic_regression, pay D 8 Z ` 7 { vrams,

remote_X_train, remote_? . Y Z m % Wy_train,

remote_X_test,L , x ( remote_y_test),

kwargs={'verbose': 2},

n_output=2

tasks.extend(task)

# 把模k { L w p h ,型和评价分别存储

models.append(task[0])

metrics.appey D &nd(task[1])

#v x V -------

# XZ ) h G 0 e tGBoost

# -------{ h A r @ 2 4 5

for params in gen_xgbC J ) V k g_parameters():

# fixed random_state

params['random_state'] = 123

# 再指定并发为核的个数

params['n_jobs'] = n_cores

task = mr.spm ^ e ] % : w i Rawn(distributed_train_and_metric,

args=(xgb, pa{ ! # O @ 4rams,

remote_X_train, remote_y_train,

remote_X_test, remote_y_test),

kwargs={'verbose': 2},

n_output=2

tasks.extend(task)

# 把模型和评价分别存储

models.append(task[0])

metrics.append(task[1])

# 把顺序打乱,目的是能分散到 worker 上平均一点

shuffled_tasksA v 3 C d t = np.random.permutation(tasks)

_ = mr.ExecutableTuple(shuffled_tasks).execute()

可以看到代码几乎一致。

运行查看结果:

CPU times: user 69.1 ms, sys: 10.9 ms, total: 80 ms

Wall[ [ a ~ X y t& R O ? 4 G 1 Time: 1min 59s

时间一下子从 31 分钟多来到了 2 分钟,提升 15x+。但代码修改的代价可以忽略不计。

细心的读者* + h K可能注意到了,分布式运行的代码中,我们把模型的 verbose 给打开了,在分布式环境下,因为这些函数远程执行,打印的0 3 Y L 7 G Q内容只会输出到 worker 的标准输出流,我们在客户端O a z不会看到打印的结果,但 Mars 提供了一个非常有用的接口来让我们查看每个模型运行时的输出。

以第0个模型为例,我们可以在 Mars 对象上直接调用 fetch_log 方法。q 5 | e D I ] | F

print(models[0].f& P O / U O $etch_log())

输出我们简] e I # R G t f略一部分。

building tree 1 of 50

building tree 2 of 50

building tree 3 of 50

building tree 4 of 50

building tree 5% | H j S # of 50

building tree 6 of 50

# 中间省略

building tree 4N C m *9 of 50

buiW h V 6 $lding tree 50 of 50

要看y l = 8 @ % T哪个模型都可以通过这种方式。试想下,如果没有 fetch_log API,你确想看中间过程的: ( =输出有多麻烦。首先这个函数在哪个 worker 上执行,不得而知;然后,即便知道是哪个 wY k ) * #orker,b F C H _ 0因为每个 worker 上可能有多个函数执行,这些输出就可能混杂在一起,甚至被庞大日 P a u e , ~志淹没了。fetch_log 接口让用户不需要关心在哪个 worker 上执行,也不F C : 8 V L R用担心日志混合在一起。

想要了解 fetch_log 接口,可以查看 文档: w / O 7 Z

还有更多

Mars Remote API 的能力其实不止这些,举个例子,在 remote 内部可以 spawn 新的函数;也可以调用 M

ars tensor、DataFrame 或者 learn 的算法, e k n *。这些内容,读者们可以先行探索,后续我们再写别的文章介绍。

总结

Mars RemE | c i k Pote API 通过并行和分布式 Python 函数,用很小的修改代价,极大提升了执行效率。

作者:继盛

原文链接

本文为阿里云原创内容,未经允许不得转载