diff --git a/transport_analysis/__init__.py b/transport_analysis/__init__.py index 4aa7f33..05dbe1e 100644 --- a/transport_analysis/__init__.py +++ b/transport_analysis/__init__.py @@ -15,4 +15,5 @@ from . import _version + __version__ = _version.get_versions()["version"] diff --git a/transport_analysis/_version.py b/transport_analysis/_version.py index e9a7a4c..1d48932 100644 --- a/transport_analysis/_version.py +++ b/transport_analysis/_version.py @@ -288,9 +288,7 @@ def git_pieces_from_vcs( env.pop("GIT_DIR", None) runner = functools.partial(runner, env=env) - _, rc = runner( - GITS, ["rev-parse", "--git-dir"], cwd=root, hide_stderr=not verbose - ) + _, rc = runner(GITS, ["rev-parse", "--git-dir"], cwd=root, hide_stderr=not verbose) if rc != 0: if verbose: print("Directory %s not under git control" % root) @@ -325,9 +323,7 @@ def git_pieces_from_vcs( pieces["short"] = full_out[:7] # maybe improved later pieces["error"] = None - branch_name, rc = runner( - GITS, ["rev-parse", "--abbrev-ref", "HEAD"], cwd=root - ) + branch_name, rc = runner(GITS, ["rev-parse", "--abbrev-ref", "HEAD"], cwd=root) # --abbrev-ref was added in git-1.6.3 if rc != 0 or branch_name is None: raise NotThisMethod("'git rev-parse --abbrev-ref' returned error") @@ -376,9 +372,7 @@ def git_pieces_from_vcs( mo = re.search(r"^(.+)-(\d+)-g([0-9a-f]+)$", git_describe) if not mo: # unparsable. Maybe git-describe is misbehaving? - pieces["error"] = ( - "unable to parse git-describe output: '%s'" % describe_out - ) + pieces["error"] = "unable to parse git-describe output: '%s'" % describe_out return pieces # tag @@ -407,9 +401,7 @@ def git_pieces_from_vcs( pieces["distance"] = len(out.split()) # total number of commits # commit date: see ISO-8601 comment in git_versions_from_keywords() - date = runner(GITS, ["show", "-s", "--format=%ci", "HEAD"], cwd=root)[ - 0 - ].strip() + date = runner(GITS, ["show", "-s", "--format=%ci", "HEAD"], cwd=root)[0].strip() # Use only the last line. Previous lines may contain GPG signature # information. date = date.splitlines()[-1] @@ -497,9 +489,7 @@ def render_pep440_pre(pieces: Dict[str, Any]) -> str: if pieces["closest-tag"]: if pieces["distance"]: # update the post release segment - tag_version, post_version = pep440_split_post( - pieces["closest-tag"] - ) + tag_version, post_version = pep440_split_post(pieces["closest-tag"]) rendered = tag_version if post_version is not None: rendered += ".post%d.dev%d" % ( @@ -688,9 +678,7 @@ def get_versions() -> Dict[str, Any]: verbose = cfg.verbose try: - return git_versions_from_keywords( - get_keywords(), cfg.tag_prefix, verbose - ) + return git_versions_from_keywords(get_keywords(), cfg.tag_prefix, verbose) except NotThisMethod: pass diff --git a/transport_analysis/conductivity_integration.py b/transport_analysis/conductivity_integration.py new file mode 100644 index 0000000..325a519 --- /dev/null +++ b/transport_analysis/conductivity_integration.py @@ -0,0 +1,342 @@ +from typing import TYPE_CHECKING + +from MDAnalysis.analysis.base import AnalysisBase +from MDAnalysis.core.groups import UpdatingAtomGroup +from MDAnalysis.exceptions import NoDataError +from MDAnalysis.units import constants +import numpy as np +import matplotlib.pyplot as plt +from scipy import integrate +from transport_analysis.utils import cross_corr, average_directions + +if TYPE_CHECKING: + from MDAnalysis.core.universe import AtomGroup + + +## Need to pass the entire the universe to get the com of the universe not just the atomg +class ConductivityKubo(AnalysisBase): + """ + Class to calculate the diffusion_coefficient of a species. + Note that the slope of the mean square displacement provides + the diffusion coeffiecient. This implementation uses FFT to compute MSD faster + as explained here: https://stackoverflow.com/questions/34222272/computing-mean-square-displacement-using-python-and-fft + Since we are using FFT, the sampling frequency (or) dump frequency of the simulation trajectory + plays a critical role in the accuracy of the result. + + Particle velocities are required to calculate the viscosity function. Thus + you are limited to MDA trajectories that contain velocity information, e.g. + GROMACS .trr files, H5MD files, etc. See the MDAnalysis documentation: + https://docs.mdanalysis.org/stable/documentation_pages/coordinates/init.html#writers. + + Parameters + ---------- + atomgroup : AtomGroup + An MDAnalysis :class:`~MDAnalysis.core.groups.AtomGroup`. + Note that :class:`UpdatingAtomGroup` instances are not accepted. + temp_avg : float (optional, default 300) + Average temperature over the course of the simulation, in Kelvin. + dim_type : {'xyz', 'xy', 'yz', 'xz', 'x', 'y', 'z'} + Desired dimensions to be included in the viscosity calculation. + Defaults to 'xyz'. + linear_fit_window : tuple of int (optional) + A tuple of two integers specifying the start and end lag-time for + the linear fit of the viscosity function. If not provided, the + linear fit is not performed and viscosity is not calculated. + + Attributes + ---------- + atomgroup : :class:`~MDAnalysis.core.groups.AtomGroup` + The atoms to which this analysis is applied + dim_fac : int + Dimensionality :math:`d` of the viscosity computation. + results.timeseries : :class:`numpy.ndarray` + The averaged viscosity function over all the particles with respect + to lag-time. Obtained after calling :meth:`ViscosityHelfand.run` + results.visc_by_particle : :class:`numpy.ndarray` + The viscosity function of each individual particle with respect to + lag-time. + results.viscosity : float + The viscosity coefficient of the solvent. The argument + `linear_fit_window` must be provided to calculate this to + avoid misinterpretation of the viscosity function. + start : Optional[int] + The first frame of the trajectory used to compute the analysis. + stop : Optional[int] + The frame to stop at for the analysis. + step : Optional[int] + Number of frames to skip between each analyzed frame. + n_frames : int + Number of frames analysed in the trajectory. + n_particles : int + Number of particles viscosity was calculated over. + """ + + def __init__( + self, + allatomgroup: "AtomGroup", + cationgroup_query: str, + aniongroup_query: str, + temp_avg: float = 298.0, + cutoff_step: int = 10000, + cation_num_atoms_per_species=int, + anion_num_atoms_per_species=int, + cation_mass_per_species=float, + anion_mass_per_species=float, + cation_charge=int, + anion_charge=int, + **kwargs, + ) -> None: + # the below line must be kept to initialize the AnalysisBase class! + super().__init__(allatomgroup.universe.trajectory, **kwargs) + # after this you will be able to access `self.results` + # `self.results` is a dictionary-like object + # that can should used to store and retrieve results + # See more at the MDAnalysis documentation: + # https://docs.mdanalysis.org/stable/documentation_pages/analysis/base.html?highlight=results#MDAnalysis.analysis.base.Results + if isinstance(allatomgroup, UpdatingAtomGroup): + raise TypeError( + "UpdatingAtomGroups are not valid for diffusion computation" + ) + self.temp_avg = temp_avg + self.cutoff_step = cutoff_step + + self.allatomgroup = allatomgroup + self.cations = self.allatomgroup.select_atoms(cationgroup_query) + self.anions = self.allatomgroup.select_atoms(aniongroup_query) + self.cation_num_atoms_per_species = cation_num_atoms_per_species + self.anion_num_atoms_per_species = anion_num_atoms_per_species + self.cation_mass_per_species = cation_mass_per_species + self.anion_mass_per_species = anion_mass_per_species + self.cation_charge = cation_charge + self.anion_charge = anion_charge + + self._dim = 3 + self.cation_particles = len(self.cations) + self.anion_particles = len(self.anions) + + if (self.cation_particles % self.cation_num_atoms_per_species != 0) or ( + self.anion_particles % self.anion_num_atoms_per_species != 0 + ): + raise TypeError("Some species are fragmented. Invalid AtomGroup") + + def _prepare(self): + """ + Set up viscosity, mass, velocity, and position arrays + before the analysis loop begins + """ + # 2D viscosity array of frames x particles + + self._volumes = np.zeros((self.n_frames)) + + self._acf_cation_cation = np.zeros((self.n_frames, self._dim)) + self._acf_cation_anion = np.zeros((self.n_frames, self._dim)) + self._acf_anion_anion = np.zeros((self.n_frames, self._dim)) + + self._l_cation_cation = np.zeros((self.n_frames - 1, self._dim)) + self._l_cation_anion = np.zeros((self.n_frames - 1, self._dim)) + self._l_anion_anion = np.zeros((self.n_frames - 1, self._dim)) + self._lij_cation_cation = np.zeros((self.n_frames - 1)) + self._lij_cation_anion = np.zeros((self.n_frames - 1)) + self._lij_anion_anion = np.zeros((self.n_frames - 1)) + + self._cond = np.zeros((self.n_frames - 1)) + + self._coms_velocity = np.zeros((self.n_frames, self._dim)) + self._times = np.zeros((self.n_frames)) + + self._cation_masses = self.cations.masses + self._anion_masses = self.anions.masses + self._all_masses = self.allatomgroup.masses + + # 3D arrays of frames x particles x dimensions + # positions + self._cation_velocties = np.zeros( + (self.n_frames, int(self.cation_particles), self._dim) + ) + self._anion_velocities = np.zeros( + (self.n_frames, int(self.anion_particles), self._dim) + ) + + self._cation_mass_weighted_velocities = np.zeros( + ( + self.n_frames, + int(self.cation_particles / self.cation_num_atoms_per_species), + self._dim, + ) + ) + + self._anion_mass_weighted_velocities = np.zeros( + ( + self.n_frames, + int(self.anion_particles / self.anion_num_atoms_per_species), + self._dim, + ) + ) + self.J_to_KJ = 1000 + self.boltzmann = ( + self.J_to_KJ * constants["Boltzmann_constant"] / constants["N_Avogadro"] + ) + self.A_to_m = 1e-10 + self.fs_to_s = 1e-15 + self.e_to_C = constants["elementary_charge"] + + def _single_frame(self): + """ + Constructs arrays of velocities and positions + for viscosity computation. + """ + # This runs once for each frame of the trajectory + + # The trajectory positions update automatically + # You can access the frame number using self._frame_index + + # trajectory must have velocity and position information + if not (self._ts.has_velocities and self._ts.volume != 0): + raise NoDataError( + "Einstein diffusion computation requires positions, and box volume in the trajectory" + ) + + # fill volume array + self._volumes[self._frame_index] = self._ts.volume + self._times[self._frame_index] = self._ts.time + # set shape of position array + + self._cation_velocties[self._frame_index] = self.cations.velocities + self._anion_velocities[self._frame_index] = self.anions.velocities + self._coms_velocity[self._frame_index] = ( + np.multiply( + np.repeat(self._all_masses, 3, axis=0).reshape(-1, 3), + self.allatomgroup.velocities, + ).sum(axis=0) + / self._all_masses.sum() + ) + + self._cation_mass_weighted_velocities[self._frame_index] = ( + ( + np.multiply( + np.repeat(self._cation_masses, self._dim, axis=0).reshape( + -1, self._dim + ), + self._cation_velocties[self._frame_index], + ) + / self.cation_mass_per_species + ) + .reshape(-1, int(self.cation_num_atoms_per_species), self._dim) + .sum(axis=1) + ) + self._anion_mass_weighted_velocities[self._frame_index] = ( + ( + np.multiply( + np.repeat(self._anion_masses, self._dim, axis=0).reshape( + -1, self._dim + ), + self._anion_velocities[self._frame_index], + ) + / self.anion_mass_per_species + ) + .reshape(-1, int(self.anion_num_atoms_per_species), self._dim) + .sum(axis=1) + ) + + self._cation_mass_weighted_velocities[self._frame_index] = ( + self._cation_mass_weighted_velocities[self._frame_index] + - self._coms_velocity[self._frame_index] + ) + self._anion_mass_weighted_velocities[self._frame_index] = ( + self._anion_mass_weighted_velocities[self._frame_index] + - self._coms_velocity[self._frame_index] + ) + + def _conclude(self): + """ + Calculates the conductivity coefficient via the fft. + """ + cation_summed_velocities = np.sum(self._cation_mass_weighted_velocities, axis=1) + anion_summed_velocities = np.sum(self._anion_mass_weighted_velocities, axis=1) + for i in range(self._dim): + self._acf_cation_cation[:, i] = cross_corr( + cation_summed_velocities[:, i], cation_summed_velocities[:, i] + ) + self._acf_anion_anion[:, i] = cross_corr( + anion_summed_velocities[:, i], anion_summed_velocities[:, i] + ) + self._acf_cation_anion[:, i] = cross_corr( + cation_summed_velocities[:, i], anion_summed_velocities[:, i] + ) + for i in range(self._dim): + self._l_cation_cation[:, i] = integrate.cumulative_trapezoid( + self._acf_cation_cation[:, i], self._times[:, i] + ) + self._l_anion_anion[:, i] = integrate.cumulative_trapezoid( + self._acf_anion_anion[:, i], self._times[:, i] + ) + self._l_cation_anion[:, i] = integrate.cumulative_trapezoid( + self._acf_cation_anion[:, i], self.times[:, i] + ) + + self._lij_cation_cation = average_directions( + self._l_cation_cation, self._dim + ) / (2 * self.boltzmann * self.temp_avg * self._volumes) + self._lij_cation_anion = average_directions(self._l_anion_anion, self._dim) / ( + 2 * self.boltzmann * self.temp_avg * self._volumes + ) + self._lij_anion_anion = average_directions(self._l_cation_anion, self._dim) / ( + 2 * self.boltzmann * self.temp_avg * self._volumes + ) + + self._cond = ( + (self.cation_charge**2) * self._lij_cation_cation + + (self.anion_charge**2) * self._lij_anion_anion + + (2 * self.cation_charge * self.anion_charge) * self._lij_cation_anion + ) + cation_mass_fraction = self._cation_masses.sum() / self._all_masses.sum() + anion_mass_fraction = self._anion_masses.sum() / self._all_masses.sum() + solvent_fraction = 1 - cation_mass_fraction - anion_mass_fraction + self.results.conductivity = ( + self._cond[self.cutoff_step] + * (self.e_to_C * self.e_to_C) + * (1 / (self.fs_to_s * self.A_to_m)) + * 10 + ) + self.results.cation_transference_com = ( + (self.cation_charge**2) * self._lij_cation_cation[self.cutoff_step] + + (self.cation_charge * self.anion_charge) + * self._lij_cation_anion[self.cutoff_step] + ) / self._cond[self.cutoff_step] + self.results.anion_transference_com = ( + (self.anion_charge**2) * self._lij_anion_anion[self.cutoff_step] + + (self.cation_charge * self.anion_charge) + * self._lij_cation_anion[self.cutoff_step] + ) / self._cond[self.cutoff_step] + self.results.cation_transference_solvent = ( + self.results.cation_transference_com - anion_mass_fraction + ) / solvent_fraction + self.results.anion_transference_solvent = ( + self.results.anion_transference_com - cation_mass_fraction + ) / solvent_fraction + + def plot_acf(self): + """ + Plot the mead square displacement vs time and the fit region in the log-log scale + """ + plt.plot( + self._times[: self.cutoff_step], + self._acf_cation_cation[: self.cutoff_step], + label="+ +", + ) + plt.plot( + self._times[: self.cutoff_step], + self._acf_cation_anion[: self.cutoff_step], + label="+ -", + ) + plt.plot( + self._times[: self.cutoff_step], + self._acf_anion_anion[: self.cutoff_step], + label="- -", + ) + + plt.grid() + plt.ylabel("Velocity Autocorrelation Function") + plt.xlabel("Time") + plt.tight_layout() + plt.show() diff --git a/transport_analysis/conductivity_msd.py b/transport_analysis/conductivity_msd.py new file mode 100644 index 0000000..d118b83 --- /dev/null +++ b/transport_analysis/conductivity_msd.py @@ -0,0 +1,474 @@ +from typing import TYPE_CHECKING + +from MDAnalysis.analysis.base import AnalysisBase +from MDAnalysis.core.groups import UpdatingAtomGroup +from MDAnalysis.exceptions import NoDataError +from MDAnalysis.units import constants +import numpy as np +import matplotlib.pyplot as plt +from scipy import stats +from transport_analysis.utils import ( + msd_fft_cross_1d, + msd_variance_cross_1d, + average_directions, +) + +if TYPE_CHECKING: + from MDAnalysis.core.universe import AtomGroup + + +## Need to pass the entire the universe to get the com of the universe not just the atomg +class ConductivityEinstein(AnalysisBase): + """ + Class to calculate the diffusion_coefficient of a species. + Note that the slope of the mean square displacement provides + the diffusion coeffiecient. This implementation uses FFT to compute MSD faster + as explained here: https://stackoverflow.com/questions/34222272/computing-mean-square-displacement-using-python-and-fft + Since we are using FFT, the sampling frequency (or) dump frequency of the simulation trajectory + plays a critical role in the accuracy of the result. + + Particle velocities are required to calculate the viscosity function. Thus + you are limited to MDA trajectories that contain velocity information, e.g. + GROMACS .trr files, H5MD files, etc. See the MDAnalysis documentation: + https://docs.mdanalysis.org/stable/documentation_pages/coordinates/init.html#writers. + + Parameters + ---------- + atomgroup : AtomGroup + An MDAnalysis :class:`~MDAnalysis.core.groups.AtomGroup`. + Note that :class:`UpdatingAtomGroup` instances are not accepted. + temp_avg : float (optional, default 300) + Average temperature over the course of the simulation, in Kelvin. + dim_type : {'xyz', 'xy', 'yz', 'xz', 'x', 'y', 'z'} + Desired dimensions to be included in the viscosity calculation. + Defaults to 'xyz'. + linear_fit_window : tuple of int (optional) + A tuple of two integers specifying the start and end lag-time for + the linear fit of the viscosity function. If not provided, the + linear fit is not performed and viscosity is not calculated. + + Attributes + ---------- + atomgroup : :class:`~MDAnalysis.core.groups.AtomGroup` + The atoms to which this analysis is applied + dim_fac : int + Dimensionality :math:`d` of the viscosity computation. + results.timeseries : :class:`numpy.ndarray` + The averaged viscosity function over all the particles with respect + to lag-time. Obtained after calling :meth:`ViscosityHelfand.run` + results.visc_by_particle : :class:`numpy.ndarray` + The viscosity function of each individual particle with respect to + lag-time. + results.viscosity : float + The viscosity coefficient of the solvent. The argument + `linear_fit_window` must be provided to calculate this to + avoid misinterpretation of the viscosity function. + start : Optional[int] + The first frame of the trajectory used to compute the analysis. + stop : Optional[int] + The frame to stop at for the analysis. + step : Optional[int] + Number of frames to skip between each analyzed frame. + n_frames : int + Number of frames analysed in the trajectory. + n_particles : int + Number of particles viscosity was calculated over. + """ + + def __init__( + self, + allatomgroup: "AtomGroup", + cationgroup_query: str, + aniongroup_query: str, + temp_avg: float = 298.0, + linear_fit_window: tuple[int, int] = None, + cation_num_atoms_per_species=int, + anion_num_atoms_per_species=int, + cation_mass_per_species=float, + anion_mass_per_species=float, + cation_charge=int, + anion_charge=int, + **kwargs, + ) -> None: + # the below line must be kept to initialize the AnalysisBase class! + super().__init__(allatomgroup.universe.trajectory, **kwargs) + # after this you will be able to access `self.results` + # `self.results` is a dictionary-like object + # that can should used to store and retrieve results + # See more at the MDAnalysis documentation: + # https://docs.mdanalysis.org/stable/documentation_pages/analysis/base.html?highlight=results#MDAnalysis.analysis.base.Results + if isinstance(allatomgroup, UpdatingAtomGroup): + raise TypeError( + "UpdatingAtomGroups are not valid for diffusion computation" + ) + self.temp_avg = temp_avg + self.linear_fit_window = linear_fit_window + + self.allatomgroup = allatomgroup + self.cations = self.allatomgroup.select_atoms(cationgroup_query) + self.anions = self.allatomgroup.select_atoms(aniongroup_query) + self.cation_num_atoms_per_species = cation_num_atoms_per_species + self.anion_num_atoms_per_species = anion_num_atoms_per_species + self.cation_mass_per_species = cation_mass_per_species + self.anion_mass_per_species = anion_mass_per_species + self.cation_charge = cation_charge + self.anion_charge = anion_charge + + self._dim = 3 + self.cation_particles = len(self.cations) + self.anion_particles = len(self.anions) + + if (self.cation_particles % self.cation_num_atoms_per_species != 0) or ( + self.anion_particles % self.anion_num_atoms_per_species != 0 + ): + raise TypeError("Some species are fragmented. Invalid AtomGroup") + + def _prepare(self): + """ + Set up viscosity, mass, velocity, and position arrays + before the analysis loop begins + """ + # 2D viscosity array of frames x particles + + self._volumes = np.zeros((self.n_frames)) + + self._msds_cation_cation = np.zeros((self.n_frames, self._dim)) + self._msds_cation_anion = np.zeros((self.n_frames, self._dim)) + self._msds_anion_anion = np.zeros((self.n_frames, self._dim)) + + self._msds_cation_cation_var = np.zeros((self.n_frames, self._dim)) + self._msds_cation_anion_var = np.zeros((self.n_frames, self._dim)) + self._msds_anion_anion_var = np.zeros((self.n_frames, self._dim)) + self._cond = np.zeros((self.n_frames)) + self._cond_var = np.zeros((self.n_frames)) + + self._lij_cation_cation = np.zeros((self.n_frames)) + self._lij_cation_anion = np.zeros((self.n_frames)) + self._lij_anion_anion = np.zeros((self.n_frames)) + + self._lij_cation_cation_weights = np.ones((self.n_frames)) + self._lij_cation_anion_weights = np.ones((self.n_frames)) + self._lij_anion_anion_weights = np.ones((self.n_frames)) + + self._coms = np.zeros((self.n_frames, self._dim)) + self._times = np.zeros((self.n_frames)) + + self._cation_masses = self.cations.masses + self._anion_masses = self.anions.masses + + # 3D arrays of frames x particles x dimensions + # positions + self._cation_positions = np.zeros( + (self.n_frames, int(self.cation_particles), self._dim) + ) + self._anion_positions = np.zeros( + (self.n_frames, int(self.anion_particles), self._dim) + ) + + self._cation_mass_weighted_positions = np.zeros( + ( + self.n_frames, + int(self.cation_particles / self.cation_num_atoms_per_species), + self._dim, + ) + ) + + self._anion_mass_weighted_positions = np.zeros( + ( + self.n_frames, + int(self.anion_particles / self.anion_num_atoms_per_species), + self._dim, + ) + ) + self.J_to_KJ = 1000 + self.boltzmann = ( + self.J_to_KJ * constants["Boltzmann_constant"] / constants["N_Avogadro"] + ) + self.A_to_m = 1e-10 + self.fs_to_s = 1e-15 + self.e_to_C = constants["elementary_charge"] + + def _single_frame(self): + """ + Constructs arrays of velocities and positions + for viscosity computation. + """ + # This runs once for each frame of the trajectory + + # The trajectory positions update automatically + # You can access the frame number using self._frame_index + + # trajectory must have velocity and position information + if not (self._ts.has_positions and self._ts.volume != 0): + raise NoDataError( + "Einstein diffusion computation requires positions, and box volume in the trajectory" + ) + + # fill volume array + self._volumes[self._frame_index] = self._ts.volume + self._times[self._frame_index] = self._ts.time + # set shape of position array + + self._cation_positions[self._frame_index] = self.cations.positions + self._anion_positions[self._frame_index] = self.anions.positions + self._coms[self._frame_index] = self.allatomgroup.center_of_mass(wrap=False) + + self._cation_mass_weighted_positions[self._frame_index] = ( + ( + np.multiply( + np.repeat(self._cation_masses, self._dim, axis=0).reshape( + -1, self._dim + ), + self._cation_positions[self._frame_index], + ) + / self.cation_mass_per_species + ) + .reshape(-1, int(self.cation_num_atoms_per_species), self._dim) + .sum(axis=1) + ) + self._anion_mass_weighted_positions[self._frame_index] = ( + ( + np.multiply( + np.repeat(self._anion_masses, self._dim, axis=0).reshape( + -1, self._dim + ), + self._anion_positions[self._frame_index], + ) + / self.anion_mass_per_species + ) + .reshape(-1, int(self.anion_num_atoms_per_species), self._dim) + .sum(axis=1) + ) + + self._cation_mass_weighted_positions[self._frame_index] = ( + self._cation_mass_weighted_positions[self._frame_index] + - self._coms[self._frame_index] + ) + self._anion_mass_weighted_positions[self._frame_index] = ( + self._anion_mass_weighted_positions[self._frame_index] + - self._coms[self._frame_index] + ) + + def _conclude(self): + """ + Calculates the conductivity coefficient via the fft. + """ + cation_summed_positions = np.sum(self._cation_mass_weighted_positions, axis=1) + anion_summed_positions = np.sum(self._anion_mass_weighted_positions, axis=1) + + self._msds_cation_cation = np.transpose( + [ + msd_fft_cross_1d( + cation_summed_positions[:, i], cation_summed_positions[:, i] + ) + for i in range(self._dim) + ] + ) + self._msds_cation_cation_var = np.transpose( + [ + msd_variance_cross_1d( + cation_summed_positions[:, i], + cation_summed_positions[:, i], + self._msds_cation_cation[:, i], + ) + for i in range(self._dim) + ] + ) + self._msds_anion_anion = np.transpose( + [ + msd_fft_cross_1d( + anion_summed_positions[:, i], anion_summed_positions[:, i] + ) + for i in range(self._dim) + ] + ) + self._msds_anion_anion_var = np.transpose( + [ + msd_variance_cross_1d( + anion_summed_positions[:, i], + anion_summed_positions[:, i], + self._msds_anion_anion[:, i], + ) + for i in range(self._dim) + ] + ) + self._msds_cation_anion = np.transpose( + [ + msd_fft_cross_1d( + cation_summed_positions[:, i], anion_summed_positions[:, i] + ) + for i in range(self._dim) + ] + ) + self._msds_cation_anion_var = np.transpose( + [ + msd_variance_cross_1d( + cation_summed_positions[:, i], + anion_summed_positions[:, i], + self._msds_cation_anion[:, i], + ) + for i in range(self._dim) + ] + ) + self._lij_cation_cation = average_directions( + self._msds_cation_cation, self._dim + ) / (2 * self.boltzmann * self.temp_avg * self._volumes) + self._lij_cation_anion = average_directions( + self._msds_cation_anion, self._dim + ) / (2 * self.boltzmann * self.temp_avg * self._volumes) + self._lij_anion_anion = average_directions( + self._msds_anion_anion, self._dim + ) / (2 * self.boltzmann * self.temp_avg * self._volumes) + + # self._cond = (self.cation_charge**2)*self._lij_cation_cation + (self.anion_charge**2)*self._lij_anion_anion + (2*self.cation_charge*self.anion_charge)*self._lij_cation_anion + # self._cond_var = ((self.cation_charge**2)**2)*self._msds_cation_cation_var + ((self.anion_charge**2)**2)*self._msds_anion_anion_var + ((2*self.cation_charge*self.anion_charge)**2)*self._msds_cation_anion_var + + self._lij_cation_cation_weights = np.sqrt( + np.abs(1 / average_directions(self._msds_cation_cation_var, self._dim)) + ) + self._lij_cation_cation_weights /= np.sum(self._lij_cation_cation_weights) + + self._lij_cation_anion_weights = np.sqrt( + np.abs(1 / average_directions(self._msds_cation_anion_var, self._dim)) + ) + self._lij_cation_anion_weights /= np.sum(self._lij_cation_anion_weights) + + self._lij_anion_anion_weights = np.sqrt( + np.abs(1 / average_directions(self._msds_anion_anion_var, self._dim)) + ) + self._lij_anion_anion_weights /= np.sum(self._lij_anion_anion_weights) + + cond = None + lij_cation_cation = None + lij_cation_anion = None + lij_anion_anion = None + beta = None + if self.linear_fit_window: + lij_cation_cation, _, _, _, _ = stats.linregress( + self._times[self.linear_fit_window[0] : self.linear_fit_window[1]], + self._lij_cation_cation[ + self.linear_fit_window[0] : self.linear_fit_window[1] + ], + ) + lij_cation_anion, _, _, _, _ = stats.linregress( + self._times[self.linear_fit_window[0] : self.linear_fit_window[1]], + self._lij_cation_anion[ + self.linear_fit_window[0] : self.linear_fit_window[1] + ], + ) + lij_anion_anion, _, _, _, _ = stats.linregress( + self._times[self.linear_fit_window[0] : self.linear_fit_window[1]], + self._lij_anion_anion[ + self.linear_fit_window[0] : self.linear_fit_window[1] + ], + ) + cond = ( + (self.cation_charge**2) * lij_cation_cation + + (self.anion_charge**2) * lij_anion_anion + + (2 * self.cation_charge * self.anion_charge) * lij_cation_anion + ) + + self._cond = ( + (self.cation_charge**2) * self._lij_cation_cation + + (self.anion_charge**2) * self._lij_anion_anion + + (2 * self.cation_charge * self.anion_charge) * self._lij_cation_anion + ) + fit_slope = np.gradient( + np.log( + self._cond[self.linear_fit_window[0] : self.linear_fit_window[1]] + )[1:], + np.log( + self._times[self.linear_fit_window[0] : self.linear_fit_window[1]] + - self._times[0] + )[1:], + ) + beta = np.nanmean(np.array(fit_slope)) + + else: + _, lij_cation_cation = np.polynomial.polynomial.polyfit( + self._times[1:], + self._msds_cation_cation[1:], + 1, + w=self._lij_cation_cation_weights[1:], + ) + _, lij_cation_anion = np.polynomial.polynomial.polyfit( + self._times[1:], + self._msds_cation_anion[1:], + 1, + w=self._lij_cation_anion_weights[1:], + ) + _, lij_anion_anion = np.polynomial.polynomial.polyfit( + self._times[1:], + self._msds_anion_anion[1:], + 1, + w=self._lij_anion_anion_weights[1:], + ) + cond = ( + (self.cation_charge**2) * lij_cation_cation + + (self.anion_charge**2) * lij_anion_anion + + (2 * self.cation_charge * self.anion_charge) * lij_cation_anion + ) + + # fit_slope = np.gradient(np.log(self._cond)[1:], np.log(self._times - self._times[0])[1:]) + # beta = np.average(np.array(fit_slope), weights=self.weights[1:]) + if self.linear_fit_window: + self.results.linearity = beta + # self.results.fit_slope = cond + # self.results.fit_intercept = cond_intercept + cation_mass_fraction = ( + self._cation_masses.sum() / self.allatomgroup.masses.sum() + ) + anion_mass_fraction = self._anion_masses.sum() / self.allatomgroup.masses.sum() + solvent_fraction = 1 - cation_mass_fraction - anion_mass_fraction + + self.results.conductivity = ( + cond * (self.e_to_C * self.e_to_C) * (1 / (self.fs_to_s * self.A_to_m)) * 10 + ) + self.results.cation_transference_com = ( + (self.cation_charge**2) * lij_cation_cation + + (self.cation_charge * self.anion_charge) * lij_cation_anion + ) / cond + self.results.anion_transference_com = ( + (self.anion_charge**2) * lij_anion_anion + + (self.cation_charge * self.anion_charge) * lij_cation_anion + ) / cond + self.results.cation_transference_solvent = ( + self.results.cation_transference_com - anion_mass_fraction + ) / solvent_fraction + self.results.anion_transference_solvent = ( + self.results.anion_transference_com - cation_mass_fraction + ) / solvent_fraction + + def plot_linear_fit(self): + """ + Plot the mead square displacement vs time and the fit region in the log-log scale + """ + plt.plot(self._times, np.abs(self._lii_self)) + plt.plot( + self._times, + self._times * self.results.fit_slope + self.results.fit_intercept, + "k--", + alpha=0.5, + ) + plt.grid() + plt.ylabel("MSD") + plt.xlabel("Time") + if self.linear_fit_window: + plt.axvline( + x=self._times[self.linear_fit_window[0]], + color="r", + linestyle="--", + linewidth=2, + ) + plt.axvline( + x=self._times[self.linear_fit_window[1]], + color="r", + linestyle="--", + linewidth=2, + ) + # plt.ylim(min(np.abs(self._msds[1:])) * 0.9, max(np.abs(self._msds)) * 1.1) + # i = int(len(self._msds) / 5) + # slope_guess = (self._msds[i] - self._msds[5]) / (self._times[i] - self._times[5]) + # plt.plot(self._times[self.linear_fit_window[0]:self.linear_fit_window[1]], self._times[self.linear_fit_window[0]:self.linear_fit_window[1]] * slope_guess * 2, "k--", alpha=0.5) + plt.tight_layout() + plt.show() diff --git a/transport_analysis/diffusion_msd.py b/transport_analysis/diffusion_msd.py new file mode 100644 index 0000000..a3a12cf --- /dev/null +++ b/transport_analysis/diffusion_msd.py @@ -0,0 +1,278 @@ +from typing import TYPE_CHECKING + +from MDAnalysis.analysis.base import AnalysisBase +from MDAnalysis.core.groups import UpdatingAtomGroup +from MDAnalysis.exceptions import NoDataError +from MDAnalysis.units import constants +import numpy as np +import matplotlib.pyplot as plt +from scipy import stats +from transport_analysis.utils import msd_fft_1d, msd_variance_1d, average_directions + +if TYPE_CHECKING: + from MDAnalysis.core.universe import AtomGroup + + +## Need to pass the entire the universe to get the com of the universe not just the atomg +class DiffusionCoefficientEinstein(AnalysisBase): + """ + Class to calculate the diffusion_coefficient of a species. + Note that the slope of the mean square displacement provides + the diffusion coeffiecient. This implementation uses FFT to compute MSD faster + as explained here: https://stackoverflow.com/questions/34222272/computing-mean-square-displacement-using-python-and-fft + Since we are using FFT, the sampling frequency (or) dump frequency of the simulation trajectory + plays a critical role in the accuracy of the result. + + Particle velocities are required to calculate the viscosity function. Thus + you are limited to MDA trajectories that contain velocity information, e.g. + GROMACS .trr files, H5MD files, etc. See the MDAnalysis documentation: + https://docs.mdanalysis.org/stable/documentation_pages/coordinates/init.html#writers. + + Parameters + ---------- + atomgroup : AtomGroup + An MDAnalysis :class:`~MDAnalysis.core.groups.AtomGroup`. + Note that :class:`UpdatingAtomGroup` instances are not accepted. + temp_avg : float (optional, default 300) + Average temperature over the course of the simulation, in Kelvin. + dim_type : {'xyz', 'xy', 'yz', 'xz', 'x', 'y', 'z'} + Desired dimensions to be included in the viscosity calculation. + Defaults to 'xyz'. + linear_fit_window : tuple of int (optional) + A tuple of two integers specifying the start and end lag-time for + the linear fit of the viscosity function. If not provided, the + linear fit is not performed and viscosity is not calculated. + + Attributes + ---------- + atomgroup : :class:`~MDAnalysis.core.groups.AtomGroup` + The atoms to which this analysis is applied + dim_fac : int + Dimensionality :math:`d` of the viscosity computation. + results.timeseries : :class:`numpy.ndarray` + The averaged viscosity function over all the particles with respect + to lag-time. Obtained after calling :meth:`ViscosityHelfand.run` + results.visc_by_particle : :class:`numpy.ndarray` + The viscosity function of each individual particle with respect to + lag-time. + results.viscosity : float + The viscosity coefficient of the solvent. The argument + `linear_fit_window` must be provided to calculate this to + avoid misinterpretation of the viscosity function. + start : Optional[int] + The first frame of the trajectory used to compute the analysis. + stop : Optional[int] + The frame to stop at for the analysis. + step : Optional[int] + Number of frames to skip between each analyzed frame. + n_frames : int + Number of frames analysed in the trajectory. + n_particles : int + Number of particles viscosity was calculated over. + """ + + def __init__( + self, + allatomgroup: "AtomGroup", + atomgroup_query: str, + temp_avg: float = 298.0, + linear_fit_window: tuple[int, int] = None, + num_atoms_per_species=int, + mass_per_species=float, + **kwargs, + ) -> None: + # the below line must be kept to initialize the AnalysisBase class! + super().__init__(allatomgroup.universe.trajectory, **kwargs) + # after this you will be able to access `self.results` + # `self.results` is a dictionary-like object + # that can should used to store and retrieve results + # See more at the MDAnalysis documentation: + # https://docs.mdanalysis.org/stable/documentation_pages/analysis/base.html?highlight=results#MDAnalysis.analysis.base.Results + if isinstance(allatomgroup, UpdatingAtomGroup): + raise TypeError( + "UpdatingAtomGroups are not valid for diffusion computation" + ) + self.temp_avg = temp_avg + self.linear_fit_window = linear_fit_window + + self.allatomgroup = allatomgroup + self.atomgroup = self.allatomgroup.select_atoms(atomgroup_query) + self.n_particles = len(self.atomgroup) + self.num_atoms_per_species = num_atoms_per_species + self.mass_per_species = mass_per_species + + self._dim = 3 + + if self.n_particles % self.num_atoms_per_species != 0: + raise TypeError("Some species are fragmented. Invalid AtomGroup") + + def _prepare(self): + """ + Set up viscosity, mass, velocity, and position arrays + before the analysis loop begins + """ + # 2D viscosity array of frames x particles + + self._volumes = np.zeros((self.n_frames)) + self._msds = np.zeros((self.n_frames, self._dim)) + self._msds_var = np.zeros((self.n_frames, self._dim)) + self._lii_self = np.zeros((self.n_frames)) + self.weights = np.ones((self.n_frames)) + self._coms = np.zeros((self.n_frames, self._dim)) + self._times = np.zeros((self.n_frames)) + + self._masses = self.atomgroup.masses + + # 3D arrays of frames x particles x dimensions + # positions + self._positions = np.zeros((self.n_frames, int(self.n_particles), self._dim)) + self._mass_weighted_positions = np.zeros( + ( + self.n_frames, + int(self.n_particles / self.num_atoms_per_species), + self._dim, + ) + ) + self.J_to_KJ = 1000 + self.boltzmann = ( + self.J_to_KJ * constants["Boltzmann_constant"] / constants["N_Avogadro"] + ) + self.A_to_m = 1e-10 + self.fs_to_s = 1e-15 + + def _single_frame(self): + """ + Constructs arrays of velocities and positions + for viscosity computation. + """ + # This runs once for each frame of the trajectory + + # The trajectory positions update automatically + # You can access the frame number using self._frame_index + + # trajectory must have velocity and position information + if not (self._ts.has_positions and self._ts.volume != 0): + raise NoDataError( + "Einstein diffusion computation requires positions, and box volume in the trajectory" + ) + + # fill volume array + self._volumes[self._frame_index] = self._ts.volume + self._times[self._frame_index] = self._ts.time + # set shape of position array + + self._positions[self._frame_index] = self.atomgroup.positions + self._coms[self._frame_index] = self.allatomgroup.center_of_mass(wrap=False) + self._mass_weighted_positions[self._frame_index] = ( + ( + np.multiply( + np.repeat(self._masses, self._dim, axis=0).reshape(-1, self._dim), + self._positions[self._frame_index], + ) + / self.mass_per_species + ) + .reshape(-1, int(self.num_atoms_per_species), self._dim) + .sum(axis=1) + ) + self._mass_weighted_positions[self._frame_index] = ( + self._mass_weighted_positions[self._frame_index] + - self._coms[self._frame_index] + ) + + def _conclude(self): + """ + Calculates the diffusion coefficient via the fft. + """ + for specie_id in range(int(self.n_particles / self.num_atoms_per_species)): + for i in range(self._dim): + msd_temp = msd_fft_1d( + np.array(self._mass_weighted_positions[:, specie_id, :][:, i]) + ) + self._msds[:, i] += msd_temp + self._msds_var[:, i] += msd_variance_1d( + np.array(self._mass_weighted_positions[:, specie_id, :][:, i]), + self._msds[:, i], + ) + + self._lii_self = average_directions(self._msds, self._dim) / ( + 2 * self.boltzmann * self.temp_avg * self._volumes + ) + self.weights = np.sqrt( + np.abs(1 / average_directions(self._msds_var, self._dim)) + ) + self.weights /= np.sum(self.weights) + + lii_self = None + lii_intercept = None + beta = None + if self.linear_fit_window: + lii_self, lii_intercept, _, _, _ = stats.linregress( + self._times[self.linear_fit_window[0] : self.linear_fit_window[1]], + self._lii_self[self.linear_fit_window[0] : self.linear_fit_window[1]], + ) + fit_slope = np.gradient( + np.log( + self._lii_self[ + self.linear_fit_window[0] : self.linear_fit_window[1] + ] + )[1:], + np.log( + self._times[self.linear_fit_window[0] : self.linear_fit_window[1]] + - self._times[0] + )[1:], + ) + beta = np.nanmean(np.array(fit_slope)) + + else: + lii_intercept, lii_self = np.polynomial.polynomial.polyfit( + self._times[1:], self._lii_self[1:], 1, w=self.weights[1:] + ) + # fit_slope = np.gradient(np.log(self._lii_self)[1:], np.log(self._times - self._times[0])[1:]) + # beta = np.average(np.array(fit_slope), weights=self.weights[1:]) + + conc = float(self.n_particles / self.num_atoms_per_species) / ( + self._volumes.mean() * (self.A_to_m**3) + ) + + if self.linear_fit_window: + self.results.linearity = beta + # self.results.fit_slope = lii_self + # self.results.fit_intercept = lii_intercept + self.results.lii_self = lii_self * (1 / (self.A_to_m * self.fs_to_s)) + self.results.diffusion_coefficient = ( + lii_self * (1 / (self.A_to_m * self.fs_to_s)) + ) * (self.boltzmann * self.temp_avg / conc) + + def plot_linear_fit(self): + """ + Plot the mead square displacement vs time and the fit region in the log-log scale + """ + plt.plot(self._times, np.abs(self._lii_self)) + plt.plot( + self._times, + self._times * self.results.fit_slope + self.results.fit_intercept, + "k--", + alpha=0.5, + ) + plt.grid() + plt.ylabel("MSD") + plt.xlabel("Time") + if self.linear_fit_window: + plt.axvline( + x=self._times[self.linear_fit_window[0]], + color="r", + linestyle="--", + linewidth=2, + ) + plt.axvline( + x=self._times[self.linear_fit_window[1]], + color="r", + linestyle="--", + linewidth=2, + ) + # plt.ylim(min(np.abs(self._msds[1:])) * 0.9, max(np.abs(self._msds)) * 1.1) + # i = int(len(self._msds) / 5) + # slope_guess = (self._msds[i] - self._msds[5]) / (self._times[i] - self._times[5]) + # plt.plot(self._times[self.linear_fit_window[0]:self.linear_fit_window[1]], self._times[self.linear_fit_window[0]:self.linear_fit_window[1]] * slope_guess * 2, "k--", alpha=0.5) + plt.tight_layout() + plt.show() diff --git a/transport_analysis/due.py b/transport_analysis/due.py index 7f51acd..2a10d19 100644 --- a/transport_analysis/due.py +++ b/transport_analysis/due.py @@ -65,9 +65,7 @@ def _donothing_func(*args, **kwargs): ) # lgtm [py/unused-import] if "due" in locals() and not hasattr(due, "cite"): - raise RuntimeError( - "Imported due lacks .cite. DueCredit is now disabled" - ) + raise RuntimeError("Imported due lacks .cite. DueCredit is now disabled") except Exception as e: if not isinstance(e, ImportError): import logging diff --git a/transport_analysis/tests/test_velocityautocorr.py b/transport_analysis/tests/test_velocityautocorr.py index 975edfe..5f69bba 100644 --- a/transport_analysis/tests/test_velocityautocorr.py +++ b/transport_analysis/tests/test_velocityautocorr.py @@ -93,9 +93,7 @@ def characteristic_poly(last, n_dim, first=0, step=1): return result -@pytest.mark.parametrize( - "tdim, tdim_keys", [(1, [0]), (2, [0, 1]), (3, [0, 1, 2])] -) +@pytest.mark.parametrize("tdim, tdim_keys", [(1, [0]), (2, [0, 1]), (3, [0, 1, 2])]) def test_characteristic_poly(step_vtraj, NSTEP, tdim, tdim_keys): # test `characteristic_poly()` against `tidynamics.acf()`` @@ -262,9 +260,7 @@ def test_plot_running_integral_custom_labels(self, vacf): assert x_act == x_exp assert y_act == y_exp - def test_plot_running_integral_start_stop_step( - self, vacf, start=1, stop=9, step=2 - ): + def test_plot_running_integral_start_stop_step(self, vacf, start=1, stop=9, step=2): t_range = range(start, stop, step) # Expected data to be plotted x_exp = vacf.times[start:stop:step] @@ -328,9 +324,7 @@ def test_fft_vs_simple_default_per_particle(self, vacf, vacf_fft): ], ) class TestAllDims: - def test_simple_step_vtraj_all_dims( - self, step_vtraj, NSTEP, tdim, tdim_factor - ): + def test_simple_step_vtraj_all_dims(self, step_vtraj, NSTEP, tdim, tdim_factor): # testing the "simple" windowed algorithm on unit velocity trajectory # VACF results should fit the polynomial defined in # characteristic_poly() @@ -353,9 +347,7 @@ def test_simple_start_stop_step_all_dims( # test start stop step is working correctly v_simple = VACF(step_vtraj.atoms, dim_type=tdim, fft=False) v_simple.run(start=tstart, stop=tstop, step=tstep) - poly = characteristic_poly( - tstop, tdim_factor, first=tstart, step=tstep - ) + poly = characteristic_poly(tstop, tdim_factor, first=tstart, step=tstep) # polynomial must take offset start into account assert_almost_equal(v_simple.results.timeseries, poly, decimal=4) @@ -370,9 +362,7 @@ def test_self_diffusivity_step_vtraj_all_dims( v_simple.run() sd_actual = v_simple.self_diffusivity_gk() sd_expected = ( - integrate.simpson( - y=characteristic_poly(NSTEP, tdim_factor), x=range(NSTEP) - ) + integrate.simpson(y=characteristic_poly(NSTEP, tdim_factor), x=range(NSTEP)) / tdim_factor ) # 24307638750.0 (act) agrees with 24307638888.888885 (exp) to 8 sig figs @@ -393,9 +383,7 @@ def test_self_diffusivity_start_stop_step_all_dims( # check that start, stop, step is working correctly v_simple = VACF(step_vtraj.atoms, dim_type=tdim, fft=False) v_simple.run() - sd_actual = v_simple.self_diffusivity_gk( - start=tstart, stop=tstop, step=tstep - ) + sd_actual = v_simple.self_diffusivity_gk(start=tstart, stop=tstop, step=tstep) sd_expected = ( integrate.simpson( y=characteristic_poly(NSTEP, tdim_factor)[tstart:tstop:tstep], @@ -415,9 +403,7 @@ def test_self_diffusivity_odd_step_vtraj_all_dims( v_simple.run() sd_actual = v_simple.self_diffusivity_gk_odd() sd_expected = ( - integrate.trapezoid( - characteristic_poly(NSTEP, tdim_factor), range(NSTEP) - ) + integrate.trapezoid(characteristic_poly(NSTEP, tdim_factor), range(NSTEP)) / tdim_factor ) # 24307638750.0 (exp) agrees with 24307638888.888885 (act) to 8 sig figs @@ -451,9 +437,7 @@ def test_self_diffusivity_odd_start_stop_step_all_dims( # 7705160166.66 (exp) agrees with 7705162888.88 (act) to 6 sig figs assert_approx_equal(sd_actual, sd_expected, significant=6) - def test_fft_step_vtraj_all_dims( - self, step_vtraj, NSTEP, tdim, tdim_factor - ): + def test_fft_step_vtraj_all_dims(self, step_vtraj, NSTEP, tdim, tdim_factor): # testing the fft algorithm on unit velocity trajectory # defined in step_vtraj() # VACF results should fit the characteristic polynomial defined in @@ -476,9 +460,7 @@ def test_fft_start_stop_step_all_dims( # test start stop step is working correctly v_fft = VACF(step_vtraj.atoms, dim_type=tdim, fft=True) v_fft.run(start=tstart, stop=tstop, step=tstep) - poly = characteristic_poly( - tstop, tdim_factor, first=tstart, step=tstep - ) + poly = characteristic_poly(tstop, tdim_factor, first=tstart, step=tstep) # polynomial must take offset start into account assert_almost_equal(v_fft.results.timeseries, poly, decimal=3) @@ -493,9 +475,7 @@ def test_fft_self_diffusivity_step_vtraj_all_dims( v_fft.run() sd_actual = v_fft.self_diffusivity_gk() sd_expected = ( - integrate.simpson( - y=characteristic_poly(NSTEP, tdim_factor), x=range(NSTEP) - ) + integrate.simpson(y=characteristic_poly(NSTEP, tdim_factor), x=range(NSTEP)) / tdim_factor ) # 24307638750.0 (act) agrees with 24307638888.888885 (exp) to 8 sig figs @@ -516,9 +496,7 @@ def test_fft_self_diffusivity_start_stop_step_all_dims( # check that start, stop, step is working correctly v_fft = VACF(step_vtraj.atoms, dim_type=tdim, fft=True) v_fft.run() - sd_actual = v_fft.self_diffusivity_gk( - start=tstart, stop=tstop, step=tstep - ) + sd_actual = v_fft.self_diffusivity_gk(start=tstart, stop=tstop, step=tstep) sd_expected = ( integrate.simpson( y=characteristic_poly(NSTEP, tdim_factor)[tstart:tstop:tstep], @@ -561,9 +539,7 @@ def test_fft_self_diffusivity_odd_start_stop_step_all_dims( # check that start, stop, step is working correctly v_fft = VACF(step_vtraj.atoms, dim_type=tdim, fft=True) v_fft.run() - sd_actual = v_fft.self_diffusivity_gk_odd( - start=tstart, stop=tstop, step=tstep - ) + sd_actual = v_fft.self_diffusivity_gk_odd(start=tstart, stop=tstop, step=tstep) sd_expected = ( integrate.trapezoid( y=characteristic_poly(NSTEP, tdim_factor)[tstart:tstop:tstep], diff --git a/transport_analysis/tests/test_viscosity.py b/transport_analysis/tests/test_viscosity.py index 6e8ae8f..c85342f 100644 --- a/transport_analysis/tests/test_viscosity.py +++ b/transport_analysis/tests/test_viscosity.py @@ -177,9 +177,7 @@ def test_ec_universe(self, u_ec): ], ) class TestAllDims: - def test_step_vtraj_all_dims( - self, step_vtraj_full, NSTEP, tdim, tdim_factor - ): + def test_step_vtraj_all_dims(self, step_vtraj_full, NSTEP, tdim, tdim_factor): # Helfand viscosity results should agree with the unit velocity traj # defined in characteristic_poly_helfand() vis_h = VH(step_vtraj_full.atoms, dim_type=tdim) diff --git a/transport_analysis/tests/utils.py b/transport_analysis/tests/utils.py index a3b1d89..03b1495 100644 --- a/transport_analysis/tests/utils.py +++ b/transport_analysis/tests/utils.py @@ -50,9 +50,7 @@ def make_Universe( n_residues=n_residues, n_segments=n_segments, atom_resindex=np.repeat(np.arange(n_residues), n_atoms // n_residues), - residue_segindex=np.repeat( - np.arange(n_segments), n_residues // n_segments - ), + residue_segindex=np.repeat(np.arange(n_segments), n_residues // n_segments), # trajectory things trajectory=trajectory, velocities=velocities, diff --git a/transport_analysis/utils.py b/transport_analysis/utils.py new file mode 100644 index 0000000..bfbbf43 --- /dev/null +++ b/transport_analysis/utils.py @@ -0,0 +1,139 @@ +import numpy as np + + +def autocorrFFT(x): + """ + Calculates the autocorrelation function using the fast Fourier transform. + + :param x: array[float], function on which to compute autocorrelation function + :return: acf: array[float], autocorrelation function + """ + N = len(x) + F = np.fft.fft(x, n=2 * N) + PSD = F * F.conjugate() + res = np.fft.ifft(PSD) + res = (res[:N]).real + n = N * np.ones(N) - np.arange(0, N) + acf = res / n + return acf + + +def msd_fft_1d(r): + """Calculates mean square displacement of the array r using the fast Fourier transform.""" + # Algorithm based on https://stackoverflow.com/questions/34222272/computing-mean-square-displacement-using-python-and-fft + N = len(r) + D = np.square(r) + D = np.append(D, 0) + S2 = autocorrFFT(r) + Q = 2 * D.sum() + S1 = np.zeros(N) + for m in range(N): + Q = Q - D[m - 1] - D[N - m] + S1[m] = Q / (N - m) + return S1 - 2 * S2 + + +def cross_corr(x, y): + N = len(x) + F1 = np.fft.fft( + x, n=2 ** (N * 2 - 1).bit_length() + ) # 2*N because of zero-padding, use next highest power of 2 + F2 = np.fft.fft(y, n=2 ** (N * 2 - 1).bit_length()) + PSD = F1 * F2.conjugate() + res = np.fft.ifft(PSD) + res = (res[:N]).real + n = N * np.ones(N) - np.arange(0, N) # divide res(m) by (N-m) + return res / n + + +def msd_variance_1d(r, msd): + + # compute A1, recursive relation with D = r^4 + N = len(r) + D = r**4 + D = np.append(D, 0) + Q = 2 * D.sum() + A1 = np.zeros(N) + for m in range(N): + Q = Q - D[m - 1] - D[N - m] + A1[m] = Q / (N - m) + + # compute A2, autocorrelation of r^2 + A2 = cross_corr(r**2, r**2) + + # compute A3 and A4, cross correlations of r and r^3 + A3 = cross_corr(r, r**3) + A4 = cross_corr(r**3, r) + + var_x = A1 + 6 * A2 - 4 * A3 - 4 * A4 - msd**2 + n_minus_m = N * np.ones(N) - np.arange( + 0, N + ) # divide by (N-m)^2 (Var[E[X]] = Var[X]/n) + + return var_x / n_minus_m + + +def msd_fft_cross_1d(r, k): + """Calculates mean square displacement of the array r using the fast Fourier transform.""" + + N = len(r) + D = np.multiply(r, k) + D = np.append(D, 0) + S2 = cross_corr(r, k) + S3 = cross_corr(k, r) + Q = 2 * D.sum() + S1 = np.zeros(N) + for m in range(N): + Q = Q - D[m - 1] - D[N - m] + S1[m] = Q / (N - m) + return S1 - S2 - S3 + + +def msd_variance_cross_1d(r1, r2, msd): + + # compute A1, recursive relation with D = r1^2 r2^2 + N = len(r1) + D = np.square(r1) * np.square(r2) + D = np.append(D, 0) + Q = 2 * D.sum() + A1 = np.zeros(N) + for m in range(N): + Q = Q - D[m - 1] - D[N - m] + A1[m] = Q / (N - m) + + # compute A2, cross correlation of r1^2r2 and r2 + A2 = cross_corr(np.square(r1) * r2, r2) + + # compute A3, cross correlation of r1^2 and r2^2 + A3 = cross_corr(np.square(r1), np.square(r2)) + + # compute A4, cross correlation of (r1r2^2) and r1 + A4 = cross_corr(r1 * np.square(r2), r1) + + # compute A5, cross correlation of r1r2 and r1r2 + A5 = cross_corr(r1 * r2, r1 * r2) + + # compute A6, cross correlation of r1 and r1r2^2 + A6 = cross_corr(r1, np.square(r2) * r1) + + # compute A7, cross correlation of r2^2 and r1^2 + A7 = cross_corr(np.square(r2), np.square(r1)) + + # compute A8, cross correlation of r2 and r1^2r2 + A8 = cross_corr(r2, np.square(r1) * r2) + + var_x = A1 - 2 * A2 + A3 - 2 * A4 + 4 * A5 - 2 * A6 + A7 - 2 * A8 - msd**2 + n_minus_m = N * np.ones(N) - np.arange( + 0, N + ) # divide by (N-m)^2 (Var[E[X]] = Var[X]/n) + + return var_x / n_minus_m + + +def average_directions(r, dim): + if dim == 3: + return (r[:, 0] + r[:, 1] + r[:, 2]) / 3 + if dim == 2: + return (r[:, 0] + r[:, 1]) / 2 + if dim == 1: + return r[:, 0] diff --git a/transport_analysis/velocityautocorr.py b/transport_analysis/velocityautocorr.py index 3718272..d7d35da 100644 --- a/transport_analysis/velocityautocorr.py +++ b/transport_analysis/velocityautocorr.py @@ -47,6 +47,7 @@ enough to obtain a VACF suitable for your needs. """ + from typing import TYPE_CHECKING from MDAnalysis.analysis.base import AnalysisBase @@ -125,9 +126,7 @@ def __init__( # https://docs.mdanalysis.org/stable/documentation_pages/analysis/base.html?highlight=results#MDAnalysis.analysis.base.Results if isinstance(atomgroup, UpdatingAtomGroup): - raise TypeError( - "UpdatingAtomGroups are not valid for VACF computation" - ) + raise TypeError("UpdatingAtomGroups are not valid for VACF computation") # args self.dim_type = dim_type.lower() @@ -142,14 +141,10 @@ def __init__( def _prepare(self): """Set up velocity and VACF arrays before the analysis loop begins""" # 2D array of frames x particles - self.results.vacf_by_particle = np.zeros( - (self.n_frames, self.n_particles) - ) + self.results.vacf_by_particle = np.zeros((self.n_frames, self.n_particles)) # 3D array of frames x particles x dimensionality - self._velocities = np.zeros( - (self.n_frames, self.n_particles, self.dim_fac) - ) + self._velocities = np.zeros((self.n_frames, self.n_particles, self.dim_fac)) # self.results.timeseries not set here @staticmethod @@ -184,14 +179,10 @@ def _single_frame(self): # trajectory must have velocity information if not self._ts.has_velocities: - raise NoDataError( - "VACF computation requires velocities in the trajectory" - ) + raise NoDataError("VACF computation requires velocities in the trajectory") # set shape of velocity array - self._velocities[self._frame_index] = self.atomgroup.velocities[ - :, self._dim - ] + self._velocities[self._frame_index] = self.atomgroup.velocities[:, self._dim] # Results will be in units of (angstroms / ps)^2 def _conclude(self): @@ -222,10 +213,7 @@ def _conclude_simple(self): # iterate through all possible lagtimes up to N for lag in range(N): # get product of velocities shifted by "lag" frames - veloc = ( - self._velocities[: N - lag, :, :] - * self._velocities[lag:, :, :] - ) + veloc = self._velocities[: N - lag, :, :] * self._velocities[lag:, :, :] # dot product of x(, y, z) velocities per particle sum_veloc = np.sum(veloc, axis=-1) diff --git a/transport_analysis/viscosity.py b/transport_analysis/viscosity.py index ea5d958..d468086 100644 --- a/transport_analysis/viscosity.py +++ b/transport_analysis/viscosity.py @@ -114,9 +114,7 @@ def _prepare(self): before the analysis loop begins """ # 2D viscosity array of frames x particles - self.results.visc_by_particle = np.zeros( - (self.n_frames, self.n_particles) - ) + self.results.visc_by_particle = np.zeros((self.n_frames, self.n_particles)) self._volumes = np.zeros((self.n_frames)) @@ -125,13 +123,9 @@ def _prepare(self): # 3D arrays of frames x particles x dimensionality # for velocities and positions - self._velocities = np.zeros( - (self.n_frames, self.n_particles, self.dim_fac) - ) + self._velocities = np.zeros((self.n_frames, self.n_particles, self.dim_fac)) - self._positions = np.zeros( - (self.n_frames, self.n_particles, self.dim_fac) - ) + self._positions = np.zeros((self.n_frames, self.n_particles, self.dim_fac)) # self.results.timeseries not set here # update when mda 2.6.0 releases with typo fix @@ -176,9 +170,7 @@ def _single_frame(self): # trajectory must have velocity and position information if not ( - self._ts.has_velocities - and self._ts.has_positions - and self._ts.volume != 0 + self._ts.has_velocities and self._ts.has_positions and self._ts.volume != 0 ): raise NoDataError( "Helfand viscosity computation requires " @@ -189,14 +181,10 @@ def _single_frame(self): self._volumes[self._frame_index] = self._ts.volume # set shape of velocity array - self._velocities[self._frame_index] = self.atomgroup.velocities[ - :, self._dim - ] + self._velocities[self._frame_index] = self.atomgroup.velocities[:, self._dim] # set shape of position array - self._positions[self._frame_index] = self.atomgroup.positions[ - :, self._dim - ] + self._positions[self._frame_index] = self.atomgroup.positions[:, self._dim] def _conclude(self): """ @@ -260,9 +248,7 @@ def plot_viscosity_function(self): self.linear_fit_window[0], self.linear_fit_window[1], ) - plt.axvline( - fit_start, color="red", linestyle="--", label="Fit Start" - ) + plt.axvline(fit_start, color="red", linestyle="--", label="Fit Start") plt.axvline(fit_end, color="blue", linestyle="--", label="Fit End") plt.xlabel("Lag-time")