Source code for cambrian.ml.fitness_fns

"""Fitness functions for evaluating the fitness of agents. These functions are used by
the optimizers to evaluate the fitness of the agents. The fitness functions are
responsible for loading the evaluations and monitor files and calculating the fitness
of the agent based on the evaluations."""

import csv
from pathlib import Path
from typing import TYPE_CHECKING, Dict, Tuple

import numpy as np
from scipy.stats import zscore

if TYPE_CHECKING:
    from cambrian.config import MjCambrianConfig

# ========================
# Utils


def parse_evaluations_npz(evaluations_npz: Path) -> Dict[str, np.ndarray]:
    """Load every array stored in an evaluations ``.npz`` archive.

    Args:
        evaluations_npz (Path): Path to the ``.npz`` file to read.

    Returns:
        Dict[str, np.ndarray]: A plain dict mapping each key stored in the archive
            to its array.

    Raises:
        AssertionError: If ``evaluations_npz`` does not exist.
    """
    assert (
        evaluations_npz.exists()
    ), f"Evaluations file {evaluations_npz} does not exist."

    # NOTE(security): allow_pickle=True will unpickle object arrays, which can
    # execute arbitrary code. Only load evaluation files from trusted sources.
    #
    # The archive is opened as a context manager so the underlying file handle is
    # closed once all arrays have been read into memory.
    with np.load(evaluations_npz, allow_pickle=True) as data:
        return {key: data[key] for key in data.files}
def parse_monitor_csv(monitor_csv: Path) -> Tuple[np.ndarray, np.ndarray]:
    """Read the ``t`` and ``r`` columns out of a monitor csv file.

    The first line of the file is discarded as a comment line; the next line is
    used as the csv header.

    Args:
        monitor_csv (Path): Path to the monitor csv file.

    Returns:
        Tuple[np.ndarray, np.ndarray]: The ``t`` column (timesteps) and the ``r``
            column (rewards), both converted to floats, in file order.

    Raises:
        AssertionError: If ``monitor_csv`` does not exist.
    """
    assert monitor_csv.exists(), f"Monitor file {monitor_csv} does not exist."

    with open(monitor_csv, "r") as handle:
        # Drop the leading comment line so DictReader sees the header first.
        next(handle, None)
        pairs = [
            (float(record["t"]), float(record["r"]))
            for record in csv.DictReader(handle)
        ]

    time_column = np.array([t for t, _ in pairs])
    reward_column = np.array([r for _, r in pairs])
    return time_column, reward_column
def top_n_percent(
    data: np.ndarray,
    percent: float,
    use_outliers: bool,
    max_zscore: float = 3.0,
) -> float:
    """Return the median of the top ``percent`` of ``data``, optionally excluding
    outliers.

    Args:
        data (np.ndarray): 1D array of values.
        percent (float): Size of the top slice, as a percentage (0-100) of the
            number of (possibly filtered) values.
        use_outliers (bool): If False, values whose absolute z-score is not below
            ``max_zscore`` are dropped before the top slice is taken. If that
            would drop every value, the unfiltered data is used instead.
        max_zscore (float): Z-score threshold used when ``use_outliers`` is False.

    Returns:
        float: The median of the largest ``percent`` percent of the values. At
            least one value is always selected, so small arrays yield their
            maximum rather than the median of the whole array.
    """
    if not use_outliers:
        z_scores = zscore(data)
        filtered_data = data[np.abs(z_scores) < max_zscore]
        # Constant data produces NaN z-scores, which filter everything out; fall
        # back to the unfiltered data in that case.
        if len(filtered_data) > 0:
            data = filtered_data

    # Clamp to at least one element: with n_top == 0 the slice `[-0:]` would
    # select the *entire* array instead of the top of it.
    n_top = max(1, int(len(data) * (percent / 100.0)))
    return float(np.median(np.sort(data)[-n_top:]))
