Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

59 cosine hungarian #40

Merged
merged 20 commits into from
Jun 2, 2020
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions conda/environment-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ channels:
dependencies:
- gensim >=3.8.0
- matplotlib
- numba >=0.47
- numpy
- pip
- pyteomics >=4.2
Expand Down
1 change: 1 addition & 0 deletions conda/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ channels:
dependencies:
- gensim >=3.8.0
- matplotlib
- numba >=0.47
- numpy
- pyteomics >=4.2
- python >=3.7,<3.8
Expand Down
1 change: 1 addition & 0 deletions conda/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ requirements:
run:
- gensim >=3.8.0
- matplotlib
- numba >=0.47
- numpy
- pip
- pyteomics >=4.2
Expand Down
96 changes: 96 additions & 0 deletions matchms/similarity/CosineHungarian.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
from typing import Tuple
import numpy
from scipy.optimize import linear_sum_assignment
from matchms.similarity.collect_peak_pairs import collect_peak_pairs
from matchms.typing import SpectrumType


class CosineHungarian:
"""Calculate 'cosine similarity score' between two spectra (using Hungarian algorithm).

The cosine score aims at quantifying the similarity between two mass spectra.
The score is calculated by finding best possible matches between peaks
of two spectra. Two peaks are considered a potential match if their
m/z ratios lie within the given 'tolerance'.
The underlying peak assignment problem is here solved using the Hungarian algorithm.
This can perform notably slower than the 'greedy' implementation in CosineGreedy, but
does represent a mathematically proper solution to the problem.

"""
def __init__(self, tolerance=0.1):
"""

Args:
----
tolerance: float
Peaks will be considered a match when <= tolerance appart. Default is 0.1.
"""
self.tolerance = tolerance

def __call__(self, spectrum1: SpectrumType, spectrum2: SpectrumType) -> Tuple[float, int]:
"""Calculate cosine score between two spectra.

Args:
----
spectrum1: SpectrumType
Input spectrum 1.
spectrum2: SpectrumType
Input spectrum 2.
"""
def get_peaks_arrays():
"""Get peaks mz and intensities as numpy array."""
spec1 = numpy.vstack((spectrum1.peaks.mz, spectrum1.peaks.intensities)).T
cwmeijer marked this conversation as resolved.
Show resolved Hide resolved
spec2 = numpy.vstack((spectrum2.peaks.mz, spectrum2.peaks.intensities)).T
assert max(spec1[:, 1]) <= 1, ("Input spectrum1 is not normalized. ",
"Apply 'normalize_intensities' filter first.")
assert max(spec2[:, 1]) <= 1, ("Input spectrum2 is not normalized. ",
"Apply 'normalize_intensities' filter first.")
return spec1, spec2

def get_matching_pairs():
"""Get pairs of peaks that match within the given tolerance."""
matching_pairs = collect_peak_pairs(spec1, spec2, self.tolerance, shift=0.0)
return sorted(matching_pairs, key=lambda x: x[2], reverse=True)

def get_matching_pairs_matrix():
"""Create matrix of multiplied intensities of all matching pairs
between spectrum1 and spectrum2.
Returns
paired_peaks1:
list of paired peaks of spectrum1
paired_peaks2:
list of paired peaks of spectrum2
matching_pairs_matrix:
Array of multiplied intensities between all matching peaks.
"""
if len(matching_pairs) == 0:
return None, None, None
paired_peaks1 = list({x[0] for x in matching_pairs})
paired_peaks2 = list({x[1] for x in matching_pairs})
matrix_size = (len(paired_peaks1), len(paired_peaks2))
matching_pairs_matrix = numpy.ones(matrix_size)
for match in matching_pairs:
matching_pairs_matrix[paired_peaks1.index(match[0]),
paired_peaks2.index(match[1])] = 1 - match[2]
return paired_peaks1, paired_peaks2, matching_pairs_matrix

def solve_hungarian():
"""Use hungarian agorithm to solve the linear sum assignment problem."""
row_ind, col_ind = linear_sum_assignment(matching_pairs_matrix)
score = len(row_ind) - matching_pairs_matrix[row_ind, col_ind].sum()
used_matches = [(paired_peaks1[x], paired_peaks2[y]) for (x, y) in zip(row_ind, col_ind)]
return score, used_matches

def calc_score():
"""Calculate cosine similarity score."""
if matching_pairs_matrix is not None:
score, used_matches = solve_hungarian()
# Normalize score:
score = score/max(numpy.sum(spec1[:, 1]**2), numpy.sum(spec2[:, 1]**2))
return score, len(used_matches)
return 0.0, 0

spec1, spec2 = get_peaks_arrays()
matching_pairs = get_matching_pairs()
paired_peaks1, paired_peaks2, matching_pairs_matrix = get_matching_pairs_matrix()
return calc_score()
2 changes: 2 additions & 0 deletions matchms/similarity/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
"""similarity module"""
from .CosineGreedy import CosineGreedy
from .CosineHungarian import CosineHungarian
from .IntersectMz import IntersectMz


