API Reference

craftium.make_dungeon_env(ascii_map=None, mapgen_kwargs=dict(), wall_height=5, return_map_str=False, **kwargs)

Utility function to instantiate procedurally generated dungeon environments (see Craftium/ProcDungeons-v0).

Parameters:

Name Type Description Default
ascii_map Optional[str]

ASCII representation of the map. This is optional, and if not given, a random map will be generated using RandomMapGen.

None
mapgen_kwargs dict[str, Any]

Arguments for RandomMapGen. The arguments provided via this parameter will overwrite the default values of RandomMapGen. This argument is only used if ascii_map is None.

dict()
wall_height int

The height of the walls. This argument is only used if ascii_map is None.

5
return_map_str bool

If set to True the function returs the generated map as second return value.

False
**kwargs

Extra arguments to provide to gymnasium.make. If the minetest_conf argument is set, it's values will overwrite the default parameters of the environment.

{}
Source code in craftium/__init__.py
def make_dungeon_env(
        ascii_map: Optional[str] = None,
        mapgen_kwargs: dict[str, Any] = dict(),
        wall_height: int = 5,
        return_map_str: bool = False,
        **kwargs
):
    """Utility function to instantiate procedurally generated dungeon environments (see [`Craftium/ProcDungeons-v0`](https://craftium.readthedocs.io/en/latest/environments/#procedural-environment-generation)).

    :param ascii_map: ASCII representation of the map. This is optional, and if not given, a random map will be generated using `RandomMapGen`.
    :param mapgen_kwargs: Arguments for  `RandomMapGen`. The arguments provided via this parameter will overwrite the default values of `RandomMapGen`. This argument is only used if `ascii_map` is `None`.
    :param wall_height: The height of the walls. This argument is only used if `ascii_map` is `None`.
    :param return_map_str: If set to `True` the function returs the generated map as second return value.
    :param **kwargs: Extra arguments to provide to `gymnasium.make`. If the `minetest_conf` argument is set, it's values will overwrite the default parameters of the environment.
    """
    import gymnasium as gym
    from .extra.random_map_generator import RandomMapGen

    # generate the map
    if ascii_map is None:
        mapgen = RandomMapGen(**mapgen_kwargs)
        ascii_map = mapgen.rasterize(wall_height=wall_height)

    minetest_conf = DEFAULT_PROCDUNGEONS_CONF
    if "minetest_conf" in kwargs:
        for key, value in kwargs["minetest_conf"].items():
            minetest_conf[key] = value

    minetest_conf["ascii_map"] = ascii_map.replace("\n", "\\n")
    kwargs["minetest_conf"] = minetest_conf

    env = gym.make("Craftium/ProcDungeons-v0", **kwargs)

    if return_map_str:
        return env, ascii_map
    return env

craftium.craftium_env.CraftiumEnv

Bases: Env

The main class implementing Gymnasium's Env API.

Parameters:

Name Type Description Default
env_dir os.PathLike

Directory of the environment to load (should contain worlds and games directories).

required
obs_width int

The width of the observation image in pixels.

640
obs_height int

The height of the observation image in pixels.

360
enable_voxel_obs bool

Whether to enable voxel observations. Can only be enabled if _voxel_obs_available is True. The voxel observation is a 3D grid of dimensions (2voxel_obs_rx+1, 2voxel_obs_ry+1, 2*voxel_obs_rz+1, 3). The last dimension contains the voxel node ID, the light data, and the param2 data for each voxel.

False
voxel_obs_rx int

The radius of the voxel observation in the x-axis (North).

20
voxel_obs_ry int

The radius of the voxel observation in the y-axis (Up).

10
voxel_obs_rz int

The radius of the voxel observation in the z-axis (East).

20
init_frames int

The number of frames to wait for Minetest to load.

15
render_mode Optional[str]

Render mode ("human" or "rgb_array"), see Env.render.

None
max_timesteps Optional[int]

Maximum number of timesteps until episode termination. Disabled if set to None.

None
run_dir Optional[os.PathLike]

Path to save the artifacts created by the run. Will be automatically generated if not provided.

None
run_dir_prefix Optional[os.PathLike]

Prefix path to add to the automatically generated run_dir. This value is only used if run_dir is None.

None
game_id str

The name of the game to load. Defaults to the "original" minetest game.

'minetest'
world_name str

The name of the world to load. Defaults to "world".

'world'
minetest_dir Optional[str]

Path to the craftium's minetest build directory. If not given, defaults to the directory where craftium is installed. This option is intended for debugging purposes.

None
minetest_conf dict[str, Any]

Extra configuration options added to the default minetest.conf file generated by craftium. Setting options here will overwrite default values. Check mintest.conf.example for all available configuration options.

dict()
pipe_proc bool

If True, the minetest process stderr and stdout will be piped into two files inside the run's directory. Otherwise, the minetest process will not be piped and its output will be shown in the terminal. This option is disabled by default to reduce verbosity, but can be useful for debugging.

True
mt_listen_timeout int

Number of milliseconds to wait for MT to connect to the TCP channel. If the timeout is reached a Timeout exception is raised. WARNING: When using multiple (serial) MT environments, timeout can be easily reached for the last environment. In this case, you might want to increase the value of this parameter according to the number of environments.

60000
mt_port Optional[int]

TCP port to employ for MT's internal client<->server communication. If not provided a random port in the [49152, 65535] range is used.

None
frameskip int

The number of frames skipped between steps, 1 by default (disabled). Note that max_timesteps and init_frames parameters will be divided by the frameskip value.

1
rgb_observations bool

Whether to use RGB images or gray scale images as observations. Note that RGB images are slower to send from MT to python via TCP. By default RGB images are used.

True
gray_scale_keepdim bool

If True, a singleton dimension will be added, i.e. observations are of the shape WxHx1. Otherwise, they are of shape WxH.

False
seed Optional[int]

Random seed. Affects minetest's map generation and Lua's RNG (in mods).

None
sync_mode bool

If set to true, minetest's internal client and server steps are synchronized. This is useful for training models slower than realtime.

False
pmul int

Physics multiplier. As craftium agent's take actions by frame, default movement speeds make the agent move slowly. When set to > 1, minetest's movement velocity and acceleration increase helping the agent to move at acceptable relative speeds.

20
soft_reset bool

If set to true, resets will have to be handled by the Lua mod and minetest won't be killed and rerun every call to restart. IMPORTANT: Only set this flag to True in environments that support this feature.

False
offscreen_sdl bool

Whether to use the offscreen SDL driver or not (true by default).

True
gpu_id Optional[int]

If a GPU id was passed, set SDL_HINT_EGL_DEVICE to render the environment using that GPU.

None
human_screeen_size

Size (width, height) of the render screen when render_mode is set to "human".

required
_minetest_conf dict[str, Any]

The default minetest configuration provided during environment registration.

dict()
_voxel_obs_available bool

This flag indicates environments that support voxel observations during registration (do not manually override).

