Source code for tetris_gymnasium.wrappers.observation

"""Observation wrapper module for the Tetris Gymnasium environment."""
import cv2
import gymnasium as gym
import numpy as np
from gymnasium.core import RenderFrame
from gymnasium.spaces import Box

from tetris_gymnasium.envs import Tetris


class RgbObservation(gym.ObservationWrapper):
    """Observation wrapper that merges board, queue and holder into one RGB image.

    The image shows the board on the left, the queue in the top-right corner
    and the holder in the bottom-right corner. The width of the right-hand
    panel depends on how many tetrominoes can be stored in the queue / holder.
    """

    def __init__(self, env: Tetris):
        """Initialize the RgbObservation wrapper.

        Args:
            env (Tetris): The environment.
        """
        super().__init__(env)
        base_env = env.unwrapped
        panel_width = max(base_env.holder.size, base_env.queue.size) * base_env.padding
        self.observation_space = Box(
            low=0,
            # The observation holds 8-bit colour channels, not pixel ids, so
            # the upper bound is the full uint8 range.
            high=255,
            shape=(
                base_env.height_padded,
                base_env.width_padded + panel_width,
                3,
            ),
            dtype=np.uint8,
        )

    def observation(self, observation):
        """Convert the dict observation into a single RGB image.

        Args:
            observation (dict): Observation of the base environment. The keys
                ``"board"``, ``"holder"`` and ``"queue"`` are read; each is
                expected to be a 2D array of pixel ids.

        Returns:
            np.ndarray: ``uint8`` array of shape ``(rows, columns, 3)`` with
            the board on the left and the queue (top) / holder (bottom) on
            the right.
        """
        board_obs = observation["board"]
        holder_obs = observation["holder"]
        queue_obs = observation["queue"]

        # The holder's row count is used for both panels, as in the layout
        # computation below; holder and queue must therefore be equally tall.
        panel_rows = holder_obs.shape[0]
        panel_cols = max(holder_obs.shape[1], queue_obs.shape[1])

        # Bring holder and queue to the same width. Filler cells carry the
        # value 1 (presumably the id of the padding pixel -- TODO confirm).
        holder_obs = np.hstack(
            (
                holder_obs,
                np.ones((panel_rows, panel_cols - holder_obs.shape[1]), dtype=np.uint8),
            )
        )
        queue_obs = np.hstack(
            (
                queue_obs,
                np.ones((panel_rows, panel_cols - queue_obs.shape[1]), dtype=np.uint8),
            )
        )

        # Fill the vertical gap between queue (top) and holder (bottom) so the
        # side panel is exactly as tall as the board.
        gap = np.ones((board_obs.shape[0] - 2 * panel_rows, panel_cols), dtype=np.uint8)
        side_panel = np.vstack((queue_obs, gap, holder_obs))
        stack = np.hstack((board_obs, side_panel)).astype(np.uint8)

        # Map every pixel id to its RGB colour with a single table lookup.
        colors = np.array(
            [pixel.color_rgb for pixel in self.env.unwrapped.pixels], dtype=np.uint8
        )
        return colors[stack]

    def render(self) -> "RenderFrame | list[RenderFrame] | None":
        """Render the environment from the RGB observation.

        This render function is different from the default as it uses the
        values from :func:`observation` to render the environment.

        Returns:
            The upscaled RGB frame when ``render_mode`` is ``"rgb_array"``.
            ``None`` for ``"human"`` (the frame is shown in an OpenCV window
            instead) and for every other render mode.
        """
        if self.render_mode not in ("human", "rgb_array"):
            # Nothing to draw; skip building the frame altogether.
            return None

        base_env = self.env.unwrapped
        # Read the scaling factor from the base environment explicitly:
        # Gymnasium 1.0 wrappers no longer forward unknown attributes.
        # Assumes the base env defines ``render_scaling_factor`` -- TODO confirm.
        scale = base_env.render_scaling_factor

        frame = self.observation(base_env._get_obs())
        # Upscale by repeating every pixel ``scale`` times along both image
        # axes while leaving the colour axis untouched.
        kernel = np.ones((scale, scale, 1), dtype=np.uint8)
        frame = np.kron(frame, kernel)

        if self.render_mode == "rgb_array":
            return frame

        # render_mode == "human"
        if base_env.window_name is None:
            # Create and size the window once, on the first rendered frame.
            base_env.window_name = "Tetris Gymnasium"
            cv2.namedWindow(base_env.window_name, cv2.WINDOW_GUI_NORMAL)
            frame_height, frame_width = frame.shape[:2]
            cv2.resizeWindow(base_env.window_name, frame_width, frame_height)
        # OpenCV expects BGR channel order.
        cv2.imshow(base_env.window_name, cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
        cv2.waitKey(1)
        return None
class FeatureVectorObservation(gym.ObservationWrapper):
    """Observation wrapper that returns a feature vector as observation.

    **State representation**

    A feature vector can contain different features of the board, such as the
    height of the stack or the number of holes. In the literature, this is
    often referred to as a state representation and many different features
    can be used. A discussion of state representations for Tetris and their
    effect on reinforcement-learning performance can be found in the work by
    Hendriks.

    **Features**

    For this wrapper, the features from
    https://github.com/uvipen/Tetris-deep-Q-learning-pytorch have been adapted.
    These features are:

    - The height of the stack in each column (list: int for each column)
    - The maximum height of the stack (int)
    - The number of holes in the stack (int)
    - The bumpiness of the stack (int)

    More features can be added in the future or by introducing new wrappers.
    """

    def __init__(
        self,
        env: Tetris,
        report_height=True,
        report_max_height=True,
        report_holes=True,
        report_bumpiness=True,
    ):
        """Initialize the FeatureVectorObservation wrapper.

        Args:
            env (Tetris): The environment.
            report_height (bool, optional): Report the height of the stack in
                each column. Defaults to True.
            report_max_height (bool, optional): Report the maximum height of
                the stack. Defaults to True.
            report_holes (bool, optional): Report the number of holes in the
                stack. Defaults to True.
            report_bumpiness (bool, optional): Report the bumpiness of the
                stack. Defaults to True.
        """
        super().__init__(env)
        base_env = env.unwrapped

        feature_count = (
            (base_env.width if report_height else 0)
            + (1 if report_max_height else 0)
            + (1 if report_holes else 0)
            + (1 if report_bumpiness else 0)
        )
        # No feature can exceed the number of playable cells: observation()
        # crops ``padding`` rows from the bottom, leaving
        # ``height_padded - padding`` rows of ``width`` columns. The bound is
        # capped at the uint8 maximum because the vector is emitted as uint8.
        playable_cells = (base_env.height_padded - base_env.padding) * base_env.width
        self.observation_space = Box(
            low=0,
            high=min(playable_cells, np.iinfo(np.uint8).max),
            shape=(feature_count,),
            dtype=np.uint8,
        )

        self.report_height = report_height
        self.report_max_height = report_max_height
        self.report_holes = report_holes
        self.report_bumpiness = report_bumpiness

    def calc_height(self, board):
        """Calculate the height of the stack in each column.

        Args:
            board (np.ndarray): The board.

        Returns:
            np.ndarray: The height of the stack in each column.
        """
        filled = board != 0
        # argmax returns the first True per column, i.e. the topmost filled
        # cell; measuring from the top means holes below it are ignored.
        heights = board.shape[0] - np.argmax(filled, axis=0)
        # argmax is 0 for all-empty columns, which would otherwise report the
        # full board height instead of 0.
        return np.where(filled.any(axis=0), heights, 0)

    def calc_max_height(self, board):
        """Calculate the maximum height of the board.

        Args:
            board (np.ndarray): The board.

        Returns:
            int: The maximum height across all columns.
        """
        return np.max(self.calc_height(board))

    def calc_bumpiness(self, board):
        """Calculate the bumpiness of the board.

        Bumpiness is the sum of the absolute differences between adjacent
        column heights.

        Args:
            board (np.ndarray): The board.

        Returns:
            int: The bumpiness of the board.
        """
        heights = self.calc_height(board)
        return np.sum(np.abs(np.diff(heights)))

    def calc_holes(self, board):
        """Calculate the number of holes in the stack.

        A hole is an empty cell with at least one filled cell above it in the
        same column.

        Args:
            board (np.ndarray): The board.

        Returns:
            int: The number of holes in the stack.
        """
        filled = board != 0
        # Running count of filled cells from the top of each column downwards.
        filled_above = np.cumsum(filled, axis=0)
        return np.sum(~filled & (filled_above > 0))

    def observation(self, observation):
        """Observation wrapper that returns the feature vector as the observation.

        Args:
            observation (dict): The observation from the base environment. The
                keys ``"board"`` and ``"active_tetromino_mask"`` are read; the
                dict and its arrays are left unmodified.

        Returns:
            np.ndarray: The feature vector (``uint8``), in the order: column
            heights, maximum height, holes, bumpiness -- each only if enabled.
        """
        # Work on a copy so the caller's observation is not altered.
        board_obs = observation["board"].copy()
        # Force boolean indexing: a 0/1 integer mask would otherwise be
        # interpreted as row indices instead of a cell mask.
        active_mask = np.asarray(observation["active_tetromino_mask"], dtype=bool)
        # Mask out the active tetromino.
        board_obs[active_mask] = 0

        # Crop the padding (bottom, left and right). Explicit end indices keep
        # the slice valid when padding is 0, where ``-0`` would select nothing.
        padding = self.env.unwrapped.padding
        n_rows, n_cols = board_obs.shape
        board_obs = board_obs[: n_rows - padding, padding : n_cols - padding]

        features = []
        if self.report_height or self.report_max_height:
            height_vector = self.calc_height(board_obs)
            if self.report_height:
                features.extend(height_vector)
            if self.report_max_height:
                features.append(np.max(height_vector))
        if self.report_holes:
            features.append(self.calc_holes(board_obs))
        if self.report_bumpiness:
            features.append(self.calc_bumpiness(board_obs))

        # Saturate instead of silently wrapping if a value exceeds uint8.
        uint8_max = np.iinfo(np.uint8).max
        clipped = np.clip(np.asarray(features, dtype=np.int64), 0, uint8_max)
        return clipped.astype(np.uint8)