Source code for cambrian.agents.point

"""Point agents."""

from functools import cached_property
from typing import Optional, Tuple

import numpy as np
from gymnasium import spaces

from cambrian.agents.agent import MjCambrianAgent2D, MjCambrianAgentConfig
from cambrian.envs.maze_env import MjCambrianMazeEnv
from cambrian.utils import get_logger
from cambrian.utils.types import ActionType, ObsType


[docs] class MjCambrianAgentPoint(MjCambrianAgent2D): """ This is a hardcoded class which implements the agent as actuated by a forward velocity and a rotational position. In mujoco, to the best of my knowledge, all translational joints are actuated in reference to the _global_ frame rather than the local frame. This means a velocity actuator applied along the x-axis will move the agent along the global x-axis rather than the local x-axis. Therefore, the agent will have 3 actuators: two for x and y global velocities and one for rotational position. From the perspective the calling class (i.e. MjCambrianEnv), this agent has two actuators: a forward velocity and a rotational position. We will calculate the global velocities and rotational position from these two "actuators". Todo: Will create an issue on mujoco and see if it's possible to implement this in xml. The issue right now is that mujoco doesn't support relative positions for hinge joints, so we have to implement the heading joint as a velocity actuator which is not ideal. """ def __init__( self, config: MjCambrianAgentConfig, name: str, *, kp: float = 0.75, ): super().__init__(config, name) self._kp = kp assert np.all(self._actuators[0].ctrlrange == self._actuators[1].ctrlrange), ( f"Forward velocity and lateral velocity must have the same control range, " f"got {self._actuators[0].ctrlrange} and {self._actuators[1].ctrlrange}" ) self._v_ctrlrange = np.array([0, 1]) self._theta_ctrlrange = self._actuators[2].ctrlrange def _update_obs(self, obs: ObsType) -> ObsType: """Creates the entire obs dict.""" obs = super()._update_obs(obs) # Update the action obs # Calculate the global velocities if self._config.use_action_obs: v, theta = self._calc_v_theta(self._last_action) v = np.interp(v, self._v_ctrlrange, [-1, 1]) theta = np.interp(theta, self._theta_ctrlrange, [-1, 1]) obs["action"] = np.array([v, theta], dtype=np.float32) return obs def _calc_v_theta(self, action: Tuple[float, float, float]) -> Tuple[float, float]: """Calculates the v and theta from the action.""" vx, vy, _ = action v = np.hypot(vx, vy) theta = np.arctan2(vy, vx) - self.qpos[2] return v, theta
[docs] def apply_action(self, action: ActionType): """Calls the appropriate apply action method based on the heading joint type.""" assert len(action) == 2, f"Action must have two elements, got {len(action)}." # Calculate global velocities v = np.interp(action[0], [-1, 1], self._v_ctrlrange) current_heading = self.qpos[2] vx = v * np.cos(current_heading) vy = v * np.sin(current_heading) super().apply_action([vx, vy, action[1]])
@cached_property
[docs] def action_space(self) -> spaces.Space: """Overrides the base implementation to only have two elements.""" return spaces.Box(low=-1, high=1, shape=(2,), dtype=np.float32)
[docs] class MjCambrianAgentPointSeeker(MjCambrianAgentPoint): """This is an agent which is non-trainable and defines a custom policy which acts as a 'homing' agent in the maze environment. This agent will attempt to reach a target (which is either randomly placed or another specific agent) by taking actions that best map to the optimal trajectory which is calculated from the maze using bfs (or just by choosing the action which minimizes the distance to the target). Keyword Args: target (Optional[str]): The name of the target agent to home in on. If None, a random free space in the maze will be chosen as the target. speed (float): The speed at which the agent moves. Defaults to -0.75. distance_threshold (float): The distance threshold at which the agent will consider itself to have reached the target. Defaults to 2.0. use_optimal_trajectory (bool): Whether to use the optimal trajectory to the target. Defaults to False. """ def __init__( self, config: MjCambrianAgentConfig, name: str, *, target: Optional[str], speed: float = -0.75, distance_threshold: float = 2.0, use_optimal_trajectory: bool = False, ): super().__init__(config, name) self._target = target self._speed = speed self._distance_threshold = distance_threshold self._optimal_trajectory: np.ndarray = None self._use_optimal_trajectory = use_optimal_trajectory self._prev_target_pos: np.ndarray = None
[docs] def reset(self, *args) -> ObsType: """Resets the optimal_trajectory.""" self._optimal_trajectory = None self._prev_target_pos = None return super().reset(*args)
[docs] def get_action_privileged(self, env: MjCambrianMazeEnv) -> ActionType: if self._target is None: if self._optimal_trajectory is None or len(self._optimal_trajectory) == 0: # Generate a random position to navigate to # Chooses one of the empty spaces in the maze rows, cols = np.where(env.maze.map == "0") assert rows.size > 0, "No empty spaces in the maze" index = np.random.randint(rows.size) target_pos = env.maze.rowcol_to_xy((rows[index], cols[index])) else: target_pos = self._optimal_trajectory[0] else: assert self._target in env.agents, f"Target {self._target} not found in env" target_pos = env.agents[self._target].pos if self._prev_target_pos is None: self._prev_target_pos = target_pos # Calculate the optimal trajectory if the current trajectory is None if ( self._optimal_trajectory is None or np.linalg.norm(target_pos - self._prev_target_pos) > 0.1 ): if self._use_optimal_trajectory: obstacles = [] for agent_name, agent in env.agents.items(): if agent_name != self._target: obstacles.append(tuple(env.maze.xy_to_rowcol(agent.pos))) try: self._optimal_trajectory = env.maze.compute_optimal_path( self.pos, target_pos, obstacles=obstacles ) except IndexError: # Happens if there's no path to the target get_logger().warning( f"No path to target {target_pos} from {self.pos}" ) self._optimal_trajectory = np.array([target_pos[:2]]) else: self._optimal_trajectory = np.array([target_pos[:2]]) # If the optimal trajectory is empty, then set the optimal trajectory to the # target position if len(self._optimal_trajectory) == 0: self._optimal_trajectory = np.array([target_pos[:2]]) # Get the current target. If the distance between the current position and the # target is less than the threshold, then remove the target from the optimal # trajectory target = self._optimal_trajectory[0] target_vector = target - self.pos[:2] distance = np.linalg.norm(target - self.pos[:2]) if distance < self._distance_threshold: self._optimal_trajectory = self._optimal_trajectory[1:] # Update the previous target position target_theta = np.arctan2(target_vector[1], target_vector[0]) theta_action = np.interp(target_theta, [-np.pi, np.pi], [-1, 1]) return [self._speed, theta_action]