__all__ = [
"CosineGreedy",
"CosineHungarian",
"IntersectMz"
]
33 changes: 33 additions & 0 deletions matchms/similarity/collect_peak_pairs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import numba
import numpy


@numba.njit
def collect_peak_pairs(spec1, spec2, tolerance, shift=0):
"""Find matching pairs between two spectra.

Args
----
spec1: numpy array
Spectrum peaks and intensities as numpy array.
spec2: numpy array
Spectrum peaks and intensities as numpy array.
tolerance : float
Peaks will be considered a match when <= tolerance appart.
shift : float, optional
Shift spectra peaks by shift. The default is 0.

Returns
-------
matching_pairs : list
List of found matching peaks.
"""
matching_pairs = []

for idx in range(len(spec1)):
intensity = spec1[idx, 1]
matches = numpy.where((numpy.abs(spec2[:, 0] - spec1[idx, 0] + shift) <= tolerance))[0]
for match in matches:
matching_pairs.append((idx, match, intensity*spec2[match][1]))

return matching_pairs
28 changes: 28 additions & 0 deletions tests/test_collect_peak_pairs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import numpy
from matchms.similarity.collect_peak_pairs import collect_peak_pairs


def test_cosine_hungarian_tolerance_01():
"""Test finding expected peak matches within tolerance=0.2."""
spec1 = numpy.array([[100, 200, 300, 500],
[0.1, 0.1, 1.0, 1.0]], dtype="float").T

spec2 = numpy.array([[105, 205.1, 300, 500.1],
[0.1, 0.1, 1.0, 1.0]], dtype="float").T

matching_pairs = collect_peak_pairs(spec1, spec2, tolerance=0.2)
assert len(matching_pairs) == 2, "Expected different number of matching peaks"
assert matching_pairs == [(2, 2, 1.0), (3, 3, 1.0)], "Expected different matchin pairs."


def test_cosine_hungarian_tolerance_01_shift_min5():
"""Test finding expected peak matches when given a mass_shift of -5.0."""
spec1 = numpy.array([[100, 200, 300, 500],
[1.0, 1.0, 0.1, 0.1]], dtype="float").T

spec2 = numpy.array([[105, 205.1, 300, 500.1],
[1.0, 1.0, 0.1, 0.1]], dtype="float").T

