Thompson sampling
Nomenclature
Consider:
- a set of contexts $\mathcal{X}$
- a set of actions $\mathcal{A}$
- and rewards in $\mathbb{R}$
Rationale
Definition
For each iteration $t$:
- A “player” obtains a context $x\in \mathcal{X}$
- Plays an action $a\in \mathcal{A}$
- Receives a reward $r\in \mathcal{R}$
- This rewards is distributed according to the context and the resulting action
- The player’s goal is to execute actions that maximize the cumulative rewards.
Implementation
The implementation will focus on these concepts:
- a likelihood function $P(r|\theta ,a,x)$
- a set $\Theta$ of parameters $\theta$ of the distribution of $r$
- a prior distribution $P(\theta )$ on these parameters
- past observations triplets $\mathcal{D}={(x;a;r)}$
- a posterior distribution $P(\theta |{\mathcal {D}})\propto P({\mathcal {D}}|\theta )P(\theta )$, where $P({\mathcal {D}}|\theta )$ is the likelihood function.
Thompson sampling consists in playing the action $a^{\ast }\in {\mathcal {A}}$ according to the probability that it maximizes the expected reward, i.e. action $a^{\ast }$ is chosen with probability
$$ \int \mathbb {I} \left[\mathbb {E} (r|a^{\ast },x,\theta )=\max _{a’}\mathbb {E} (r|a’,x,\theta )\right]P(\theta |{\mathcal {D}})d\theta , $$
where $\mathbb {I}$ is the indicator function.
In practice, the rule is implemented by sampling. In each round, parameters $\theta^\ast$ are sampled from the posterior $P(\theta |{\mathcal {D}})$, and an action $a^{\ast }$ chosen that maximizes ${\mathbb {E}}[r|\theta ^{\ast },a^{\ast },x]$, i.e. the expected reward given the sampled parameters, the action, and the current context. Conceptually, this means that the player instantiates their beliefs randomly in each round according to the posterior distribution, and then acts optimally according to them. In most practical applications, it is computationally onerous to maintain and sample from a posterior distribution over models. As such, Thompson sampling is often used in conjunction with approximate sampling techniques.
Example
N_TRIALS = 2000
N_ARMS = 16
N_FEATURES = 5
BEST_ARMS = [3, 7, 9, 15]
We now define a function to generate context vectors for all arms for each of the trial. We need:
n_trials
, number of trials ($N_T$)n_arms
, number of arms per trial ($N_A$)n_features
, number of feature per context vector ($N_f$)
This function will return a matrix of size $N_{T} \times N_{A} \times N_{f}$
def make_design_matrix(n_trials: int,
n_arms: int,
n_features: int) -> np.ndarray:
available_arms = np.arange(n_arms)
X = np.array([[np.random.uniform(0, 1, size = n_features)
for _ in np.arange(n_arms)]
for _ in np.arange(n_trials)])
return X
X = make_design_matrix(n_trials=N_TRIALS,
n_arms=N_ARMS,
n_features=N_FEATURES)
This will have the shape
![[Thompson sampling trials.excalidraw.svg]]
The following function will generate the true $\Theta = {\theta_1,\dots,\theta_n}$ for testing purposes. We provide:
- $N_A$, number of arms (
n_arms
) - $N_f$, number of features for the context vector (
n_features
) best_arms
, arms in which we should give some bias values (for good)bias
, value to be added to the best arms
A matrix of size $N_{A} \times N_{f}$, each value is a random value with $\mu = 0$ and standard deviation of $\frac{1}{4}$. However, for the best arms, we will add the bias.
![[Thompson sampling thetas.excalidraw.svg]]
def make_theta(n_arms: int,
n_features: int, best_arms, bias = 1):
true_theta = np.array(
[np.random.normal(size=n_features, scale=1.0/4.0)
for _ in range(n_arms)])
true_theta[best_arms] += bias
return true_theta
true_theta = make_theta(
n_arms=N_ARMS,
n_features=N_FEATURES,
best_arms=BEST_ARMS)
A function is also available to generate rewards. It creates rewards for each arm, given a context.
We provide:
- $a$, this is the arm index ($0\leq a \leq N_{A}-1$)
x
, is the context that we are observing for the arm index (arm)- $\theta$, is the theta (true or predicted) that are are using to estimate the reward for each arm (
theta
) scale_noise
, we may need to add some random noise ($\mu=0$ and standard deviation asscale_noise
)
This will return the estimated score for the arm (with the arm index and the context observed corresponding to the given theta).
def generate_reward(arm, x, theta, scale_noise = 1.0/10.0):
signal = theta[arm].dot(x)
noise = np.random.normal(scale=scale_noise)
return signal + noise
random_payoffs = np.array(
[generate_reward(
arm=np.random.choice(N_ARMS),
x=X[t, np.random.choice(N_ARMS)],
theta=true_theta)
for t in range(N_TRIALS)])
# Defining oracle (best payoffs based on the true_theta)
oracles = np.array(
[np.max(
[generate_reward(
arm=arm,
x=X[t, arm],
theta=true_theta)
for arm in range(N_ARMS)])
for t in range(N_TRIALS)])
We also create a function to generate the cumulative regret over time.
We provide:
payoffs
, an array of $T$ payoffs (for $T$ number of trials)oracles
, an array of best values for $T$ trials (oracles)
And we get an array of the cumulative sum over time (of size $T$).
def make_regret(payoffs: np.ndarray,
oracles: np.ndarray) -> np.ndarray:
return np.cumsum(oracles - payoffs)
payoffs = [
[generate_reward(
arm=arm,
x=X[t, arm],
theta=true_theta)
for arm in np.arange(N_ARMS)]
for t in np.arange(N_TRIALS)]
ave_rewards = np.mean(payoffs, axis=0)
The actual sampling
The method to perform the actual sampling is next. We provide:
- $\delta$ (
delta
), with $0 < \delta < 1$. With probability $1 - \delta$, linear thompson sampling satisfies the theoretical regret bound. - $R$, with $R \geq 0$. Assume that the residual $ri(t) - bi(t)^T \hat{\mu}$ is R-sub-gaussian. In this case, $R^2$ represents the variance for residuals of the linear model $bi(t)^T$.
- $\epsilon$ (
epsilon
), with $0 < \epsilon < 1$ A parameter used by the Thompson Sampling algorithm. If the total trials $T$ is known, we can choose $\epsilon = \frac{1}{\ln{T}}$.
delta=0.5
R = 0.01
epsilon=0.5
We use r_payoffs
to store the payoff for each trial (the payoff for the selected arm based on the true_theta
).
As such, we initialise a zero array of size n_trials
.
r_payoffs = np.zeros(N_TRIALS)
v = R * np.sqrt(24 / epsilon * N_FEATURES * np.log(1 / delta))
Model initialisation:
B = np.identity(N_FEATURES)
mu_hat = np.zeros(shape=(N_FEATURES, 1))
f = np.zeros(shape=(N_FEATURES,1))
for t in range(N_TRIALS):
context = X[t]
mu_tilde = np.random.multivariate_normal(mu_hat.flat, v**2 * np.linalg.inv(B))[..., np.newaxis]
score_array = context.dot(mu_tilde)
chosen_arm = np.argmax(score_array)
context_t = context[chosen_arm]
reward = generate_reward(arm=chosen_arm, x=context_t, theta=true_theta)
r_payoffs[t] = reward
context_t = np.reshape(context_t, (-1, 1))
B += context_t.dot(context_t.T)
f += reward*context_t
mu_hat = np.linalg.inv(B).dot(f)