# ========================
[docs] def fitness_from_evaluations( config: "MjCambrianConfig", evaluations_npz: Path, *, return_data: bool = False, use_outliers: bool = False, percent: float = 25.0, quartiles: tuple[float, float] = (25, 75), ) -> float | Tuple[float, np.ndarray]: """ Calculate the fitness of the agent based on evaluation results. The fitness is determined by taking the mean of the top `percent` of the evaluation rewards, optionally excluding outliers. Args: config (MjCambrianConfig): Configuration for the evaluation. evaluations_npz (Path): Path to the `.npz` file containing evaluation data. Keyword Args: return_data (bool): If True, returns the evaluation data along with the fitness. use_outliers (bool): If True, includes outliers in the fitness calculation. Defaults to False. percent (float): Defines the top 'n' percent of rewards to be used for calculating fitness. Defaults to 75.0. Returns: float | Tuple[float, np.ndarray]: The calculated fitness value. If `return_data` is True, returns a tuple containing the fitness value and the full evaluation data as a numpy array. Notes: - The rewards array should be a 2D array where each row represents an evaluation run and each column represents the rewards for each evaluation step. - If the evaluations file does not exist, returns negative infinity as the fitness value. """ # Return negative infinity if the evaluations file doesn't exist if not evaluations_npz.exists(): return -float("inf") # The rewards array will be stored in a 2D array where each row represents each # evaluation run and each column represents the rewards for each evaluation step. # We may run multiple steps of the same or slightly different environment to reduce # variance. We will average the rewards across each row to get the final rewards. 
evaluations = parse_evaluations_npz(evaluations_npz) rewards = evaluations["results"] rewards = np.mean(rewards, axis=1) fitness = top_n_percent(rewards, percent, use_outliers, quartiles) if return_data: return fitness, evaluations return top_n_percent(rewards, percent, use_outliers, quartiles)
def fitness_from_monitor(
    config: "MjCambrianConfig",
    monitor_csv: Path,
    *,
    return_data: bool = False,
    percent: float = 25.0,
    n_episodes: int = 1,
) -> float | Tuple[float, Tuple[np.ndarray, np.ndarray]]:
    """
    Calculate the fitness of the agent based on monitor data.

    Monitor rows are grouped into consecutive chunks of `n_episodes`; rewards are
    summed and timesteps averaged within each chunk. The fitness is
    `top_n_percent` applied to the grouped rewards with outlier filtering enabled.

    Args:
        config (MjCambrianConfig): Configuration for the evaluation. Not read by
            this function.
        monitor_csv (Path): Path to the CSV file containing monitor data.

    Keyword Args:
        return_data (bool): If True, returns the grouped monitor data along with
            the fitness. Defaults to False.
        percent (float): Defines the top 'n' percent of rewards to be used for
            calculating fitness. Defaults to 25.0.
        n_episodes (int): The number of consecutive episodes whose rewards are
            summed together. Defaults to 1. Must evenly divide the number of
            episodes in the monitor data.

    Returns:
        float | Tuple[float, Tuple[np.ndarray, np.ndarray]]: The calculated fitness
            value. If `return_data` is True, returns a tuple containing the fitness
            value and the grouped (timesteps, rewards) numpy arrays.

    Notes:
        - If the rewards are empty, returns negative infinity as the fitness value
          (a bare float, even when `return_data` is True).
    """
    raw_timesteps, raw_rewards = parse_monitor_csv(monitor_csv)

    # Nothing was logged: report the worst possible fitness.
    if not len(raw_rewards):
        return -float("inf")

    remainder = len(raw_rewards) % n_episodes
    assert remainder == 0, (
        "n_episodes must be a common factor of the number of episodes in the monitor"
        " data."
    )

    # One row per group of `n_episodes` consecutive episodes.
    grouped_timesteps = raw_timesteps.reshape(-1, n_episodes).mean(axis=1)
    grouped_rewards = raw_rewards.reshape(-1, n_episodes).sum(axis=1)

    fitness = top_n_percent(grouped_rewards, percent, use_outliers=False)
    return (fitness, (grouped_timesteps, grouped_rewards)) if return_data else fitness
def fitness_from_txt(config: "MjCambrianConfig", txt_file: Path) -> float:
    """Read the fitness of the agent from a text file.

    The whole file is read, surrounding whitespace is stripped and the remainder
    is parsed as a single float.

    Args:
        config (MjCambrianConfig): Configuration for the evaluation. Not read by
            this function.
        txt_file (Path): Path to the text file holding the fitness value.

    Returns:
        float: The fitness value stored in the file.
    """
    with open(txt_file, "r") as handle:
        contents = handle.read()
    return float(contents.strip())
