Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simulation with modulated output #390

Merged
merged 18 commits into from
Aug 5, 2022
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 25 additions & 7 deletions pulser-core/pulser/sampler/sampler.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,44 @@
"""Defines the main function for sequence sampling."""
from __future__ import annotations

from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Optional

from pulser.sampler.samples import SequenceSamples
from pulser.sampler.samples import SequenceSamples, _SlmMask

if TYPE_CHECKING: # pragma: no cover
from pulser import Sequence


def sample(seq: Sequence, modulation: bool = False) -> SequenceSamples:
def sample(
seq: Sequence,
modulation: bool = False,
extended_duration: Optional[int] = None,
) -> SequenceSamples:
"""Construct samples of a Sequence.

Args:
seq: The sequence to sample.
modulation: Whether to modulate the samples.
extended_duration: If defined, extends the samples duration to the
desired value.
"""
samples_list = [
ch_schedule.get_samples(modulated=modulation)
for ch_schedule in seq._schedule.values()
]
if extended_duration:
samples_list = [
cs.extend_duration(extended_duration) for cs in samples_list
]
optionals = {}
if seq._slm_mask_targets and seq._slm_mask_time:
optionals["_slm_mask"] = _SlmMask(
seq._slm_mask_targets,
seq._slm_mask_time[1],
)
return SequenceSamples(
list(seq.declared_channels.keys()),
[
ch_schedule.get_samples(modulated=modulation)
for ch_schedule in seq._schedule.values()
],
samples_list,
seq.declared_channels,
**optionals,
)
76 changes: 64 additions & 12 deletions pulser-core/pulser/sampler/samples.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from collections import defaultdict
from dataclasses import dataclass, field
from typing import Optional

import numpy as np

Expand Down Expand Up @@ -69,6 +70,14 @@ class _TargetSlot:
targets: set[QubitId]


@dataclass
class _SlmMask:
"""Auxiliary class to store the SLM mask configuration."""

targets: set[QubitId] = field(default_factory=set)
end: int = 0


@dataclass
class ChannelSamples:
"""Gathers samples of a channel."""
Expand Down Expand Up @@ -100,9 +109,11 @@ def extend_duration(self, new_duration: int) -> ChannelSamples:
Returns:
The extend channel samples.
"""
extension = new_duration - len(self.amp)
if new_duration < self.duration:
extension = new_duration - self.duration
if extension < 0:
raise ValueError("Can't extend samples to a lower duration.")
if not extension:
return self
lvignoli marked this conversation as resolved.
Show resolved Hide resolved
lvignoli marked this conversation as resolved.
Show resolved Hide resolved

new_amp = np.pad(self.amp, (0, extension))
new_detuning = np.pad(self.det, (0, extension))
Expand All @@ -113,7 +124,17 @@ def extend_duration(self, new_duration: int) -> ChannelSamples:
)
return ChannelSamples(new_amp, new_detuning, new_phase, self.slots)

def modulate(self, channel_obj: Channel) -> ChannelSamples:
def is_empty(self) -> bool:
"""Whether the channel is effectively empty.

We consider the channel to be empty if all amplitude and detuning
samples are zero.
"""
return np.count_nonzero(self.amp) + np.count_nonzero(self.det) == 0

def modulate(
self, channel_obj: Channel, max_duration: Optional[int] = None
) -> ChannelSamples:
"""Modulates the samples for a given channel.

It assumes that the phase starts at its initial value and is kept at
Expand All @@ -122,13 +143,17 @@ def modulate(self, channel_obj: Channel) -> ChannelSamples:

Args:
channel_obj: The channel object for which to modulate the samples.
max_duration: The maximum duration of the modulation samples. If
defined, truncates them to have a duration less than or equal
to the given value.

Returns:
The modulated channel samples.
"""
new_amp = channel_obj.modulate(self.amp)
new_detuning = channel_obj.modulate(self.det)
new_phase = channel_obj.modulate(self.phase, keep_ends=True)
times = slice(0, max_duration)
new_amp = channel_obj.modulate(self.amp)[times]
new_detuning = channel_obj.modulate(self.det)[times]
new_phase = channel_obj.modulate(self.phase, keep_ends=True)[times]
return ChannelSamples(new_amp, new_detuning, new_phase, self.slots)


Expand All @@ -139,6 +164,7 @@ class SequenceSamples:
channels: list[str]
samples_list: list[ChannelSamples]
_ch_objs: dict[str, Channel]
_slm_mask: _SlmMask = field(default_factory=_SlmMask)

@property
def channel_samples(self) -> dict[str, ChannelSamples]:
Expand All @@ -150,10 +176,24 @@ def max_duration(self) -> int:
"""The maximum duration among the channel samples."""
return max(samples.duration for samples in self.samples_list)

def to_nested_dict(self) -> dict:
def used_bases(self) -> set[str]:
"""The bases with non-zero pulses."""
return {
ch_obj.basis
for ch_obj, ch_samples in zip(
self._ch_objs.values(), self.samples_list
)
if not ch_samples.is_empty()
}

def to_nested_dict(self, all_local: bool = False) -> dict:
"""Format in the nested dictionary form.

