Files
rotor_py_control/rotorpy/learning/quadrotor_environments.py
2024-01-04 12:18:53 -05:00

441 lines
21 KiB
Python

import numpy as np
from scipy.spatial.transform import Rotation
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
from enum import Enum
from rotorpy.world import World
from rotorpy.vehicles.multirotor import Multirotor
from rotorpy.vehicles.crazyflie_params import quad_params as crazyflie_params
from rotorpy.learning.quadrotor_reward_functions import hover_reward
from rotorpy.utils.shapes import Quadrotor
import gymnasium as gym
from gymnasium import spaces
import math
from copy import deepcopy
class QuadrotorEnv(gym.Env):
"""
A quadrotor environment for reinforcement learning using Gymnasium.
Inputs:
initial_state: the initial state of the quadrotor. The default is hover.
control_mode: the appropriate control abstraction that is used by the controller, options are...
'cmd_motor_speeds': the controller directly commands motor speeds.
'cmd_motor_thrusts': the controller commands forces for each rotor.
'cmd_ctbr': the controller commands a collective thrsut and body rates.
'cmd_ctbm': the controller commands a collective thrust and moments on the x/y/z body axes
'cmd_vel': the controller commands a velocity vector in the body frame.
reward_fn: the reward function, default to hover, but the user can pass in any function that is used as a reward.
quad_params: the parameters for the quadrotor.
max_time: the maximum time of the session. After this time, the session will exit.
world: the world for the quadrotor to operate within.
sim_rate: the simulation rate (in Hz), i.e. the timestep.
aero: boolean, determines whether or not aerodynamic wrenches are computed.
render_mode: render the quadrotor.
render_fps: rendering frames per second, lower this for faster visualization.
ax: for plotting purposes, you can supply an axis object that the quadrotor will visualize on.
color: choose the color of the quadrotor. If none, it will randomly select a color.
"""
metadata = {"render_modes": ["None", "3D", "console"],
"render_fps": 30,
"control_modes": ['cmd_motor_speeds', 'cmd_motor_thrusts', 'cmd_ctbr', 'cmd_ctbm', 'cmd_vel']}
def __init__(self,
initial_state = {'x': np.array([0,0,0]),
'v': np.zeros(3,),
'q': np.array([0, 0, 0, 1]), # [i,j,k,w]
'w': np.zeros(3,),
'wind': np.array([0,0,0]), # Since wind is handled elsewhere, this value is overwritten
'rotor_speeds': np.array([1788.53, 1788.53, 1788.53, 1788.53])},
control_mode = 'cmd_vel',
reward_fn = hover_reward,
quad_params = crazyflie_params,
max_time = 10, # Maximum time to run the simulation for in a single session.
wind_profile = None, # wind profile object, if none is supplied it will choose no wind.
world = None, # The world object
sim_rate = 100, # The update frequency of the simulator in Hz
aero = True, # Whether or not aerodynamic wrenches are computed.
render_mode = "None", # The rendering mode
render_fps = 30, # The rendering frames per second. Lower this for faster visualization.
fig = None, # Figure for rendering. Optional.
ax = None, # Axis for rendering. Optional.
color = None, # The color of the quadrotor.
):
super(QuadrotorEnv, self).__init__()
self.metadata['render_fps'] = render_fps
self.initial_state = initial_state
self.vehicle_state = initial_state
assert control_mode in self.metadata["control_modes"] # Don't accept improper control modes
self.control_mode = control_mode
self.sim_rate = sim_rate
self.t_step = 1/self.sim_rate
self.reward_fn = reward_fn
# Create quadrotor from quad params and control abstraction.
self.quadrotor = Multirotor(quad_params=quad_params, initial_state=initial_state, control_abstraction=control_mode, aero=aero)
assert render_mode is None or render_mode in self.metadata["render_modes"]
self.render_mode = render_mode
self.t = 0 # The current time of the instance
self.max_time = max_time
############ OBSERVATION SPACE
# The observation state is the full state of the quadrotor.
# position, x, observation_state[0:3]
# velocity, v, observation_state[3:6]
# orientation, q, observation_state[6:10]
# body rates, w, observation_state[10:13]
# wind, wind, observation_state[13:16]
# motor_speeds, rotor_speeds, observation_state[16:20]
# For simplicitly, we assume these observations can lie within -inf to inf.
self.observation_space = spaces.Box(low = -np.inf, high=np.inf, shape = (13,), dtype=np.float32)
############ ACTION SPACE
# For generalizability, we assume the controller outputs 4 numbers between -1 and 1. Depending on the control mode, we scale these appropriately.
if self.control_mode == 'cmd_vel':
self.action_space = spaces.Box(low = -1, high = 1, shape = (3,), dtype=np.float32)
else:
self.action_space = spaces.Box(low = -1, high = 1, shape = (4,), dtype=np.float32)
###### Min/max values for scaling control outputs.
self.rotor_speed_max = self.quadrotor.rotor_speed_max
self.rotor_speed_min = self.quadrotor.rotor_speed_min
# Compute the min/max thrust by assuming the rotor is spinning at min/max speed. (also generalizes to bidirectional rotors)
self.max_thrust = self.quadrotor.k_eta * self.quadrotor.rotor_speed_max**2
self.min_thrust = self.quadrotor.k_eta * self.quadrotor.rotor_speed_min**2
# Find the maximum moment on each axis, N-m
self.max_roll_moment = self.max_thrust * np.abs(self.quadrotor.rotor_pos['r1'][1])
self.max_pitch_moment = self.max_thrust * np.abs(self.quadrotor.rotor_pos['r1'][0])
self.max_yaw_moment = self.quadrotor.k_m * self.quadrotor.rotor_speed_max**2
# Set the maximum body rate on each axis (this is hand selected), rad/s
self.max_roll_br = 7.0
self.max_pitch_br = 7.0
self.max_yaw_br = 3.0
# Set the maximum speed command (this is hand selected), m/s
self.max_vel = 3/math.sqrt(3) # Selected so that at most the max speed is 3 m/s
###################################################################################################
# Save the order of magnitude of the rotor speeds for later normalization
self.rotor_speed_order_mag = math.floor(math.log(self.quadrotor.rotor_speed_max, 10))
if world is None:
# If no world is specified, assume that it means that the intended world is free space.
wbound = 4
self.world = World.empty((-wbound, wbound, -wbound,
wbound, -wbound, wbound))
else:
self.world = world
if wind_profile is None:
# If wind is not specified, default to no wind.
from rotorpy.wind.default_winds import NoWind
self.wind_profile = NoWind()
else:
self.wind_profile = wind_profile
if self.render_mode == '3D':
if fig is None and ax is None:
self.fig = plt.figure('Visualization')
self.ax = self.fig.add_subplot(projection='3d')
else:
self.fig = fig
self.ax = ax
if color is None:
colors = list(mcolors.CSS4_COLORS)
else:
colors = [color]
self.quad_obj = Quadrotor(self.ax, wind=True, color=np.random.choice(colors))
self.world_artists = None
self.title_artist = self.ax.set_title('t = {}'.format(self.t))
self.rendering = False # Bool for tracking when the renderer is actually rendering a frame.
return
def render(self):
if self.render_mode == '3D':
self._plot_quad()
elif self.render_mode == 'console':
self._print_quad()
def close(self):
if self.fig is not None:
# Close the plots
plt.close('all')
def reset(self, seed=None, initial_state='random', options={'pos_bound': 2, 'vel_bound': 0}):
"""
Reset the environment
Inputs:
seed: the seed for any random number generation, mostly for reproducibility.
initial_state: determines how to set the quadrotor again. Options are...
'random': will randomly select the state of the quadrotor.
'deterministic': will set the state to the initial state selected by the user when creating
the quadrotor environment (usually hover).
the user can also specify the state itself as a dictionary... e.g.
reset(options={'initial_state':
{'x': np.array([0,0,0]),
'v': np.zeros(3,),
'q': np.array([0, 0, 0, 1]), # [i,j,k,w]
'w': np.zeros(3,),
'wind': np.array([0,0,0]), # Since wind is handled elsewhere, this value is overwritten
'rotor_speeds': np.array([1788.53, 1788.53, 1788.53, 1788.53])}
})
options: dictionary for misc options for resetting the scene.
'pos_bound': the min/max position region for random placement.
'vel_bound': the min/max velocity region for random placement
"""
assert options['pos_bound'] >= 0 and options['vel_bound'] >= 0 , "Bounds must be greater than or equal to 0."
super().reset(seed=seed)
if initial_state == 'random':
# Randomly select an initial state for the quadrotor. At least assume it is level.
pos = np.random.uniform(low=-options['pos_bound'], high=options['pos_bound'], size=(3,))
vel = np.random.uniform(low=-options['vel_bound'], high=options['vel_bound'], size=(3,))
state = {'x': pos,
'v': vel,
'q': np.array([0, 0, 0, 1]), # [i,j,k,w]
'w': np.zeros(3,),
'wind': np.array([0,0,0]), # Since wind is handled elsewhere, this value is overwritten
'rotor_speeds': np.array([1788.53, 1788.53, 1788.53, 1788.53])}
elif initial_state == 'deterministic':
state = self.initial_state
elif isinstance(initial_state, dict):
# Ensure the correct keys are in dict.
if all(key in initial_state for key in ('x', 'v', 'q', 'w', 'wind', 'rotor_speeds')):
state = initial_state
else:
raise KeyError("Missing state keys in your initial_state. You must specify values for ('x', 'v', 'q', 'w', 'wind', 'rotor_speeds')")
else:
raise ValueError("You must either specify 'random', 'deterministic', or provide a dict containing your desired initial state.")
# Set the initial state.
self.vehicle_state = state
# Reset the time
self.t = 0.0
# Reset the reward
self.reward = 0.0
# Now get observation and info using the new state
observation = self._get_obs()
info = self._get_info()
self.render()
return (observation, info)
def step(self, action):
"""
Step the quadrotor dynamics forward by one step based on the policy action.
Inputs:
action: The action is a 4x1 vector which depends on the control abstraction:
if control_mode == 'cmd_vel':
action[0] (-1,1) := commanded velocity in x direction (will be rescaled to m/s)
action[1] (-1,1) := commanded velocity in y direction (will be rescaled to m/s)
action[2] (-1,1) := commanded velocity in z direction (will be rescaled to m/s)
if control_mode == 'cmd_ctbr':
action[0] (-1,1) := the thrust command (will be rescaled to Newtons)
action[1] (-1,1) := the roll body rate (will be rescaled to rad/s)
action[2] (-1,1) := the pitch body rate (will be rescaled to rad/s)
action[3] (-1,1) := the yaw body rate (will be rescaled to rad/s)
if control_mode == 'cmd_ctbm':
action[0] (-1,1) := the thrust command (will be rescaled to Newtons)
action[1] (-1,1) := the roll moment (will be rescaled to Newton-meters)
action[2] (-1,1) := the pitch moment (will be rescaled to Newton-meters)
action[3] (-1,1) := the yaw moment (will be rescaled to Newton-meters)
if control_mode == 'cmd_motor_speeds':
action[0] (-1,1) := motor 1 speed (will be rescaled to rad/s)
action[1] (-1,1) := motor 2 speed (will be rescaled to rad/s)
action[2] (-1,1) := motor 3 speed (will be rescaled to rad/s)
action[3] (-1,1) := motor 4 speed (will be rescaled to rad/s)
if control_mode == 'cmd_motor_forces':
action[0] (-1,1) := motor 1 force (will be rescaled to Newtons)
action[1] (-1,1) := motor 2 force (will be rescaled to Newtons)
action[2] (-1,1) := motor 3 force (will be rescaled to Newtons)
action[3] (-1,1) := motor 4 force (will be rescaled to Newtons)
"""
# First rescale the action and get the appropriate control dictionary given the control mode.
self.control_dict = self.rescale_action(action)
# Now update the wind state using the wind profile
self.vehicle_state['wind'] = self.wind_profile.update(0, self.vehicle_state['x'])
# Last perform forward integration using the commanded motor speed and the current state
self.vehicle_state = self.quadrotor.step(self.vehicle_state, self.control_dict, self.t_step)
observation = self._get_obs()
# Update t by t_step
self.t += self.t_step
# Check for safety
safe = self.safety_exit()
# Determine whether or not the session should terminate.
terminated = (self.t >= self.max_time) or not safe
# Now compute the reward based on the current state
self.reward = self._get_reward(observation, action) if safe else -500.0
# Finally get info
info = self._get_info()
truncated = False
self.render()
return (observation, self.reward, terminated, truncated, info)
def close(self):
"""
Close the environment
"""
return None
def rescale_action(self, action):
"""
Rescales the action to within the control limits and then assigns the appropriate dictionary.
"""
control_dict = {}
if self.control_mode == 'cmd_ctbm':
# Scale action[0] to (0,1) and then scale to the max thrust
cmd_thrust = np.interp(action[0], [-1,1], [self.quadrotor.num_rotors*self.min_thrust, self.quadrotor.num_rotors*self.max_thrust])
# Scale the moments
cmd_roll_moment = np.interp(action[1], [-1,1], [-self.max_roll_moment, self.max_roll_moment])
cmd_pitch_moment = np.interp(action[2], [-1,1], [-self.max_pitch_moment, self.max_pitch_moment])
cmd_yaw_moment = np.interp(action[3], [-1,1], [-self.max_yaw_moment, self.max_yaw_moment])
control_dict['cmd_thrust'] = cmd_thrust
control_dict['cmd_moment'] = np.array([cmd_roll_moment, cmd_pitch_moment, cmd_yaw_moment])
elif self.control_mode == 'cmd_ctbr':
# Scale action to min and max thrust.
cmd_thrust = np.interp(action[0], [-1, 1], [self.quadrotor.num_rotors*self.min_thrust, self.quadrotor.num_rotors*self.max_thrust])
# Scale the body rates.
cmd_roll_br = np.interp(action[1], [-1,1], [-self.max_roll_br, self.max_roll_br])
cmd_pitch_br = np.interp(action[2], [-1,1], [-self.max_pitch_br, self.max_pitch_br])
cmd_yaw_br = np.interp(action[3], [-1,1], [-self.max_yaw_br, self.max_yaw_br])
control_dict['cmd_thrust'] = cmd_thrust
control_dict['cmd_w'] = np.array([cmd_roll_br, cmd_pitch_br, cmd_yaw_br])
elif self.control_mode == 'cmd_motor_speeds':
# Scale the action to min and max motor speeds.
control_dict['cmd_motor_speeds'] = np.interp(action, [-1,1], [self.rotor_speed_min, self.rotor_speed_max])
elif self.control_mode == 'cmd_motor_thrusts':
# Scale the action to min and max rotor thrusts.
control_dict['cmd_motor_thrusts'] = np.interp(action, [-1,1], [self.min_thrust, self.max_thrust])
elif self.control_mode == 'cmd_vel':
# Scale the velcoity to min and max values.
control_dict['cmd_v'] = np.interp(action, [-1,1], [-self.max_vel, self.max_vel])
return control_dict
def _get_reward(self, observation, action):
"""
Compute the reward for the current state and goal.
Inputs:
observation: The current state of the quadrotor
action: The current action of the controller
Outputs:
the current reward.
"""
return self.reward_fn(observation, action)
def safety_exit(self):
"""
Return exit status if any safety condition is violated, otherwise None.
"""
if np.any(np.abs(self.vehicle_state['v']) > 100):
return False
if np.any(np.abs(self.vehicle_state['w']) > 100):
return False
if self.vehicle_state['x'][0] < self.world.world['bounds']['extents'][0] or self.vehicle_state['x'][0] > self.world.world['bounds']['extents'][1]:
return False
if self.vehicle_state['x'][1] < self.world.world['bounds']['extents'][2] or self.vehicle_state['x'][1] > self.world.world['bounds']['extents'][3]:
return False
if self.vehicle_state['x'][2] < self.world.world['bounds']['extents'][4] or self.vehicle_state['x'][2] > self.world.world['bounds']['extents'][5]:
return False
if len(self.world.world.get('blocks', [])) > 0:
# If a world has objects in it we need to check for collisions.
collision_pts = self.world.path_collisions(self.vehicle_state['x'], 0.25)
no_collision = collision_pts.size == 0
if not no_collision:
return False
return True
def _get_obs(self):
# Concatenate all the state variables into a single vector
state_vec = np.concatenate([self.vehicle_state['x'], self.vehicle_state['v'], self.vehicle_state['q'], self.vehicle_state['w']], dtype=np.float32)
return state_vec
def _get_info(self):
return {}
def _plot_quad(self):
if abs(self.t / (1/self.metadata['render_fps']) - round(self.t / (1/self.metadata['render_fps']))) > 5e-2:
self.rendering = False # Set rendering bool to false.
return
self.rendering = True # Set rendering bool to true.
plot_position = deepcopy(self.vehicle_state['x'])
plot_rotation = Rotation.from_quat(self.vehicle_state['q']).as_matrix()
plot_wind = deepcopy(self.vehicle_state['wind'])
if self.world_artists is None and not ('x' in self.ax.get_xlabel()):
self.world_artists = self.world.draw(self.ax)
self.ax.plot(0, 0, 0, 'go')
self.quad_obj.transform(position=plot_position, rotation=plot_rotation, wind=plot_wind)
self.title_artist.set_text('t = {:.2f}'.format(self.t))
plt.pause(1e-9)
return
def _print_quad(self):
print("Time: %3.2f \t Position: (%3.2f, %3.2f, %3.2f) \t Reward: %3.2f" % (self.t, self.vehicle_state['x'][0], self.vehicle_state['x'][1], self.vehicle_state['x'][2], self.reward))