False
Source code in craftium/craftium_env.py
class CraftiumEnv(Env):
    """The main class implementing Gymnasium's [Env](https://gymnasium.farama.org/api/env/) API.

    :param env_dir: Directory of the environment to load (should contain `worlds` and `games` directories).
    :param obs_width: The width of the observation image in pixels.
    :param obs_height: The height of the observation image in pixels.
    :param enable_voxel_obs: Whether to enable voxel observations. Can only be enabled if _voxel_obs_available is True. The voxel observation is a 3D grid of dimensions (2*voxel_obs_rx+1, 2*voxel_obs_ry+1, 2*voxel_obs_rz+1, 3). The last dimension contains the voxel node ID, the light data, and the param2 data for each voxel.
    :param voxel_obs_rx: The radius of the voxel observation in the x-axis (North).
    :param voxel_obs_ry: The radius of the voxel observation in the y-axis (Up).
    :param voxel_obs_rz: The radius of the voxel observation in the z-axis (East).
    :param init_frames: The number of frames to wait for Minetest to load.
    :param render_mode: Render mode ("human" or "rgb_array"), see [Env.render](https://gymnasium.farama.org/api/env/#gymnasium.Env.render).
    :param max_timesteps: Maximum number of timesteps until episode termination. Disabled if set to `None`.
    :param run_dir: Path to save the artifacts created by the run. Will be automatically generated if not provided.
    :param run_dir_prefix: Prefix path to add to the automatically generated `run_dir`. This value is only used if `run_dir` is `None`.
    :param game_id: The name of the game to load. Defaults to the "original" minetest game.
    :param world_name: The name of the world to load. Defaults to "world".
    :param minetest_dir: Path to the craftium's minetest build directory. If not given, defaults to the directory where craftium is installed. This option is intended for debugging purposes.
    :param minetest_conf: Extra configuration options added to the default minetest.conf file generated by craftium. Setting options here will overwrite default values. Check [mintest.conf.example](https://github.com/minetest/minetest/blob/master/minetest.conf.example) for all available configuration options.
    :param pipe_proc: If `True`, the minetest process stderr and stdout will be piped into two files inside the run's directory. Otherwise, the minetest process will not be piped and its output will be shown in the terminal. This option is disabled by default to reduce verbosity, but can be useful for debugging.
    :param mt_listen_timeout: Number of milliseconds to wait for MT to connect to the TCP channel. If the timeout is reached a Timeout exception is raised. **WARNING:** When using multiple (serial) MT environments, timeout can be easily reached for the last environment. In this case, you might want to increase the value of this parameter according to the number of environments.
    :param mt_port: TCP port to employ for MT's internal client<->server communication. If not provided a random port in the [49152, 65535] range is used.
    :param frameskip: The number of frames skipped between steps, 1 by default (disabled). Note that `max_timesteps` and `init_frames` parameters will be divided by the frameskip value.
    :param rgb_observations: Whether to use RGB images or gray scale images as observations. Note that RGB images are slower to send from MT to python via TCP. By default RGB images are used.
    :param gray_scale_keepdim: If `True`, a singleton dimension will be added, i.e. observations are of the shape WxHx1. Otherwise, they are of shape WxH.
    :param seed: Random seed. Affects minetest's map generation and Lua's RNG (in mods).
    :param sync_mode: If set to true, minetest's internal client and server steps are synchronized. This is useful for training models slower than realtime.
    :param pmul: Physics multiplier. As craftium agent's take actions by frame, default movement speeds make the agent move slowly. When set to > 1, minetest's movement velocity and acceleration increase helping the agent to move at acceptable relative speeds.
    :param soft_reset: If set to true, resets will have to be handled by the Lua mod and minetest won't be killed and rerun every call to restart. **IMPORTANT:** Only set this flag to `True` in environments that support this feature.
    :param offscreen_sdl: Whether to use the `offscreen` SDL driver or not (true by default).
    :param gpu_id: If a GPU id was passed, set `SDL_HINT_EGL_DEVICE` to render the environment using that GPU.
    :param human_screeen_size: Size (width, height) of the render screen when `render_mode` is set to `"human"`.
    :param _minetest_conf: The default minetest configuration provided during environment registration.
    :param _voxel_obs_available: This flag indicates environments that support voxel observations during registration (do not manually override).
    """
    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 30, "voxel_observations_enabled": False}

    def __init__(
            self,
            env_dir: os.PathLike,
            obs_width: int = 640,
            obs_height: int = 360,
            enable_voxel_obs: bool = False,
            voxel_obs_rx: int = 20,
            voxel_obs_ry: int = 10,
            voxel_obs_rz: int = 20,
            init_frames: int = 15,
            render_mode: Optional[str] = None,
            max_timesteps: Optional[int] = None,
            run_dir: Optional[os.PathLike] = None,
            run_dir_prefix: Optional[os.PathLike] = None,
            game_id: str = "minetest",
            world_name: str = "world",
            minetest_dir: Optional[str] = None,
            minetest_conf: dict[str, Any] = dict(),
            pipe_proc: bool = True,
            mt_listen_timeout: int = 60_000,
            mt_port: Optional[int] = None,
            frameskip: int = 1,
            rgb_observations: bool = True,
            gray_scale_keepdim: bool = False,
            seed: Optional[int] = None,
            sync_mode: bool = False,
            fps_max: int = 200,
            pmul: int = 20,
            soft_reset: bool = False,
            offscreen_sdl: bool = True,
            gpu_id: Optional[int] = None,
            human_screen_size: tuple[int, int] = (720, 720),
            _minetest_conf: dict[str, Any] = dict(),
            _voxel_obs_available: bool = False,
    ):
        super(CraftiumEnv, self).__init__()

        if enable_voxel_obs:
            if _voxel_obs_available:
                self.metadata["voxel_observations_enabled"] = True
            else:
                raise ValueError("Voxel observations are not supported for this environment. Set `enable_voxel_obs` to `False` "
                                 "or use a different environment.")

        _minetest_conf.update(minetest_conf)

        #  out auto enlargement of fov for aspect ratios smaller than 16/10
        aspect_ratio = obs_width / obs_height
        if aspect_ratio < 16/10 and 'fov' in _minetest_conf:
            _minetest_conf['fov'] = _minetest_conf['fov'] / np.clip(np.sqrt((16/10)/aspect_ratio), 1.0, 1.4)

        self.obs_width = obs_width
        self.obs_height = obs_height
        self.init_frames = init_frames // frameskip
        self.max_timesteps = max_timesteps
        self.gray_scale_keepdim = gray_scale_keepdim
        self.rgb_observations = rgb_observations
        self.soft_reset = soft_reset

        # define the action space
        action_dict = {}
        for act in ACTION_ORDER[:-1]:  # all actions except the last ("mouse")
            action_dict[act] = Discrete(2)  # 1/0: key pressed/not pressed
        # define the mouse action
        action_dict[ACTION_ORDER[-1]
                    ] = Box(low=-1, high=1, shape=(2,), dtype=np.float32)
        self.action_space = Dict(action_dict)

        # define the observation space
        shape = [obs_width, obs_height]
        if rgb_observations:
            shape.append(3)
        elif gray_scale_keepdim:
            shape.append(1)

        self.observation_space = Box(
            low=0, high=255, shape=shape, dtype=np.uint8)

        assert render_mode is None or render_mode in self.metadata["render_modes"]
        self.render_mode = render_mode

        # initialize the Python<->Minetest communication channel (server side)
        self.mt_chann = MtChannel(
            img_width=self.obs_width,
            img_height=self.obs_height,
            voxel_obs=enable_voxel_obs,
            voxel_obs_rx=voxel_obs_rx,
            voxel_obs_ry=voxel_obs_ry,
            voxel_obs_rz=voxel_obs_rz,
            listen_timeout=mt_listen_timeout,
            rgb_imgs=rgb_observations,
        )

        # handles the MT configuration and process
        self.mt = Minetest(
            world_name=world_name,
            run_dir=run_dir,
            run_dir_prefix=run_dir_prefix,
            headless=offscreen_sdl,
            gpu_id=gpu_id,
            seed=seed,
            game_id=game_id,
            sync_dir=env_dir,
            screen_w=obs_width,
            screen_h=obs_height,
            voxel_obs=enable_voxel_obs,
            voxel_obs_rx=voxel_obs_rx,
            voxel_obs_ry=voxel_obs_ry,
            voxel_obs_rz=voxel_obs_rz,
            minetest_dir=minetest_dir,
            tcp_port=self.mt_chann.port,
            minetest_conf=_minetest_conf,
            pipe_proc=pipe_proc,
            mt_port=mt_port,
            frameskip=frameskip,
            rgb_frames=rgb_observations,
            sync_mode=sync_mode,
            fps_max=fps_max,
            pmul=pmul,
        )

        # set up the pygame screen if `render_mode` is set to "human"
        self.pyg_closed = False
        if render_mode == "human":
            pygame.init()
            self.pyg_screen = pygame.display.set_mode(human_screen_size)
            self.pyg_clock = pygame.time.Clock()
            self.pyg_screen_size = human_screen_size
            pygame.display.set_caption("Craftium")

        self.last_observation = None  # used in render if "rgb_array"
        self.timesteps = 0  # the timesteps counter

    def _get_info(self):
        return dict()

    def get_mt_config(self):
        return deepcopy(self.mt.config)

    def reset(
        self,
        *,
        seed: Optional[int] = None,
        options: Optional[dict] = None,
    ):
        """Resets the environment to an initial internal state, returning an initial observation and info.

        See [Env.reset](https://gymnasium.farama.org/api/env/#gymnasium.Env.reset) in the Gymnasium docs.

        :param seed: The random seed.
        :param options: Options dictionary.
        """
        super().reset(seed=seed)
        self.timesteps = 0

        if not self.soft_reset or not self.mt_chann.is_open():
            if self.mt_chann.is_open():
                self.mt_chann.send_kill()
                self.mt_chann.close_conn()
                self.mt.close_pipes()
                self.mt.wait_close()

            if options is not None:
                self.mt.overwrite_config(options["minetest_conf"])
            if seed is not None:
                self.mt.overwrite_config({"fixed_map_seed": seed})

            # start the new MT process
            self.mt.start_process()

            # open communication channel with minetest
            try:
                self.mt_chann.open_conn()
            except Exception as e:
                print(
                    "\n\x1b[1m[!] Error connecting to Minetest. Minetest probably failed to launch.")
                print(
                    "  => Run's scratch directory should be available, containing stderr.txt and")
                print("     stdout.txt useful for checking what went wrong.")
                print(
                    "** Content of stderr.txt in the run's sratch directory:\x1b[0m")
                print("~"*45, "\n")
                with open(f"{self.mt.run_dir}/stderr.txt", "r") as f:
                    print(f.read())
                print("~"*45)
                print(
                    "\x1b[1mRaising catched exception (in case it's useful):\x1b[0m")
                print("~"*45, "\n")
                raise e

            # HACK skip some frames to let the game initialize
            # TODO This "waiting" should be implemented in Minetest not in python
            for _ in range(self.init_frames):
                _observation, _voxobs, _pos, _vel, _pitch, _yaw, _dtime, _reward, _term = self.mt_chann.receive()
                self.mt_chann.send([0]*21, 0, 0)  # nop action
        else:
            self.mt_chann.send_soft_reset()

        observation, voxobs, pos, vel, pitch, yaw, dtime, _reward, _term = self.mt_chann.receive()
        if not self.gray_scale_keepdim and not self.rgb_observations:
            observation = observation[:, :, 0]

        self.last_observation = observation

        info = self._get_info()
        info["voxel_obs"] = voxobs
        info["player_pos"] = pos
        info["player_vel"] = vel
        info["player_pitch"] = pitch
        info["player_yaw"] = yaw
        info["mt_dtime"] = dtime

        return observation, info

    def step(self, action):
        """Run one timestep of the environment’s dynamics using the agent actions.

        See [Env.step](https://gymnasium.farama.org/api/env/#gymnasium.Env.step) in the Gymnasium docs.

        :param action: An action provided by the agent.
        """
        self.timesteps += 1

        # render the previous observation if needed
        if self.render_mode == "human" and not self.pyg_closed:
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    self.pyg_closed = True

            if not self.pyg_closed:
                # handle different observation types converting to RGB if needed
                if not self.rgb_observations and not self.gray_scale_keepdim:
                    pyg_obs = self.last_observation[:, :, None].repeat(3, 2)
                elif not self.rgb_observations:
                    pyg_obs = self.last_observation.repeat(3, 2)
                else:
                    pyg_obs = self.last_observation

                # render the last observation in the pygame window
                surface = pygame.image.frombuffer(
                    pyg_obs.tobytes(),
                    (self.obs_width, self.obs_height),
                    "RGB"
                )
                surface = pygame.transform.scale(surface, self.pyg_screen_size)
                self.pyg_screen.blit(surface, (0, 0))

                pygame.display.flip()
                self.pyg_clock.tick(self.metadata["render_fps"])  # limits FPS

        # convert the action dict to a format to be sent to MT through mt_chann
        keys = [0]*21  # all commands (keys) except the mouse
        mouse_x, mouse_y = 0, 0
        for k, v in action.items():
            if k == "mouse":
                x, y = v[0], -v[1]
                mouse_x = int(x*(self.obs_width // 2))
                mouse_y = int(y*(self.obs_height // 2))
            else:
                keys[ACTION_ORDER.index(k)] = v
        # send the action to MT
        self.mt_chann.send(keys, mouse_x, mouse_y)

        # receive the new info from minetest
        observation, voxobs, pos, vel, pitch, yaw, dtime, reward, termination = self.mt_chann.receive()
        if not self.gray_scale_keepdim and not self.rgb_observations:
            observation = observation[:, :, 0]

        self.last_observation = observation

        info = self._get_info()
        info["voxel_obs"] = voxobs
        info["player_pos"] = pos
        info["player_vel"] = vel
        info["player_pitch"] = pitch
        info["player_yaw"] = yaw
        info["mt_dtime"] = dtime

        truncated = self.max_timesteps is not None and self.timesteps >= self.max_timesteps

        return observation, reward, termination, truncated, info

    def render(self):
        if self.render_mode == "rgb_array":
            return self.last_observation

    def close(self, clear: bool = True):
        """
        Closes the environment and removes temporary files.

        :param clear: Whether to remove the MT working
        directory or not.
        """
        if self.mt_chann.is_open():
            self.mt_chann.send_kill()
            self.mt_chann.close()
            self.mt.close_pipes()
            self.mt.wait_close()

        if clear:
            self.mt.clear()

        if self.render_mode == "human":
            pygame.quit()

close(clear=True)

Closes the environment and removes temporary files.

Parameters:

Name Type Description Default
clear bool

Whether to remove the MT working directory or not.

True
Source code in craftium/craftium_env.py
def close(self, clear: bool = True):
    """
    Closes the environment and removes temporary files.

    :param clear: Whether to remove the MT working
    directory or not.
    """
    if self.mt_chann.is_open():
        self.mt_chann.send_kill()
        self.mt_chann.close()
        self.mt.close_pipes()
        self.mt.wait_close()

    if clear:
        self.mt.clear()

    if self.render_mode == "human":
        pygame.quit()

reset(*, seed=None, options=None)

Resets the environment to an initial internal state, returning an initial observation and info.

See Env.reset in the Gymnasium docs.

Parameters:

Name Type Description Default
seed Optional[int]

The random seed.

None
options Optional[dict]

Options dictionary.

None
Source code in craftium/craftium_env.py
def reset(
    self,
    *,
    seed: Optional[int] = None,
    options: Optional[dict] = None,
):
    """Resets the environment to an initial internal state, returning an initial observation and info.

    See [Env.reset](https://gymnasium.farama.org/api/env/#gymnasium.Env.reset) in the Gymnasium docs.

    :param seed: The random seed.
    :param options: Options dictionary.
    """
    super().reset(seed=seed)
    self.timesteps = 0

    if not self.soft_reset or not self.mt_chann.is_open():
        if self.mt_chann.is_open():
            self.mt_chann.send_kill()
            self.mt_chann.close_conn()
            self.mt.close_pipes()
            self.mt.wait_close()

        if options is not None:
            self.mt.overwrite_config(options["minetest_conf"])
        if seed is not None:
            self.mt.overwrite_config({"fixed_map_seed": seed})

        # start the new MT process
        self.mt.start_process()

        # open communication channel with minetest
        try:
            self.mt_chann.open_conn()
        except Exception as e:
            print(
                "\n\x1b[1m[!] Error connecting to Minetest. Minetest probably failed to launch.")
            print(
                "  => Run's scratch directory should be available, containing stderr.txt and")
            print("     stdout.txt useful for checking what went wrong.")
            print(
                "** Content of stderr.txt in the run's sratch directory:\x1b[0m")
            print("~"*45, "\n")
            with open(f"{self.mt.run_dir}/stderr.txt", "r") as f:
                print(f.read())
            print("~"*45)
            print(
                "\x1b[1mRaising catched exception (in case it's useful):\x1b[0m")
            print("~"*45, "\n")
            raise e

        # HACK skip some frames to let the game initialize
        # TODO This "waiting" should be implemented in Minetest not in python
        for _ in range(self.init_frames):
            _observation, _voxobs, _pos, _vel, _pitch, _yaw, _dtime, _reward, _term = self.mt_chann.receive()
            self.mt_chann.send([0]*21, 0, 0)  # nop action
    else:
        self.mt_chann.send_soft_reset()

    observation, voxobs, pos, vel, pitch, yaw, dtime, _reward, _term = self.mt_chann.receive()
    if not self.gray_scale_keepdim and not self.rgb_observations:
        observation = observation[:, :, 0]

    self.last_observation = observation

    info = self._get_info()
    info["voxel_obs"] = voxobs
    info["player_pos"] = pos
    info["player_vel"] = vel
    info["player_pitch"] = pitch
    info["player_yaw"] = yaw
    info["mt_dtime"] = dtime

    return observation, info

step(action)

Run one timestep of the environment’s dynamics using the agent actions.

See Env.step in the Gymnasium docs.

Parameters:

Name Type Description Default
action

An action provided by the agent.

required
Source code in craftium/craftium_env.py
def step(self, action):
    """Run one timestep of the environment’s dynamics using the agent actions.

    See [Env.step](https://gymnasium.farama.org/api/env/#gymnasium.Env.step) in the Gymnasium docs.

    :param action: An action provided by the agent.
    """
    self.timesteps += 1

    # render the previous observation if needed
    if self.render_mode == "human" and not self.pyg_closed:
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                self.pyg_closed = True

        if not self.pyg_closed:
            # handle different observation types converting to RGB if needed
            if not self.rgb_observations and not self.gray_scale_keepdim:
                pyg_obs = self.last_observation[:, :, None].repeat(3, 2)
            elif not self.rgb_observations:
                pyg_obs = self.last_observation.repeat(3, 2)
            else:
                pyg_obs = self.last_observation

            # render the last observation in the pygame window
            surface = pygame.image.frombuffer(
                pyg_obs.tobytes(),
                (self.obs_width, self.obs_height),
                "RGB"
            )
            surface = pygame.transform.scale(surface, self.pyg_screen_size)
            self.pyg_screen.blit(surface, (0, 0))

            pygame.display.flip()
            self.pyg_clock.tick(self.metadata["render_fps"])  # limits FPS

    # convert the action dict to a format to be sent to MT through mt_chann
    keys = [0]*21  # all commands (keys) except the mouse
    mouse_x, mouse_y = 0, 0
    for k, v in action.items():
        if k == "mouse":
            x, y = v[0], -v[1]
            mouse_x = int(x*(self.obs_width // 2))
            mouse_y = int(y*(self.obs_height // 2))
        else:
            keys[ACTION_ORDER.index(k)] = v
    # send the action to MT
    self.mt_chann.send(keys, mouse_x, mouse_y)

    # receive the new info from minetest
    observation, voxobs, pos, vel, pitch, yaw, dtime, reward, termination = self.mt_chann.receive()
    if not self.gray_scale_keepdim and not self.rgb_observations:
        observation = observation[:, :, 0]

    self.last_observation = observation

    info = self._get_info()
    info["voxel_obs"] = voxobs
    info["player_pos"] = pos
    info["player_vel"] = vel
    info["player_pitch"] = pitch
    info["player_yaw"] = yaw
    info["mt_dtime"] = dtime

    truncated = self.max_timesteps is not None and self.timesteps >= self.max_timesteps

    return observation, reward, termination, truncated, info

craftium.multiagent_env.MarlCraftiumEnv

The main class implementing the multi-agent version of Craftium environments.

Check CraftiumEnv for parameters' documentation.

Source code in craftium/multiagent_env.py
class MarlCraftiumEnv():
    """
    The main class implementing the multi-agent version of Craftium environments. 

    Check `CraftiumEnv` for parameters' documentation.
    """
    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 30}

    def __init__(
            self,
            num_agents: int,
            env_dir: os.PathLike,
            obs_width: int = 640,
            obs_height: int = 360,
            init_frames: int = 200,
            render_mode: Optional[str] = None,
            max_timesteps: Optional[int] = None,
            run_dir_prefix: Optional[os.PathLike] = None,
            game_id: str = "minetest",
            world_name: str = "world",
            minetest_dir: Optional[str] = None,
            # tcp_port: Optional[int] = None,
            mt_server_conf: dict[str, Any] = dict(),
            mt_clients_conf: dict[str, Any] = dict(),
            pipe_proc: bool = True,
            mt_listen_timeout: int = 60_000,
            mt_server_port: Optional[int] = None,
            frameskip: int = 1,
            rgb_observations: bool = True,
            gray_scale_keepdim: bool = False,
            seed: Optional[int] = None,
            sync_mode: bool = False,
            fps_max: int = 200,
            pmul: int = 20,
    ):
        assert num_agents > 1, "Number of agents lower than 2. Use CraftiumEnv for single agent environments."
        self.num_agents = num_agents
        self.obs_width = obs_width
        self.obs_height = obs_height
        self.init_frames = init_frames // frameskip
        self.max_timesteps = max_timesteps
        self.gray_scale_keepdim = gray_scale_keepdim
        self.rgb_observations = rgb_observations

        # define the action space
        action_dict = {}
        for act in ACTION_ORDER[:-1]:  # all actions except the last ("mouse")
            action_dict[act] = Discrete(2)  # 1/0: key pressed/not pressed
        # define the mouse action
        action_dict[ACTION_ORDER[-1]
                    ] = Box(low=-1, high=1, shape=(2,), dtype=np.float32)
        self.action_space = Dict(action_dict)

        # define the observation space
        shape = [num_agents, obs_width, obs_height]
        if rgb_observations:
            shape.append(3)
        elif gray_scale_keepdim:
            shape.append(1)

        self.observation_space = Box(
            low=0, high=255, shape=shape, dtype=np.uint8)

        assert render_mode is None or render_mode in self.metadata["render_modes"]
        self.render_mode = render_mode

        # crate a communication channel between python and MT (server) for each of the agents
        self.mt_channs = []
        for _ in range(num_agents):
            chann = MtChannel(
                img_width=self.obs_width,
                img_height=self.obs_height,
                listen_timeout=mt_listen_timeout,
                rgb_imgs=rgb_observations,
            )
            self.mt_channs.append(chann)

        # create a MT instance that will be configured as the server
        self.mt_server = MTServerOnly(
            run_dir_prefix=run_dir_prefix,
            seed=seed,
            game_id=game_id,
            world_name=world_name,
            sync_dir=env_dir,
            minetest_dir=minetest_dir,
            minetest_conf=mt_server_conf,
            pipe_proc=pipe_proc,
            mt_server_port=mt_server_port,
            sync_mode=sync_mode,
            fps_max=fps_max,
            pmul=pmul,
        )

        # create a MT (client) instance for each agent
        self.mt_clients = []
        for i in range(num_agents):
            client = MTClientOnly(
                tcp_port=self.mt_channs[i].port,
                client_name=f"agent{i}",
                mt_server_port=self.mt_server.server_port,
                run_dir_prefix=run_dir_prefix,
                headless=i == 0 and render_mode != "human",
                seed=seed,
                sync_dir=env_dir,
                screen_w=obs_width,
                screen_h=obs_height,
                minetest_dir=minetest_dir,
                minetest_conf=mt_clients_conf,
                pipe_proc=pipe_proc,
                frameskip=frameskip,
                rgb_frames=rgb_observations,
                sync_mode=sync_mode,
                fps_max=fps_max,
                pmul=pmul,
            )
            self.mt_clients.append(client)

        # used in render if "rgb_array"
        self.last_observations = [None]*num_agents
        self.timesteps = 0  # the timesteps counter
        self.current_agent_id = 0

    def _get_info(self):
        return dict()

    def reset(self, **kwargs):
        """Resets the environment."""

        self.timesteps = 0

        observations = []

        # intialize the MT processes (server and clients)
        # NOTE: only done the first time reset is called
        if self.mt_server.proc is None:
            self.mt_server.start_process()
            # HACK wait for the server to initialize before launching the clients
            print(
                "* Waiting for MT server to initialize. This is only required in the first call to reset")
            # TODO Use a an argument of the class instead of a constant
            time.sleep(5)

            for i in range(self.num_agents):
                # start the new MT (client) process
                self.mt_clients[i].start_process()
                # re-open the connection between python and the MT client
                self.mt_channs[i].open_conn()

                # HACK skip some frames to let the game initialize
                # TODO This "waiting" should be implemented in Minetest not in python
                for _ in range(self.init_frames):
                    _observation, _voxobs, _pos, _vel, _pitch, _yaw, _dtime, _reward, _term = self.mt_channs[i].receive()
                    self.mt_channs[i].send([0]*21, 0, 0)  # nop action

                # receive the new info from minetest
                observation, _voxobs, _pos, _vel, _pitch, _yaw, _dtime, reward, _term = self.mt_channs[i].receive()
                if not self.gray_scale_keepdim and not self.rgb_observations:
                    observation = observation[:, :, 0]
                observations.append(observation)
                self.last_observations[i] = observation

        else:  # soft reset
            for i in range(self.num_agents):
                # send a soft reset to the MT client
                self.mt_channs[i].send_soft_reset()
                # receive a new observation from minetest
                observation, _voxobs, _pos, _vel, _pitch, _yaw, _dtime, reward, _term = self.mt_channs[i].receive()
                if not self.gray_scale_keepdim and not self.rgb_observations:
                    observation = observation[:, :, 0]
                observations.append(observation)
                self.last_observations[i] = observation

        infos = self._get_info()

        # stack the observations of each agent
        observations = np.vstack([np.expand_dims(obs, 0)
                                 for obs in observations])

        return observations, infos

    def step_agent(self, action):
        """
        Runs a single step of the currently selected agent (`env.current_agent_id`).

        :param action
        """
        self.timesteps += 1

        if self.current_agent_id == self.num_agents:
            self.current_agent_id = 0
        agent_id = self.current_agent_id
        self.current_agent_id += 1

        # convert the action dict to a format to be sent to MT through mt_chann
        keys = [0]*21  # all commands (keys) except the mouse
        mouse_x, mouse_y = 0, 0
        for k, v in action.items():
            if k == "mouse":
                x, y = v[0], -v[1]
                mouse_x = int(x*(self.obs_width // 2))
                mouse_y = int(y*(self.obs_height // 2))
            else:
                keys[ACTION_ORDER.index(k)] = v

        # send the action to MT
        self.mt_channs[agent_id].send(keys, mouse_x, mouse_y)

        # receive the new info from minetest
        observation, _voxobs, _pos, _vel, _pitch, _yaw, _dtime, reward, termination = self.mt_channs[agent_id].receive()
        if not self.gray_scale_keepdim and not self.rgb_observations:
            observation = observation[:, :, 0]

        self.last_observations[agent_id] = observation
        info = self._get_info()

        truncated = self.max_timesteps is not None and self.timesteps >= self.max_timesteps

        return observation, reward, termination, truncated, info

    def step(self, actions):
        """Runs an environment step per agent.

        :param actions
        """
        assert len(
            actions) == self.num_agents, f"The number of actions ({len(actions)}) must match with the number of agents ({self.num_agents})"

        observations, rewards, terminations, truncations = [], [], [], []
        infos = dict()
        for agent_id in range(self.num_agents):
            self.current_agent_id = agent_id
            obs, rwd, trm, trc, inf = self.step_agent(actions[agent_id])
            observations.append(obs)
            rewards.append(rwd)
            terminations.append(trm)
            truncations.append(trc)
            infos |= inf  # the | operator merges two dicts

        # stack the observations of each agent
        observations = np.vstack([np.expand_dims(obs, 0)
                                 for obs in observations])
        rewards = np.array(rewards)
        terminations = np.array(terminations)
        truncations = np.array(truncations)
        return observations, rewards, terminations, truncations, infos

    def render(self):
        if self.render_mode == "rgb_array":
            return self.last_observations

    def close(self, clear: bool = True):
        """
        Closes the environment and removes temporary files.

        :param clear: Whether to remove the MT working 
        directory or not.
        """
        # close all MT clients
        for i in range(self.num_agents):
            if self.mt_channs[i].is_open():
                self.mt_channs[i].send_kill()
                self.mt_channs[i].close_conn()
                self.mt_clients[i].close_pipes()
                self.mt_clients[i].wait_close()
            if clear:
                self.mt_clients[i].clear()

        # close the MT server
        self.mt_server.close_pipes()
        # send a kill signal to the server, as we've no way to send a self-termination
        # command to it like in the case of the clients
        os.killpg(os.getpgid(self.mt_server.proc.pid), signal.SIGTERM)
        if clear:
            self.mt_server.clear()

close(clear=True)

Closes the environment and removes temporary files.

Parameters:

Name Type Description Default
clear bool

Whether to remove the MT working directory or not.

True
Source code in craftium/multiagent_env.py
def close(self, clear: bool = True):
    """
    Closes the environment and removes temporary files.

    :param clear: Whether to remove the MT working 
    directory or not.
    """
    # close all MT clients
    for i in range(self.num_agents):
        if self.mt_channs[i].is_open():
            self.mt_channs[i].send_kill()
            self.mt_channs[i].close_conn()
            self.mt_clients[i].close_pipes()
            self.mt_clients[i].wait_close()
        if clear:
            self.mt_clients[i].clear()

    # close the MT server
    self.mt_server.close_pipes()
    # send a kill signal to the server, as we've no way to send a self-termination
    # command to it like in the case of the clients
    os.killpg(os.getpgid(self.mt_server.proc.pid), signal.SIGTERM)
    if clear:
        self.mt_server.clear()

reset(**kwargs)

Resets the environment.

Source code in craftium/multiagent_env.py
def reset(self, **kwargs):
    """Resets the environment."""

    self.timesteps = 0

    observations = []

    # intialize the MT processes (server and clients)
    # NOTE: only done the first time reset is called
    if self.mt_server.proc is None:
        self.mt_server.start_process()
        # HACK wait for the server to initialize before launching the clients
        print(
            "* Waiting for MT server to initialize. This is only required in the first call to reset")
        # TODO Use a an argument of the class instead of a constant
        time.sleep(5)

        for i in range(self.num_agents):
            # start the new MT (client) process
            self.mt_clients[i].start_process()
            # re-open the connection between python and the MT client
            self.mt_channs[i].open_conn()

            # HACK skip some frames to let the game initialize
            # TODO This "waiting" should be implemented in Minetest not in python
            for _ in range(self.init_frames):
                _observation, _voxobs, _pos, _vel, _pitch, _yaw, _dtime, _reward, _term = self.mt_channs[i].receive()
                self.mt_channs[i].send([0]*21, 0, 0)  # nop action

            # receive the new info from minetest
            observation, _voxobs, _pos, _vel, _pitch, _yaw, _dtime, reward, _term = self.mt_channs[i].receive()
            if not self.gray_scale_keepdim and not self.rgb_observations:
                observation = observation[:, :, 0]
            observations.append(observation)
            self.last_observations[i] = observation

    else:  # soft reset
        for i in range(self.num_agents):
            # send a soft reset to the MT client
            self.mt_channs[i].send_soft_reset()
            # receive a new observation from minetest
            observation, _voxobs, _pos, _vel, _pitch, _yaw, _dtime, reward, _term = self.mt_channs[i].receive()
            if not self.gray_scale_keepdim and not self.rgb_observations:
                observation = observation[:, :, 0]
            observations.append(observation)
            self.last_observations[i] = observation

    infos = self._get_info()

    # stack the observations of each agent
    observations = np.vstack([np.expand_dims(obs, 0)
                             for obs in observations])

    return observations, infos

step(actions)

Runs an environment step per agent.

Source code in craftium/multiagent_env.py
def step(self, actions):
    """Runs an environment step per agent.

    :param actions
    """
    assert len(
        actions) == self.num_agents, f"The number of actions ({len(actions)}) must match with the number of agents ({self.num_agents})"

    observations, rewards, terminations, truncations = [], [], [], []
    infos = dict()
    for agent_id in range(self.num_agents):
        self.current_agent_id = agent_id
        obs, rwd, trm, trc, inf = self.step_agent(actions[agent_id])
        observations.append(obs)
        rewards.append(rwd)
        terminations.append(trm)
        truncations.append(trc)
        infos |= inf  # the | operator merges two dicts

    # stack the observations of each agent
    observations = np.vstack([np.expand_dims(obs, 0)
                             for obs in observations])
    rewards = np.array(rewards)
    terminations = np.array(terminations)
    truncations = np.array(truncations)
    return observations, rewards, terminations, truncations, infos

step_agent(action)

Runs a single step of the currently selected agent (env.current_agent_id).

Source code in craftium/multiagent_env.py
def step_agent(self, action):
    """
    Runs a single step of the currently selected agent (`env.current_agent_id`).

    :param action
    """
    self.timesteps += 1

    if self.current_agent_id == self.num_agents:
        self.current_agent_id = 0
    agent_id = self.current_agent_id
    self.current_agent_id += 1

    # convert the action dict to a format to be sent to MT through mt_chann
    keys = [0]*21  # all commands (keys) except the mouse
    mouse_x, mouse_y = 0, 0
    for k, v in action.items():
        if k == "mouse":
            x, y = v[0], -v[1]
            mouse_x = int(x*(self.obs_width // 2))
            mouse_y = int(y*(self.obs_height // 2))
        else:
            keys[ACTION_ORDER.index(k)] = v

    # send the action to MT
    self.mt_channs[agent_id].send(keys, mouse_x, mouse_y)

    # receive the new info from minetest
    observation, _voxobs, _pos, _vel, _pitch, _yaw, _dtime, reward, termination = self.mt_channs[agent_id].receive()
    if not self.gray_scale_keepdim and not self.rgb_observations:
        observation = observation[:, :, 0]

    self.last_observations[agent_id] = observation
    info = self._get_info()

    truncated = self.max_timesteps is not None and self.timesteps >= self.max_timesteps

    return observation, reward, termination, truncated, info

craftium.pettingzoo_env.env(env_name, render_mode=None, **kwargs)

Returns the PettingZoo version (AECEnv) of multi-agent Craftium environments.

Parameters:

Name Type Description Default
env_name str

Name of the environment to load.

required
render_mode Optional[str]

Rendering mode ("rgb_array" or "human").

None
**kwargs

Additional keyword arguments to pass to the created Craftium environment (see MarlCraftiumEnv).

{}
Source code in craftium/pettingzoo_env.py
def env(env_name: str, render_mode: Optional[str] = None, **kwargs):
    """
    Returns the PettingZoo version (`AECEnv`) of multi-agent Craftium environments.

    :param env_name: Name of the environment to load.
    :param render_mode: Rendering mode (`"rgb_array"` or `"human"`).
    :param **kwargs: Additional keyword arguments to pass to the created Craftium environment (see `MarlCraftiumEnv`).
    """
    env_dir = os.path.join(root_path, AVAIL_ENVS[env_name]["env_dir"])
    final_args = kwargs | AVAIL_ENVS[env_name]["conf"]

    return raw_env(
        env_dir,
        render_mode,
        **final_args
    )

craftium.wrappers.BinaryActionWrapper

Bases: ActionWrapper

A Gymnasium ActionWrapper that translates craftium's Dict action space into a binary (discretized) action space MultiBiniary.

Parameters:

Name Type Description Default
env Env

The environment to wrap.

required
actions list[str]

A list of strings containing the names of the actions that will consititute the new action space.

required
mouse_mov float

Magnitude of the mouse movement. Must be in the [0, 1] range, else it will be clipped.

0.5
Source code in craftium/wrappers.py
class BinaryActionWrapper(ActionWrapper):
    """A Gymnasium `ActionWrapper` that translates craftium's `Dict` action space into a binary (discretized) action space [`MultiBiniary`](https://gymnasium.farama.org/api/spaces/fundamental/#gymnasium.spaces.MultiBinary).

    :param env: The environment to wrap.
    :param actions: A list of strings containing the names of the actions that will consititute the new action space.
    :params mouse_mov: Magnitude of the mouse movement. Must be in the [0, 1] range, else it will be clipped.
    """
    def __init__(self, env: Env, actions: list[str], mouse_mov: float = 0.5):
        ActionWrapper.__init__(self, env)

        check_actions_valid(actions)
        self.actions = actions
        self.action_space = MultiBinary(len(actions))
        self.mouse_mov = clip_mouse(mouse_mov)

    def process(self, action):
        assert len(action) == len(self.actions), \
            f"Incorrect number of actions, got {len(action)} but expected {len(self.actions)}"

        res = {}
        mouse = [0, 0]
        for a, name in zip(action, self.actions):
            if a == 0:
                continue

            if name == "mouse x+":
                mouse[0] += self.mouse_mov
            elif name == "mouse x-":
                mouse[0] -= self.mouse_mov
            elif name == "mouse y+":
                mouse[1] += self.mouse_mov
            elif name == "mouse y-":
                mouse[1] -= self.mouse_mov
            else:
                res[name] = a

        res["mouse"] = mouse

        return res

    def action(self, action):
        if isinstance(action, list) or isinstance(action, np.ndarray):
            return self.process(action)
        return self.process([action])

craftium.wrappers.DiscreteActionWrapper

Bases: ActionWrapper

A Gymnasium ActionWrapper that translates craftium's Dict action space into a discretized action space Discrete.

Unlike DiscreteActionWrapper, this wrapper adds an additional action to the action space in order to include the NOP action. This action is equivalent to {} in the Dict space or to a list of zeros in the MultiBinary space. The NOP action has index 0, and the rest of the actions have the consecutive idexes. Thus, the number of actions of the environment will be len(actions)+1.

Parameters:

Name Type Description Default
env Env

The environment to wrap.

required
actions list[str]

A list of strings containing the names of the actions that will consititute the new action space.

required
mouse_mov float

Magnitude of the mouse movement. Must be in the [0, 1] range, else it will be clipped.

0.5
Source code in craftium/wrappers.py
class DiscreteActionWrapper(ActionWrapper):
    """A Gymnasium `ActionWrapper` that translates craftium's `Dict` action space into a discretized action space [`Discrete`](https://gymnasium.farama.org/api/spaces/fundamental/#gymnasium.spaces.Discrete).

    Unlike `DiscreteActionWrapper`, this wrapper adds an additional action to the action space in order to include the `NOP` action. This action is equivalent to `{}` in the `Dict` space or to a list of zeros in the `MultiBinary` space. The `NOP` action has index `0`, and the rest of the actions have the consecutive idexes. Thus, the number of actions of the environment will be `len(actions)+1`.

    :param env: The environment to wrap.
    :param actions: A list of strings containing the names of the actions that will consititute the new action space.
    :params mouse_mov: Magnitude of the mouse movement. Must be in the [0, 1] range, else it will be clipped.
    """
    def __init__(self, env: Env, actions: list[str], mouse_mov: float = 0.5):
        ActionWrapper.__init__(self, env)

        check_actions_valid(actions)

        self.actions = actions
        self.action_space = Discrete(len(actions)+1)
        self.mouse_mov = clip_mouse(mouse_mov)

    def process(self, action):
        assert action >= 0 and action <= len(self.actions), \
            f"Action out of bound, got {action} but expected 0 <= action <= {len(self.actions)}"

        # if the action has index 0, return an empty action (NOP)
        if action == 0:
            return {}

        res = {}

        name = self.actions[action-1]

        mouse = [0, 0]

        if name == "mouse x+":
            mouse[0] += self.mouse_mov
        elif name == "mouse x-":
            mouse[0] -= self.mouse_mov
        elif name == "mouse y+":
            mouse[1] += self.mouse_mov
        elif name == "mouse y-":
            mouse[1] -= self.mouse_mov
        else:
            res[name] = 1

        res["mouse"] = mouse

        return res

    def action(self, action):
        if isinstance(action, list) or isinstance(action, np.ndarray):
            return [self.process(act) for act in action]
        return self.process(action)

craftium.extra

This module contains some extra functinallities for Craftium, such as a random dungeon generator and continual RL utilities.

craftium.extra.random_map_generator.RandomMapGen

Random dungeon map generator. This class is intended to be used for the Craftium/ProcDungeons-v0 environment, but you could implement and use your own generator (must use the same ascii format).

Parameters:

Name Type Description Default
n_rooms int

Number of rooms of the dungeon.

4
room_min_size int

Minimum size (both height and width) of the room.

7
room_max_size int

Maximum size (both height and width) of the room.

15
dispersion float

Affects the _distance between the rooms.

1.0
min_monsters_per_room int

Minimum number of monsters per room. If the minimum is set equal to the maximum, the number of monsters per room is fixed.

0
max_monsters_per_room int

Maximum number of monsters per room. If the minimum is set equal to the maximum, the number of monsters per room is fixed.

5
monsters dict[str, float]

A dictionary with the a, b, c, and d keys (refers to the type of the monster), where values are the probability of spawning a monster of that type. Types are sorted from a less dangerous to, d, more.

{'a': 0.4, 'b': 0.3, 'c': 0.2, 'd': 0.1}
monsters_in_player_spawn bool

If set to True, monsters can spawn in the same room as the player.

False
Source code in craftium/extra/random_map_generator.py
class RandomMapGen():
    """Random dungeon map generator. This class is intended to be used
    for the `Craftium/ProcDungeons-v0` environment, but you could implement
    and use your own generator (must use the same ascii format).

    :param n_rooms: Number of rooms of the dungeon.
    :param room_min_size: Minimum size (both height and width) of the room.
    :param room_max_size: Maximum size (both height and width) of the room.
    :param dispersion: Affects the _distance between the rooms.
    :param min_monsters_per_room: Minimum number of monsters per room. If the minimum is set equal to the maximum, the number of monsters per room is fixed.
    :param max_monsters_per_room: Maximum number of monsters per room. If the minimum is set equal to the maximum, the number of monsters per room is fixed.
    :param monsters: A dictionary with the `a`, `b`, `c`, and `d` keys (refers to the type of the monster), where values are the probability of spawning a monster of that type. Types are sorted from `a` less dangerous to, `d`, more.
    :param monsters_in_player_spawn: If set to `True`, monsters can spawn in the same room as the player.
    """

    def __init__(
            self,
            n_rooms: int = 4,
            room_min_size: int = 7,
            room_max_size: int = 15,
            dispersion: float = 1.,
            min_monsters_per_room: int = 0,
            max_monsters_per_room: int = 5,
            monsters: dict[str, float] = {
                "a": 0.4, "b": 0.3, "c": 0.2, "d": 0.1},
            monsters_in_player_spawn: bool = False,
    ):
        assert room_min_size >= 5, "Minimum room size must be >= 5"
        assert room_min_size <= room_max_size, "Room min size must be smaller or equal to max size"
        rooms = [[[0, 0],
                  list(np.random.randint(room_min_size, room_max_size+1, size=2))]
                 for _ in range(n_rooms)]

        # place the rooms in non-overlapping placements
        collide = True
        while collide:
            for i in range(n_rooms):
                others = rooms[:i] + rooms[i+1:]

                if sum([self._collide(rooms[i], other) for other in others]) == 0:
                    continue

                others_center = np.array(others).mean(axis=0).mean(axis=0)
                center = np.array(rooms[i]).mean(axis=0)
                d = (dispersion*(center - others_center)).astype(np.int64)
                d[d == 0] = np.random.randint(-1, 2)

                rooms[i][0][0] += d[0]
                rooms[i][1][0] += d[0]
                rooms[i][0][1] += d[1]
                rooms[i][1][1] += d[1]

                collide = False
                for k in range(n_rooms):
                    for j in range(n_rooms):
                        if k != j:
                            if self._collide(rooms[k], rooms[j]):
                                collide = True
                                break
                if not collide:
                    break
            # plt.clf()
            # self.plot(rooms)
            # plt.pause(0.01)

        # translate the rooms to the x>=0 y>=0 quadrant
        rooms = np.array(rooms)
        rooms[:, :, 0] -= rooms[:, :, 0].min()
        rooms[:, :, 1] -= rooms[:, :, 1].min()

        # place all corridors
        corrs = []
        for i in range(n_rooms):
            # compute the _distances from the room i to the other rooms
            c = np.array(self._box_center(rooms[i]))
            dists = [np.power(c - np.array(self._box_center(rooms[j])), 2).sum()
                     for j in range(n_rooms) if i != j]
            indices = list(range(n_rooms))
            indices.remove(i)
            indices = np.array(indices)
            for j in indices[np.argsort(dists)]:
                cor = [self._box_center(rooms[i]), self._box_center(rooms[j])]
                # the corridor should not intersect with any room
                cor_intersects_rooms = sum([
                    self._line_intersects_box(cor, rooms[k]) for k in range(n_rooms) if k != i and k != j
                ]) > 0
                # the corridor should not intersect with other corridors
                cor_intersects_cor = sum(
                    [self.lines_intersect(cor, c) for c in corrs]) > 0
                if not (cor_intersects_rooms or cor_intersects_cor):
                    corrs.append(cor)
                    # if 0.5 > np.random.rand(): # add another (extra) corridor with 0.5 prob
                    #     break
                    break
        # make all rooms accessible
        corrs, _ = self._add_minimum_edges(corrs)

        # plt.clf()
        # self.plot(rooms, corridors=corrs)
        # plt.show()

        # place the player in the center of a random room
        idx1 = np.random.randint(0, len(corrs))
        idx2 = np.random.randint(0, 2)
        self.player_pos = corrs[idx1][idx2]

        # place the objective in the most distant room to the player
        dists = [self._distance(self.player_pos, self._box_center(r))
                 for r in rooms]
        room_idx = np.argmax(dists)
        self.objective_pos = self._box_center(rooms[room_idx])

        # place monsters
        monster_names = list(monsters.keys())
        monster_probs = list(monsters.values())
        monster_type_indices = -np.arange(1, len(monster_probs)+1)
        monster_locs = []
        for room in rooms:
            if (not monsters_in_player_spawn
                    and self._box_center(room) == self.player_pos) or max_monsters_per_room == 0:
                continue
            # number of monsters in this room
            if min_monsters_per_room == max_monsters_per_room:
                n = min_monsters_per_room
            else:
                n = np.random.randint(
                    min_monsters_per_room, max_monsters_per_room)
            type_idx = np.random.choice(monster_type_indices, p=monster_probs)
            # compute al locations within a room that a monster can spawn
            places = []
            for x in range(room[0, 0]+2, room[1, 0]-1):
                for y in range(room[0, 1]+2, room[1, 1]-1):
                    # check the position isn't objective's
                    if not (self.objective_pos == np.array([x, y])).all():
                        places.append([x, y])
            # select random placements for the monsters
            for idx in np.random.choice(len(places), min(n, len(places)), replace=False):
                monster_locs.append((type_idx, places[idx]))

        self.monster_locs = monster_locs
        self.monster_names = monster_names
        self.monster_type_indices = monster_type_indices

        self.corridors = np.array(corrs)
        self.rooms = rooms

    def _bresenham(self, x0, y0, x1, y1):
        """Yield integer coordinates on the line from (x0, y0) to (x1, y1).

        This function is directly taken from https://github.com/encukou/_bresenham under the MIT license.
        """
        dx = x1 - x0
        dy = y1 - y0

        xsign = 1 if dx > 0 else -1
        ysign = 1 if dy > 0 else -1

        dx = abs(dx)
        dy = abs(dy)

        if dx > dy:
            xx, xy, yx, yy = xsign, 0, 0, ysign
        else:
            dx, dy = dy, dx
            xx, xy, yx, yy = 0, ysign, xsign, 0

        D = 2*dy - dx
        y = 0

        for x in range(dx + 1):
            yield x0 + x*xx + y*yx, y0 + x*xy + y*yy
            if D >= 0:
                y += 1
                D -= 2*dx
            D += 2*dy

    def rasterize(self, wall_height=2, ceiling=True):
        """Converts the generated map into an ASCII string.

        :params wall_height: Height of the walls of the dungeon.
        """
        assert wall_height >= 1, "Wall height must be >= 1"
        ncols, nrows = self.rooms[:, :, 0].max(), self.rooms[:, :, 1].max()
        m = np.zeros((nrows, ncols), dtype=np.int8)

        # add rooms
        for room in self.rooms:
            m[room[0, 1]:room[1, 1], room[0, 0]:room[1, 0]] = 1

        # add corridors
        for corr in self.corridors:
            x0, y0, x1, y1 = corr[0, 0], corr[0, 1], corr[1, 0], corr[1, 1]
            lines = list(self._bresenham(x0, y0, x1, y1))
            for (x, y) in lines:
                m[y, x] = 1
                # increase line thickness from 1 to 3 (minimum)
                m[y+1, x], m[y-1, x] = 1, 1
                m[y+2, x], m[y-2, x] = 1, 1
                m[y, x+1], m[y, x-1] = 1, 1
                m[y, x+2], m[y, x-2] = 1, 1

        # add walls
        layers = [m]
        l1 = np.zeros((nrows, ncols), dtype=np.int8)
        for i in range(nrows):
            for j in range(ncols):
                if m[i, j] == 0:
                    continue
                # check if the block i,j is completely surrounded with other blocks or not
                v = True
                if i > 0:
                    v = v and m[i-1, j] == 1
                if i < nrows - 1:
                    v = v and m[i+1, j] == 1
                if j > 0:
                    v = v and m[i, j-1] == 1
                if j < ncols - 1:
                    v = v and m[i, j+1] == 1
                if not v or (i == 0 or i == nrows-1 or j == 0 or j == ncols - 1):
                    l1[i, j] = 1

        # add walls
        for _ in range(wall_height):
            layers.append(l1.copy())

        # place the player
        layers[1][self.player_pos[1], self.player_pos[0]] = 2

        # place the objective
        layers[1][self.objective_pos[1], self.objective_pos[0]] = 3

        # place the monsters
        for type_idx, pos in self.monster_locs:
            layers[1][pos[1], pos[0]] = type_idx

        if ceiling:
            # add ceiling if needed (same pattern as the first layer)
            l0 = layers[0].copy()
            l0[l0 == 1] = 4
            layers.append(l0)

        # convert layer matrices into a string
        s = ""
        for il, layer in enumerate(layers):
            for i in range(nrows):
                for j in range(ncols):
                    char = " "
                    if layer[i][j] == 1:
                        char = "#"
                    elif layer[i][j] == 2:
                        char = "@"
                    elif layer[i][j] == 3:
                        char = "O"
                    elif layer[i][j] == 4:
                        char = "%"
                    elif layer[i][j] < 0:  # negative values are monsters
                        char = self.monster_names[(-layer[i][j])-1]
                    s += char
                if il < len(layers)-1 or i < nrows-1:
                    s += "\n"
            if il < len(layers)-1:  # skip separator in last layer
                s += "-\n"
        return s

    def plot(self, rooms, corridors=[]):
        ax = plt.gca()
        for room in rooms:
            rect = plt.Rectangle(
                xy=room[0],
                height=room[1][1]-room[0][1],
                width=room[1][0]-room[0][0],
                fill=False,
                linewidth=2,
            )
            ax.add_patch(rect)
            plt.scatter(*self._box_center(room))

        for corr in corridors:
            plt.plot([corr[0][0], corr[1][0]], [corr[0][1], corr[1][1]])

        lims = np.array(rooms)
        plt.xlim([lims.min()-10, lims.max()+10])
        plt.ylim([lims.min()-10, lims.max()+10])

    def _box_center(self, a):
        return int((a[0][0]+a[1][0])/2), int((a[0][1]+a[1][1])/2)

    def lines_intersect(self, line1, line2):
        def on_segment(p, q, r):
            """Check if point q lies on the segment pr"""
            if (min(p[0], r[0]) <= q[0] <= max(p[0], r[0]) and
                    min(p[1], r[1]) <= q[1] <= max(p[1], r[1])):
                return True
            return False

        def orientation(p, q, r):
            """Determine the orientation of the triplet (p, q, r)
            0 -> p, q and r are collinear
            1 -> Clockwise
            2 -> Counterclockwise
            """
            val = (q[1] - p[1]) * (r[0] - q[0]) - (q[0] - p[0]) * (r[1] - q[1])
            if val == 0:
                return 0  # collinear
            elif val > 0:
                return 1  # clockwise
            else:
                return 2  # counterclockwise

        def do_intersect(p1, q1, p2, q2):
            """Check if the line segments 'p1q1' and 'p2q2' intersect"""

            # Check if they share an endpoint, if so, return False (no intersection)
            if p1 == p2 or p1 == q2 or q1 == p2 or q1 == q2:
                return False

            # Find the four orientations needed for general and special cases
            o1 = orientation(p1, q1, p2)
            o2 = orientation(p1, q1, q2)
            o3 = orientation(p2, q2, p1)
            o4 = orientation(p2, q2, q1)

            # General case
            if o1 != o2 and o3 != o4:
                return True

            # Special Cases (excluding shared endpoint cases)
            if o1 == 0 and on_segment(p1, p2, q1):
                return True
            if o2 == 0 and on_segment(p1, q2, q1):
                return True
            if o3 == 0 and on_segment(p2, p1, q2):
                return True
            if o4 == 0 and on_segment(p2, q1, q2):
                return True

            return False

        # Extract points from the lines
        (p1, q1) = line1
        (p2, q2) = line2

        return do_intersect(p1, q1, p2, q2)

    def _line_intersects_box(self, line, box):
        """Check if a line intersects a rectangular box"""
        # Extract the line points
        (x1, y1), (x2, y2) = line

        # Extract the box points
        (box_left, box_top), (box_right, box_bottom) = box

        # Define the four sides of the box as lines
        box_edges = [
            [(box_left, box_top), (box_right, box_top)],  # top edge
            [(box_right, box_top), (box_right, box_bottom)],  # right edge
            [(box_right, box_bottom), (box_left, box_bottom)],  # bottom edge
            [(box_left, box_bottom), (box_left, box_top)]  # left edge
        ]

        # Check if the line intersects any of the four sides of the box
        for edge in box_edges:
            if self.lines_intersect(line, edge):
                return True

        # Check if the line is completely inside the box
        if (box_left <= x1 <= box_right and box_top <= y1 <= box_bottom) or \
           (box_left <= x2 <= box_right and box_top <= y2 <= box_bottom):
            return True

        return False

    def _distance(self, node1, node2):
        """Calculate Euclidean _distance between two nodes."""
        return math.sqrt((node1[0] - node2[0]) ** 2 + (node1[1] - node2[1]) ** 2)

    def _add_minimum_edges(self, edges):
        """Add the minimum number of edges to connect all components."""

        def find_components(edges):
            """Find connected components in the graph using DFS."""
            graph = defaultdict(list)
            for (a, b) in edges:
                graph[a].append(b)
                graph[b].append(a)

            visited = set()
            components = []

            def dfs(node, component):
                stack = [node]
                while stack:
                    current = stack.pop()
                    if current not in visited:
                        visited.add(current)
                        component.append(current)
                        stack.extend(graph[current])

            for node in graph:
                if node not in visited:
                    component = []
                    dfs(node, component)
                    components.append(component)

            return components

        # Find all the connected components
        components = find_components(edges)

        # If there's only one component, the graph is already fully connected
        if len(components) == 1:
            return edges, []

        new_edges = []

        # Find the closest pair of nodes between different components
        while len(components) > 1:
            min_dist = float('inf')
            edge_to_add = None
            comp1, comp2 = None, None

            # Check all pairs of components
            for i in range(len(components)):
                for j in range(i + 1, len(components)):
                    for node1 in components[i]:
                        for node2 in components[j]:
                            dist = self._distance(node1, node2)
                            if dist < min_dist:
                                min_dist = dist
                                edge_to_add = (node1, node2)
                                comp1, comp2 = i, j

            # Add the closest edge
            new_edges.append(edge_to_add)
            edges.append(edge_to_add)

            # Merge the two components
            components[comp1].extend(components[comp2])
            components.pop(comp2)

        return edges, new_edges

    def _collide(self, a, b):
        b1 = np.array(a).flatten()
        b2 = np.array(b).flatten()
        return not (b1[0] > b2[2] or b1[2] < b2[0] or b1[1] > b2[3] or b1[3] < b2[1])

rasterize(wall_height=2, ceiling=True)

Converts the generated map into an ASCII string.

Parameters:

Name Type Description Default
wall_height

Height of the walls of the dungeon.

2
Source code in craftium/extra/random_map_generator.py
def rasterize(self, wall_height=2, ceiling=True):
    """Converts the generated map into an ASCII string.

    :params wall_height: Height of the walls of the dungeon.
    """
    assert wall_height >= 1, "Wall height must be >= 1"
    ncols, nrows = self.rooms[:, :, 0].max(), self.rooms[:, :, 1].max()
    m = np.zeros((nrows, ncols), dtype=np.int8)

    # add rooms
    for room in self.rooms:
        m[room[0, 1]:room[1, 1], room[0, 0]:room[1, 0]] = 1

    # add corridors
    for corr in self.corridors:
        x0, y0, x1, y1 = corr[0, 0], corr[0, 1], corr[1, 0], corr[1, 1]
        lines = list(self._bresenham(x0, y0, x1, y1))
        for (x, y) in lines:
            m[y, x] = 1
            # increase line thickness from 1 to 3 (minimum)
            m[y+1, x], m[y-1, x] = 1, 1
            m[y+2, x], m[y-2, x] = 1, 1
            m[y, x+1], m[y, x-1] = 1, 1
            m[y, x+2], m[y, x-2] = 1, 1

    # add walls
    layers = [m]
    l1 = np.zeros((nrows, ncols), dtype=np.int8)
    for i in range(nrows):
        for j in range(ncols):
            if m[i, j] == 0:
                continue
            # check if the block i,j is completely surrounded with other blocks or not
            v = True
            if i > 0:
                v = v and m[i-1, j] == 1
            if i < nrows - 1:
                v = v and m[i+1, j] == 1
            if j > 0:
                v = v and m[i, j-1] == 1
            if j < ncols - 1:
                v = v and m[i, j+1] == 1
            if not v or (i == 0 or i == nrows-1 or j == 0 or j == ncols - 1):
                l1[i, j] = 1

    # add walls
    for _ in range(wall_height):
        layers.append(l1.copy())

    # place the player
    layers[1][self.player_pos[1], self.player_pos[0]] = 2

    # place the objective
    layers[1][self.objective_pos[1], self.objective_pos[0]] = 3

    # place the monsters
    for type_idx, pos in self.monster_locs:
        layers[1][pos[1], pos[0]] = type_idx

    if ceiling:
        # add ceiling if needed (same pattern as the first layer)
        l0 = layers[0].copy()
        l0[l0 == 1] = 4
        layers.append(l0)

    # convert layer matrices into a string
    s = ""
    for il, layer in enumerate(layers):
        for i in range(nrows):
            for j in range(ncols):
                char = " "
                if layer[i][j] == 1:
                    char = "#"
                elif layer[i][j] == 2:
                    char = "@"
                elif layer[i][j] == 3:
                    char = "O"
                elif layer[i][j] == 4:
                    char = "%"
                elif layer[i][j] < 0:  # negative values are monsters
                    char = self.monster_names[(-layer[i][j])-1]
                s += char
            if il < len(layers)-1 or i < nrows-1:
                s += "\n"
        if il < len(layers)-1:  # skip separator in last layer
            s += "-\n"
    return s

craftium.extra.crl_dungeons.load_task(seq_name, task_id=0, make_env=True, return_map=False, prefix=None, **kwargs)

Loads a task from a predefined continual RL task sequence.

Example usage:

# Instantiate an environment with the third task
env = crl.load_task("sequence01", task_id=2)

# Get the ASCII map of the first task
ascii_map = crl.load_task("sequence01", make_env=False, return_map=True)

Parameters:

Name Type Description Default
seq_name str

Name of the sequence to load.

required
task_id int

Identifier of the task to load.

0
make_env bool

If set to True, a Gymnasium environment of the task will be returned.

True
return_map bool

If set to True, the ASCII map will be returned.

False
prefix Optional[os.PathLike]

Prefix path to add to seq_name. Defaults to the path of the module. Useful to load custom sequences.

None
**kwargs

keyword arguments to pass to the make_dungeon_env function (see the reference).

{}
Source code in craftium/extra/crl_dungeons.py
def load_task(
        seq_name: str,
        task_id: int = 0,
        make_env: bool = True,
        return_map: bool = False,
        prefix: Optional[os.PathLike] = None,
        **kwargs):
    """Loads a task from a predefined continual RL task sequence.

    Example usage:
    ```python
    # Instantiate an environment with the third task
    env = crl.load_task("sequence01", task_id=2)

    # Get the ASCII map of the first task
    ascii_map = crl.load_task("sequence01", make_env=False, return_map=True)
    ```
    :param seq_name: Name of the sequence to load.
    :param task_id: Identifier of the task to load.
    :param make_env: If set to `True`, a `Gymnasium` environment of the task will be returned.
    :param return_map: If set to `True`, the ASCII map will be returned.
    :param prefix: Prefix path to add to `seq_name`. Defaults to the path of the module. Useful to load custom sequences.
    :param **kwargs: keyword arguments to pass to the `make_dungeon_env` function (see the [reference](reference.md)).
    """
    # open the sequence file and read all the maps
    prefix = os.path.dirname(__file__) if prefix is None else prefix
    try:
        file = open(os.path.join(prefix, seq_name), "r")
        maps = file.read().split("=")
        file.close()
    except Exception as _:
        raise Exception(f"Sequence '{seq_name}' does not exist")
    # get the map of the sequence
    assert task_id < len(maps), f"Task ID ({task_id}) must be smaller than the number of tasks {len(maps)}"
    ascii_map = maps[task_id]

    ret_vals = []  # list of values to return

    # instantiate a dungeon environment if needed
    if make_env:
        env = make_dungeon_env(ascii_map, **kwargs)
        ret_vals.append(env)

    if return_map:
        ret_vals.append(ascii_map)

    if len(ret_vals) == 1:
        return ret_vals[0]
    return tuple(ret_vals)