matching_pairs = collect_peak_pairs(spec1, spec2, tolerance=0.2, shift=-5.0)
assert len(matching_pairs) == 2, "Expected different number of matching peaks"
assert matching_pairs == [(0, 0, 1.0), (1, 1, 1.0)], "Expected different matchin pairs."
114 changes: 114 additions & 0 deletions tests/test_cosine_hungarian.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import numpy
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you some how clearify how you got to the 'expected' values in all the tests? Did you calculate that manually or something like that? I'm just thinking what one would think if any of these tests start to fail at some point and you are not there to explain what you meant.
If the calculation is simple and clear, you may want to include it in the test as opposed to these 'magic' ground truth values.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the test cases to simpler ones where I now compare the actual scores with expected values that are derived within the same test (same as for you similar comment in #38 ).

import pytest
from matchms import Spectrum
from matchms.similarity import CosineHungarian


def test_cosine_hungarian_without_parameters():
"""Compare output cosine score with own calculation on simple dummy spectrums."""
spectrum_1 = Spectrum(mz=numpy.array([100, 200, 300, 500, 510], dtype="float"),
intensities=numpy.array([0.1, 0.2, 1.0, 0.3, 0.4], dtype="float"))

spectrum_2 = Spectrum(mz=numpy.array([100, 200, 290, 490, 510], dtype="float"),
intensities=numpy.array([0.1, 0.2, 1.0, 0.3, 0.4], dtype="float"))
cosine_hungarian = CosineHungarian()
score, n_matches = cosine_hungarian(spectrum_1, spectrum_2)

# Derive expected cosine score
expected_matches = [0, 1, 4] # Those peaks have matching mz values (within given tolerance)
multiply_matching_intensities = spectrum_1.peaks.intensities[expected_matches] \
* spectrum_2.peaks.intensities[expected_matches]
expected_score = multiply_matching_intensities.sum() / (spectrum_1.peaks.intensities ** 2).sum()

assert score == pytest.approx(expected_score, 0.0001), "Expected different cosine score."
assert n_matches == len(expected_matches), "Expected different number of matching peaks."


def test_cosine_hungarian_with_tolerance_0_2():
"""Compare output cosine score for tolerance 0.2 with own calculation on simple dummy spectrums."""
spectrum_1 = Spectrum(mz=numpy.array([100, 299, 300, 301, 510], dtype="float"),
intensities=numpy.array([0.1, 1.0, 0.2, 0.3, 0.4], dtype="float"))

spectrum_2 = Spectrum(mz=numpy.array([100, 300, 301, 511], dtype="float"),
intensities=numpy.array([0.1, 1.0, 0.3, 0.4], dtype="float"))
cosine_hungarian = CosineHungarian(tolerance=0.2)
score, n_matches = cosine_hungarian(spectrum_1, spectrum_2)

# Derive expected cosine score
expected_matches = [[0, 2, 3], [0, 1, 2]] # Those peaks have matching mz values (within given tolerance)
multiply_matching_intensities = spectrum_1.peaks.intensities[expected_matches[0]] \
* spectrum_2.peaks.intensities[expected_matches[1]]
expected_score = multiply_matching_intensities.sum() / (spectrum_1.peaks.intensities ** 2).sum()

assert score == pytest.approx(expected_score, 0.0001), "Expected different cosine score."
assert n_matches == len(expected_matches[0]), "Expected different number of matching peaks."


def test_cosine_hungarian_with_tolerance_2_0():
"""Compare output cosine score for tolerance 2.0 with own calculation on simple dummy spectrums."""
spectrum_1 = Spectrum(mz=numpy.array([100, 299, 300, 301, 510], dtype="float"),
intensities=numpy.array([0.1, 1.0, 0.2, 0.3, 0.4], dtype="float"))

spectrum_2 = Spectrum(mz=numpy.array([100, 300, 301, 511], dtype="float"),
intensities=numpy.array([0.1, 1.0, 0.3, 0.4], dtype="float"))
cosine_hungarian = CosineHungarian(tolerance=2.0)
score, n_matches = cosine_hungarian(spectrum_1, spectrum_2)

# Derive expected cosine score
expected_matches = [[0, 1, 3, 4], [0, 1, 2, 3]] # Those peaks have matching mz values (within given tolerance)
multiply_matching_intensities = spectrum_1.peaks.intensities[expected_matches[0]] \
* spectrum_2.peaks.intensities[expected_matches[1]]
expected_score = multiply_matching_intensities.sum() / (spectrum_1.peaks.intensities ** 2).sum()

assert score == pytest.approx(expected_score, 0.0001), "Expected different cosine score."
assert n_matches == len(expected_matches[0]), "Expected different number of matching peaks."


def test_cosine_hungarian_order_of_arguments():
"""Compare cosine scores for A,B versus B,A, which should give the same score."""
spectrum_1 = Spectrum(mz=numpy.array([100, 200, 299, 300, 301, 500, 510], dtype="float"),
intensities=numpy.array([0.02, 0.02, 1.0, 0.2, 0.4, 0.04, 0.2], dtype="float"),
metadata=dict())

spectrum_2 = Spectrum(mz=numpy.array([100, 200, 300, 301, 500, 512], dtype="float"),
intensities=numpy.array([0.02, 0.02, 1.0, 0.2, 0.04, 0.2], dtype="float"),
metadata=dict())

cosine_hungarian = CosineHungarian(tolerance=2.0)
score_1_2, n_matches_1_2 = cosine_hungarian(spectrum_1, spectrum_2)
score_2_1, n_matches_2_1 = cosine_hungarian(spectrum_2, spectrum_1)

assert score_1_2 == score_2_1, "Expected that the order of the arguments would not matter."
assert n_matches_1_2 == n_matches_2_1, "Expected that the order of the arguments would not matter."


def test_cosine_hungarian_case_where_greedy_would_fail():
"""Test case that would fail for cosine greedy implementations."""
spectrum_1 = Spectrum(mz=numpy.array([100.005, 100.016], dtype="float"),
intensities=numpy.array([1.0, 0.9], dtype="float"),
metadata={})

spectrum_2 = Spectrum(mz=numpy.array([100.005, 100.01], dtype="float"),
intensities=numpy.array([0.9, 1.0], dtype="float"),
metadata={})

cosine_hungarian = CosineHungarian(tolerance=0.01)
score, n_matches = cosine_hungarian(spectrum_1, spectrum_2)
assert score == pytest.approx(0.994475, 0.0001), "Expected different cosine score."
assert n_matches == 2, "Expected different number of matching peaks."


def test_cosine_hungarian_case_without_matches():
"""Test case for spectrums without any matching peaks."""
spectrum_1 = Spectrum(mz=numpy.array([100, 200], dtype="float"),
intensities=numpy.array([1.0, 0.1], dtype="float"),
metadata={})

spectrum_2 = Spectrum(mz=numpy.array([110, 210], dtype="float"),
intensities=numpy.array([1.0, 0.1], dtype="float"),
metadata={})

cosine_hungarian = CosineHungarian()
score, n_matches = cosine_hungarian(spectrum_1, spectrum_2)
assert score == 0.0, "Expected different cosine score."
assert n_matches == 0, "Expected different number of matching peaks."