This is the format expected by `pulser_simulation.Simulation()`.

Args:
all_local: Forces all samples to be distributed by their
individual targets, even when applied by a global channel.
"""
bases = {ch_obj.basis for ch_obj in self._ch_objs.values()}
in_xy = False
Expand All @@ -165,14 +205,26 @@ def to_nested_dict(self) -> dict:
cs = samples.extend_duration(self.max_duration)
addr = self._ch_objs[chname].addressing
basis = self._ch_objs[chname].basis
if addr == _GLOBAL:
d[_GLOBAL][basis][_AMP] += cs.amp
d[_GLOBAL][basis][_DET] += cs.det
d[_GLOBAL][basis][_PHASE] += cs.phase
if addr == _GLOBAL and not all_local:
start_t = self._slm_mask.end
d[_GLOBAL][basis][_AMP][start_t:] += cs.amp[start_t:]
d[_GLOBAL][basis][_DET][start_t:] += cs.det[start_t:]
lvignoli marked this conversation as resolved.
Show resolved Hide resolved
d[_GLOBAL][basis][_PHASE][start_t:] += cs.phase[start_t:]
if start_t == 0:
# Prevents lines below from running unnecessarily
continue
unmasked_targets = cs.slots[0].targets - self._slm_mask.targets
for t in unmasked_targets:
d[_LOCAL][basis][t][_AMP][:start_t] += cs.amp[:start_t]
d[_LOCAL][basis][t][_DET][:start_t] += cs.det[:start_t]
d[_LOCAL][basis][t][_PHASE][:start_t] += cs.phase[:start_t]
else:
for s in cs.slots:
for t in s.targets:
times = slice(s.ti, s.tf)
ti = s.ti
if t in self._slm_mask.targets:
ti = max(ti, self._slm_mask.end)
times = slice(ti, s.tf)
d[_LOCAL][basis][t][_AMP][times] += cs.amp[times]
d[_LOCAL][basis][t][_DET][times] += cs.det[times]
d[_LOCAL][basis][t][_PHASE][times] += cs.phase[times]
Expand Down
19 changes: 16 additions & 3 deletions pulser-core/pulser/sequence/_schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,19 +94,32 @@ def get_samples(self, modulated: bool = False) -> ChannelSamples:
amp[s.ti : s.tf] += pulse.amplitude.samples
det[s.ti : s.tf] += pulse.detuning.samples
ph_jump_t = self.channel_obj.phase_jump_time
t_start = max(0, (s.ti - ph_jump_t))
t_start = s.ti - ph_jump_t if ind > 0 else 0
t_end = (
channel_slots[ind + 1].ti - ph_jump_t
if ind < len(channel_slots) - 1
else dt
)
phase[t_start:t_end] += pulse.phase
slots.append(_TargetSlot(s.ti, s.tf, s.targets))
tf = s.tf
if modulated:
# Account for as much fall time as possible
fall_time = pulse.fall_time(self.channel_obj)
tf += (
min(fall_time, channel_slots[ind + 1].ti - s.tf)
if ind < len(channel_slots) - 1
else fall_time
)
lvignoli marked this conversation as resolved.
Show resolved Hide resolved

slots.append(_TargetSlot(s.ti, tf, s.targets))

ch_samples = ChannelSamples(amp, det, phase, slots)

if modulated:
ch_samples = ch_samples.modulate(self.channel_obj)
ch_samples = ch_samples.modulate(
self.channel_obj,
max_duration=self.get_duration(include_fall_time=True),
)

return ch_samples

Expand Down
43 changes: 26 additions & 17 deletions pulser-core/pulser/sequence/_seq_drawer.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,19 +75,20 @@ def get_output_curves(self, ch_obj: Channel) -> list[np.ndarray]:
mod_samples = self.samples.modulate(ch_obj)
return self._give_curves_from_samples(mod_samples)

def get_interpolated_curves(
self, sampling_rate: float
def interpolate_curves(
self, curves: list[np.ndarray], sampling_rate: float
) -> list[np.ndarray]:
"""The curves with a fractional sampling rate."""
indices = np.linspace(
0,
self.samples.duration - 1,
int(sampling_rate * (self.samples.duration + 1)),
self.samples.duration,
num=int(sampling_rate * self.samples.duration),
endpoint=False,
dtype=int,
)
sampled_curves = [curve[indices] for curve in self.get_input_curves()]
sampled_curves = [curve[indices] for curve in curves]
t = np.arange(self.samples.duration)
return [CubicSpline(indices, curve)(t) for curve in sampled_curves]
return [CubicSpline(indices, sc)(t) for sc in sampled_curves]

def curves_on_indices(self) -> list[int]:
"""The indices of the curves to draw."""
Expand Down Expand Up @@ -312,15 +313,15 @@ def phase_str(phi: float) -> str:
ch_data = data[ch]
basis = ch_obj.basis
ys = ch_data.get_input_curves()
if sampling_rate:
yseff = ch_data.get_interpolated_curves(sampling_rate)

draw_output = draw_modulation and (
ch_obj.mod_bandwidth or not draw_input
)
if draw_output:
ys_mod = ch_data.get_output_curves(ch_obj)

if sampling_rate:
curves = ys_mod if draw_output else ys
yseff = ch_data.interpolate_curves(curves, sampling_rate)
ref_ys = yseff if sampling_rate else ys
max_amp = np.max(ref_ys[0])
max_amp = 1 if max_amp == 0 else max_amp
Expand Down Expand Up @@ -357,14 +358,22 @@ def phase_str(phi: float) -> str:
elif draw_input:
ax.fill_between(t, 0, ys[i], color=COLORS[i], alpha=0.3)
if draw_output:
ax.fill_between(
t,
0,
ys_mod[i][:total_duration],
color=COLORS[i],
alpha=0.3,
hatch="////",
)
if not sampling_rate:
ax.fill_between(
t,
0,
ys_mod[i][:total_duration],
color=COLORS[i],
alpha=0.3,
hatch="////",
)
else:
ax.plot(
t,
ys_mod[i][:total_duration],
color=COLORS[i],
linestyle="dotted",
)
lvignoli marked this conversation as resolved.
Show resolved Hide resolved
special_kwargs = dict(labelpad=10) if i == 0 else {}
ax.set_ylabel(LABELS[i], fontsize=14, **special_kwargs)

Expand Down
18 changes: 14 additions & 4 deletions pulser-simulation/pulser_simulation/simconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from dataclasses import dataclass, field
from sys import version_info
from typing import Any, Union
from typing import Any, Optional, Union

import numpy as np
import qutip
Expand Down Expand Up @@ -69,8 +69,10 @@ class SimConfig:
Useful for cutting down on computing time, but unrealistic.
temperature: Temperature, set in µK, of the Rydberg array.
Also sets the standard deviation of the speed of the atoms.
laser_waist: Waist of the gaussian laser, set in µm,
in global pulses.
laser_waist: Waist of the gaussian laser, set in µm, in global
pulses.
amp_sigma: Dictates the fluctuations in amplitude as a standard
deviation of a normal distribution centered in 1.
solver_options: Options for the qutip solver.
"""

