Skip to content

Commit

Permalink
update kinova & identification & utils & add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
wissem01chiha committed Jul 8, 2024
1 parent 072262b commit 436adc6
Show file tree
Hide file tree
Showing 16 changed files with 261 additions and 164 deletions.
2 changes: 2 additions & 0 deletions exemple/kinova/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ optimisation:
c2 : 0.9
r1: -0.2
r2: 1.1
iter: 100

cmaes:
stiffness_params: [0.25, 0.26, 0.369, 0.125, 0.258, 0.123, 0.11]
actuator_params:
Expand Down
2 changes: 2 additions & 0 deletions exemple/kinova/identification.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# identification of the manipulator diffrent models
# implemented with diffrent identifcation alorithms
23 changes: 15 additions & 8 deletions exemple/kinova/simulate.py → exemple/kinova/simulation.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
"""
"""
###########################################################################################
# tghis script for robot data visulization adn the static simulation of the robt models
# with pataerts stored in config.yml file
# all figures are saved in figure/pyDynamapp
#
###########################################################################################
import sys
import os
import pandas as pd
Expand All @@ -15,20 +18,22 @@
mlogger = logging.getLogger('matplotlib')
mlogger.setLevel(logging.WARNING)

src_folder = os.path.abspath(os.path.join(os.path.dirname(os.path.dirname(__file__)), '..', 'src'))
src_folder = os.path.abspath(os.path.join(os.path.dirname(\
os.path.dirname(__file__)), '..', 'src'))
sys.path.append(src_folder)

from dynamics import Robot, StateSpace
from utils import RobotData, plotArray, plot2Arrays, yaml2dict, RMSE, MAE

cutoff_frequency = 3
config_params = yaml2dict("C:/Users/chiha/OneDrive/Bureau/Dynamium/dynamic-identification/exemple/kinova/config.yml")
data = RobotData(config_params['identification']['dataFilePath'])
config_params = yaml2dict(\
"C:/Users/chiha/OneDrive/Bureau/Dynamium/dynamic-identification/exemple/kinova/config.yml")
data = RobotData(config_params['identification']['dataFilePath'])

fildata = data.LowPassfilter(cutoff_frequency)
fildata = data.lowPassfilter(cutoff_frequency)
kinova = Robot()

q_f = fildata['position']
q_f = fildata ['position']
qp_f = fildata['velocity']
qpp_f = fildata['desiredAcceleration']
current_f = fildata['current']
Expand All @@ -44,6 +49,8 @@
rmse_time = RMSE(torque, data.torque_rne,axis=1)
# compute the model with computed matrices tau =M(q)qddot+C(q,qp)qp+G(q)
tau_sim = np.zeros_like(torque)


x = 0.0001*np.abs(np.random.rand(13*7))
tau_f =kinova.computeFrictionTorques(qp,q)
for i in range(data.numRows):
Expand Down
113 changes: 44 additions & 69 deletions src/identification/IDIMNLS.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@
import numpy as np
from typing import Callable
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from scipy.optimize import least_squares, curve_fit
from utils import RMSE
from utils import RMSE, clampArray, plotArray, plot2Arrays

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
Expand All @@ -23,18 +21,22 @@ class IDIMNLS:
- identificationModel : function that return the model computed torques.
of shape : ( Nsamples * ndof )
"""
def __init__(self,nVars,output,identificationModel: Callable[[np.ndarray],np.ndarray],time_step=0.001) -> None:
self.time_step = time_step
self.output = output
self.nVars = nVars

def __init__(self,nVars,output,identificationModel: Callable[[np.ndarray],np.ndarray],\
upperBound=10,lowerBound=0.001,time_step=0.001) -> None:

# Class Attributes
self.time_step = time_step
self.output = output
self.nVars = nVars
self.upperBound = upperBound
self.lowerBound = lowerBound
self.identificationModel = identificationModel
self.optimized_params = None
assert (np.ndim(self.output) == 2)

def __str__(self) -> str:
return (f"IDIMNLS Model with {self.nVars} optimization variables,"
f"output shape: {self.torque.shape}, "
f"output shape: {self.output.shape}, "
f"time step: {self.time_step}")

def computeLsCostFunction(self, x:np.ndarray):
Expand All @@ -45,17 +47,24 @@ def computeLsCostFunction(self, x:np.ndarray):
"""
if self.nVars < x.size :
xnew = np.concatenate([x[0:self.nVars], np.zeros(x.size-self.nVars)])
xnew = clampArray(xnew,self.lowerBound,self.upperBound)
tau_s = self.identificationModel(xnew)
elif self.nVars == x.size:
x = clampArray(x,self.lowerBound,self.upperBound)
tau_s = self.identificationModel(x)
else:
logger.error(\
'Identification Engine: Optimisation Variables number should be <= optimisation vector. ')
'Identification Engine: Optimisation Variables number should be <= input vector size. ')

