API Reference
craftium.make_dungeon_env(ascii_map=None, mapgen_kwargs=dict(), wall_height=5, return_map_str=False, **kwargs)
Utility function to instantiate procedurally generated dungeon environments (see Craftium/ProcDungeons-v0).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
ascii_map |
Optional[str]
|
ASCII representation of the map. This is optional, and if not given, a random map will be generated using `RandomMapGen`. |
None
|
mapgen_kwargs |
dict[str, Any]
|
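Arguments for `RandomMapGen`. The arguments provided via this parameter will overwrite the default values of `RandomMapGen`. This argument is only used if `ascii_map` is `None`. |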
dict()
|
wall_height |
int
|
The height of the walls. This argument is only used if `ascii_map` is `None`. |
5
|
return_map_str |
bool
|
If set to `True`, the function returns the generated map as a second return value. |
False
|
**kwargs |
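Extra arguments to provide to `gymnasium.make`. If the `minetest_conf` argument is set, its values will overwrite the default parameters of the environment. |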
{}
|
Source code in craftium/__init__.py
def make_dungeon_env(
    ascii_map: Optional[str] = None,
    mapgen_kwargs: dict[str, Any] = dict(),
    wall_height: int = 5,
    return_map_str: bool = False,
    **kwargs
):
    """Utility function to instantiate procedurally generated dungeon environments (see [`Craftium/ProcDungeons-v0`](https://craftium.readthedocs.io/en/latest/environments/#procedural-environment-generation)).

    :param ascii_map: ASCII representation of the map. This is optional, and if not given, a random map will be generated using `RandomMapGen`.
    :param mapgen_kwargs: Arguments for `RandomMapGen`. The arguments provided via this parameter will overwrite the default values of `RandomMapGen`. This argument is only used if `ascii_map` is `None`.
    :param wall_height: The height of the walls. This argument is only used if `ascii_map` is `None`.
    :param return_map_str: If set to `True` the function returns the generated map as second return value.
    :param **kwargs: Extra arguments to provide to `gymnasium.make`. If the `minetest_conf` argument is set, its values will overwrite the default parameters of the environment.
    :return: The created environment, or the tuple `(env, ascii_map)` if `return_map_str` is `True`.
    """
    import gymnasium as gym
    from .extra.random_map_generator import RandomMapGen

    # generate the map
    if ascii_map is None:
        mapgen = RandomMapGen(**mapgen_kwargs)
        ascii_map = mapgen.rasterize(wall_height=wall_height)

    # Work on a copy: assigning into DEFAULT_PROCDUNGEONS_CONF directly would
    # leak this call's overrides (and its map) into every later call.
    minetest_conf = dict(DEFAULT_PROCDUNGEONS_CONF)
    if "minetest_conf" in kwargs:
        minetest_conf.update(kwargs["minetest_conf"])

    # newlines are escaped so that the whole map is stored as a single-line value
    minetest_conf["ascii_map"] = ascii_map.replace("\n", "\\n")
    kwargs["minetest_conf"] = minetest_conf

    env = gym.make("Craftium/ProcDungeons-v0", **kwargs)

    if return_map_str:
        return env, ascii_map
    return env
craftium.craftium_env.CraftiumEnv
Bases: Env
The main class implementing Gymnasium's Env API.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
env_dir |
os.PathLike
|
Directory of the environment to load (should contain `worlds` and `games` directories). |
required |
obs_width |
int
|
The width of the observation image in pixels. |
640
|
obs_height |
int
|
The height of the observation image in pixels. |
360
|
enable_voxel_obs |
bool
|
Whether to enable voxel observations. Can only be enabled if `_voxel_obs_available` is `True`. The voxel observation is a 3D grid of dimensions `(2*voxel_obs_rx+1, 2*voxel_obs_ry+1, 2*voxel_obs_rz+1, 3)`. The last dimension contains the voxel node ID, the light data, and the param2 data for each voxel. |
False
|
voxel_obs_rx |
int
|
The radius of the voxel observation in the x-axis (North). |
20
|
voxel_obs_ry |
int
|
The radius of the voxel observation in the y-axis (Up). |
10
|
voxel_obs_rz |
int
|
The radius of the voxel observation in the z-axis (East). |
20
|
init_frames |
int
|
The number of frames to wait for Minetest to load. |
15
|
render_mode |
Optional[str]
|
Render mode ("human" or "rgb_array"), see Env.render. |
None
|
max_timesteps |
Optional[int]
|
Maximum number of timesteps until episode termination. Disabled if set to `None`. |
None
|
run_dir |
Optional[os.PathLike]
|
Path to save the artifacts created by the run. Will be automatically generated if not provided. |
None
|
run_dir_prefix |
Optional[os.PathLike]
|
Prefix path to add to the automatically generated `run_dir`. This value is only used if `run_dir` is `None`. |
None
|
game_id |
str
|
The name of the game to load. Defaults to the "original" minetest game. |
'minetest'
|
world_name |
str
|
The name of the world to load. Defaults to "world". |
'world'
|
minetest_dir |
Optional[str]
|
Path to the craftium's minetest build directory. If not given, defaults to the directory where craftium is installed. This option is intended for debugging purposes. |
None
|
minetest_conf |
dict[str, Any]
|
Extra configuration options added to the default minetest.conf file generated by craftium. Setting options here will overwrite default values. Check minetest.conf.example for all available configuration options. |
dict()
|
pipe_proc |
bool
|
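If `True`, the minetest process stderr and stdout will be piped into two files inside the run's directory. Otherwise, the minetest process will not be piped and its output will be shown in the terminal. Piping is enabled by default to reduce verbosity; disabling it can be useful for debugging. |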
True
|
mt_listen_timeout |
int
|
Number of milliseconds to wait for MT to connect to the TCP channel. If the timeout is reached a Timeout exception is raised. WARNING: When using multiple (serial) MT environments, timeout can be easily reached for the last environment. In this case, you might want to increase the value of this parameter according to the number of environments. |
60000
|
mt_port |
Optional[int]
|
TCP port to employ for MT's internal client<->server communication. If not provided a random port in the [49152, 65535] range is used. |
None
|
frameskip |
int
|
The number of frames skipped between steps, 1 by default (disabled). Note that `max_timesteps` and `init_frames` parameters will be divided by the frameskip value. |
1
|
rgb_observations |
bool
|
Whether to use RGB images or gray scale images as observations. Note that RGB images are slower to send from MT to python via TCP. By default RGB images are used. |
True
|
gray_scale_keepdim |
bool
|
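If `True`, a singleton dimension will be added, i.e. observations are of the shape WxHx1. Otherwise, they are of shape WxH. |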
False
|
seed |
Optional[int]
|
Random seed. Affects minetest's map generation and Lua's RNG (in mods). |
None
|
sync_mode |
bool
|
If set to true, minetest's internal client and server steps are synchronized. This is useful for training models slower than realtime. |
False
|
pmul |
int
|
Physics multiplier. As craftium agents take actions by frame, default movement speeds make the agent move slowly. When set to > 1, minetest's movement velocity and acceleration increase, helping the agent to move at acceptable relative speeds. |
20
|
soft_reset |
bool
|
If set to true, resets will have to be handled by the Lua mod and minetest won't be killed and rerun every call to restart. IMPORTANT: Only set this flag to `True` in environments that support this feature. |
False
|
offscreen_sdl |
bool
|
Whether to use the `offscreen` SDL driver or not (true by default). |
True
|
gpu_id |
Optional[int]
|
If a GPU id was passed, set `SDL_HINT_EGL_DEVICE` to render the environment using that GPU. |
None
|
human_screen_size |
tuple[int, int]
|
Size (width, height) of the render screen when `render_mode` is set to `"human"`. |
(720, 720)
|
_minetest_conf |
dict[str, Any]
|
The default minetest configuration provided during environment registration. |
dict()
|
_voxel_obs_available |
bool
|
This flag indicates environments that support voxel observations during registration (do not manually override). |
False
|
Source code in craftium/craftium_env.py
class CraftiumEnv(Env):
    """The main class implementing Gymnasium's [Env](https://gymnasium.farama.org/api/env/) API.

    :param env_dir: Directory of the environment to load (should contain `worlds` and `games` directories).
    :param obs_width: The width of the observation image in pixels.
    :param obs_height: The height of the observation image in pixels.
    :param enable_voxel_obs: Whether to enable voxel observations. Can only be enabled if _voxel_obs_available is True. The voxel observation is a 3D grid of dimensions (2*voxel_obs_rx+1, 2*voxel_obs_ry+1, 2*voxel_obs_rz+1, 3). The last dimension contains the voxel node ID, the light data, and the param2 data for each voxel.
    :param voxel_obs_rx: The radius of the voxel observation in the x-axis (North).
    :param voxel_obs_ry: The radius of the voxel observation in the y-axis (Up).
    :param voxel_obs_rz: The radius of the voxel observation in the z-axis (East).
    :param init_frames: The number of frames to wait for Minetest to load.
    :param render_mode: Render mode ("human" or "rgb_array"), see [Env.render](https://gymnasium.farama.org/api/env/#gymnasium.Env.render).
    :param max_timesteps: Maximum number of timesteps until episode termination. Disabled if set to `None`.
    :param run_dir: Path to save the artifacts created by the run. Will be automatically generated if not provided.
    :param run_dir_prefix: Prefix path to add to the automatically generated `run_dir`. This value is only used if `run_dir` is `None`.
    :param game_id: The name of the game to load. Defaults to the "original" minetest game.
    :param world_name: The name of the world to load. Defaults to "world".
    :param minetest_dir: Path to the craftium's minetest build directory. If not given, defaults to the directory where craftium is installed. This option is intended for debugging purposes.
    :param minetest_conf: Extra configuration options added to the default minetest.conf file generated by craftium. Setting options here will overwrite default values. Check [minetest.conf.example](https://github.com/minetest/minetest/blob/master/minetest.conf.example) for all available configuration options.
    :param pipe_proc: If `True`, the minetest process stderr and stdout will be piped into two files inside the run's directory. Otherwise, the minetest process will not be piped and its output will be shown in the terminal. Piping is enabled by default to reduce verbosity; disabling it can be useful for debugging.
    :param mt_listen_timeout: Number of milliseconds to wait for MT to connect to the TCP channel. If the timeout is reached a Timeout exception is raised. **WARNING:** When using multiple (serial) MT environments, timeout can be easily reached for the last environment. In this case, you might want to increase the value of this parameter according to the number of environments.
    :param mt_port: TCP port to employ for MT's internal client<->server communication. If not provided a random port in the [49152, 65535] range is used.
    :param frameskip: The number of frames skipped between steps, 1 by default (disabled). Note that `init_frames` is divided (integer division) by the frameskip value. NOTE(review): `max_timesteps` is stored unchanged by this constructor; confirm whether it is meant to be divided as well.
    :param rgb_observations: Whether to use RGB images or gray scale images as observations. Note that RGB images are slower to send from MT to python via TCP. By default RGB images are used.
    :param gray_scale_keepdim: If `True`, a singleton dimension will be added, i.e. observations are of the shape WxHx1. Otherwise, they are of shape WxH.
    :param seed: Random seed. Affects minetest's map generation and Lua's RNG (in mods).
    :param sync_mode: If set to true, minetest's internal client and server steps are synchronized. This is useful for training models slower than realtime.
    :param fps_max: Forwarded unchanged to the `Minetest` wrapper as `fps_max` (presumably the maximum frames per second of the game; confirm against the wrapper).
    :param pmul: Physics multiplier. As craftium agents take actions by frame, default movement speeds make the agent move slowly. When set to > 1, minetest's movement velocity and acceleration increase helping the agent to move at acceptable relative speeds.
    :param soft_reset: If set to true, resets will have to be handled by the Lua mod and minetest won't be killed and rerun every call to restart. **IMPORTANT:** Only set this flag to `True` in environments that support this feature.
    :param offscreen_sdl: Whether to use the `offscreen` SDL driver or not (true by default).
    :param gpu_id: If a GPU id was passed, set `SDL_HINT_EGL_DEVICE` to render the environment using that GPU.
    :param human_screen_size: Size (width, height) of the render screen when `render_mode` is set to `"human"`.
    :param _minetest_conf: The default minetest configuration provided during environment registration.
    :param _voxel_obs_available: This flag indicates environments that support voxel observations during registration (do not manually override).
    """
    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 30, "voxel_observations_enabled": False}

    def __init__(
            self,
            env_dir: os.PathLike,
            obs_width: int = 640,
            obs_height: int = 360,
            enable_voxel_obs: bool = False,
            voxel_obs_rx: int = 20,
            voxel_obs_ry: int = 10,
            voxel_obs_rz: int = 20,
            init_frames: int = 15,
            render_mode: Optional[str] = None,
            max_timesteps: Optional[int] = None,
            run_dir: Optional[os.PathLike] = None,
            run_dir_prefix: Optional[os.PathLike] = None,
            game_id: str = "minetest",
            world_name: str = "world",
            minetest_dir: Optional[str] = None,
            minetest_conf: dict[str, Any] = dict(),
            pipe_proc: bool = True,
            mt_listen_timeout: int = 60_000,
            mt_port: Optional[int] = None,
            frameskip: int = 1,
            rgb_observations: bool = True,
            gray_scale_keepdim: bool = False,
            seed: Optional[int] = None,
            sync_mode: bool = False,
            fps_max: int = 200,
            pmul: int = 20,
            soft_reset: bool = False,
            offscreen_sdl: bool = True,
            gpu_id: Optional[int] = None,
            human_screen_size: tuple[int, int] = (720, 720),
            _minetest_conf: dict[str, Any] = dict(),
            _voxel_obs_available: bool = False,
    ):
        super(CraftiumEnv, self).__init__()

        if enable_voxel_obs:
            if not _voxel_obs_available:
                raise ValueError("Voxel observations are not supported for this environment. Set `enable_voxel_obs` to `False` "
                                 "or use a different environment.")
            # Use a per-instance copy: writing into the class-level `metadata`
            # dict would flip the flag for every other environment instance.
            self.metadata = {**self.metadata, "voxel_observations_enabled": True}

        # Merge into a fresh dict so that neither the shared default argument
        # nor the caller's registration config is modified (the FOV correction
        # below would otherwise compound across instantiations).
        _minetest_conf = {**_minetest_conf, **minetest_conf}

        # cancel out the auto enlargement of fov for aspect ratios smaller than 16/10
        aspect_ratio = obs_width / obs_height
        if aspect_ratio < 16/10 and 'fov' in _minetest_conf:
            _minetest_conf['fov'] = _minetest_conf['fov'] / np.clip(np.sqrt((16/10)/aspect_ratio), 1.0, 1.4)

        self.obs_width = obs_width
        self.obs_height = obs_height
        self.init_frames = init_frames // frameskip
        self.max_timesteps = max_timesteps
        self.gray_scale_keepdim = gray_scale_keepdim
        self.rgb_observations = rgb_observations
        self.soft_reset = soft_reset

        # define the action space
        action_dict = {}
        for act in ACTION_ORDER[:-1]:  # all actions except the last ("mouse")
            action_dict[act] = Discrete(2)  # 1/0: key pressed/not pressed
        # define the mouse action
        action_dict[ACTION_ORDER[-1]] = Box(low=-1, high=1, shape=(2,), dtype=np.float32)
        self.action_space = Dict(action_dict)

        # define the observation space
        shape = [obs_width, obs_height]
        if rgb_observations:
            shape.append(3)
        elif gray_scale_keepdim:
            shape.append(1)
        self.observation_space = Box(
            low=0, high=255, shape=shape, dtype=np.uint8)

        assert render_mode is None or render_mode in self.metadata["render_modes"]
        self.render_mode = render_mode

        # initialize the Python<->Minetest communication channel (server side)
        self.mt_chann = MtChannel(
            img_width=self.obs_width,
            img_height=self.obs_height,
            voxel_obs=enable_voxel_obs,
            voxel_obs_rx=voxel_obs_rx,
            voxel_obs_ry=voxel_obs_ry,
            voxel_obs_rz=voxel_obs_rz,
            listen_timeout=mt_listen_timeout,
            rgb_imgs=rgb_observations,
        )

        # handles the MT configuration and process
        self.mt = Minetest(
            world_name=world_name,
            run_dir=run_dir,
            run_dir_prefix=run_dir_prefix,
            headless=offscreen_sdl,
            gpu_id=gpu_id,
            seed=seed,
            game_id=game_id,
            sync_dir=env_dir,
            screen_w=obs_width,
            screen_h=obs_height,
            voxel_obs=enable_voxel_obs,
            voxel_obs_rx=voxel_obs_rx,
            voxel_obs_ry=voxel_obs_ry,
            voxel_obs_rz=voxel_obs_rz,
            minetest_dir=minetest_dir,
            tcp_port=self.mt_chann.port,
            minetest_conf=_minetest_conf,
            pipe_proc=pipe_proc,
            mt_port=mt_port,
            frameskip=frameskip,
            rgb_frames=rgb_observations,
            sync_mode=sync_mode,
            fps_max=fps_max,
            pmul=pmul,
        )

        # set up the pygame screen if `render_mode` is set to "human"
        self.pyg_closed = False
        if render_mode == "human":
            pygame.init()
            self.pyg_screen = pygame.display.set_mode(human_screen_size)
            self.pyg_clock = pygame.time.Clock()
            self.pyg_screen_size = human_screen_size
            pygame.display.set_caption("Craftium")

        self.last_observation = None  # used in render if "rgb_array"
        self.timesteps = 0  # the timesteps counter

    def _get_info(self):
        """Returns a new, empty info dictionary (filled in by `reset` and `step`)."""
        return dict()

    def get_mt_config(self):
        """Returns a deep copy of the configuration held by the Minetest wrapper (`self.mt.config`)."""
        return deepcopy(self.mt.config)

    def reset(
            self,
            *,
            seed: Optional[int] = None,
            options: Optional[dict] = None,
    ):
        """Resets the environment to an initial internal state, returning an initial observation and info.

        See [Env.reset](https://gymnasium.farama.org/api/env/#gymnasium.Env.reset) in the Gymnasium docs.

        :param seed: The random seed. If given, it is also written to the minetest configuration as `fixed_map_seed`.
        :param options: Options dictionary. If it contains the `"minetest_conf"` key, its value overwrites the minetest configuration before the process is (re)started. Only used in hard resets.
        :return: The tuple `(observation, info)`; `info` contains the `voxel_obs`, `player_pos`, `player_vel`, `player_pitch`, `player_yaw` and `mt_dtime` keys.
        """
        super().reset(seed=seed)
        self.timesteps = 0

        if not self.soft_reset or not self.mt_chann.is_open():
            # hard reset: stop the running MT process (if any) and launch a new one
            if self.mt_chann.is_open():
                self.mt_chann.send_kill()
                self.mt_chann.close_conn()
            # NOTE(review): nesting reconstructed from a listing without
            # indentation; the pipe/process teardown is assumed to run
            # regardless of the channel state. Confirm against the upstream file.
            self.mt.close_pipes()
            self.mt.wait_close()

            # `options` may be any dict, so the key is only read if present
            if options is not None and "minetest_conf" in options:
                self.mt.overwrite_config(options["minetest_conf"])
            if seed is not None:
                self.mt.overwrite_config({"fixed_map_seed": seed})

            # start the new MT process
            self.mt.start_process()

            # open communication channel with minetest
            try:
                self.mt_chann.open_conn()
            except Exception:
                print(
                    "\n\x1b[1m[!] Error connecting to Minetest. Minetest probably failed to launch.")
                print(
                    "    => Run's scratch directory should be available, containing stderr.txt and")
                print("       stdout.txt useful for checking what went wrong.")
                print(
                    "** Content of stderr.txt in the run's sratch directory:\x1b[0m")
                print("~"*45, "\n")
                # a missing/unreadable stderr.txt must not hide the connection error
                try:
                    with open(f"{self.mt.run_dir}/stderr.txt", "r") as f:
                        print(f.read())
                except OSError as read_err:
                    print(f"(could not read stderr.txt: {read_err})")
                print("~"*45)
                print(
                    "\x1b[1mRaising catched exception (in case it's useful):\x1b[0m")
                print("~"*45, "\n")
                raise

            # HACK skip some frames to let the game initialize
            # TODO This "waiting" should be implemented in Minetest not in python
            for _ in range(self.init_frames):
                _observation, _voxobs, _pos, _vel, _pitch, _yaw, _dtime, _reward, _term = self.mt_chann.receive()
                self.mt_chann.send([0]*21, 0, 0)  # nop action
        else:
            self.mt_chann.send_soft_reset()

        observation, voxobs, pos, vel, pitch, yaw, dtime, _reward, _term = self.mt_chann.receive()
        if not self.gray_scale_keepdim and not self.rgb_observations:
            observation = observation[:, :, 0]

        self.last_observation = observation

        info = self._get_info()
        info["voxel_obs"] = voxobs
        info["player_pos"] = pos
        info["player_vel"] = vel
        info["player_pitch"] = pitch
        info["player_yaw"] = yaw
        info["mt_dtime"] = dtime

        return observation, info

    def step(self, action):
        """Run one timestep of the environment’s dynamics using the agent actions.

        See [Env.step](https://gymnasium.farama.org/api/env/#gymnasium.Env.step) in the Gymnasium docs.

        :param action: An action provided by the agent: a dictionary mapping action names (entries of `ACTION_ORDER`) to their values, where the `"mouse"` entry holds the (x, y) mouse movement.
        :return: The tuple `(observation, reward, termination, truncated, info)`.
        """
        self.timesteps += 1

        # render the previous observation if needed
        if self.render_mode == "human" and not self.pyg_closed:
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    self.pyg_closed = True

            if not self.pyg_closed:
                # handle different observation types converting to RGB if needed
                if not self.rgb_observations and not self.gray_scale_keepdim:
                    pyg_obs = self.last_observation[:, :, None].repeat(3, 2)
                elif not self.rgb_observations:
                    pyg_obs = self.last_observation.repeat(3, 2)
                else:
                    pyg_obs = self.last_observation

                # render the last observation in the pygame window
                surface = pygame.image.frombuffer(
                    pyg_obs.tobytes(),
                    (self.obs_width, self.obs_height),
                    "RGB"
                )
                surface = pygame.transform.scale(surface, self.pyg_screen_size)
                self.pyg_screen.blit(surface, (0, 0))
                pygame.display.flip()
                self.pyg_clock.tick(self.metadata["render_fps"])  # limits FPS

        # convert the action dict to a format to be sent to MT through mt_chann
        keys = [0]*21  # all commands (keys) except the mouse
        mouse_x, mouse_y = 0, 0
        for k, v in action.items():
            if k == "mouse":
                # the y component is negated before being scaled to pixels
                x, y = v[0], -v[1]
                mouse_x = int(x*(self.obs_width // 2))
                mouse_y = int(y*(self.obs_height // 2))
            else:
                keys[ACTION_ORDER.index(k)] = v

        # send the action to MT
        self.mt_chann.send(keys, mouse_x, mouse_y)

        # receive the new info from minetest
        observation, voxobs, pos, vel, pitch, yaw, dtime, reward, termination = self.mt_chann.receive()
        if not self.gray_scale_keepdim and not self.rgb_observations:
            observation = observation[:, :, 0]

        self.last_observation = observation

        info = self._get_info()
        info["voxel_obs"] = voxobs
        info["player_pos"] = pos
        info["player_vel"] = vel
        info["player_pitch"] = pitch
        info["player_yaw"] = yaw
        info["mt_dtime"] = dtime

        truncated = self.max_timesteps is not None and self.timesteps >= self.max_timesteps

        return observation, reward, termination, truncated, info

    def render(self):
        """Returns the last received observation if `render_mode` is `"rgb_array"`, otherwise `None`."""
        if self.render_mode == "rgb_array":
            return self.last_observation

    def close(self, clear: bool = True):
        """
        Closes the environment and removes temporary files.

        :param clear: Whether to remove the MT working
        directory or not.
        """
        if self.mt_chann.is_open():
            self.mt_chann.send_kill()
            self.mt_chann.close()
        # NOTE(review): nesting reconstructed from a listing without indentation;
        # the teardown below is assumed to run regardless of the channel state.
        self.mt.close_pipes()
        self.mt.wait_close()

        if clear:
            self.mt.clear()

        if self.render_mode == "human":
            pygame.quit()
close(clear=True)
Closes the environment and removes temporary files.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
clear |
bool
|
Whether to remove the MT working directory or not. |
True
|
Source code in craftium/craftium_env.py
def close(self, clear: bool = True):
    """
    Shuts the environment down and, optionally, deletes its temporary files.

    If the communication channel is open, a kill command is sent through it
    before it is closed. Afterwards the Minetest wrapper's pipes are closed
    and the process is waited for.

    :param clear: Whether to remove the MT working
    directory or not.
    """
    channel = self.mt_chann
    minetest = self.mt

    # NOTE(review): nesting reconstructed from a listing without indentation;
    # only the kill/close pair is assumed to depend on the channel being open.
    if channel.is_open():
        channel.send_kill()
        channel.close()

    minetest.close_pipes()
    minetest.wait_close()

    if clear:
        minetest.clear()

    # the pygame window only exists in "human" render mode
    if self.render_mode == "human":
        pygame.quit()
reset(*, seed=None, options=None)
Resets the environment to an initial internal state, returning an initial observation and info.
See Env.reset in the Gymnasium docs.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
seed |
Optional[int]
|
The random seed. |
None
|
options |
Optional[dict]
|
Options dictionary. |
None
|
Source code in craftium/craftium_env.py
def reset(
        self,
        *,
        seed: Optional[int] = None,
        options: Optional[dict] = None,
):
    """Resets the environment to an initial internal state, returning an initial observation and info.

    See [Env.reset](https://gymnasium.farama.org/api/env/#gymnasium.Env.reset) in the Gymnasium docs.

    :param seed: The random seed. If given, it is also written to the minetest configuration as `fixed_map_seed`.
    :param options: Options dictionary. If it contains the `"minetest_conf"` key, its value overwrites the minetest configuration before the process is (re)started. Only used in hard resets.
    :return: The tuple `(observation, info)`; `info` contains the `voxel_obs`, `player_pos`, `player_vel`, `player_pitch`, `player_yaw` and `mt_dtime` keys.
    """
    super().reset(seed=seed)
    self.timesteps = 0

    if not self.soft_reset or not self.mt_chann.is_open():
        # hard reset: stop the running MT process (if any) and launch a new one
        if self.mt_chann.is_open():
            self.mt_chann.send_kill()
            self.mt_chann.close_conn()
        # NOTE(review): nesting reconstructed from a listing without
        # indentation; the pipe/process teardown is assumed to run
        # regardless of the channel state. Confirm against the upstream file.
        self.mt.close_pipes()
        self.mt.wait_close()

        # `options` may be any dict, so the key is only read if present
        if options is not None and "minetest_conf" in options:
            self.mt.overwrite_config(options["minetest_conf"])
        if seed is not None:
            self.mt.overwrite_config({"fixed_map_seed": seed})

        # start the new MT process
        self.mt.start_process()

        # open communication channel with minetest
        try:
            self.mt_chann.open_conn()
        except Exception:
            print(
                "\n\x1b[1m[!] Error connecting to Minetest. Minetest probably failed to launch.")
            print(
                "    => Run's scratch directory should be available, containing stderr.txt and")
            print("       stdout.txt useful for checking what went wrong.")
            print(
                "** Content of stderr.txt in the run's sratch directory:\x1b[0m")
            print("~"*45, "\n")
            # a missing/unreadable stderr.txt must not hide the connection error
            try:
                with open(f"{self.mt.run_dir}/stderr.txt", "r") as f:
                    print(f.read())
            except OSError as read_err:
                print(f"(could not read stderr.txt: {read_err})")
            print("~"*45)
            print(
                "\x1b[1mRaising catched exception (in case it's useful):\x1b[0m")
            print("~"*45, "\n")
            raise

        # HACK skip some frames to let the game initialize
        # TODO This "waiting" should be implemented in Minetest not in python
        for _ in range(self.init_frames):
            _observation, _voxobs, _pos, _vel, _pitch, _yaw, _dtime, _reward, _term = self.mt_chann.receive()
            self.mt_chann.send([0]*21, 0, 0)  # nop action
    else:
        self.mt_chann.send_soft_reset()

    observation, voxobs, pos, vel, pitch, yaw, dtime, _reward, _term = self.mt_chann.receive()
    if not self.gray_scale_keepdim and not self.rgb_observations:
        observation = observation[:, :, 0]

    self.last_observation = observation

    info = self._get_info()
    info["voxel_obs"] = voxobs
    info["player_pos"] = pos
    info["player_vel"] = vel
    info["player_pitch"] = pitch
    info["player_yaw"] = yaw
    info["mt_dtime"] = dtime

    return observation, info
step(action)
Run one timestep of the environment’s dynamics using the agent actions.
See Env.step in the Gymnasium docs.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
action |
An action provided by the agent. |
required |
Source code in craftium/craftium_env.py
def step(self, action):
    """Advance the environment by one timestep using the given agent action.

    See [Env.step](https://gymnasium.farama.org/api/env/#gymnasium.Env.step) in the Gymnasium docs.

    :param action: An action provided by the agent.
    """
    self.timesteps += 1

    # in "human" mode, show the previous observation before acting
    if self.render_mode == "human" and not self.pyg_closed:
        if any(ev.type == pygame.QUIT for ev in pygame.event.get()):
            self.pyg_closed = True

        if not self.pyg_closed:
            # pygame is given an RGB buffer, so gray scale frames are
            # expanded to three channels first
            if self.rgb_observations:
                frame = self.last_observation
            elif self.gray_scale_keepdim:
                frame = self.last_observation.repeat(3, 2)
            else:
                frame = self.last_observation[:, :, None].repeat(3, 2)

            frame_size = (self.obs_width, self.obs_height)
            surface = pygame.image.frombuffer(frame.tobytes(), frame_size, "RGB")
            surface = pygame.transform.scale(surface, self.pyg_screen_size)
            self.pyg_screen.blit(surface, (0, 0))
            pygame.display.flip()
            # cap the rendering rate at the FPS declared in the metadata
            self.pyg_clock.tick(self.metadata["render_fps"])

    # translate the action dict into the (keys, mouse_x, mouse_y) triplet
    # expected by the channel
    keys = [0] * 21  # one slot per command (key), mouse excluded
    mouse_x = 0
    mouse_y = 0
    for name, value in action.items():
        if name != "mouse":
            keys[ACTION_ORDER.index(name)] = value
            continue
        # scale the mouse movement to pixels; the y component is negated
        mouse_x = int(value[0] * (self.obs_width // 2))
        mouse_y = int(-value[1] * (self.obs_height // 2))

    self.mt_chann.send(keys, mouse_x, mouse_y)

    # read minetest's answer to the action
    observation, voxobs, pos, vel, pitch, yaw, dtime, reward, termination = self.mt_chann.receive()
    if not (self.gray_scale_keepdim or self.rgb_observations):
        observation = observation[:, :, 0]
    self.last_observation = observation

    info = self._get_info()
    info.update({
        "voxel_obs": voxobs,
        "player_pos": pos,
        "player_vel": vel,
        "player_pitch": pitch,
        "player_yaw": yaw,
        "mt_dtime": dtime,
    })

    truncated = self.max_timesteps is not None and self.timesteps >= self.max_timesteps
    return observation, reward, termination, truncated, info
craftium.multiagent_env.MarlCraftiumEnv
The main class implementing the multi-agent version of Craftium environments.
Check CraftiumEnv for parameters' documentation.
Source code in craftium/multiagent_env.py
class MarlCraftiumEnv():
"""
The main class implementing the multi-agent version of Craftium environments.
Check `CraftiumEnv` for parameters' documentation.
"""
metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 30}
def __init__(
self,
num_agents: int,
env_dir: os.PathLike,
obs_width: int = 640,
obs_height: int = 360,
init_frames: int = 200,
render_mode: Optional[str] = None,
max_timesteps: Optional[int] = None,
run_dir_prefix: Optional[os.PathLike] = None,
game_id: str = "minetest",
world_name: str = "world",
minetest_dir: Optional[str] = None,
# tcp_port: Optional[int] = None,
mt_server_conf: dict[str, Any] = dict(),
mt_clients_conf: dict[str, Any] = dict(),
pipe_proc: bool = True,
mt_listen_timeout: int = 60_000,
mt_server_port: Optional[int] = None,
frameskip: int = 1,
rgb_observations: bool = True,
gray_scale_keepdim: bool = False,
seed: Optional[int] = None,
sync_mode: bool = False,
fps_max: int = 200,
pmul: int = 20,
):
assert num_agents > 1, "Number of agents lower than 2. Use CraftiumEnv for single agent environments."
self.num_agents = num_agents
self.obs_width = obs_width
self.obs_height = obs_height
self.init_frames = init_frames // frameskip
self.max_timesteps = max_timesteps
self.gray_scale_keepdim = gray_scale_keepdim
self.rgb_observations = rgb_observations
# define the action space
action_dict = {}
for act in ACTION_ORDER[:-1]: # all actions except the last ("mouse")
action_dict[act] = Discrete(2) # 1/0: key pressed/not pressed
# define the mouse action
action_dict[ACTION_ORDER[-1]
] = Box(low=-1, high=1, shape=(2,), dtype=np.float32)
self.action_space = Dict(action_dict)
# define the observation space
shape = [num_agents, obs_width, obs_height]
if rgb_observations:
shape.append(3)
elif gray_scale_keepdim:
shape.append(1)
self.observation_space = Box(
low=0, high=255, shape=shape, dtype=np.uint8)
assert render_mode is None or render_mode in self.metadata["render_modes"]
self.render_mode = render_mode
# crate a communication channel between python and MT (server) for each of the agents
self.mt_channs = []
for _ in range(num_agents):
chann = MtChannel(
img_width=self.obs_width,
img_height=self.obs_height,
listen_timeout=mt_listen_timeout,
rgb_imgs=rgb_observations,
)
self.mt_channs.append(chann)
# create a MT instance that will be configured as the server
self.mt_server = MTServerOnly(
run_dir_prefix=run_dir_prefix,
seed=seed,
game_id=game_id,
world_name=world_name,
sync_dir=env_dir,
minetest_dir=minetest_dir,
minetest_conf=mt_server_conf,
pipe_proc=pipe_proc,
mt_server_port=mt_server_port,
sync_mode=sync_mode,
fps_max=fps_max,
pmul=pmul,
)
# create a MT (client) instance for each agent
self.mt_clients = []
for i in range(num_agents):
client = MTClientOnly(
tcp_port=self.mt_channs[i].port,
client_name=f"agent{i}",
mt_server_port=self.mt_server.server_port,
run_dir_prefix=run_dir_prefix,
headless=i == 0 and render_mode != "human",
seed=seed,
sync_dir=env_dir,
screen_w=obs_width,
screen_h=obs_height,
minetest_dir=minetest_dir,
minetest_conf=mt_clients_conf,
pipe_proc=pipe_proc,
frameskip=frameskip,
rgb_frames=rgb_observations,
sync_mode=sync_mode,
fps_max=fps_max,
pmul=pmul,
)
self.mt_clients.append(client)
# used in render if "rgb_array"
self.last_observations = [None]*num_agents
self.timesteps = 0 # the timesteps counter
self.current_agent_id = 0
def _get_info(self):
return dict()
def reset(self, **kwargs):
"""Resets the environment."""
self.timesteps = 0
observations = []
# intialize the MT processes (server and clients)
# NOTE: only done the first time reset is called
if self.mt_server.proc is None:
self.mt_server.start_process()
# HACK wait for the server to initialize before launching the clients
print(
"* Waiting for MT server to initialize. This is only required in the first call to reset")
# TODO Use a an argument of the class instead of a constant
time.sleep(5)
for i in range(self.num_agents):
# start the new MT (client) process
self.mt_clients[i].start_process()
# re-open the connection between python and the MT client
self.mt_channs[i].open_conn()
# HACK skip some frames to let the game initialize
# TODO This "waiting" should be implemented in Minetest not in python
for _ in range(self.init_frames):
_observation, _voxobs, _pos, _vel, _pitch, _yaw, _dtime, _reward, _term = self.mt_channs[i].receive()
self.mt_channs[i].send([0]*21, 0, 0) # nop action
# receive the new info from minetest
observation, _voxobs, _pos, _vel, _pitch, _yaw, _dtime, reward, _term = self.mt_channs[i].receive()
if not self.gray_scale_keepdim and not self.rgb_observations:
observation = observation[:, :, 0]
observations.append(observation)
self.last_observations[i] = observation
else: # soft reset
for i in range(self.num_agents):
# send a soft reset to the MT client
self.mt_channs[i].send_soft_reset()
# receive a new observation from minetest
observation, _voxobs, _pos, _vel, _pitch, _yaw, _dtime, reward, _term = self.mt_channs[i].receive()
if not self.gray_scale_keepdim and not self.rgb_observations:
observation = observation[:, :, 0]
observations.append(observation)
self.last_observations[i] = observation
infos = self._get_info()
# stack the observations of each agent
observations = np.vstack([np.expand_dims(obs, 0)
for obs in observations])
return observations, infos
def step_agent(self, action):
"""
Runs a single step of the currently selected agent (`env.current_agent_id`).
:param action
"""
self.timesteps += 1
if self.current_agent_id == self.num_agents:
self.current_agent_id = 0
agent_id = self.current_agent_id
self.current_agent_id += 1
# convert the action dict to a format to be sent to MT through mt_chann
keys = [0]*21 # all commands (keys) except the mouse
mouse_x, mouse_y = 0, 0
for k, v in action.items():
if k == "mouse":
x, y = v[0], -v[1]
mouse_x = int(x*(self.obs_width // 2))
mouse_y = int(y*(self.obs_height // 2))
else:
keys[ACTION_ORDER.index(k)] = v
# send the action to MT
self.mt_channs[agent_id].send(keys, mouse_x, mouse_y)
# receive the new info from minetest
observation, _voxobs, _pos, _vel, _pitch, _yaw, _dtime, reward, termination = self.mt_channs[agent_id].receive()
if not self.gray_scale_keepdim and not self.rgb_observations:
observation = observation[:, :, 0]
self.last_observations[agent_id] = observation
info = self._get_info()
truncated = self.max_timesteps is not None and self.timesteps >= self.max_timesteps
return observation, reward, termination, truncated, info
def step(self, actions):
    """Steps every agent once, in agent-index order.

    :param actions: Sequence with one action per agent; its length must equal `num_agents`.
    :return: The tuple `(observations, rewards, terminations, truncations, infos)`, where the first four are arrays with one entry per agent and `infos` is the merge of the per-agent info dictionaries.
    """
    assert len(
        actions) == self.num_agents, f"The number of actions ({len(actions)}) must match with the number of agents ({self.num_agents})"

    frames = []
    rewards = []
    terminations = []
    truncations = []
    infos = dict()
    for agent_id in range(self.num_agents):
        # step_agent acts on whichever agent is currently selected
        self.current_agent_id = agent_id
        obs, rwd, trm, trc, inf = self.step_agent(actions[agent_id])
        frames.append(obs)
        rewards.append(rwd)
        terminations.append(trm)
        truncations.append(trc)
        infos |= inf  # later agents overwrite clashing keys

    # one leading axis indexing the agents
    observations = np.vstack([np.expand_dims(frame, 0) for frame in frames])

    return (
        observations,
        np.array(rewards),
        np.array(terminations),
        np.array(truncations),
        infos,
    )
def render(self):
    """Returns the most recent per-agent observations when `render_mode` is `"rgb_array"`, and `None` otherwise."""
    if self.render_mode != "rgb_array":
        return None
    return self.last_observations
def close(self, clear: bool = True):
    """
    Closes the environment and removes temporary files.

    Safe to call before the first `reset()`: the server process only exists once
    `reset()` has started it, so the kill signal is skipped while it is unset.

    :param clear: Whether to remove the MT working
    directory or not.
    """
    # close all MT clients
    for i in range(self.num_agents):
        if self.mt_channs[i].is_open():
            self.mt_channs[i].send_kill()
            self.mt_channs[i].close_conn()

        self.mt_clients[i].close_pipes()
        self.mt_clients[i].wait_close()
        if clear:
            self.mt_clients[i].clear()

    # close the MT server
    self.mt_server.close_pipes()

    # send a kill signal to the server, as we've no way to send a self-termination
    # command to it like in the case of the clients
    server_proc = self.mt_server.proc
    if server_proc is not None:
        try:
            os.killpg(os.getpgid(server_proc.pid), signal.SIGTERM)
        except ProcessLookupError:
            # the server has already exited, which is the state we want
            pass

    if clear:
        self.mt_server.clear()
close(clear=True)
Closes the environment and removes temporary files.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
clear |
bool
|
Whether to remove the MT working directory or not. |
True
|
Source code in craftium/multiagent_env.py
def close(self, clear: bool = True):
    """
    Closes the environment and removes temporary files.

    Safe to call before the first `reset()`: the server process only exists once
    `reset()` has started it, so the kill signal is skipped while it is unset.

    :param clear: Whether to remove the MT working
    directory or not.
    """
    # close all MT clients
    for i in range(self.num_agents):
        if self.mt_channs[i].is_open():
            self.mt_channs[i].send_kill()
            self.mt_channs[i].close_conn()

        self.mt_clients[i].close_pipes()
        self.mt_clients[i].wait_close()
        if clear:
            self.mt_clients[i].clear()

    # close the MT server
    self.mt_server.close_pipes()

    # send a kill signal to the server, as we've no way to send a self-termination
    # command to it like in the case of the clients
    server_proc = self.mt_server.proc
    if server_proc is not None:
        try:
            os.killpg(os.getpgid(server_proc.pid), signal.SIGTERM)
        except ProcessLookupError:
            # the server has already exited, which is the state we want
            pass

    if clear:
        self.mt_server.clear()
reset(**kwargs)
Resets the environment.
Source code in craftium/multiagent_env.py
def reset(self, **kwargs):
    """Resets the environment.

    On the first call the MT server and one MT client per agent are started; on
    every later call a soft reset is sent to the already running clients.

    :param **kwargs: Not used by this method.
    :return: The tuple `(observations, infos)`: the per-agent observations stacked along a new leading axis, and the value of `_get_info()`.
    """
    self.timesteps = 0

    # NOTE: the MT processes are only launched the first time reset is called
    first_call = self.mt_server.proc is None
    if first_call:
        self.mt_server.start_process()
        # HACK wait for the server to initialize before launching the clients
        print(
            "* Waiting for MT server to initialize. This is only required in the first call to reset")
        # TODO Use a an argument of the class instead of a constant
        time.sleep(5)

    observations = []
    for i in range(self.num_agents):
        channel = self.mt_channs[i]

        if first_call:
            # start the new MT (client) process and connect python to it
            self.mt_clients[i].start_process()
            channel.open_conn()

            # HACK skip some frames to let the game initialize
            # TODO This "waiting" should be implemented in Minetest not in python
            for _ in range(self.init_frames):
                _observation, _voxobs, _pos, _vel, _pitch, _yaw, _dtime, _reward, _term = channel.receive()
                channel.send([0]*21, 0, 0)  # nop action
        else:
            # soft reset of an already running MT client
            channel.send_soft_reset()

        # receive the first observation of the new episode
        observation, _voxobs, _pos, _vel, _pitch, _yaw, _dtime, reward, _term = channel.receive()
        if not (self.gray_scale_keepdim or self.rgb_observations):
            observation = observation[:, :, 0]

        observations.append(observation)
        self.last_observations[i] = observation

    infos = self._get_info()

    # stack the observations of each agent
    stacked = np.vstack([np.expand_dims(obs, 0) for obs in observations])
    return stacked, infos
step(actions)
Runs an environment step per agent.
Source code in craftium/multiagent_env.py
def step(self, actions):
    """Steps every agent once, in agent-index order.

    :param actions: Sequence with one action per agent; its length must equal `num_agents`.
    :return: The tuple `(observations, rewards, terminations, truncations, infos)`, where the first four are arrays with one entry per agent and `infos` is the merge of the per-agent info dictionaries.
    """
    assert len(
        actions) == self.num_agents, f"The number of actions ({len(actions)}) must match with the number of agents ({self.num_agents})"

    frames = []
    rewards = []
    terminations = []
    truncations = []
    infos = dict()
    for agent_id in range(self.num_agents):
        # step_agent acts on whichever agent is currently selected
        self.current_agent_id = agent_id
        obs, rwd, trm, trc, inf = self.step_agent(actions[agent_id])
        frames.append(obs)
        rewards.append(rwd)
        terminations.append(trm)
        truncations.append(trc)
        infos |= inf  # later agents overwrite clashing keys

    # one leading axis indexing the agents
    observations = np.vstack([np.expand_dims(frame, 0) for frame in frames])

    return (
        observations,
        np.array(rewards),
        np.array(terminations),
        np.array(truncations),
        infos,
    )
step_agent(action)
Runs a single step of the currently selected agent (env.current_agent_id).
Source code in craftium/multiagent_env.py
def step_agent(self, action):
    """
    Advances the currently selected agent (`env.current_agent_id`) by one step.

    :param action: Dictionary of actions. The `"mouse"` entry is an `(x, y)` pair; every other key is looked up in `ACTION_ORDER`.
    :return: The tuple `(observation, reward, termination, truncated, info)` for the agent that was stepped.
    """
    self.timesteps += 1

    # the agent cursor wraps back to the first agent once it runs past the last one
    if self.current_agent_id == self.num_agents:
        self.current_agent_id = 0
    agent_id = self.current_agent_id
    self.current_agent_id += 1

    # build the message for MT: one slot per key command plus the mouse displacement
    keys = [0]*21  # all commands (keys) except the mouse
    mouse_x = mouse_y = 0
    for name, value in action.items():
        if name != "mouse":
            keys[ACTION_ORDER.index(name)] = value
            continue
        # the vertical component is negated before being scaled
        x, y = value[0], -value[1]
        mouse_x = int(x*(self.obs_width // 2))
        mouse_y = int(y*(self.obs_height // 2))

    # send the action and read back what MT answers
    channel = self.mt_channs[agent_id]
    channel.send(keys, mouse_x, mouse_y)
    observation, _voxobs, _pos, _vel, _pitch, _yaw, _dtime, reward, termination = channel.receive()

    # keep only the first channel unless RGB or keepdim gray-scale output was requested
    if not (self.gray_scale_keepdim or self.rgb_observations):
        observation = observation[:, :, 0]
    self.last_observations[agent_id] = observation

    info = self._get_info()
    truncated = self.max_timesteps is not None and self.timesteps >= self.max_timesteps
    return observation, reward, termination, truncated, info
craftium.pettingzoo_env.env(env_name, render_mode=None, **kwargs)
Returns the PettingZoo version (AECEnv) of multi-agent Craftium environments.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
env_name |
str
|
Name of the environment to load. |
required |
render_mode |
Optional[str]
|
Rendering mode (`"rgb_array"` or `"human"`). |
None
|
**kwargs |
Additional keyword arguments to pass to the created Craftium environment (see `MarlCraftiumEnv`). |
{}
|
Source code in craftium/pettingzoo_env.py
def env(env_name: str, render_mode: Optional[str] = None, **kwargs):
    """
    Builds the PettingZoo (`AECEnv`) version of a multi-agent Craftium environment.

    :param env_name: Name of the environment to load. It must be a key of `AVAIL_ENVS`.
    :param render_mode: Rendering mode (`"rgb_array"` or `"human"`).
    :param **kwargs: Additional keyword arguments to pass to the created Craftium environment (see `MarlCraftiumEnv`).
    """
    spec = AVAIL_ENVS[env_name]
    env_dir = os.path.join(root_path, spec["env_dir"])

    # NOTE(review): in `kwargs | conf` the right-hand side wins on key clashes, so the
    # registered "conf" overrides caller-provided kwargs; confirm this precedence is intended.
    final_args = kwargs | spec["conf"]

    return raw_env(env_dir, render_mode, **final_args)
craftium.wrappers.BinaryActionWrapper
Bases: ActionWrapper
A Gymnasium ActionWrapper that translates craftium's Dict action space into a binary (discretized) action space MultiBinary.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
env |
Env
|
The environment to wrap. |
required |
actions |
list[str]
|
A list of strings containing the names of the actions that will constitute the new action space. |
required |
mouse_mov |
float
|
Magnitude of the mouse movement. Must be in the [0, 1] range, else it will be clipped. |
0.5
|
Source code in craftium/wrappers.py
class BinaryActionWrapper(ActionWrapper):
    """A Gymnasium `ActionWrapper` that translates craftium's `Dict` action space into a binary (discretized) action space [`MultiBinary`](https://gymnasium.farama.org/api/spaces/fundamental/#gymnasium.spaces.MultiBinary).

    :param env: The environment to wrap.
    :param actions: A list of strings containing the names of the actions that will constitute the new action space.
    :param mouse_mov: Magnitude of the mouse movement. Must be in the [0, 1] range, else it will be clipped.
    """

    def __init__(self, env: Env, actions: list[str], mouse_mov: float = 0.5):
        ActionWrapper.__init__(self, env)
        check_actions_valid(actions)

        self.actions = actions
        self.action_space = MultiBinary(len(actions))
        self.mouse_mov = clip_mouse(mouse_mov)

    def process(self, action):
        """Converts a binary vector (one flag per configured action) into craftium's `Dict` action."""
        assert len(action) == len(self.actions), \
            f"Incorrect number of actions, got {len(action)} but expected {len(self.actions)}"

        # mouse pseudo-actions: (axis index, whether the movement is added)
        mouse_moves = {
            "mouse x+": (0, True),
            "mouse x-": (0, False),
            "mouse y+": (1, True),
            "mouse y-": (1, False),
        }

        res = {}
        mouse = [0, 0]
        for flag, name in zip(action, self.actions):
            if flag == 0:
                continue  # action not selected
            if name not in mouse_moves:
                res[name] = flag
                continue
            axis, positive = mouse_moves[name]
            if positive:
                mouse[axis] += self.mouse_mov
            else:
                mouse[axis] -= self.mouse_mov

        res["mouse"] = mouse
        return res

    def action(self, action):
        """Accepts a list/array of flags, or a single value that is treated as a one-element vector."""
        if not isinstance(action, (list, np.ndarray)):
            action = [action]
        return self.process(action)
craftium.wrappers.DiscreteActionWrapper
Bases: ActionWrapper
A Gymnasium ActionWrapper that translates craftium's Dict action space into a discretized action space Discrete.
Unlike BinaryActionWrapper, this wrapper adds an additional action to the action space in order to include the NOP action. This action is equivalent to {} in the Dict space or to a list of zeros in the MultiBinary space. The NOP action has index 0, and the rest of the actions have the consecutive indexes. Thus, the number of actions of the environment will be len(actions)+1.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
env |
Env
|
The environment to wrap. |
required |
actions |
list[str]
|
A list of strings containing the names of the actions that will constitute the new action space. |
required |
mouse_mov |
float
|
Magnitude of the mouse movement. Must be in the [0, 1] range, else it will be clipped. |
0.5
|
Source code in craftium/wrappers.py
class DiscreteActionWrapper(ActionWrapper):
    """A Gymnasium `ActionWrapper` that translates craftium's `Dict` action space into a discretized action space [`Discrete`](https://gymnasium.farama.org/api/spaces/fundamental/#gymnasium.spaces.Discrete).

    Unlike `BinaryActionWrapper`, this wrapper adds an additional action to the action space in order to include the `NOP` action. This action is equivalent to `{}` in the `Dict` space or to a list of zeros in the `MultiBinary` space. The `NOP` action has index `0`, and the rest of the actions have the consecutive indexes. Thus, the number of actions of the environment will be `len(actions)+1`.

    :param env: The environment to wrap.
    :param actions: A list of strings containing the names of the actions that will constitute the new action space.
    :param mouse_mov: Magnitude of the mouse movement. Must be in the [0, 1] range, else it will be clipped.
    """

    def __init__(self, env: Env, actions: list[str], mouse_mov: float = 0.5):
        ActionWrapper.__init__(self, env)
        check_actions_valid(actions)

        self.actions = actions
        # one extra slot: index 0 is reserved for NOP
        self.action_space = Discrete(len(actions)+1)
        self.mouse_mov = clip_mouse(mouse_mov)

    def process(self, action):
        """Converts a single discrete index into craftium's `Dict` action (`{}` for NOP)."""
        assert action >= 0 and action <= len(self.actions), \
            f"Action out of bound, got {action} but expected 0 <= action <= {len(self.actions)}"

        # if the action has index 0, return an empty action (NOP)
        if action == 0:
            return {}

        # mouse pseudo-actions: (axis index, whether the movement is added)
        mouse_moves = {
            "mouse x+": (0, True),
            "mouse x-": (0, False),
            "mouse y+": (1, True),
            "mouse y-": (1, False),
        }

        name = self.actions[action-1]
        res = {}
        mouse = [0, 0]
        if name in mouse_moves:
            axis, positive = mouse_moves[name]
            if positive:
                mouse[axis] += self.mouse_mov
            else:
                mouse[axis] -= self.mouse_mov
        else:
            res[name] = 1

        res["mouse"] = mouse
        return res

    def action(self, action):
        """Processes a single index, or every index of a list/array (returning a list of `Dict` actions)."""
        if isinstance(action, (list, np.ndarray)):
            return [self.process(act) for act in action]
        return self.process(action)
craftium.extra
This module contains some extra functionalities for Craftium, such as a random dungeon generator and continual RL utilities.
craftium.extra.random_map_generator.RandomMapGen
Random dungeon map generator. This class is intended to be used
for the Craftium/ProcDungeons-v0 environment, but you could implement
and use your own generator (must use the same ascii format).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
n_rooms |
int
|
Number of rooms of the dungeon. |
4
|
room_min_size |
int
|
Minimum size (both height and width) of the room. |
7
|
room_max_size |
int
|
Maximum size (both height and width) of the room. |
15
|
dispersion |
float
|
Affects the distance between the rooms. |
1.0
|
min_monsters_per_room |
int
|
Minimum number of monsters per room. If the minimum is set equal to the maximum, the number of monsters per room is fixed. |
0
|
max_monsters_per_room |
int
|
Maximum number of monsters per room. If the minimum is set equal to the maximum, the number of monsters per room is fixed. |
5
|
monsters |
dict[str, float]
|
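A dictionary with the `a`, `b`, `c`, and `d` keys (referring to the type of the monster), where values are the probability of spawning a monster of that type. Types are sorted from `a`, the least dangerous, to `d`, the most dangerous. |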
{'a': 0.4, 'b': 0.3, 'c': 0.2, 'd': 0.1}
|
monsters_in_player_spawn |
bool
|
If set to `True`, monsters can spawn in the same room as the player. |
False
|
Source code in craftium/extra/random_map_generator.py
class RandomMapGen():
    """Random dungeon map generator. This class is intended to be used
    for the `Craftium/ProcDungeons-v0` environment, but you could implement
    and use your own generator (must use the same ascii format).

    :param n_rooms: Number of rooms of the dungeon. Must be at least 2, as the player and the objective are placed in different rooms.
    :param room_min_size: Minimum size (both height and width) of the room.
    :param room_max_size: Maximum size (both height and width) of the room.
    :param dispersion: Affects the distance between the rooms.
    :param min_monsters_per_room: Minimum number of monsters per room. If the minimum is set equal to the maximum, the number of monsters per room is fixed.
    :param max_monsters_per_room: Maximum number of monsters per room (inclusive). If the minimum is set equal to the maximum, the number of monsters per room is fixed.
    :param monsters: A dictionary with the `a`, `b`, `c`, and `d` keys (refers to the type of the monster), where values are the probability of spawning a monster of that type. Types are sorted from `a`, the least dangerous, to `d`, the most dangerous. A single type is drawn per room.
    :param monsters_in_player_spawn: If set to `True`, monsters can spawn in the same room as the player (never on the player's own cell).
    """

    def __init__(
            self,
            n_rooms: int = 4,
            room_min_size: int = 7,
            room_max_size: int = 15,
            dispersion: float = 1.,
            min_monsters_per_room: int = 0,
            max_monsters_per_room: int = 5,
            # the default dict is only read, never mutated
            monsters: dict[str, float] = {
                "a": 0.4, "b": 0.3, "c": 0.2, "d": 0.1},
            monsters_in_player_spawn: bool = False,
    ):
        assert n_rooms >= 2, "Number of rooms must be >= 2"
        assert room_min_size >= 5, "Minimum room size must be >= 5"
        assert room_min_size <= room_max_size, "Room min size must be smaller or equal to max size"

        # every room starts at the origin as [[x0, y0], [x1, y1]] with a random size
        rooms = [[[0, 0],
                  list(np.random.randint(room_min_size, room_max_size+1, size=2))]
                 for _ in range(n_rooms)]

        # place the rooms in non-overlapping placements
        collide = True
        while collide:
            for i in range(n_rooms):
                others = rooms[:i] + rooms[i+1:]
                if not any(self._collide(rooms[i], other) for other in others):
                    continue

                # push the room away from the centroid of the other rooms
                others_center = np.array(others).mean(axis=0).mean(axis=0)
                center = np.array(rooms[i]).mean(axis=0)
                d = (dispersion*(center - others_center)).astype(np.int64)
                # avoid getting stuck when the push rounds to zero
                d[d == 0] = np.random.randint(-1, 2)

                rooms[i][0][0] += d[0]
                rooms[i][1][0] += d[0]
                rooms[i][0][1] += d[1]
                rooms[i][1][1] += d[1]

            # keep iterating while ANY pair of rooms still overlaps
            collide = any(
                self._collide(rooms[k], rooms[j])
                for k in range(n_rooms)
                for j in range(k+1, n_rooms)
            )

        # translate the rooms to the x>=0 y>=0 quadrant
        rooms = np.array(rooms)
        rooms[:, :, 0] -= rooms[:, :, 0].min()
        rooms[:, :, 1] -= rooms[:, :, 1].min()

        centers = [self._box_center(room) for room in rooms]

        # place all corridors: each room tries to reach its nearest room first
        corrs = []
        for i in range(n_rooms):
            # compute the (squared) distances from the room i to the other rooms
            c = np.array(centers[i])
            dists = [np.power(c - np.array(centers[j]), 2).sum()
                     for j in range(n_rooms) if i != j]
            indices = np.array([j for j in range(n_rooms) if j != i])

            for j in indices[np.argsort(dists)]:
                cor = [centers[i], centers[j]]

                # the corridor should not intersect with any room
                cor_intersects_rooms = any(
                    self._line_intersects_box(cor, rooms[k])
                    for k in range(n_rooms) if k != i and k != j
                )
                # the corridor should not intersect with other corridors
                cor_intersects_cor = any(
                    self.lines_intersect(cor, other) for other in corrs)

                if not (cor_intersects_rooms or cor_intersects_cor):
                    corrs.append(cor)
                    break  # a single corridor per room is enough

        # make all rooms accessible, including rooms that got no corridor at all
        corrs, _ = self._add_minimum_edges(corrs, nodes=centers)

        # place the player in the center of a random room (an endpoint of a random corridor)
        idx1 = np.random.randint(0, len(corrs))
        idx2 = np.random.randint(0, 2)
        self.player_pos = corrs[idx1][idx2]

        # place the objective in the most distant room to the player
        dists = [self._distance(self.player_pos, center) for center in centers]
        room_idx = np.argmax(dists)
        self.objective_pos = centers[room_idx]

        # place monsters; types are encoded as negative integers (-1 is the first type)
        monster_names = list(monsters.keys())
        monster_probs = list(monsters.values())
        monster_type_indices = -np.arange(1, len(monster_probs)+1)

        # cells that must stay free of monsters
        reserved = {tuple(self.objective_pos), tuple(self.player_pos)}

        monster_locs = []
        for room in rooms:
            is_spawn_room = self._box_center(room) == self.player_pos
            if (not monsters_in_player_spawn and is_spawn_room) or max_monsters_per_room == 0:
                continue

            # number of monsters in this room
            if min_monsters_per_room == max_monsters_per_room:
                n = min_monsters_per_room
            else:
                # randint excludes its upper bound, hence the +1 to make the maximum reachable
                n = np.random.randint(
                    min_monsters_per_room, max_monsters_per_room+1)

            type_idx = np.random.choice(monster_type_indices, p=monster_probs)

            # compute all locations within a room where a monster can spawn
            places = [
                [x, y]
                for x in range(room[0, 0]+2, room[1, 0]-1)
                for y in range(room[0, 1]+2, room[1, 1]-1)
                if (x, y) not in reserved
            ]

            # select random placements for the monsters
            for idx in np.random.choice(len(places), min(n, len(places)), replace=False):
                monster_locs.append((type_idx, places[idx]))

        self.monster_locs = monster_locs
        self.monster_names = monster_names
        self.monster_type_indices = monster_type_indices
        self.corridors = np.array(corrs)
        self.rooms = rooms

    def _bresenham(self, x0, y0, x1, y1):
        """Yield integer coordinates on the line from (x0, y0) to (x1, y1).

        This function is directly taken from https://github.com/encukou/bresenham under the MIT license.
        """
        dx = x1 - x0
        dy = y1 - y0

        xsign = 1 if dx > 0 else -1
        ysign = 1 if dy > 0 else -1

        dx = abs(dx)
        dy = abs(dy)

        if dx > dy:
            xx, xy, yx, yy = xsign, 0, 0, ysign
        else:
            dx, dy = dy, dx
            xx, xy, yx, yy = 0, ysign, xsign, 0

        D = 2*dy - dx
        y = 0

        for x in range(dx + 1):
            yield x0 + x*xx + y*yx, y0 + x*xy + y*yy
            if D >= 0:
                y += 1
                D -= 2*dx
            D += 2*dy

    def rasterize(self, wall_height=2, ceiling=True):
        """Converts the generated map into an ASCII string.

        The string contains, in order, the floor layer, `wall_height` wall layers and
        (optionally) the ceiling layer, separated by lines containing a single `-`.
        The player, the objective and the monsters are drawn in the first wall layer.

        :param wall_height: Height of the walls of the dungeon.
        :param ceiling: If `True`, a ceiling layer with the shape of the floor is appended.
        """
        assert wall_height >= 1, "Wall height must be >= 1"

        ncols, nrows = self.rooms[:, :, 0].max(), self.rooms[:, :, 1].max()
        m = np.zeros((nrows, ncols), dtype=np.int8)

        # add rooms
        for room in self.rooms:
            m[room[0, 1]:room[1, 1], room[0, 0]:room[1, 0]] = 1

        # add corridors
        for corr in self.corridors:
            x0, y0, x1, y1 = corr[0, 0], corr[0, 1], corr[1, 0], corr[1, 1]
            for (x, y) in self._bresenham(x0, y0, x1, y1):
                m[y, x] = 1
                # thicken the line: also fill the cells up to 2 steps away along each axis
                m[y+1, x], m[y-1, x] = 1, 1
                m[y+2, x], m[y-2, x] = 1, 1
                m[y, x+1], m[y, x-1] = 1, 1
                m[y, x+2], m[y, x-2] = 1, 1

        # compute the wall pattern: floor cells on the map border or next to an empty cell
        layers = [m]
        l1 = np.zeros((nrows, ncols), dtype=np.int8)
        for i in range(nrows):
            for j in range(ncols):
                if m[i, j] == 0:
                    continue
                on_border = i == 0 or i == nrows-1 or j == 0 or j == ncols-1
                # a cell is enclosed when every in-bounds 4-neighbour is floor
                enclosed = (
                    (i == 0 or m[i-1, j] == 1)
                    and (i == nrows-1 or m[i+1, j] == 1)
                    and (j == 0 or m[i, j-1] == 1)
                    and (j == ncols-1 or m[i, j+1] == 1)
                )
                if on_border or not enclosed:
                    l1[i, j] = 1

        # add walls
        for _ in range(wall_height):
            layers.append(l1.copy())

        # place the player
        layers[1][self.player_pos[1], self.player_pos[0]] = 2
        # place the objective
        layers[1][self.objective_pos[1], self.objective_pos[0]] = 3
        # place the monsters
        for type_idx, pos in self.monster_locs:
            layers[1][pos[1], pos[0]] = type_idx

        if ceiling:
            # add ceiling if needed (same pattern as the first layer)
            l0 = layers[0].copy()
            l0[l0 == 1] = 4
            layers.append(l0)

        # convert layer matrices into a string
        symbols = {1: "#", 2: "@", 3: "O", 4: "%"}

        def to_char(value):
            value = int(value)
            if value in symbols:
                return symbols[value]
            if value < 0:  # negative values are monsters
                return self.monster_names[(-value)-1]
            return " "

        last = len(layers) - 1
        blocks = []
        for il, layer in enumerate(layers):
            rows = ["".join(to_char(layer[i][j]) for j in range(ncols))
                    for i in range(nrows)]
            if il < last:
                # every row of a non-final layer is newline terminated
                blocks.append("".join(row + "\n" for row in rows))
            else:
                # the final layer has no trailing newline
                blocks.append("\n".join(rows))

        # layers are separated by a "-" line (no separator after the last layer)
        return "-\n".join(blocks)

    def plot(self, rooms, corridors=[]):
        """Draws the rooms (and, optionally, the corridors) in the current matplotlib axes."""
        ax = plt.gca()
        for room in rooms:
            rect = plt.Rectangle(
                xy=room[0],
                height=room[1][1]-room[0][1],
                width=room[1][0]-room[0][0],
                fill=False,
                linewidth=2,
            )
            ax.add_patch(rect)
            plt.scatter(*self._box_center(room))

        for corr in corridors:
            plt.plot([corr[0][0], corr[1][0]], [corr[0][1], corr[1][1]])

        lims = np.array(rooms)
        plt.xlim([lims.min()-10, lims.max()+10])
        plt.ylim([lims.min()-10, lims.max()+10])

    def _box_center(self, a):
        """Returns the (truncated) integer center of the box `[[x0, y0], [x1, y1]]` as a tuple."""
        return int((a[0][0]+a[1][0])/2), int((a[0][1]+a[1][1])/2)

    def lines_intersect(self, line1, line2):
        """Checks whether two segments intersect. Segments sharing an endpoint are reported as NOT intersecting."""

        def on_segment(p, q, r):
            """Check if point q lies within the bounding box of the segment pr"""
            return (min(p[0], r[0]) <= q[0] <= max(p[0], r[0]) and
                    min(p[1], r[1]) <= q[1] <= max(p[1], r[1]))

        def orientation(p, q, r):
            """Determine the orientation of the triplet (p, q, r)
            0 -> p, q and r are collinear
            1 -> Clockwise
            2 -> Counterclockwise
            """
            val = (q[1] - p[1]) * (r[0] - q[0]) - (q[0] - p[0]) * (r[1] - q[1])
            if val == 0:
                return 0  # collinear
            return 1 if val > 0 else 2

        def do_intersect(p1, q1, p2, q2):
            """Check if the line segments 'p1q1' and 'p2q2' intersect"""
            # Check if they share an endpoint, if so, return False (no intersection)
            if p1 == p2 or p1 == q2 or q1 == p2 or q1 == q2:
                return False

            # Find the four orientations needed for general and special cases
            o1 = orientation(p1, q1, p2)
            o2 = orientation(p1, q1, q2)
            o3 = orientation(p2, q2, p1)
            o4 = orientation(p2, q2, q1)

            # General case
            if o1 != o2 and o3 != o4:
                return True

            # Special Cases (excluding shared endpoint cases): collinear and overlapping
            if o1 == 0 and on_segment(p1, p2, q1):
                return True
            if o2 == 0 and on_segment(p1, q2, q1):
                return True
            if o3 == 0 and on_segment(p2, p1, q2):
                return True
            if o4 == 0 and on_segment(p2, q1, q2):
                return True

            return False

        # Extract points from the lines
        (p1, q1) = line1
        (p2, q2) = line2

        return do_intersect(p1, q1, p2, q2)

    def _line_intersects_box(self, line, box):
        """Check if a line intersects a rectangular box"""
        # Extract the line points
        (x1, y1), (x2, y2) = line

        # Extract the box points
        (box_left, box_top), (box_right, box_bottom) = box

        # Define the four sides of the box as lines
        box_edges = [
            [(box_left, box_top), (box_right, box_top)],  # top edge
            [(box_right, box_top), (box_right, box_bottom)],  # right edge
            [(box_right, box_bottom), (box_left, box_bottom)],  # bottom edge
            [(box_left, box_bottom), (box_left, box_top)]  # left edge
        ]

        # Check if the line intersects any of the four sides of the box
        for edge in box_edges:
            if self.lines_intersect(line, edge):
                return True

        # Check if either endpoint of the line lies inside the box
        if (box_left <= x1 <= box_right and box_top <= y1 <= box_bottom) or \
           (box_left <= x2 <= box_right and box_top <= y2 <= box_bottom):
            return True

        return False

    def _distance(self, node1, node2):
        """Calculate Euclidean distance between two nodes."""
        return math.sqrt((node1[0] - node2[0]) ** 2 + (node1[1] - node2[1]) ** 2)

    def _add_minimum_edges(self, edges, nodes=None):
        """Add the minimum number of edges to connect all components.

        :param edges: List of `(node_a, node_b)` pairs with hashable nodes. New edges are appended to this list in place.
        :param nodes: Optional iterable of nodes that must end up connected even if they appear in no edge. If `None`, only the endpoints of `edges` are considered.
        :return: The tuple `(edges, new_edges)`, where `new_edges` are the edges that were added.
        """
        extra_nodes = [] if nodes is None else list(nodes)

        def find_components(edges):
            """Find connected components in the graph using DFS."""
            graph = defaultdict(list)
            for (a, b) in edges:
                graph[a].append(b)
                graph[b].append(a)
            # register isolated nodes so that they form their own component
            for node in extra_nodes:
                if node not in graph:
                    graph[node] = []

            visited = set()
            components = []

            def dfs(node, component):
                stack = [node]
                while stack:
                    current = stack.pop()
                    if current not in visited:
                        visited.add(current)
                        component.append(current)
                        stack.extend(graph[current])

            for node in graph:
                if node not in visited:
                    component = []
                    dfs(node, component)
                    components.append(component)

            return components

        # Find all the connected components
        components = find_components(edges)

        # If there's only one component, the graph is already fully connected
        if len(components) == 1:
            return edges, []

        new_edges = []

        # Repeatedly join the two closest components until a single one remains
        while len(components) > 1:
            min_dist = float('inf')
            edge_to_add = None
            comp1, comp2 = None, None

            # Check all pairs of components
            for i in range(len(components)):
                for j in range(i + 1, len(components)):
                    for node1 in components[i]:
                        for node2 in components[j]:
                            dist = self._distance(node1, node2)
                            if dist < min_dist:
                                min_dist = dist
                                edge_to_add = (node1, node2)
                                comp1, comp2 = i, j

            # Add the closest edge
            new_edges.append(edge_to_add)
            edges.append(edge_to_add)

            # Merge the two components (comp2 > comp1, so popping keeps comp1 valid)
            components[comp1].extend(components[comp2])
            components.pop(comp2)

        return edges, new_edges

    def _collide(self, a, b):
        """Returns whether the boxes `a` and `b` (each `[[x0, y0], [x1, y1]]`) overlap or touch."""
        b1 = np.array(a).flatten()
        b2 = np.array(b).flatten()
        return not (b1[0] > b2[2] or b1[2] < b2[0] or b1[1] > b2[3] or b1[3] < b2[1])
rasterize(wall_height=2, ceiling=True)
Converts the generated map into an ASCII string.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
wall_height |
Height of the walls of the dungeon. |
2
|
Source code in craftium/extra/random_map_generator.py
def rasterize(self, wall_height=2, ceiling=True):
    """Converts the generated map into an ASCII string.

    The string contains, in order, the floor layer, `wall_height` wall layers and
    (optionally) the ceiling layer, separated by lines containing a single `-`.
    The player, the objective and the monsters are drawn in the first wall layer.

    :param wall_height: Height of the walls of the dungeon.
    :param ceiling: If `True`, a ceiling layer with the shape of the floor is appended.
    """
    assert wall_height >= 1, "Wall height must be >= 1"

    ncols, nrows = self.rooms[:, :, 0].max(), self.rooms[:, :, 1].max()
    m = np.zeros((nrows, ncols), dtype=np.int8)

    # add rooms
    for room in self.rooms:
        m[room[0, 1]:room[1, 1], room[0, 0]:room[1, 0]] = 1

    # add corridors
    for corr in self.corridors:
        x0, y0, x1, y1 = corr[0, 0], corr[0, 1], corr[1, 0], corr[1, 1]
        for (x, y) in self._bresenham(x0, y0, x1, y1):
            m[y, x] = 1
            # thicken the line: also fill the cells up to 2 steps away along each axis
            m[y+1, x], m[y-1, x] = 1, 1
            m[y+2, x], m[y-2, x] = 1, 1
            m[y, x+1], m[y, x-1] = 1, 1
            m[y, x+2], m[y, x-2] = 1, 1

    # compute the wall pattern: floor cells on the map border or next to an empty cell
    layers = [m]
    l1 = np.zeros((nrows, ncols), dtype=np.int8)
    for i in range(nrows):
        for j in range(ncols):
            if m[i, j] == 0:
                continue
            on_border = i == 0 or i == nrows-1 or j == 0 or j == ncols-1
            # a cell is enclosed when every in-bounds 4-neighbour is floor
            enclosed = (
                (i == 0 or m[i-1, j] == 1)
                and (i == nrows-1 or m[i+1, j] == 1)
                and (j == 0 or m[i, j-1] == 1)
                and (j == ncols-1 or m[i, j+1] == 1)
            )
            if on_border or not enclosed:
                l1[i, j] = 1

    # add walls
    for _ in range(wall_height):
        layers.append(l1.copy())

    # place the player
    layers[1][self.player_pos[1], self.player_pos[0]] = 2
    # place the objective
    layers[1][self.objective_pos[1], self.objective_pos[0]] = 3
    # place the monsters
    for type_idx, pos in self.monster_locs:
        layers[1][pos[1], pos[0]] = type_idx

    if ceiling:
        # add ceiling if needed (same pattern as the first layer)
        l0 = layers[0].copy()
        l0[l0 == 1] = 4
        layers.append(l0)

    # convert layer matrices into a string
    symbols = {1: "#", 2: "@", 3: "O", 4: "%"}

    def to_char(value):
        value = int(value)
        if value in symbols:
            return symbols[value]
        if value < 0:  # negative values are monsters
            return self.monster_names[(-value)-1]
        return " "

    last = len(layers) - 1
    blocks = []
    for il, layer in enumerate(layers):
        rows = ["".join(to_char(layer[i][j]) for j in range(ncols))
                for i in range(nrows)]
        if il < last:
            # every row of a non-final layer is newline terminated
            blocks.append("".join(row + "\n" for row in rows))
        else:
            # the final layer has no trailing newline
            blocks.append("\n".join(rows))

    # layers are separated by a "-" line (no separator after the last layer)
    return "-\n".join(blocks)
craftium.extra.crl_dungeons.load_task(seq_name, task_id=0, make_env=True, return_map=False, prefix=None, **kwargs)
Loads a task from a predefined continual RL task sequence.
Example usage:
# Instantiate an environment with the third task
env = crl.load_task("sequence01", task_id=2)
# Get the ASCII map of the first task
ascii_map = crl.load_task("sequence01", make_env=False, return_map=True)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
seq_name |
str
|
Name of the sequence to load. |
required |
task_id |
int
|
Identifier of the task to load. |
0
|
make_env |
bool
|
If set to `True`, a `Gymnasium` environment of the task will be returned. |
True
|
return_map |
bool
|
If set to `True`, the ASCII map will be returned. |
False
|
prefix |
Optional[os.PathLike]
|
Prefix path to add to `seq_name`. Defaults to the path of the module. Useful to load custom sequences. |
None
|
**kwargs |
Keyword arguments to pass to the `make_dungeon_env` function (see the reference). |
{}
|
Source code in craftium/extra/crl_dungeons.py
def load_task(
        seq_name: str,
        task_id: int = 0,
        make_env: bool = True,
        return_map: bool = False,
        prefix: Optional[os.PathLike] = None,
        **kwargs):
    """Loads a task from a predefined continual RL task sequence.

    Example usage:
    ```python
    # Instantiate an environment with the third task
    env = crl.load_task("sequence01", task_id=2)

    # Get the ASCII map of the first task
    ascii_map = crl.load_task("sequence01", make_env=False, return_map=True)
    ```

    :param seq_name: Name of the sequence to load.
    :param task_id: Identifier of the task to load.
    :param make_env: If set to `True`, a `Gymnasium` environment of the task will be returned.
    :param return_map: If set to `True`, the ASCII map will be returned.
    :param prefix: Prefix path to add to `seq_name`. Defaults to the path of the module. Useful to load custom sequences.
    :param **kwargs: keyword arguments to pass to the `make_dungeon_env` function (see the [reference](reference.md)).
    :return: The environment and/or the ASCII map, depending on `make_env` and `return_map`. A single requested value is returned as is; otherwise a tuple is returned (empty if nothing was requested).
    :raises Exception: If the sequence file cannot be opened or read. The underlying `OSError` is attached as the cause.
    """
    # open the sequence file and read all the maps (tasks are separated by "=")
    prefix = os.path.dirname(__file__) if prefix is None else prefix
    try:
        # the context manager closes the file even if reading fails
        with open(os.path.join(prefix, seq_name), "r") as file:
            maps = file.read().split("=")
    except OSError as err:
        raise Exception(f"Sequence '{seq_name}' does not exist") from err

    # get the map of the sequence
    assert task_id < len(maps), f"Task ID ({task_id}) must be smaller than the number of tasks {len(maps)}"
    ascii_map = maps[task_id]

    ret_vals = []  # list of values to return

    # instantiate a dungeon environment if needed
    if make_env:
        env = make_dungeon_env(ascii_map, **kwargs)
        ret_vals.append(env)

    if return_map:
        ret_vals.append(ascii_map)

    if len(ret_vals) == 1:
        return ret_vals[0]
    return tuple(ret_vals)