holistic #58
Merged (17 commits, Oct 7, 2019)
14 changes: 10 additions & 4 deletions attune/curve/_base.py
@@ -261,7 +261,11 @@ def get_dependent_positions(self, setpoint, units="same", full=True):
for k, v in self.dependents.items():
out[k] = v(setpoint, units)
if full and self.subcurve:
out.update(self.subcurve(self.source_setpoints(setpoint, units), self.source_setpoints.units, full))
out.update(
self.subcurve(
self.source_setpoints(setpoint, units), self.source_setpoints.units, full
)
)
return out

def get_source_setpoint(self, setpoint, units="same"):
@@ -332,15 +336,18 @@ def map_setpoints(self, setpoints, units="same"):
if self.subcurve:
new_source_setpoints = self.source_setpoints(new_setpoints)
self.source_setpoints = Dependent(
new_source_setpoints, self.source_setpoints.name, self.source_setpoints.units, index=self.source_setpoints.index
new_source_setpoints,
self.source_setpoints.name,
self.source_setpoints.units,
index=self.source_setpoints.index,
)
# finish
self.setpoints = Setpoints(new_setpoints, self.setpoints.name, self.setpoints.units)
self.dependents = new_dependents
for obj in self.dependents.values():
setattr(self, obj.name, obj)
self.interpolate()

def sort(self):
order = self.setpoints[:].argsort()
self.setpoints[:] = self.setpoints[order]
@@ -352,7 +359,6 @@ def sort(self):
d[:] = d[order]
self.interpolate()


def offset_by(self, dependent, amount):
"""Offset a dependent by some ammount.

11 changes: 7 additions & 4 deletions attune/curve/_topas.py
@@ -142,8 +142,8 @@ def _read_file(cls, filepath):
comment=comment,
offsets=offsets,
)
for key,value in curves.items():
setattr(value, "siblings", [v for k,v in curves.items() if key != k])
for key, value in curves.items():
setattr(value, "siblings", [v for k, v in curves.items() if key != k])
setattr(value, "supercurves", [])
f.close()
return curves
@@ -201,13 +201,13 @@ def save(self, save_directory, full=True):
for c in all_sibs:
_write_curve(new_crv, c)
to_insert.pop(c.interaction, None)

return out_paths

def _get_family_dict(self, start=None):
if start is None:
start = {}
d = {k:v for k,v in start.items()}
d = {k: v for k, v in start.items()}
d.update({self.interaction: self})
if self.siblings:
for s in self.siblings:
@@ -222,6 +222,7 @@ def _get_family_dict(self, start=None):
d.update(s._get_family_dict(d))
return d


def _insert(curve):
motor_indexes = curve.motor_indexes
arr = np.empty((len(motor_indexes) + 3, len(curve.setpoints)))
@@ -241,6 +242,7 @@ def _convert(curve):
curve.polarization = "V" if curve.polarization == "H" else "H"
return curve


def _write_headers(f, curve):
f.write("600\n")
if curve.kind in "OPA/NOPA":
@@ -256,6 +258,7 @@ def _write_headers(f, curve):
f.write("\t".join(str(i) for i in curve.motor_indexes))
f.write("\n")


def _write_curve(f, curve):
curve = curve.copy()
curve.convert("nm")
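For orientation, here is a plain-Python toy of the sibling walk that _get_family_dict performs above: each curve merges itself into a shared dict and recurses into siblings it has not yet collected. The Node class, the interaction strings, and the not-yet-visited guard are illustrative assumptions (the guard sits in lines collapsed out of this diff), not attune code.

# Toy model of the sibling walk in _get_family_dict; plain Python, not attune.
class Node:
    def __init__(self, interaction):
        self.interaction = interaction
        self.siblings = []

    def family(self, start=None):
        d = dict(start or {})
        d[self.interaction] = self
        for s in self.siblings:
            if s.interaction not in d:  # guard assumed; those lines are collapsed above
                d.update(s.family(d))
        return d

sig, idl, shs = Node("NON-NON-NON-Sig"), Node("NON-NON-NON-Idl"), Node("NON-SH-NON-Sig")
sig.siblings = [idl, shs]
idl.siblings = [sig, shs]
shs.siblings = [sig, idl]
print(sorted(sig.family()))  # ['NON-NON-NON-Idl', 'NON-NON-NON-Sig', 'NON-SH-NON-Sig']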
1 change: 1 addition & 0 deletions attune/workup/__init__.py
@@ -1,4 +1,5 @@
"""Methods for processing OPA 800 tuning data."""
from ._holistic import *
from ._intensity import *
from ._setpoint import *
from ._tune_test import *
13 changes: 13 additions & 0 deletions attune/workup/_common.py
@@ -0,0 +1,13 @@
import pathlib

import WrightTools as wt


def save(curve, fig, image_name, save_directory=None):
if save_directory is None:
save_directory = "."
save_directory = pathlib.Path(save_directory)
curve.save(save_directory=save_directory, full=True)
# Should we timestamp the image?
p = (save_directory / image_name).with_suffix(".png")
wt.artists.savefig(p, fig=fig)
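A hedged sketch of calling this helper, assuming it is importable (e.g. from attune.workup._common): the stand-in curve object and the plain matplotlib figure are illustrative, only the save call itself mirrors the code above.

# Illustrative call of the save helper above; _DummyCurve stands in for attune.Curve.
import pathlib
import matplotlib.pyplot as plt

class _DummyCurve:
    def save(self, save_directory, full=True):
        print("curve files would go to", pathlib.Path(save_directory).resolve())

fig, ax = plt.subplots()
ax.plot([0, 1], [1, 0])
save(_DummyCurve(), fig, "intensity", save_directory=".")  # also writes ./intensity.png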
196 changes: 196 additions & 0 deletions attune/workup/_holistic.py
@@ -0,0 +1,196 @@
"""Function for processing multi-dependent tuning data."""

import itertools

import numpy as np
import scipy

import WrightTools as wt
from ._plot import plot_holistic
from ._common import save


__all__ = ["holistic"]


def _holistic(data, amplitudes, centers, curve):
points = np.array([np.broadcast_to(a[:], amplitudes.shape).flatten() for a in data.axes]).T
ndim = len(data.axes)
delaunay = scipy.spatial.Delaunay(points)

amp_interp = scipy.interpolate.LinearNDInterpolator(delaunay, amplitudes.points.flatten())
cen_interp = scipy.interpolate.LinearNDInterpolator(delaunay, centers.points.flatten())

# def
out_points = []
for p in curve.setpoints[:]:
iso_points = []
for s, pts, vals in _find_simplices_containing(delaunay, cen_interp, p):
iso_points.extend(_edge_intersections(pts, vals, p))
iso_points = np.array(iso_points)
if len(iso_points) > 3:
out_points.append(
tuple(_fit_gauss(iso_points.T[i], amp_interp(iso_points)) for i in range(ndim))
)
else:
out_points.append(tuple(np.nan for i in range(ndim)))

return np.array(out_points)


def holistic(
data,
channels,
dependents,
curve,
*,
spectral_axis=-1,
level=False,
gtol=0.01,
autosave=True,
save_directory=None,
**spline_kwargs,
):
"""Workup multi-dependent tuning data.

Note:
At this time, this function expects 2-dimensional motor space.
The algorithm should generalize to N-dimensional motor space; however, this is untested and plotting will likely fail.

Parameters
----------
data: WrightTools.Data
The data object to process.
channels: WrightTools.data.Channel or int or str or 2-tuple
If singular: the spectral axis, from which the 0th and 1st moments will be taken to
obtain amplitudes and centers. In this case, `spectral_axis` determines which axis is
used to obtain the moments.
If a tuple: (amplitudes, centers), then these channels will be used directly.
dependents: tuple of str
Names of the dependents to modify in the curve, in the same order as the axes of `data`.
curve: attune.Curve
Curve object to modify. Setpoints are determined from the curve.

Keyword Parameters
------------------
spectral_axis: WrightTools.data.Axis or int or str (default -1)
The axis along which to take moments.
Only applies if a single channel is given.
level: bool (default False)
Toggle leveling data. If two channels are given, only the amplitudes are leveled.
If a single channel is given, leveling occurs before taking the moments.
gtol: float (default 0.01)
Global tolerance for rejecting noise level relative to the global maximum.
autosave: bool (default True)
Toggles saving of curve files and images.
save_directory: Path-like (Defaults to current working directory)
Specify where to save files.
**spline_kwargs:
Extra arguments to pass to spline creation (e.g. s=0, k=1 for linear interpolation)
"""
data = data.copy()

if isinstance(channels, (str, wt.data.Channel)):
if level:
data.level(channels, 0, -3)
if isinstance(spectral_axis, int):
spectral_axis = data.axis_names[spectral_axis]
elif isinstance(spectral_axis, wt.data.Axis):
spectral_axis = spectral_axis.expression
# take channel moments
data.moment(
axis=spectral_axis,
channel=channels,
resultant=wt.kit.joint_shape(*[a for a in data.axes if a.expression != spectral_axis]),
moment=0,
)
data.moment(
axis=spectral_axis,
channel=channels,
resultant=wt.kit.joint_shape(*[a for a in data.axes if a.expression != spectral_axis]),
moment=1,
)
amplitudes = data.channels[-2]
centers = data.channels[-1]
data.transform(*[a for a in data.axis_expressions if a != spectral_axis])
else:
amplitudes, centers = channels
if isinstance(amplitudes, (int, str)):
amplitudes = data.channels[wt.kit.get_index(data.channel_names, amplitudes)]
if isinstance(centers, (int, str)):
centers = data.channels[wt.kit.get_index(data.channel_names, centers)]
if level:
data.level(amplitudes.natural_name, 0, -3)

if gtol is not None:
cutoff = amplitudes.max() * gtol
amplitudes.clip(min=cutoff)
centers[np.isnan(amplitudes)] = np.nan

out_points = _holistic(data, amplitudes, centers, curve)
splines = [wt.kit.Spline(curve.setpoints, vals, **spline_kwargs) for vals in out_points.T]

new_curve = _gen_curve(curve, dependents, splines)

fig, _ = plot_holistic(
data,
amplitudes.natural_name,
centers.natural_name,
dependents,
new_curve,
curve,
out_points,
)

if autosave:
save(new_curve, fig, "holistic", save_directory)
return new_curve


def _gen_curve(curve, dependents, splines):
new_curve = curve.copy()
for dep, spline in zip(dependents, splines):
new_curve[dep][:] = spline(new_curve.setpoints)
new_curve.interpolate()
return new_curve


def _find_simplices_containing(delaunay, interpolator, point):
for s in delaunay.simplices:
extrema = interpolator([p for p in delaunay.points[s]])
if min(extrema) < point <= max(extrema):
yield s, delaunay.points[s], extrema


def _edge_intersections(points, evaluated, target):
sortord = np.argsort(evaluated)
evaluated = evaluated[sortord]
points = points[sortord]
for (p1, p2), (v1, v2) in zip(
itertools.combinations(points, 2), itertools.combinations(evaluated, 2)
):
if v1 < target <= v2:
yield tuple(
p1[i] + (p2[i] - p1[i]) * ((target - v1) / (v2 - v1)) for i in range(len(p1))
)


def _fit_gauss(x, y):
x, y = wt.kit.remove_nans_1D(x, y)

def resid(inps):
nonlocal x, y
return y - _gauss(*inps)(x)

bounds = [(-np.inf, np.inf) for i in range(3)]
x_range = np.max(x) - np.min(x)
bounds[0] = (np.min(x) - x_range / 10, np.max(x) + x_range / 10)
bounds = np.array(bounds).T
x0 = [np.median(x), x_range / 10, np.max(y)]
opt = scipy.optimize.least_squares(resid, x0, bounds=bounds)
return opt.x[0]


def _gauss(center, sigma, amplitude):
return lambda x: amplitude * np.exp(-1 / 2 * (x - center) ** 2 / sigma ** 2)
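To make the geometry in _holistic, _find_simplices_containing, and _edge_intersections above easier to follow: for each curve setpoint the code walks the Delaunay triangulation of the motor points, keeps simplices whose interpolated center values bracket the setpoint, and linearly interpolates along their edges to trace that iso-center contour; the amplitude-weighted Gaussian fit then picks one coordinate per motor axis along it. Below is a self-contained toy of the contour-extraction step on synthetic data; nothing in it comes from the attune API.

# Toy contour extraction in the spirit of _find_simplices_containing/_edge_intersections:
# triangulate scattered 2D "motor" points, then for a target "center" value collect
# the points where the triangulation's edges cross that value. Synthetic data only.
import itertools
import numpy as np
import scipy.spatial

rng = np.random.default_rng(0)
pts = rng.uniform(0, 1, size=(50, 2))   # scattered motor positions
vals = pts[:, 0] + 2 * pts[:, 1]        # synthetic center surface, range 0..3
tri = scipy.spatial.Delaunay(pts)
target = 1.5

crossings = []
for simplex in tri.simplices:
    verts = tri.points[simplex]
    v = vals[simplex]
    if v.min() < target <= v.max():     # this simplex brackets the target value
        for (p1, p2), (v1, v2) in zip(
            itertools.combinations(verts, 2), itertools.combinations(v, 2)
        ):
            if min(v1, v2) < target <= max(v1, v2):
                t = (target - v1) / (v2 - v1)    # linear interpolation along the edge
                crossings.append(p1 + t * (p2 - p1))
print(len(crossings), "edge crossings found along the target contour")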
22 changes: 8 additions & 14 deletions attune/workup/_intensity.py
@@ -1,12 +1,11 @@
"""Methods for processing OPA 800 tuning data."""

import pathlib

import numpy as np
import WrightTools as wt

from .. import Curve, Dependent, Setpoints
from ._plot import plot_intensity
from ._common import save


# --- processing methods --------------------------------------------------------------------------
@@ -34,8 +33,8 @@ def intensity(
curve=None,
*,
level=False,
gtol = 0.01,
ltol = 0.1,
gtol=0.01,
ltol=0.1,
autosave=True,
save_directory=None,
**spline_kwargs,
@@ -77,13 +76,15 @@
old_curve.convert("wn")
setpoints = old_curve.setpoints
else:
old_curve = None
old_curve = None
setpoints = Setpoints(data.axes[0].points, data.axes[0].expression, data.axes[0].units)
# TODO: units

if isinstance(channel, (int, str)):
channel = data.channels[wt.kit.get_index(data.channel_names, channel)]
orig_channel = data.create_channel(f"{channel.natural_name}_orig", channel, units=channel.units)
orig_channel = data.create_channel(
f"{channel.natural_name}_orig", channel, units=channel.units
)

# TODO: check if level does what we want
if level:
@@ -121,12 +122,5 @@
fig, _ = plot_intensity(data, channel.natural_name, dependent, curve, old_curve, raw_offsets)

if autosave:
if save_directory is None:
# TODO: Formal decision on whether this should be cwd or data/curve location
save_directory = "."
save_directory = pathlib.Path(save_directory)
curve.save(save_directory=save_directory, full=True)
# Should we timestamp the image?
p = save_directory / "intensity.png"
wt.artists.savefig(p, fig=fig)
save(curve, fig, "intensity", save_directory)
return curve