From b95f5e96ec46f6a26e4aad55ed044af77b08fa56 Mon Sep 17 00:00:00 2001
From: spencerfolk
Date: Wed, 20 Dec 2023 20:43:52 -0500
Subject: [PATCH] Gym environment scripts

---
 rotorpy/learning/__init__.py               |   6 +
 rotorpy/learning/quadrotor_environments.py | 417 ++++++++++++++++++
 .../learning/quadrotor_reward_functions.py |  36 ++
 3 files changed, 459 insertions(+)
 create mode 100644 rotorpy/learning/__init__.py
 create mode 100644 rotorpy/learning/quadrotor_environments.py
 create mode 100644 rotorpy/learning/quadrotor_reward_functions.py

diff --git a/rotorpy/learning/__init__.py b/rotorpy/learning/__init__.py
new file mode 100644
index 0000000..ddb11ec
--- /dev/null
+++ b/rotorpy/learning/__init__.py
@@ -0,0 +1,6 @@
+from gymnasium.envs.registration import register
+
+register(
+    id="Quadrotor-v0",
+    entry_point="rotorpy.learning.quadrotor_environments:QuadrotorEnv",
+)
\ No newline at end of file
diff --git a/rotorpy/learning/quadrotor_environments.py b/rotorpy/learning/quadrotor_environments.py
new file mode 100644
index 0000000..9ed915b
--- /dev/null
+++ b/rotorpy/learning/quadrotor_environments.py
@@ -0,0 +1,417 @@
+import numpy as np
+from scipy.spatial.transform import Rotation
+import matplotlib.pyplot as plt
+
+from enum import Enum
+from rotorpy.world import World
+from rotorpy.vehicles.multirotor import Multirotor
+from rotorpy.vehicles.crazyflie_params import quad_params as crazyflie_params
+from rotorpy.learning.quadrotor_reward_functions import hover_reward
+from rotorpy.utils.shapes import Quadrotor
+
+import gymnasium as gym
+from gymnasium import spaces
+
+import math
+from copy import deepcopy
+
+class QuadrotorEnv(gym.Env):
+    """
+    A quadrotor environment for reinforcement learning using Gymnasium.
+
+    Inputs:
+        initial_state: the initial state of the quadrotor. The default is hover.
+        control_mode: the control abstraction used by the controller. Options are...
+            'cmd_motor_speeds': the controller directly commands motor speeds.
+            'cmd_motor_thrusts': the controller commands forces for each rotor.
+            'cmd_ctbr': the controller commands a collective thrust and body rates.
+            'cmd_ctbm': the controller commands a collective thrust and moments on the x/y/z body axes.
+            'cmd_vel': the controller commands a velocity vector in the body frame.
+        reward_fn: the reward function. Defaults to hover, but the user can pass in any callable to be used as the reward.
+        quad_params: the parameters for the quadrotor.
+        max_time: the maximum duration of an episode. After this time, the episode terminates.
+        world: the world for the quadrotor to operate within.
+        sim_rate: the simulation rate (in Hz); the integration timestep is 1/sim_rate.
+        render_mode: how to render the quadrotor ('None', '3D', or 'console').
+    """
+
+    metadata = {"render_modes": ["None", "3D", "console"],
+                "control_modes": ['cmd_motor_speeds', 'cmd_motor_thrusts', 'cmd_ctbr', 'cmd_ctbm', 'cmd_vel']}
+
+    def __init__(self,
+                 initial_state = {'x': np.array([0,0,0]),
+                                  'v': np.zeros(3,),
+                                  'q': np.array([0, 0, 0, 1]),  # [i,j,k,w]
+                                  'w': np.zeros(3,),
+                                  'wind': np.array([0,0,0]),  # Since wind is handled elsewhere, this value is overwritten.
+                                  'rotor_speeds': np.array([1788.53, 1788.53, 1788.53, 1788.53])},
+                 control_mode = 'cmd_vel',
+                 reward_fn = hover_reward,
+                 quad_params = crazyflie_params,
+                 max_time = 10,        # Maximum time to run the simulation for in a single episode.
+                 wind_profile = None,  # Wind profile object; if none is supplied, defaults to no wind.
+                 world = None,        # The world object
+                 sim_rate = 100,      # The update frequency of the simulator in Hz
+                 render_mode = None,  # The rendering mode
+                 ):
+        super(QuadrotorEnv, self).__init__()
+
+        self.initial_state = initial_state
+
+        self.vehicle_state = initial_state
+
+        assert control_mode in self.metadata["control_modes"]  # Don't accept improper control modes
+        self.control_mode = control_mode
+
+        self.sim_rate = sim_rate
+        self.t_step = 1/self.sim_rate
+        self.world = world
+        self.reward_fn = reward_fn
+
+        # Create quadrotor from quad params and control abstraction.
+        self.quadrotor = Multirotor(quad_params=quad_params, initial_state=initial_state, control_abstraction=control_mode)
+
+        assert render_mode is None or render_mode in self.metadata["render_modes"]
+        self.render_mode = render_mode
+
+        self.t = 0  # The current simulation time
+        self.max_time = max_time
+
+        ############ OBSERVATION SPACE
+
+        # The observation state is the full state of the quadrotor.
+        #   position,     x,            observation_state[0:3]
+        #   velocity,     v,            observation_state[3:6]
+        #   orientation,  q,            observation_state[6:10]
+        #   body rates,   w,            observation_state[10:13]
+        #   wind,         wind,         observation_state[13:16]
+        #   motor speeds, rotor_speeds, observation_state[16:20]
+        # For simplicity, we assume these observations can lie anywhere in (-inf, inf).
+
+        self.observation_space = spaces.Box(low = -np.inf, high = np.inf, shape = (20,), dtype=np.float32)
+
+        ############ ACTION SPACE
+
+        # For generalizability, we assume the controller outputs 4 numbers between -1 and 1.
+        # Depending on the control mode, these are scaled appropriately in rescale_action().
+
+        self.action_space = spaces.Box(low = -1, high = 1, shape = (4,), dtype=np.float32)
+
+        ###### Min/max values for scaling control outputs.
+
+        self.rotor_speed_max = self.quadrotor.rotor_speed_max
+        self.rotor_speed_min = self.quadrotor.rotor_speed_min
+
+        # Compute the min/max thrust by assuming the rotor is spinning at min/max speed (this also generalizes to bidirectional rotors).
+        self.max_thrust = self.quadrotor.k_eta * self.quadrotor.rotor_speed_max**2
+        self.min_thrust = self.quadrotor.k_eta * self.quadrotor.rotor_speed_min**2
+
+        # Find the maximum moment on each axis, N-m
+        self.max_roll_moment = self.max_thrust * np.abs(self.quadrotor.rotor_pos['r1'][1])
+        self.max_pitch_moment = self.max_thrust * np.abs(self.quadrotor.rotor_pos['r1'][0])
+        self.max_yaw_moment = self.quadrotor.k_m * self.quadrotor.rotor_speed_max**2
+
+        # Set the maximum body rate on each axis (hand selected), rad/s
+        self.max_roll_br = 7.0
+        self.max_pitch_br = 7.0
+        self.max_yaw_br = 3.0
+
+        # Set the maximum per-axis speed command (hand selected), m/s
+        self.max_vel = 3/math.sqrt(3)  # Chosen so that the magnitude of the velocity command is at most 3 m/s.
+
+        ###################################################################################################
+
+        # Save the order of magnitude of the rotor speeds for later normalization
+        self.rotor_speed_order_mag = math.floor(math.log10(self.quadrotor.rotor_speed_max))
+
+        # # You may define any additional constants you like, including control gains.
+        # self.inertia = np.array([[self.quadrotor.Ixx, self.quadrotor.Ixy, self.quadrotor.Ixz],
+        #                          [self.quadrotor.Ixy, self.quadrotor.Iyy, self.quadrotor.Iyz],
+        #                          [self.quadrotor.Ixz, self.quadrotor.Iyz, self.quadrotor.Izz]])  # kg*m^2
+
+        if world is None:
+            # If no world is specified, assume that the intended world is free space.
+            wbound = 4
+            self.world = World.empty((-wbound, wbound, -wbound,
+                                       wbound, -wbound, wbound))
+        else:
+            self.world = world
+
+        if wind_profile is None:
+            # If wind is not specified, default to no wind.
+            from rotorpy.wind.default_winds import NoWind
+            self.wind_profile = NoWind()
+        else:
+            self.wind_profile = wind_profile
+
+        if self.render_mode == '3D':
+            self.fig = plt.figure('Visualization')
+            # self.fig.clear()
+            self.ax = self.fig.add_subplot(projection='3d')
+            self.quad_obj = Quadrotor(self.ax, wind=True)
+            self.world_artists = None
+            self.title_artist = self.ax.set_title('t = {}'.format(self.t))
+
+        return
+
+    def render(self):
+        if self.render_mode == '3D':
+            self._plot_quad()
+        elif self.render_mode == 'console':
+            self._print_quad()
+
+    def close(self):
+        # The figure only exists when render_mode == '3D'.
+        if getattr(self, 'fig', None) is not None:
+            # Close the plots
+            plt.close('all')
+
+    def reset(self, seed=None, initial_state='random', options={'pos_bound': 2, 'vel_bound': 0}):
+        """
+        Reset the environment.
+        Inputs:
+            seed: the seed for any random number generation, mostly for reproducibility.
+            initial_state: determines how to reset the quadrotor. Options are...
+                'random': will randomly select the state of the quadrotor.
+                'deterministic': will set the state to the initial state selected by the user when creating
+                                 the quadrotor environment (usually hover).
+                The user can also specify the state itself as a dictionary, e.g.
+                    reset(initial_state={'x': np.array([0,0,0]),
+                                         'v': np.zeros(3,),
+                                         'q': np.array([0, 0, 0, 1]),  # [i,j,k,w]
+                                         'w': np.zeros(3,),
+                                         'wind': np.array([0,0,0]),  # Since wind is handled elsewhere, this value is overwritten.
+                                         'rotor_speeds': np.array([1788.53, 1788.53, 1788.53, 1788.53])})
+            options: dictionary of misc options for resetting the scene.
+                'pos_bound': the min/max position bound for random placement.
+                'vel_bound': the min/max velocity bound for random placement.
+        """
+
+        super().reset(seed=seed)
+
+        if initial_state == 'random':
+            # Randomly select an initial state for the quadrotor; assume it starts level.
+            # Use the seeded RNG provided by gym.Env for reproducibility.
+            pos = self.np_random.uniform(low=-options['pos_bound'], high=options['pos_bound'], size=(3,))
+            vel = self.np_random.uniform(low=-options['vel_bound'], high=options['vel_bound'], size=(3,))
+            state = {'x': pos,
+                     'v': vel,
+                     'q': np.array([0, 0, 0, 1]),  # [i,j,k,w]
+                     'w': np.zeros(3,),
+                     'wind': np.array([0,0,0]),  # Since wind is handled elsewhere, this value is overwritten.
+                     'rotor_speeds': np.array([1788.53, 1788.53, 1788.53, 1788.53])}
+
+        elif initial_state == 'deterministic':
+            state = self.initial_state
+
+        elif isinstance(initial_state, dict):
+            # Ensure the correct keys are in the dict.
+            if all(key in initial_state for key in ('x', 'v', 'q', 'w', 'wind', 'rotor_speeds')):
+                state = initial_state
+            else:
+                raise KeyError("Missing state keys in your initial_state. You must specify values for ('x', 'v', 'q', 'w', 'wind', 'rotor_speeds')")
+
+        else:
+            raise ValueError("You must either specify 'random', 'deterministic', or provide a dict containing your desired initial state.")
+
+        # Set the initial state.
+        self.vehicle_state = state
+
+        # Reset the time
+        self.t = 0.0
+
+        # Reset the reward
+        self.reward = 0.0
+
+        # Now get observation and info using the new state
+        observation = self._get_obs()
+        info = self._get_info()
+
+        self.render()
+
+        return (observation, info)
+
+    def step(self, action):
+        """
+        Step the quadrotor dynamics forward by one step based on the policy action.
+        Inputs:
+            action: The action is a 4x1 vector whose interpretation depends on the control abstraction:
+
+                if control_mode == 'cmd_vel':
+                    action[0] (-1,1) := commanded velocity in x direction (will be rescaled to m/s)
+                    action[1] (-1,1) := commanded velocity in y direction (will be rescaled to m/s)
+                    action[2] (-1,1) := commanded velocity in z direction (will be rescaled to m/s)
+                    action[3] is unused in this mode.
+                if control_mode == 'cmd_ctbr':
+                    action[0] (-1,1) := the thrust command (will be rescaled to Newtons)
+                    action[1] (-1,1) := the roll body rate (will be rescaled to rad/s)
+                    action[2] (-1,1) := the pitch body rate (will be rescaled to rad/s)
+                    action[3] (-1,1) := the yaw body rate (will be rescaled to rad/s)
+                if control_mode == 'cmd_ctbm':
+                    action[0] (-1,1) := the thrust command (will be rescaled to Newtons)
+                    action[1] (-1,1) := the roll moment (will be rescaled to Newton-meters)
+                    action[2] (-1,1) := the pitch moment (will be rescaled to Newton-meters)
+                    action[3] (-1,1) := the yaw moment (will be rescaled to Newton-meters)
+                if control_mode == 'cmd_motor_speeds':
+                    action[0] (-1,1) := motor 1 speed (will be rescaled to rad/s)
+                    action[1] (-1,1) := motor 2 speed (will be rescaled to rad/s)
+                    action[2] (-1,1) := motor 3 speed (will be rescaled to rad/s)
+                    action[3] (-1,1) := motor 4 speed (will be rescaled to rad/s)
+                if control_mode == 'cmd_motor_thrusts':
+                    action[0] (-1,1) := motor 1 force (will be rescaled to Newtons)
+                    action[1] (-1,1) := motor 2 force (will be rescaled to Newtons)
+                    action[2] (-1,1) := motor 3 force (will be rescaled to Newtons)
+                    action[3] (-1,1) := motor 4 force (will be rescaled to Newtons)
+        """
+
+        # First rescale the action and get the appropriate control dictionary given the control mode.
+        self.control_dict = self.rescale_action(action)
+
+        # Now update the wind state using the wind profile at the current simulation time.
+        self.vehicle_state['wind'] = self.wind_profile.update(self.t, self.vehicle_state['x'])
+
+        # Last, perform forward integration using the commanded control inputs and the current state.
+        self.vehicle_state = self.quadrotor.step(self.vehicle_state, self.control_dict, self.t_step)
+        observation = self._get_obs()
+
+        # Update t by t_step
+        self.t += self.t_step
+
+        # Check for safety
+        safe = self.safety_exit()
+
+        # Determine whether or not the episode should terminate.
+        terminated = (self.t >= self.max_time) or not safe
+
+        # Now compute the reward based on the current state
+        self.reward = self._get_reward(observation, action) if safe else -500.0
+
+        # Finally get info
+        info = self._get_info()
+
+        truncated = False
+
+        self.render()
+
+        return (observation, self.reward, terminated, truncated, info)
+
+    def rescale_action(self, action):
+        """
+        Rescales the action to within the control limits and then assigns the appropriate control dictionary.
+ """ + + control_dict = {} + + if self.control_mode == 'cmd_ctbm': + # Scale action[0] to (0,1) and then scale to the max thrust + cmd_thrust = np.interp(action[0], [-1,1], [self.quadrotor.num_rotors*self.min_thrust, self.quadrotor.num_rotors*self.max_thrust]) + + # Scale the moments + cmd_roll_moment = np.interp(action[1], [-1,1], [-self.max_roll_moment, self.max_roll_moment]) + cmd_pitch_moment = np.interp(action[2], [-1,1], [-self.max_pitch_moment, self.max_pitch_moment]) + cmd_yaw_moment = np.interp(action[3], [-1,1], [-self.max_yaw_moment, self.max_yaw_moment]) + + control_dict['cmd_thrust'] = cmd_thrust + control_dict['cmd_moment'] = np.array([cmd_roll_moment, cmd_pitch_moment, cmd_yaw_moment]) + + elif self.control_mode == 'cmd_ctbr': + # Scale action to min and max thrust. + cmd_thrust = np.interp(action[0], [-1, 1], [self.quadrotor.num_rotors*self.min_thrust, self.quadrotor.num_rotors*self.max_thrust]) + + # Scale the body rates. + cmd_roll_br = np.interp(action[1], [-1,1], [-self.max_roll_br, self.max_roll_br]) + cmd_pitch_br = np.interp(action[2], [-1,1], [-self.max_pitch_br, self.max_pitch_br]) + cmd_yaw_br = np.interp(action[3], [-1,1], [-self.max_yaw_br, self.max_yaw_br]) + + control_dict['cmd_thrust'] = cmd_thrust + control_dict['cmd_w'] = np.array([cmd_roll_br, cmd_pitch_br, cmd_yaw_br]) + + elif self.control_mode == 'cmd_motor_speeds': + # Scale the action to min and max motor speeds. + control_dict['cmd_motor_speeds'] = np.interp(action, [-1,1], [self.rotor_speed_min, self.rotor_speed_max]) + + elif self.control_mode == 'cmd_motor_thrusts': + # Scale the action to min and max rotor thrusts. + control_dict['cmd_motor_thrusts'] = np.interp(action, [-1,1], [self.min_thrust, self.max_thrust]) + + elif self.control_mode == 'cmd_vel': + # Scale the velcoity to min and max values. + control_dict['cmd_v'] = np.interp(action, [-1,1], [-self.max_vel, self.max_vel]) + + return control_dict + + def _get_reward(self, observation, action): + """ + Compute the reward for the current state and goal. + Inputs: + observation: The current state of the quadrotor + action: The current action of the controller + Outputs: + the current reward. + """ + + return self.reward_fn(observation, action) + + def safety_exit(self): + """ + Return exit status if any safety condition is violated, otherwise None. + """ + if np.any(np.abs(self.vehicle_state['v']) > 100): + return False + if np.any(np.abs(self.vehicle_state['w']) > 100): + return False + if self.vehicle_state['x'][0] < self.world.world['bounds']['extents'][0] or self.vehicle_state['x'][0] > self.world.world['bounds']['extents'][1]: + return False + if self.vehicle_state['x'][1] < self.world.world['bounds']['extents'][2] or self.vehicle_state['x'][1] > self.world.world['bounds']['extents'][3]: + return False + if self.vehicle_state['x'][2] < self.world.world['bounds']['extents'][4] or self.vehicle_state['x'][2] > self.world.world['bounds']['extents'][5]: + return False + + if len(self.world.world.get('blocks', [])) > 0: + # If a world has objects in it we need to check for collisions. 
+            collision_pts = self.world.path_collisions(self.vehicle_state['x'], 0.25)
+            if collision_pts.size > 0:
+                return False
+        return True
+
+    def _get_obs(self):
+        # Concatenate all the state variables into a single vector
+        state_vec = np.concatenate([self.vehicle_state['x'], self.vehicle_state['v'], self.vehicle_state['q'], self.vehicle_state['w'], self.vehicle_state['wind'], self.vehicle_state['rotor_speeds']], dtype=np.float32)
+
+        return state_vec
+
+    def _get_info(self):
+        return {}
+
+    def _plot_quad(self):
+
+        plot_position = deepcopy(self.vehicle_state['x'])
+        plot_rotation = Rotation.from_quat(self.vehicle_state['q']).as_matrix()
+        plot_wind = deepcopy(self.vehicle_state['wind'])
+
+        if self.world_artists is None:
+            self.world_artists = self.world.draw(self.ax)
+
+        self.quad_obj.transform(position=plot_position, rotation=plot_rotation, wind=plot_wind)
+        self.title_artist.set_text('t = {:.2f}'.format(self.t))
+
+        plt.pause(0.0001)
+
+        return
+
+    def _print_quad(self):
+
+        print("Time: %3.2f \t Position: (%3.2f, %3.2f, %3.2f) \t Reward: %3.2f" % (self.t, self.vehicle_state['x'][0], self.vehicle_state['x'][1], self.vehicle_state['x'][2], self.reward))
\ No newline at end of file
diff --git a/rotorpy/learning/quadrotor_reward_functions.py b/rotorpy/learning/quadrotor_reward_functions.py
new file mode 100644
index 0000000..1662427
--- /dev/null
+++ b/rotorpy/learning/quadrotor_reward_functions.py
@@ -0,0 +1,36 @@
+import numpy as np
+from scipy.spatial.transform import Rotation
+
+import gymnasium as gym
+from gymnasium import spaces
+
+import math
+
+"""
+Reward functions for quadrotor tasks.
+"""
+
+def hover_reward(observation, action):
+    """
+    Rewards hovering at (0, 0, 0). It is a combination of position error, velocity error,
+    body rates, and action reward.
+    """
+
+    dist_weight = 1
+    vel_weight = 0.1
+    action_weight = 0.001
+    ang_rate_weight = 0.1
+
+    # Compute the distance to goal
+    dist_reward = -dist_weight*np.linalg.norm(observation[0:3])
+
+    # Compute the velocity reward
+    vel_reward = -vel_weight*np.linalg.norm(observation[3:6])
+
+    # Compute the angular rate reward
+    ang_rate_reward = -ang_rate_weight*np.linalg.norm(observation[10:13])
+
+    # Compute the action reward
+    action_reward = -action_weight*np.linalg.norm(action)
+
+    return dist_reward + vel_reward + action_reward + ang_rate_reward
\ No newline at end of file
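
A minimal usage sketch of the new environment (not part of the patch). It assumes only what the patch itself sets up: importing rotorpy.learning runs the register() call above, and gym.make forwards keyword arguments to QuadrotorEnv.__init__. Exact wrapper behavior may vary across Gymnasium versions; instantiating QuadrotorEnv directly works as well.

import gymnasium as gym

import rotorpy.learning  # Importing this registers Quadrotor-v0 via rotorpy/learning/__init__.py.

# Build the environment with the default Crazyflie parameters and the velocity-command abstraction.
env = gym.make("Quadrotor-v0", control_mode='cmd_vel', render_mode='console')

observation, info = env.reset(seed=42)

for _ in range(500):
    action = env.action_space.sample()  # Random policy, just to exercise the dynamics.
    observation, reward, terminated, truncated, info = env.step(action)
    if terminated or truncated:
        observation, info = env.reset()

env.close()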
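The scaling in rescale_action is a straight linear map through np.interp, so its endpoints are easy to sanity-check in isolation. The thrust numbers below are illustrative placeholders, not the Crazyflie's actual limits (those come from k_eta and the rotor speed bounds):

import numpy as np

# Illustrative per-rotor thrust limits in Newtons (placeholders, not real Crazyflie values).
min_thrust, max_thrust, num_rotors = 0.005, 0.15, 4

# action[0] = -1 maps to the minimum total thrust, +1 to the maximum, and 0 to the midpoint.
for a in (-1.0, 0.0, 1.0):
    cmd_thrust = np.interp(a, [-1, 1], [num_rotors*min_thrust, num_rotors*max_thrust])
    print(a, cmd_thrust)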
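Since reward_fn is just a callable of (observation, action), task-specific rewards drop in without touching the environment. Below is a hypothetical waypoint-tracking reward that mirrors the structure of hover_reward; the goal point and weights are invented for illustration:

import numpy as np
from rotorpy.learning.quadrotor_environments import QuadrotorEnv

GOAL = np.array([1.0, 1.0, 1.0])  # Hypothetical goal position in meters.

def waypoint_reward(observation, action):
    # Penalize distance to the goal, speed, and actuation effort, as hover_reward does for the origin.
    dist_reward = -1.0 * np.linalg.norm(observation[0:3] - GOAL)
    vel_reward = -0.1 * np.linalg.norm(observation[3:6])
    action_reward = -0.001 * np.linalg.norm(action)
    return dist_reward + vel_reward + action_reward

# Pass the custom reward in at construction time.
env = QuadrotorEnv(control_mode='cmd_ctbr', reward_fn=waypoint_reward)
observation, info = env.reset()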