Source code for tetris_gymnasium.envs.tetris

"""Tetris environment for Gymnasium."""
import copy
from dataclasses import dataclass, fields
from typing import Any, List

import cv2
import gymnasium as gym
import numpy as np
from gymnasium.core import ActType, RenderFrame
from gymnasium.spaces import Box, Discrete

from tetris_gymnasium.components.tetromino import Pixel, Tetromino
from tetris_gymnasium.components.tetromino_holder import TetrominoHolder
from tetris_gymnasium.components.tetromino_queue import TetrominoQueue
from tetris_gymnasium.components.tetromino_randomizer import BagRandomizer, Randomizer
from tetris_gymnasium.mappings.actions import ActionsMapping
from tetris_gymnasium.mappings.rewards import RewardsMapping


@dataclass
class TetrisState:
    """State of the Tetris environment.

    A plain container for the mutable parts of a ``Tetris`` environment. It is
    produced by ``Tetris.get_state`` and consumed by ``Tetris.set_state``.
    """

    # Padded playing field, without the active (falling) tetromino.
    board: np.ndarray
    # The tetromino that is currently being controlled.
    active_tetromino: Tetromino
    # Column of the active tetromino's top-left corner on the padded board.
    x: int
    # Row of the active tetromino's top-left corner on the padded board.
    y: int
    # Queue of upcoming tetrominoes.
    queue: TetrominoQueue
    # Holder used by the swap action.
    holder: TetrominoHolder
    # Randomizer that feeds the queue.
    randomizer: Randomizer
    # Whether the swap action was already used for the active tetromino.
    has_swapped: bool
    # Whether the episode has ended.
    game_over: bool
    # NOTE(review): ``Tetris.get_state`` fills this from ``self.score``, which on
    # ``Tetris`` is the scoring *method* rather than a number -- confirm the
    # intended type of this field.
    score: int