Expand All @@ -79,11 +81,12 @@ class SimConfig:
samples_per_run: int = 5
temperature: float = 50.0
laser_waist: float = 175.0
amp_sigma: float = 5e-2
eta: float = 0.005
epsilon: float = 0.01
epsilon_prime: float = 0.05
dephasing_prob: float = 0.05
solver_options: qutip.Options = None
solver_options: Optional[qutip.Options] = None
spam_dict: dict[str, float] = field(
init=False, default_factory=dict, repr=False
)
Expand All @@ -92,6 +95,12 @@ class SimConfig:
)

def __post_init__(self) -> None:
if not 0.0 <= self.amp_sigma < 1.0:
raise ValueError(
"The standard deviation in amplitude (amp_sigma="
f"{self.amp_sigma}) must be greater than or equal"
" to 0. and smaller than 1."
)
self._process_temperature()
HGSilveri marked this conversation as resolved.
Show resolved Hide resolved
self._change_attribute(
"spam_dict",
Expand All @@ -118,6 +127,7 @@ def __str__(self, solver_options: bool = False) -> str:
lines.append(f"SPAM dictionary: {self.spam_dict}")
if "doppler" in self.noise:
lines.append(f"Temperature: {self.temperature*1.e6}µK")
lines.append(f"Amplitude standard dev.: {self.amp_sigma}")
if "amplitude" in self.noise:
lines.append(f"Laser waist: {self.laser_waist}μm")
if "dephasing" in self.noise:
Expand Down
Loading