rmse = RMSE(self.output, tau_s)
cost = np.mean(rmse**2)
return cost


def computeRelativeError(self):
relaErr =0
return relaErr


def evaluate(self)->None:
"""Evaluate the model's performance using the current parameters."""

Expand All @@ -67,15 +76,15 @@ def evaluate(self)->None:
mean_rmse = np.mean(rmse)
logger.info(f"Evaluation result - Mean RMSE: {mean_rmse:.4f}")

def optimize(self, x0:np.ndarray, method='least_square', tol=0.5):
def optimize(self, x0:np.ndarray, method='least_square', tol=0.0001):
"""
Optimize the cost function with NLS alorithms to mimize it value.
Args:
x0 : numpy-ndarry : initial paramters values estimation.
method : optimisation algorithm
tol : optimization alorithm error stop tolerence.
"""
assert x0.size == self.nVars
xOpt = x0
if method == 'least_square':
try:
xOpt = least_squares(self.computeLsCostFunction, x0, xtol=tol)
Expand All @@ -89,76 +98,42 @@ def optimize(self, x0:np.ndarray, method='least_square', tol=0.5):
len(self.output))
popt, _ = curve_fit(self.identificationModel, x_data, \
self.output , p0=init_params,method='trf')
self.optimized_params = popt
self.optimized_params = popt.x
except Exception as e:
logger.error(f"An error occurred during optimization: {e}")
else:
logger.error('Optimisation method Not Supported!')
return xOpt

def visualizeCostFunction(self)->None:
""" """



"""
Plot the cost function scalar variation respect to ||x|| only
the optimisation variables
are considerd
"""
plt.figure(figsize=(12, 6))


def visualizeError(self,title=None, ylabel=None)->None:
"""Plot the error between simulated and real torque using Seaborn."""
"""Plot the root squred error between simulated and inputs"""

if self.optimized_params is None:
logger.error("No optimized parameters found. Run optimize() first.")
logger.error("Identification Engine : No optimized parameters found. Run optimize() first.")
return
tau_s = self.identificationModel(self.optimized_params)
error = self.torque - tau_s
time_steps = np.arange(error.shape[0])
error_df = pd.DataFrame(error, columns=[f'Joint {i+1}' for i in \
range(error.shape[1])])
error_df['Time Step'] = time_steps
error_df_melted = error_df.melt(id_vars='Time Step', \
var_name='Joint', value_name='Error')

plt.figure(figsize=(12, 6))
sns.lineplot(data=error_df_melted, x='Time Step', y='Error', hue='Joint')
if not(title is None):
plt.title(title,fontsize=9)
plt.xlabel('Time (ms)',fontsize=9)
if not(ylabel is None):
plt.ylabel(ylabel,fontsize=9)
plt.legend(title='Joint',fontsize=9)
rmse = RMSE(self.output, tau_s,1)
plotArray(rmse,'err per joint','abs err')

def visualizeResults(self, title=None, y_label=None)->None:
"""Plot the simulated and real torque in one figure."""
def visualizeResults(self, title=None, y_label=None)->None:
"""Plot the simulated and real signals in one figure."""

if self.optimized_params is None:
logger.error("No optimized parameters found. Run optimize() first.")

tau_s = self.identificationModel(self.optimized_params)
time_steps = np.arange(self.torque.shape[0])

real_df = pd.DataFrame(self.torque, columns=[f'Joint {i+1}' for i in \
range(self.torque.shape[1])])
simulated_df = pd.DataFrame(tau_s, columns=[f'Joint {i+1}' for i in \
range(tau_s.shape[1])])

real_df['Time'] = time_steps
simulated_df['Time'] = time_steps

combined_df = pd.merge(real_df.melt(id_vars='Time', var_name='Joint', \
value_name='Real Torque'),
simulated_df.melt(id_vars='Time', var_name='Joint',\
value_name='Simulated Torque'),
on=['Time', 'Joint'])

