Handle multiple models and datasets
MarkBlyth committed Apr 30, 2024
1 parent 511e9e1 commit aebf45b
Showing 4 changed files with 194 additions and 162 deletions.
pybop/_optimisation.py (22 changes: 15 additions & 7 deletions)
@@ -22,7 +22,7 @@ class Optimisation:
         If True, the optimization progress is printed (default: False).
     physical_viability : bool, optional
         If True, the feasibility of the optimised parameters is checked (default: True).
-    allow_infeasible_solutions : bool, optional
+    allow_infeasible_solutions : bool or list[bool], optional
         If True, infeasible parameter values will be allowed in the optimisation (default: True).

     Attributes
@@ -65,9 +65,14 @@ def __init__(

         # Set whether to allow infeasible locations
         if self.cost.problem is not None and hasattr(self.cost.problem, "_model"):
-            self.cost.problem._model.allow_infeasible_solutions = (
-                self.allow_infeasible_solutions
-            )
+            try:
+                for model, allow_infeasible in zip(
+                    self.cost.problem._model, self.allow_infeasible_solutions
+                ):
+                    model.allow_infeasible_solutions = allow_infeasible
+            except TypeError:
+                for model in self.cost.problem._model:
+                    model.allow_infeasible_solutions = self.allow_infeasible_solutions
         else:
             # Turn off this feature as there is no model
             self.physical_viability = False
@@ -517,13 +522,16 @@ def check_optimal_parameters(self, x):
         Check if the optimised parameters are physically viable.
         """

-        if self.cost.problem._model.check_params(
-            inputs=x, allow_infeasible_solutions=False
+        if all(
+            [
+                model.check_params(inputs=x, allow_infeasible_solutions=False)
+                for model in self.cost.problem._model
+            ]
         ):
             return
         else:
             warnings.warn(
-                "Optimised parameters are not physically viable! \nConsider retrying the optimisation"
+                "One or more optimised parameters are not physically viable! \nConsider retrying the optimisation"
                 + " with a non-gradient-based optimiser and the option allow_infeasible_solutions=False",
                 UserWarning,
                 stacklevel=2,
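Note on the try/except in __init__ above: zip() raises TypeError when allow_infeasible_solutions is a single bool, because a bool is not iterable, so the except branch broadcasts that one flag to every model; a list is instead paired element-wise with the models. A minimal, self-contained sketch of the same dispatch pattern (the Model class here is a hypothetical stand-in, not pybop's):

```python
# Sketch of the scalar-vs-list dispatch; `Model` is a hypothetical stand-in.
class Model:
    allow_infeasible_solutions = True


def set_flags(models, allow_infeasible_solutions):
    try:
        # List case: pair each flag with its model.
        for model, allow in zip(models, allow_infeasible_solutions):
            model.allow_infeasible_solutions = allow
    except TypeError:
        # Scalar case: zip() cannot iterate a bool, so broadcast one flag.
        for model in models:
            model.allow_infeasible_solutions = allow_infeasible_solutions


models = [Model(), Model()]
set_flags(models, [True, False])  # one flag per model
set_flags(models, False)          # same flag for every model
```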
pybop/costs/fitting_costs.py (175 changes: 96 additions & 79 deletions)
@@ -17,8 +17,8 @@ class RootMeanSquaredError(BaseCost):
     """

-    def __init__(self, problem):
-        super(RootMeanSquaredError, self).__init__(problem)
+    def __init__(self, problem, weights=None):
+        super(RootMeanSquaredError, self).__init__(problem, weights=weights)

         # Default fail gradient
         self._de = 1.0
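The new weights keyword is simply forwarded to BaseCost, so a weighted cost would presumably be constructed along these lines (illustrative call only; `problem` is assumed to be an already-built pybop problem spanning two datasets):

```python
# Illustrative only: assumes `problem` spans two datasets.
cost = pybop.RootMeanSquaredError(problem, weights=[2.0, 1.0])

# weights=None (the default) keeps the old unweighted behaviour.
cost = pybop.RootMeanSquaredError(problem)
```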
@@ -41,62 +41,61 @@ def _evaluate(self, x, grad=None):
             The root mean square error.
         """
-        prediction = self.problem.evaluate(x)
-
-        for key in self.signal:
-            if len(prediction.get(key, [])) != len(self._target.get(key, [])):
-                return np.float64(np.inf)  # prediction doesn't match target
-
-        e = np.array(
-            [
-                np.sqrt(np.mean((prediction[signal] - self._target[signal]) ** 2))
-                for signal in self.signal
-            ]
-        )
-
-        if self.n_outputs == 1:
-            return e.item()
-        else:
-            return np.sum(e)
+        predictions = self.problem.evaluate(x)
+        errors = np.zeros(len(predictions))
+        # For each dataset...
+        for i, (prediction, target) in enumerate(zip(predictions, self._target)):
+            for key in self.signal:
+                if len(prediction.get(key, [])) != len(target.get(key, [])):
+                    return np.float64(np.inf)  # prediction doesn't match target
+
+            # Calculate the root mean square error
+            e = np.array(
+                [
+                    np.sqrt(np.mean((prediction[signal] - target[signal]) ** 2))
+                    for signal in self.signal
+                ]
+            )
+            if self.n_outputs == 1:
+                errors[i] = e.item()
+            else:
+                errors[i] = np.sum(e)
+
+        # Weight errors and return
+        if self.weights is None:
+            return errors.sum()
+        return np.inner(self.weights, errors)

     def _evaluateS1(self, x):
         """
         Compute the cost and its gradient with respect to the parameters.

         Parameters
         ----------
         x : array-like
             The parameters for which to compute the cost and gradient.

         Returns
         -------
         tuple
             A tuple containing the cost and the gradient. The cost is a float,
             and the gradient is an array-like of the same length as `x`.

         Raises
         ------
         ValueError
             If an error occurs during the calculation of the cost or gradient.
         """
-        y, dy = self.problem.evaluateS1(x)
-
-        for key in self.signal:
-            if len(y.get(key, [])) != len(self._target.get(key, [])):
-                e = np.float64(np.inf)
-                de = self._de * np.ones(self.n_parameters)
-                return e, de
-
-        r = np.array([y[signal] - self._target[signal] for signal in self.signal])
-        e = np.sqrt(np.mean(r**2, axis=1))
-        de = np.mean((r * dy.T), axis=2) / (
-            np.sqrt(np.mean((r * dy.T) ** 2, axis=2)) + np.finfo(float).eps
-        )
-
-        if self.n_outputs == 1:
-            return e.item(), de.flatten()
-        else:
-            return np.sum(e), np.sum(de, axis=1)
+        ys, dys = self.problem.evaluateS1(x)
+
+        errors, gradients = [], []
+        for i, (prediction, dy, target) in enumerate(zip(ys, dys, self._target)):
+            for key in self.signal:
+                if len(prediction.get(key, [])) != len(target.get(key, [])):
+                    e = np.float64(np.inf)
+                    de = self._de * np.ones(self.n_parameters)
+                    return e, de
+
+            r = np.array(
+                [prediction[signal] - target[signal] for signal in self.signal]
+            )
+            errors.append(np.sqrt(np.mean(r**2, axis=1)))
+            gradients.append(
+                np.mean((r * dy.T), axis=2)
+                / (np.sqrt(np.mean((r * dy.T) ** 2, axis=2)) + np.finfo(float).eps)
+            )
+
+        errors = np.array(errors)
+        gradients = np.array(gradients)
+        if self.weights is None:
+            return errors.sum(), gradients.sum(axis=1)
+
+        # Weight and return
+        weighted_gradients = np.inner(self.weights.reshape((1, 1, -1)), gradients).sum(
+            axis=1
+        )
+
+        return np.inner(self.weights, errors), weighted_gradients

     def set_fail_gradient(self, de):
         """
@@ -155,22 +154,29 @@ def _evaluate(self, x, grad=None):
         float
             The sum of squared errors.
         """
-        prediction = self.problem.evaluate(x)
-
-        for key in self.signal:
-            if len(prediction.get(key, [])) != len(self._target.get(key, [])):
-                return np.float64(np.inf)  # prediction doesn't match target
-
-        e = np.array(
-            [
-                np.sum(((prediction[signal] - self._target[signal]) ** 2))
-                for signal in self.signal
-            ]
-        )
-        if self.n_outputs == 1:
-            return e.item()
-        else:
-            return np.sum(e)
+        predictions = self.problem.evaluate(x)
+        errors = np.zeros(len(predictions))
+        # For each dataset...
+        for i, (prediction, target) in enumerate(zip(predictions, self._target)):
+            for key in self.signal:
+                if len(prediction.get(key, [])) != len(target.get(key, [])):
+                    return np.float64(np.inf)  # prediction doesn't match target
+
+            e = np.array(
+                [
+                    np.sum(((prediction[signal] - target[signal]) ** 2))
+                    for signal in self.signal
+                ]
+            )
+            if self.n_outputs == 1:
+                errors[i] = e.item()
+            else:
+                errors[i] = np.sum(e)
+
+        # Weight errors and return
+        if self.weights is None:
+            return errors.sum()
+        return np.inner(self.weights, errors)

     def _evaluateS1(self, x):
         """
@@ -192,18 +198,29 @@ def _evaluateS1(self, x):
         ValueError
             If an error occurs during the calculation of the cost or gradient.
         """
-        y, dy = self.problem.evaluateS1(x)
-        for key in self.signal:
-            if len(y.get(key, [])) != len(self._target.get(key, [])):
-                e = np.float64(np.inf)
-                de = self._de * np.ones(self.n_parameters)
-                return e, de
-
-        r = np.array([y[signal] - self._target[signal] for signal in self.signal])
-        e = np.sum(np.sum(r**2, axis=0), axis=0)
-        de = 2 * np.sum(np.sum((r * dy.T), axis=2), axis=1)
-
-        return e, de
+        ys, dys = self.problem.evaluateS1(x)
+
+        errors, gradients = [], []
+        for i, (prediction, dy, target) in enumerate(zip(ys, dys, self._target)):
+            for key in self.signal:
+                if len(prediction.get(key, [])) != len(target.get(key, [])):
+                    e = np.float64(np.inf)
+                    de = self._de * np.ones(self.n_parameters)
+                    return e, de
+
+            r = np.array(
+                [prediction[signal] - target[signal] for signal in self.signal]
+            )
+            errors.append(np.sum(np.sum(r**2, axis=0), axis=0))
+            gradients.append(2 * np.sum(np.sum((r * dy.T), axis=2), axis=1))
+
+        errors = np.array(errors)
+        gradients = np.array(gradients)
+        if self.weights is None:
+            return errors, gradients
+        return np.inner(self.weights, errors), np.inner(self.weights, gradients)
+
+

     def set_fail_gradient(self, de):
         """
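Both the cost and its gradient are linear in the per-dataset contributions here, so the weighted reductions are sum_i w_i * e_i and sum_i w_i * de_i. A NumPy sketch under the shapes the diff suggests, errors of shape (n_datasets,) and gradients of shape (n_datasets, n_parameters); toy values only, and np.einsum is used so that the dataset axes are contracted explicitly:

```python
import numpy as np

weights = np.array([0.7, 0.3])  # one weight per dataset
errors = np.array([4.0, 9.0])   # per-dataset sum-of-squares costs
gradients = np.array(
    [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]  # shape (n_datasets, n_parameters)
)

cost = np.inner(weights, errors)                 # sum_i w_i * e_i
grad = np.einsum("i,ij->j", weights, gradients)  # shape (n_parameters,)
```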
pybop/plotting/plot_dataset.py (54 changes: 29 additions & 25 deletions)
@@ -33,30 +33,34 @@ def plot_dataset(
     # Get data dictionary
     dataset.check(signal)

-    # Compile ydata and labels or legend
-    y = [dataset[s] for s in signal]
-    if len(signal) == 1:
-        yaxis_title = signal[0]
-        if trace_names is None:
-            trace_names = ["Data"]
-    else:
-        yaxis_title = "Output"
-        if trace_names is None:
-            trace_names = pybop.StandardPlot.remove_brackets(signal)
-
-    # Create the figure
-    fig = pybop.plot_trajectories(
-        x=dataset["Time [s]"],
-        y=y,
-        trace_names=trace_names,
-        show=False,
-        xaxis_title="Time / s",
-        yaxis_title=yaxis_title,
-    )
-    fig.update_layout(**layout_kwargs)
-    if "ipykernel" in sys.modules and show:
-        fig.show("svg")
-    elif show:
-        fig.show()
+    if not hasattr(dataset, '__len__'):
+        dataset = [dataset]
+
+    for data in dataset:
+        # Compile ydata and labels or legend
+        y = [data[s] for s in signal]
+        if len(signal) == 1:
+            yaxis_title = signal[0]
+            if trace_names is None:
+                trace_names = ["Data"]
+        else:
+            yaxis_title = "Output"
+            if trace_names is None:
+                trace_names = pybop.StandardPlot.remove_brackets(signal)
+
+        # Create the figure
+        fig = pybop.plot_trajectories(
+            x=data["Time [s]"],
+            y=y,
+            trace_names=trace_names,
+            show=False,
+            xaxis_title="Time / s",
+            yaxis_title=yaxis_title,
+        )
+        fig.update_layout(**layout_kwargs)
+        if "ipykernel" in sys.modules and show:
+            fig.show("svg")
+        elif show:
+            fig.show()

     return fig
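plot_dataset now accepts either a single dataset or a list of them: a lone dataset is wrapped in a one-element list, one figure is drawn per dataset, and since fig is overwritten on each pass, the figure returned is the one for the last dataset. Hypothetical usage (dataset_1 and dataset_2 are assumed to be pybop.Dataset objects, and "Voltage [V]" an available signal):

```python
# Hypothetical usage; keyword arguments follow the function signature above.
fig = pybop.plot_dataset(dataset_1, signal=["Voltage [V]"])
fig = pybop.plot_dataset([dataset_1, dataset_2], signal=["Voltage [V]"])
```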
(The diff for the fourth changed file did not load and is not shown.)
