Skip to content

Commit

Permalink
Add a WeightedCost (#329)
Browse files Browse the repository at this point in the history
* Add a WeightedCost

* Fix setting

* Add tests

* Update base_cost.py

* Update CHANGELOG.md

* Update imports

* Update x0 to [0.5]

* Update spm_weighted_cost.py

* Update TypeError with test

* Update spm_weighted_cost.py

* Update evaluate and _evaluate

* Pass current_sensitivities to MAP

* Add test_weighted_design_cost

* Add evaluate back into GaussianLogLikelihood

* Update to super()

* Update prediction to self._current_prediction

* Update y to self._current_prediction

* Update cost_list to args

* Add descriptions

* refactor: move WeightedCost into separate file

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Brady Planden <[email protected]>
Co-authored-by: Brady Planden <[email protected]>
  • Loading branch information
4 people authored Jul 22, 2024
1 parent e34099f commit f3a32bc
Show file tree
Hide file tree
Showing 9 changed files with 413 additions and 67 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## Features

- [#327](https://github.com/pybop-team/PyBOP/issues/327) - Adds the `WeightedCost` subclass, defines when to evaluate a problem and adds the `spm_weighted_cost` example script.
- [#393](https://github.com/pybop-team/PyBOP/pull/383) - Adds Minkowski and SumofPower cost classes, with an example and corresponding tests.
- [#403](https://github.com/pybop-team/PyBOP/pull/403/) - Adds lychee link checking action.

Expand Down
61 changes: 61 additions & 0 deletions examples/scripts/spm_weighted_cost.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import numpy as np

import pybop

# Parameter set and model definition
parameter_set = pybop.ParameterSet.pybamm("Chen2020")
model = pybop.lithium_ion.SPM(parameter_set=parameter_set)

# Fitting parameters
parameters = pybop.Parameters(
pybop.Parameter(
"Negative electrode active material volume fraction",
prior=pybop.Gaussian(0.68, 0.05),
bounds=[0.5, 0.8],
true_value=parameter_set["Negative electrode active material volume fraction"],
),
pybop.Parameter(
"Positive electrode active material volume fraction",
prior=pybop.Gaussian(0.58, 0.05),
bounds=[0.4, 0.7],
true_value=parameter_set["Positive electrode active material volume fraction"],
),
)

# Generate data
sigma = 0.001
t_eval = np.arange(0, 900, 3)
values = model.predict(t_eval=t_eval)
corrupt_values = values["Voltage [V]"].data + np.random.normal(0, sigma, len(t_eval))

# Form dataset
dataset = pybop.Dataset(
{
"Time [s]": t_eval,
"Current function [A]": values["Current [A]"].data,
"Voltage [V]": corrupt_values,
}
)

# Generate problem, cost function, and optimisation class
problem = pybop.FittingProblem(model, parameters, dataset)
cost1 = pybop.SumSquaredError(problem)
cost2 = pybop.RootMeanSquaredError(problem)
weighted_cost = pybop.WeightedCost(cost1, cost2, weights=[1, 100])

for cost in [weighted_cost, cost1, cost2]:
optim = pybop.IRPropMin(cost, max_iterations=60)

# Run the optimisation
x, final_cost = optim.run()
print("True parameters:", parameters.true_value())
print("Estimated parameters:", x)

# Plot the timeseries output
pybop.quick_plot(problem, problem_inputs=x, title="Optimised Comparison")

# Plot convergence
pybop.plot_convergence(optim)

# Plot the cost landscape with optimisation path
pybop.plot2d(optim, steps=15)
1 change: 1 addition & 0 deletions pybop/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
GaussianLogLikelihoodKnownSigma,
MAP,
)
from .costs._weighted_cost import WeightedCost

#
# Optimiser class
Expand Down
64 changes: 46 additions & 18 deletions pybop/costs/_likelihoods.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,17 @@ def _evaluate(self, inputs: Inputs, grad: Union[None, np.ndarray] = None) -> flo
"""
Evaluates the Gaussian log-likelihood for the given parameters with known sigma.
"""
y = self.problem.evaluate(inputs)

if not self.verify_prediction(y):
if not self.verify_prediction(self._current_prediction):
return -np.inf

e = np.asarray(
[
np.sum(
self._offset
+ self._multip * np.sum((self._target[signal] - y[signal]) ** 2.0)
+ self._multip
* np.sum(
(self._target[signal] - self._current_prediction[signal]) ** 2.0
)
)
for signal in self.signal
]
Expand All @@ -62,17 +63,22 @@ def _evaluate(self, inputs: Inputs, grad: Union[None, np.ndarray] = None) -> flo

def _evaluateS1(self, inputs: Inputs) -> tuple[float, np.ndarray]:
"""
Calls the problem.evaluateS1 method and calculates the log-likelihood and gradient.
Calculates the log-likelihood and gradient.
"""
y, dy = self.problem.evaluateS1(inputs)

if not self.verify_prediction(y):
if not self.verify_prediction(self._current_prediction):
return -np.inf, -self._de * np.ones(self.n_parameters)

likelihood = self._evaluate(inputs)

r = np.asarray([self._target[signal] - y[signal] for signal in self.signal])
dl = np.sum((np.sum((r * dy.T), axis=2) / self.sigma2), axis=1)
r = np.asarray(
[
self._target[signal] - self._current_prediction[signal]
for signal in self.signal
]
)
dl = np.sum(
(np.sum((r * self._current_sensitivities.T), axis=2) / self.sigma2), axis=1
)

return likelihood, dl

Expand Down Expand Up @@ -117,6 +123,7 @@ def __init__(
super().__init__(problem)
self._dsigma_scale = dsigma_scale
self._logpi = -0.5 * self.n_time_data * np.log(2 * np.pi)
self._fixed_problem = False # keep problem evaluation within _evaluate

self.sigma = Parameters()
self._add_sigma_parameters(sigma0)
Expand Down Expand Up @@ -189,16 +196,20 @@ def _evaluate(self, inputs: Inputs, grad: Union[None, np.ndarray] = None) -> flo
if np.any(sigma <= 0):
return -np.inf

y = self.problem.evaluate(self.problem.parameters.as_dict())
if not self.verify_prediction(y):
self._current_prediction = self.problem.evaluate(
self.problem.parameters.as_dict()
)
if not self.verify_prediction(self._current_prediction):
return -np.inf

e = np.asarray(
[
np.sum(
self._logpi
- self.n_time_data * np.log(sigma)
- np.sum((self._target[signal] - y[signal]) ** 2.0)
- np.sum(
(self._target[signal] - self._current_prediction[signal]) ** 2.0
)
/ (2.0 * sigma**2.0)
)
for signal in self.signal
Expand All @@ -209,7 +220,7 @@ def _evaluate(self, inputs: Inputs, grad: Union[None, np.ndarray] = None) -> flo

def _evaluateS1(self, inputs: Inputs) -> tuple[float, np.ndarray]:
"""
Calls the problem.evaluateS1 method and calculates the log-likelihood.
Calculates the log-likelihood and sensitivities.
Parameters
----------
Expand All @@ -227,14 +238,23 @@ def _evaluateS1(self, inputs: Inputs) -> tuple[float, np.ndarray]:
if np.any(sigma <= 0):
return -np.inf, -self._de * np.ones(self.n_parameters)

y, dy = self.problem.evaluateS1(self.problem.parameters.as_dict())
if not self.verify_prediction(y):
self._current_prediction, self._current_sensitivities = self.problem.evaluateS1(
self.problem.parameters.as_dict()
)
if not self.verify_prediction(self._current_prediction):
return -np.inf, -self._de * np.ones(self.n_parameters)

likelihood = self._evaluate(inputs)

r = np.asarray([self._target[signal] - y[signal] for signal in self.signal])
dl = np.sum((np.sum((r * dy.T), axis=2) / (sigma**2.0)), axis=1)
r = np.asarray(
[
self._target[signal] - self._current_prediction[signal]
for signal in self.signal
]
)
dl = np.sum(
(np.sum((r * self._current_sensitivities.T), axis=2) / (sigma**2.0)), axis=1
)
dsigma = (
-self.n_time_data / sigma + np.sum(r**2.0, axis=1) / (sigma**3.0)
) / self._dsigma_scale
Expand Down Expand Up @@ -300,7 +320,10 @@ def _evaluate(self, inputs: Inputs, grad=None) -> float:
if not np.isfinite(log_prior).any():
return -np.inf

if self._fixed_problem:
self.likelihood._current_prediction = self._current_prediction
log_likelihood = self.likelihood._evaluate(inputs)

posterior = log_likelihood + log_prior
return posterior

Expand Down Expand Up @@ -331,6 +354,11 @@ def _evaluateS1(self, inputs: Inputs) -> tuple[float, np.ndarray]:
if not np.isfinite(log_prior).any():
return -np.inf, -self._de * np.ones(self.n_parameters)

if self._fixed_problem:
(
self.likelihood._current_prediction,
self.likelihood._current_sensitivities,
) = self._current_prediction, self._current_sensitivities
log_likelihood, dl = self.likelihood._evaluateS1(inputs)

# Compute a finite difference approximation of the gradient of the log prior
Expand Down
138 changes: 138 additions & 0 deletions pybop/costs/_weighted_cost.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
from typing import Optional

import numpy as np

from pybop import BaseCost
from pybop.parameters.parameter import Inputs


class WeightedCost(BaseCost):
"""
A subclass for constructing a linear combination of cost functions as
a single weighted cost function.
Inherits all parameters and attributes from ``BaseCost``.
Additional Attributes
---------------------
costs : list[pybop.BaseCost]
A list of PyBOP cost objects.
weights : list[float]
A list of values with which to weight the cost values.
_different_problems : bool
If True, the problem for each cost is evaluated independently during
each evaluation of the cost (default: False).
"""

def __init__(self, *args, weights: Optional[list[float]] = None):
self.costs = []
for cost in args:
if not isinstance(cost, BaseCost):
raise TypeError(f"Received {type(cost)} instead of cost object.")
self.costs.append(cost)
self.weights = weights
self._different_problems = False

if self.weights is None:
self.weights = np.ones(len(self.costs))
elif isinstance(self.weights, list):
self.weights = np.array(self.weights)
if not isinstance(self.weights, np.ndarray):
raise TypeError(
"Expected a list or array of weights the same length as costs."
)
if not len(self.weights) == len(self.costs):
raise ValueError(
"Expected a list or array of weights the same length as costs."
)

# Check if all costs depend on the same problem
for cost in self.costs:
if hasattr(cost, "problem") and cost.problem is not self.costs[0].problem:
self._different_problems = True

if not self._different_problems:
super().__init__(self.costs[0].problem)
self._fixed_problem = self.costs[0]._fixed_problem
else:
super().__init__()
self._fixed_problem = False
for cost in self.costs:
self.parameters.join(cost.parameters)

def _evaluate(self, inputs: Inputs, grad=None):
"""
Calculate the weighted cost for a given set of parameters.
Parameters
----------
inputs : Inputs
The parameters for which to compute the cost.
grad : array-like, optional
An array to store the gradient of the cost function with respect
to the parameters.
Returns
-------
float
The weighted cost value.
"""
e = np.empty_like(self.costs)

if not self._fixed_problem and self._different_problems:
self.parameters.update(values=list(inputs.values()))
elif not self._fixed_problem:
self._current_prediction = self.problem.evaluate(inputs)

for i, cost in enumerate(self.costs):
if not self._fixed_problem and self._different_problems:
inputs = cost.parameters.as_dict()
cost._current_prediction = cost.problem.evaluate(inputs)
else:
cost._current_prediction = self._current_prediction
e[i] = cost._evaluate(inputs, grad)

return np.dot(e, self.weights)

def _evaluateS1(self, inputs: Inputs):
"""
Compute the weighted cost and its gradient with respect to the parameters.
Parameters
----------
inputs : Inputs
The parameters for which to compute the cost and gradient.
Returns
-------
tuple
A tuple containing the cost and the gradient. The cost is a float,
and the gradient is an array-like of the same length as `x`.
"""
e = np.empty_like(self.costs)
de = np.empty((len(self.parameters), len(self.costs)))

if not self._fixed_problem and self._different_problems:
self.parameters.update(values=list(inputs.values()))
elif not self._fixed_problem:
self._current_prediction, self._current_sensitivities = (
self.problem.evaluateS1(inputs)
)

for i, cost in enumerate(self.costs):
if not self._fixed_problem and self._different_problems:
inputs = cost.parameters.as_dict()
cost._current_prediction, cost._current_sensitivities = (
cost.problem.evaluateS1(inputs)
)
else:
cost._current_prediction, cost._current_sensitivities = (
self._current_prediction,
self._current_sensitivities,
)
e[i], de[:, i] = cost._evaluateS1(inputs)

e = np.dot(e, self.weights)
de = np.dot(de, self.weights)

return e, de
Loading

0 comments on commit f3a32bc

Please sign in to comment.