Markov chain Monte Carlo explained with Python
A simple example
last update: Feb 12, 2025
A simple example of a Metropolis-Hastings algorithm used to find a probability distribution of possible values of the spring constant, \(k\), and the damping factor, \(c\), for a damped harmonic oscillator. Here the true values are \(k = 4.10\) and \(c = 0.50\).
Article Motivation
- What is a Markov chain Monte Carlo method?
- What is the Metropolis-Hastings algorithm?
- Code a simple example in Python
Introduction
The point I want to make in this article is that, given noisy, real-world data from a system and a simulation of that system, we can use an MCMC method to find a probability distribution of possible parameter values for that system.
MCMC is a broad family of sampling algorithms used to sample a probability distribution that is too difficult to sample directly. It was created at Los Alamos in 1953 as the Metropolis algorithm and later generalized in 1970 to the Metropolis-Hastings (MH) algorithm [1,2].
The MH algorithm is the specific instance of MCMC that we're going to focus on in this post. We will consider a simple example, code it up in Python, and visualize it using matplotlib.
Example Problem
We have a mass on a spring. We know that this system obeys the damped harmonic oscillator equation

\( m \ddot{y} + c \dot{y} + k y = 0 \)
We even know the mass and the initial conditions of the position and velocity: \( m = 2.0 \), \( y(0) = 2 \), \( \dot{y}(0) = 0 \).
Finally, we have a noisy data set, \(y_{sample}\), that tells us the displacement of our mass at a number of sampled time points.
Our goal is to use the Metropolis-Hastings algorithm to find a probability distribution for the spring parameters \(k\) and \(c\).
Keep in mind that in a simple example like this, there are much easier ways to solve for \(k\) and \(c\). Where MCMC shines is in high-dimensional spaces. Today we'll just get a better understanding of the basics.
Setting up the problem
First we need to initialize the true spring constant k and damping constant c that we will eventually solve for, along with our initial guesses.
We'll also decide how many times, n, we want to run our MH algorithm.
Finally we'll set up the time stepping parameters.
import numpy as np

# Parameters
n = 15000       # Number of runs
m = 2.0         # mass
k = 4.10        # true spring constant
c = 0.50        # true damping factor
k_guess = 3.8   # spring constant guess
c_guess = 0.40  # damping factor guess

# Setup timestepping
t_max = 10
steps_per_unit_t = 100
num_t_steps = t_max * steps_per_unit_t + 1
t = np.linspace(0, t_max, num_t_steps)
dt = t[1] - t[0]
Creating synthetic data
Exact solutions to interesting problems are rare. In the physical world there is noise and uncertainty, so we are forced to treat problems as statistical in nature.
We'll create some synthetic data. For this simple system we know the analytical solution of the damped harmonic oscillator.
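With our initial displacement of 2, the damped-cosine form used in the code below is

\( y_{true}(t) = 2\, e^{-ct/(2m)} \cos\left( \frac{t \sqrt{4mk - c^2}}{2m} \right) \)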
We can just take this and add some Gaussian noise to simulate real-world data as follows.
# Create synthetic data
s = 0.1                # Noise level
walk_size_k = 2 * s    # Random walk step size for k
walk_size_c = 0.1 * s  # Random walk step size for c
num_samples = 50       # sample 50 time points
time_samples = np.linspace(1, num_t_steps - 1, num_samples, dtype=int)
time_of_samples = (time_samples - 1) / num_t_steps * t_max

# find true solution
y_true = 2 * np.exp(-c * t / (2 * m)) * np.cos(t * np.sqrt(4 * m * k - c * c) / (2 * m))

# get only sampled points
y_true_sampled = y_true[time_samples]

# add Gaussian noise to create synthetic data
y_synthetic = y_true_sampled + s * np.random.randn(num_samples)
Now y_synthetic contains a noisy sample of the displacement of the mass at 50 points in our time interval. We could think of this as our experimental data.
Damped harmonic oscillator simulation
We need a simulation of the damped harmonic oscillator, which means we need an ODE solver. Instead of trying to make the best ODE solver ever, we'll just use a simple midpoint solver.
This predicts the state at the half time step using the previous values, then uses the midpoint \(y\) to update \(\dot{y}\) and the midpoint \(\dot{y}\) to update \(y\).
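Writing the second-order equation as a first-order system, \( \dot{y} = v \) and \( \dot{v} = -\frac{c}{m} v - \frac{k}{m} y \), one step of the scheme used below reads

\( y_{mid} = y_i + \frac{\Delta t}{2} v_i, \qquad v_{mid} = v_i + \frac{\Delta t}{2} \left( -\frac{c}{m} v_i - \frac{k}{m} y_i \right) \)

\( y_{i+1} = y_i + \Delta t \, v_{mid}, \qquad v_{i+1} = v_i + \Delta t \left( -\frac{c}{m} v_{mid} - \frac{k}{m} y_{mid} \right) \)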
def damped_oscillator_solver(m, k, c, y_initial, y_dot_initial, dt, num_t_steps):
    # initialize y and y dot
    y = np.zeros(num_t_steps)
    y_dot = np.zeros(num_t_steps)

    # set initial conditions
    y[0] = y_initial
    y_dot[0] = y_dot_initial

    # compute half time step for midpoint rule
    ddt = 0.5 * dt

    # time loop using midpoint rule
    for t_current in range(1, num_t_steps):
        t_prev = t_current - 1  # previous time index

        # compute midpoint estimate for y and y dot
        y_mid = y[t_prev] + ddt * y_dot[t_prev]
        y_dot_mid = y_dot[t_prev] - (c/m) * ddt * y_dot[t_prev] - (k/m) * ddt * y[t_prev]

        # update y with y_dot at midpoint
        y[t_current] = y[t_prev] + dt * y_dot_mid

        # update y_dot with acceleration at midpoint
        y_dot[t_current] = y_dot[t_prev] - (c/m) * dt * y_dot_mid - (k/m) * dt * y_mid

    return y
Here our model is simple, but in a real-world MCMC problem our model could be a complicated PDE solver.
Metropolis-Hastings algorithm
There are a lot of definitions of the MH algorithm that might differ slightly in their notation. Here is a simple explanation.
Steps in the Metropolis-Hastings algorithm
- Take a random walk away from your current guess. Call this the proposed guess.
- Solve the simulation with the proposed values.
- Use a likelihood function to measure how similar that simulation is to your measured values.
- Calculate an acceptance ratio \(\alpha\) to decide whether to accept the proposal (see the formula after this list).
- Accept or reject the proposal:
  - Generate a random number between 0 and 1.
  - If that random number is less than \(\alpha\), accept the proposal; otherwise reject it.
- Repeat.
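For Gaussian measurement noise with standard deviation \(s\), and a symmetric proposal like our random walk, the acceptance ratio reduces to a likelihood ratio:

\( \alpha = \min\left(1,\; \exp\left( -\frac{SSE_{proposed} - SSE_{current}}{2 s^2} \right)\right) \)

where \(SSE\) is the sum of squared errors between a simulation and the measurements.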
Using these guidelines we produce the following code. (The helper calculate_likelihood, defined in the full listing below, returns the sum of squared errors scaled by the noise variance \(s^2\).)
# initialize arrays to keep track of likelihood
likelihood = np.zeros(n)
initial_likelihood = calculate_likelihood(y_synthetic, y_true_sampled, s)
likelihood[0] = initial_likelihood
old_likelihood = initial_likelihood

# array of spring parameter guesses
inputs = np.zeros((2, n))
inputs[:, 0] = [k_guess, c_guess]

# Run Metropolis-Hastings iteration
for current_run in range(1, n):
    # propose next guesses for k and c with a uniform random walk
    kp = abs(k_guess + walk_size_k * (2 * np.random.rand() - 1))
    cp = abs(c_guess + walk_size_c * (2 * np.random.rand() - 1))

    # solve simulation with proposed values for k and c
    y_initial = 2.0
    y_dot_initial = 0.0
    y_proposed = damped_oscillator_solver(m, kp, cp, y_initial, y_dot_initial, dt, num_t_steps)

    # get values at selected time samples
    y_proposed_sampled = y_proposed[time_samples]

    # calculate the likelihood of the new data
    new_likelihood = calculate_likelihood(y_synthetic, y_proposed_sampled, s)

    # likelihood ratio (the 1/s^2 scaling lives in calculate_likelihood)
    ratio = np.exp(-(new_likelihood - old_likelihood) / 2)

    # acceptance criteria
    acceptance_ratio = min(1, ratio)

    # accept new k and c values
    if np.random.rand() < acceptance_ratio:
        k_guess, c_guess = kp, cp
        old_likelihood = new_likelihood

    # update guesses and likelihood
    inputs[:, current_run] = [k_guess, c_guess]
    likelihood[current_run] = old_likelihood
Now we have an MH algorithm that can sample our unknown distribution. If we keep track of this sampling we can find a probability distribution for our damped spring parameters.
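As a rough sketch of how we might summarize the chain, assuming the inputs array from the loop above and an illustrative burn-in length of 1000 (a hypothetical value; tune it for your chain):

import numpy as np

burn_in = 1000                   # hypothetical burn-in; discard early samples
k_samples = inputs[0, burn_in:]  # chain of k values after burn-in
c_samples = inputs[1, burn_in:]  # chain of c values after burn-in

# posterior summaries for each spring parameter
print(f"k: mean = {k_samples.mean():.3f}, std = {k_samples.std():.3f}")
print(f"c: mean = {c_samples.mean():.3f}, std = {c_samples.std():.3f}")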
Putting it all together
Taking all the code we've written so far and putting it together we get:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.ticker import FormatStrFormatter
import os


def find_spring_params():
    """
    Given noisy measurements of y from a damped spring system
        m * d^2y/dt^2 + c * dy/dt + k * y = 0,
    with y(0) = 2, dy/dt(0) = 0, estimate c and k.
    """
    # Parameters
    n = 15000       # Number of runs
    m = 2.0         # mass
    k = 4.10        # true spring constant
    c = 0.50        # true damping factor
    k_guess = 3.8   # spring constant guess
    c_guess = 0.40  # damping factor guess

    # Setup timestepping
    t_max = 10
    steps_per_unit_t = 100
    num_t_steps = t_max * steps_per_unit_t + 1
    t = np.linspace(0, t_max, num_t_steps)
    dt = t[1] - t[0]

    # Create folder to store plots
    os.makedirs("mcmc_images", exist_ok=True)

    # Create synthetic data
    s = 0.1                # Noise level
    walk_size_k = 2 * s    # Random walk step size for k
    walk_size_c = 0.1 * s  # Random walk step size for c
    num_samples = 50       # sample 50 time points
    time_samples = np.linspace(1, num_t_steps - 1, num_samples, dtype=int)
    time_of_samples = (time_samples - 1) / num_t_steps * t_max

    # find true solution
    y_true = 2 * np.exp(-c * t / (2 * m)) * np.cos(t * np.sqrt(4 * m * k - c * c) / (2 * m))

    # get only sampled points
    y_true_sampled = y_true[time_samples]

    # add Gaussian noise to create synthetic data
    y_synthetic = y_true_sampled + s * np.random.randn(num_samples)

    # initialize arrays to keep track of likelihood
    likelihood = np.zeros(n)
    initial_likelihood = calculate_likelihood(y_synthetic, y_true_sampled, s)
    likelihood[0] = initial_likelihood
    old_likelihood = initial_likelihood

    # array of spring parameter guesses
    inputs = np.zeros((2, n))
    inputs[:, 0] = [k_guess, c_guess]

    # Run Metropolis-Hastings iteration
    for current_run in range(1, n):
        # propose next guesses for k and c with a uniform random walk
        kp = abs(k_guess + walk_size_k * (2 * np.random.rand() - 1))
        cp = abs(c_guess + walk_size_c * (2 * np.random.rand() - 1))

        # solve simulation with proposed values for k and c
        y_initial = 2.0
        y_dot_initial = 0.0
        y_proposed = damped_oscillator_solver(m, kp, cp, y_initial, y_dot_initial, dt, num_t_steps)

        # get values at selected time samples
        y_proposed_sampled = y_proposed[time_samples]

        # calculate the likelihood of the new data
        new_likelihood = calculate_likelihood(y_synthetic, y_proposed_sampled, s)

        # likelihood ratio (the 1/s^2 scaling lives in calculate_likelihood)
        ratio = np.exp(-(new_likelihood - old_likelihood) / 2)

        # acceptance criteria
        acceptance_ratio = min(1, ratio)

        # accept new k and c values
        if np.random.rand() < acceptance_ratio:
            k_guess, c_guess = kp, cp
            old_likelihood = new_likelihood

        # update guesses and likelihood
        inputs[:, current_run] = [k_guess, c_guess]
        likelihood[current_run] = old_likelihood

        # Uncomment the following to produce plots
        # plot_data(current_run, inputs, y_true, y_synthetic, y_proposed, t, time_of_samples, m, current_run)


def calculate_likelihood(y_synthetic, y_proposed, s):
    # sum of squared errors scaled by the noise variance s^2;
    # for Gaussian noise this is twice the negative log-likelihood, up to a constant
    return np.sum((y_synthetic - y_proposed) ** 2) / s**2


def damped_oscillator_solver(m, k, c, y_initial, y_dot_initial, dt, num_t_steps):
    # initialize y and y dot
    y = np.zeros(num_t_steps)
    y_dot = np.zeros(num_t_steps)

    # set initial conditions
    y[0] = y_initial
    y_dot[0] = y_dot_initial

    # compute half time step for midpoint rule
    ddt = 0.5 * dt

    # time loop using midpoint rule
    for t_current in range(1, num_t_steps):
        t_prev = t_current - 1  # previous time index

        # compute midpoint estimate for y and y dot
        y_mid = y[t_prev] + ddt * y_dot[t_prev]
        y_dot_mid = y_dot[t_prev] - (c/m) * ddt * y_dot[t_prev] - (k/m) * ddt * y[t_prev]

        # update y with y_dot at midpoint
        y[t_current] = y[t_prev] + dt * y_dot_mid

        # update y_dot with acceleration at midpoint
        y_dot[t_current] = y_dot[t_prev] - (c/m) * dt * y_dot_mid - (k/m) * dt * y_mid

    return y


# Run the function
find_spring_params()
Plotting function
Using matplotlib and ffmpeg we can create the video at the top. Here's the plotting code.
def plot_data(current_run, inputs, y_true, y_synthetic, y_proposed, t, time_of_samples, m, n):
    # Colors and parameters
    edge_color = "#eceff4"
    font_color = "#eceff4"
    bg_color = "#4c566a"
    axis_color = "#eceff4"
    border_color = "#eceff4"
    bins = 50

    # Create figure and subplots
    fig, axs = plt.subplots(2, 2, figsize=(12, 8), facecolor=bg_color)
    fig.patch.set_facecolor(bg_color)

    # Add a supertitle for the figure
    fig.suptitle(f"Run {n}", x=0.511, color=font_color, fontsize=16)

    # Apply common settings to every subplot
    for ax in axs.flat:
        ax.tick_params(axis='both', colors=axis_color)
        for spine in ax.spines.values():
            spine.set_color(border_color)
        ax.set_facecolor(bg_color)
        ax.xaxis.label.set_color(font_color)
        ax.yaxis.label.set_color(font_color)
        ax.title.set_color(font_color)

    # Scatter plot of k vs. c
    axs[0, 1].scatter(inputs[0, :current_run], inputs[1, :current_run], color=font_color, alpha=0.2)
    axs[0, 1].set_xlabel('k', color=font_color)
    axs[0, 1].set_ylabel('c', color=font_color)
    axs[0, 1].set_title('Scatter plot of k vs. c', color=font_color)
    axs[0, 1].xaxis.set_major_formatter(FormatStrFormatter('%.2f'))
    axs[0, 1].yaxis.set_major_formatter(FormatStrFormatter('%.2f'))

    # y_proposed vs y_true
    axs[1, 0].plot(t, y_true, label='True Solution', color="#b48ead")
    axs[1, 0].plot(t, y_proposed, label='Current Guess', color="#ebcb8b")
    axs[1, 0].scatter(time_of_samples, y_synthetic, color="#b48ead", marker='.', label='Measurements')
    axs[1, 0].legend(facecolor=bg_color, labelcolor=font_color, loc="upper right")
    axs[1, 0].set_title('Current y_proposed vs y_true', color=font_color)
    axs[1, 0].set_xlabel('time', color=font_color)
    axs[1, 0].set_ylim(-2.1, 2.5)
    axs[1, 0].xaxis.set_major_formatter(FormatStrFormatter('%.0f'))
    axs[1, 0].yaxis.set_major_formatter(FormatStrFormatter('%.1f'))

    # Histogram of k values
    axs[1, 1].hist(inputs[0, :current_run], bins=bins, color="#5e81ac", edgecolor=edge_color)
    axs[1, 1].set_xlabel('k values', color=font_color)
    axs[1, 1].set_title('Histogram of k', color=font_color)
    axs[1, 1].xaxis.set_major_formatter(FormatStrFormatter('%.2f'))
    axs[1, 1].yaxis.set_major_formatter(FormatStrFormatter('%.0f'))

    # Histogram of c values
    axs[0, 0].hist(inputs[1, :current_run], bins=bins, orientation="horizontal", color="#bf616a", edgecolor=edge_color)
    axs[0, 0].set_ylabel('c values', color=font_color)
    axs[0, 0].set_title('Histogram of c', color=font_color)
    axs[0, 0].xaxis.set_major_formatter(FormatStrFormatter('%.0f'))
    axs[0, 0].yaxis.set_major_formatter(FormatStrFormatter('%.2f'))

    # Save the figure
    plt.subplots_adjust(hspace=0.3)
    plt.savefig(f'mcmc_images/frame_{current_run:06d}.png')
    plt.close()
Don't forget to include plot_data() in the Metropolis-Hastings loop if you want to produce the graphs. The frames will output to a folder called mcmc_images/. Using the following Linux terminal commands we can reproduce the movie above (with different random noise in the measurements).
# fix the file names
ls frame_*.png | awk 'BEGIN{ a=0 }{ printf "mv %s frame_%04d.png\n", $0, a++ }' | bash

# create the video
ffmpeg -framerate 24 -start_number 4 -i frame_%04d.png -vf "scale=1920:1080" -c:v libx264 -pix_fmt yuv420p output.mp4
Conclusion
With a numerical simulation (like an ODE or PDE solver) of a system with \(p\) parameters, and experimental data from the real-life system, we can use an MCMC method to produce a probability distribution of the parameters that produced the experimental data.
I've shown that here in a very simple way using synthetic data from a damped harmonic oscillator and a simple ODE solver.
However, MCMC algorithms are highly configurable. What we've done here is just the tip of the iceberg.
For example, here we've used a uniform random walk for the proposal distribution. We could tweak the hyperparameters of our distribution, like the walk size, or we could use an entirely different proposal distribution that might be better suited to the problem.
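For instance, here is a minimal sketch of a Gaussian random-walk proposal, reusing the same step-size variables as scales (an illustrative variation, not part of the code above):

# Gaussian random-walk proposal instead of the uniform walk
kp = abs(k_guess + walk_size_k * np.random.randn())
cp = abs(c_guess + walk_size_c * np.random.randn())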
We used a sum of squared errors (a squared L2 norm) as our likelihood function and an exponential function for our acceptance ratio. These could both be improved upon.
Advanced techniques like delayed rejection can help keep interesting points that might otherwise be lost.
Simulated annealing is a way of using this process to find the maxima and minima of functions.
Overall, when dealing with messy, high-dimensional data, MCMC is a very effective way of exploring an unknown probability distribution and extracting interesting information from it.
Code
All of the above code is available on the GitHub page.