Environment Wrappers
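sofa_env provides wrappers that extend its environments with additional functionality: recording trajectories to disk, slowing simulations down to real time, point cloud and semantically segmented image observations, and executing whole episodes through a single step call.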
Recording Trajectories
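The TrajectoryRecorder wrapper writes the data of every episode to disk. Arbitrary metadata can be attached to each trajectory, selected keys can be stored in compressed form, and callback functions can be registered that record additional values after every step and reset. The example below registers a callback that renders the scene and stores a downscaled RGB image alongside each transition.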
Example Usage
from typing import Tuple

import cv2

from sofa_env.wrappers.trajectory_recorder import TrajectoryRecorder

# Resolution at which rendered frames are stored (example value).
image_shape_to_save: Tuple[int, int] = (256, 256)


def store_rgb_obs(self: TrajectoryRecorder, shape: Tuple[int, int] = image_shape_to_save):
    """Renders the scene and appends the resized RGB frame to the recorded trajectory."""
    observation = self.env.render()
    observation = cv2.resize(
        observation,
        shape,
        interpolation=cv2.INTER_AREA,
    )
    self.trajectory["rgb"].append(observation)


# `env` is a previously created sofa_env environment; `args` holds
# command line arguments parsed in the surrounding script.
metadata = {
    "frame_skip": env.frame_skip,
    "time_step": env.time_step,
    "observation_type": env.observation_type.name,
    "reward_amount_dict": env.reward_amount_dict,
    "user_info": args.info,
}

env = TrajectoryRecorder(
    env,
    log_dir="trajectories",
    metadata=metadata,
    store_info=True,
    save_compressed_keys=["observation", "terminal_observation", "rgb", "info"],
    after_step_callbacks=[store_rgb_obs],
    after_reset_callbacks=[store_rgb_obs],
)
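Because the callbacks receive the TrajectoryRecorder instance as their first argument, they can access both the wrapped environment (self.env) and the trajectory buffer (self.trajectory).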
Slowing Down Environments to Real Time
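Simulations often step faster than wall-clock time, which makes human rendering or teleoperation hard to follow. Conceptually, a real-time wrapper measures how long each step took and sleeps for the remainder of the simulated step duration. The sketch below is illustrative rather than sofa_env's actual wrapper: it assumes the classic gym API used elsewhere on this page and an environment that exposes time_step and frame_skip, as sofa_env environments do.

import time

import gym


class SleepToRealtime(gym.Wrapper):
    """Sleeps after each step so wall-clock time matches simulated time."""

    def __init__(self, env: gym.Env):
        super().__init__(env)
        # Simulated seconds that pass during one call to step().
        self.step_duration = env.time_step * env.frame_skip

    def step(self, action):
        start = time.perf_counter()
        step_return = self.env.step(action)
        elapsed = time.perf_counter() - start
        # Only sleep if the simulation ran faster than real time.
        if elapsed < self.step_duration:
            time.sleep(self.step_duration - elapsed)
        return step_return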
Point Cloud Observations
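A common way to build point cloud observations is to back-project a rendered depth image through the camera's pinhole intrinsics. The helper below is a minimal numpy sketch of that conversion, not sofa_env's implementation; the function name and the assumption of a zero-depth background are illustrative.

import numpy as np


def depth_to_point_cloud(depth: np.ndarray, fx: float, fy: float, cx: float, cy: float) -> np.ndarray:
    """Back-projects an (H, W) depth image into an (N, 3) point cloud in camera coordinates."""
    height, width = depth.shape
    # Pixel coordinate grid: u runs along columns, v along rows.
    u, v = np.meshgrid(np.arange(width), np.arange(height))
    z = depth
    x = (u - cx) * z / fx
    y = (v - cy) * z / fy
    points = np.stack([x, y, z], axis=-1).reshape(-1, 3)
    # Discard invalid points, e.g. background pixels with zero depth.
    return points[points[:, 2] > 0]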
Semantically Segmented Image Observations
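Semantically segmented renderings assign each pixel an integer ID for the object it belongs to. A typical preprocessing step expands such an ID image into per-class binary masks; the following helper is a generic numpy sketch (the function name and ID convention are assumptions, not sofa_env API).

import numpy as np


def segmentation_to_one_hot(segmentation: np.ndarray, num_classes: int) -> np.ndarray:
    """Converts an (H, W) image of integer object IDs into an (H, W, num_classes) binary mask."""
    return (segmentation[..., None] == np.arange(num_classes)).astype(np.uint8)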
Making an Environment Episodic
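The EpisodicEnvWrapper turns a single step call into a whole episode: it receives a plan of actions, executes them one by one (optionally stopping early when the environment signals done), accumulates the rewards with a user-defined function, and can additionally return the buffers of observations, rewards, dones, and infos collected along the way.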
Example Usage
import numpy as np

from sofa_env.scenes.reach.reach_env import ReachEnv, RenderMode, ObservationType, ActionType

# Note: the exact import path of EpisodicEnvWrapper may differ between sofa_env versions.
from sofa_env.wrappers.episodic_env import EpisodicEnvWrapper

episode_length = 300

env = ReachEnv(
    observation_type=ObservationType.STATE,
    render_mode=RenderMode.HUMAN,
    action_type=ActionType.CONTINUOUS,
    image_shape=(1024, 1024),
    frame_skip=1,
    time_step=1 / 30,
    observe_target_position=True,
    maximum_robot_velocity=35.0,
)

env = EpisodicEnvWrapper(
    env=env,
    reward_accumulation_func=np.sum,
    episode_length=episode_length,
    stop_on_done=True,
    return_buffers=True,
)

reset_obs = env.reset()

# One constant action, repeated episode_length times: the displacement from the
# end effector to the target, spread evenly over the whole episode.
action_plan = np.ones((episode_length, 3)) * (env.target_position - env.end_effector.get_pose()[:3]) / (35.0 * env.dt * 0.001 * episode_length)

# Executes the whole plan; returns the collected buffers and the accumulated reward.
observations, rewards, dones, infos, cumulative_reward = env.step(action_plan)
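The action plan repeats one constant action: the vector from the end effector to the target, divided by the distance the robot covers per step (maximum_robot_velocity times env.dt, with the factor of 0.001 presumably converting the velocity's units to those of the pose) and by the number of steps, so the accumulated motion reaches the target at the end of the episode.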