-
Notifications
You must be signed in to change notification settings - Fork 10
/
optical_train.py
550 lines (445 loc) · 20.5 KB
/
optical_train.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
import copy
import sys
from pathlib import Path
from datetime import datetime
import numpy as np
from scipy.interpolate import interp1d
from astropy import units as u
from synphot.units import PHOTLAM
from .optics_manager import OpticsManager
from .fov_manager import FOVManager
from .image_plane import ImagePlane
from ..commands.user_commands import UserCommands
from ..detector import DetectorArray
from ..effects import ExtraFitsKeywords
from ..utils import from_currsys, top_level_catch
from ..version import version
from .. import rc
class OpticalTrain:
"""
The main class for controlling a simulation.
Parameters
----------
cmds : UserCommands, str
If the name of an instrument is passed, OpticalTrain tries to find the
instrument package, and internally creates the UserCommands object
Examples
--------
Create an optical train::
>>> import scopesim as im
>>> cmd = sim.UserCommands("MICADO")
>>> opt = sim.OpticalTrain(cmd)
Observe a Source object::
>>> src = sim.source.source_templates.empty_sky()
>>> opt.observe(src)
>>> hdus = opt.readout()
List the effects modelled in an OpticalTrain::
>>> print(opt.effects)
Effects can be accessed by using the name of the effect::
>>> print(opt["dark_current"])
To include or exclude an effect during a simulation run, use the
``.include`` attribute of the effect::
>>> opt["dark_current"].include = False
Data used by an Effect object is contained in the ``.data`` attribute,
while other information is contained in the ``.meta`` attribute::
>>> opt["dark_current"].data
>>> opt["dark_current"].meta
Meta data values can be set by either using the ``.meta`` attribute
directly::
>>> opt["dark_current"].meta["value"] = 0.5
or by passing a dictionary (with one or multiple entries) to the
OpticalTrain object::
>>> opt["dark_current"] = {"value": 0.75, "dit": 30}
"""
@top_level_catch
def __init__(self, cmds=None):
self.cmds = cmds
self._description = self.__repr__()
self.optics_manager = None
self.fov_manager = None
self.image_planes = []
self.detector_arrays = []
self.yaml_dicts = None
self._last_source = None
if cmds is not None:
self.load(cmds)
def load(self, user_commands):
"""
(Re)Load an OpticalTrain with a new set of UserCommands.
Parameters
----------
user_commands : UserCommands or str
"""
if isinstance(user_commands, str):
user_commands = UserCommands(use_instrument=user_commands)
elif isinstance(user_commands, UserCommands):
user_commands = copy.deepcopy(user_commands)
else:
raise ValueError("user_commands must be a UserCommands or str object "
f"but is {type(user_commands)}")
self.cmds = user_commands
# FIXME: Setting rc.__currsys__ to user_commands causes many problems:
# UserCommands used SystemDict internally, but is itself not an
# instance or subclas thereof. So rc.__currsys__ actually
# changes type as a result of this line. On one hand, some other
# code relies on this change, i.e. uses attributes from
# UserCommands via rc.__currsys__, but on the other hand some
# tests (now with proper patching) fail because of this type
# change. THIS IS A PROBLEM!
rc.__currsys__ = user_commands
self.yaml_dicts = rc.__currsys__.yaml_dicts
self.optics_manager = OpticsManager(self.yaml_dicts)
self.update()
def update(self, **kwargs):
"""
Update the user-defined parameters and remake main internal classes.
Parameters
----------
kwargs : expanded dict
Any keyword-value pairs from a config file
"""
self.optics_manager.update(**kwargs)
opt_man = self.optics_manager
self.fov_manager = FOVManager(opt_man.fov_setup_effects, **kwargs)
self.image_planes = [ImagePlane(hdr, **kwargs)
for hdr in opt_man.image_plane_headers]
self.detector_arrays = [DetectorArray(det_list, **kwargs)
for det_list in opt_man.detector_setup_effects]
@top_level_catch
def observe(self, orig_source, update=True, **kwargs):
"""
Main controlling method for observing ``Source`` objects.
Parameters
----------
orig_source : Source
update : bool
Reload optical system
kwargs : expanded dict
Any keyword-value pairs from a config file
Notes
-----
How the list of Effects is split between the 5 main tasks:
- Make a FOV list - z_order = 0..99
- Make a image plane - z_order = 100..199
- Apply Source altering effects - z_order = 200..299
- Apply FOV specific (3D) effects - z_order = 300..399
- Apply FOV-independent (2D) effects - z_order = 400..499
- [Apply detector plane (0D, 2D) effects - z_order = 500..599]
.. todo:: List is out of date - update
"""
if update:
self.update(**kwargs)
self.set_focus(**kwargs) # put focus back on current instrument package
# Make a copy of the Source and prepare for observation (convert to
# internally used units, sample to internal wavelength grid)
source = orig_source.make_copy()
source = self.prepare_source(source)
# [1D - transmission curves]
for effect in self.optics_manager.source_effects:
source = effect.apply_to(source)
# [3D - Atmospheric shifts, PSF, NCPAs, Grating shift/distortion]
fovs = self.fov_manager.fovs
for fov in fovs:
# print("FOV", fov_i+1, "of", n_fovs, flush=True)
# .. todo: possible bug with bg flux not using plate_scale
# see fov_utils.combine_imagehdu_fields
fov.extract_from(source)
hdu_type = "cube" if self.fov_manager.is_spectroscope else "image"
fov.view(hdu_type)
for effect in self.optics_manager.fov_effects:
fov = effect.apply_to(fov)
fov.flatten()
self.image_planes[fov.image_plane_id].add(fov.hdu, wcs_suffix="D")
# ..todo: finish off the multiple image plane stuff
# [2D - Vibration, flat fielding, chopping+nodding]
for effect in self.optics_manager.image_plane_effects:
for ii, image_plane in enumerate(self.image_planes):
self.image_planes[ii] = effect.apply_to(image_plane)
self._last_fovs = fovs
self._last_source = source
def prepare_source(self, source):
"""
Prepare source for observation.
The method is currently applied to cube fields only.
The source data are converted to internally used units (PHOTLAM).
The source data are interpolated to the waveset used by the FieldOfView
This is necessary when the source data are sampled on a coarser grid
than used internally, or if the source data are sampled on irregular
wavelengths.
For cube fields, the method assumes that the wavelengths at which the
cube is sampled is provided explicitely as attribute `wave` if the cube
ImageHDU.
"""
# Convert to PHOTLAM per arcsec2
# ..todo: this is not sufficiently general
for cube in source.cube_fields:
header, data, wave = cube.header, cube.data, cube.wave
# Need to check whether BUNIT is per arcsec2 or per pixel
inunit = u.Unit(header["BUNIT"])
data = data.astype(np.float32) * inunit
factor = 1
for base, power in zip(inunit.bases, inunit.powers):
if (base**power).is_equivalent(u.arcsec**(-2)):
conversion =(base**power).to(u.arcsec**(-2)) / base**power
data *= conversion
factor = u.arcsec**(-2)
data = data.to(PHOTLAM,
equivalencies=u.spectral_density(wave[:, None, None]))
if factor == 1: # Normalise to 1 arcsec2 if not a spatial density
# ..todo: lower needed because "DEG" is not understood, this is ugly
pixarea = (header["CDELT1"] * u.Unit(header["CUNIT1"].lower()) *
header["CDELT2"] * u.Unit(header["CUNIT2"].lower())).to(u.arcsec**2)
data = data / pixarea.value # cube is per arcsec2
data = (data * factor).value
cube.header["BUNIT"] = "PHOTLAM/arcsec2" # ..todo: make this more explicit?
# The imageplane_utils like to have the spatial WCS in units of "deg". Ensure
# that the cube is passed on accordingly
cube.header["CDELT1"] = header["CDELT1"] * u.Unit(header["CUNIT1"].lower()).to(u.deg)
cube.header["CDELT2"] = header["CDELT2"] * u.Unit(header["CUNIT2"].lower()).to(u.deg)
cube.header["CUNIT1"] = "deg"
cube.header["CUNIT2"] = "deg"
# Put on fov wavegrid
wave_min = min(fov.meta["wave_min"] for fov in self.fov_manager.fovs)
wave_max = max(fov.meta["wave_max"] for fov in self.fov_manager.fovs)
wave_unit = u.Unit(from_currsys("!SIM.spectral.wave_unit"))
dwave = from_currsys("!SIM.spectral.spectral_bin_width") # Not a quantity
fov_waveset = np.arange(wave_min.value, wave_max.value, dwave) * wave_unit
fov_waveset = fov_waveset.to(u.um)
# Interpolate into new data cube.
# This is done layer by layer for memory reasons.
new_data = np.zeros((fov_waveset.shape[0], data.shape[1], data.shape[2]),
dtype=np.float32)
for j in range(data.shape[1]):
cube_interp = interp1d(wave.to(u.um).value, data[:, j, :],
axis=0, kind="linear",
bounds_error=False, fill_value=0)
new_data[:, j, :] = cube_interp(fov_waveset.value)
cube.data = new_data
cube.header["CTYPE3"] = "WAVE"
cube.header["CRPIX3"] = 1
cube.header["CRVAL3"] = wave_min.value
cube.header["CDELT3"] = dwave
cube.header["CUNIT3"] = wave_unit.name
return source
@top_level_catch
def readout(self, filename=None, **kwargs):
"""
Produce detector readouts for the observed image.
Parameters
----------
filename : str, optional
Where to save the FITS file
kwargs
Returns
-------
hdu : fits.HDUList
Notes
-----
- Apply detector plane (0D, 2D) effects - z_order = 500..599
"""
hduls = []
for i, detector_array in enumerate(self.detector_arrays):
array_effects = self.optics_manager.detector_array_effects
dtcr_effects = self.optics_manager.detector_effects
hdul = detector_array.readout(self.image_planes, array_effects,
dtcr_effects, **kwargs)
fits_effects = self.optics_manager.get_all(ExtraFitsKeywords)
if len(fits_effects) > 0:
for effect in fits_effects:
hdul = effect.apply_to(hdul, optical_train=self)
else:
try:
hdul = self.write_header(hdul)
except Exception as error:
print("\nWarning: header update failed, data will be saved with incomplete header.")
print(f"Reason: {sys.exc_info()[0]} {error}\n")
if filename is not None and isinstance(filename, str):
fname = filename
if len(self.detector_arrays) > 1:
fname = f"{i}_{filename}"
hdul.writeto(fname, overwrite=True)
hduls.append(hdul)
return hduls
def write_header(self, hdulist):
"""Write meaningful header to simulation product."""
# Primary hdu
pheader = hdulist[0].header
pheader["DATE"] = datetime.now().isoformat(timespec="seconds")
pheader["ORIGIN"] = "Scopesim " + version
pheader["INSTRUME"] = from_currsys("!OBS.instrument")
pheader["INSTMODE"] = ", ".join(from_currsys("!OBS.modes"))
pheader["TELESCOP"] = from_currsys("!TEL.telescope")
pheader["LOCATION"] = from_currsys("!ATMO.location")
# Source information taken from first only.
# ..todo: What if source is a composite?
srcfield = self._last_source.fields[0]
if type(srcfield).__name__ == "Table":
pheader["SOURCE"] = "Table"
elif type(srcfield).__name__ == "ImageHDU":
if "BG_SURF" in srcfield.header:
pheader["SOURCE"] = srcfield.header["BG_SURF"]
else:
try:
pheader["SOURCE"] = srcfield.header["FILENAME"]
except KeyError:
pheader["SOURCE"] = "ImageHDU"
# Image hdul
# ..todo: currently only one, update for detector arrays
# ..todo: normalise filenames - some need from_currsys, some need Path(...).name
# this should go into a function so as to reduce clutter here.
iheader = hdulist[1].header
iheader["EXPTIME"] = from_currsys("!OBS.exptime"), "[s]"
iheader["DIT"] = from_currsys("!OBS.dit"), "[s]"
iheader["NDIT"] = from_currsys("!OBS.ndit")
iheader["BUNIT"] = "e", "per EXPTIME"
iheader["PIXSCALE"] = from_currsys("!INST.pixel_scale"), "[arcsec]"
# A simple WCS
iheader["CTYPE1"] = "LINEAR"
iheader["CTYPE2"] = "LINEAR"
iheader["CRPIX1"] = (iheader["NAXIS1"] + 1) / 2
iheader["CRPIX2"] = (iheader["NAXIS2"] + 1) / 2
iheader["CRVAL1"] = 0.
iheader["CRVAL2"] = 0.
iheader["CDELT1"] = iheader["PIXSCALE"]
iheader["CDELT2"] = iheader["PIXSCALE"]
iheader["CUNIT1"] = "arcsec"
iheader["CUNIT2"] = "arcsec"
for eff in self.optics_manager.detector_setup_effects:
efftype = type(eff).__name__
if efftype == "DetectorList" and eff.include:
iheader["DETECTOR"] = eff.meta["detector"]
for eff in self.optics_manager.detector_array_effects:
efftype = type(eff).__name__
if (efftype == "DetectorModePropertiesSetter" and
eff.include):
# ..todo: can we write this into currsys?
iheader["DET_MODE"] = (eff.meta["detector_readout_mode"],
"detector readout mode")
iheader["MINDIT"] = from_currsys("!DET.mindit"), "[s]"
iheader["FULLWELL"] = from_currsys("!DET.full_well"), "[s]"
iheader["RON"] = from_currsys("!DET.readout_noise"), "[e]"
iheader["DARK"] = from_currsys("!DET.dark_current"), "[e/s]"
ifilter = 1 # Counts filter wheels
isurface = 1 # Counts surface lists
for eff in self.optics_manager.source_effects:
efftype = type(eff).__name__
if efftype == "ADCWheel" and eff.include:
iheader["ADC"] = eff.current_adc.meta["name"]
if efftype == "FilterWheel" and eff.include:
iheader[f"FILTER{ifilter}"] = (eff.current_filter.meta["name"],
eff.meta["name"])
ifilter += 1
if efftype == "SlitWheel" and eff.include:
iheader["SLIT"] = (eff.current_slit.meta["name"],
eff.meta["name"])
if efftype == "PupilTransmission" and eff.include:
iheader["PUPTRANS"] = (from_currsys("!OBS.pupil_transmission"),
"cold stop, pupil transmission")
if efftype == "SkycalcTERCurve" and eff.include:
iheader["ATMOSPHE"] = "Skycalc", "atmosphere model"
iheader["LOCATION"] = eff.meta["location"]
iheader["AIRMASS"] = eff.meta["airmass"]
iheader["TEMPERAT"] = eff.meta["temperature"], "[degC]"
iheader["HUMIDITY"] = eff.meta["humidity"]
iheader["PRESSURE"] = eff.meta["pressure"], "[hPa]"
iheader["PWV"] = eff.meta["pwv"], "precipitable water vapour"
if efftype == "AtmosphericTERCurve" and eff.include:
iheader["ATMOSPHE"] = eff.meta["filename"], "atmosphere model"
# ..todo: expand if necessary
if efftype == "SurfaceList" and eff.include:
iheader[f"SURFACE{isurface}"] = (eff.meta["filename"],
eff.meta["name"])
isurface += 1
if efftype == "QuantumEfficiencyCurve" and eff.include:
iheader["QE"] = Path(eff.meta["filename"]).name, eff.meta["name"]
for eff in self.optics_manager.fov_effects:
efftype = type(eff).__name__
# ..todo: needs to be handled with isinstance(eff, PSF)
if efftype == "FieldConstantPSF" and eff.include:
iheader["PSF"] = eff.meta["filename"], "point spread function"
if efftype == "SpectralTraceList" and eff.include:
iheader["SPECTRAC"] = (from_currsys(eff.meta["filename"]),
"spectral trace definition")
if "CTYPE1" in eff.meta:
for key in {"WCSAXES", "CTYPE1", "CTYPE2", "CRPIX1", "CRPIX2", "CRVAL1",
"CRVAL2", "CDELT1", "CDELT2", "CUNIT1", "CUNIT2"}:
iheader[key] = eff.meta[key]
for eff in self.optics_manager.detector_effects:
efftype = type(eff).__name__
if efftype == "LinearityCurve" and eff.include:
iheader["DETLIN"] = from_currsys(eff.meta["filename"])
return hdulist
def set_focus(self, **kwargs):
self.cmds.update(**kwargs)
dy = self.cmds.default_yamls
if len(dy) > 0 and "packages" in dy:
self.cmds.update(packages=self.default_yamls[0]["packages"])
rc.__currsys__ = self.cmds
def shutdown(self):
"""
Shut down the instrument.
This method closes all open file handles and should be called when the
optical train is no longer needed.
"""
for effect_name in self.effects["name"]:
try:
self[effect_name]._file.close()
except AttributeError:
pass
self._description = "The instrument has been shut down."
@property
def effects(self):
return self.optics_manager.list_effects()
def __repr__(self):
return f"{self.__class__.__name__}({self.cmds!r})"
def __str__(self):
return self._description
def _repr_pretty_(self, p, cycle):
"""For ipython."""
if cycle:
p.text(f"{self.__class__.__name__}(...)")
else:
p.text(f"{self.__class__.__name__} ")
p.text(f"for {self.cmds['!OBS.instrument']} ")
p.text(f"@ {self.cmds['!TEL.telescope']}:")
p.breakable()
p.text("UserCommands:")
p.breakable()
p.pretty(self.cmds)
p.breakable()
p.text("OpticalElements:")
with p.indent(2):
for item in self:
p.breakable()
p.pretty(item)
p.breakable()
p.text("DetectorArrays:")
with p.indent(2):
for item in self.detector_arrays:
p.breakable()
p.pretty(item)
p.breakable()
p.text("Effects:")
p.breakable()
with p.indent(2):
p.pretty(self.effects)
def __getitem__(self, item):
return self.optics_manager[item]
def __setitem__(self, key, value):
self.optics_manager[key] = value
def report(self):
pass
# user commands report
# package dependencies
# modes names
# default modes
# yaml hierarchy
#
# optics_manager
# derived properties
# system transmission curve
# list of effects
#
# etc
# limiting magnitudes
#