Skip to content

Commit

Permalink
Simulation with modulated output (#390)
Browse files Browse the repository at this point in the history
* wip: simulation with sample from pulser.sampler.sample()

* wip: convert defaultdict to dicts in samples()

* wip: determine the basis with empty or zeros samples dicts

* wip: update notebook to demonstrate previous fixes

* wip: add flag for final zero samples

* wip: update Simulation sample extraction

* wip: update notebook to showcase changes

* Adds sampling with SLM mask and simulation with modulation

* Noisy simulations from sampled sequences

* Type hinting

* Simulation drawing with modulation + bug fixes

* Change application of amplitude noise fluctuations

* Small fixes to sampling and simulation logic

* Updating the unit tests

* Updating the tutorials

* Incorporating review comments and suggestions

Co-authored-by: Louis Vignoli <[email protected]>
  • Loading branch information
HGSilveri and lvignoli authored Aug 5, 2022
1 parent 43dcb4d commit 094ab4b
Show file tree
Hide file tree
Showing 12 changed files with 518 additions and 368 deletions.
32 changes: 25 additions & 7 deletions pulser-core/pulser/sampler/sampler.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,44 @@
"""Defines the main function for sequence sampling."""
from __future__ import annotations

from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Optional

from pulser.sampler.samples import SequenceSamples
from pulser.sampler.samples import SequenceSamples, _SlmMask

if TYPE_CHECKING: # pragma: no cover
from pulser import Sequence


def sample(seq: Sequence, modulation: bool = False) -> SequenceSamples:
def sample(
seq: Sequence,
modulation: bool = False,
extended_duration: Optional[int] = None,
) -> SequenceSamples:
"""Construct samples of a Sequence.
Args:
seq: The sequence to sample.
modulation: Whether to modulate the samples.
extended_duration: If defined, extends the samples duration to the
desired value.
"""
samples_list = [
ch_schedule.get_samples(modulated=modulation)
for ch_schedule in seq._schedule.values()
]
if extended_duration:
samples_list = [
cs.extend_duration(extended_duration) for cs in samples_list
]
optionals = {}
if seq._slm_mask_targets and seq._slm_mask_time:
optionals["_slm_mask"] = _SlmMask(
seq._slm_mask_targets,
seq._slm_mask_time[1],
)
return SequenceSamples(
list(seq.declared_channels.keys()),
[
ch_schedule.get_samples(modulated=modulation)
for ch_schedule in seq._schedule.values()
],
samples_list,
seq.declared_channels,
**optionals,
)
80 changes: 67 additions & 13 deletions pulser-core/pulser/sampler/samples.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from collections import defaultdict
from dataclasses import dataclass, field
from typing import Optional

import numpy as np

Expand Down Expand Up @@ -69,6 +70,14 @@ class _TargetSlot:
targets: set[QubitId]


@dataclass
class _SlmMask:
"""Auxiliary class to store the SLM mask configuration."""

targets: set[QubitId] = field(default_factory=set)
end: int = 0


@dataclass
class ChannelSamples:
"""Gathers samples of a channel."""
Expand Down Expand Up @@ -100,8 +109,8 @@ def extend_duration(self, new_duration: int) -> ChannelSamples:
Returns:
The extend channel samples.
"""
extension = new_duration - len(self.amp)
if new_duration < self.duration:
extension = new_duration - self.duration
if extension < 0:
raise ValueError("Can't extend samples to a lower duration.")

new_amp = np.pad(self.amp, (0, extension))
Expand All @@ -113,7 +122,17 @@ def extend_duration(self, new_duration: int) -> ChannelSamples:
)
return ChannelSamples(new_amp, new_detuning, new_phase, self.slots)

def modulate(self, channel_obj: Channel) -> ChannelSamples:
def is_empty(self) -> bool:
"""Whether the channel is effectively empty.
We consider the channel to be empty if all amplitude and detuning
samples are zero.
"""
return np.count_nonzero(self.amp) + np.count_nonzero(self.det) == 0

def modulate(
self, channel_obj: Channel, max_duration: Optional[int] = None
) -> ChannelSamples:
"""Modulates the samples for a given channel.
It assumes that the phase starts at its initial value and is kept at
Expand All @@ -122,13 +141,17 @@ def modulate(self, channel_obj: Channel) -> ChannelSamples:
Args:
channel_obj: The channel object for which to modulate the samples.
max_duration: The maximum duration of the modulation samples. If
defined, truncates them to have a duration less than or equal
to the given value.
Returns:
The modulated channel samples.
"""
new_amp = channel_obj.modulate(self.amp)
new_detuning = channel_obj.modulate(self.det)
new_phase = channel_obj.modulate(self.phase, keep_ends=True)
times = slice(0, max_duration)
new_amp = channel_obj.modulate(self.amp)[times]
new_detuning = channel_obj.modulate(self.det)[times]
new_phase = channel_obj.modulate(self.phase, keep_ends=True)[times]
return ChannelSamples(new_amp, new_detuning, new_phase, self.slots)


Expand All @@ -139,6 +162,7 @@ class SequenceSamples:
channels: list[str]
samples_list: list[ChannelSamples]
_ch_objs: dict[str, Channel]
_slm_mask: _SlmMask = field(default_factory=_SlmMask)

@property
def channel_samples(self) -> dict[str, ChannelSamples]:
Expand All @@ -150,10 +174,24 @@ def max_duration(self) -> int:
"""The maximum duration among the channel samples."""
return max(samples.duration for samples in self.samples_list)

def to_nested_dict(self) -> dict:
def used_bases(self) -> set[str]:
"""The bases with non-zero pulses."""
return {
ch_obj.basis
for ch_obj, ch_samples in zip(
self._ch_objs.values(), self.samples_list
)
if not ch_samples.is_empty()
}

def to_nested_dict(self, all_local: bool = False) -> dict:
"""Format in the nested dictionary form.
This is the format expected by `pulser_simulation.Simulation()`.
Args:
all_local: Forces all samples to be distributed by their
individual targets, even when applied by a global channel.
"""
bases = {ch_obj.basis for ch_obj in self._ch_objs.values()}
in_xy = False
Expand All @@ -162,17 +200,33 @@ def to_nested_dict(self) -> dict:
in_xy = True
d = _prepare_dict(self.max_duration, in_xy=in_xy)
for chname, samples in zip(self.channels, self.samples_list):
cs = samples.extend_duration(self.max_duration)
cs = (
samples.extend_duration(self.max_duration)
if samples.duration != self.max_duration
else samples
)
addr = self._ch_objs[chname].addressing
basis = self._ch_objs[chname].basis
if addr == _GLOBAL:
d[_GLOBAL][basis][_AMP] += cs.amp
d[_GLOBAL][basis][_DET] += cs.det
d[_GLOBAL][basis][_PHASE] += cs.phase
if addr == _GLOBAL and not all_local:
start_t = self._slm_mask.end
d[_GLOBAL][basis][_AMP][start_t:] += cs.amp[start_t:]
d[_GLOBAL][basis][_DET][start_t:] += cs.det[start_t:]
d[_GLOBAL][basis][_PHASE][start_t:] += cs.phase[start_t:]
if start_t == 0:
# Prevents lines below from running unnecessarily
continue
unmasked_targets = cs.slots[0].targets - self._slm_mask.targets
for t in unmasked_targets:
d[_LOCAL][basis][t][_AMP][:start_t] += cs.amp[:start_t]
d[_LOCAL][basis][t][_DET][:start_t] += cs.det[:start_t]
d[_LOCAL][basis][t][_PHASE][:start_t] += cs.phase[:start_t]
else:
for s in cs.slots:
for t in s.targets:
times = slice(s.ti, s.tf)
ti = s.ti
if t in self._slm_mask.targets:
ti = max(ti, self._slm_mask.end)
times = slice(ti, s.tf)
d[_LOCAL][basis][t][_AMP][times] += cs.amp[times]
d[_LOCAL][basis][t][_DET][times] += cs.det[times]
d[_LOCAL][basis][t][_PHASE][times] += cs.phase[times]
Expand Down
20 changes: 17 additions & 3 deletions pulser-core/pulser/sequence/_schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,19 +94,33 @@ def get_samples(self, modulated: bool = False) -> ChannelSamples:
amp[s.ti : s.tf] += pulse.amplitude.samples
det[s.ti : s.tf] += pulse.detuning.samples
ph_jump_t = self.channel_obj.phase_jump_time
t_start = max(0, (s.ti - ph_jump_t))
t_start = s.ti - ph_jump_t if ind > 0 else 0
t_end = (
channel_slots[ind + 1].ti - ph_jump_t
if ind < len(channel_slots) - 1
else dt
)
phase[t_start:t_end] += pulse.phase
slots.append(_TargetSlot(s.ti, s.tf, s.targets))
tf = s.tf
if modulated:
# Account for the extended duration of the pulses
# after modulation, which is at most fall_time
fall_time = pulse.fall_time(self.channel_obj)
tf += (
min(fall_time, channel_slots[ind + 1].ti - s.tf)
if ind < len(channel_slots) - 1
else fall_time
)

slots.append(_TargetSlot(s.ti, tf, s.targets))

ch_samples = ChannelSamples(amp, det, phase, slots)

if modulated:
ch_samples = ch_samples.modulate(self.channel_obj)
ch_samples = ch_samples.modulate(
self.channel_obj,
max_duration=self.get_duration(include_fall_time=True),
)

return ch_samples

Expand Down
43 changes: 26 additions & 17 deletions pulser-core/pulser/sequence/_seq_drawer.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,19 +75,20 @@ def get_output_curves(self, ch_obj: Channel) -> list[np.ndarray]:
mod_samples = self.samples.modulate(ch_obj)
return self._give_curves_from_samples(mod_samples)

def get_interpolated_curves(
self, sampling_rate: float
def interpolate_curves(
self, curves: list[np.ndarray], sampling_rate: float
) -> list[np.ndarray]:
"""The curves with a fractional sampling rate."""
indices = np.linspace(
0,
self.samples.duration - 1,
int(sampling_rate * (self.samples.duration + 1)),
self.samples.duration,
num=int(sampling_rate * self.samples.duration),
endpoint=False,
dtype=int,
)
sampled_curves = [curve[indices] for curve in self.get_input_curves()]
sampled_curves = [curve[indices] for curve in curves]
t = np.arange(self.samples.duration)
return [CubicSpline(indices, curve)(t) for curve in sampled_curves]
return [CubicSpline(indices, sc)(t) for sc in sampled_curves]

def curves_on_indices(self) -> list[int]:
"""The indices of the curves to draw."""
Expand Down Expand Up @@ -312,15 +313,15 @@ def phase_str(phi: float) -> str:
ch_data = data[ch]
basis = ch_obj.basis
ys = ch_data.get_input_curves()
if sampling_rate:
yseff = ch_data.get_interpolated_curves(sampling_rate)

draw_output = draw_modulation and (
ch_obj.mod_bandwidth or not draw_input
)
if draw_output:
ys_mod = ch_data.get_output_curves(ch_obj)

if sampling_rate:
curves = ys_mod if draw_output else ys
yseff = ch_data.interpolate_curves(curves, sampling_rate)
ref_ys = yseff if sampling_rate else ys
max_amp = np.max(ref_ys[0])
max_amp = 1 if max_amp == 0 else max_amp
Expand Down Expand Up @@ -357,14 +358,22 @@ def phase_str(phi: float) -> str:
elif draw_input:
ax.fill_between(t, 0, ys[i], color=COLORS[i], alpha=0.3)
if draw_output:
ax.fill_between(
t,
0,
ys_mod[i][:total_duration],
color=COLORS[i],
alpha=0.3,
hatch="////",
)
if not sampling_rate:
ax.fill_between(
t,
0,
ys_mod[i][:total_duration],
color=COLORS[i],
alpha=0.3,
hatch="////",
)
else:
ax.plot(
t,
ys_mod[i][:total_duration],
color=COLORS[i],
linestyle="dotted",
)
special_kwargs = dict(labelpad=10) if i == 0 else {}
ax.set_ylabel(LABELS[i], fontsize=14, **special_kwargs)

Expand Down
18 changes: 14 additions & 4 deletions pulser-simulation/pulser_simulation/simconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from dataclasses import dataclass, field
from sys import version_info
from typing import Any, Union
from typing import Any, Optional, Union

import numpy as np
import qutip
Expand Down Expand Up @@ -69,8 +69,10 @@ class SimConfig:
Useful for cutting down on computing time, but unrealistic.
temperature: Temperature, set in µK, of the Rydberg array.
Also sets the standard deviation of the speed of the atoms.
laser_waist: Waist of the gaussian laser, set in µm,
in global pulses.
laser_waist: Waist of the gaussian laser, set in µm, in global
pulses.
amp_sigma: Dictates the fluctuations in amplitude as a standard
deviation of a normal distribution centered in 1.
solver_options: Options for the qutip solver.
"""

Expand All @@ -79,11 +81,12 @@ class SimConfig:
samples_per_run: int = 5
temperature: float = 50.0
laser_waist: float = 175.0
amp_sigma: float = 5e-2
eta: float = 0.005
epsilon: float = 0.01
epsilon_prime: float = 0.05
dephasing_prob: float = 0.05
solver_options: qutip.Options = None
solver_options: Optional[qutip.Options] = None
spam_dict: dict[str, float] = field(
init=False, default_factory=dict, repr=False
)
Expand All @@ -92,6 +95,12 @@ class SimConfig:
)

def __post_init__(self) -> None:
if not 0.0 <= self.amp_sigma < 1.0:
raise ValueError(
"The standard deviation in amplitude (amp_sigma="
f"{self.amp_sigma}) must be greater than or equal"
" to 0. and smaller than 1."
)
self._process_temperature()
self._change_attribute(
"spam_dict",
Expand All @@ -118,6 +127,7 @@ def __str__(self, solver_options: bool = False) -> str:
lines.append(f"SPAM dictionary: {self.spam_dict}")
if "doppler" in self.noise:
lines.append(f"Temperature: {self.temperature*1.e6}µK")
lines.append(f"Amplitude standard dev.: {self.amp_sigma}")
if "amplitude" in self.noise:
lines.append(f"Laser waist: {self.laser_waist}μm")
if "dephasing" in self.noise:
Expand Down
Loading

0 comments on commit 094ab4b

Please sign in to comment.