[RLlib; Offline RL] Implement Offline Policy Evaluation (OPE) via Importance Sampling. by simonsays1980 · Pull Request #53702 · ray-project/ray · GitHub
[RLlib; Offline RL] Implement Offline Policy Evaluation (OPE) via Importance Sampling. #53702
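For context on the evaluation types this PR wires in ("is" and "pdis", next to the existing "eval_loss"), the standard importance-sampling estimators for offline policy evaluation are sketched below. These are the textbook formulas; the exact weighting, normalization, and discounting used by the implementation in this PR may differ.

\hat{V}_{\mathrm{IS}} = \frac{1}{n} \sum_{i=1}^{n} \Bigg( \prod_{t=0}^{T_i - 1} \frac{\pi_\theta(a_t^i \mid s_t^i)}{\pi_\beta(a_t^i \mid s_t^i)} \Bigg) \sum_{t=0}^{T_i - 1} \gamma^t r_t^i

\hat{V}_{\mathrm{PDIS}} = \frac{1}{n} \sum_{i=1}^{n} \sum_{t=0}^{T_i - 1} \gamma^t \Bigg( \prod_{k=0}^{t} \frac{\pi_\theta(a_k^i \mid s_k^i)}{\pi_\beta(a_k^i \mid s_k^i)} \Bigg) r_t^i

where \pi_\theta is the target policy being evaluated, \pi_\beta is the behavior policy that generated the n logged episodes, T_i is the length of episode i, and \gamma is the discount factor.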

Open · simonsays1980 wants to merge 18 commits into base: master from offline-policy-evaluation-importance-sampling
Commits (18)
ea74ebd
Implemented offline policy evaluation.
simonsays1980 Jun 10, 2025
30c9a8f
Small changes. WIP.
simonsays1980 Jun 10, 2025
41665ed
Merge branch 'master' into offline-policy-evaluation-importance-sampling
simonsays1980 Jun 10, 2025
2f43583
Enabled automatic runner class selection of evaluation loss or ope.
simonsays1980 Jun 10, 2025
768397e
Merge branch 'master' into offline-policy-evaluation-importance-sampling
simonsays1980 Jun 10, 2025
34f399c
Update rllib/offline/offline_evaluation_runner_group.py
simonsays1980 Jun 10, 2025
9fb729c
Added 'OfflinePolicyEvaluationRunner'.
simonsays1980 Jun 10, 2025
d7f8eee
Merge branch 'master' into offline-policy-evaluation-importance-sampling
simonsays1980 Jun 10, 2025
ee7f010
Removed unused imports.
simonsays1980 Jun 10, 2025
f2dd8cc
Added missing 'return'.
simonsays1980 Jun 10, 2025
b0290d2
Added new attributes to the test.
simonsays1980 Jun 11, 2025
0e327f3
Merge branch 'master' into offline-policy-evaluation-importance-sampling
simonsays1980 Jun 11, 2025
750c2ba
Merge branch 'master' into offline-policy-evaluation-importance-sampling
simonsays1980 Jun 12, 2025
7e43fee
Merge branch 'master' into offline-policy-evaluation-importance-sampling
simonsays1980 Jun 16, 2025
da66048
Fixed a bug in tests due to a global import.
simonsays1980 Jun 17, 2025
c54c296
Merge branch 'master' into offline-policy-evaluation-importance-sampling
simonsays1980 Jun 17, 2025
3ae36e3
Merge branch 'master' into offline-policy-evaluation-importance-sampling
simonsays1980 Jun 20, 2025
4d3168a
Merge branch 'master' into offline-policy-evaluation-importance-sampling
simonsays1980 Jun 23, 2025
45 changes: 41 additions & 4 deletions rllib/algorithms/algorithm.py
@@ -872,7 +872,7 @@ def setup(self, config: AlgorithmConfig) -> None:
self.offline_eval_runner_group: OfflineEvaluationRunnerGroup = OfflineEvaluationRunnerGroup(
config=self.evaluation_config,
# Do not create a local runner such that the dataset can be split.
local_runner=False,
local_runner=self.config.num_offline_eval_runners == 0,
# Provide the `RLModule`'s state for the `OfflinePreLearner`s.
module_state=rl_module_state[COMPONENT_RL_MODULE],
module_spec=module_spec,
@@ -1134,10 +1134,13 @@ def evaluate_offline(self):
)

# Evaluate with fixed duration.
self._evaluate_offline_with_fixed_duration()
if self.offline_eval_runner_group.num_healthy_remote_runners > 0:
self._evaluate_offline_with_fixed_duration()
else:
self._evaluate_offline_on_local_runner()
# Reduce the evaluation results.
eval_results = self.metrics.peek(
("EVALUATION_RESULTS", "OFFLINE_EVAL_RUNNER_RESULTS"), default={}
(EVALUATION_RESULTS, OFFLINE_EVAL_RUNNER_RESULTS), default={}
)

# Trigger `on_evaluate_offline_end` callback.
@@ -1153,7 +1156,7 @@ def evaluate_offline(self):
)

# Also return the results here for convenience.
return {EVALUATION_RESULTS: {OFFLINE_EVAL_RUNNER_RESULTS: eval_results}}
return {OFFLINE_EVAL_RUNNER_RESULTS: eval_results}

@PublicAPI
def evaluate(
@@ -1363,6 +1366,38 @@ def _evaluate_with_custom_eval_function(self) -> Tuple[ResultDict, int, int]:

return eval_results, env_steps, agent_steps

def _evaluate_offline_on_local_runner(self):
# if hasattr(env_runner, "input_reader") and env_runner.input_reader is None:
Contributor: remove this comment?

Collaborator (Author): Oh yeah! How did this even get in there?

# raise ValueError(
# "Can't evaluate on a local worker if this local worker does not have "
# "an environment!\nTry one of the following:"
# "\n1) Set `evaluation_interval` > 0 to force creating a separate "
# "evaluation EnvRunnerGroup.\n2) Set `create_local_env_runner=True` to "
# "force the local (non-eval) EnvRunner to have an environment to "
# "evaluate on."
# )
# elif self.config.evaluation_parallel_to_training:
# raise ValueError(
# "Cannot run on local evaluation worker parallel to training! Try "
# "setting `evaluation_parallel_to_training=False`."
# )

# How many batches do we need to run?
unit = "batches"
duration = (
self.config.offline_evaluation_duration
* self.config.dataset_num_iters_per_eval_runner
)

logger.info(f"Evaluating current state of {self} for {duration} {unit}.")

results = self.offline_eval_runner_group.local_runner.run()

self.metrics.aggregate(
[results],
key=(EVALUATION_RESULTS, OFFLINE_EVAL_RUNNER_RESULTS),
)

def _evaluate_on_local_env_runner(self, env_runner):
if hasattr(env_runner, "input_reader") and env_runner.input_reader is None:
raise ValueError(
@@ -1651,6 +1686,8 @@ def _offline_eval_runner_remote(runner, iter):
if iter != self.iteration:
continue
all_metrics.append(met)
# Note, the `dataset_num_iters_per_eval_runner` must be smaller than
# `offline_evaluation_duration` // `num_offline_eval_runners`.
num_units_done += (
met[ALL_MODULES][DATASET_NUM_ITERS_EVALUATED].peek()
if DATASET_NUM_ITERS_EVALUATED in met[ALL_MODULES]
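To make the budget note in `_offline_eval_runner_remote` above concrete, here is a purely illustrative sanity check of the arithmetic; the numbers are made up and the attribute names follow the code shown above:

# Illustrative values only (not defaults of this PR).
offline_evaluation_duration = 16        # total batches to evaluate per `evaluate_offline()` call
num_offline_eval_runners = 4            # remote OfflineEvaluationRunners
dataset_num_iters_per_eval_runner = 2   # dataset iterations consumed per remote `run()` call

# Each remote call already consumes `dataset_num_iters_per_eval_runner` units, so it
# must stay below the per-runner share of the total duration; otherwise a single
# round of remote calls overshoots the fixed evaluation budget.
assert dataset_num_iters_per_eval_runner < (
    offline_evaluation_duration // num_offline_eval_runners
)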
42 changes: 42 additions & 0 deletions rllib/algorithms/algorithm_config.py
@@ -533,6 +533,8 @@ def __init__(self, algo_class: Optional[type] = None):
# Offline evaluation.
self.offline_evaluation_interval = None
self.num_offline_eval_runners = 0
self.offline_evaluation_type: str = None
self.offline_eval_runner_class = None
# TODO (simon): Only `_offline_evaluate_with_fixed_duration` works. Also,
# decide, if we use `offline_evaluation_duration` or
# `dataset_num_iters_per_offline_eval_runner`. Should the user decide here?
@@ -2705,6 +2707,8 @@ def evaluation(
# Offline evaluation.
offline_evaluation_interval: Optional[int] = NotProvided,
num_offline_eval_runners: Optional[int] = NotProvided,
offline_evaluation_type: Optional[Callable] = NotProvided,
offline_eval_runner_class: Optional[Callable] = NotProvided,
offline_loss_for_module_fn: Optional[Callable] = NotProvided,
offline_eval_batch_size_per_runner: Optional[int] = NotProvided,
dataset_num_iters_per_offline_eval_runner: Optional[int] = NotProvided,
@@ -2829,6 +2833,13 @@ def evaluation(
for parallel evaluation. Setting this to 0 forces sampling to be done in the
local OfflineEvaluationRunner (main process or the Algorithm's actor when
using Tune).
offline_evaluation_type: Type of offline evaluation to run. Either `"eval_loss"`
for evaluating the validation loss of the policy, `"is"` for importance
sampling, or `"pdis"` for per-decision importance sampling. If you want to
implement your own offline evaluation method, write an `OfflineEvaluationRunner`
and use `AlgorithmConfig.offline_eval_runner_class`.
offline_eval_runner_class: An `OfflineEvaluationRunner` class that implements
custom offline evaluation logic.

Contributor: Question: So, if a user provides offline_eval_runner_class, then the value of this field is ignored?
For more explicitness, should we not provide these three built-ins ("eval_loss", "is", "pdis") as classes as well and show users where to find them in the repo? Then this config setting would be superfluous. Or do you think it's too complicated to explain?

Collaborator (Author): This is a good one. Let me think about this. Both solutions have their advantages.

offline_loss_for_module_fn: A callable to compute the loss per `RLModule` in
offline evaluation. If not provided the training loss function (
`Learner.compute_loss_for_module`) is used. The signature must be (
@@ -2975,6 +2986,10 @@ def evaluation(
self.offline_evaluation_interval = offline_evaluation_interval
if num_offline_eval_runners is not NotProvided:
self.num_offline_eval_runners = num_offline_eval_runners
if offline_evaluation_type is not NotProvided:
self.offline_evaluation_type = offline_evaluation_type
if offline_eval_runner_class is not NotProvided:
self.offline_eval_runner_class = offline_eval_runner_class
if offline_loss_for_module_fn is not NotProvided:
self.offline_loss_for_module_fn = offline_loss_for_module_fn
if offline_eval_batch_size_per_runner is not NotProvided:
@@ -5282,6 +5297,33 @@ def _validate_offline_settings(self):
"recorded episodes cannot be read in for training."
)

# Offline evaluation.
from ray.rllib.offline.offline_policy_evaluation_runner import (
OfflinePolicyEvaluationTypes,
)

offline_eval_types = list(OfflinePolicyEvaluationTypes)
if (
self.offline_evaluation_type
and self.offline_evaluation_type != "eval_loss"
and self.offline_evaluation_type not in OfflinePolicyEvaluationTypes
):
self._value_error(
f"Unknown offline evaluation type: {self.offline_evaluation_type}. "
"Available types of offline evaluation are either 'eval_loss' to evaluate "
f"the training loss on a validation dataset or {offline_eval_types}."
)

from ray.rllib.offline.offline_evaluation_runner import OfflineEvaluationRunner

if self.offline_eval_runner_class and not issubclass(
self.offline_eval_runner_class, OfflineEvaluationRunner
):
self._value_error(
"Unknown `offline_eval_runner_class`. A custom offline evaluation runner "
"class needs to inherit from the `OfflineEvaluationRunner` class."
)

@property
def is_online(self) -> bool:
"""Defines if this config is for online RL.
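For reference, a minimal sketch of how the new evaluation settings introduced above could be wired together. The algorithm (BC), environment, and dataset path are placeholders chosen for illustration and are not taken from this PR:

from ray.rllib.algorithms.bc import BCConfig

config = (
    BCConfig()
    .environment("CartPole-v1")
    # Placeholder path to previously recorded offline episodes.
    .offline_data(input_="/tmp/cartpole-offline-data")
    .evaluation(
        offline_evaluation_interval=1,                # run offline eval every training iteration
        num_offline_eval_runners=2,                   # two remote OfflineEvaluationRunners
        offline_evaluation_type="pdis",               # "eval_loss", "is", or "pdis"
        offline_eval_batch_size_per_runner=256,
        dataset_num_iters_per_offline_eval_runner=1,
    )
)
algo = config.build()
results = algo.train()  # offline evaluation results are reported alongside training results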
4 changes: 2 additions & 2 deletions rllib/env/single_agent_env_runner.py
@@ -867,11 +867,11 @@ def _log_episode_metrics(self, length, ret, sec):
self.metrics.log_value(EPISODE_DURATION_SEC_MEAN, sec, window=win)
# Per-agent returns.
self.metrics.log_value(
("agent_episode_returns_mean", DEFAULT_AGENT_ID), ret, window=win
("agent_episode_return_mean", DEFAULT_AGENT_ID), ret, window=win
)
# Per-RLModule returns.
self.metrics.log_value(
("module_episode_returns_mean", DEFAULT_MODULE_ID), ret, window=win
("module_episode_return_mean", DEFAULT_MODULE_ID), ret, window=win
)

# For some metrics, log min/max as well.
17 changes: 3 additions & 14 deletions rllib/offline/offline_evaluation_runner.py
@@ -7,8 +7,6 @@
from ray.data.iterator import DataIterator
from ray.rllib.core import (
ALL_MODULES,
COMPONENT_ENV_TO_MODULE_CONNECTOR,
COMPONENT_MODULE_TO_ENV_CONNECTOR,
COMPONENT_RL_MODULE,
)
from ray.rllib.core.rl_module.apis import SelfSupervisedLossAPI
@@ -64,6 +62,7 @@ def __init__(
# This has to be defined after we have a `self.config`.
self._loss_for_module_fn = types.MethodType(self.get_loss_for_module_fn(), self)

@override(Runner)
def run(
self,
explore: bool = False,
@@ -224,21 +223,14 @@ def get_state(
**kwargs,
)
state[WEIGHTS_SEQ_NO] = self._weights_seq_no
if self._check_component(
COMPONENT_ENV_TO_MODULE_CONNECTOR, components, not_components
):
state[COMPONENT_ENV_TO_MODULE_CONNECTOR] = self._env_to_module.get_state()
if self._check_component(
COMPONENT_MODULE_TO_ENV_CONNECTOR, components, not_components
):
state[COMPONENT_MODULE_TO_ENV_CONNECTOR] = self._module_to_env.get_state()

return state

def _convert_to_tensor(self, struct) -> TensorType:
"""Converts structs to a framework-specific tensor."""
return convert_to_torch_tensor(struct)

@override(Runner)
def stop(self) -> None:
"""Releases all resources used by this EnvRunner.

@@ -247,6 +239,7 @@ def stop(self) -> None:
"""
pass

@override(Runner)
def __del__(self) -> None:
"""If this Actor is deleted, clears all resources used by it."""
pass
@@ -333,10 +326,6 @@ def compute_eval_loss_for_module(

@override(Checkpointable)
def set_state(self, state: StateDict) -> None:
if COMPONENT_ENV_TO_MODULE_CONNECTOR in state:
self._env_to_module.set_state(state[COMPONENT_ENV_TO_MODULE_CONNECTOR])
if COMPONENT_MODULE_TO_ENV_CONNECTOR in state:
self._module_to_env.set_state(state[COMPONENT_MODULE_TO_ENV_CONNECTOR])

# Update the RLModule state.
if COMPONENT_RL_MODULE in state:
25 changes: 24 additions & 1 deletion rllib/offline/offline_evaluation_runner_group.py
@@ -7,6 +7,11 @@
from ray.rllib.env import INPUT_ENV_SPACES
from ray.rllib.offline.offline_data import OfflineData
from ray.rllib.offline.offline_evaluation_runner import OfflineEvaluationRunner
from ray.rllib.offline.offline_policy_evaluation_runner import (
OfflinePolicyEvaluationRunner,
OfflinePolicyPreEvaluator,
)
from ray.rllib.offline.offline_prelearner import OfflinePreLearner
from ray.rllib.utils.annotations import override
from ray.rllib.utils.runners.runner_group import RunnerGroup

@@ -57,6 +62,22 @@ def _setup(
**kwargs: Dict[str, Any],
) -> None:

# Define the offline evaluation runner class.
self._runner_cls = config.offline_eval_runner_class or (
OfflineEvaluationRunner
if config.offline_evaluation_type == "eval_loss"
else OfflinePolicyEvaluationRunner
)
# Define the prelearner (or pre-evaluator) class to use.
self._pre_learner_or_evaluator_cls = self.config.prelearner_class or (
OfflinePreLearner
if config.offline_evaluation_type == "eval_loss"
else OfflinePolicyPreEvaluator
)
self.config._is_frozen = False
self.config.prelearner_class = self._pre_learner_or_evaluator_cls
self.config._is_frozen = True

# We can either run on a local runner or on remote runners only b/c
# streaming split needs remote runners.
if num_runners > 0 and local_runner:
@@ -73,6 +94,8 @@
# Do not validate until the `DataIterators` are distributed.
validate=False,
module_spec=module_spec,
module_state=module_state,
spaces=spaces,
)

# Setup the evaluation offline dataset and return an iterator.
@@ -124,7 +147,7 @@ def runner_health_probe_timeout_s(self):
@property
def runner_cls(self) -> Callable:
"""Class for each runner."""
return OfflineEvaluationRunner
return self._runner_cls

@property
def num_runners(self) -> int:
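Related to the review question above about exposing the built-in evaluation types as classes: the class-based route already works through `offline_eval_runner_class`, as selected in `_setup()` above. A rough sketch follows; the subclass and its body are hypothetical and not part of this PR, and `config` continues the configuration object from the earlier sketch:

from ray.rllib.offline.offline_evaluation_runner import OfflineEvaluationRunner


class MyOfflineEvalRunner(OfflineEvaluationRunner):
    """Hypothetical custom runner that reuses the default evaluation loop."""

    def run(self, explore: bool = False, **kwargs):
        # Custom offline-evaluation logic would go here; this sketch simply
        # defers to the parent implementation.
        return super().run(explore=explore, **kwargs)


# The runner group then instantiates `MyOfflineEvalRunner` instead of one of the
# built-in runner classes.
config.evaluation(offline_eval_runner_class=MyOfflineEvalRunner)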