Source code for cambrian.renderer.renderer

"""Wrapper around the mujoco viewer for rendering scenes."""

import atexit
import ctypes
from abc import ABC, abstractmethod
from copy import deepcopy
from enum import Flag, auto
from pathlib import Path
from typing import TYPE_CHECKING, Callable, Dict, List, Optional, Tuple, Type

import glfw
import imageio
import mujoco as mj
import numpy as np
import OpenGL.GL as GL
import torch
from hydra_config import HydraContainerConfig, HydraFlagWrapperMeta, config_wrapper

import cambrian.utils
from cambrian.renderer.overlays import MjCambrianViewerOverlay
from cambrian.utils.logger import get_logger
from cambrian.utils.spec import MjCambrianSpec

if TYPE_CHECKING:
    try:
        import mujoco.usd.exporter
    except ImportError:
        pass

has_pycuda_gl = False  # disable pycuda for now
try:
    if has_pycuda_gl:
        import pycuda.autoinit  # noqa
        import pycuda.driver as cuda
        import pycuda.gl as cudagl

    has_pycuda_gl = has_pycuda_gl
except ImportError:
    has_pycuda_gl = False

device = cambrian.utils.device
if has_pycuda_gl and torch.device(device) != torch.device("cuda"):
    get_logger().warning(
        "Not using CUDA device. Disabling PyCUDA GL interop for rendering."
    )
    has_pycuda_gl = False