class Tetris(gym.Env):
    """Tetris environment for Gymnasium.

    The board is stored as a 2D ``uint8`` array. ``0`` is an empty cell, ``1``
    is bedrock (the padding placed left, right and below the playing field so
    that collision checks never index outside the array) and values ``>= 2``
    are cells of tetrominoes. The active (falling) tetromino is *not* part of
    ``self.board``; it is only projected onto a copy of the board for
    observations and rendering.
    """

    metadata = {
        "render_modes": ["human", "rgb_array", "ansi"],
        "render_fps": 1,
        "render_human": True,
    }

    # Non-tetromino pixels: index 0 = empty cell, index 1 = bedrock.
    BASE_PIXELS = [Pixel(0, [0, 0, 0]), Pixel(1, [128, 128, 128])]

    TETROMINOES = [
        Tetromino(
            0,
            [0, 240, 240],
            np.array(
                [[0, 0, 0, 0], [1, 1, 1, 1], [0, 0, 0, 0], [0, 0, 0, 0]],
                dtype=np.uint8,
            ),
        ),  # I
        Tetromino(1, [240, 240, 0], np.array([[1, 1], [1, 1]], dtype=np.uint8)),  # O
        Tetromino(
            2,
            [160, 0, 240],
            np.array([[0, 1, 0], [1, 1, 1], [0, 0, 0]], dtype=np.uint8),
        ),  # T
        Tetromino(
            3,
            [0, 240, 0],
            np.array([[0, 1, 1], [1, 1, 0], [0, 0, 0]], dtype=np.uint8),
        ),  # S
        Tetromino(
            4,
            [240, 0, 0],
            np.array([[1, 1, 0], [0, 1, 1], [0, 0, 0]], dtype=np.uint8),
        ),  # Z
        Tetromino(
            5,
            [0, 0, 240],
            np.array([[1, 0, 0], [1, 1, 1], [0, 0, 0]], dtype=np.uint8),
        ),  # J
        Tetromino(
            6,
            [240, 160, 0],
            np.array([[0, 0, 1], [1, 1, 1], [0, 0, 0]], dtype=np.uint8),
        ),  # L
    ]

    def __init__(
        self,
        render_mode=None,
        width=10,
        height=20,
        gravity=True,
        actions_mapping=ActionsMapping(),
        rewards_mapping=RewardsMapping(),
        queue: TetrominoQueue = None,
        holder: TetrominoHolder = None,
        randomizer: Randomizer = None,
        base_pixels=None,
        tetrominoes=None,
        render_upscale: int = 10,
    ):
        """Creates a new Tetris environment.

        Args:
            render_mode: The mode to use for rendering. If None, no rendering will be done.
            width: The width of the board.
            height: The height of the board.
            gravity: Whether gravity is enabled in the game.
            actions_mapping: The mapping for the actions that the agent can take.
            rewards_mapping: The mapping for the rewards that the agent can receive.
            queue: The :class:`TetrominoQueue` that supplies the upcoming tetrominoes.
                If None, a new queue backed by ``randomizer`` is created.
            holder: The :class:`TetrominoHolder` used by the swap action.
                If None, a new holder is created.
            randomizer: The :class:`Randomizer` to use for selecting tetrominoes.
                If None, a :class:`BagRandomizer` is created.
            base_pixels: A list of base (non-Tetromino) :class:`Pixel` to use for the
                environment (e.g. empty, bedrock). If None, ``BASE_PIXELS`` is used.
            tetrominoes: A list of :class:`Tetromino` to use in the environment.
                If None, ``TETROMINOES`` is used. The list is copied, so the
                caller's objects are never modified.
            render_upscale: The factor to upscale the rendered board by.
        """
        # NOTE: the default ``actions_mapping`` / ``rewards_mapping`` instances
        # are created once at definition time and shared between environments.
        # This class only ever reads them.

        # Dimensions
        self.game_over = False
        self.height: int = height
        self.width: int = width

        # Base Pixels (a caller-supplied list must be stored as well)
        if base_pixels is None:
            base_pixels = copy.deepcopy(self.BASE_PIXELS)
        self.base_pixels = base_pixels

        # Tetrominoes. offset_tetromino_id() mutates its argument, so always
        # work on a private copy; otherwise a list reused for a second
        # environment would be offset twice.
        tetrominoes = copy.deepcopy(
            self.TETROMINOES if tetrominoes is None else tetrominoes
        )
        self.tetrominoes: List[Tetromino] = self.offset_tetromino_id(
            tetrominoes, len(self.base_pixels)
        )
        self.active_tetromino: Tetromino = None

        # Pixels
        self.pixels: List[Pixel] = self.parse_pixels(self.tetrominoes)

        # Padding: as wide as the largest tetromino matrix so that any slice
        # taken for a tetromino stays inside the board array.
        self.padding: int = max(max(t.matrix.shape) for t in self.tetrominoes)
        self.width_padded: int = self.width + 2 * self.padding
        self.height_padded: int = self.height + self.padding

        # Board
        self.board = self.create_board()

        # Game engine
        # Defaults are created here rather than in the signature:
        # https://stackoverflow.com/q/41686829
        if randomizer is None:
            randomizer = BagRandomizer(len(self.tetrominoes))
        self.randomizer = randomizer

        if queue is None:
            queue = TetrominoQueue(self.randomizer)
        self.queue = queue

        if holder is None:
            holder = TetrominoHolder()
        self.holder = holder

        self.has_swapped = False
        self.gravity_enabled = gravity

        # Position
        self.x: int = 0
        self.y: int = 0

        # Gymnasium
        board_shape = (self.height_padded, self.width_padded)
        n_pixels = len(self.pixels)
        self.observation_space = gym.spaces.Dict(
            {
                "board": Box(low=0, high=n_pixels, shape=board_shape, dtype=np.uint8),
                "active_tetromino_mask": Box(
                    low=0, high=1, shape=board_shape, dtype=np.uint8
                ),
                "holder": Box(
                    low=0,
                    high=n_pixels,
                    shape=(self.padding, self.padding * self.holder.size),
                    dtype=np.uint8,
                ),
                "queue": Box(
                    low=0,
                    high=n_pixels,
                    shape=(self.padding, self.padding * self.queue.size),
                    dtype=np.uint8,
                ),
            }
        )

        # Mappings for rewards & actions (readability in code)
        self.actions = actions_mapping
        self.rewards = rewards_mapping
        self.action_space = Discrete(len(fields(self.actions)))
        reward_values = vars(self.rewards).values()
        self.reward_range = (min(reward_values), max(reward_values))

        assert render_mode is None or render_mode in self.metadata["render_modes"]
        self.render_mode = render_mode
        self.render_scaling_factor = render_upscale
        self.window_name = None

    def step(self, action: ActType) -> "tuple[dict, float, bool, bool, dict]":
        """Perform one step of the environment's dynamics.

        Args:
            action: The action to be executed.

        Returns:
            observation: The observation dictionary built by :func:`_get_obs`.
            reward: Amount of reward returned after the action.
            terminated: Whether the game is over.
            truncated: Always False, as Tetris without levels never truncates.
            info: Dictionary with the key ``"lines_cleared"``.
        """
        assert self.action_space.contains(
            action
        ), f"{action!r} ({type(action)}) invalid"

        truncated = False  # Tetris without levels will never truncate
        reward = 0
        lines_cleared = 0

        if action == self.actions.move_left:
            if not self.collision(self.active_tetromino, self.x - 1, self.y):
                self.x -= 1
        elif action == self.actions.move_right:
            if not self.collision(self.active_tetromino, self.x + 1, self.y):
                self.x += 1
        elif action == self.actions.move_down:
            if not self.collision(self.active_tetromino, self.x, self.y + 1):
                self.y += 1
        elif action == self.actions.rotate_clockwise:
            # Rotate once and reuse the result for both the check and the update.
            rotated = self.rotate(self.active_tetromino, True)
            if not self.collision(rotated, self.x, self.y):
                self.active_tetromino = rotated
        elif action == self.actions.rotate_counterclockwise:
            rotated = self.rotate(self.active_tetromino, False)
            if not self.collision(rotated, self.x, self.y):
                self.active_tetromino = rotated
        elif action == self.actions.swap:
            if not self.has_swapped:
                # Swap the active tetromino with the one in the holder (saves orientation)
                self.active_tetromino = self.holder.swap(self.active_tetromino)
                self.has_swapped = True
                if self.active_tetromino is None:
                    # If the holder is empty, spawn the next tetromino
                    # No need for collision check, as the holder is only empty at the start
                    self.spawn_tetromino()
                else:
                    self.reset_tetromino_position()
        elif action == self.actions.hard_drop:
            reward, self.game_over, lines_cleared = self.commit_active_tetromino()
        elif action == self.actions.no_op:
            pass

        # Gravity (skipped after a hard drop, which already committed the piece)
        if self.gravity_enabled and action != self.actions.hard_drop:
            if not self.collision(self.active_tetromino, self.x, self.y + 1):
                self.y += 1
            else:
                # If there's no more room to move, lock in the tetromino
                reward, self.game_over, lines_cleared = self.commit_active_tetromino()

        return (
            self._get_obs(),
            reward,
            self.game_over,
            truncated,
            {"lines_cleared": lines_cleared},
        )

    def reset(
        self, *, seed: "int | None" = None, options: "dict[str, Any] | None" = None
    ) -> "tuple[dict[str, Any], dict[str, Any]]":
        """Resets the state of the environment.

        As with all Gymnasium environments, the reset method is called once at the
        beginning of an episode.

        Args:
            seed: The random seed to use for the reset.
            options: A dictionary of options to use for the reset.

        Returns:
            A tuple of the initial observation and the info dictionary.
        """
        super().reset(seed=seed, options=options)

        # Initialize fresh board
        self.board = self.create_board()
        self.game_over = False

        # Reset the queue with the given seed
        self.queue.reset(seed=seed)

        # Get the next tetromino and spawn it
        self.active_tetromino = self.tetrominoes[self.queue.get_next_tetromino()]
        self.reset_tetromino_position()

        # Holder
        self.holder.reset()
        self.has_swapped = False

        # Render
        self.window_name = None

        return self._get_obs(), self._get_info()

    def get_rgb(self, observation):
        """Combine board, queue and holder observations into a single RGB image.

        The image contains the board on the left, the queue on the top right and
        the holder on the bottom right. Gaps are filled with the value 1 (bedrock).

        Args:
            observation: An observation dictionary with the keys ``"board"``,
                ``"holder"`` and ``"queue"``.

        Returns:
            A ``uint8`` array of shape (rows, columns, 3).
        """
        board_obs = observation["board"]
        holder_obs = observation["holder"]
        queue_obs = observation["queue"]

        max_size = holder_obs.shape[0]
        max_len = max(holder_obs.shape[1], queue_obs.shape[1])

        # Make holder and queue the same length by adding optional padding.
        holder_obs = np.hstack(
            (
                holder_obs,
                np.ones((max_size, max_len - holder_obs.shape[1]), dtype=np.uint8),
            )
        )
        queue_obs = np.hstack(
            (
                queue_obs,
                np.ones((max_size, max_len - queue_obs.shape[1]), dtype=np.uint8),
            )
        )

        # Vertical filler between the queue (top) and the holder (bottom).
        v_padding = np.ones(
            (board_obs.shape[0] - 2 * max_size, max_len), dtype=np.uint8
        )
        side_panel = np.vstack((queue_obs, v_padding, holder_obs))

        # ``int`` instead of the abstract ``np.integer``, which is deprecated
        # as a dtype argument.
        stack = np.hstack((board_obs, side_panel)).astype(int)

        # Look up the colour of every cell; cell values index into self.pixels.
        colors = np.array([p.color_rgb for p in self.pixels], dtype=np.uint8)
        return colors[stack]

    def render(self) -> "RenderFrame | list[RenderFrame] | None":
        """Renders the environment in various formats.

        This render function is different from the default as it uses the values
        from :func:`_get_obs` to render the environment.

        Returns:
            A string for ``"ansi"``, an RGB array for ``"rgb_array"`` and None
            otherwise (``"human"`` shows the frame in an OpenCV window).
        """
        if self.render_mode is None:
            # Nothing to draw; avoid building a frame that would be discarded.
            return None

        if self.render_mode == "ansi":
            # Render active tetromino (because it's not on self.board)
            projection = self.project_tetromino()
            # Crop padding away as we don't want to render it
            projection = self.crop_padding(projection)
            # Convert to string
            char_field = np.where(projection == 0, ".", projection.astype(str))
            return "\n".join("".join(row) for row in char_field)

        matrix = self.get_rgb(self._get_obs())

        if self.render_mode == "human" or self.render_mode == "rgb_array":
            # Upscale the matrix for better visualization
            kernel = np.ones(
                (self.render_scaling_factor, self.render_scaling_factor, 1),
                dtype=np.uint8,
            )
            matrix = np.kron(matrix, kernel)

        if self.render_mode == "rgb_array":
            return matrix

        if self.render_mode == "human":
            if self.window_name is None:
                # Create the window lazily on the first rendered frame.
                self.window_name = "Tetris Gymnasium"
                cv2.namedWindow(self.window_name, cv2.WINDOW_GUI_NORMAL)
                h, w = matrix.shape[0], matrix.shape[1]
                cv2.resizeWindow(self.window_name, w, h)
            cv2.imshow(
                self.window_name,
                cv2.cvtColor(matrix, cv2.COLOR_RGB2BGR),
            )

        return None

    def spawn_tetromino(self) -> bool:
        """Spawns a new tetromino at the top of the board and checks for collision.

        Returns:
            True if the tetromino can be successfully spawned, False otherwise.
        """
        self.active_tetromino = self.tetrominoes[self.queue.get_next_tetromino()]
        self.reset_tetromino_position()
        return not self.collision(self.active_tetromino, self.x, self.y)

    def place_active_tetromino(self):
        """Locks the active tetromino in place on the board."""
        self.board = self.project_tetromino()
        self.active_tetromino = None

    def collision(self, tetromino: Tetromino, x: int, y: int) -> bool:
        """Check if the tetromino collides with the board at the given position.

        A collision is detected if the tetromino overlaps with any non-zero cell on
        the board. These non-zero cells represent the padding / bedrock (value 1)
        or other tetrominoes (values >=2).

        Args:
            tetromino: The tetromino to check for collision.
            x: The x position of the tetromino to check collision for.
            y: The y position of the tetromino to check collision for.

        Returns:
            True if the tetromino collides with the board at the given position,
            False otherwise.
        """
        # Extract the part of the board that the tetromino would occupy.
        slices = self.get_tetromino_slices(tetromino, x, y)
        board_subsection = self.board[slices]

        # Only cells covered by the tetromino itself matter.
        return bool(np.any(board_subsection[tetromino.matrix > 0] > 0))

    def rotate(self, tetromino: Tetromino, clockwise=True) -> Tetromino:
        """Rotate a tetromino by 90 degrees.

        Args:
            tetromino: The tetromino to rotate.
            clockwise: Whether to rotate the tetromino clockwise or counterclockwise.

        Returns:
            A new, rotated tetromino; the input is left unchanged.
        """
        # np.rot90 rotates counter-clockwise for positive k, so a clockwise
        # turn needs k=-1.
        return Tetromino(
            tetromino.id,
            tetromino.color_rgb,
            np.rot90(tetromino.matrix, k=(-1 if clockwise else 1)),
        )

    def drop_active_tetromino(self):
        """Drop the active tetromino to the lowest possible position on the board."""
        while not self.collision(self.active_tetromino, self.x, self.y + 1):
            self.y += 1

    def commit_active_tetromino(self):
        """Commit the active tetromino to the board.

        After locking in the tetromino, the game checks if any rows are filled and
        clears them. Finally, it spawns the next tetromino.

        Returns:
            A tuple ``(reward, game_over, lines_cleared)`` for the current step.
        """
        lines_cleared = 0
        if self.collision(self.active_tetromino, self.x, self.y):
            # The piece already overlaps something where it stands.
            reward = self.rewards.game_over
            self.game_over = True
        else:
            # 1. Drop the tetromino and lock it in place
            self.drop_active_tetromino()
            self.place_active_tetromino()
            self.board, lines_cleared = self.clear_filled_rows(self.board)
            reward = self.score(lines_cleared)

            # 2. Spawn the next tetromino and check if the game continues
            self.game_over = not self.spawn_tetromino()
            reward += self.rewards.alife
            if self.game_over:
                reward = self.rewards.game_over

            # 3. Reset the swap flag (agent can swap once per tetromino)
            self.has_swapped = False

        return reward, self.game_over, lines_cleared

    def clear_filled_rows(self, board) -> "tuple[np.ndarray, int]":
        """Clear any filled rows on the board.

        The clearing is performed using numpy by indexing only the rows that are
        not filled and concatenating them with a new top part of the board that
        contains zeros, so no explicit loop over rows is needed. The given board
        is modified in place.

        Args:
            board: The padded board to clear rows on.

        Returns:
            A tuple of the (same) board array and the number of cleared rows.
        """
        # A row is filled if it doesn't contain any free space (0) and isn't made
        # up entirely of bedrock / padding (1).
        filled_rows = (~(board == 0).any(axis=1)) & (~(board == 1).all(axis=1))
        n_filled = int(np.sum(filled_rows))

        if n_filled > 0:
            # Identify the rows that are not filled.
            unfilled_rows = board[~filled_rows]

            # Create a new top part of the board with free space (0) to compensate
            # for the cleared rows, including the bedrock columns left and right.
            free_space = np.zeros((n_filled, self.width), dtype=np.uint8)
            free_space = np.pad(
                free_space,
                ((0, 0), (self.padding, self.padding)),
                mode="constant",
                constant_values=1,
            )

            # Concatenate the new top with the unfilled rows to form the updated board.
            board[:] = np.concatenate((free_space, unfilled_rows), axis=0)

        return board, n_filled

    def crop_padding(self, matrix: np.ndarray) -> np.ndarray:
        """Crop the padding from the given matrix.

        The Tetris board has padding on all sides except the top to simplify
        collision detection. This method crops the padding from the given matrix
        to return the actual board, which is useful for rendering.

        Returns:
            The matrix with the padding cropped.
        """
        return matrix[0 : -self.padding, self.padding : -self.padding]

    def get_tetromino_slices(
        self, tetromino: Tetromino, x: int, y: int
    ) -> "tuple[slice, slice]":
        """Get the slices of the given tetromino on the board.

        Args:
            tetromino: The tetromino whose matrix shape determines the slice sizes.
            x: The column of the tetromino's top-left corner.
            y: The row of the tetromino's top-left corner.

        Returns:
            A ``(row_slice, column_slice)`` tuple usable to index the board.
        """
        tetromino_height, tetromino_width = tetromino.matrix.shape
        return (slice(y, y + tetromino_height), slice(x, x + tetromino_width))

    def reset_tetromino_position(self) -> None:
        """Reset the position of the active tetromino to the top center of the board."""
        # Horizontal centering must use the matrix width (shape[1]); this is
        # identical to shape[0] for the square built-in tetrominoes.
        tetromino_width = self.active_tetromino.matrix.shape[1]
        self.x = self.width_padded // 2 - tetromino_width // 2
        self.y = 0

    def project_tetromino(
        self, tetromino: Tetromino = None, x: int = None, y: int = None
    ) -> np.ndarray:
        """Project a tetromino on a copy of the board.

        By default, the active (moving) tetromino is not part of the board. This
        function projects it on a copy of the board, e.g. to render it.

        Args:
            tetromino: The tetromino to project. Defaults to the active tetromino.
            x: The column to project at. Defaults to the current x position.
            y: The row to project at. Defaults to the current y position.

        Returns:
            A copy of the board with the tetromino added. If the tetromino would
            collide at that position, the plain copy of the board is returned.
        """
        if tetromino is None:
            tetromino = self.active_tetromino
        if x is None:
            x = self.x
        if y is None:
            y = self.y

        projection = self.board.copy()
        if self.collision(tetromino, x, y):
            return projection

        slices = self.get_tetromino_slices(tetromino, x, y)
        projection[slices] += tetromino.matrix
        return projection

    @staticmethod
    def _pad_matrix(matrix: np.ndarray, size: int) -> np.ndarray:
        """Zero-pad a matrix at the bottom and right to ``size`` x ``size``."""
        return np.pad(
            matrix,
            ((0, size - matrix.shape[0]), (0, size - matrix.shape[1])),
        )

    def _get_obs(self) -> "dict[str, Any]":
        """Return the current observation dictionary.

        Returns:
            A dictionary with the ``uint8`` arrays ``"board"`` (board including
            the active tetromino), ``"active_tetromino_mask"`` (1 inside the
            bounding box of the active tetromino, 0 elsewhere), ``"holder"`` and
            ``"queue"`` (tetromino matrices padded to equal size and placed side
            by side).
        """
        # Include the active tetromino on the board for the observation.
        board_obs = self.project_tetromino()

        # Mark the bounding box of the active tetromino.
        active_tetromino_mask = np.zeros_like(board_obs)
        active_tetromino_mask[
            self.get_tetromino_slices(self.active_tetromino, self.x, self.y)
        ] = 1

        max_size = self.padding

        # Holder: build new lists rather than overwriting the entries of the
        # list handed out by the holder.
        held = self.holder.get_tetrominoes()
        if len(held) > 0:
            holder_obs = np.hstack([self._pad_matrix(t.matrix, max_size) for t in held])
        else:
            # Empty holder is shown as a block of ones.
            holder_obs = np.ones((max_size, max_size * self.holder.size))

        # Queue: np.pad returns a new array, so no copy of the tetromino is needed.
        queue_obs = np.hstack(
            [
                self._pad_matrix(self.tetrominoes[t_id].matrix, max_size)
                for t_id in self.queue.get_queue()
            ]
        )

        return {
            "board": board_obs.astype(np.uint8),
            "active_tetromino_mask": active_tetromino_mask.astype(np.uint8),
            "holder": holder_obs.astype(np.uint8),
            "queue": queue_obs.astype(np.uint8),
        }

    def _get_info(self) -> dict:
        """Return the info dictionary used on reset (no lines cleared yet)."""
        return {"lines_cleared": 0}

    def score(self, rows_cleared) -> int:
        """Calculate the score based on the number of lines cleared.

        Args:
            rows_cleared: The number of lines cleared in the last step.

        Returns:
            The squared number of cleared lines multiplied by the board width.
        """
        return (rows_cleared**2) * self.width

    def create_board(self) -> np.ndarray:
        """Create a new empty board, padded with bedrock left, right and below."""
        board = np.zeros((self.height, self.width), dtype=np.uint8)
        board = np.pad(
            board,
            ((0, self.padding), (self.padding, self.padding)),
            mode="constant",
            constant_values=1,
        )
        return board

    def parse_pixels(self, tetrominoes: "List[Tetromino]") -> "List[Pixel]":
        """Creates a list of pixels from the base pixels and the tetrominoes.

        Pixels are used to represent the board and the tetrominoes in the
        environment.

        Args:
            tetrominoes: The tetrominoes to add to the base pixels.

        Returns:
            The list of pixels for the environment.
        """
        # NOTE(review): __init__ calls this with tetrominoes whose ids were
        # already offset by offset_tetromino_id(), so the Pixel ids created here
        # are offset twice. Nothing in this class reads Pixel.id (colours are
        # looked up by list position) -- confirm whether the extra offset is
        # intended.
        return self.base_pixels + [
            Pixel(t.id + len(self.base_pixels), t.color_rgb) for t in tetrominoes
        ]

    def offset_tetromino_id(
        self, tetrominoes: "List[Tetromino]", offset: int
    ) -> "List[Tetromino]":
        """Make the tetrominoes distinguishable from each other and from base pixels.

        The tetrominoes already possess a unique ID, but the matrix should also be
        updated to reflect this. Additionally, the values are offset to avoid
        conflicts with the pixels that are not part of tetrominoes (empty cells,
        bedrock).

        The given tetrominoes are modified in place.

        Args:
            tetrominoes: The tetrominoes to preprocess.
            offset: The offset to apply to the tetrominoes. This is usually the
                number of non-tetromino pixels in the board.

        Returns:
            The same list, with every id increased by ``offset`` and every matrix
            multiplied by (list position + ``offset``).
        """
        for index, tetromino in enumerate(tetrominoes):
            tetromino.id += offset
            # The matrix value is based on the list position, which is also the
            # position of the tetromino's colour in self.pixels.
            tetromino.matrix = tetromino.matrix * (index + offset)
        return tetrominoes

    def set_state(self, state: TetrisState) -> None:
        """Restore the state of the environment.

        Should be used instead of deepcopy for performance. The objects stored in
        ``state`` are adopted by reference, not copied.

        Args:
            state: The state to restore, e.g. one returned by :func:`get_state`.
        """
        self.board = state.board
        self.active_tetromino = state.active_tetromino
        self.x = state.x
        self.y = state.y
        self.queue = state.queue
        self.holder = state.holder
        self.randomizer = state.randomizer
        self.has_swapped = state.has_swapped
        self.game_over = state.game_over
        # ``score`` is also the name of the scoring method. Only restore it when
        # the state carries a callable (as produced by get_state), so that a
        # plain number can never shadow the method and break
        # commit_active_tetromino().
        if callable(state.score):
            self.score = state.score

    def get_state(self) -> TetrisState:
        """Clone the current state of the environment.

        Should be used instead of deepcopy for performance.

        Returns:
            A :class:`TetrisState` holding copies of the board, the active
            tetromino, the queue, the holder and the randomizer.
        """
        # The copied queue is handed the copied randomizer.
        randomizer = copy.copy(self.randomizer)
        return TetrisState(
            board=self.board.copy(),
            active_tetromino=copy.copy(self.active_tetromino),
            x=self.x,
            y=self.y,
            queue=self.queue.copy(randomizer),
            holder=copy.copy(self.holder),
            randomizer=randomizer,
            has_swapped=self.has_swapped,
            game_over=self.game_over,
            # NOTE(review): ``self.score`` is the scoring method, not a number.
            score=self.score,
        )