Source code for cambrian.envs.maze_env

"""Defines the MjCambrianMazeEnv class."""

from enum import Enum
from typing import (
    Any,
    Callable,
    Concatenate,
    Dict,
    List,
    Optional,
    Self,
    Tuple,
    TypeAlias,
)

import numpy as np
from hydra_config import HydraContainerConfig, config_wrapper

from cambrian.agents.agent import MjCambrianAgent
from cambrian.envs.env import MjCambrianEnv, MjCambrianEnvConfig
from cambrian.utils import safe_index
from cambrian.utils.cambrian_xml import MjCambrianXML
from cambrian.utils.spec import MjCambrianSpec

DEFAULT_ENTITY_ID: str = "default"


[docs] class MjCambrianMapEntity(Enum): """ Enum representing different states in a grid. Attributes: RESET (str): Initial reset position of the agent. Can include agent IDs in the format "R:<agent id>". WALL (str): Represents a wall in the grid. Can include texture IDs in the format "1:<texture id>". EMPTY (str): Represents an empty space in the grid. """ RESET = "R" WALL = "1" EMPTY = "0" @staticmethod
[docs] def parse(value: str) -> Tuple[Self, str]: """ Parse a value to handle special formats like "1:<texture id>". Args: value (str): The value to parse. Returns: Tuple[Self, str]: The parsed entity and the texture id if applicable. """ if value.startswith("1:"): return MjCambrianMapEntity.WALL, value[2:] elif value.startswith("R:"): return MjCambrianMapEntity.RESET, value[2:] for entity in MjCambrianMapEntity: if value == entity.value: return entity, DEFAULT_ENTITY_ID raise ValueError(f"Unknown MjCambrianMapEntity: {value}")
@config_wrapper
[docs] class MjCambrianMazeConfig(HydraContainerConfig): """Defines a map config. Used for type hinting. Attributes: xml (MjCambrianXML): The xml for the maze. This is the xml that will be used to create the maze. map (str): The map to use for the maze. It's a 2D array where each element is a string and corresponds to a "pixel" in the map. See `maze.py` for info on what different strings mean. This is actually a List[List[str]], but we keep it as a string for readability when dumping the config to a file. Will convert to list when creating the maze. scale (float): The maze scaling for the continuous coordinates in the MuJoCo simulation. height (float): The height of the walls in the MuJoCo simulation. hflip (bool): Whether to flip the maze or not. If True, the maze will be flipped along the x-axis. vflip (bool): Whether to flip the maze or not. If True, the maze will be flipped along the y-axis. rotation (float): The rotation of the maze in degrees. The rotation is applied after the flip. wall_texture_map (Dict[str, List[str]]): The mapping from texture id to texture names. Textures in the list are chosen at random. If the list is of length 1, only one texture will be used. A length >= 1 is required. The keyword "default" is required for walls denoted simply as 1 or W. Other walls are specified as 1/W:<texture id>. agent_id_map (Dict[str, List[str]]): The mapping from agent id to agent names. Agents in the list are chosen at random. If the list is of length 1, only one agent will be used. A length >= 1 is required for each agent name. Effectively, this means you can set a reset position as R:<agent id> in the map and this map is used to assign to a group of agents. For instance, R:O in the map and including O: [agent1, agent2] in the agent_id_map will assign the reset position to either agent1 or agent2 at random. "default" is required for agents denoted simply as R. enabled (bool): Whether the maze is enabled or not. """ xml: MjCambrianXML map: str scale: float height: float hflip: bool vflip: bool rotation: float wall_texture_map: Dict[str, List[str]] agent_id_map: Dict[str, List[str]] enabled: bool
MjCambrianMazeSelectionFn: TypeAlias = Callable[ Concatenate[MjCambrianAgent, Dict[str, Any], ...], float ] @config_wrapper
[docs] class MjCambrianMazeEnvConfig(MjCambrianEnvConfig): """ mazes (Dict[str, MjCambrianMazeEnvConfig]): The configs for the mazes. Each maze will be loaded into the scene and the agent will be placed in a maze at each reset. maze_selection_fn (MjCambrianMazeSelectionFn): The function to use to select the maze. The function will be called at each reset to select the maze to use. See `MjCambrianMazeSelectionFn` and `maze.py` for more info. """ mazes: Dict[str, MjCambrianMazeConfig] maze_selection_fn: MjCambrianMazeSelectionFn
[docs] class MjCambrianMazeEnv(MjCambrianEnv): def __init__(self, config: MjCambrianMazeEnvConfig, **kwargs): self._config = config # Have to initialize the mazes first since generate_xml is called from the # MjCambrianEnv constructor self._maze: MjCambrianMaze = None self._maze_store = MjCambrianMazeStore(config.mazes, config.maze_selection_fn) super().__init__(config, **kwargs)
[docs] def generate_xml(self) -> MjCambrianXML: """Generates the xml for the environment.""" xml = MjCambrianXML.make_empty() # Add the mazes to the xml # Do this first so overrides defined in the env xml are applied xml += self._maze_store.generate_xml() # Add the rest of the xml xml += super().generate_xml() return xml
[docs] def reset( self, *, seed: Optional[int] = None, options: Optional[Dict[Any, Any]] = None ) -> Tuple[Dict[str, Dict[str, Any]], Dict[str, Any]]: # Set the random seed first if seed is not None: self.set_random_seed(seed) # Choose the maze self._maze = self._maze_store.select_maze(self) self._maze_store.reset(self.spec) # For each agent, generate an initial position for agent in self.agents.values(): agent.init_pos = self._maze.generate_reset_pos(agent=agent.name) # Now reset the environment obs, info = super().reset(seed=seed, options=options) if (renderer := self.renderer) and (viewer := renderer.viewer): if viewer.config.config.get("camera", {}).get("lookat", None) is None: # Update the camera positioning to match the current maze viewer.camera.lookat = self._maze.lookat # Update the camera distance to match the current maze's extent if viewer.config.config.get("camera", {}).get("distance", None) is None: viewer.camera.distance = viewer.config.camera.distance viewer.camera.distance *= self._maze.max_dim / renderer.ratio if self._maze.ratio < 2.0: viewer.camera.distance *= 2 return obs, info
# ================== @property
[docs] def maze(self) -> "MjCambrianMaze": """Returns the current maze.""" return self._maze
@maze.setter def maze(self, maze: "MjCambrianMaze"): """Sets the current maze.""" self._maze = maze @property
[docs] def maze_store(self) -> "MjCambrianMazeStore": """Returns the maze store.""" return self._maze_store
# ================
[docs] class MjCambrianMaze: """The maze class. Generates a maze from a given map and provides utility functions for working with the maze.""" def __init__(self, config: MjCambrianMazeConfig, name: str): self._config = config self._name = name self._starting_x = None self._map: np.ndarray = None self._load_map() self._wall_textures: List[str] = [] self._wall_locations: List[np.ndarray] = [] self._reset_locations: List[np.ndarray] = [] self._reset_agents: List[str] = [] self._occupied_locations: List[np.ndarray] = [] self._agent_locations: Dict[str, int] = {} self._agent_id_map = dict(**self._config.agent_id_map) def initialize(self, starting_x: float): self._starting_x = starting_x self._update_locations() def _load_map(self): """Parses the map (which is a str) as a yaml str and converts it to an np array.""" import yaml self._map = np.array(yaml.safe_load(self._config.map), dtype=str) if self._config.hflip: self._map = np.flip(self._map, axis=0) if self._config.vflip: self._map = np.flip(self._map, axis=1) self._map = np.rot90(self._map, k=int(self._config.rotation / 90)) def _update_locations(self): """This helper method will update the initially place the wall and reset locations. These are known at construction time. It will also parse wall textures.""" for i in range(self._map.shape[0]): for j in range(self._map.shape[1]): struct = self._map[i][j] # Calculate the cell location in global coords x = (j + 0.5) * self._config.scale - self.x_map_center y = self.y_map_center - (i + 0.5) * self._config.scale loc = np.array([x, y]) entity, entity_id = MjCambrianMapEntity.parse(struct) if entity == MjCambrianMapEntity.WALL: self._wall_locations.append(loc) # Do a check for the texture assert entity_id in self._config.wall_texture_map, ( f"Invalid texture: {entity_id}. " f"Available textures: {list(self._config.wall_texture_map.keys())}" # noqa ) self._wall_textures.append(entity_id) elif entity == MjCambrianMapEntity.RESET: self._reset_locations.append(loc) # Do a check for the agent id assert entity_id in list(self._agent_id_map.keys()), ( f"Invalid agent id: {entity_id}. " f"Available agent ids: {list(self._agent_id_map.keys())}" ) self._reset_agents.append(entity_id) def generate_xml(self) -> MjCambrianXML: xml = MjCambrianXML.from_string(self._config.xml) worldbody = xml.find(".//worldbody") assert worldbody is not None, "xml must have a worldbody tag" assets = xml.find(".//asset") assert assets is not None, "xml must have an asset tag" # Add the wall textures for t, textures in self._config.wall_texture_map.items(): for texture in textures: name_prefix = f"wall_{self._name}_{t}_{texture}" xml.add( assets, "material", name=f"{name_prefix}_mat", texture=f"{name_prefix}_tex", ) xml.add( assets, "texture", name=f"{name_prefix}_tex", file=f"maze_textures/{texture}.png", gridsize="3 4", gridlayout=".U..LFRB.D..", ) # Add the walls. Each wall has it's own geom. scale = self._config.scale / 2 height = self._config.height for i, (x, y) in enumerate(self._wall_locations): name = f"wall_{self._name}_{i}" # Set the contype != conaffinity so walls don't collide with each other xml.add( worldbody, "geom", name=name, pos=f"{x} {y} {scale * height}", size=f"{scale} {scale} {scale * height}", contype="1", conaffinity="2", **{"class": f"maze_wall_{self._name}"}, ) # Update floor size based on the map extent # Only done if the size is explicitly set to 0 0 0 floor_name = f"floor_{self._name}" floor = xml.find(f".//geom[@name='{floor_name}']") assert floor is not None, f"`{floor_name}` not found" if floor.attrib.get("size", "0 0 0"): size = f"{self.map_width_scaled // 2} {self.map_length_scaled // 2} 0.1" floor.attrib["size"] = size floor.attrib["pos"] = " ".join(map(str, [-self._starting_x, 0, -0.05])) return xml
[docs] def reset(self, spec: MjCambrianSpec, *, reset_occupied: bool = True): """Resets the maze. Will reset the wall textures and reset the occupied locations, if desired.""" if reset_occupied: self._occupied_locations.clear() self._agent_locations.clear() self._reset_wall_textures(spec)
def _reset_wall_textures(self, spec: MjCambrianSpec): """Helper method to reset the wall textures. All like-labelled walls will have the same texture. Their textures will be randomly selected from their respective texture lists. """ # First, generate the texture_id -> texture_name mapping texture_map: Dict[str, str] = {} for t in self._wall_textures: if t not in texture_map: texture_map[t] = np.random.choice( list(self._config.wall_texture_map[t]) ) # Now, update the wall textures for i, t in zip(range(len(self._wall_locations)), self._wall_textures): wall_name = f"wall_{self._name}_{i}" geom_id = spec.get_geom_id(wall_name) assert geom_id != -1, f"`{wall_name}` geom not found" # Update the geom material material_name = f"wall_{self._name}_{t}_{texture_map[t]}_mat" spec.geoms[geom_id].material = material_name # ================== def rowcol_to_xy(self, rowcol_pos: np.ndarray) -> np.ndarray: x = (rowcol_pos[1] + 0.5) * self._config.scale - self.x_map_center y = self.y_map_center - (rowcol_pos[0] + 0.5) * self._config.scale return np.array([x, y]) def xy_to_rowcol(self, xy_pos: np.ndarray) -> np.ndarray: i = np.floor((self.y_map_center - xy_pos[1]) / self._config.scale) j = np.floor((xy_pos[0] + self.x_map_center) / self._config.scale) return np.array([i, j], dtype=int)
[docs] def compute_optimal_path( self, start: np.ndarray, target: np.ndarray, *, obstacles: List[Tuple[int, int]] = [], ) -> np.ndarray: """Computes the optimal path from the start position to the target. Uses a BFS to find the shortest path. Keyword Args: obstacles (List[Tuple[int, int]]): The obstacles in the maze. Each obstacle is a tuple of (row, col). Defaults to []. Avoids these positions when computing the path. """ from typing import Deque start = self.xy_to_rowcol(start) target = self.xy_to_rowcol(target) rows = self._map.shape[0] cols = self._map.shape[1] visited = [[False for _ in range(cols)] for _ in range(rows)] visited[start[0]][start[1]] = True queue = Deque([([start], 0)]) # (path, distance) moves = [(-1, 0), (1, 0), (0, -1), (0, 1), (-1, -1), (-1, 1), (1, -1), (1, 1)] while queue: path, dist = queue.popleft() current = path[-1] if np.all(current == target): # Convert path from indices to positions path = [self.rowcol_to_xy(pos) for pos in path] path.append(self.rowcol_to_xy(target)) return np.array(path) # Check all moves (left, right, up, down, and all diagonals) for dr, dc in moves: r, c = current[0] + dr, current[1] + dc map_entity = MjCambrianMapEntity.parse(self._map[r][c])[0] if ( 0 <= r < rows and 0 <= c < cols and not visited[r][c] and map_entity != MjCambrianMapEntity.WALL ): # If the movement is diagonal, check that the adjacent cells are # free as well so the path doesn't clip through walls pr, pc = current[0], current[1] if (dr, dc) in moves[4:]: pc_map_entity = MjCambrianMapEntity.parse(self._map[r][pc])[0] pr_map_entity = MjCambrianMapEntity.parse(self._map[pr][c])[0] if ( pc_map_entity == MjCambrianMapEntity.WALL or pr_map_entity == MjCambrianMapEntity.WALL ): continue # Check if the cell is an obstacle if (r, c) in obstacles: continue visited[r][c] = True queue.append((path + [(r, c)], dist + 1)) raise ValueError("No path found")
# ================== def _generate_pos(self, locations: List[np.ndarray], tries: int = 20) -> np.ndarray: """Helper method to generate a position. The generated position must be at a unique location from self._occupied_locations. Args: locations (List[np.ndarray]): The locations to choose from. tries (int): The number of tries to attempt to find a unique position. Defaults to 20. Returns: np.ndarray: The chosen position. Is of size (2,). """ assert len(locations) > 0, "Not enough locations to choose from" for _ in range(tries): idx = np.random.randint(low=0, high=len(locations)) pos = locations[idx].copy() # Check if the position is already occupied for occupied in self._occupied_locations: if np.linalg.norm(pos - occupied) <= 0.5 * self._config.scale: break else: return pos raise ValueError( f"Could not generate a unique position. {tries} tries failed. " f"Occupied locations: {self._occupied_locations}. " f"Available locations: {locations}." )
[docs] def generate_reset_pos( self, agent: str, *, add_as_occupied: bool = True ) -> np.ndarray: """Generates a random reset position for an agent. Keyword Args: add_as_occupied (bool): Whether to add the chosen location to the occupied locations. Defaults to True. Returns: np.ndarray: The chosen position. Is of size (2,). """ reset_locations = [] for reset_agent, reset_pos in zip(self._reset_agents, self._reset_locations): if agent in self._agent_id_map[reset_agent]: reset_locations.append(reset_pos) reset_locations = np.array(reset_locations) if len(reset_locations) == 0: raise ValueError(f"No reset locations found for agent '{agent}'.") # Reset the occupied location if the agent is already in the occupied locations if agent in self._agent_locations: del self._occupied_locations[self._agent_locations[agent]] # Generate the pos and assign that pos to the agent reset_locations = [tuple(loc) for loc in reset_locations] occupied_locations = [tuple(loc) for loc in self._occupied_locations] possible_locations = list(set(reset_locations) - set(occupied_locations)) pos = possible_locations[np.random.choice(len(possible_locations))] if add_as_occupied: self._agent_locations[agent] = len(self._occupied_locations) self._occupied_locations.append(pos) return pos
# ================== @property
[docs] def config(self) -> MjCambrianMazeEnvConfig: """Returns the config.""" return self._config
@property
[docs] def name(self) -> str: """Returns the name.""" return self._name
@property
[docs] def map(self) -> np.ndarray: """Returns the map.""" return self._map
@property
[docs] def map_length_scaled(self) -> float: """Returns the map length scaled.""" return self._map.shape[0] * self._config.scale
@property
[docs] def map_width_scaled(self) -> float: """Returns the map width scaled.""" return self._map.shape[1] * self._config.scale
@property
[docs] def max_dim(self) -> float: """Returns the max dimension.""" return max(self.map_length_scaled, self.map_width_scaled)
@property
[docs] def min_dim(self) -> float: """Returns the min dimension.""" return min(self.map_length_scaled, self.map_width_scaled)
@property
[docs] def ratio(self) -> float: """Returns the ratio of the length over width.""" return self.map_length_scaled / self.map_width_scaled
@property
[docs] def x_map_center(self) -> float: """Returns the x map center.""" assert self._starting_x is not None, "Maze has not been initialized" return self.map_width_scaled // 2 + self._starting_x
@property
[docs] def y_map_center(self) -> float: """Returns the y map center.""" return self.map_length_scaled / 2
@property
[docs] def lookat(self) -> np.ndarray: """Returns a point which aids in placement of a camera to visualize this maze.""" # NOTE: Negative because of convention based on BEV camera assert self._starting_x is not None, "Maze has not been initialized" return np.array([-self._starting_x + len(self._map[0]) / 4, 0, 0])
@property
[docs] def reset_locations(self) -> List[np.ndarray]: """Returns the reset locations.""" return self._reset_locations
# ================================
[docs] class MjCambrianMazeStore: """This is a simple class to store a collection of mazes.""" def __init__( self, maze_configs: Dict[str, MjCambrianMazeEnvConfig], maze_selection_fn: MjCambrianMazeSelectionFn, ): self._mazes: Dict[str, MjCambrianMaze] = {} self._create_mazes(maze_configs) self._current_maze: MjCambrianMaze = None self._maze_selection_fn = maze_selection_fn def _create_mazes(self, maze_configs: Dict[str, MjCambrianMazeEnvConfig]): prev_x, prev_width = 0, 0 for name, config in maze_configs.items(): if name in self._mazes: # If the maze already exists, skip it continue elif not config.enabled: # If the maze is not enabled, skip it continue # First create the maze maze = MjCambrianMaze(config, name) self._mazes[name] = maze # Calculate the starting x of the maze # We'll place the maze such that it doesn't overlap with existing mazes # It'll be placed next to the previous one # The positions of the maze is calculated from one corner (defined as x # in this case) x = prev_x + prev_width / 2 + maze.map_width_scaled / 2 maze.initialize(x) # Update the prev_center and prev_width prev_x, prev_width = x, maze.map_width_scaled
[docs] def generate_xml(self) -> MjCambrianXML: """Generates the xml for the current maze.""" xml = MjCambrianXML.make_empty() for maze in self._mazes.values(): xml += maze.generate_xml() return xml
[docs] def reset(self, spec: MjCambrianSpec): """Resets all mazes.""" for maze in self._mazes.values(): maze.reset(spec)
@property
[docs] def current_maze(self) -> MjCambrianMaze: """Returns the current maze.""" return self._current_maze
@property
[docs] def maze_list(self) -> List[MjCambrianMaze]: """Returns the list of mazes.""" return list(self._mazes.values())
# ====================== # Maze selection methods
[docs] def select_maze(self, env: "MjCambrianEnv") -> MjCambrianMaze: """This should be called by the environment to select a maze.""" maze = self._maze_selection_fn(self, env) self._current_maze = maze return maze
[docs] def select_maze_random(self, _: "MjCambrianEnv") -> MjCambrianMaze: """Selects a maze at random.""" return np.random.choice(self.maze_list)
[docs] def select_maze_schedule( self, env: "MjCambrianEnv", *, schedule: Optional[str] = "linear", total_timesteps: int, n_envs: int, lam_0: Optional[float] = -2.0, lam_n: Optional[float] = 2.0, ) -> MjCambrianMaze: """Selects a maze based on a schedule. The scheduled selections are based on the order of the mazes in the list. Keyword Args: schedule (Optional[str]): The schedule to use. One of "linear", "exponential", or "logistic". Defaults to "linear". total_timesteps (int): The total number of timesteps in the training schedule. Unused if schedule is None. Required otherwise. n_envs (int): The number of environments. Unused if schedule is None. Required otherwise. lam_0 (Optional[float]): The lambda value at the start of the schedule. Unused if schedule is None. lam_n (Optional[float]): The lambda value at the end of the schedule. Unused if schedule is None. """ assert lam_0 < lam_n, "lam_0 must be less than lam_n" # Compute the current step steps_per_env = total_timesteps // n_envs step = env.num_timesteps / steps_per_env # Compute the lambda value if schedule == "linear": lam = lam_0 + (lam_n - lam_0) * step elif schedule == "exponential": lam = lam_0 * (lam_n / lam_0) ** (step / n_envs) elif schedule == "logistic": lam = lam_0 + (lam_n - lam_0) / (1 + np.exp(-2 * step / n_envs)) else: raise ValueError(f"Invalid schedule: {schedule}") p = np.exp(lam * np.arange(len(self.maze_list))) return np.random.choice(self.maze_list, p=p / p.sum())
[docs] def select_maze_cycle(self, env: "MjCambrianEnv") -> MjCambrianMaze: """Selects a maze based on a cycle.""" idx = safe_index(self.maze_list, self._current_maze, default=-1) return self.maze_list[(idx + 1) % len(self.maze_list)]