RayOnSpark Quickstart

In this guide, we describe how to use RayOnSpark to run Ray programs directly on Big Data clusters in two simple steps.

Step 0: Prepare Environment

We recommend using conda to prepare the environment. Please refer to the install guide for more details.

conda create -n zoo python=3.7  # "zoo" is the conda environment name; you can use any name you like.
conda activate zoo
pip install analytics-zoo[ray]
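
As a quick sanity check that the [ray] extra was installed, you can verify that Ray is importable in the new environment (any successful import works; the exact Ray version depends on your analytics-zoo release):

python -c "import ray; print(ray.__version__)"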

Step 1: Initialize

We recommend using init_orca_context to initialize and run Analytics Zoo on the underlying cluster. The Ray cluster will be launched automatically when you specify init_ray_on_spark=True.

from zoo.orca import init_orca_context

if cluster_mode =="local":# For local machine
    sc = init_orca_context(cluster_mode="local", cores=4, memory="10g", init_ray_on_spark=True)elif cluster_mode =="k8s":# For K8s cluster
    sc = init_orca_context(cluster_mode="k8s", num_nodes=2, cores=2, memory="10g", driver_memory="10g", driver_cores=1, init_ray_on_spark=True)elif cluster_mode =="yarn":# For Hadoop/YARN cluster
    sc = init_orca_context(cluster_mode="yarn", num_nodes=2, cores=2, memory="10g", driver_memory="10g", driver_cores=1, init_ray_on_spark=True)

This is the only place where you need to specify local or distributed mode.

By default, the Ray cluster is launched using Spark barrier execution mode; you can turn it off via the configurations of OrcaContext:

from zoo.orca import OrcaContext

OrcaContext.barrier_mode = False

Note: You should export HADOOP_CONF_DIR=/path/to/hadoop/conf/dir when running on a Hadoop/YARN cluster. View the Hadoop User Guide for more details.
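
For example, on the driver node before launching your program (the script name below is just a placeholder; point the path at your cluster's Hadoop configuration directory):

export HADOOP_CONF_DIR=/path/to/hadoop/conf/dir
python your_rayonspark_script.py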

You can retrieve the information of the Ray cluster via OrcaContext:

from zoo.orca import OrcaContext

ray_ctx = OrcaContext.get_ray_context()
address_info = ray_ctx.address_info  # The dictionary information of the ray cluster, including node_ip_address, object_store_address, webui_url, etc.
redis_address = ray_ctx.redis_address  # The redis address of the ray cluster.
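
For instance, you can print these on the driver to verify that the cluster is up; node_ip_address is one of the keys listed in the comment above:

print(address_info["node_ip_address"])  # one of the fields of the address_info dictionary
print(redis_address)                    # address used to connect to the Ray cluster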

Step 2: Run Ray Applications

After the initialization, you can directly write Ray code inline with your Spark code and run Ray programs on the underlying existing Big Data clusters. Ray tasks and actors will be launched across the cluster.
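
As a minimal illustration (a generic Ray sketch; square is just an example function, and the Ray cluster from Step 1 is assumed to be already initialized):

import ray

@ray.remote
def square(x):
    # Runs as a Ray task on one of the cluster nodes.
    return x * x

# Launch four tasks in parallel and gather the results on the driver.
print(ray.get([square.remote(i) for i in range(4)]))  # prints [0, 1, 4, 9]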

The following example uses actor handles to implement a parameter server for distributed asynchronous stochastic gradient descent. This is a simple Ray example for demonstration purposes; you can write other Ray applications in the same way.

A parameter server is simply an object that stores the parameters (or "weights") of a machine learning model (this could be a neural network, a linear model, or something else). It exposes two methods: one for getting the parameters and one for updating them.

By adding the @ray.remote decorator, the ParameterServer class becomes a Ray actor.

import ray
import numpy as np

dim = 10

@ray.remote
class ParameterServer(object):
    def __init__(self, dim):
        self.parameters = np.zeros(dim)

    def get_parameters(self):
        return self.parameters

    def update_parameters(self, update):
        self.parameters += update

ps = ParameterServer.remote(dim)

In a typical machine learning training application, worker processes will run in an infinite loop that does the following:

1. Get the latest parameters from the parameter server.

2. Compute an update to the parameters (using the current parameters and some data).

3. Send the update to the parameter server.

By adding the @ray.remote decorator, the worker function becomes a Ray remote function.

import time

@ray.remote
def worker(ps, dim, num_iters):
    for _ in range(num_iters):
        # Get the latest parameters.
        parameters = ray.get(ps.get_parameters.remote())
        # Compute an update.
        update = 1e-3 * parameters + np.ones(dim)
        # Update the parameters.
        ps.update_parameters.remote(update)
        # Sleep a little to simulate a real workload.
        time.sleep(0.5)

# Test that worker is implemented correctly. You do not need to change this line.
ray.get(worker.remote(ps, dim, 1))

# Start two workers.
worker_results = [worker.remote(ps, dim, 100) for _ in range(2)]

As the worker tasks are executing, you can query the parameter server from the driver and see the parameters changing in the background.

print(ray.get(ps.get_parameters.remote()))
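
For example, polling a few times from the driver shows the values growing as the two background workers keep applying updates (a small sketch reusing ps, ray, and time from above; the count and interval are arbitrary):

for _ in range(5):
    # Each call fetches the actor's current parameters.
    print(ray.get(ps.get_parameters.remote()))
    time.sleep(1)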

Note: You should call stop_orca_context() when your program finishes:

from zoo.orca import stop_orca_context

stop_orca_context()