[RLlib] Fix IMPALA/APPO learning behavior: Fix EnvRunner sync bug, GPU loader thread, enable local learner w/ GPU. #48314

Merged
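This PR touches three areas at once: the EnvRunner state/weights sync in IMPALA's training_step(), the GPU-loader thread in the IMPALA Learner (now only started when a GPU is actually configured), and the head-node resource request so that a local Learner (num_learners=0) can own a GPU. Below is a minimal, hedged sketch of the kind of setup that last fix is meant to enable; it is not code from this PR, the environment and iteration count are placeholders, and the AlgorithmConfig method names (.env_runners(), .learners()) may differ slightly between Ray versions.

# Hedged sketch: IMPALA with a *local* Learner that still uses one GPU.
from ray.rllib.algorithms.impala import IMPALAConfig

config = (
    IMPALAConfig()
    .environment("CartPole-v1")          # placeholder env
    .env_runners(num_env_runners=4)      # remote EnvRunners do the sampling
    .learners(
        num_learners=0,                  # 0 => the Learner runs locally in the main process ...
        num_gpus_per_learner=1,          # ... and may now be given a GPU (one of the fixes here)
    )
)
algo = config.build()
for _ in range(5):
    results = algo.train()
algo.stop()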
Changes from all commits (46 commits):
54579d5  wip (sven1977, Oct 28, 2024)
8443dcb  Revert "Revert "[RLlib] Upgrade to gymnasium 1.0.0 (ale_py 0.10.1, mu… (sven1977, Oct 29, 2024)
ab2b22c  wip (sven1977, Oct 29, 2024)
43bd52f  Merge branch 'master' of https://github.com/ray-project/ray into fix_… (sven1977, Oct 29, 2024)
716c241  Merge branch 'revert-48297-revert-45328-upgrade_gymnasium_to_1_0_0a1'… (sven1977, Oct 29, 2024)
a967fd4  wip (sven1977, Oct 29, 2024)
bc17c93  wip (sven1977, Oct 29, 2024)
17c6bad  wip (sven1977, Oct 30, 2024)
ee208a0  wip (sven1977, Oct 30, 2024)
bef9e1f  wip (sven1977, Oct 30, 2024)
43b9ba6  wip (sven1977, Oct 30, 2024)
317875a  Merge branch 'master' of https://github.com/ray-project/ray into fix_… (sven1977, Oct 31, 2024)
b2aebd1  wip (sven1977, Oct 31, 2024)
c403ffe  wip (sven1977, Oct 31, 2024)
bde9583  Merge branch 'master' of https://github.com/ray-project/ray into fix_… (sven1977, Oct 31, 2024)
e576ebe  wip (sven1977, Oct 31, 2024)
7396518  wip (sven1977, Oct 31, 2024)
3ff57ae  learns Pong-v5 on 1 (local) GPU and 46 env runners in ~6-7min. (sven1977, Oct 31, 2024)
8afddb4  wip (sven1977, Nov 1, 2024)
ced8703  fix (sven1977, Nov 1, 2024)
8148259  Merge branch 'master' of https://github.com/ray-project/ray into fix_… (sven1977, Nov 1, 2024)
a98568a  fix (sven1977, Nov 1, 2024)
5e29b1f  Merge branch 'master' of https://github.com/ray-project/ray into fix_… (sven1977, Nov 1, 2024)
dde1132  fix (sven1977, Nov 1, 2024)
db4641c  fix (sven1977, Nov 1, 2024)
aa9c578  Merge branch 'master' of https://github.com/ray-project/ray into fix_… (sven1977, Nov 1, 2024)
20efe00  Merge branch 'master' of https://github.com/ray-project/ray into fix_… (sven1977, Nov 2, 2024)
5b979f7  wip (sven1977, Nov 4, 2024)
157060f  wip (sven1977, Nov 4, 2024)
0c09e74  wip (sven1977, Nov 5, 2024)
3602517  fixes (sven1977, Nov 5, 2024)
97cb2a8  merge (sven1977, Nov 5, 2024)
8574688  fix (sven1977, Nov 5, 2024)
c674cd7  fix (sven1977, Nov 5, 2024)
f3c0352  Merge branch 'master' of https://github.com/ray-project/ray into fix_… (sven1977, Nov 5, 2024)
051c3bc  wip (sven1977, Nov 5, 2024)
cebbec1  fix (sven1977, Nov 5, 2024)
fa07017  fix (sven1977, Nov 6, 2024)
0e34fd9  merge (sven1977, Nov 6, 2024)
07faf22  fix (sven1977, Nov 6, 2024)
a1f68b1  Merge branch 'master' of https://github.com/ray-project/ray into fix_… (sven1977, Nov 6, 2024)
fa63e33  fix (sven1977, Nov 6, 2024)
277e057  wip (sven1977, Nov 6, 2024)
8fae002  fix (sven1977, Nov 6, 2024)
308e161  fix (sven1977, Nov 6, 2024)
3f31afa  wip (sven1977, Nov 7, 2024)
2 changes: 1 addition & 1 deletion doc/source/rllib/rllib-examples.rst
@@ -349,7 +349,7 @@ in roughly 5 minutes. It can be run as follows on a single g5.24xlarge (or g6.24
.. code-block:: bash

$ cd ray/rllib/tuned_examples/ppo
$ python atari_ppo.py --env=ale_py:ALE/Pong-v5 --num-gpus=4 --num-env-runners=95
$ python atari_ppo.py --env=ale_py:ALE/Pong-v5 --num-learners=4 --num-env-runners=95

Note that some of the files in this folder are used for RLlib's daily or weekly release tests as well.

6 changes: 3 additions & 3 deletions release/release_tests.yaml
@@ -2716,7 +2716,7 @@

run:
timeout: 43200 # 12h
script: python learning_tests/tuned_examples/dreamerv3/atari_100k.py --framework=tf2 --env=ale_py:ALE/Pong-v5 --num-gpus=1 --stop-reward=15.0 --as-release-test
script: python learning_tests/tuned_examples/dreamerv3/atari_100k.py --framework=tf2 --env=ale_py:ALE/Pong-v5 --num-learners=1 --stop-reward=15.0 --as-release-test

alert: default

@@ -2751,7 +2751,7 @@

run:
timeout: 1200
script: python learning_tests/tuned_examples/ppo/atari_ppo.py --enable-new-api-stack --env=ale_py:ALE/Pong-v5 --num-gpus=4 --num-env-runners=95 --stop-reward=20.0 --as-release-test
script: python learning_tests/tuned_examples/ppo/atari_ppo.py --enable-new-api-stack --env=ale_py:ALE/Pong-v5 --num-learners=4 --num-env-runners=95 --stop-reward=20.0 --as-release-test

alert: default

@@ -2786,7 +2786,7 @@

run:
timeout: 7200
script: python learning_tests/tuned_examples/sac/halfcheetah_sac.py --enable-new-api-stack --num-gpus=4 --num-env-runners=8 --stop-reward=1000.0 --as-release-test
script: python learning_tests/tuned_examples/sac/halfcheetah_sac.py --enable-new-api-stack --num-learners=4 --num-env-runners=8 --stop-reward=1000.0 --as-release-test

alert: default

238 changes: 108 additions & 130 deletions rllib/BUILD

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions rllib/algorithms/algorithm.py
@@ -1579,8 +1579,8 @@ def _env_runner_remote(worker, num, round, iter):
logger.warning(
"This evaluation iteration resulted in an empty set of episode summary "
"results! It's possible that your configured duration timesteps are not"
" enough to finish even a single episode. Your have configured "
f"{self.config.evaluation_duration}"
" enough to finish even a single episode. You have configured "
f"{self.config.evaluation_duration} "
f"{self.config.evaluation_duration_unit}. For 'timesteps', try "
"increasing this value via the `config.evaluation(evaluation_duration="
"...)` OR change the unit to 'episodes' via `config.evaluation("
@@ -3684,8 +3684,8 @@ def _run_one_training_iteration_and_evaluation_in_parallel_wo_thread(
logger.warning(
"This evaluation iteration resulted in an empty set of episode summary "
"results! It's possible that your configured duration timesteps are not"
" enough to finish even a single episode. Your have configured "
f"{self.config.evaluation_duration}"
" enough to finish even a single episode. You have configured "
f"{self.config.evaluation_duration} "
f"{self.config.evaluation_duration_unit}. For 'timesteps', try "
"increasing this value via the `config.evaluation(evaluation_duration="
"...)` OR change the unit to 'episodes' via `config.evaluation("
2 changes: 1 addition & 1 deletion rllib/algorithms/algorithm_config.py
@@ -348,7 +348,7 @@ def __init__(self, algo_class: Optional[type] = None):
self.num_gpus_per_env_runner = 0
self.custom_resources_per_env_runner = {}
self.validate_env_runners_after_construction = True
self.max_requests_in_flight_per_env_runner = 2
self.max_requests_in_flight_per_env_runner = 1
self.sample_timeout_s = 60.0
self.create_env_on_local_worker = False
self._env_to_module_connector = None
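The default number of in-flight sample requests per EnvRunner drops from 2 to 1 here, trading some sampling throughput for fresher data. If the old pipelining behavior is wanted back, it can presumably still be set per algorithm config; a hedged sketch, assuming env_runners() exposes this setting in your Ray version:

# Hedged sketch: restore up to 2 concurrent sample() requests per EnvRunner.
config = config.env_runners(max_requests_in_flight_per_env_runner=2)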
76 changes: 33 additions & 43 deletions rllib/algorithms/impala/impala.py
@@ -39,6 +39,7 @@
LEARNER_RESULTS,
LEARNER_UPDATE_TIMER,
MEAN_NUM_EPISODE_LISTS_RECEIVED,
MEAN_NUM_LEARNER_GROUP_RESULTS_RECEIVED,
MEAN_NUM_LEARNER_GROUP_UPDATE_CALLED,
NUM_AGENT_STEPS_SAMPLED,
NUM_AGENT_STEPS_SAMPLED_LIFETIME,
@@ -142,7 +143,7 @@ def __init__(self, algo_class=None):
self.vtrace_clip_rho_threshold = 1.0
self.vtrace_clip_pg_rho_threshold = 1.0
self.learner_queue_size = 3
self.max_requests_in_flight_per_env_runner = 2
self.max_requests_in_flight_per_env_runner = 1
self.max_requests_in_flight_per_aggregator_worker = 2
self.timeout_s_sampler_manager = 0.0
self.timeout_s_aggregator_manager = 0.0
@@ -159,10 +160,8 @@ def __init__(self, algo_class=None):
# global_norm, no matter the value of `grad_clip_by`.
self.grad_clip_by = "global_norm"

self.lr_schedule = None
self.vf_loss_coeff = 0.5
self.entropy_coeff = 0.01
self.entropy_coeff_schedule = None

# Override some of AlgorithmConfig's default values with IMPALA-specific values.
self.num_learners = 1
@@ -178,6 +177,8 @@ def __init__(self, algo_class=None):
# __sphinx_doc_end__
# fmt: on

self.lr_schedule = None # @OldAPIStack
self.entropy_coeff_schedule = None # @OldAPIStack
self.num_multi_gpu_tower_stacks = 1 # @OldAPIstack
self.minibatch_buffer_size = 1 # @OldAPIstack
self.replay_proportion = 0.0 # @OldAPIstack
@@ -413,15 +414,6 @@ def validate(self) -> None:
"than or equal to `total_train_batch_size` "
f"({self.total_train_batch_size})!"
)
# Make sure we have >=1 Learner and warn if `num_learners=0` (should only be
# used for debugging).
if self.num_learners == 0:
logger.warning(
f"{self} should only be run with `num_learners` >= 1! A value of 0 "
"(local learner) should only be used for debugging purposes as it "
"makes the algorithm non-asynchronous. When running with "
"`num_learners=0`, expect diminished learning capabilities."
)

elif isinstance(self.entropy_coeff, float) and self.entropy_coeff < 0.0:
raise ValueError("`entropy_coeff` must be >= 0.0")
@@ -644,18 +636,6 @@ def training_step(self) -> ResultDict:

# Log the average number of sample results (list of episodes) received.
self.metrics.log_value(MEAN_NUM_EPISODE_LISTS_RECEIVED, len(episode_refs))
self.metrics.log_value(
"_mean_num_episode_ts_received",
len(episode_refs)
* self.config.num_envs_per_env_runner
* self.config.get_rollout_fragment_length(),
)
self.metrics.log_value(
"_mean_num_episode_ts_received_using_reduced_metrics",
self.metrics.peek(
(ENV_RUNNER_RESULTS, NUM_ENV_STEPS_SAMPLED), default=0
),
)

# Log lifetime counts for env- and agent steps.
if env_runner_metrics:
@@ -695,6 +675,7 @@
)
rl_module_state = None
last_good_learner_results = None
num_learner_group_results_received = 0

for batch_ref_or_episode_list_ref in data_packages_for_learner_group:
if self.config.num_aggregation_workers:
@@ -727,7 +708,11 @@
)
if not do_async_updates:
learner_results = [learner_results]

for results_from_n_learners in learner_results:
if not results_from_n_learners[0]:
Review comment (Collaborator): Didn't we continue here before?

Reply (Contributor, author): No, we sent the empty results into the logger and tried to extract the model state from it, which wouldn't succeed. So it was pretty much a waste of time :)
continue
num_learner_group_results_received += 1
for r in results_from_n_learners:
rl_module_state = r.pop(
"_rl_module_state_after_update", rl_module_state
@@ -737,6 +722,10 @@
key=LEARNER_RESULTS,
)
last_good_learner_results = results_from_n_learners
self.metrics.log_value(
key=MEAN_NUM_LEARNER_GROUP_RESULTS_RECEIVED,
value=num_learner_group_results_received,
)

# Update LearnerGroup's own stats.
self.metrics.log_dict(self.learner_group.get_stats(), key=LEARNER_GROUP)
@@ -754,10 +743,12 @@
# Figure out, whether we should sync/broadcast the (remote) EnvRunner states.
# Note: `learner_results` is a List of n (num async calls) Lists of m
# (num Learner workers) ResultDicts each.
self.metrics.log_value(
NUM_TRAINING_STEP_CALLS_SINCE_LAST_SYNCH_WORKER_WEIGHTS, 1, reduce="sum"
)
if last_good_learner_results:
# TODO (sven): Rename this metric into a more fitting name: ex.
# `NUM_LEARNER_UPDATED_SINCE_LAST_WEIGHTS_SYNC`
self.metrics.log_value(
NUM_TRAINING_STEP_CALLS_SINCE_LAST_SYNCH_WORKER_WEIGHTS, 1, reduce="sum"
)
# Merge available EnvRunner states into local worker's EnvRunner state.
# Broadcast merged EnvRunner state AND new model weights back to all remote
# EnvRunners that - in this call - had returned samples.
@@ -774,7 +765,6 @@
with self.metrics.log_time((TIMERS, SYNCH_WORKER_WEIGHTS_TIMER)):
self.env_runner_group.sync_env_runner_states(
config=self.config,
env_runner_indices_to_update=env_runner_indices_to_update,
env_steps_sampled=self.metrics.peek(
NUM_ENV_STEPS_SAMPLED_LIFETIME, default=0
),
@@ -809,15 +799,15 @@ def _remote_sample_get_state_and_metrics(_worker):

# Perform asynchronous sampling on all (healthy) remote rollout workers.
if num_healthy_remote_workers > 0:
self.env_runner_group.foreach_worker_async(
_remote_sample_get_state_and_metrics
)
async_results: List[
Tuple[int, ObjectRef]
] = self.env_runner_group.fetch_ready_async_reqs(
timeout_seconds=self.config.timeout_s_sampler_manager,
return_obj_refs=False,
)
self.env_runner_group.foreach_worker_async(
_remote_sample_get_state_and_metrics
)
# Get results from the n different async calls and store those EnvRunner
# indices we should update.
results = []
@@ -847,7 +837,7 @@ def _remote_sample_get_state_and_metrics(_worker):
episode_refs,
connector_states,
env_runner_metrics,
list(env_runner_indices_to_update),
env_runner_indices_to_update,
)

def _pre_queue_episode_refs(
@@ -948,18 +938,18 @@ def default_resource_request(
# from RolloutWorkers (n rollout workers map to m
# aggregation workers, where m < n) and always use 1 CPU
# each.
"CPU": max(
cf.num_cpus_for_main_process,
cf.num_cpus_per_learner if cf.num_learners == 0 else 0,
)
+ cf.num_aggregation_workers,
# Ignore `cf.num_gpus` on the new API stack.
"CPU": (
max(
cf.num_cpus_for_main_process,
cf.num_cpus_per_learner if cf.num_learners == 0 else 0,
)
+ cf.num_aggregation_workers
),
# Use n GPUs if we have a local Learner (num_learners=0).
"GPU": (
0
if cf._fake_gpus
else cf.num_gpus
if not cf.enable_rl_module_and_learner
else 0
(cf.num_gpus_per_learner if cf.num_learners == 0 else 0)
if cf.enable_rl_module_and_learner
else (0 if cf._fake_gpus else cf.num_gpus)
),
}
]
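To make the new head-node bundle arithmetic concrete, here is a small standalone sketch (head_bundle is a made-up helper, not RLlib code, and the old-API-stack branch is omitted) that mirrors the expression above for the new API stack:

# Hypothetical helper mirroring the CPU/GPU request above (new API stack only).
def head_bundle(num_cpus_for_main_process, num_cpus_per_learner, num_learners,
                num_aggregation_workers, num_gpus_per_learner):
    cpu = max(
        num_cpus_for_main_process,
        num_cpus_per_learner if num_learners == 0 else 0,
    ) + num_aggregation_workers
    gpu = num_gpus_per_learner if num_learners == 0 else 0
    return {"CPU": cpu, "GPU": gpu}

# Local Learner with a GPU (the newly supported setup): the head process requests the GPU.
print(head_bundle(1, 1, 0, 0, 1))   # -> {'CPU': 1, 'GPU': 1}
# Remote Learners: their GPUs are requested by the Learner actors, not by the head.
print(head_bundle(1, 1, 4, 0, 1))   # -> {'CPU': 1, 'GPU': 0}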
83 changes: 62 additions & 21 deletions rllib/algorithms/impala/impala_learner.py
@@ -11,6 +11,7 @@
from ray.rllib.algorithms.impala.impala import LEARNER_RESULTS_CURR_ENTROPY_COEFF_KEY
from ray.rllib.core.columns import Columns
from ray.rllib.core.learner.learner import Learner
from ray.rllib.connectors.common import NumpyToTensor
from ray.rllib.connectors.learner import AddOneTsToEpisodesAndTruncate
from ray.rllib.policy.sample_batch import MultiAgentBatch, SampleBatch
from ray.rllib.utils.annotations import (
@@ -33,6 +34,7 @@
GPU_LOADER_QUEUE_WAIT_TIMER = "gpu_loader_queue_wait_timer"
GPU_LOADER_LOAD_TO_GPU_TIMER = "gpu_loader_load_to_gpu_timer"
LEARNER_THREAD_IN_QUEUE_WAIT_TIMER = "learner_thread_in_queue_wait_timer"
LEARNER_THREAD_ENV_STEPS_DROPPED = "learner_thread_env_steps_dropped"
LEARNER_THREAD_UPDATE_TIMER = "learner_thread_update_timer"
RAY_GET_EPISODES_TIMER = "ray_get_episodes_timer"
EPISODES_TO_BATCH_TIMER = "episodes_to_batch_timer"
@@ -60,14 +62,18 @@ def build(self) -> None:
)
)

# Extend all episodes by one artificual timestep to allow the value function net
# Extend all episodes by one artificial timestep to allow the value function net
# to compute the bootstrap values (and add a mask to the batch to know, which
# slots to mask out).
if (
self._learner_connector is not None
and self.config.add_default_connectors_to_learner_pipeline
):
self._learner_connector.prepend(AddOneTsToEpisodesAndTruncate())
# Leave all batches on the CPU (they'll be moved to the GPU, if applicable,
# by the n GPU loader threads.
numpy_to_tensor_connector = self._learner_connector[NumpyToTensor][0]
numpy_to_tensor_connector._device = "cpu" # TODO (sven): Provide API?

# Create and start the GPU-loader thread. It picks up train-ready batches from
# the "GPU-loader queue" and loads them to the GPU, then places the GPU batches
Expand All @@ -78,17 +84,18 @@ def build(self) -> None:
self._learner_thread_out_queue = Queue()

# Create and start the GPU loader thread(s).
self._gpu_loader_threads = [
_GPULoaderThread(
in_queue=self._gpu_loader_in_queue,
out_queue=self._learner_thread_in_queue,
device=self._device,
metrics_logger=self.metrics,
)
for _ in range(self.config.num_gpu_loader_threads)
]
for t in self._gpu_loader_threads:
t.start()
if self.config.num_gpus_per_learner > 0:
self._gpu_loader_threads = [
_GPULoaderThread(
in_queue=self._gpu_loader_in_queue,
out_queue=self._learner_thread_in_queue,
device=self._device,
metrics_logger=self.metrics,
)
for _ in range(self.config.num_gpu_loader_threads)
]
for t in self._gpu_loader_threads:
t.start()

# Create and start the Learner thread.
self._learner_thread = _LearnerThread(
@@ -144,11 +151,22 @@ def update_from_episodes(
)

# Queue the CPU batch to the GPU-loader thread.
self._gpu_loader_in_queue.put((batch, env_steps))
self.metrics.log_value(
(ALL_MODULES, QUEUE_SIZE_GPU_LOADER_QUEUE),
self._gpu_loader_in_queue.qsize(),
)
if self.config.num_gpus_per_learner > 0:
self._gpu_loader_in_queue.put((batch, env_steps))
self.metrics.log_value(
(ALL_MODULES, QUEUE_SIZE_GPU_LOADER_QUEUE),
self._gpu_loader_in_queue.qsize(),
)
else:
# Enqueue to Learner thread's in-queue.
_LearnerThread.enqueue(
self._learner_thread_in_queue,
MultiAgentBatch(
{mid: SampleBatch(b) for mid, b in batch.items()},
env_steps=env_steps,
),
self.metrics,
)

# Return all queued result dicts thus far (after reducing over them).
results = {}
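Taken together, the two hunks above change the batch flow so that the GPU-loader stage only exists when the Learner actually has a GPU; on a CPU-only Learner, batches go straight into the Learner thread's in-queue. A standalone toy model of that routing (all names are made up for illustration; the real queues carry MultiAgentBatch objects):

# Toy sketch of the CPU-vs-GPU batch routing set up in build() / update_from_episodes().
import threading
from collections import deque
from queue import Queue

gpu_loader_in_queue: Queue = Queue()
learner_in_queue: deque = deque(maxlen=1)       # newest-batch-wins, like the Learner in-queue

def gpu_loader_thread():
    while True:
        batch = gpu_loader_in_queue.get()
        batch = {k: f"{v}@gpu" for k, v in batch.items()}   # stand-in for the host->device copy
        learner_in_queue.append(batch)                      # a full deque silently drops the oldest

def submit(batch, num_gpus_per_learner):
    if num_gpus_per_learner > 0:
        gpu_loader_in_queue.put(batch)           # GPU path: a loader thread moves it to the device
    else:
        learner_in_queue.append(batch)           # CPU path: skip the loader stage entirely

threading.Thread(target=gpu_loader_thread, daemon=True).start()
submit({"obs": "cpu_tensor"}, num_gpus_per_learner=0)
print(learner_in_queue)    # deque([{'obs': 'cpu_tensor'}], maxlen=1)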
@@ -200,6 +218,7 @@ def __init__(

self._in_queue = in_queue
self._out_queue = out_queue
self._ts_dropped = 0
self._device = device
self.metrics = metrics_logger

@@ -227,10 +246,8 @@ def _step(self) -> None:
policy_batches={mid: SampleBatch(b) for mid, b in batch_on_gpu.items()},
env_steps=env_steps,
)
self._out_queue.append(ma_batch_on_gpu)
self.metrics.log_value(
(ALL_MODULES, QUEUE_SIZE_LEARNER_THREAD_QUEUE), len(self._out_queue)
)
# Enqueue to Learner thread's in-queue.
_LearnerThread.enqueue(self._out_queue, ma_batch_on_gpu, self.metrics)


class _LearnerThread(threading.Thread):
@@ -296,3 +313,27 @@ def step(self):
(ALL_MODULES, QUEUE_SIZE_RESULTS_QUEUE),
self._out_queue.qsize(),
)

@staticmethod
def enqueue(learner_queue, batch, metrics_logger):
# Right-append to learner queue (a deque). If full, drops the leftmost
# (oldest) item in the deque. Note that we consume from the right
# (newest first), which is why the queue size should probably always be 1,
# otherwise we run into the danger of training with very old samples.
# ts_dropped = 0
# if len(learner_queue) == learner_queue.maxlen:
# ts_dropped = learner_queue.popleft().env_steps()
learner_queue.append(batch)
# TODO (sven): This metric will not show correctly on the Algo side (main
# logger), b/c of the bug in the metrics not properly "upstreaming" reduce=sum
# metrics (similarly: ENV_RUNNERS/NUM_ENV_STEPS_SAMPLED grows exponentially
# on the main algo's logger).
# metrics_logger.log_value(
# LEARNER_THREAD_ENV_STEPS_DROPPED, ts_dropped, reduce="sum"
# )

# Log current queue size.
metrics_logger.log_value(
(ALL_MODULES, QUEUE_SIZE_LEARNER_THREAD_QUEUE),
len(learner_queue),
)
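The commented-out bookkeeping above is meant to count env steps that get dropped when the bounded learner deque overflows; it stays disabled until the reduce="sum" metrics issue mentioned in the TODO is resolved. As a standalone illustration of what it would measure (FakeBatch is a stand-in for MultiAgentBatch):

# Sketch of the dropped-timestep accounting the commented-out code would perform.
from collections import deque

class FakeBatch:
    def __init__(self, n):
        self.n = n
    def env_steps(self):
        return self.n

learner_queue = deque(maxlen=1)
ts_dropped = 0
for batch in (FakeBatch(500), FakeBatch(500)):
    if len(learner_queue) == learner_queue.maxlen:
        ts_dropped += learner_queue.popleft().env_steps()   # oldest batch is evicted and counted
    learner_queue.append(batch)
print(ts_dropped)   # 500 -> would be logged with reduce="sum" once re-enabled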