diff --git a/pyat/at/physics/frequency_maps.py b/pyat/at/physics/frequency_maps.py index 2be163c38..976527507 100644 --- a/pyat/at/physics/frequency_maps.py +++ b/pyat/at/physics/frequency_maps.py @@ -227,22 +227,22 @@ def fmap_parallel_track(ring, # calc frequency from array, # jump the cycle is no frequency is found - xfreqfirst = get_tunes_harmonic(xfirst, num_harmonics=1) + xfreqfirst = get_tunes_harmonic(xfirst) if len(xfreqfirst) == 0: verboseprint(" No frequency") continue # xfreqlast = PyNAFF.naff(xlastpart,tns,1,0,False) - xfreqlast = get_tunes_harmonic(xlast, num_harmonics=1) + xfreqlast = get_tunes_harmonic(xlast) if len(xfreqlast) == 0: verboseprint(" No frequency") continue # yfreqfirst = PyNAFF.naff(yfirstpart,tns,1,0,False) - yfreqfirst = get_tunes_harmonic(yfirst, num_harmonics=1) + yfreqfirst = get_tunes_harmonic(yfirst) if len(yfreqfirst) == 0: verboseprint(" No frequency") continue # yfreqlast = PyNAFF.naff(ylastpart,tns,1,0,False) - yfreqlast = get_tunes_harmonic(ylast, num_harmonics=1) + yfreqlast = get_tunes_harmonic(ylast) if len(yfreqlast) == 0: verboseprint(" No frequency") continue diff --git a/pyat/at/physics/harmonic_analysis.py b/pyat/at/physics/harmonic_analysis.py index 55d783ad4..d601a08e6 100644 --- a/pyat/at/physics/harmonic_analysis.py +++ b/pyat/at/physics/harmonic_analysis.py @@ -1,269 +1,245 @@ -""" -Original author of HarmonicAnalysis class -Jaime Maria Coello De Portugal - Martinez Vazquez - - -Written while at CERN circa 2016 -This is a reduced version keeping only the key components. -Full code can be found at: https://github.com/pylhc/harpy - -""" - -import numpy as np -from typing import Optional, Tuple - -PI2I = 2 * np.pi * complex(0, 1) - -ZERO_PAD_DEF = False -HANN_DEF = False - - -class HarmonicAnalysis(object): - - def __init__(self, samples, zero_pad=ZERO_PAD_DEF, hann=HANN_DEF): - self._samples = samples - self._compute_orbit() - if zero_pad: - self._pad_signal() - self._length = len(self._samples) - self._int_range = np.arange(self._length) - self._hann_window = None - if hann: - self._hann_window = np.hanning(self._length) - - def laskar_method(self, num_harmonics): - samples = self._samples[:] # Copy the samples array. - n = self._length - coefficients = [] - frequencies = [] - for _ in range(num_harmonics): - # Compute this harmonic frequency and coefficient. - dft_data = HarmonicAnalysis._fft(samples) - frequency = self._jacobsen(dft_data) - coefficient = HarmonicAnalysis._compute_coef( - samples, - frequency * n - ) / n - - # Store frequency and amplitude - coefficients.append(coefficient) - frequencies.append(frequency) - - # Subtract the found pure tune from the signal - new_signal = coefficient * np.exp(PI2I - * frequency * self._int_range) - samples = samples - new_signal - - coefficients, frequencies = zip(*sorted(zip(coefficients, - frequencies), - key=lambda tuple: np.abs(tuple[0]), - reverse=True)) - - return frequencies, coefficients - - def get_signal(self): - if self._hann_window is not None: - return self._samples * self._hann_window - else: - return self._samples - - def get_coefficient_for_freq(self, freq): - return self._compute_coef(self._samples, - freq * self._length) / self._length - - def _pad_signal(self): - """ - Pads the signal with zeros to a "good" FFT size. - """ - length = len(self._samples) - # TODO Think proper pad size - pad_length = (1 << (length - 1).bit_length()) - length - # pad_length = 6600 - length - self._samples = np.pad( - self._samples, - (0, pad_length), - 'constant' - ) - # self._samples = self._samples[:6000] - - def _jacobsen(self, dft_values): - """ - This method interpolates the real frequency of the - signal using the three highest peaks in the FFT. - """ - k = np.argmax(np.abs(dft_values)) - n = self._length - r = dft_values - delta = np.tan(np.pi / n) / (np.pi / n) - kp = (k + 1) % n - km = (k - 1) % n - delta = delta * np.real((r[km] - r[kp]) / (2 * r[k] - r[km] - r[kp])) - return (k + delta) / n - - @staticmethod - def _compute_coef_simple(samples, kprime): - """ - Computes the coefficient of the Discrete Time Fourier - Transform corresponding to the given frequency (kprime). - """ - n = len(samples) - freq = kprime / n - exponents = np.exp(-PI2I * freq * np.arange(n)) - coef = np.sum(exponents * samples) - return coef - - @staticmethod - def _compute_coef_goertzel(samples, kprime): - """ - Computes the coefficient of the Discrete Time Fourier - Transform corresponding to the given frequency (kprime). - This function is faster than the previous one if compiled - with Numba. - """ - n = len(samples) - a = 2 * np.pi * (kprime / n) - b = 2 * np.cos(a) - c = np.exp(-complex(0, 1) * a) - d = np.exp(-complex(0, 1) * - ((2 * np.pi * kprime) / n) * - (n - 1)) - s0 = 0. - s1 = 0. - s2 = 0. - for i in range(n - 1): - s0 = samples[i] + b * s1 - s2 - s2 = s1 - s1 = s0 - s0 = samples[n - 1] + b * s1 - s2 - y = s0 - s1 * c - return y * d - - def _compute_orbit(self): - self.closed_orbit = np.mean(self._samples) - self.closed_orbit_rms = np.std(self._samples) - self.peak_to_peak = np.max(self._samples) - np.min(self._samples) - - @staticmethod - def _conditional_import_compute_coef(): - """ - Checks if Numba is installed. - If it is, it sets the compiled goertzel algorithm as the - coefficient function to use. If it isn't, it uses the - normal Numpy one. - """ - try: - from numba import jit - # print("Using compiled Numba functions.") - return jit(HarmonicAnalysis._compute_coef_goertzel, - nopython=True, nogil=True) - except ImportError: - # print("Numba not found, using numpy functions.") - return HarmonicAnalysis._compute_coef_simple - - @staticmethod - def _conditional_import_fft(): - """ - If SciPy is installed, it will set its fft as the one - to use as it is slightly faster. Otherwise it will use - the Numpy one. - """ - try: - from scipy.fftpack import fft as scipy_fft - from scipy.fftpack import fftfreq as scipy_fftfreq - fft = staticmethod(scipy_fft) - fftfreq = staticmethod(scipy_fftfreq) - # print("Scipy found, using scipy FFT.") - except ImportError: - from numpy.fft import fft as numpy_fft - from numpy.fft import fftfreq as numpy_fftfreq - fft = staticmethod(numpy_fft) - fftfreq = staticmethod(numpy_fftfreq) - # print("Scipy not found, using numpy FFT.") - return fft, fftfreq - - -# Set up conditional functions on load -############################################## -HarmonicAnalysis._compute_coef = staticmethod( - HarmonicAnalysis._conditional_import_compute_coef()) -HarmonicAnalysis._fft, HarmonicAnalysis._fftfreq = \ - HarmonicAnalysis._conditional_import_fft() -############################################## - - -def get_spectrum_harmonic(cent, method: str = 'laskar', - num_harmonics: int = 20, - hann: bool = False) -> Tuple[np.ndarray, np.ndarray]: - """Frequency analysis of beam motion - - Parameters: - cent: Centroid motions of the particle - method: ``'laskar'`` or ``'fft'``. Default: ``'laskar'`` - num_harmonics: Number of harmonic components to compute (before mask - applied) - hann: Turn on Hanning window. Default: :py:obj:`False` - - Returns: - frequency (ndarray): (num_harmonics,) array of frequencies - amplitude (ndarray): (num_harmonics,) array of amplitudes - """ - ha = HarmonicAnalysis(cent, hann=hann) - - if method == 'laskar': - ha_tune, ha_amp = ha.laskar_method(num_harmonics=num_harmonics) - elif method == 'fft': - signal = ha.get_signal() - fft = ha._fft(signal) - ha_tune, ha_amp = ha._fftfreq(len(fft)), np.abs(fft) - else: - raise ValueError('The method ' + method + ' is undefined') - - ha_amp = np.abs(np.array(ha_amp)) - ha_tune = np.array(ha_tune) - return ha_tune, ha_amp - - -def get_tunes_harmonic(cents, method: str = 'laskar', - num_harmonics: int = 20, hann: bool = False, - fmin: float = 0, fmax: float = 1) -> np.ndarray: - """Computes tunes from harmonic analysis - - Parameters: - cents: Centroid motions of the particle - method: ``'laskar'`` or ``'fft'``. Default: ``'laskar'`` - num_harmonics: Number of harmonic components to compute (before mask - applied) - fmin: Lower bound for tune - fmax: Upper bound for tune - hann: Turn on Hanning window. Default: :py:obj:`False` - - Returns: - tunes (ndarray): numpy array of length len(cents), max of the - spectrum within [fmin fmax] - """ - def get_max_spectrum(freq, amp, fmin, fmax): - msk = np.logical_and(freq >= fmin, freq <= fmax) - amp = amp[msk] - freq = freq[msk] - tune = freq[np.argmax(amp)] - return tune - - cents = np.array(cents) - if cents.ndim > 1: - npart = cents.shape[0] - else: - cents = [cents] - npart = 1 - tunes = np.zeros(npart) - - for i in range(npart): - freq, amp = get_spectrum_harmonic(cents[i], - num_harmonics=num_harmonics, - method=method, - hann=hann) - try: - tunes[i] = get_max_spectrum(freq, amp, fmin, fmax) - except ValueError: - tunes[i] = np.nan - - return tunes +""" +Simplified version of harpy from +Jaime Coello Maria de Portugal - Martinez Vazquez +Github: https://github.com/pylhc/harpy +""" + +from __future__ import annotations +import numpy +from warnings import warn +from scipy.fft import fft, fftfreq +from at.lattice import AtWarning +from at.lattice import AtError + + +__all__ = ['get_spectrum_harmonic', 'get_main_harmonic', + 'get_tunes_harmonic'] + + +def _pad_signal(samples, pad_length): + """ Pad signal and round to the next power of 2""" + if pad_length is not None: + length = len(samples) + pad_length = (1 << (length + int(pad_length) + - 1).bit_length()) - length + samples = numpy.pad(samples, (0, pad_length)) + return samples + + +def _interpolate_peak(complex_values): + """This method interpolates the real frequency of the + signal using the three highest peaks in the FFT. + """ + k = numpy.argmax(numpy.abs(complex_values)) + rv = complex_values + n = len(complex_values) + delta = numpy.tan(numpy.pi / n) / (numpy.pi / n) + kp = (k + 1) % n + km = (k - 1) % n + dk = rv[km] - rv[kp] + sk = 2 * rv[k] - rv[km] - rv[kp] + delta *= numpy.real(dk/sk) + return (k + delta) / n + + +def _compute_coef(samples, freq): + """ + Computes the coefficient of the Discrete Time Fourier + Transform corresponding to the given frequency + """ + n = len(samples) + exponents = numpy.exp(-2j*numpy.pi * freq * numpy.arange(n)) + coef = numpy.sum(exponents * samples) + return coef + + +def _interpolated_fft(samples, num_harmonics, fmin, fmax, + maxiter): + """Compute the interpolated FFT of a signal""" + rn = numpy.arange(len(samples)) + coefficients = numpy.zeros(num_harmonics, dtype=complex) + frequencies = numpy.zeros(num_harmonics) + + nfound = 0 + niter = 0 + nmax = num_harmonics*maxiter + + while (nfound < num_harmonics) and (niter < nmax): + fft_data = fft(samples) + frequency = _interpolate_peak(fft_data) + coefficient = _compute_coef(samples, frequency) + if frequency >= fmin and frequency <= fmax: + frequencies[nfound] = frequency + coefficients[nfound] = coefficient + nfound += 1 + samples = samples - coefficient * numpy.exp(2j*numpy.pi*frequency*rn) + niter += 1 + + if nfound < num_harmonics: + msg = ('{0}/{1} harmonics found in ' + 'requested range'.format(nfound, num_harmonics)) + warn(AtWarning(msg)) + if nfound == 0: + msg = ('No harmonic found within range, ' + 'consider extending it or increase maxiter') + raise AtError(msg) + + coefficients, frequencies = zip(*sorted(zip(coefficients, frequencies), + key=lambda tuple: numpy.abs(tuple[0]), + reverse=True)) + return frequencies, coefficients + + +def get_spectrum_harmonic(cent: numpy.ndarray, method: str = 'interp_fft', + num_harmonics: int = 20, + hann: bool = False, + fmin: float = 0, fmax: float = 1, + maxiter: float = 10, + pad_length=None) -> tuple[numpy.ndarray, + numpy.ndarray]: + """Frequency analysis of beam motion + + Parameters: + cent: Centroid motions of the particle + method: ``'interp_fft'`` or ``'interp_fft'``. + Default: ``'interp_fft'`` + num_harmonics: Number of harmonics to search for with interp_fft + fmin: Lower bound for spectrum search with interp_fft + fmax: Upper bound for spectrum search with interp_fft + maxiter: Multiplies ``num_harmonics`` to define the max. + number of iteration for the search + hann: Turn on Hanning window. Default: :py:obj:`False`. + Ignored for interpolated FFT + pad_length Zero pad the input signal. + Rounded to the higher power of 2 + Ignored for interpolated FFT + + Returns: + frequency (ndarray): (num_harmonics,) array of frequencies + amplitude (ndarray): (num_harmonics,) array of amplitudes + phase (ndarray): (num_harmonics,) array of phases + """ + lc = len(cent) + # laskar kept for backward compatibility + if method == 'interp_fft' or method == 'laskar': + if hann: + warn(AtWarning('Windowing not efficient for' + 'interpolated FFT: ignored')) + if pad_length is not None: + warn(AtWarning('Padding not efficient for' + 'interpolated FFT: ignored')) + ha_tune, ha_amp = _interpolated_fft(cent, num_harmonics, + fmin, fmax, maxiter) + elif method == 'fft': + if hann: + cent *= numpy.hanning(lc) + cent = _pad_signal(cent, pad_length) + fft_data = fft(cent) + ha_tune, ha_amp = fftfreq(len(fft_data)), fft_data + else: + raise ValueError('The method ' + method + ' is undefined') + + ha_phase = numpy.angle(numpy.array(ha_amp)) + ha_amp = numpy.abs(numpy.array(ha_amp)) / lc + ha_tune = numpy.array(ha_tune) + return ha_tune, ha_amp, ha_phase + + +def get_main_harmonic(cents: numpy.ndarray, method: str = 'interp_fft', + hann: bool = False, + fmin: float = 0, fmax: float = 1, + maxiter: float = 10, + pad_length=None) -> numpy.ndarray: + """Computes tunes, amplitudes and pahses from harmonic analysis + + Parameters: + cents: Centroid motions of the particle + method: ``'interp_fft'`` or ``'fft'``. + Default: ``'interp_fft'`` + fmin: Lower bound for tune search + fmax: Upper bound for tune search + maxiter: Maximum number of iterations for the search + hann: Turn on Hanning window. Default: :py:obj:`False`. + Ignored for interpolated FFT + pad_length: Zero pad the input signal. + Rounded to the higher power of 2 + Ignored for interpolated FFT + + Returns: + tunes (ndarray): numpy array of length len(cents), max of the + spectrum within [fmin fmax] + amplitude (ndarray): (len(cents), ) array of amplitudes + corresponding to the tune + phase (ndarray): (len(cents), ) array of phases + corresponding to the tune + """ + def get_max_spectrum(freq, amp, phase, fmin, fmax, method): + if method == 'interp_fft': + return freq[0], amp[0], phase[0] + msk = numpy.logical_and(freq >= fmin, freq <= fmax) + amp = amp[msk] + freq = freq[msk] + phase = phase[msk] + freq = freq[numpy.argmax(amp)] + phase = phase[numpy.argmax(amp)] + amp = numpy.amax(amp) + return freq, amp, phase + + cents = numpy.array(cents) + if cents.ndim > 1: + npart = cents.shape[0] + else: + cents = [cents] + npart = 1 + + tunes = numpy.zeros(npart) + amps = numpy.zeros(npart) + phases = numpy.zeros(npart) + + for i in range(npart): + out = get_spectrum_harmonic(cents[i], num_harmonics=1, method=method, + hann=hann, pad_length=pad_length, + fmin=fmin, fmax=fmax, maxiter=maxiter) + freq, amp, phase = out + try: + tunes[i], amps[i], phases[i] = get_max_spectrum(freq, amp, phase, + fmin, fmax, + method) + except ValueError: + tunes[i] = numpy.nan + amps[i] = numpy.nan + phases[i] = numpy.nan + return tunes, amps, phases + + +def get_tunes_harmonic(cents: numpy.ndarray, method: str = 'interp_fft', + hann: bool = False, + fmin: float = 0, fmax: float = 1, + maxiter: float = 10, + pad_length=None, **kwargs) -> numpy.ndarray: + """Computes tunes from harmonic analysis + + Parameters: + cents: Centroid motions of the particle + method: ``'interp_fft'`` or ``'fft'``. + Default: ``'interp_fft'`` + fmin: Lower bound for tune search + fmax: Upper bound for tune search + maxiter: Maximum number of iterations for the search + hann: Turn on Hanning window. Default: :py:obj:`False`. + Ignored for interpolated FFT + pad_length: Zero pad the input signal. + Rounded to the higher power of 2 + Ignored for interpolated FFT + + Returns: + tunes (ndarray): numpy array of length len(cents), max of the + spectrum within [fmin fmax] + """ + num_harmonics = kwargs.pop('num_harmonics', 1) # Backward compatibility + if num_harmonics != 1: + msg = "num_harmonics is deprecated and ignored for tune calculation" + warn(AtWarning(msg)) + tunes, _, _ = get_main_harmonic(cents, method=method, hann=hann, fmin=fmin, + fmax=fmax, pad_length=pad_length) + return tunes diff --git a/pyat/at/physics/linear.py b/pyat/at/physics/linear.py index 55c3114cf..5b16568ff 100644 --- a/pyat/at/physics/linear.py +++ b/pyat/at/physics/linear.py @@ -1080,15 +1080,15 @@ def get_tune(ring: Lattice, *, method: str = 'linopt', ``'fft'`` tracks a single particle and computes the tunes with fft, - ``'laskar'`` tracks a single particle and computes the tunes with - NAFF. + ``'interp_fft'`` tracks a single particle and computes the tunes with + interpolated FFT. dp (float): Momentum deviation. dct (float): Path lengthening. df (float): Deviation of RF frequency. orbit (Orbit): Avoids looking for the closed orbit if it is already known ((6,) array) - for the ``'fft'`` and ``'laskar'`` methods only: + for the ``'fft'`` and ``'interp_fft'`` methods only: Keyword Args: nturns (int): Number of turns. Default: 512 @@ -1101,7 +1101,7 @@ def get_tune(ring: Lattice, *, method: str = 'linopt', fmin (float): Lower tune bound. Default: 0 fmax (float): Upper tune bound. Default: 1 hann (bool): Turn on Hanning window. - Default: :py:obj:`False` + Default: :py:obj:`False`. Work only for ``'fft'`` get_integer(bool): Turn on integer tune (slower) Returns: @@ -1155,8 +1155,8 @@ def get_chrom(ring: Lattice, *, method: str = 'linopt', ``'fft'`` tracks a single particle and computes the tunes with :py:func:`~scipy.fftpack.fft`, - ``'laskar'`` tracks a single particle and computes the tunes with - NAFF. + ``'interp_fft'`` tracks a single particle and computes the tunes with + interpolated FFT. dp (float): Momentum deviation. dct (float): Path lengthening. df (float): Deviation of RF frequency. @@ -1167,7 +1167,7 @@ def get_chrom(ring: Lattice, *, method: str = 'linopt', DPStep (float): Momentum step for differentiation Default: :py:data:`DConstant.DPStep <.DConstant>` - for the ``'fft'`` and ``'laskar'`` methods only: + for the ``'fft'`` and ``'interp_fft'`` methods only: Keyword Args: nturns (int): Number of turns. Default: 512 @@ -1180,7 +1180,7 @@ def get_chrom(ring: Lattice, *, method: str = 'linopt', fmin (float): Lower tune bound. Default: 0 fmax (float): Upper tune bound. Default: 1 hann (bool): Turn on Hanning window. - Default: :py:obj:`False` + Default: :py:obj:`False`, Work only for ``'fft'`` Returns: chromaticities (ndarray): array([:math:`\xi_x,\xi_y`]) diff --git a/pyat/test/test_physics.py b/pyat/test/test_physics.py index 9b537db5e..28a4556b2 100644 --- a/pyat/test/test_physics.py +++ b/pyat/test/test_physics.py @@ -256,8 +256,8 @@ def test_linopt_line(hmba_lattice, refpts): def test_get_tune_chrom(hmba_lattice): qlin = hmba_lattice.get_tune() qplin = hmba_lattice.get_chrom() - qharm = hmba_lattice.get_tune(method='laskar') - qpharm = hmba_lattice.get_chrom(method='laskar') + qharm = hmba_lattice.get_tune(method='interp_fft') + qpharm = hmba_lattice.get_chrom(method='interp_fft') assert_close(qlin, [0.38156245, 0.85437541], rtol=1e-8) assert_close(qharm, [0.38156245, 0.85437541], rtol=1e-8) assert_close(qplin, [0.1791909, 0.12242558], rtol=1e-5) @@ -267,7 +267,7 @@ def test_get_tune_chrom(hmba_lattice): def test_nl_detuning_chromaticity(hmba_lattice): nlqplin, _, _ = at.nonlinear.chromaticity(hmba_lattice, npoints=11) nlqpharm, _, _ = at.nonlinear.chromaticity(hmba_lattice, - method='laskar', npoints=11) + method='interp_fft', npoints=11) q0, q1, _, _, _, _ = at.nonlinear.detuning(hmba_lattice, npoints=11) assert_close(nlqplin, [[0.38156741, 0.17908231, 1.18656034, -16.47368694],