Source code for measure_extinction.merge_obsspec

import numpy as np
from astropy.table import Table, Column
import astropy.units as u

__all__ = [
    "merge_iue_obsspec",
    "merge_stis_obsspec",
    "merge_spex_obsspec",
    "merge_nircam_ss_obsspec",
    "merge_irs_obsspec",
    "merge_miri_ifu_obsspec",
]

fluxunit = u.erg / (u.s * u.cm * u.cm * u.angstrom)


def _wavegrid(resolution, wave_range):
    """
    Define a wavelength grid at a specified resolution given
    the min/max as input

    Parameters
    ----------
    resolution : float
        resolution of grid

    wave_range : [float, float]
        min/max of grid

    Returns
    -------
    wave_info : tuple [waves, waves_bin_min, waves_bin_max]
        wavelength grid center, min, max wavelengths
    """
    npts = int(
        np.log10(wave_range[1] / wave_range[0])
        / np.log10((1.0 + 2.0 * resolution) / (2.0 * resolution - 1.0))
    )
    delta_wave_log = (np.log10(wave_range[1]) - np.log10(wave_range[0])) / npts
    wave_log10 = np.arange(
        np.log10(wave_range[0]),
        np.log10(wave_range[1]) - delta_wave_log,
        delta_wave_log,
    )
    full_wave_min = 10**wave_log10
    full_wave_max = 10 ** (wave_log10 + delta_wave_log)

    full_wave = (full_wave_min + full_wave_max) / 2.0

    return (full_wave, full_wave_min, full_wave_max)


