Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions scopesim/effects/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,5 @@
from .metis_ifu_simple import *
# from . import effects_utils
from .selector_wheel import *
from .sky_ter_curves import *
from .atmo_dispersion import *
495 changes: 495 additions & 0 deletions scopesim/effects/atmo_dispersion.py

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions scopesim/effects/psfs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
logger = get_logger(__name__)

from .psf_base import PSF, PoorMansFOV
from .analytical import (Vibration, NonCommonPathAberration, SeeingPSF,
GaussianDiffractionPSF)
from .analytical import (AnalyticalPSF, Vibration, NonCommonPathAberration, SeeingPSF,
GaussianDiffractionPSF, MoffatPSF, AOEnhanceablePSF)
from .semianalytical import AnisocadoConstPSF
from .discrete import FieldConstantPSF, FieldVaryingPSF
208 changes: 205 additions & 3 deletions scopesim/effects/psfs/analytical.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
# -*- coding: utf-8 -*-
"""Contains simple Vibration, NCPA, Seeing and Diffraction PSFs."""

from typing import ClassVar
from typing import ClassVar, Any
from functools import partial

import numpy as np
from astropy import units as u
from astropy.convolution import Gaussian2DKernel
from astropy.modeling.models import Moffat2D
from scipy.interpolate import make_interp_spline

from .. import DataContainer
from ...optics import ImagePlane
from ...optics.fov import FieldOfView
from ...utils import (from_currsys, quantify, quantity_from_table,
figure_factory, check_keys)
from ...utils import (from_currsys, quantify, quantity_from_table, figure_factory, check_keys, get_logger, find_file,
get_zenith_angle, get_observation_info_from_cmds)
from . import PSF, PoorMansFOV

logger = get_logger(__name__)

class AnalyticalPSF(PSF):
"""Base class for analytical PSFs."""
Expand Down Expand Up @@ -202,6 +207,203 @@ def plot(self):
return super().plot(PoorMansFOV(pixel_scale, spec_dict))


class MoffatPSF(AnalyticalPSF):
"""
Moffat PSF with given FWHM (seeing), and alpha (also known as beta) parameters.

Required kwargs:

- alpha: Moffat alpha parameter
- fwhm: Moffat FWHM (dict OR filename)

Optional kwargs:

- kernel_size: Size of kernel in multiples of FWHM (int)

Examples
--------

FWHM as a function of wavelength through seeing law parameters. If "pivot_wave" is not supplied, FWHM is
assumed to be wavelength independent and no scaling is applied.
::

name: seeing_psf
class: MoffatPSF
kwargs:
alpha: 4.765
fwhm:
seeing: 0.7
seeing_unit: "arcsec"
pivot_wave: 500
pivot_wave_unit: "nm"


* If "fwhm" is a float, it is assumed to be wavelength independent and in arcsec unit.
* If "fwhm" is a string, it is assumed to be a .dat filename that contains a table with columns
"wavelength" and "fwhm".

"""
z_order: ClassVar[tuple[int, ...]] = (43, 643)
required_keys = {"alpha", "fwhm"}

def __init__(self, **kwargs):
super().__init__(**kwargs)

self.target, self.location, self.time = get_observation_info_from_cmds(self.cmds)
self.alpha = self.meta["alpha"]
self.fwhm = self.get_fwhm_interp()

def get_fwhm_interp(self):
"""
Parses supplied FWHM input kwarg and returns a function that takes wavelength as input and returns FWHM.
Overwrite this function for subclassing if FWHM input options change.
"""
if isinstance(self.meta["fwhm"], dict):
logger.info("dict supplied for FWHM")
fwhm = self.meta["fwhm"]
if check_keys(fwhm, {"seeing", "seeing_unit", "pivot_wave", "pivot_wave_unit"}, action="warn"):
logger.info("seeing and pivot supplied, using natural scale seeing law")
zenith_angle = get_zenith_angle(self.target, self.location, self.time)

return partial(self.natural_scale, seeing=fwhm["seeing"]*u.Unit(fwhm["seeing_unit"]),
pivot=fwhm["pivot_wave"]*u.Unit(fwhm["pivot_wave_unit"]),
zenith_angle=zenith_angle*u.deg)

elif check_keys(fwhm, {"seeing", "seeing_unit"}, action="error"):
logger.info("only seeing supplied, FWHM is wavelength independent")
return lambda wavelengths: fwhm["seeing"] * u.Unit(fwhm["seeing_unit"])

if isinstance(self.meta["fwhm"], (int, float)):
logger.info("float value supplied, assuming arcsec")
return lambda wavelengths: self.meta["fwhm"] * u.arcsec

if isinstance(self.meta["fwhm"], str):
logger.info("filename supplied for FWHM")
self.table = DataContainer(filename=find_file(from_currsys(self.meta["fwhm"], self.cmds))).table
return self.fwhm_from_table(self.table)

raise TypeError("fwhm kwarg must be of type dict or float or str")

def get_kernel(self, fov):
pixel_scale = fov.header["CDELT1"] * u.deg.to(u.arcsec)
pixel_scale = quantify(pixel_scale, u.arcsec)