plt.figure(figsize=(12, 6))
sns.lineplot(data=combined_df, x='Time', y='Real Torque', hue='Joint',\
style='Joint', dashes=False, markers=True, alpha=0.6)
sns.lineplot(data=combined_df, x='Time', y='Simulated Torque', hue='Joint',\
style='Joint', dashes=True)
if self.optimized_params is None:
logger.error("No optimized parameters found. Run optimize() first.")
tau_s = self.identificationModel(self.optimized_params)
plot2Arrays(tau_s,self.output,'simultion','true',title)

def visualizeRelativeError(self):
""" """


if title is not None:
plt.title(title)
plt.xlabel('Time (ms)', fontsize=9)
if y_label is not None:
plt.ylabel(y_label, fontsize=9)
plt.legend(title='Joint', fontsize=9)

10 changes: 7 additions & 3 deletions src/identification/IDIMOLS.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
import numpy as np
import numpy as np
import logging
import matplotlib.pyplot as plt


class IDIMOLS:
""" Inverse Dynamics Identification Method Ordiany Least Square.
this class valid only when the friction
Args:
-
"""
def __init__(self) -> None:
def __init__(self, ) -> None:
pass

def computeCostFunction(self):
def computeLsCostFunction(self):
""" """
cost = 0
return cost
22 changes: 16 additions & 6 deletions src/identification/Kalman.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns


class Kalman:
"""
KALMAN
Expand All @@ -15,6 +14,8 @@ class Kalman:
R (np.ndarray): Covariance of the observation noise.
P (np.ndarray): Initial error covariance.
x0 (np.ndarray): Initial state estimate.
Ref:
An Introduction to the Kalman Filter - Greg Welch and Gary Bishop.
"""

def __init__(self, F, H, Q, R, P, x0, alpha=0.2, beta=1):
Expand Down Expand Up @@ -58,11 +59,14 @@ def filter(self, observations):
states (np.ndarray): Sequence of state estimates.
"""
states = []
innovations = []
for z in observations:
self.predict()
self.update(z)
y = self.update(z)
states.append(self.x.copy())
return np.array(states)
innovations.append(y.copy())
return np.array(states), np.array(innovations)

def evaluate(self, true_states, estimated_states):
"""
Expand All @@ -89,6 +93,12 @@ def adaptNoiseCovariance(self, y):
self.Q = self.Q + self.alpha * (np.outer(y, y) - self.Q)
self.R = self.R + self.beta * (np.outer(y, y) - self.R)

def visualizeSatets(self):
""" """
plt.figure(figsize=(12, 6))
def visualizeEstimates(self):
"""Visualize the estimated states over time."""
plt.figure(figsize=(12, 6))
for i in range(self.x.shape[1]):
plt.plot(self.x[:, i], label=f'Estimated state {i+1}')
plt.xlabel('Time (ms)')
plt.ylabel('state values')
plt.title('Estimated states over time')
plt.legend()
4 changes: 2 additions & 2 deletions src/identification/N4SID.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import numpy as np
from dynamics import StateSpace as ss
from dynamics import StateSpace

class N4SID:
"""
Numerical Subspace State Space System IDentification
Numerical Subspace State Space System Identification
This method uses input-output data to construct a state-space model by minimizing
a prediction error criterion.
Expand Down
1 change: 1 addition & 0 deletions src/optimisation/DEA.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import numpy as np
from numba import cuda

class DEA:
"""
Expand Down
14 changes: 10 additions & 4 deletions src/optimisation/PSO.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
import random
import numpy as np
from numba import cuda
import warnings
from numba.cuda.dispatcher import NumbaPerformanceWarning
import logging

warnings.filterwarnings("ignore", category=NumbaPerformanceWarning)
from numba import cuda
if (cuda.is_available()):
from numba.cuda.dispatcher import NumbaPerformanceWarning
warnings.filterwarnings("ignore", category=NumbaPerformanceWarning)

W = 0.85
c1 = 0.72
c2 = 0.91

print(cuda.is_available())
n_iterations = int(input("Inform the number of iterations: "))
target_error = float(input("Inform the target error: "))
n_particles = int(input("Inform the number of particles: "))
Expand Down Expand Up @@ -98,6 +100,10 @@ def move_particles(self):
for i, particle in enumerate(self.particles):
particle.position = d_positions[i]
particle.velocity = d_velocities[i]

class PSO:
def __init__(self, iter,particles,w,c1,c2) -> None:
pass



Loading

0 comments on commit 436adc6

Please sign in to comment.