
On-policy Control with Approximation

In this post we turn to the control problem with parametric approximation of the action-value function $\hat{q}(s, a, \mathbf{w}) \approx q_{\ast}(s, a)$, where $\mathbf{w} \in \mathbb{R}^d$ is a finite-dimensional weight vector.

Episodic Semi-gradient Control

The general gradient-descent update for action-value prediction is
$$
\mathbf{w}_{t+1} \doteq \mathbf{w}_t + \alpha \Big[ U_t - \hat{q}(S_t, A_t, \mathbf{w}_t) \Big] \nabla \hat{q}(S_t, A_t, \mathbf{w}_t).
$$
For example, the update for the one-step Sarsa method is
$$
\mathbf{w}_{t+1} \doteq \mathbf{w}_t + \alpha \Big[ R_{t+1} + \gamma \hat{q}(S_{t+1}, A_{t+1}, \mathbf{w}_t)- \hat{q}(S_t, A_t, \mathbf{w}_t) \Big] \nabla \hat{q}(S_t, A_t, \mathbf{w}_t).
$$
We call this method episodic semi-gradient one-step Sarsa.
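
To make the update concrete, here is a minimal sketch of a single update for the linear case, $\hat{q}(s, a, \mathbf{w}) = \mathbf{w}^\top \mathbf{x}(s, a)$, in which the gradient is simply the feature vector. The function and argument names are illustrative only, not part of the implementation that follows:

import numpy as np

# one semi-gradient one-step Sarsa update with a linear approximator
# x, x_next: feature vectors for (S_t, A_t) and (S_{t+1}, A_{t+1})
def sarsa_update(w, x, reward, x_next, alpha, gamma, terminal):
    q = np.dot(w, x)                                  # q_hat(S_t, A_t, w)
    q_next = 0.0 if terminal else np.dot(w, x_next)   # q_hat(S_{t+1}, A_{t+1}, w)
    delta = reward + gamma * q_next - q               # TD error
    return w + alpha * delta * x                      # gradient of a linear q_hat is x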

To form control methods, we need to couple such action-value prediction methods with techniques for policy improvement and action selection. Suitable techniques applicable to continuous actions, or to actions from large discrete sets, are a topic of ongoing research with as yet no clear resolution. On the other hand, if the action set is discrete and not too large, then we can use the techniques already developed in previous chapters.

[Figure: pseudocode for episodic semi-gradient Sarsa]

Example: Mountain-Car Task

Consider the task of driving an underpowered car up a steep mountain road, as suggested by the diagram below.

[Figure: the Mountain Car task]

The difficulty is that gravity is stronger than the car’s engine, and even at full throttle the car cannot accelerate up the steep slope. The only solution is to first move away from the goal and up the opposite slope on the left. Then, by applying full throttle the car can build up enough inertia to carry it up the steep slope even though it is slowing down the whole way. This is a simple example of a continuous control task where things have to get worse in a sense (farther from the goal) before they can get better. Many control methodologies have great difficulties with tasks of this kind unless explicitly aided by a human designer.

The reward in this problem is −1 on all time steps until the car moves past its goal position at the top of the mountain, which ends the episode. There are three possible actions: full throttle forward (+1), full throttle reverse (−1), and zero throttle (0). The car moves according to a simplified physics. Its position, $x_t$, and velocity, $\dot{x}_t$, are updated by
$$
\begin{align}
x_{t+1} &\doteq \text{bound} \big[x_t + \dot{x}_{t+1} \big] \\
\dot{x}_{t+1} &\doteq \text{bound} \big[\dot{x}_t + 0.001 A_t - 0.0025 \cos(3x_t) \big],
\end{align}
$$
where the bound operation enforces $-1.2 \leq x_{t+1} \leq 0.5$ and $-0.07 \leq \dot{x}_{t+1} \leq 0.07$. In addition, when $x_{t+1}$ reaches the left bound, $\dot{x}_{t+1}$ is reset to zero. When it reaches the right bound, the goal is reached and the episode terminates. Each episode starts from a random position $x_t \in [-0.6, -0.4)$ and zero velocity. To convert the two continuous state variables to binary features, we use grid tilings.

First of all, we define the environment of this problem:

import numpy as np
import matplotlib.pyplot as plt
# tile-coding software (tiles3.py) from http://incompleteideas.net/sutton/tiles/tiles3.html
from tiles3 import IHT, tiles

# all possible actions
ACTION_REVERSE = -1
ACTION_ZERO = 0
ACTION_FORWARD = 1
# order is important
ACTIONS = [ACTION_REVERSE, ACTION_ZERO, ACTION_FORWARD]

# bounds for position and velocity
POSITION_MIN = -1.2
POSITION_MAX = 0.5
VELOCITY_MIN = -0.07
VELOCITY_MAX = 0.07

# use optimistic initial values, so it's OK to set epsilon to 0
EPSILON = 0

After taking an action, we transition to a new state and receive a reward:

# take an @action at @position and @velocity
# @return: new position, new velocity, reward (always -1)
def takeAction(position, velocity, action):
    newVelocity = velocity + 0.001 * action - 0.0025 * np.cos(3 * position)
    newVelocity = min(max(VELOCITY_MIN, newVelocity), VELOCITY_MAX)
    newPosition = position + newVelocity
    newPosition = min(max(POSITION_MIN, newPosition), POSITION_MAX)
    reward = -1.0
    if newPosition == POSITION_MIN:
        newVelocity = 0.0
    return newPosition, newVelocity, reward
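
As a quick sanity check (the starting state here is arbitrary), one full-throttle step from rest near the bottom of the valley barely changes the velocity, which is why the car has to rock back and forth to build momentum:

# one step from x = -0.5, zero velocity, full throttle forward
newPosition, newVelocity, reward = takeAction(-0.5, 0.0, ACTION_FORWARD)
print(newPosition, newVelocity, reward)
# the velocity changes by only 0.001 * A - 0.0025 * cos(3x) per step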

The $\varepsilon$-greedy policy:

# get action at @position and @velocity based on epsilon-greedy policy and @valueFunction
def getAction(position, velocity, valueFunction):
    if np.random.binomial(1, EPSILON) == 1:
        return np.random.choice(ACTIONS)
    values = []
    for action in ACTIONS:
        values.append(valueFunction.value(position, velocity, action))
    # ACTIONS is ordered [-1, 0, 1], so the greedy action is the argmax index minus 1
    return np.argmax(values) - 1

We need to map the continuous state onto discrete (binary) features via tile coding:

# wrapper class for state-action value function
class ValueFunction:
    # In this example I use the tiling software instead of implementing standard tiling by myself
    # One important thing is that tiling is only a map from (state, action) to a series of indices
    # It doesn't matter whether the indices have meaning, only that this map satisfies some properties
    # View the following webpage for more information
    # http://incompleteideas.net/sutton/tiles/tiles3.html
    # @maxSize: the maximum # of indices
    def __init__(self, stepSize, numOfTilings=8, maxSize=2048):
        self.maxSize = maxSize
        self.numOfTilings = numOfTilings
        # divide step size equally among the tilings
        self.stepSize = stepSize / numOfTilings
        self.hashTable = IHT(maxSize)
        # weight for each tile
        self.weights = np.zeros(maxSize)
        # position and velocity need scaling to satisfy the tile software
        self.positionScale = self.numOfTilings / (POSITION_MAX - POSITION_MIN)
        self.velocityScale = self.numOfTilings / (VELOCITY_MAX - VELOCITY_MIN)

    # get indices of active tiles for given state and action
    def getActiveTiles(self, position, velocity, action):
        # positionScale * (position - POSITION_MIN) would be a natural normalization,
        # but positionScale * POSITION_MIN is a constant, so it's OK to ignore it
        activeTiles = tiles(self.hashTable, self.numOfTilings,
                            [self.positionScale * position, self.velocityScale * velocity],
                            [action])
        return activeTiles

    # estimate the value of a given state and action
    def value(self, position, velocity, action):
        if position == POSITION_MAX:
            return 0.0
        activeTiles = self.getActiveTiles(position, velocity, action)
        return np.sum(self.weights[activeTiles])

    # learn with a given state, action and target
    def learn(self, position, velocity, action, target):
        activeTiles = self.getActiveTiles(position, velocity, action)
        estimation = np.sum(self.weights[activeTiles])
        delta = self.stepSize * (target - estimation)
        for activeTile in activeTiles:
            self.weights[activeTile] += delta

    # get # of steps to reach the goal under the current value function
    def costToGo(self, position, velocity):
        costs = []
        for action in ACTIONS:
            costs.append(self.value(position, velocity, action))
        return -np.max(costs)
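
A quick illustration of how the wrapper is used (the numbers are arbitrary; the value function is still all zeros at this point):

vf = ValueFunction(stepSize=0.3)
# all estimates start at zero (optimistic, since true returns are negative)
print(vf.value(-0.5, 0.0, ACTION_FORWARD))
# push the estimate for this state-action pair toward a target of -1
vf.learn(-0.5, 0.0, ACTION_FORWARD, -1.0)
print(vf.value(-0.5, 0.0, ACTION_FORWARD))   # now negative
print(vf.costToGo(-0.5, 0.0))                # minus the best action value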

Because one-step semi-gradient Sarsa is a special case of the n-step method, we develop the n-step algorithm directly. The n-step return is
$$
G_{t:t+n} \doteq R_{t+1} + \gamma R_{t+2} + \cdots + \gamma^{n-1}R_{t+n} + \gamma^n \hat{q}(S_{t+n}, A_{t+n}, \mathbf{w}_{t+n-1}), \quad n \geq 1, \; 0 \leq t \leq T-n.
$$
The n-step update is
$$
\mathbf{w}_{t+n} \doteq \mathbf{w}_{t+n-1} + \alpha \Big[ G_{t:t+n}- \hat{q}(S_t, A_t, \mathbf{w}_{t+n-1}) \Big] \nabla \hat{q}(S_t, A_t, \mathbf{w}_{t+n-1}), \quad 0 \leq t < T.
$$
Complete pseudocode is given in the box below.

[Figure: pseudocode for episodic semi-gradient n-step Sarsa]
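
Before looking at the full loop, here is a minimal sketch (with $\gamma = 1$, as in the Mountain Car code, and the same buffer conventions as the full implementation below) of how the truncated n-step return is computed:

# n-step return G_{t:t+n} with gamma = 1 for the state updated at time updateTime
def nStepReturn(rewards, positions, velocities, actions, valueFunction, updateTime, n, T):
    returns = 0.0
    # sum the (at most n) real rewards following time updateTime
    for t in range(updateTime + 1, min(T, updateTime + n) + 1):
        returns += rewards[t]
    # if the episode has not ended within n steps, bootstrap from q_hat
    if updateTime + n <= T:
        returns += valueFunction.value(positions[updateTime + n],
                                       velocities[updateTime + n],
                                       actions[updateTime + n])
    return returns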

The full implementation is as follows:

# semi-gradient n-step Sarsa
# @valueFunction: state-action value function to learn
# @n: # of steps
def semiGradientNStepSarsa(valueFunction, n=1):
    # start at a random position around the bottom of the valley
    currentPosition = np.random.uniform(-0.6, -0.4)
    # initial velocity is 0
    currentVelocity = 0.0
    # get initial action
    currentAction = getAction(currentPosition, currentVelocity, valueFunction)
    # track previous positions, velocities, actions and rewards
    positions = [currentPosition]
    velocities = [currentVelocity]
    actions = [currentAction]
    rewards = [0.0]
    # track the time
    time = 0
    # the length of this episode
    T = float('inf')
    while True:
        # go to next time step
        time += 1
        if time < T:
            # take current action and go to the new state
            newPosition, newVelocity, reward = takeAction(currentPosition, currentVelocity, currentAction)
            # choose new action
            newAction = getAction(newPosition, newVelocity, valueFunction)
            # track new state and action
            positions.append(newPosition)
            velocities.append(newVelocity)
            actions.append(newAction)
            rewards.append(reward)
            if newPosition == POSITION_MAX:
                T = time
        # get the time of the state to update
        updateTime = time - n
        if updateTime >= 0:
            returns = 0.0
            # calculate corresponding rewards
            for t in range(updateTime + 1, min(T, updateTime + n) + 1):
                returns += rewards[t]
            # add estimated state-action value to the return
            if updateTime + n <= T:
                returns += valueFunction.value(positions[updateTime + n],
                                               velocities[updateTime + n],
                                               actions[updateTime + n])
            # update the value function
            if positions[updateTime] != POSITION_MAX:
                valueFunction.learn(positions[updateTime], velocities[updateTime], actions[updateTime], returns)
        if updateTime == T - 1:
            break
        currentPosition = newPosition
        currentVelocity = newVelocity
        currentAction = newAction
    return time

Next, we use the method mentioned earlier to solve this problem:

episodes = 9000
targetEpisodes = [1 - 1, 12 - 1, 104 - 1, 1000 - 1, episodes - 1]
numOfTilings = 8
alpha = 0.3
valueFunction = ValueFunction(alpha, numOfTilings)
for episode in range(0, episodes):
    print('episode:', episode)
    semiGradientNStepSarsa(valueFunction)
    if episode in targetEpisodes:
        # prettyPrint (a plotting helper, not shown here) visualizes the cost-to-go
        # surface via valueFunction.costToGo at the selected episodes
        prettyPrint(valueFunction, 'Episode: ' + str(episode + 1))

The result is as follows:

[Figure: cost-to-go function learned on the Mountain Car task, shown at selected episodes]

The result shows what typically happens while learning to solve this task with this form of function approximation. Shown is the negative of the value function (the cost-to-go function) learned on a single run. The initial action values were all zero, which was optimistic (all true values are negative in this task), causing extensive exploration to occur even though the exploration parameter, $\varepsilon$, was 0. All the states visited frequently are valued worse than unexplored states, because the actual rewards have been worse than what was (unrealistically) expected. This continually drives the agent away from wherever it has been, to explore new states, until a solution is found.

Next, let us test the performance of various step sizes (learning rates).

runs = 10
episodes = 500
numOfTilings = 8
alphas = [0.1, 0.2, 0.5]
steps = np.zeros((len(alphas), episodes))
for run in range(0, runs):
    valueFunctions = [ValueFunction(alpha, numOfTilings) for alpha in alphas]
    for index in range(0, len(valueFunctions)):
        for episode in range(0, episodes):
            print('run:', run, 'alpha:', alphas[index], 'episode:', episode)
            step = semiGradientNStepSarsa(valueFunctions[index])
            steps[index, episode] += step
# average over runs
steps /= runs
plt.figure()
for i in range(0, len(alphas)):
    plt.plot(steps[i], label='alpha = ' + str(alphas[i]) + '/' + str(numOfTilings))
plt.xlabel('Episode')
plt.ylabel('Steps per episode')
plt.yscale('log')
plt.legend()

The result is as follows:

[Figure: steps per episode (log scale) for different step sizes]

Next, let us compare the performance of one-step Sarsa and multi-step (n = 8) Sarsa on the Mountain Car task:

runs = 10
episodes = 500
numOfTilings = 8
alphas = [0.5, 0.3]
nSteps = [1, 8]
steps = np.zeros((len(alphas), episodes))
for run in range(0, runs):
    valueFunctions = [ValueFunction(alpha, numOfTilings) for alpha in alphas]
    for index in range(0, len(valueFunctions)):
        for episode in range(0, episodes):
            print('run:', run, 'steps:', nSteps[index], 'episode:', episode)
            step = semiGradientNStepSarsa(valueFunctions[index], nSteps[index])
            steps[index, episode] += step
# average over runs
steps /= runs
plt.figure()
for i in range(0, len(alphas)):
    plt.plot(steps[i], label='n = ' + str(nSteps[i]))
plt.xlabel('Episode')
plt.ylabel('Steps per episode')
plt.yscale('log')
plt.legend()

The result is as follows:

[Figure: steps per episode (log scale) for one-step and 8-step Sarsa]

Finally, let us look at the results of a more detailed study of the effect of the parameters $\alpha$ and $n$ on the rate of learning on this task.

alphas = np.arange(0.25, 1.75, 0.25)
nSteps = np.power(2, np.arange(0, 5))
episodes = 50
runs = 5
truncateStep = 300
steps = np.zeros((len(nSteps), len(alphas)))
for run in range(0, runs):
    for nStepIndex, nStep in zip(range(0, len(nSteps)), nSteps):
        for alphaIndex, alpha in zip(range(0, len(alphas)), alphas):
            if (nStep == 8 and alpha > 1) or \
                    (nStep == 16 and alpha > 0.75):
                # In these cases it won't converge, so ignore them
                steps[nStepIndex, alphaIndex] += truncateStep * episodes
                continue
            valueFunction = ValueFunction(alpha)
            for episode in range(0, episodes):
                print('run:', run, 'steps:', nStep, 'alpha:', alpha, 'episode:', episode)
                step = semiGradientNStepSarsa(valueFunction, nStep)
                steps[nStepIndex, alphaIndex] += step
# average over independent runs and episodes
steps /= runs * episodes
# truncate high values for better display
steps[steps > truncateStep] = truncateStep
plt.figure()
for i in range(0, len(nSteps)):
    plt.plot(alphas, steps[i, :], label='n = ' + str(nSteps[i]))
plt.xlabel('alpha * number of tilings (8)')
plt.ylabel('Steps per episode')
plt.legend()

The result is as follows:

[Figure: effect of alpha and n on early performance (average steps per episode)]

Update

Using OpenAI Gym

Now, let us use the OpenAI Gym toolkit to simplify our Mountain Car task. As we saw, we need to define the task's environment before developing the algorithm itself, and it is easy to make mistakes in the details, so it is convenient not to have to worry about that. The Gym toolkit defines the environment for us: for the Mountain Car task there is a built-in model, and we just need one line of code:

env = gym.envs.make("MountainCar-v0")
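
We can also take a quick look at what the environment exposes (in MountainCar-v0 the state is a (position, velocity) pair and there are three discrete actions):

print(env.observation_space)       # Box(2,): position and velocity
print(env.observation_space.low)   # lower bounds, roughly [-1.2, -0.07]
print(env.observation_space.high)  # upper bounds, roughly [0.6, 0.07]
print(env.action_space)            # Discrete(3): 0 = push left, 1 = no push, 2 = push right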

That is amazing!

We can also test the environment very conveniently and get a nice graphical rendering:

env.reset()
plt.figure()
plt.imshow(env.render(mode='rgb_array'))
# for x in range(10000):
# env.step(0)
# plt.figure()
# plt.imshow(env.render(mode='rgb_array'))
[env.step(0) for x in range(10000)]
plt.figure()
plt.imshow(env.render(mode='rgb_array'))
env.render(close=True)

This code produces the following result:

[Figure: rendering of the MountainCar-v0 environment before and after 10000 zero-action steps]

Bravo~

Now, let us solve the task with the Q-learning algorithm. We will use RBF kernels to construct the features and scikit-learn's SGDRegressor to update $\mathbf{w}$ by gradient descent.

First of all, we normalize our features to zero mean and unit variance to speed up convergence, and use RBF kernels to construct a richer feature space. Both steps (normalization and featurization) are fit on a sample of observations, and the fitted transformers are then used to transform raw states.

import sklearn.pipeline
import sklearn.preprocessing
from sklearn.kernel_approximation import RBFSampler

# Feature preprocessing: normalize to zero mean and unit variance
# We use a few samples from the observation space to do this
observation_examples = np.array([env.observation_space.sample() for x in range(10000)])
scaler = sklearn.preprocessing.StandardScaler()
scaler.fit(observation_examples)

# Used to convert a state to a featurized representation.
# We use RBF kernels with different variances to cover different parts of the space
featurizer = sklearn.pipeline.FeatureUnion([
    ("rbf1", RBFSampler(gamma=5.0, n_components=100)),
    ("rbf2", RBFSampler(gamma=2.0, n_components=100)),
    ("rbf3", RBFSampler(gamma=1.0, n_components=100)),
    ("rbf4", RBFSampler(gamma=0.5, n_components=100))
])
featurizer.fit(scaler.transform(observation_examples))
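
A quick check of what the pipeline produces for a single raw state (a small sketch; the state itself is arbitrary):

state = env.reset()
scaled = scaler.transform([state])
features = featurizer.transform(scaled)
print(features.shape)   # (1, 400): 4 RBF samplers x 100 components each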

Next, we define a class named Estimator to simplify the gradient-descent updates:

from sklearn.linear_model import SGDRegressor

class Estimator():
    """
    Value function approximator.
    """
    def __init__(self):
        # We create a separate model for each action in the environment's
        # action space. Alternatively we could somehow encode the action
        # into the features, but this way it's easier to code up.
        self.models = []
        for _ in range(env.action_space.n):
            model = SGDRegressor(learning_rate="constant")
            # We need to call partial_fit once to initialize the model
            # or we get a NotFittedError when trying to make a prediction
            # This is quite hacky.
            model.partial_fit([self.featurize_state(env.reset())], [0])
            self.models.append(model)

    def featurize_state(self, state):
        """
        Returns the featurized representation for a state.
        """
        scaled = scaler.transform([state])
        featurized = featurizer.transform(scaled)
        return featurized[0]

    def predict(self, s, a=None):
        """
        Makes value function predictions.
        Args:
            s: state to make a prediction for
            a: (Optional) action to make a prediction for
        Returns
            If an action a is given this returns a single number as the prediction.
            If no action is given this returns a vector of predictions for all actions
            in the environment where pred[i] is the prediction for action i.
        """
        features = self.featurize_state(s)
        # note: "if not a" would misbehave for action 0, so test against None explicitly
        if a is None:
            return np.array([m.predict([features])[0] for m in self.models])
        else:
            return self.models[a].predict([features])[0]

    def update(self, s, a, y):
        """
        Updates the estimator parameters for a given state and action towards
        the target y.
        """
        features = self.featurize_state(s)
        self.models[a].partial_fit([features], [y])
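
For example (a small sketch; the target of -1.0 is arbitrary):

est = Estimator()
s = env.reset()
print(est.predict(s))      # array of q estimates, one per action
print(est.predict(s, 0))   # q estimate for action 0 only
est.update(s, 0, -1.0)     # nudge q(s, 0) toward the target -1.0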

We also need an $\varepsilon$-greedy policy to select actions:

def make_epsilon_greedy_policy(estimator, epsilon, nA):
    """
    Creates an epsilon-greedy policy based on a given Q-function approximator and epsilon.
    Args:
        estimator: An estimator that returns q values for a given state
        epsilon: The probability to select a random action. Float between 0 and 1.
        nA: Number of actions in the environment.
    Returns:
        A function that takes the observation as an argument and returns
        the probabilities for each action in the form of a numpy array of length nA.
    """
    def policy_fn(observation):
        A = np.ones(nA, dtype=float) * epsilon / nA
        q_values = estimator.predict(observation)
        best_action = np.argmax(q_values)
        A[best_action] += (1.0 - epsilon)
        return A
    return policy_fn
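
The returned function gives action probabilities, from which we sample an action (a small sketch with an illustrative epsilon of 0.1):

est = Estimator()
policy = make_epsilon_greedy_policy(est, epsilon=0.1, nA=env.action_space.n)
probs = policy(env.reset())   # most of the probability mass sits on the greedy action
action = np.random.choice(np.arange(len(probs)), p=probs)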

Then we develop the Q-Learning method:

import sys
import itertools
# `plotting` is a small helper module (from the reference implementation this code
# is based on) that provides the EpisodeStats container used below

def q_learning(env, estimator, num_episodes, discount_factor=1.0, epsilon=0.1, epsilon_decay=1.0):
    """
    Q-Learning algorithm for off-policy TD control using function approximation.
    Finds the optimal greedy policy while following an epsilon-greedy policy.
    Args:
        env: OpenAI environment.
        estimator: Action-value function estimator
        num_episodes: Number of episodes to run for.
        discount_factor: Gamma discount factor.
        epsilon: Chance to sample a random action. Float between 0 and 1.
        epsilon_decay: Each episode, epsilon is decayed by this factor
    Returns:
        An EpisodeStats object with two numpy arrays for episode_lengths and episode_rewards.
    """
    # Keeps track of useful statistics
    stats = plotting.EpisodeStats(
        episode_lengths=np.zeros(num_episodes),
        episode_rewards=np.zeros(num_episodes))
    for i_episode in range(num_episodes):
        # The policy we're following
        policy = make_epsilon_greedy_policy(
            estimator, epsilon * epsilon_decay**i_episode, env.action_space.n)
        # Print out which episode we're on, useful for debugging.
        # Also print reward for last episode
        last_reward = stats.episode_rewards[i_episode - 1]
        sys.stdout.flush()
        # Reset the environment and pick the first action
        state = env.reset()
        # Only used for SARSA, not Q-Learning
        next_action = None
        # One step in the environment
        for t in itertools.count():
            # Choose an action to take
            # If we're using SARSA we already decided in the previous step
            if next_action is None:
                action_probs = policy(state)
                action = np.random.choice(np.arange(len(action_probs)), p=action_probs)
            else:
                action = next_action
            # Take a step
            next_state, reward, done, _ = env.step(action)
            # Update statistics
            stats.episode_rewards[i_episode] += reward
            stats.episode_lengths[i_episode] = t
            # TD Update
            q_values_next = estimator.predict(next_state)
            # Use this code for Q-Learning
            # Q-Value TD Target
            td_target = reward + discount_factor * np.max(q_values_next)
            # Use this code for SARSA TD Target for on-policy training:
            # next_action_probs = policy(next_state)
            # next_action = np.random.choice(np.arange(len(next_action_probs)), p=next_action_probs)
            # td_target = reward + discount_factor * q_values_next[next_action]
            # Update the function approximator using our target
            estimator.update(state, action, td_target)
            print("\rStep {} @ Episode {}/{} ({})".format(t, i_episode + 1, num_episodes, last_reward), end="")
            if done:
                break
            state = next_state
    return stats

Run this method:

estimator = Estimator()
# Note: For the Mountain Car we don't actually need an epsilon > 0.0
# because our initial estimate for all states is too "optimistic" which leads
# to the exploration of all states.
stats = q_learning(env, estimator, 100, epsilon=0.0)

The result is as follows:

[Figure: episode length and reward statistics for Q-learning on MountainCar-v0]