# for each wavelength in waveset, get the corresponding FWHM, convert to gamma, and create a Moffat kernel
npts = (fov.meta["wave_max"] - fov.meta["wave_min"]) / (from_currsys("!SIM.spectral.spectral_bin_width", self.cmds) * u.um)
##sample only npts from len(fov.waveset)
wavelengths = fov.waveset[::max(1, int(len(fov.waveset) / npts))]
## get fwhm and gamma for the sampled wave pts
fwhms = self.fwhm(wavelengths).to(u.arcsec) / pixel_scale
gammas = self.fwhm2gamma(fwhms, self.alpha).value

kx = self.meta.get("kernel_size", 4.0)
ksize = int(kx * np.max(fwhms).value)
ksize = ksize + 1 if ksize % 2 == 0 else ksize

amplitude = (self.alpha - 1)/(np.pi * gammas**2)
x, y = np.meshgrid(np.arange(ksize)-ksize//2, np.arange(ksize)-ksize//2)
cube = Moffat2D.evaluate(x=x[None, ...], y=y[None, ...],
amplitude=amplitude[:, None, None], x_0=0, y_0=0,
gamma=gammas[:, None, None], alpha=self.alpha)
kernel = np.mean(cube, axis=0) # average over wavelength axis
norm = np.sum(kernel)
if norm < 0.98:
logger.warning(f"Kernel size too small, kernel sums to {norm}")
kernel /= norm
return kernel

@staticmethod
def natural_scale(wavelengths: u.Quantity,
seeing: u.Quantity = 0.7*u.arcsec, pivot: u.Quantity = 500*u.nm,
zenith_angle: u.Quantity = 0.0*u.deg) -> u.Quantity:
"""
Seeing law scaled to wavelength

https://opg.optica.org/josa/fulltext.cfm?uri=josa-68-7-877&id=57124
https://www.mdpi.com/2072-4292/14/2/405
"""
return seeing * (wavelengths / pivot) ** -0.2 * 1 / np.cos(zenith_angle.to(u.rad)) ** .6

@staticmethod
def fwhm2gamma(fwhm: u.Quantity, alpha) -> u.Quantity:
"""Convert FWHM to Moffat gamma parameter."""
theta_factor = 0.5 / np.sqrt(2**(1./alpha) - 1.0)
return fwhm * theta_factor

@staticmethod
def fwhm_from_table(table):
if "wavelength" not in table.colnames or "fwhm" not in table.colnames:
raise ValueError("Table must contain 'wavelength' and 'fwhm' columns.")
wave_array = quantity_from_table("wavelength", table, "um").to_value(u.um)
fwhm_array = table["fwhm"]
return make_interp_spline(wave_array, fwhm_array) # returns bspline instance


class AOEnhanceablePSF(MoffatPSF):
"""
Wavelength dependent Moffat PSF but with added AO scaling.
Required kwargs:

- ao_table: A .dat file with AO scaling as a function of wavelength. Must have columns "wavelength" and "fwhm".
Either supply 'alpha' parameter in table header or as kwarg. Table header value overrides kwarg.
- is_absolute: bool, True if fwhm in ao_table are absolute values, False if they are scaling factors for "seeing".
- enable_ao: bool, True to turn AO enhancement "on", False for regular seeing law Moffat

Optional kwargs:

- kernel_size: Size of kernel in multiples of FWHM (int)
- alpha: Moffat alpha parameter
- fwhm: Moffat FWHM (dict OR filename), not needed if is_absolute is True

Examples
--------

FWHM as a function of wavelength through seeing law parameters. If "pivot_wave" is not supplied, FWHM is
assumed to be wavelength independent and no scaling is applied.
::

name: seeing_psf
class: MoffatPSF
kwargs:
ao_table: "path/to/table.dat"
is_absolute: True
enable_ao: True
alpha: 4.765
fwhm:
seeing: 0.7
seeing_unit: "arcsec"
pivot_wave: 500
pivot_wave_unit: "nm"

"""
z_order: ClassVar[tuple[int, ...]] = (43, 643)
required_keys = {"ao_table", "is_absolute", "enable_ao"}

def __init__(self, **kwargs):
# setting alpha and fwhm before super().__init__ to avoid KeyError
kwargs["alpha"] = None if "alpha" not in kwargs else kwargs["alpha"]
kwargs["fwhm"] = 1.0 if "fwhm" not in kwargs else kwargs["fwhm"]
super().__init__(**kwargs)

aotab = DataContainer(filename=find_file(from_currsys(self.meta["ao_table"], self.cmds))).table
self.ao_scale = self.fwhm_from_table(aotab)

if "alpha" in aotab.meta:
self.alpha = aotab.meta["alpha"]
logger.info(f"Alpha parameter found in AO table header: {self.alpha}")
elif self.meta["alpha"] is not None:
self.alpha = self.meta["alpha"]
logger.info(f"Alpha parameter found in kwargs: {self.alpha}")
else:
raise ValueError("Alpha parameter missing: Not found in ao_table header or kwargs.")

if self.meta["enable_ao"]:
if self.meta["is_absolute"]:
self.fwhm = self.ao_scale
else:
self.fwhm = lambda wavelengths: (self.fwhm(wavelengths) * self.ao_scale(wavelengths)).to(u.arcsec)


def wfe2gauss(wfe, wave, width=None):
strehl = wfe2strehl(wfe, wave)
sigma = _strehl2sigma(strehl)
Expand Down
Loading
Loading