# ========================
# Fake eval fns which are used to test optimizers
# They should be realistic in the sense that they return a fitness value which is
# similar to the fitness value returned by the real eval fns. This means they should
# be fairly noisy and have a similar correlation with the genotype as the real eval
# fns.
def fitness_num_eyes(
    config: "MjCambrianConfig",
    *,
    pattern: str,
    mean: float = 0,
    std: float = 5,
    assume_one: bool = True,
) -> float:
    """Fake fitness function which generally returns higher rewards for agents
    with more eyes.

    The result is a single draw from a normal distribution centred on
    ``mean + prod(num_eyes)`` with standard deviation ``std``.

    Args:
        config (MjCambrianConfig): The agent's config. Its ``glob`` method, ``seed``
            and ``evo`` attributes are read.

    Keyword Args:
        pattern (str): The path to the number of eyes in the config, passed to
            ``config.glob``.
        mean (float): Offset added to the product of the eye counts. Defaults to 0.
        std (float): Standard deviation of the noise. Defaults to 5.
        assume_one (bool): Forwarded to ``config.glob``. Defaults to True.

    Returns:
        float: The noisy fitness value.
    """
    eye_counts = config.glob(pattern, flatten=True, assume_one=assume_one)

    # Seed deterministically so the same value is regenerated when loading later.
    # When evolution info is available, the seed is derived from the agent's id
    # (generation and rank); otherwise the config's own seed is used.
    rng_seed = config.seed
    evo = config.evo
    if evo is not None:
        rng_seed = evo.generation * evo.population_size + evo.rank

    centre = mean + np.prod(eye_counts)
    generator = np.random.default_rng(rng_seed)
    return generator.normal(centre, std)
def fitness_num_eyes_and_fov(
    config: "MjCambrianConfig",
    *,
    num_eyes_pattern: str,
    fov_pattern: str,
    mean: float = 0,
    std: float = 5,
    assume_one: bool = True,
    optimal_fov: float = 45,
) -> float:
    """Fake fitness function which generally returns higher rewards for agents
    with more eyes and a fov closer to the `optimal_fov`.

    Two normal draws (both with standard deviation ``std``) are summed: one
    centred on ``mean + prod(num_eyes)`` and one centred on
    ``mean + optimal_fov / max(|optimal_fov - fov|, 1)``.

    Args:
        config (MjCambrianConfig): The agent's config. Its ``glob`` method, ``seed``
            and ``evo`` attributes are read.

    Keyword Args:
        num_eyes_pattern (str): Pattern passed to ``config.glob`` to look up the
            number of eyes.
        fov_pattern (str): Pattern passed to ``config.glob`` to look up the fov.
        mean (float): Offset added to the centre of both draws. Defaults to 0.
        std (float): Standard deviation of both draws. Defaults to 5.
        assume_one (bool): Forwarded to ``config.glob``. Defaults to True.
        optimal_fov (float): The fov which yields the largest fov term. Defaults
            to 45.

    Returns:
        float: The sum of the noisy eye term and the noisy fov term.
    """
    eye_counts = config.glob(num_eyes_pattern, flatten=True, assume_one=assume_one)
    fov_value = config.glob(fov_pattern, flatten=True, assume_one=assume_one)

    # Seed deterministically so the same values are regenerated when loading later.
    # When evolution info is available, the seed is derived from the agent's id
    # (generation and rank); otherwise the config's own seed is used.
    rng_seed = config.seed
    evo = config.evo
    if evo is not None:
        rng_seed = evo.generation * evo.population_size + evo.rank
    generator = np.random.default_rng(rng_seed)

    # The draw order matters for reproducibility: eyes first, then fov.
    eyes_term = generator.normal(mean + np.prod(eye_counts), std)

    # The distance is clamped to at least 1 so the fov term never exceeds
    # `optimal_fov` and never divides by zero.
    fov_distance = max(abs(optimal_fov - fov_value), 1)
    fov_term = generator.normal(mean + optimal_fov / fov_distance, std)

    return eyes_term + fov_term