"""Callbacks used during training and/or evaluation."""
import csv
import glob
import shutil
import subprocess
from pathlib import Path
from typing import Any, Dict, Iterable
import matplotlib.pyplot as plt
import numpy as np
import torch
from hydra.experimental.callbacks import Callback as HydraCallback
from omegaconf import DictConfig
from stable_baselines3.common.callbacks import (
BaseCallback,
CallbackList,
EvalCallback,
ProgressBarCallback,
)
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.results_plotter import load_results, ts2xy
from cambrian.envs import MjCambrianEnv
from cambrian.ml.model import MjCambrianModel
from cambrian.utils.logger import get_logger
[docs]
class MjCambrianPlotMonitorCallback(BaseCallback):
"""Should be used with an EvalCallback to plot the evaluation results.
This callback will take the monitor.csv file produced by the VecMonitor and
plot the results and save it as an image. Should be passed as the
`callback_after_eval` for the EvalCallback.
Args:
logdir (Path | str): The directory where the evaluation results are stored. The
evaluations.npz file is expected to be at `<logdir>/<filename>.csv`. The
resulting plot is going to be stored at
`<logdir>/evaluations/<filename>.png`.
filename (Path | str): The filename of the monitor file. The saved file will be
`<logdir>/<filename>.csv`. And the resulting plot will be saved as
`<logdir>/evaluations/<filename>.png`.
"""
parent: EvalCallback
def __init__(self, logdir: Path | str, filename: Path | str, n_episodes: int = 1):
self.logdir = Path(logdir)
self.filename = Path(filename)
self.filename_csv = self.filename.with_suffix(".csv")
self.filename_png = self.filename.with_suffix(".png")
self.evaldir = self.logdir / "evaluations"
self.evaldir.mkdir(parents=True, exist_ok=True)
self.n_episodes = n_episodes
self.n_calls = 0
def _on_step(self) -> bool:
if not (self.logdir / self.filename_csv).exists():
get_logger().warning(f"No {self.filename_csv} file found.")
return
# Temporarily set the monitor ext so that the right file is read
old_ext = Monitor.EXT
Monitor.EXT = str(self.filename_csv)
x, y = ts2xy(load_results(self.logdir), "timesteps")
Monitor.EXT = old_ext
if len(x) <= 20 or len(y) <= 20:
get_logger().warning(f"Not enough {self.filename} data to plot.")
return True
original_x, original_y = x.copy(), y.copy()
get_logger().info(f"Plotting {self.filename} results at {self.evaldir}")
def moving_average(data, window=1):
return np.convolve(data, np.ones(window), "valid") / window
n = min(len(y) // 10, 1000)
y = y.astype(float)
if self.n_episodes > 1:
assert len(y) % self.n_episodes == 0, (
"n_episodes must be a common factor of the"
f" number of episodes in the {self.filename} data."
)
y = y.reshape(-1, self.n_episodes).mean(axis=1)
else:
y = moving_average(y, window=n)
x = moving_average(x, window=n).astype(int)
# Make sure the x, y are of the same length
min_len = min(len(x), len(y))
x, y = x[:min_len], y[:min_len]
plt.plot(x, y)
plt.plot(original_x, original_y, color="grey", alpha=0.3)
plt.xlabel("Number of Timesteps")
plt.ylabel("Rewards")
plt.savefig(self.evaldir / self.filename.with_suffix(".png"))
plt.cla()
return True
[docs]
class MjCambrianEvalCallback(EvalCallback):
"""Overwrites the default EvalCallback to support saving visualizations at the same
time as the evaluation.
Note:
Only the first environment is visualized
"""
def _init_callback(self):
self.log_path = Path(self.log_path)
self.n_evals = 0
# Delete all the existing renders
for f in glob.glob(str(self.log_path / "vis_*")):
get_logger().info(f"Deleting {f}")
Path(f).unlink()
super()._init_callback()
def _on_step(self) -> bool:
# Early exit
if self.eval_freq <= 0 or self.n_calls % self.eval_freq != 0:
return True
env: MjCambrianEnv = self.eval_env.envs[0].unwrapped
# Add some overlays
# env.overlays["Exp"] = env.config.expname # TODO
env.overlays["Best Mean Reward"] = f"{self.best_mean_reward:.2f}"
env.overlays["Total Timesteps"] = f"{self.num_timesteps}"
# Run the evaluation
get_logger().info(f"Starting {self.n_eval_episodes} evaluation run(s)...")
env.record(self.render)
continue_training = super()._on_step()
if self.render:
# Save the visualization
filename = Path(f"vis_{self.n_evals}")
env.save(self.log_path / filename)
env.record(False)
if self.render:
# Copy the most recent gif to latest.gif so that we can just watch this file
for f in self.log_path.glob(str(filename.with_suffix(".*"))):
shutil.copy(f, f.with_stem("latest"))
self.n_evals += 1
return continue_training
def _log_success_callback(self, locals_: Dict[str, Any], globals_: Dict[str, Any]):
"""
Callback passed to the ``evaluate_policy`` function
in order to log the success rate (when applicable),
for instance when using HER.
:param locals_:
:param globals_:
"""
env: MjCambrianEnv = self.eval_env.envs[0].unwrapped
# If done, do some logging
if locals_["done"]:
run = locals_["episode_counts"][locals_["i"]]
cumulative_reward = env.stashed_cumulative_reward
get_logger().info(f"Run {run} done. Cumulative reward: {cumulative_reward}")
super()._log_success_callback(locals_, globals_)
[docs]
class MjCambrianGPUUsageCallback(BaseCallback):
"""This callback will log the GPU usage at the end of each evaluation.
We'll log to a csv."""
parent: EvalCallback
def __init__(
self,
logdir: Path | str,
logfile: Path | str = "gpu_usage.csv",
*,
verbose: int = 0,
):
super().__init__(verbose)
self.logdir = Path(logdir)
self.logdir.mkdir(parents=True, exist_ok=True)
self.logfile = self.logdir / logfile
with open(self.logfile, "w") as f:
writer = csv.writer(f)
writer.writerow(
[
"timesteps",
"memory_reserved",
"max_memory_reserved",
"memory_available",
]
)
def _on_step(self) -> bool:
if torch.cuda.is_available():
# Get the GPU usage, log it and save it to the file
device = torch.cuda.current_device()
memory_reserved = torch.cuda.memory_reserved(device)
max_memory_reserved = torch.cuda.max_memory_reserved(device)
memory_available = torch.cuda.get_device_properties(device).total_memory
# Log to the output file
with open(self.logfile, "a") as f:
writer = csv.writer(f)
writer.writerow(
[
self.num_timesteps,
memory_reserved,
max_memory_reserved,
memory_available,
]
)
# Log to stdout
if self.verbose > 0:
get_logger().debug(subprocess.getoutput("nvidia-smi"))
get_logger().debug(torch.cuda.memory_summary())
return True
[docs]
class MjCambrianSavePolicyCallback(BaseCallback):
"""Should be used with an EvalCallback to save the policy.
This callback will save the policy at the end of each evaluation. Should be passed
as the `callback_after_eval` for the EvalCallback.
Args:
logdir (Path | str): The directory to store the generated visualizations. The
resulting visualizations are going to be stored at
`<logdir>/evaluations/visualization.gif`.
"""
parent: EvalCallback
def __init__(
self,
logdir: Path | str,
*,
verbose: int = 0,
):
super().__init__(verbose)
self.logdir = Path(logdir)
self.logdir.mkdir(parents=True, exist_ok=True)
self.model: MjCambrianModel = None
def _on_step(self) -> bool:
self.model.save_policy(self.logdir)
return True
[docs]
class MjCambrianProgressBarCallback(ProgressBarCallback):
"""Overwrite the default progress bar callback to flush the pbar on deconstruct."""
[docs]
def __del__(self):
"""This string will restore the terminal back to its original state."""
if hasattr(self, "pbar"):
print("\x1b[?25h")
[docs]
class MjCambrianCallbackListWithSharedParent(CallbackList):
def __init__(self, callbacks: Iterable[BaseCallback] | Dict[str, BaseCallback]):
if isinstance(callbacks, dict):
callbacks = callbacks.values()
self.callbacks = []
super().__init__(list(callbacks))
@property
def parent(self):
return getattr(self.callbacks[0], "parent", None)
@parent.setter
def parent(self, parent):
for cb in self.callbacks:
cb.parent = parent
# ==================
[docs]
class MjCambrianSaveConfigCallback(HydraCallback):
"""This callback will save the resolved hydra config to the logdir."""
[docs]
def on_run_start(self, config: DictConfig, **kwargs):
self._save_config(config)
[docs]
def on_multirun_start(self, config: DictConfig, **kwargs):
self._save_config(config)
def _save_config(self, config: DictConfig):
from omegaconf import OmegaConf
config.logdir.mkdir(parents=True, exist_ok=True)
OmegaConf.save(config, config.logdir / "full.yaml")