Tutorial 2.1: ODE discovery in spring-mass-damper systems#


In this tutorial, we walk through an example of combining dimensionless learning with SINDy to discover the governing equation of a spring-mass-damper system.

We consider an object of mass \(m\) suspended from a spring with spring constant \(k\) and damping coefficient \(c\); the object is released from an initial displacement \(\delta\). The schematic of the spring-mass-damper system is shown below:

(Schematic of the spring-mass-damper system.)

We can express the above system mathematically. The governing equation is

\[ m\frac{d^2 x}{dt^2} + c\frac{dx}{dt} + kx = 0 \]

The object starts from an initial displacement \(x(t=0)=\delta\) and has no initial velocity, \(\frac{dx}{dt}(t=0)=0\).
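
For the underdamped case (\(\xi < 1\)), this initial value problem has a closed-form solution, which the dataset generator below implements:

\[ x(t) = A e^{-\xi \omega_n t} \sin(\omega_d t + \phi), \qquad \omega_n = \sqrt{\frac{k}{m}}, \quad \xi = \frac{c}{2\sqrt{mk}}, \quad \omega_d = \omega_n \sqrt{1 - \xi^2}, \]

where the amplitude \(A\) and phase \(\phi\) are determined by the initial conditions.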

To create a dataset, we vary \(\delta\), \(m\), \(k\), and \(c\) to obtain different time-varying displacement curves.

Import Python libraries#

import copy
import os
import sys

import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from sklearn.linear_model import LinearRegression, Ridge, LassoCV
from sklearn.metrics import r2_score
from sympy.utilities.lambdify import lambdify

%matplotlib inline
plt.rcParams["font.family"] = "Arial"
np.set_printoptions(suppress=True)
# # Please uncomment these two lines if you run this code in Colab:
# !git clone https://github.com/xiaoyuxie-vico/PyDimension-Book
# %cd PyDimension-Book/examples

Helper functions#

SeqReg() is a class that identifies differential equations using sequentially thresholded sparse regression.

class SeqReg(object):

    def __init__(self):
        pass

    def normalize(self, X, y):
        '''
        Normalize X and y by a shared scale factor so that the
        coefficient thresholding acts on comparably scaled terms
        '''
        norm_coef_X = np.mean(np.abs(np.mean(X, axis=0)))
        norm_coef_y = np.mean(np.abs(np.mean(y, axis=0)))
        norm_coef = min(norm_coef_X, norm_coef_y)
        X = X / norm_coef
        y = y / norm_coef
        return X, y

    def fit_fixed_threshold(self, X, y, alpha=1.0, threshold=0.005, is_normalize=True):
        '''
        Sequentially thresholded regression with a fixed threshold
        '''
        if is_normalize:
            X, y = self.normalize(X, y)

        # initialize a ridge regression model without an intercept
        model = Ridge(fit_intercept=False, alpha=alpha)
        model.fit(X, y)
        for idx in range(3):
            # zero out the columns whose coefficients fall below the threshold
            coef = model.coef_
            flag = np.repeat((np.abs(coef) > threshold).astype(int).reshape(1, -1),
                             X.shape[0], axis=0)
            X1 = copy.copy(X)
            X1 = np.multiply(X1, flag)
            # refit using only the remaining active terms
            model.fit(X1, y)
            r2 = model.score(X1, y)
            print(f'training {idx} r2: {r2}')
        coef = np.squeeze(model.coef_)
        return coef, X1

    def fit_dynamic_thresh(self, X, y, non_zero_term=4, alpha=1.0, threshold=0.005, 
                is_normalize=True, fit_intercept=False, model_name='Ridge', max_iter=200):
        '''
        Decrease the threshold when there are too few non-zero terms
        and increase it when there are too many non-zero terms
        '''
        if is_normalize:
            X, y = self.normalize(X, y)
        
        # initialize a linear regression model
        if model_name == 'Ridge':
            model = Ridge(fit_intercept=fit_intercept, alpha=alpha)
        elif model_name == 'LR':
            model = LinearRegression(fit_intercept=fit_intercept)
        else:
            raise Exception('Wrong model_name.')
        model.fit(X, y)
        count = 0

        while count <= max_iter:
            coef = model.coef_
            flag = np.repeat((np.abs(coef) > threshold).astype(int).reshape(1,-1), 
                             X.shape[0], axis=0)
            cur_non_zero_term = np.sum(flag[0,:])
            X1 = copy.copy(X)
            X1 = np.multiply(X1, flag)
            model.fit(X1, y)
            r2 = model.score(X1, y)
            # print(f'training r2: {r2}, threshold: {threshold}, cur_non_zero_term: {cur_non_zero_term}')
            if cur_non_zero_term == non_zero_term:
                break
            elif cur_non_zero_term < non_zero_term:
                threshold *= 0.95
            else:
                threshold *= 1.05
            count += 1

        coef = np.squeeze(model.coef_)
        if fit_intercept:
            coef_list = coef.tolist()
            coef_list.append(float(model.intercept_))
            coef = np.array(coef_list)

        return coef, X1, r2
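
As a quick illustration (an added sketch, not part of the original notebook), SeqReg can recover a sparse linear model from synthetic data; here the true model uses only two of the three candidate terms, so the dynamic thresholding should zero out the unused one:

# a minimal sketch: recover y = 2*x0 - 3*x2 from three candidate columns
rng = np.random.default_rng(0)
X_demo = rng.normal(size=(100, 3))
y_demo = 2.0 * X_demo[:, 0] - 3.0 * X_demo[:, 2]

seq_reg = SeqReg()
coef_demo, _, r2_demo = seq_reg.fit_dynamic_thresh(
    X_demo, y_demo, non_zero_term=2, threshold=0.1,
    is_normalize=False, model_name='LR')
print(coef_demo)  # approximately [2, 0, -3]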

PolyDiffPoint() is used to compute derivatives by fitting a polynomial locally.

def PolyDiffPoint(u, x, deg=3, diff=1, index=None):
    '''
    Compute derivatives of u(x) at a single point by fitting
    a Chebyshev polynomial
    Adapted from: https://github.com/snagcliffs/PDE-FIND
    '''
    n = len(x)
    # differentiate at the center of the window by default
    if index is None: index = (n-1)//2

    # Fit to a polynomial
    poly = np.polynomial.chebyshev.Chebyshev.fit(x, u, deg)
    
    # Take derivatives
    derivatives = []
    for d in range(1, diff + 1):
        derivatives.append(poly.deriv(m=d)(x[index]))
    
    return derivatives
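
As a quick check (an added sketch, not from the original notebook), applying PolyDiffPoint() to samples of \(\sin(t)\) should return values close to \(\cos(t)\) and \(-\sin(t)\) at the window center:

# sanity check on sin(t): compare the fitted derivatives with the exact ones
t_demo = np.linspace(0, 0.5, 9)
d1, d2 = PolyDiffPoint(np.sin(t_demo), t_demo, deg=3, diff=2)
print(d1, np.cos(t_demo[4]))   # first derivative vs. exact value
print(d2, -np.sin(t_demo[4]))  # second derivative vs. exact value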

Dataset preparation#

SpringMassDataset() is a class that generates the simulation dataset.

class SpringMassDataset(object):
    '''
    Generate analytical solutions for an underdamped spring-mass-damper
    system; A0 is the initial displacement (delta in the text)
    '''
    def __init__(self, k, m, A0, c, v0=0, et=20, Nt=800):
        super(SpringMassDataset, self).__init__()
        self.k = k
        self.m = m
        self.A0 = A0
        self.c = c
        self.et = et
        self.v0 = v0
        self.Nt = Nt

        self.omega_n = np.sqrt(k / m)  # natural frequency
        self.xi = c / 2 / np.sqrt(m * k)  # damping ratio
        self.omega_d = self.omega_n * np.sqrt(1 - self.xi**2)  # damped frequency
        # amplitude and phase angle from the initial conditions
        self.A = np.sqrt(A0**2 + ((v0 + self.xi * self.omega_n * A0) / self.omega_d)**2)
        self.phi = np.arctan(self.omega_d * A0 / (v0 + self.xi * self.omega_n * A0))

    def solution(self):
        t = np.linspace(0, self.et, self.Nt, endpoint=False)
        x = self.A * np.exp(-self.xi * self.omega_n * t) * np.sin(self.omega_d * t + self.phi)
        info = {'t': t, 'x': x}
        df = pd.DataFrame(info)
        return df

# test the SpringMassDataset
k, m, A0, c, et, Nt = 0.2, 0.2, 0.07, 0.02, 20, 1000
dataset = SpringMassDataset(k, m, A0, c, et=et, Nt=Nt)
data_old = dataset.solution()
fig = plt.figure()
plt.plot(data_old['t'], data_old['x'])
(Output: a plot of the damped oscillation x(t) for the test parameters.)

FitEqu() is used to generate and parse the dataset, build the regression library, and identify the best differential equation.
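
Because the regression target used below is the first derivative, it helps to divide the governing equation by \(c\) and solve for \(\frac{dx}{dt}\):

\[ \frac{dx}{dt} = -\frac{m}{c}\frac{d^2 x}{dt^2} - \frac{k}{c} x, \]

so a successful sparse regression should select only the \(x\) and \(\frac{d^2 x}{dt^2}\) terms from the candidate library, with coefficients \(-k/c\) and \(-m/c\).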

class FitEqu(object):
    '''
    For a given data, fit the governing equation.
    '''
    def __init__(self):
        super(FitEqu, self).__init__()
        
    def prepare_data(self, k, m, A0, c, et, Nt):
        '''
        generate the dataset
        '''
        dataset = SpringMassDataset(k, m, A0, c, et=et, Nt=Nt)
        data = dataset.solution()  # {'t': t, 'x': x}
        return data
    
    def cal_derivatives(self, data, dt, Nt, deg=3, num_points=100, boundary_t=5):
        '''
        compute x and its first two time derivatives at interior points
        using local polynomial fits
        '''
        x_clean = data['x'].to_numpy()
        # skip points near the boundaries, where the fitting window would be truncated
        points = np.arange(2*boundary_t, Nt-2*boundary_t)
        num_points = points.shape[0]

        x = np.zeros((num_points, 1))
        xt = np.zeros((num_points, 1))
        xtt = np.zeros((num_points, 1))

        # fit a polynomial over a small window centered at each sampled point
        Nt_sample = 2 * boundary_t - 1
        for p in range(num_points):
            idx = points[p]
            x[p] = x_clean[idx]
            x_part = x_clean[idx-int((Nt_sample-1)/2): idx+int((Nt_sample+1)/2)]
            xt[p], xtt[p] = PolyDiffPoint(x_part, np.arange(Nt_sample)*dt, deg, 2)

        return x, xt, xtt

    @staticmethod
    def build_library(x, xt, xtt):
        '''
        build the library for sparse regression
        '''
        X_library = [
            x, 
            xtt, 
            x**2, 
            np.multiply(x.reshape(-1, 1), xt.reshape(-1, 1)),
            np.multiply(x.reshape(-1, 1), xtt.reshape(-1, 1)),
        ]
        X_library = np.squeeze(np.stack(X_library, axis=-1))
        names = ['x', 'xtt', 'x^2', 'x*xt', 'x*xtt']
        # the regression target is the first derivative
        y_library = xt
        return X_library, y_library, names
    
    @staticmethod
    def fit(X_library, y_library, threshold=0.002):
        '''
        sequentially thresholded regression with a dynamic threshold
        '''
        model = SeqReg()
        coef, _, r2 = model.fit_dynamic_thresh(X_library, y_library, 
                        is_normalize=False, non_zero_term=2, threshold=threshold, fit_intercept=False, model_name='LR')
        print('Fitting r2', r2)
        return coef


def prepare_dataset(is_show=False):
    '''
    prepare a set of datasets with different parameters
    '''
    data = []
    fit_equ = FitEqu()
    # k, m, A0, c, et, Nt
    params = [
        [1, 1, 0.1, 0.1, 20, 800],
        [0.5, 0.8, 0.05, 0.05, 20, 800],
        [0.1, 0.1, 0.02, 0.004, 20, 800],
        [0.2, 0.2, 0.07, 0.02, 20, 800],
    ]
    if is_show: fig = plt.figure()
    for k, m, A0, c, et, Nt in params:
        dt = et / float(Nt)
        df_each = fit_equ.prepare_data(k, m, A0, c, et, Nt)
        if is_show: plt.plot(df_each['t'], df_each['x'])
        x, xt, xtt = fit_equ.cal_derivatives(df_each, dt, Nt)
        X_library, y_library, names = fit_equ.build_library(x, xt, xtt)
        coef = fit_equ.fit(X_library, y_library)
        coef_res = [(each[0], round(each[1], 4)) for each in list(zip(names, coef.tolist())) if abs(each[1]) >= 1e-3]
        # sort by magnitude; for these parameters |m/c| >= |k/c|, so the largest
        # coefficient corresponds to -m/c and the second largest to -k/c
        coef_res = sorted(coef_res, key=lambda x: abs(x[1]), reverse=True)
        m_coef, k_coef = coef_res[0][1], coef_res[1][1]
        data.append([m, k, A0, c, k_coef, m_coef,
                     abs(k_coef + k/c) / abs(k/c), abs(m_coef + m/c) / abs(m/c)])
    if is_show: 
        plt.xlabel('Time: t/s', fontsize=20)
        plt.ylabel('Displacement: x/m', fontsize=20)
        plt.tick_params(labelsize=14)
        plt.tight_layout()
        plt.show()

    df = pd.DataFrame(
        data, columns=['m', 'k', 'A0', 'c',
                       'k_coef', 'm_coef', 'k_coef_err_per', 'm_coef_err_per'])
    return df

df = prepare_dataset(is_show=True)
print(df)
Fitting r2 1.0
Fitting r2 1.0
Fitting r2 1.0
Fitting r2 1.0
(Output: the four displacement curves used to build the dataset.)
     m    k    A0      c   k_coef   m_coef  k_coef_err_per  m_coef_err_per
0  1.0  1.0  0.10  0.100 -10.0086 -10.0170        0.000860        0.001700
1  0.8  0.5  0.05  0.050 -10.0054 -16.0171        0.000540        0.001069
2  0.1  0.1  0.02  0.004 -25.0214 -25.0428        0.000856        0.001712
3  0.2  0.2  0.07  0.020 -10.0086 -10.0170        0.000860        0.001700
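
As a sanity check, the first row has \(k/c = m/c = 1/0.1 = 10\), and the identified coefficients \(-10.0086\) and \(-10.0170\) match \(-k/c\) and \(-m/c\) to within about 0.2%.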

Recover -m/c#

Dimension matrix for the input parameters \((m, k, \delta, c)\), with rows corresponding to mass, length, and time: \(D_{in} = \begin{bmatrix} 1 & 1 & 0 & 1 \\ 0 & 0 & 1 & 0 \\ 0 & -2 & 0 & -1 \end{bmatrix}\)

Dimension matrix for the output parameter (the coefficient \(-m/c\), which has the dimension of time): \(D_{out} = \begin{bmatrix} 0 \\ 0 \\ 1 \end{bmatrix}\)

The solution space is: \(w = \gamma \begin{bmatrix} 1 \\ 1 \\ 0 \\ -2 \end{bmatrix} + \begin{bmatrix} 1 \\ 0 \\ 0 \\ -1 \end{bmatrix}\)

The best basis coefficient is \(\gamma=0\).

The best solution is \(w^* = \begin{bmatrix} 1 \\ 0 \\ 0 \\ -1 \end{bmatrix}\)
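
Reading \(w^*\) as exponents of \((m, k, \delta, c)\), the recovered group is \(m^1 k^0 \delta^0 c^{-1} = m/c\), which has the dimension of time and matches \(D_{out}\), consistent with the identified coefficient \(-m/c\).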

class DimensionlessLearning(object):
    '''
    Identify the explicit form of one coefficient using dimensionless learning
    '''
    
    def __init__(self, df, input_list, output_coef, dimension_info, basis_list):
        super(DimensionlessLearning, self).__init__()
        self.df = df
        self.input_list = input_list
        self.output_coef = output_coef
        self.X, self.y = self.prepare_dataset()
        self.dimension_info, self.basis_list = dimension_info, basis_list
        self.basis1_in, self.basis2_in = self.prepare_dimension()

    def prepare_dataset(self):
        '''
        prepare the input and output data
        '''
        X = self.df[self.input_list].to_numpy()
        y = self.df[self.output_coef].to_numpy().reshape(-1, 1)
        return X, y
        
    def prepare_dimension(self):
        '''
        parse dimension for input and output
        '''
        basis1_in, basis2_in = self.basis_list[0], self.basis_list[1]
        return basis1_in, basis2_in

    def fetch_coef_pi(self, coef):
        '''
        parse the combined weights for the input
        '''
        coef_pi = coef[0] * self.basis1_in + self.basis2_in
        return coef_pi
        
    def check_dimension(self, coef):
        '''
        check whether the combined basis vector reproduces D_out
        '''
        coef_pi = self.fetch_coef_pi(coef)
        print('[check] coef_pi: \n', coef_pi)
        target_D_out = np.dot(self.dimension_info[0], coef_pi)
        print('[check] target_D_out: \n', target_D_out)
        assert np.array_equal(target_D_out, self.dimension_info[1]), 'Wrong target_D_out!'

    def fit_pattern_search(self, seed):
        '''
        pattern search
        '''
        def get_coordinates(a, delta):
            '''
            build a list of the neighboring coordinates
            '''
            coord_all = []
            for a_ in [a-delta, a, a+delta]:
                if [a_] != [a]:
                    coord_all.append([a_])
            return coord_all
        
        def opt(coef):
            '''
            form the power-law feature with exponents coef_pi and
            fit a linear regression against the target coefficient
            '''
            coef_pi = self.fetch_coef_pi(coef)
            pi_in = np.prod(np.power(self.X, coef_pi.reshape(-1,)), axis=1).reshape(-1, 1)
            reg = LinearRegression(fit_intercept=False)
            reg.fit(pi_in, self.y)
            y_pred = reg.predict(pi_in)
            r2 = r2_score(self.y, y_pred)
            return r2, coef_pi, reg.coef_

        np.random.seed(seed)
        res, break_points = [], []
        # random initial basis coefficient on a grid over [-2, 2] with step 0.5
        a = np.random.choice(np.linspace(-2, 2, 9), 1)[0]
        coef = np.array([a]).reshape(-1, 1)

        iter_num, max_iter, delta = 0, 10, 0.5
        while iter_num < max_iter:
            candidate_coord = get_coordinates(a, delta)
            r2_center, reg_coef_center, coef_w_center = opt(coef)
            # print('r2_center', round(r2_center, 2), 'reg_coef_center', [round(each, 2) for each in list(reg_coef_center.reshape(-1,))])
            # print('coef_w_center', coef_w_center)

            if r2_center < 0.2:
                break_points.append([a])
                break

            r2_bounds_val = []
            for [a_] in candidate_coord:
                coef_temp = np.array([a_]).reshape(-1, 1)
                r2_bound, reg_coef_bound, coef_w_bound = opt(coef_temp)
                r2_bounds_val.append(r2_bound)

            # sort r2 from high to low
            highest_index = np.argsort(r2_bounds_val)[::-1][0]
            iter_num += 1

            # update the center coordinate when the R2 in the neighborhood is higher
            if r2_center < r2_bounds_val[highest_index]:
                [a] = candidate_coord[highest_index]
                coef = np.array([a]).reshape(-1, 1)
                coef_pi = self.fetch_coef_pi(coef)
                res_info = {'a': a, 'r2_center': round(r2_bounds_val[highest_index], 4)}
                # print('update', res_info)
                res.append(res_info)
            else:
                break
        
        r2, reg_coef_final, coef_w_final = opt(coef)
        return r2, reg_coef_final, coef_w_final
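
fit_pattern_search() performs a simple one-dimensional pattern search over the basis coefficient \(\gamma\): starting from a random grid point in \([-2, 2]\), it fits a power law at the center and at the two neighbors \(\gamma \pm 0.5\), and moves to the neighbor with the highest \(R^2\) until no neighbor improves on the center (or the fit is too poor to continue).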


def recover_coef1(seed):
    input_list = ['m', 'k', 'A0', 'c']
    output_coef = 'm_coef'

    D_in = np.array([[1, 0, 0], [1, 0, -2], [0, 1, 0], [1, 0, -1]]).T
    D_out = np.array([[0], [0], [1]])
    dimension_info = [D_in, D_out]

    basis1_in = np.array([1, 1, 0, -2]).reshape(-1, 1)
    basis2_in = np.array([1, 0, 0, -1]).reshape(-1, 1)
    basis_list = [basis1_in, basis2_in]
    
    dimensionless_learning = DimensionlessLearning(
        df, input_list, output_coef, dimension_info, basis_list)
    # dimensionless_learning.check_dimension(coef=[0])

    # pattern search
    r2, coef, coef_w = dimensionless_learning.fit_pattern_search(seed=seed)
    if r2 > 0.8:
        print('final r2', r2, coef.flatten(), coef_w)


for i in range(5):
    recover_coef1(seed=i)
final r2 0.9999994702111235 [ 1.  0.  0. -1.] [[-1.00155745]]
final r2 0.9999994702111235 [ 1.  0.  0. -1.] [[-1.00155745]]

Recover -k/c#

Dimension matrix for input parameters: \(D_{in} = \begin{bmatrix} 1 & 1 & 0 & 1 \\ 0 & 0 & 1 & 0 \\ 0 & -2 & 0 & -1 \end{bmatrix}\)

Dimension matrix for the output parameter (the coefficient \(-k/c\), which has the dimension of inverse time): \(D_{out} = \begin{bmatrix} 0 \\ 0 \\ -1 \end{bmatrix}\)

The solution space is: \(w = \gamma \begin{bmatrix} 1 \\ 1 \\ 0 \\ -2 \end{bmatrix} + \begin{bmatrix} 0 \\ 1 \\ 0 \\ -1 \end{bmatrix}\)

The best basis coefficient is \(\gamma=0\).

The best solution is \(w^* = \begin{bmatrix} 0 \\ 1 \\ 0 \\ -1 \end{bmatrix}\)
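
Here \(w^*\) corresponds to the group \(k^1 c^{-1} = k/c\), which has the dimension of inverse time, consistent with the identified coefficient \(-k/c\).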

def recover_coef2(seed):
    input_list = ['m', 'k', 'A0', 'c']
    output_coef = 'k_coef'

    D_in = np.array([[1, 0, 0], [1, 0, -2], [0, 1, 0], [1, 0, -1]]).T
    D_out = np.array([[0], [0], [-1]])
    dimension_info = [D_in, D_out]

    basis1_in = np.array([1, 1, 0, -2]).reshape(-1, 1)
    basis2_in = np.array([0, 1, 0, -1]).reshape(-1, 1)
    basis_list = [basis1_in, basis2_in]
    
    dimensionless_learning = DimensionlessLearning(
        df, input_list, output_coef, dimension_info, basis_list)
    # dimensionless_learning.check_dimension(coef=[0])

    # pattern search
    r2, coef, coef_w = dimensionless_learning.fit_pattern_search(seed=seed)
    if r2 > 0.8:
        print('final r2', r2, coef.flatten(), coef_w)


for i in range(5):
    recover_coef2(seed=i)
final r2 0.9999999469825553 [ 0.  1.  0. -1.] [[-1.0008227]]
final r2 0.9999999469825553 [ 0.  1.  0. -1.] [[-1.0008227]]
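
Both pattern searches converge to the expected exponent vectors, \([1, 0, 0, -1]\) for \(m/c\) and \([0, 1, 0, -1]\) for \(k/c\), with fitted scaling coefficients close to \(-1\). In other words, the identified model is \(\frac{dx}{dt} = -\frac{m}{c}\frac{d^2 x}{dt^2} - \frac{k}{c} x\), which is exactly the original governing equation \(m\frac{d^2 x}{dt^2} + c\frac{dx}{dt} + kx = 0\) rearranged.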