[docs] def merge_iue_obsspec(obstables, output_resolution=1000): """ Merge one or more IUE 1D spectra into a single spectrum on a uniform wavelength scale Parameters ---------- obstables : list of astropy Table objects list of tables containing the observed IUE spectra usually the result of reading tables output_resolution : float output resolution of spectra input spectrum assumed to be at the appropriate resolution Returns ------- output_table : astropy Table object merged spectra """ wave_range = [1000.0, 3400.0] * u.angstrom iwave_range = wave_range.to(u.angstrom).value full_wave, full_wave_min, full_wave_max = _wavegrid(output_resolution, iwave_range) n_waves = len(full_wave) full_flux = np.zeros((n_waves), dtype=float) full_unc = np.zeros((n_waves), dtype=float) full_npts = np.zeros((n_waves), dtype=int) for ctable in obstables: # may want to add in the SYS-ERROR, but need to be careful # to propagate it correctly, SYS-ERROR will not reduce with # multiple spectra or measurements in a wavelength bin cuncs = ctable["STAT-ERROR"].data cwaves = ctable["WAVELENGTH"].data cfluxes = ctable["FLUX"].data cnpts = ctable["NPTS"].data for k in range(n_waves): (indxs,) = np.where( (cwaves >= full_wave_min[k]) & (cwaves < full_wave_max[k]) & (cnpts > 0) ) if len(indxs) > 0: weights = 1.0 / np.square(cuncs[indxs]) full_flux[k] += np.sum(weights * cfluxes[indxs]) full_unc[k] += np.sum(weights) full_npts[k] += len(indxs) # divide by the net weights (indxs,) = np.where(full_npts > 0) if len(indxs) > 0: full_flux[indxs] /= full_unc[indxs] full_unc[indxs] = np.sqrt(1.0 / full_unc[indxs]) otable = Table() otable["WAVELENGTH"] = Column(full_wave, unit=u.angstrom) otable["FLUX"] = Column(full_flux, unit=u.erg / (u.s * u.cm * u.cm * u.angstrom)) otable["SIGMA"] = Column(full_unc, unit=u.erg / (u.s * u.cm * u.cm * u.angstrom)) otable["NPTS"] = Column(full_npts) return otable
[docs] def merge_stis_obsspec(obstables, waveregion="UV", output_resolution=1000): """ Merge one or more STIS 1D spectra into a single spectrum on a uniform wavelength scale Parameters ---------- obstables : list of astropy Table objects list of tables containing the observed STIS spectra usually the result of reading tables waveregion : string [default = 'UV'] wavelength region of spectra possibilities are 'UV', 'Opt' output_resolution : float output resolution of spectra input spectrum assumed to be at the appropriate resolution Returns ------- output_table : astropy Table object merged spectra """ if waveregion == "UV": wave_range = [1000.0, 3400.0] * u.angstrom elif waveregion == "Opt": wave_range = [2850.0, 10250.0] * u.angstrom iwave_range = wave_range.to(u.angstrom).value full_wave, full_wave_min, full_wave_max = _wavegrid(output_resolution, iwave_range) n_waves = len(full_wave) full_flux = np.zeros((n_waves), dtype=float) full_unc = np.zeros((n_waves), dtype=float) full_npts = np.zeros((n_waves), dtype=int) for ctable in obstables: # may want to add in the SYS-ERROR, but need to be careful # to propagate it correctly, SYS-ERROR will not reduce with # multiple spectra or measurements in a wavelength bin cuncs = ctable["STAT-ERROR"] # / 1e-10 # deal with overflow errors cwaves = ctable["WAVELENGTH"].data cfluxes = ctable["FLUX"] cnpts = ctable["NPTS"].data cnpts[cuncs == 0] = 0 for k in range(n_waves): gvals = ( (cwaves >= full_wave_min[k]) & (cwaves < full_wave_max[k]) & (cnpts > 0) ) if np.sum(gvals) > 0: weights = np.square(1.0 / cuncs[gvals].value) full_flux[k] += np.sum(weights * cfluxes[gvals].value) full_unc[k] += np.sum(weights) full_npts[k] += np.sum(gvals) # make sure any wavelengths with no unc are not used full_npts[full_unc == 0] = 0 # divide by the net weights (indxs,) = np.where(full_npts > 0) if len(indxs) > 0: full_flux[indxs] /= full_unc[indxs] full_unc[indxs] = np.sqrt( 1.0 / full_unc[indxs] ) # * 1e-10 # put back factor for overflow errors otable = Table() otable["WAVELENGTH"] = Column(full_wave, unit=u.angstrom) otable["FLUX"] = Column(full_flux, unit=u.erg / (u.s * u.cm * u.cm * u.angstrom)) otable["SIGMA"] = Column(full_unc, unit=u.erg / (u.s * u.cm * u.cm * u.angstrom)) otable["NPTS"] = Column(full_npts) return otable
[docs] def merge_spex_obsspec(obstable, mask=[], output_resolution=2000): """ Merge one or more IRTF SpeX 1D spectra into a single spectrum on a uniform wavelength scale Parameters ---------- obstable : astropy Table object table containing the observed SpeX spectrum usually the result of reading tables mask : list of tuples [default=[]] list of tuples with wavelength regions (in micron) that need to be masked, e.g. [(2.55,2.61),(3.01,3.10)] output_resolution : float [default=2000] output resolution of spectrum input spectrum assumed to be at the appropriate resolution Returns ------- output_table : astropy Table object merged spectra """ waves = obstable["WAVELENGTH"].data * 1e4 fluxes = obstable["FLUX"].data uncs = obstable["ERROR"].data npts = np.full((len(obstable["FLUX"])), 1.0) # take out data points that were "flagged" as bad by SpeXtool (i.e. FLAG is not zero) npts[obstable["FLAG"] != 0] = 0 # take out data points with NaN fluxes npts[np.isnan(fluxes)] = 0 # quadratically add 1 percent uncertainty to account for unknown uncertainties uncs = np.sqrt(uncs**2 + (0.01 * fluxes) ** 2) # take out data points with low SNR npts[np.less(fluxes / uncs, 10, where=~np.isnan(fluxes / uncs))] = 0 # take out wavelength regions affected by the atmosphere npts[np.logical_and(1.347e4 < waves, waves < 1.415e4)] = 0 npts[np.logical_and(1.798e4 < waves, waves < 1.949e4)] = 0 npts[np.logical_and(2.514e4 < waves, waves < 2.880e4)] = 0 npts[np.logical_and(4.000e4 < waves, waves < 4.594e4)] = 0 # take out data points that need to be masked for region in mask: npts[(waves > region[0] * 1e4) & (waves < region[1] * 1e4)] = 0 # determine the wavelength range and calculate the wavelength grid if np.max(waves) < 25000: # SXD wave_range = [0.8, 2.45] * u.micron else: # LXD (this includes all 3 LXD modes: 1.9, 2.1 and 2.3, to make sure all LXD spectra have the same wavelength grid, independent of the original observing mode) wave_range = [1.9, 5.5] * u.micron iwave_range = wave_range.to(u.angstrom).value full_wave, full_wave_min, full_wave_max = _wavegrid(output_resolution, iwave_range) # create empty arrays n_waves = len(full_wave) full_flux = np.zeros((n_waves), dtype=float) full_unc = np.zeros((n_waves), dtype=float) full_npts = np.zeros((n_waves), dtype=int) # fill the arrays for k in range(n_waves): (indxs,) = np.where( (waves >= full_wave_min[k]) & (waves < full_wave_max[k]) & (npts > 0) ) if len(indxs) > 0: weights = 1.0 / np.square(uncs[indxs]) full_flux[k] = np.sum(weights * fluxes[indxs]) full_unc[k] = np.sum(weights) full_npts[k] = len(indxs) # divide by the net weights (indxs,) = np.where(full_npts > 0) if len(indxs) > 0: full_flux[indxs] /= full_unc[indxs] full_unc[indxs] = np.sqrt( 1.0 / full_unc[indxs] ) # this is the standard error of the weighted mean # create the output table otable = Table() otable["WAVELENGTH"] = Column(full_wave, unit=u.angstrom) otable["FLUX"] = Column(full_flux, unit=u.erg / (u.s * u.cm * u.cm * u.angstrom)) otable["SIGMA"] = Column(full_unc, unit=u.erg / (u.s * u.cm * u.cm * u.angstrom)) otable["NPTS"] = Column(full_npts) return otable
def merge_gen_obsspec(obstables, wave_range, output_resolution=100): """ Merge one or more generic spectra into a single spectrum on a uniform wavelength scale. Useful for spectra that do not require specific processing. Parameters ---------- obstables : list of astropy Table objects list of tables containing the observed IRS spectra usually the result of reading tables wave_range : 2 element float min/max wavelengths with units for output grid output_resolution : float output resolution of spectra input spectrum assumed to be at the appropriate resolution Returns ------- output_table : astropy Table object merged spectra """ iwave_range = wave_range.to(u.angstrom).value full_wave, full_wave_min, full_wave_max = _wavegrid(output_resolution, iwave_range) n_waves = len(full_wave) full_flux = np.zeros((n_waves), dtype=float) full_unc = np.zeros((n_waves), dtype=float) full_npts = np.zeros((n_waves), dtype=int) for ctable in obstables: cwaves = ctable["WAVELENGTH"].to(u.angstrom).value cfluxes = ( ctable["FLUX"] .to(fluxunit, equivalencies=u.spectral_density(ctable["WAVELENGTH"])) .value ) cuncs = ( ctable["ERROR"] .to(fluxunit, equivalencies=u.spectral_density(ctable["WAVELENGTH"])) .value ) cnpts = ctable["NPTS"].value for k in range(n_waves): (indxs,) = np.where( (cwaves >= full_wave_min[k]) & (cwaves < full_wave_max[k]) & (cnpts > 0) ) if len(indxs) > 0: weights = 1.0 / np.square(cuncs[indxs]) full_flux[k] += np.sum(weights * cfluxes[indxs]) full_unc[k] += np.sum(weights) full_npts[k] += len(indxs) # divide by the net weights (indxs,) = np.where(full_npts > 0) if len(indxs) > 0: full_flux[indxs] /= full_unc[indxs] full_unc[indxs] = np.sqrt(1.0 / full_unc[indxs]) otable = Table() otable["WAVELENGTH"] = Column(full_wave, unit=u.angstrom) otable["FLUX"] = Column(full_flux, unit=fluxunit) otable["SIGMA"] = Column(full_unc, unit=fluxunit) otable["NPTS"] = Column(full_npts) return otable
[docs] def merge_irs_obsspec(obstables, output_resolution=150): """ Merge one or more Spitzer IRS 1D spectra into a single spectrum on a uniform wavelength scale Parameters ---------- obstables : list of astropy Table objects list of tables containing the observed IRS spectra usually the result of reading tables output_resolution : float output resolution of spectra input spectrum assumed to be at the appropriate resolution Returns ------- output_table : astropy Table object merged spectra """ wave_range = [5.0, 40.0] * u.micron otable = merge_gen_obsspec( obstables, wave_range, output_resolution=output_resolution ) return otable
[docs] def merge_nircam_ss_obsspec(obstables, output_resolution=1600): """ Merge one or more NIRCam slitless 1D spectra into a single spectrum on a uniform wavelength scale Parameters ---------- obstables : list of astropy Table objects list of tables containing the observed IRS spectra usually the result of reading tables output_resolution : float output resolution of spectra input spectrum assumed to be at the appropriate resolution Returns ------- output_table : astropy Table object merged spectra """ wave_range = [2.35, 5.55] * u.micron otable = merge_gen_obsspec( obstables, wave_range, output_resolution=output_resolution ) return otable
[docs] def merge_miri_ifu_obsspec(obstables, output_resolution=3000): """ Merge one or more MIRI IFU 1D spectra into a single spectrum on a uniform wavelength scale Parameters ---------- obstables : list of astropy Table objects list of tables containing the observed IRS spectra usually the result of reading tables output_resolution : float output resolution of spectra input spectrum assumed to be at the appropriate resolution Returns ------- output_table : astropy Table object merged spectra """ wave_range = [4.8, 29.0] * u.micron otable = merge_gen_obsspec( obstables, wave_range, output_resolution=output_resolution ) return otable