"""Callbacks used during training and/or evaluation."""importcsvimportglobimportshutilimportsubprocessfrompathlibimportPathfromtypingimportAny,Dict,Iterableimportmatplotlib.pyplotaspltimportnumpyasnpimporttorchfromhydra.experimental.callbacksimportCallbackasHydraCallbackfromomegaconfimportDictConfigfromstable_baselines3.common.callbacksimport(BaseCallback,CallbackList,EvalCallback,ProgressBarCallback,)fromstable_baselines3.common.monitorimportMonitorfromstable_baselines3.common.results_plotterimportload_results,ts2xyfromcambrian.envsimportMjCambrianEnvfromcambrian.ml.modelimportMjCambrianModelfromcambrian.utils.loggerimportget_logger
class MjCambrianPlotMonitorCallback(BaseCallback):
    """Should be used with an EvalCallback to plot the evaluation results.

    This callback will take the monitor csv file produced by the VecMonitor, plot
    the results, and save the plot as an image. Should be passed as the
    `callback_after_eval` for the EvalCallback.

    Args:
        logdir (Path | str): The directory where the evaluation results are stored.
            The monitor file is expected to be at `<logdir>/<filename>.csv`. The
            resulting plot is going to be stored at
            `<logdir>/evaluations/<filename>.png`.
        filename (Path | str): The filename of the monitor file. The data is read
            from `<logdir>/<filename>.csv` and the resulting plot is saved as
            `<logdir>/evaluations/<filename>.png`.
        n_episodes (int): If greater than 1, rewards are averaged over groups of
            `n_episodes` episodes instead of using a moving average.
    """

    parent: EvalCallback

    def __init__(self, logdir: Path | str, filename: Path | str, n_episodes: int = 1):
        super().__init__()

        self.logdir = Path(logdir)
        self.filename = Path(filename)
        self.filename_csv = self.filename.with_suffix(".csv")
        self.filename_png = self.filename.with_suffix(".png")

        self.evaldir = self.logdir / "evaluations"
        self.evaldir.mkdir(parents=True, exist_ok=True)

        self.n_episodes = n_episodes

    def _on_step(self) -> bool:
        if not (self.logdir / self.filename_csv).exists():
            get_logger().warning(f"No {self.filename_csv} file found.")
            # Nothing to plot yet; keep training
            return True

        # Temporarily set the monitor ext so that the right file is read
        old_ext = Monitor.EXT
        Monitor.EXT = str(self.filename_csv)
        x, y = ts2xy(load_results(self.logdir), "timesteps")
        Monitor.EXT = old_ext

        if len(x) <= 20 or len(y) <= 20:
            get_logger().warning(f"Not enough {self.filename} data to plot.")
            return True

        original_x, original_y = x.copy(), y.copy()

        get_logger().info(f"Plotting {self.filename} results at {self.evaldir}")

        def moving_average(data, window=1):
            return np.convolve(data, np.ones(window), "valid") / window

        n = min(len(y) // 10, 1000)
        y = y.astype(float)
        if self.n_episodes > 1:
            assert len(y) % self.n_episodes == 0, (
                "n_episodes must evenly divide the number of episodes in the"
                f" {self.filename} data."
            )
            y = y.reshape(-1, self.n_episodes).mean(axis=1)
        else:
            y = moving_average(y, window=n)
        x = moving_average(x, window=n).astype(int)

        # Make sure x and y are the same length
        min_len = min(len(x), len(y))
        x, y = x[:min_len], y[:min_len]

        plt.plot(x, y)
        plt.plot(original_x, original_y, color="grey", alpha=0.3)
        plt.xlabel("Number of Timesteps")
        plt.ylabel("Rewards")
        plt.savefig(self.evaldir / self.filename_png)
        plt.cla()

        return True
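
# A minimal usage sketch (not part of this module), assuming a Monitor/VecMonitor
# already writes a `<logdir>/monitor.csv` file during training; `eval_env` and
# `logdir` are hypothetical names:
#
#   eval_cb = EvalCallback(
#       eval_env,
#       eval_freq=10_000,
#       callback_after_eval=MjCambrianPlotMonitorCallback(logdir, "monitor"),
#   )
#   # After each evaluation, the plot is written to
#   # `<logdir>/evaluations/monitor.png`.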
class MjCambrianEvalCallback(EvalCallback):
    """Overwrites the default EvalCallback to support saving visualizations at the
    same time as the evaluation.

    Note:
        Only the first environment is visualized.
    """

    def _init_callback(self):
        self.log_path = Path(self.log_path)
        self.n_evals = 0

        # Delete all the existing renders
        for f in glob.glob(str(self.log_path / "vis_*")):
            get_logger().info(f"Deleting {f}")
            Path(f).unlink()

        super()._init_callback()

    def _on_step(self) -> bool:
        # Early exit
        if self.eval_freq <= 0 or self.n_calls % self.eval_freq != 0:
            return True

        env: MjCambrianEnv = self.eval_env.envs[0].unwrapped

        # Add some overlays
        # env.overlays["Exp"] = env.config.expname  # TODO
        env.overlays["Best Mean Reward"] = f"{self.best_mean_reward:.2f}"
        env.overlays["Total Timesteps"] = f"{self.num_timesteps}"

        # Run the evaluation
        get_logger().info(f"Starting {self.n_eval_episodes} evaluation run(s)...")
        env.record(self.render)
        continue_training = super()._on_step()

        if self.render:
            # Save the visualization
            filename = Path(f"vis_{self.n_evals}")
            env.save(self.log_path / filename)
            env.record(False)

            # Copy the most recent gif to latest.gif so that we can just watch this
            # file
            for f in self.log_path.glob(str(filename.with_suffix(".*"))):
                shutil.copy(f, f.with_stem("latest"))

        self.n_evals += 1

        return continue_training

    def _log_success_callback(self, locals_: Dict[str, Any], globals_: Dict[str, Any]):
        """Callback passed to the ``evaluate_policy`` function in order to log the
        success rate (when applicable), for instance when using HER.

        :param locals_:
        :param globals_:
        """
        env: MjCambrianEnv = self.eval_env.envs[0].unwrapped

        # If done, do some logging
        if locals_["done"]:
            run = locals_["episode_counts"][locals_["i"]]
            cumulative_reward = env.stashed_cumulative_reward
            get_logger().info(
                f"Run {run} done. Cumulative reward: {cumulative_reward}"
            )

        super()._log_success_callback(locals_, globals_)
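
# Sketch of how the eval env is assumed to be provided (not part of this module):
# `_on_step` and `_log_success_callback` index `self.eval_env.envs[0]`, which
# implies a vectorized env such as a DummyVecEnv wrapping MjCambrianEnv instances.
# `make_env` and `logdir` are hypothetical.
#
#   from stable_baselines3.common.vec_env import DummyVecEnv
#
#   eval_env = DummyVecEnv([make_env])  # make_env: () -> MjCambrianEnv
#   eval_cb = MjCambrianEvalCallback(
#       eval_env,
#       log_path=str(logdir / "evaluations"),
#       eval_freq=10_000,
#       render=True,  # enables the vis_<n> recordings described above
#   )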
class MjCambrianGPUUsageCallback(BaseCallback):
    """This callback logs the GPU memory usage at the end of each evaluation.
    The results are written to a csv file."""

    parent: EvalCallback

    def __init__(
        self,
        logdir: Path | str,
        logfile: Path | str = "gpu_usage.csv",
        *,
        verbose: int = 0,
    ):
        super().__init__(verbose)

        self.logdir = Path(logdir)
        self.logdir.mkdir(parents=True, exist_ok=True)

        self.logfile = self.logdir / logfile
        with open(self.logfile, "w") as f:
            writer = csv.writer(f)
            writer.writerow(
                [
                    "timesteps",
                    "memory_reserved",
                    "max_memory_reserved",
                    "memory_available",
                ]
            )

    def _on_step(self) -> bool:
        if torch.cuda.is_available():
            # Get the GPU usage, log it, and save it to the file
            device = torch.cuda.current_device()
            memory_reserved = torch.cuda.memory_reserved(device)
            max_memory_reserved = torch.cuda.max_memory_reserved(device)
            memory_available = torch.cuda.get_device_properties(device).total_memory

            # Log to the output file
            with open(self.logfile, "a") as f:
                writer = csv.writer(f)
                writer.writerow(
                    [
                        self.num_timesteps,
                        memory_reserved,
                        max_memory_reserved,
                        memory_available,
                    ]
                )

            # Log to stdout
            if self.verbose > 0:
                get_logger().debug(subprocess.getoutput("nvidia-smi"))
                get_logger().debug(torch.cuda.memory_summary())

        return True
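
# A small post-hoc read of the csv written above (column names come from the header
# row written in `__init__`; `csv` and `Path` are already imported at the top of
# this module). `logdir` is a hypothetical path and at least one data row is
# assumed to exist:
#
#   with open(Path(logdir) / "gpu_usage.csv") as f:
#       rows = list(csv.DictReader(f))
#   peak_gib = max(int(r["max_memory_reserved"]) for r in rows) / 1024**3
#   print(f"Peak reserved GPU memory: {peak_gib:.2f} GiB")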
class MjCambrianSavePolicyCallback(BaseCallback):
    """Should be used with an EvalCallback to save the policy.

    This callback will save the policy at the end of each evaluation. Should be
    passed as the `callback_after_eval` for the EvalCallback.

    Args:
        logdir (Path | str): The directory the policy is saved to.
    """

    parent: EvalCallback

    def __init__(
        self,
        logdir: Path | str,
        *,
        verbose: int = 0,
    ):
        super().__init__(verbose)

        self.logdir = Path(logdir)
        self.logdir.mkdir(parents=True, exist_ok=True)

        self.model: MjCambrianModel = None

    def _on_step(self) -> bool:
        self.model.save_policy(self.logdir)

        return True
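
# Usage note (a sketch, mirroring the docstring above): `self.model` starts as None
# and is only bound by stable-baselines3's `BaseCallback.init_callback` once the
# callback is attached to training, e.g. as `callback_after_eval`. `logdir`,
# `eval_env`, and `model` are hypothetical names.
#
#   save_cb = MjCambrianSavePolicyCallback(logdir)
#   eval_cb = EvalCallback(eval_env, callback_after_eval=save_cb)
#   model.learn(total_timesteps=1_000_000, callback=eval_cb)
#   # save_policy() is then called with `logdir` after every evaluation.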
class MjCambrianProgressBarCallback(ProgressBarCallback):
    """Overwrites the default progress bar callback to flush the pbar on
    deconstruction."""

    def __del__(self):
        """Restores the terminal to its original state by printing the ANSI
        'show cursor' escape sequence."""
        if hasattr(self, "pbar"):
            print("\x1b[?25h")
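
# End-to-end sketch combining the callbacks in this module (all of `model`,
# `eval_env`, and `logdir` are assumed to exist; the exact wiring is an example,
# not prescribed by this file):
#
#   callbacks = CallbackList(
#       [
#           MjCambrianEvalCallback(
#               eval_env,
#               log_path=str(logdir / "evaluations"),
#               eval_freq=10_000,
#               callback_after_eval=CallbackList(
#                   [
#                       MjCambrianPlotMonitorCallback(logdir, "monitor"),
#                       MjCambrianGPUUsageCallback(logdir),
#                       MjCambrianSavePolicyCallback(logdir),
#                   ]
#               ),
#           ),
#           MjCambrianProgressBarCallback(),
#       ]
#   )
#   model.learn(total_timesteps=1_000_000, callback=callbacks)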