[docs] class MjCambrianRendererSaveMode(Flag, metaclass=HydraFlagWrapperMeta): """The save modes for saving rendered images.""" NONE = auto() GIF = auto() MP4 = auto() PNG = auto() WEBP = auto() USD = auto()
@config_wrapper
[docs] class MjCambrianRendererConfig(HydraContainerConfig): """The config for the renderer. Used for type hinting. A renderer corresponds to a single camera. The renderer can then view the scene in different ways, like offscreen (rgb_array) or onscreen (human). Attributes: render_modes (List[str]): The render modes to use for the renderer. See `MjCambrianRenderer.metadata["render.modes"]` for options. width (Optional[int]): The width of the rendered image. For onscreen renderers, if this is set, the window cannot be resized. Must be set for offscreen renderers. height (Optional[int]): The height of the rendered image. For onscreen renderers, if this is set, the window cannot be resized. Must be set for offscreen renderers. fullscreen (Optional[bool]): Whether to render in fullscreen or not. If True, the width and height are ignored and the window is rendered in fullscreen. This is only valid for onscreen renderers. scene (Type[mj.MjvScene]): The scene to render. scene_options (mj.MjvOption): The options to use for rendering. camera (mj.MjvCamera): The camera to use for rendering. use_shared_context (bool): Whether to use a shared context or not. If True, the renderer will share a context with other renderers. This is useful for rendering multiple renderers at the same time. If False, the renderer will create its own context. This is computationally expensive if there are many renderers. save_mode (Optional[MjCambrianRendererSaveMode]): The save modes to use for saving the rendered images. See `MjCambrianRenderer.SaveMode` for options. Must be set if `save` is called without save modes passed directly. """ render_modes: List[str] width: Optional[int] = None height: Optional[int] = None fullscreen: Optional[bool] = None scene: Type[mj.MjvScene] scene_options: mj.MjvOption camera: mj.MjvCamera use_shared_context: bool save_mode: Optional[MjCambrianRendererSaveMode] = None
# =================== GL_CONTEXT: mj.gl_context.GLContext = None MJR_CONTEXT: mj.MjrContext = None CUDA_CONTEXT: "cuda.Context" = None if has_pycuda_gl: CUDA_CONTEXT = cuda.Device(0).make_context() def free_contexts(): global GL_CONTEXT, MJR_CONTEXT, CUDA_CONTEXT if GL_CONTEXT is not None: try: GL_CONTEXT.free() except Exception: pass finally: GL_CONTEXT = None if MJR_CONTEXT is not None: try: MJR_CONTEXT.free() except Exception: pass finally: MJR_CONTEXT = None if CUDA_CONTEXT is not None: try: CUDA_CONTEXT.detach() except Exception: pass finally: CUDA_CONTEXT = None # Remove the automatic freeing. Will error out when calling free # since we explicitly free the contexts in the atexit function. mj.gl_context.GLContext.__del__ = lambda _: None atexit.register(free_contexts) # ===================
[docs] class MjCambrianViewer(ABC): """The base class for the viewer. This class should not be instantiated directly. Args: config (MjCambrianRendererConfig): The config to use for the viewer. """ def __init__(self, config: MjCambrianRendererConfig): self._config = config self._spec: MjCambrianSpec = None self._viewport: mj.MjrRect = None self._scene: mj.MjvScene = None self._scene_options: mj.MjvOption = None self._camera: mj.MjvCamera = None self._gl_context: mj.gl_context.GLContext = None self._mjr_context: mj.MjrContext = None self._font = mj.mjtFontScale.mjFONTSCALE_50 self._pixel_bytes = 3 self._rgb_float32: torch.Tensor = None self._depth: torch.Tensor = None if has_pycuda_gl: self._rgb_pbo: int = None self._rgb_res: cudagl.RegisteredBuffer = None self._rgb_mapped_res: cuda.DeviceAllocation = None self._rgb_ptr: int = None self._depth_pbo: int = None self._depth_res: cudagl.RegisteredBuffer = None self._depth_mapped_res: cuda.DeviceAllocation = None self._depth_ptr: int = None def reset(self, spec: MjCambrianSpec, width: int, height: int): self._spec = spec # Only create the scene once if self._scene is None: self._scene = self._config.scene(self._spec.model) self._scene_options = deepcopy(self._config.scene_options) self._camera = deepcopy(self._config.camera) self._initialize_contexts(width, height) self._viewport = mj.MjrRect(0, 0, width, height) # Initialize the buffers if self._rgb_float32 is None or self._rgb_float32.shape != ( height, width, self._pixel_bytes, ): self._rgb_uint8 = torch.zeros( (height, width, self._pixel_bytes), dtype=torch.uint8, device=device, ) self._rgb_uint8_cpu = self._rgb_uint8.cpu() self._rgb_float32 = torch.zeros( (height, width, self._pixel_bytes), dtype=torch.float32, device=device, ) self._depth = torch.zeros( (height, width), dtype=torch.float32, device=device, ) self._depth_cpu = self._depth.cpu() if has_pycuda_gl: self._initialize_pbo() def _initialize_contexts(self, width: int, height: int): global GL_CONTEXT, MJR_CONTEXT # NOTE: All shared contexts must match either onscreen or offscreen. And their # height and width most likely must match as well. If the existing context # is onscreen and we're requesting offscreen, override use_shared_context (and # vice versa). use_shared_context = self._config.use_shared_context if use_shared_context and MJR_CONTEXT: if MJR_CONTEXT.currentBuffer != self.get_framebuffer_option(): get_logger().warning( "Overriding use_shared_context. " "First buffer and current buffer don't match." ) use_shared_context = False if use_shared_context: # Initialize or reuse the GL context GL_CONTEXT = GL_CONTEXT or mj.gl_context.GLContext(width, height) self._gl_context = GL_CONTEXT self.make_context_current() MJR_CONTEXT = MJR_CONTEXT or mj.MjrContext(self._spec.model, self._font) self._mjr_context = MJR_CONTEXT elif self._viewport is None or width != self.width or height != self.height: # If the viewport is None (i.e. this is the first reset), or the window # has been resized, create a new context. We'll need to clean up the old # context if it exists. if self._gl_context is not None: del self._gl_context if self._mjr_context is not None: del self._mjr_context # Initialize the new contexts self._gl_context = mj.gl_context.GLContext(width, height) self.make_context_current() self._mjr_context = mj.MjrContext(self._spec.model, self._font) self._mjr_context.readDepthMap = mj.mjtDepthMap.mjDEPTH_ZEROFAR mj.mjr_setBuffer(self.get_framebuffer_option(), self._mjr_context) def _initialize_pbo(self): assert has_pycuda_gl and torch.device(device) == torch.device("cuda") rgb_buffer_size = self.width * self.height * self._pixel_bytes depth_buffer_size = self.width * self.height self._rgb_pbo = GL.glGenBuffers(1) GL.glBindBuffer(GL.GL_PIXEL_PACK_BUFFER, int(self._rgb_pbo)) GL.glBufferData( GL.GL_PIXEL_PACK_BUFFER, rgb_buffer_size, None, GL.GL_STREAM_READ ) GL.glBindBuffer(GL.GL_PIXEL_PACK_BUFFER, 0) self._rgb_res = cudagl.RegisteredBuffer( int(self._rgb_pbo), cuda.graphics_map_flags.READ_ONLY ) self._rgb_mapped_res = self._rgb_res.map() self._rgb_ptr = self._rgb_mapped_res.device_ptr_and_size()[0] self._depth_pbo = GL.glGenBuffers(1) GL.glBindBuffer(GL.GL_PIXEL_PACK_BUFFER, int(self._depth_pbo)) GL.glBufferData( GL.GL_PIXEL_PACK_BUFFER, depth_buffer_size, None, GL.GL_STREAM_READ ) GL.glBindBuffer(GL.GL_PIXEL_PACK_BUFFER, 0) self._depth_res = cudagl.RegisteredBuffer( int(self._depth_pbo), cuda.graphics_map_flags.READ_ONLY ) self._depth_mapped_res = self._depth_res.map() self._depth_ptr = self._depth_mapped_res.device_ptr_and_size()[0] @abstractmethod def update(self, width: int, height: int): # Subclass should override this method such that this is not possible assert width == self._viewport.width and height == self._viewport.height mj.mjv_updateScene( self._spec.model, self._spec.data, self._scene_options, None, # mjvPerturb self._camera, mj.mjtCatBit.mjCAT_ALL, self._scene, ) def render(self, *, overlays: List[MjCambrianViewerOverlay] = []): self.make_context_current() self.update(self._viewport.width, self._viewport.height) # Reorder the overlays based on their layer type overlays = sorted(overlays, key=lambda overlay: overlay.layer.value) self._draw_before_render(overlays) mj.mjr_render(self._viewport, self._scene, self._mjr_context) self._draw_after_render(overlays) def _draw_before_render(self, overlays: List[MjCambrianViewerOverlay]): for overlay in overlays: overlay.draw_before_render(self._scene) def _draw_after_render(self, overlays: List[MjCambrianViewerOverlay]): if len(overlays) > 0: # Do a single mjr_overlay call to initialize underlying 2D rendering # If we don't do this, calling mjr_drawPixels without first calling # mjr_overlay will result in the pixels being drawn behind the 3D scene. mj.mjr_overlay( mj.mjtFont.mjFONT_NORMAL, mj.mjtGridPos.mjGRID_BOTTOMLEFT, self._viewport, "", "", self._mjr_context, ) for overlay in overlays: overlay.draw_after_render(self._mjr_context, self._viewport) def read_pixels( self, read_rgb: bool, read_depth: bool ) -> Tuple[torch.Tensor | None, torch.Tensor | None]: if has_pycuda_gl: out = self._read_pixels_cuda(read_rgb, read_depth) else: out = self._read_pixels(read_rgb, read_depth) return out def _read_pixels( self, read_rgb: bool, read_depth: bool ) -> Tuple[torch.Tensor | None, torch.Tensor | None]: mj.mjr_readPixels( self._rgb_uint8_cpu if read_rgb else None, self._depth_cpu if read_depth else None, self._viewport, self._mjr_context, ) if read_rgb: self._rgb_uint8.copy_(self._rgb_uint8_cpu, non_blocking=True) torch.divide(self._rgb_uint8, 255.0, out=self._rgb_float32) if read_depth: self._depth.copy_(self._depth_cpu, non_blocking=True) return self._rgb_float32 if read_rgb else None, ( self._depth if read_depth else None ) def _read_pixels_cuda( self, read_rgb: bool, read_depth: bool ) -> Tuple[torch.Tensor | None, torch.Tensor | None]: mask = 0 if read_depth: mask = GL.GL_COLOR_BUFFER_BIT if read_depth: mask |= GL.GL_DEPTH_BUFFER_BIT if self._spec.visual.quality.offsamples: # Multisampling GL.glBindFramebuffer(GL.GL_READ_FRAMEBUFFER, self._mjr_context.offFBO) GL.glReadBuffer(GL.GL_COLOR_ATTACHMENT0) GL.glBindFramebuffer(GL.GL_DRAW_FRAMEBUFFER, self._mjr_context.offFBO_r) GL.glDrawBuffer(GL.GL_COLOR_ATTACHMENT0) GL.glBlitFramebuffer( self._viewport.left, self._viewport.bottom, self._viewport.left + self.width, self._viewport.bottom + self.height, self._viewport.left, self._viewport.bottom, self._viewport.left + self.width, self._viewport.bottom + self.height, mask, GL.GL_NEAREST, ) GL.glBindFramebuffer(GL.GL_READ_FRAMEBUFFER, self._mjr_context.offFBO_r) GL.glReadBuffer(GL.GL_COLOR_ATTACHMENT0) else: GL.glBindFramebuffer(GL.GL_READ_FRAMEBUFFER, self._mjr_context.offFBO) GL.glReadBuffer(GL.GL_COLOR_ATTACHMENT0) if read_rgb: GL.glBindBuffer(GL.GL_PIXEL_PACK_BUFFER, int(self._rgb_pbo)) GL.glReadPixels( self._viewport.left, self._viewport.bottom, self.width, self.height, self._mjr_context.readPixelFormat, GL.GL_UNSIGNED_BYTE, ctypes.c_void_p(0), ) GL.glBindBuffer(GL.GL_PIXEL_PACK_BUFFER, 0) if read_depth: GL.glBindBuffer(GL.GL_PIXEL_PACK_BUFFER, int(self._depth_pbo)) GL.glReadPixels( 0, 0, self.width, self.height, GL.GL_DEPTH_COMPONENT, GL.GL_FLOAT, ctypes.c_void_p(0), ) GL.glBindBuffer(GL.GL_PIXEL_PACK_BUFFER, 0) GL.glBindFramebuffer(GL.GL_FRAMEBUFFER, self._mjr_context.offFBO) GL.glReadBuffer(GL.GL_COLOR_ATTACHMENT0) GL.glDrawBuffer(GL.GL_COLOR_ATTACHMENT0) CUDA_CONTEXT.push() if read_rgb: cuda.memcpy_dtod( int(self._rgb_uint8.data_ptr()), self._rgb_ptr, self.width * self.height * self._pixel_bytes, ) torch.divide(self._rgb_uint8, 255.0, out=self._rgb_float32) if read_depth: cuda.memcpy_dtod( int(self._depth.data_ptr()), self._depth_ptr, self.width * self.height ) CUDA_CONTEXT.pop() return self._rgb_float32 if read_rgb else None, ( self._depth if read_depth else None ) @abstractmethod def make_context_current(self): pass @abstractmethod def get_framebuffer_option(self) -> int: pass @abstractmethod def is_running(self): pass # =================== @property def width(self) -> int: return self._viewport.width @width.setter def width(self, width: int): self._viewport.width = width @property def height(self) -> int: return self._viewport.height @height.setter def height(self, height: int): self._viewport.height = height @property def camera(self) -> mj.MjvCamera: return self._camera @property def scene_options(self) -> mj.MjvOption: return self._scene_options @property def config(self) -> MjCambrianRendererConfig: return self._config
[docs] class MjCambrianOffscreenViewer(MjCambrianViewer): """The offscreen viewer for rendering scenes.""" def get_framebuffer_option(self) -> int: return mj.mjtFramebuffer.mjFB_OFFSCREEN.value def update(self, width: int, height: int): if self._viewport.width != width or self._viewport.height != height: self.make_context_current() self._viewport = mj.MjrRect(0, 0, width, height) mj.mjr_resizeOffscreen(width, height, self._mjr_context) super().update(width, height) def make_context_current(self): assert ( self._gl_context is not None ), "GL context is not initialized, did you call reset?" self._gl_context.make_current() def is_running(self): return True
[docs] class MjCambrianOnscreenViewer(MjCambrianViewer): """The onscreen viewer for rendering scenes.""" def __init__(self, config: MjCambrianRendererConfig): super().__init__(config) self._window = None self.default_window_pos: Tuple[int, int] = None self._scale: float = None self._last_mouse_x: int = None self._last_mouse_y: int = None self._is_paused: bool = None self.custom_key_callback: Callable = None def reset(self, spec: MjCambrianSpec, width: int, height: int): self._last_mouse_x: int = 0 self._last_mouse_y: int = 0 self._is_paused: bool = False if self._window is None: self._initialize_window(width, height) glfw.set_window_size(self._window, width, height) self.fullscreen(self._config.fullscreen if self._config.fullscreen else False) super().reset(spec, width, height) window_width, _ = glfw.get_window_size(self._window) self._scale = width / window_width glfw.set_cursor_pos_callback(self._window, self._cursor_pos_callback) glfw.set_mouse_button_callback(self._window, self._mouse_button_callback) glfw.set_scroll_callback(self._window, self._scroll_callback) glfw.set_key_callback(self._window, self._key_callback) glfw.swap_interval(0) def _initialize_window(self, width: int, height: int): global GL_CONTEXT, MJR_CONTEXT if not glfw.init(): raise Exception("GLFW failed to initialize.") gl_context = None if self._config.use_shared_context: from mujoco.glfw import GLContext as GLFWGLContext GLFWGLContext.__del__ = lambda _: None GL_CONTEXT = GL_CONTEXT or GLFWGLContext(width, height) assert isinstance(GL_CONTEXT, GLFWGLContext), ( f"The mujoco gl context must be of type {GLFWGLContext} to use " f"the OnscreenViewer, but got {type(GL_CONTEXT)} instead. " "Set the env variable `MUJOCO_GL` to `glfw` to use the correct context." ) gl_context = GL_CONTEXT._context self._window = glfw.create_window(width, height, "MjCambrian", None, gl_context) if not self._window: glfw.terminate() raise Exception("GLFW failed to create window.") glfw.show_window(self._window) self.default_window_pos = glfw.get_window_pos(self._window) def make_context_current(self): glfw.make_context_current(self._window) super().make_context_current() def get_framebuffer_option(self) -> int: return mj.mjtFramebuffer.mjFB_WINDOW.value def update(self, width: int, height: int): if self._viewport.width != width or self._viewport.height != height: self.make_context_current() self._viewport = mj.MjrRect(0, 0, width, height) GL.glViewport(0, 0, width, height) super().update(width, height) def render(self, *, overlays: List[MjCambrianViewerOverlay] = []): if self._window is None: get_logger().warning("Tried to render destroyed window.") return elif glfw.window_should_close(self._window): get_logger().warning("Tried to render closed or closing window.") return while True: self.make_context_current() width, height = glfw.get_framebuffer_size(self._window) self._viewport = mj.MjrRect(0, 0, width, height) super().render(overlays=overlays) glfw.swap_buffers(self._window) glfw.poll_events() if not self._is_paused: break def is_running(self): return not (self._window is None or glfw.window_should_close(self._window)) # =================== def fullscreen(self, fullscreen: bool): if self._window is None: get_logger().warning("Tried to set fullscreen to destroyed window.") return if fullscreen: monitor = glfw.get_primary_monitor() video_mode = glfw.get_video_mode(monitor) glfw.set_window_monitor( self._window, monitor, 0, 0, video_mode.size.width, video_mode.size.height, video_mode.refresh_rate, ) # =================== def _cursor_pos_callback(self, window, xpos, ypos): left_button_pressed = glfw.get_mouse_button(window, glfw.MOUSE_BUTTON_LEFT) right_button_pressed = glfw.get_mouse_button(window, glfw.MOUSE_BUTTON_RIGHT) if not (left_button_pressed or right_button_pressed): return shift_pressed = ( glfw.get_key(window, glfw.KEY_LEFT_SHIFT) == glfw.PRESS or glfw.get_key(window, glfw.KEY_RIGHT_SHIFT) == glfw.PRESS ) if right_button_pressed: MOVE_H, MOVE_V = mj.mjtMouse.mjMOUSE_MOVE_H, mj.mjtMouse.mjMOUSE_MOVE_V action = MOVE_H if shift_pressed else MOVE_V elif left_button_pressed: ROT_H, ROT_V = mj.mjtMouse.mjMOUSE_ROTATE_H, mj.mjtMouse.mjMOUSE_ROTATE_V action = ROT_H if shift_pressed else ROT_V else: action = mj.mjtMouse.mjMOUSE_ZOOM dx = int(self._scale * xpos) - self._last_mouse_x dy = int(self._scale * ypos) - self._last_mouse_y width, height = glfw.get_framebuffer_size(window) reldx, reldy = dx / width, dy / height mj.mjv_moveCamera( self._spec.model, action, reldx, reldy, self._scene, self._camera ) self._last_mouse_x = int(self._scale * xpos) self._last_mouse_y = int(self._scale * ypos) def _mouse_button_callback(self, window, button, action, mods): x, y = glfw.get_cursor_pos(window) self._last_mouse_x = int(self._scale * x) self._last_mouse_y = int(self._scale * y) def _scroll_callback(self, window, xoffset, yoffset): mj.mjv_moveCamera( self._spec.model, mj.mjtMouse.mjMOUSE_ZOOM, 0, -0.05 * yoffset, self._scene, self._camera, ) def _key_callback(self, window, key, scancode, action, mods): if action != glfw.RELEASE: return # Close window. if key == glfw.KEY_ESCAPE or key == glfw.KEY_Q: glfw.set_window_should_close(window, True) self._is_paused = False # unpause so the window can close # Switch cameras if key == glfw.KEY_TAB: self._camera.fixedcamid += 1 self._camera.type = mj.mjtCamera.mjCAMERA_FIXED if self._camera.fixedcamid >= self._spec.model.ncam: self._camera.fixedcamid = -1 self._camera.type = mj.mjtCamera.mjCAMERA_FREE # Pause simulation if key == glfw.KEY_SPACE: self._is_paused = not self._is_paused # Screenshot if key == glfw.KEY_S: rgb, _ = self._read_pixels(read_rgb=True, read_depth=False) rgb = (rgb * 255.0).to(torch.uint8).cpu().numpy() rgb = np.flipud(rgb) imageio.imwrite("screenshot.png", rgb) get_logger().info("Saved screenshot at screenshot.png.") # Custom key callback if self.custom_key_callback is not None: self.custom_key_callback(window, key, scancode, action, mods)
[docs] class MjCambrianRenderer: """The renderer for rendering scenes. This is essentially a wrapper around the mujoco viewer/renderer. Args: config (MjCambrianRendererConfig): The config to use for the renderer. Attributes: metadata (Dict[str, List[str]]): The metadata for the renderer. The render modes are stored here. """ metadata: Dict[str, List[str]] = { "render.modes": ["human", "rgb_array", "depth_array"] } def __init__(self, config: MjCambrianRendererConfig): self._config = config self._spec: MjCambrianSpec = None assert all( mode in self.metadata["render.modes"] for mode in self._config.render_modes ), f"Invalid render mode found. Valid modes are {self.metadata['render.modes']}" self._viewer: MjCambrianViewer = None if "human" in self._config.render_modes: self._viewer = MjCambrianOnscreenViewer(self._config) else: self._viewer = MjCambrianOffscreenViewer(self._config) self._record: bool = False self._rgb_buffer: List[torch.Tensor] = [] self._usd_exporter: Optional["mujoco.usd.exporter.USDExporter"] = None self._should_render: bool = any( m in self._config.render_modes for m in ["rgb_array", "depth_array"] ) self._return_rgb: bool = "rgb_array" in self._config.render_modes self._return_depth: bool = "depth_array" in self._config.render_modes def reset( self, spec: MjCambrianSpec, width: Optional[int] = None, height: Optional[int] = None, ) -> torch.Tensor | None: self._spec = spec width = width or self._config.width or spec.model.vis.global_.offwidth height = height or self._config.height or spec.model.vis.global_.offheight if width > spec.model.vis.global_.offwidth: spec.model.vis.global_.offwidth = width if height > spec.model.vis.global_.offheight: spec.model.vis.global_.offheight = height self._viewer.reset(spec, width, height) return self.render(resetting=True) def render( self, *, overlays: List[MjCambrianViewerOverlay] = [], resetting: bool = False ) -> torch.Tensor | Tuple[torch.Tensor, torch.Tensor] | None: self._viewer.render(overlays=overlays) if not self._should_render: return rgb, depth = self._viewer.read_pixels( read_rgb=self._return_rgb, read_depth=self._return_depth ) if self._record and not resetting: rgb_to_record = rgb.clone() if rgb.device == "cpu" else rgb.cpu().clone() self._rgb_buffer.append(rgb_to_record) returns = [] if self._return_rgb: returns.append(rgb) if self._return_depth: returns.append(depth) if self._usd_exporter: self._usd_exporter.update_scene(self._spec.data, self._viewer.scene_options) return returns if len(returns) > 1 else returns[0] def is_running(self): return self._viewer.is_running() # =================== def save( self, path: Path | str, *, save_mode: Optional[MjCambrianRendererSaveMode] = None, fps: int = 50, ): save_mode = save_mode or self._config.save_mode duration = 1000 / fps if not self._record: get_logger().warning( "Not recording. Check if you called record() " "or `rgb_array` is in render_modes. Ignoring save..." ) return assert len(self._rgb_buffer) > 0, "Cannot save empty buffer." get_logger().info(f"Saving visualizations at {path}...") path = Path(path) rgb_buffer = ( (torch.stack(self._rgb_buffer) * 255.0).to(torch.uint8).cpu().numpy() ) # Mujoco uses OpenGL, which uses bottom-left origin, so flip the buffer # since most of python uses top-left origin. rgb_buffer = np.flip(rgb_buffer, axis=1) if save_mode & MjCambrianRendererSaveMode.MP4: try: mp4 = path.with_suffix(".mp4") imageio.mimwrite(mp4, rgb_buffer, fps=fps) except TypeError: get_logger().error( "imageio is not compiled with ffmpeg. " "You may need to install it with `pip install imageio[ffmpeg]`." ) get_logger().debug(f"Saved visualization at {mp4}") if save_mode & MjCambrianRendererSaveMode.PNG: png = path.with_suffix(".png") idx = -2 if len(rgb_buffer) > 1 else -1 imageio.imwrite(png, rgb_buffer[idx]) get_logger().debug(f"Saved visualization at {png}") if save_mode & MjCambrianRendererSaveMode.GIF: gif = path.with_suffix(".gif") imageio.mimwrite(gif, rgb_buffer, loop=0, duration=duration) get_logger().debug(f"Saved visualization at {gif}") if save_mode & MjCambrianRendererSaveMode.WEBP: webp = path.with_suffix(".webp") imageio.mimwrite(webp, rgb_buffer, fps=fps, lossless=True) get_logger().debug(f"Saved visualization at {webp}") if save_mode & MjCambrianRendererSaveMode.USD or self._usd_exporter: assert self._usd_exporter, "USD exporter not initialized." self._usd_exporter.save_scene("usd") get_logger().debug(f"Saved visualization at {path}") def record( self, record: bool = True, *, path: Optional[Path] = None, save_mode: Optional[MjCambrianRendererSaveMode] = None, ): get_logger().info(f"{'Starting' if record else 'Stopping'} recording...") if record and self._record: get_logger().warning("Already recording. Ignoring...") return elif not record and not self._record: get_logger().warning("Not recording. Ignoring...") return elif record and "rgb_array" not in self._config.render_modes: render_modes = list(self._config.render_modes) get_logger().warning( f"Cannot record without rgb_array mode: {render_modes}. Ignoring..." ) return save_mode = save_mode or self._config.save_mode if record and MjCambrianRendererSaveMode.USD & save_mode: try: import mujoco.usd.exporter except ImportError: get_logger().error( "MuJoCo wasn't installed with USD support. You may need to " "run `pip install mujoco[usd]`." ) return camera_names = [ self._spec.get_camera_name(i) for i in range(self._spec.model.ncam) ] self._usd_exporter = mujoco.usd.exporter.USDExporter( self._spec.model, self.height, self.width, self._config.scene.maxgeom, output_directory_root=path, output_directory="usd", camera_names=camera_names, # save all cameras verbose=False, ) elif not record: self._rgb_buffer.clear() self._usd_exporter = None self._record = record # =================== @property def config(self) -> MjCambrianRendererConfig: return self._config @property def viewer(self) -> MjCambrianViewer: return self._viewer @property def width(self) -> int: return self._viewer.width @property def height(self) -> int: return self._viewer.height @property def ratio(self) -> float: return self.width / self.height