Source code for measure_extinction.stardata

import math
import warnings
import os.path

from collections import OrderedDict

import numpy as np

from astropy.table import Table
from astropy import constants as const
import astropy.units as u
from astropy.units import UnitsWarning

from dust_extinction.parameter_averages import CCM89
from dust_extinction.shapes import _curve_F99_method

from measure_extinction.merge_obsspec import _wavegrid

__all__ = ["StarData", "BandData", "SpecData"]

# Jy to ergs/(cm^2 s A)
#   1 Jy = 1e-26 W/(m^2 Hz)
#   1 W = 1e7 ergs
#   1 m^2 = 1e4 cm^2
#   1 um = 1e4 A
#   Hz/um = c[micron/s]/(lambda[micron])^2
# const = 1e-26*1e7*1e-4*1e-4 = 1e-27
Jy_to_cgs_const = 1e-27 * const.c.to("micron/s").value

fluxunit = u.erg / ((u.cm**2) * u.s * u.angstrom)


[docs] class BandData: """ Photometric band data (used by StarData) Attributes: type : string descriptive string of type of data (currently always BAND) waves : array of floats wavelengths fluxes : array of floats fluxes uncs : array of floats uncertainties on the fluxes npts : array of floats number of measurements contributing to the flux points n_bands : int number of bands bands : dict of band_name:measurement measurement is a tuple of (value, uncertainty) band_units : dict of band_name:strings band units ('mag' or 'mJy') band_waves : dict of band_name:floats band wavelengths in micron band_fluxes : dict of band_name:floats band fluxes in ergs/(cm^2 s A) """ def __init__(self, type): """ Parameters ---------- type: string descriptive string of type of data (currently always BAND) """ self.type = type self.n_bands = 0 self.bands = OrderedDict() self.band_units = OrderedDict() self.band_waves = OrderedDict() self.band_fluxes = OrderedDict()
[docs] def read_bands(self, lines): """ Read the photometric band data from a DAT file and upate class variables. Bands are filled in wavelength order to make life easier in subsequent calcuations (interpolations!) Parameters ---------- lines : list of string lines from a DAT formatted file Returns ------- Updates self.bands, self.band_units, and self.n_bands """ for line in lines: eqpos = line.find("=") pmpos = line.find("+/-") if (eqpos >= 0) & (pmpos >= 0) & (line[0] != "#"): # check for reference or unit colpos = max((line.find(";"), line.find("#"), line.find("mJy"))) if colpos == -1: colpos = len(line) # if there is both a reference and a unit elif line.find(";") != -1 and line.find("mJy") != -1: colpos = min(line.find(";"), line.find("mJy")) band_name = line[0:eqpos].strip() self.bands[band_name] = ( float(line[eqpos + 1 : pmpos]), float(line[pmpos + 3 : colpos]), ) # units if line.find("mJy") >= 0: self.band_units[band_name] = "mJy" else: self.band_units[band_name] = "mag" self.n_bands = len(self.bands)
[docs] @staticmethod def get_poss_bands(): """ Provides the possible bands and bands wavelengths and zeromag fluxes Returns ------- band_info : dict of band_name: value value is tuple of ( zeromag_flux, wavelength [micron] ) The zeromag_flux is the flux in erg/cm2/s/A for a star of (Vega) mag=0. It gives the conversion factor from Vega magnitudes to erg/cm2/s/A. """ _johnson_band_names = ["U", "B", "V", "R", "I", "J", "H", "K", "L", "M"] _johnson_band_waves = np.array( [0.366, 0.438, 0.545, 0.641, 0.798, 1.22, 1.63, 2.19, 3.45, 4.75] ) _johnson_band_zeromag_fluxes = ( np.array( [417.5, 632.0, 363.1, 217.7, 112.6, 31.47, 11.38, 3.961, 0.699, 0.204] ) * 1e-11 ) _spitzer_band_names = ["IRAC1", "IRAC2", "IRAC3", "IRAC4", "IRS15", "MIPS24"] _spitzer_band_waves = np.array([3.52, 4.45, 5.66, 7.67, 15.4, 23.36]) _spitzer_band_zeromag_fluxes = ( np.array([0.6605, 0.2668, 0.1069, 0.03055, 1.941e-3, 3.831e-4]) * 1e-11 ) # WISE bands. Wavelenghts and zeropoints are taken from http://svo2.cab.inta-csic.es/theory/fps/index.php?mode=browse&gname=WISE. _wise_band_names = ["WISE1", "WISE2", "WISE3", "WISE4"] _wise_band_waves = np.array([3.3526, 4.6028, 11.5608, 22.0883]) _wise_band_zeromag_fluxes = np.array( [8.1787e-12, 2.415e-12, 6.5151e-14, 5.0901e-15] ) # WFPC2 bands _wfpc2_band_names = [ "WFPC2_F170W", "WFPC2_F255W", "WFPC2_F336W", "WFPC2_F439W", "WFPC2_F555W", "WFPC2_F814W", ] _wfpc2_band_waves = np.array([0.170, 0.255, 0.336, 0.439, 0.555, 0.814]) _wfpc2_photflam = np.array( [1.551e-15, 5.736e-16, 5.613e-17, 2.945e-17, 3.483e-18, 2.508e-18] ) _wfpc2_vegamag = np.array([16.335, 17.019, 19.429, 20.884, 22.545, 21.639]) _n_wfpc2_bands = len(_wfpc2_vegamag) _wfpc2_band_zeromag_fluxes = np.zeros(_n_wfpc2_bands) # zeromag Vega flux not given in standard WFPC2 documenation # instead the flux and Vega magnitudes are given for 1 DN/sec # the following code coverts these numbers to zeromag Vega fluxes for i in range(_n_wfpc2_bands): _wfpc2_band_zeromag_fluxes[i] = _wfpc2_photflam[i] * ( 10 ** (0.4 * _wfpc2_vegamag[i]) ) # WFC3 bands _wfc3_band_names = [ "WFC3_F275W", "WFC3_F336W", "WFC3_F475W", "WFC3_F814W", "WFC3_F110W", "WFC3_F160W", ] _wfc3_band_waves = np.array([0.2710, 0.3355, 0.4772, 0.8053, 1.1534, 1.5369]) _wfc3_photflam = np.array( [3.186e-18, 1.267e-18, 2.458e-19, 1.477e-19, 1.53e-20, 1.93e-20] ) _wfc3_vegamag = np.array([22.331, 23.513, 25.809, 24.712, 26.063, 24.695]) _n_wfc3_bands = len(_wfc3_vegamag) _wfc3_band_zeromag_fluxes = np.zeros(_n_wfc3_bands) # zeromag Vega flux not given in standard WFPC2 documenation # instead the flux and Vega magnitudes are given for 1 DN/sec # the following code coverts these numbers to zeromag Vega fluxes for i in range(_n_wfc3_bands): _wfc3_band_zeromag_fluxes[i] = _wfc3_photflam[i] * ( 10 ** (0.4 * _wfc3_vegamag[i]) ) # ACS bands _acs_band_names = ["ACS_F475W", "ACS_F814W"] _acs_band_waves = np.array([0.4746, 0.8045]) _acs_photflam = np.array([1.827e-19, 7.045e-20]) _acs_vegamag = np.array([26.149, 25.517]) _n_acs_bands = len(_acs_vegamag) _acs_band_zeromag_fluxes = np.zeros(_n_acs_bands) # zeromag Vega flux not given in standard WFPC2 documenation # instead the flux and Vega magnitudes are given for 1 DN/sec # the following code coverts these numbers to zeromag Vega fluxes for i in range(_n_acs_bands): _acs_band_zeromag_fluxes[i] = _acs_photflam[i] * ( 10 ** (0.4 * _acs_vegamag[i]) ) # combine all the possible _poss_band_names = np.concatenate( [ _johnson_band_names, _spitzer_band_names, _wise_band_names, _wfpc2_band_names, _wfc3_band_names, _acs_band_names, ] ) _poss_band_waves = np.concatenate( [ _johnson_band_waves, _spitzer_band_waves, _wise_band_waves, _wfpc2_band_waves, _wfc3_band_waves, _acs_band_waves, ] ) _poss_band_zeromag_fluxes = np.concatenate( [ _johnson_band_zeromag_fluxes, _spitzer_band_zeromag_fluxes, _wise_band_zeromag_fluxes, _wfpc2_band_zeromag_fluxes, _wfc3_band_zeromag_fluxes, _acs_band_zeromag_fluxes, ] ) # zip everything together into a dictonary to pass back # and make sure it is in wavelength order windxs = np.argsort(_poss_band_waves) return OrderedDict( zip( _poss_band_names[windxs], zip(_poss_band_zeromag_fluxes[windxs], _poss_band_waves[windxs]), ) )
[docs] def get_band_names(self): """ Get the names of the bands in the data Returns ------- names : string array names of the bands in the data """ pbands = self.get_poss_bands() gbands = [] for cband in pbands.keys(): mag = self.get_band_mag(cband) if mag is not None: gbands.append(cband) return gbands
[docs] def get_band_mag(self, band_name): """ Get the magnitude and uncertainties for a band based on measurements colors and V band or direct measurements in the band Parameters ---------- band_name : str name of the band Returns ------- info: tuple (mag, unc, unit) """ band_data = self.bands.get(band_name) if band_data is not None: if self.band_units[band_name] == "mJy": # case of the IRAC/MIPS photometry # need to convert to Vega magnitudes pbands = self.get_poss_bands() band_zflux_wave = pbands[band_name] flux_p_unc = self.bands[band_name] flux = (1e-3 * Jy_to_cgs_const / band_zflux_wave[1] ** 2) * flux_p_unc[ 0 ] mag_unc = -2.5 * np.log10( (flux_p_unc[0] - flux_p_unc[1]) / (flux_p_unc[0] + flux_p_unc[1]) ) mag = -2.5 * np.log10(flux / band_zflux_wave[0]) return (mag, mag_unc, "mag") else: return self.bands[band_name] + (self.band_units[band_name],) else: _mag = 0.0 _mag_unc = 0.0 if band_name == "U": if ( (self.bands.get("V") is not None) & (self.bands.get("(B-V)") is not None) & (self.bands.get("(U-B)") is not None) ): _mag = ( self.bands["V"][0] + self.bands["(B-V)"][0] + self.bands["(U-B)"][0] ) _mag_unc = math.sqrt( self.bands["V"][1] ** 2 + self.bands["(B-V)"][1] ** 2 + self.bands["(U-B)"][1] ** 2 ) elif (self.bands.get("V") is not None) & ( self.bands.get("(U-V)") is not None ): _mag = self.bands["V"][0] + self.bands["(U-V)"][0] _mag_unc = math.sqrt( self.bands["V"][1] ** 2 + self.bands["(U-V)"][1] ** 2 ) elif band_name == "B": if (self.bands.get("V") is not None) & ( self.bands.get("(B-V)") is not None ): _mag = self.bands["V"][0] + self.bands["(B-V)"][0] _mag_unc = math.sqrt( self.bands["V"][1] ** 2 + self.bands["(B-V)"][1] ** 2 ) elif band_name == "R": if (self.bands.get("V") is not None) & ( self.bands.get("(V-R)") is not None ): _mag = self.bands["V"][0] - self.bands["(V-R)"][0] _mag_unc = math.sqrt( self.bands["V"][1] ** 2 + self.bands["(V-R)"][1] ** 2 ) elif band_name == "I": if (self.bands.get("V") is not None) & ( self.bands.get("(V-I)") is not None ): _mag = self.bands["V"][0] - self.bands["(V-I)"][0] _mag_unc = math.sqrt( self.bands["V"][1] ** 2 + self.bands["(V-I)"][1] ** 2 ) elif ( (self.bands.get("V") is not None) & (self.bands.get("(V-R)") is not None) & (self.bands.get("(R-I)") is not None) ): _mag = ( self.bands["V"][0] - self.bands["(V-R)"][0] - self.bands["(R-I)"][0] ) _mag_unc = math.sqrt( self.bands["V"][1] ** 2 + self.bands["(V-R)"][1] ** 2 + self.bands["(R-I)"][1] ** 2 ) if (_mag != 0.0) & (_mag_unc != 0.0): return (_mag, _mag_unc, "mag")
[docs] def get_band_flux(self, band_name): """ Compute the flux for the input band Parameters ---------- band_name : str name of the band Returns ------- info: tuple (flux, unc) """ mag_vals = self.get_band_mag(band_name) if mag_vals is not None: # get the zero mag fluxes poss_bands = self.get_poss_bands() if mag_vals[2] == "mag": # flux +/- unc flux1 = poss_bands[band_name][0] * ( 10 ** (-0.4 * (mag_vals[0] + mag_vals[1])) ) flux2 = poss_bands[band_name][0] * ( 10 ** (-0.4 * (mag_vals[0] - mag_vals[1])) ) return ( 0.5 * (flux1 + flux2), 0.5 * (flux2 - flux1), poss_bands[band_name][1], ) elif mag_vals[2] == "mJy": mfac = 1e-3 * Jy_to_cgs_const / np.square(poss_bands[band_name][0]) return ( mag_vals[0] * mfac, mag_vals[1] * mfac, poss_bands[band_name][1], ) else: warnings.warn("cannot get flux for %s" % band_name, UserWarning) else: warnings.warn("cannot get flux for %s" % band_name, UserWarning)
[docs] def get_band_fluxes(self): """ Compute the fluxes and uncertainties in each band Returns ------- Updates self.band_fluxes and self.band_waves Also sets self.(n_waves, waves, fluxes, npts) """ poss_bands = self.get_poss_bands() for pband_name in poss_bands.keys(): _mag_vals = self.get_band_mag(pband_name) if _mag_vals is not None: if _mag_vals[2] == "mag": _flux1 = poss_bands[pband_name][0] * ( 10 ** (-0.4 * (_mag_vals[0] + _mag_vals[1])) ) _flux2 = poss_bands[pband_name][0] * ( 10 ** (-0.4 * (_mag_vals[0] - _mag_vals[1])) ) self.band_fluxes[pband_name] = ( 0.5 * (_flux1 + _flux2), 0.5 * (_flux2 - _flux1), ) self.band_waves[pband_name] = poss_bands[pband_name][1] elif _mag_vals[2] == "mJy": self.band_waves[pband_name] = poss_bands[pband_name][1] mfac = ( 1e-3 * Jy_to_cgs_const / np.square(self.band_waves[pband_name]) ) self.band_fluxes[pband_name] = ( _mag_vals[0] * mfac, _mag_vals[1] * mfac, ) else: warnings.warn("cannot get flux for %s" % pband_name, UserWarning) # also store the band data in flat numpy vectors for # computational speed in fitting routines # mimics the format of the spectral data self.n_waves = len(self.band_waves) self.waves = np.zeros(self.n_waves) self.fluxes = np.zeros(self.n_waves) self.uncs = np.zeros(self.n_waves) self.npts = np.full(self.n_waves, 1.0) for k, pband_name in enumerate(self.band_waves.keys()): self.waves[k] = self.band_waves[pband_name] self.fluxes[k] = self.band_fluxes[pband_name][0] self.uncs[k] = self.band_fluxes[pband_name][1] self.wave_range = [min(self.waves), max(self.waves)] # add the units self.waves = self.waves * u.micron self.wave_range = self.wave_range * u.micron self.fluxes = self.fluxes * (u.erg / ((u.cm**2) * u.s * u.angstrom)) self.uncs = self.uncs * (u.erg / ((u.cm**2) * u.s * u.angstrom))
[docs] def get_band_mags_from_fluxes(self): """ Compute the magnitudes from fluxes in each band Useful for generating "observed data" from models Returns ------- Updates self.bands and self.band_units """ poss_bands = self.get_poss_bands() for cband in self.band_fluxes.keys(): if cband in poss_bands.keys(): self.bands[cband] = ( -2.5 * np.log10(self.band_fluxes[cband][0] / poss_bands[cband][0]), 0.0, ) self.band_waves[cband] = poss_bands[cband][1] self.band_units[cband] = "mag" else: warnings.warn("cannot get mag for %s" % cband, UserWarning) self.n_bands = len(self.bands)
def _getspecfilename(line, path): """ Get the full filename including path from the line in the dat file Parameters ---------- line : string formated line from DAT file example: 'IUE = hd029647_iue.fits' path : string path of the FITS file Returns ------- full_filename : str full name of file including path """ eqpos = line.find("=") tfile = line[eqpos + 2 :].rstrip() return path + tfile
[docs] class SpecData: """ Spectroscopic data (used by StarData) Attributes: waves : array of floats wavelengths fluxes : array of floats fluxes uncs : array of floats uncertainties on the fluxes npts : array of floats number of measurements contributing to the flux points n_waves : int number of wavelengths wmin, wmax : floats wavelength min and max """ # ----------------------------------------------------------- def __init__(self, type): """ Parameters ---------- type: string descriptive string of type of data (e.g., IUE, FUSE, IRS) """ self.type = type self.n_waves = 0
[docs] def read_spectra(self, line, path="./"): """ Read spectra from a FITS file FITS file has a binary table in the 1st extension Header needs to have: - wmin, wmax : min/max of wavelengths in file Expected columns are: - wave - flux - sigma [uncertainty in flux units] - npts [number of observations include at this wavelength] Parameters ---------- line : string formatted line from DAT file example: 'IUE = hd029647_iue.fits' path : string, optional location of the FITS files path Returns ------- Updates self.(file, wave_range, waves, flux, uncs, npts, n_waves) """ full_filename = _getspecfilename(line, path) # open and read the spectrum # ignore units warnings as non-standard units are explicitly handled a few lines later with warnings.catch_warnings(): warnings.simplefilter("ignore", UnitsWarning) tdata = Table.read(full_filename) self.waves = tdata["WAVELENGTH"].quantity self.fluxes = tdata["FLUX"].quantity self.uncs = tdata["SIGMA"].quantity self.npts = tdata["NPTS"].quantity self.n_waves = len(self.waves) # include the model if it exists # currently only used for FUSE H2 model if "MODEL" in tdata.colnames: self.model = tdata["MODEL"].quantity # fix odd unit designations if self.waves.unit == "ANGSTROM": self.waves = self.waves.value * u.angstrom if self.waves.unit == "MICRON": self.waves = self.waves.value * u.micron if self.fluxes.unit == "ERG/CM2/S/A": self.fluxes = self.fluxes.value * (u.erg / ((u.cm**2) * u.s * u.angstrom)) self.uncs = self.uncs.value * (u.erg / ((u.cm**2) * u.s * u.angstrom)) # compute the min/max wavelengths self.wave_range = ( np.array([min(self.waves.value), max(self.waves.value)]) * self.waves.unit ) # trim any data that is not finite (indxs,) = np.where(~np.isfinite(self.fluxes)) if len(indxs) > 0: self.fluxes[indxs] = 0.0 self.npts[indxs] = 0 # convert wavelengths to microns (standardization) self.waves = self.waves.to(u.micron) self.wave_range = self.wave_range.to(u.micron)
[docs] def read_fuse(self, line, path="./"): """ Read in FUSE spectra Converts the wavelengths from Anstroms to microns Parameters ---------- line : string formatted line from DAT file example: 'STIS = hd029647_fuse.fits' path : string, optional location of the FITS files path Returns ------- Updates self.(file, wave_range, waves, flux, uncs, npts, n_waves) """ self.read_spectra(line, path)
[docs] def read_iue(self, line, path="./"): """ Read in IUE spectra Removes data with wavelengths > 3200 A Converts the wavelengths from Anstroms to microns Parameters ---------- line : string formatted line from DAT file example: 'IUE = hd029647_iue.fits' path : string, optional location of the FITS files path Returns ------- Updates self.(file, wave_range, waves, flux, uncs, npts, n_waves) """ self.read_spectra(line, path) # trim the long wavelength data by setting the npts to zero (indxs,) = np.where(self.waves > 3200.0 * u.angstrom) if len(indxs) > 0: self.npts[indxs] = 0
[docs] def read_stis(self, line, path="./"): """ Read in STIS spectra Converts the wavelengths from Anstroms to microns Parameters ---------- line : string formatted line from DAT file example: 'STIS = hd029647_stis.fits' path : string, optional location of the FITS files path Returns ------- Updates self.(file, wave_range, waves, flux, uncs, npts, n_waves) """ self.read_spectra(line, path) # add units self.fluxes = self.fluxes.value * (u.erg / ((u.cm**2) * u.s * u.angstrom)) self.uncs = self.uncs.value * (u.erg / ((u.cm**2) * u.s * u.angstrom))
[docs] def read_spex(self, line, path="./", use_corfac=True, corfac=None): """ Read in SpeX spectra Parameters ---------- line : string formatted line from DAT file example: 'SpeX = hd029647_spex.fits' path : string, optional location of the FITS files path corfac : dict of key: coefficients keys identify the spectrum to be corrected and how Returns ------- Updates self.(file, wave_range, waves, flux, uncs, npts, n_waves) """ self.read_spectra(line, path) # determine which correction factor to use if self.type == "SpeX_SXD": if "SpeX_SXD" in corfac.keys(): corfac = corfac["SpeX_SXD"] else: corfac = None else: if "SpeX_LXD" in corfac.keys(): corfac = corfac["SpeX_LXD"] else: corfac = None # correct the SpeX spectra if desired and if the correction factor is defined if use_corfac and corfac is not None: self.fluxes *= corfac self.uncs *= corfac # add units self.fluxes = self.fluxes.value * (u.erg / ((u.cm**2) * u.s * u.angstrom)) self.uncs = self.uncs.value * (u.erg / ((u.cm**2) * u.s * u.angstrom))
[docs] def read_irs(self, line, path="./", use_corfac=True, corfac=None): """ Read in Spitzer/IRS spectra Correct the IRS spectra if the appropriate corfacs are present in the DAT file. Does a multiplicative correction that can include a linear term if corfac_irs_zerowave and corfac_irs_slope factors are present. Otherwise, just apply a multiplicative factor based on corfac_irs. Parameters ---------- line : string formatted line from DAT file example: 'IRS = hd029647_irs.fits' path : string, optional location of the FITS files path corfac : dict of key: coefficients keys identify the spectrum to be corrected and how Returns ------- Updates self.(file, wave_range, waves, flux, uncs, npts, n_waves) """ self.read_spectra(line, path) # standardization # mfac = Jy_to_cgs_const/np.square(self.waves) # self.fluxes *= mfac # self.uncs *= mfac # correct the IRS spectra if desired and if corfacs are defined if use_corfac and "IRS" in corfac.keys(): if ("IRS_zerowave" in corfac.keys()) and ("IRS_slope" in corfac.keys()): mod_line = corfac["IRS"] + ( corfac["IRS_slope"] * (self.waves.value - corfac["IRS_zerowave"]) ) self.fluxes *= mod_line self.uncs *= mod_line else: self.fluxes *= corfac["IRS"] self.uncs *= corfac["IRS"] # remove bad long wavelength IRS data if keyword set if "IRS_maxwave" in corfac.keys(): (indxs,) = np.where(self.waves.value > corfac["IRS_maxwave"]) if len(indxs) > 0: self.npts[indxs] = 0 # add units self.fluxes = self.fluxes.value * (u.Jy) self.uncs = self.uncs.value * (u.Jy)
[docs] def read_miri_ifu(self, line, path="./"): """ Read in Webb/MRS IFU spectra Parameters ---------- line : string formatted line from DAT file example: 'IRS = hd029647_irs.fits' path : string, optional location of the FITS files path Returns ------- Updates self.(file, wave_range, waves, flux, uncs, npts, n_waves) """ self.read_spectra(line, path) # add units self.fluxes = self.fluxes.value * u.Jy self.uncs = self.uncs.value * u.Jy self.fluxes = self.fluxes.to( fluxunit, equivalencies=u.spectral_density(self.waves) ) self.uncs = self.uncs.to(fluxunit, equivalencies=u.spectral_density(self.waves))
[docs] def rebin_constres(self, waverange, resolution): """ Rebin the spectrum to a fixed spectral resolution and min/max wavelength range. Parameters ---------- waverange : 2 element array of astropy Quantities Min/max of wavelength range with units resolution : float Spectral resolution of rebinned spectrum Returns ------- measure_extinction SpecData Object with rebinned spectrum """ # setup new wavelength grid full_wave, full_wave_min, full_wave_max = _wavegrid( resolution, waverange.to(u.micron).value ) n_waves = len(full_wave) # setup the new rebinned vectors new_waves = full_wave * u.micron new_fluxes = np.zeros((n_waves), dtype=float) new_uncs = np.zeros((n_waves), dtype=float) new_npts = np.zeros((n_waves), dtype=int) # rebin using a weighted average owaves = self.waves.to(u.micron).value for k in range(n_waves): # check for zero uncs to avoid divide by zero # errors when the flux uncertainty of a real measurement # is zero for any reason (indxs,) = np.where( (owaves >= full_wave_min[k]) & (owaves < full_wave_max[k]) & (self.npts > 0.0) & (self.uncs > 0.0) ) if len(indxs) > 0: weights = 1.0 / np.square(self.uncs[indxs].value) sweights = np.sum(weights) new_fluxes[k] = np.sum(weights * self.fluxes[indxs].value) / sweights new_uncs[k] = 1.0 / np.sqrt(sweights) new_npts[k] = np.sum(self.npts[indxs]) # update values self.waves = new_waves self.fluxes = new_fluxes * self.fluxes.unit self.uncs = new_uncs * self.uncs.unit self.npts = new_npts
[docs] class StarData: """ Photometric and spectroscopic data for a star Attributes ---------- file : string DAT filename path : string DAT filename path sptype : string spectral type of star model_params : dict has the stellar atmosphere model parameters empty dict if observed data data : dict of key:BandData or SpecData key gives the type of data (e.g., BAND, IUE, IRS) photonly: boolean Only read in the photometry (no spectroscopy) corfac : dict of key:correction factors key gives the type (e.g., IRS, IRS_slope) use_corfac : boolean whether or not to use the correction factors, default = True LXD_man : boolean whether or not the LXD scaling factor has been set manually, default = False """ def __init__( self, datfile, path="", photonly=False, use_corfac=True, deredden=False ): """ Parameters ---------- datfile: string filename of the DAT file path: string, optional DAT file path photonly: boolean Only read in the photometry (no spectroscopy) use_corfac: boolean Modify the spectra based on precomputed correction factors Currently only affects Spitzer/IRS data and SpeX data deredden : boolean [default=False] Deredden the data based on dereddening parameters given in the DAT file. Generally used to deredden standards. """ self.file = datfile self.path = path self.sptype = "" self.model_params = {} self.data = {} self.corfac = {} self.deredden_params = {} self.photonly = photonly self.use_corfac = use_corfac self.LXD_man = False self.dereddened = deredden if self.file is not None: self.read(deredden=deredden)
[docs] def read(self, deredden=False): """ Populate the object from a DAT file + spectral files Parameters ---------- deredden : boolean [default=False] Deredden the data based on dereddening parameters given in the DAT file. Generally used to deredden standards. """ # open and read all the lines in the file f = open(self.path + self.file, "r") self.datfile_lines = list(f) f.close() # get the photometric band data self.data["BAND"] = BandData("BAND") self.data["BAND"].read_bands(self.datfile_lines) # covert the photoemtric band data to fluxes in all possible bands self.data["BAND"].get_band_fluxes() # go through and get info before reading the spectra poss_mod_params = ["model_type", "Z", "vturb", "logg", "Teff", "origin"] for line in self.datfile_lines: cpair = self._parse_dfile_line(line) if cpair is not None: if cpair[0] == "sptype": self.sptype = cpair[1] elif cpair[0] in poss_mod_params: self.model_params[cpair[0]] = cpair[1] elif cpair[0] == "corfac_spex_SXD": self.corfac["SpeX_SXD"] = eval(cpair[1]) elif cpair[0] == "corfac_spex_LXD": self.corfac["SpeX_LXD"] = eval(cpair[1]) elif cpair[0] == "LXD_man": self.LXD_man = eval(cpair[1]) elif cpair[0] == "corfac_irs_zerowave": self.corfac["IRS_zerowave"] = float(cpair[1]) elif cpair[0] == "corfac_irs_slope": self.corfac["IRS_slope"] = float(cpair[1]) elif cpair[0] == "corfac_irs_maxwave": self.corfac["IRS_maxwave"] = float(cpair[1]) elif cpair[0] == "corfac_irs": self.corfac["IRS"] = float(cpair[1]) elif cpair[0].find("dered") != -1: if cpair[0].find("RV") != -1: self.deredden_params["RV"] = float(cpair[1]) elif cpair[0].find("AV") != -1: self.deredden_params["AV"] = float(cpair[1]) elif cpair[0].find("FM") != -1: self.deredden_params["FM90"] = [ float(cfm) for cfm in cpair[1].split(" ") ] # read the spectra if not self.photonly: for line in self.datfile_lines: if line.find("IUE") == 0: fname = _getspecfilename(line, self.path) if os.path.isfile(fname): self.data["IUE"] = SpecData("IUE") self.data["IUE"].read_iue(line, path=self.path) else: warnings.warn(f"{fname} does not exist", UserWarning) elif line.find("FUSE") == 0: fname = _getspecfilename(line, self.path) if os.path.isfile(fname): self.data["FUSE"] = SpecData("FUSE") self.data["FUSE"].read_fuse(line, path=self.path) else: warnings.warn(f"{fname} does not exist", UserWarning) elif line.find("STIS_Opt") == 0: fname = _getspecfilename(line, self.path) if os.path.isfile(fname): self.data["STIS_Opt"] = SpecData("STIS_Opt") self.data["STIS_Opt"].read_stis(line, path=self.path) else: warnings.warn(f"{fname} does not exist", UserWarning) elif line.find("STIS") == 0: fname = _getspecfilename(line, self.path) if os.path.isfile(fname): self.data["STIS"] = SpecData("STIS") self.data["STIS"].read_stis(line, path=self.path) else: warnings.warn(f"{fname} does not exist", UserWarning) elif line.find("SpeX_SXD") == 0: fname = _getspecfilename(line, self.path) if os.path.isfile(fname): self.data["SpeX_SXD"] = SpecData("SpeX_SXD") self.data["SpeX_SXD"].read_spex( line, path=self.path, use_corfac=self.use_corfac, corfac=self.corfac, ) else: warnings.warn(f"{fname} does not exist", UserWarning) elif line.find("SpeX_LXD") == 0: fname = _getspecfilename(line, self.path) if os.path.isfile(fname): self.data["SpeX_LXD"] = SpecData("SpeX_LXD") self.data["SpeX_LXD"].read_spex( line, path=self.path, use_corfac=self.use_corfac, corfac=self.corfac, ) else: warnings.warn(f"{fname} does not exist", UserWarning) elif line.find("IRS") == 0 and line.find("IRS15") < 0: fname = _getspecfilename(line, self.path) if os.path.isfile(fname): self.data["IRS"] = SpecData("IRS") self.data["IRS"].read_irs( line, path=self.path, use_corfac=self.use_corfac, corfac=self.corfac, ) else: warnings.warn(f"{fname} does not exist", UserWarning) elif line.find("MIRI_IFU") == 0: fname = _getspecfilename(line, self.path) if os.path.isfile(fname): self.data["MIRI_IFU"] = SpecData("MIRI_IFU") self.data["MIRI_IFU"].read_miri_ifu( line, path=self.path, ) else: warnings.warn(f"{fname} does not exist", UserWarning) # if desired and the necessary dereddening parameters are present if deredden: self.deredden()
@staticmethod def _parse_dfile_line(line): """ Parses a string and return key, value pair. Pair separated by '=' and ends with ';'. Parameters ---------- line : string DAT file formatted string Returns ------- substring : string The value substring in a DAT file formatted string """ if line[0] != "#": eqpos = line.find("=") if eqpos >= 0: colpos = line.find(";") if colpos == 0: colpos = len(line) return (line[0 : eqpos - 1].strip(), line[eqpos + 1 : colpos].strip())
[docs] def deredden(self): """ Remove the effects of extinction from all the data. Prime use to deredden a standard star for a small amount of extinction. Information on the extinction curve to use is given in the DAT_file. Uses FM90 parameters for the UV portion of the extinction curve and CCM89 extinction values for the optical/NIR based on R(V). A(V) values used for dust column. """ if self.deredden_params: # deredden the BANDs and IUE/STIS/FUSE data # will need to add other options if/when dereddening expanded # info for dereddening model (F99 method) optnirext = CCM89() gvals = self.data["BAND"].waves < 3.0 * u.micron optnir_axav_x = 1.0 / self.data["BAND"].waves[gvals] optnir_axav_y = optnirext(optnir_axav_x) fm = self.deredden_params["FM90"] optnir_sindxs = np.argsort(optnir_axav_x) xrange = np.flip(1.0 / np.array(optnirext.x_range)) * u.micron for curtype in self.data.keys(): cwaves = self.data[curtype].waves gvals = (cwaves > xrange[0]) & (cwaves < xrange[1]) if np.any(gvals): alav = _curve_F99_method( self.data[curtype].waves[gvals], self.deredden_params["RV"], fm[0], fm[1], fm[2], fm[3], fm[4], fm[5], optnir_axav_x.value[optnir_sindxs], optnir_axav_y[optnir_sindxs], optnirext.x_range, "F99_method", ) self.data[curtype].fluxes[gvals] /= 10 ** ( -0.4 * alav * self.deredden_params["AV"] ) else: warnings.warn(f"{curtype} cannot be dereddened", UserWarning) else: warnings.warn( "cannot deredden as no dereddening parameters set", UserWarning )
[docs] def get_flat_data_arrays(self, req_datasources): """ Get the data in a simple format Parameters ---------- req_datasources : list of str list of data sources (e.g., ['IUE', 'BAND']) Returns ------- (waves, fluxes, uncs) : tuple of numpy.ndarrays arrays are sorted from short to long wavelengths waves is wavelengths in microns fluxes is fluxes in erg/cm2/s/A uncs is uncertainties on flux in erg/cm2/s/A """ wavedata = [] fluxdata = [] uncdata = [] nptsdata = [] for data_source in req_datasources: if data_source in self.data.keys(): wavedata.append(self.data[data_source].waves.to(u.micron).value) fluxdata.append( self.data[data_source] .fluxes.to( fluxunit, equivalencies=u.spectral_density(self.data[data_source].waves), ) .value ) uncdata.append( self.data[data_source] .uncs.to( fluxunit, equivalencies=u.spectral_density(self.data[data_source].waves), ) .value ) nptsdata.append(self.data[data_source].npts) waves = np.concatenate(wavedata) fluxes = np.concatenate(fluxdata) uncs = np.concatenate(uncdata) npts = np.concatenate(nptsdata) # sort the arrays from short to long wavelengths # at the same time, remove points with no data (gindxs,) = np.where(npts > 0) sindxs = np.argsort(waves[gindxs]) gindxs = gindxs[sindxs] waves = waves[gindxs] fluxes = fluxes[gindxs] uncs = uncs[gindxs] return (waves, fluxes, uncs)
[docs] def plot( self, ax, pcolor=None, norm_wave_range=None, mlam4=False, wavenum=False, fluxunit=fluxunit, exclude=[], yoffset=None, yoffset_type="multiply", annotate_key=None, annotate_wave_range=None, annotate_alignment="left", annotate_text=None, annotate_rotation=0.0, annotate_yoffset=0.0, annotate_color="k", legend_key=None, legend_label=None, fontsize=None, ): """ Plot all the data for a star (bands and spectra) Parameters ---------- ax : matplotlib plot object pcolor : matplotlib color [default=None] color to use for all the data norm_wave_range : list of 2 floats [default=None] min/max wavelength range to use to normalize data mlam4 : boolean [default=False] plot the data multiplied by lambda^4 to remove the Rayleigh-Jeans slope wavenum : boolean [default=False] plot x axis as 1/wavelength as is standard for UV extinction curves fluxunit : astropy unit flux units for plot, default is ergs/(cm^2 s A) exclude : list of strings [default=[]] List of data type(s) to exclude from the plot (e.g., "IRS", "IRAC1",...) yoffset : float [default=None] multiplicative or additive offset for the data yoffset_type : str [default="multiply"] yoffset type: "multiply" or "add" annotate_key : string [default=None] type of data for which to annotate text (e.g., SpeX_LXD) annotate_wave_range : list of 2 floats [default=None] min/max wavelength range for the annotation of the text annotate_alignment : string [default="left"] horizontal alignment of the annotated text ("left", "center", "right") annotate_text : string [default=None] text to annotate annotate_rotation : float [default=0.0] rotation angle of the annotated text annotate_yoffset : float [default=0.0] y-offset for the annotated text annotate_color : string [default="k"] color of the annotated text legend_key : string [default=None] legend the spectrum using the given data key legend_label : string [default=None] label to use for legend fontsize : int [default=None] fontsize for plot """ if yoffset is None: if yoffset_type == "multiply": yoffset = 1.0 else: yoffset = 0.0 # find the data to use for the normalization if requested if norm_wave_range is not None: normtype = None for curtype in self.data.keys(): if (norm_wave_range[0] >= self.data[curtype].wave_range[0]) & ( (norm_wave_range[1] <= self.data[curtype].wave_range[1]) ): # prioritize spectra over photometric bands if normtype is not None: if normtype == "BAND": normtype = curtype else: normtype = curtype if normtype is None: return # raise ValueError("requested normalization range not valid") (gindxs,) = np.where( (self.data[normtype].npts > 0) & ( (self.data[normtype].waves >= norm_wave_range[0]) & (self.data[normtype].waves <= norm_wave_range[1]) ) ) if len(gindxs) > 0: waves = self.data[normtype].waves[gindxs] fluxes = ( self.data[normtype] .fluxes[gindxs] .to(fluxunit, equivalencies=u.spectral_density(waves)) .value ) if mlam4: ymult = np.power(waves.value, 4.0) else: ymult = np.full((len(waves.value)), 1.0) normval = np.average(fluxes * ymult) else: raise ValueError("no good data in requested norm range") else: normval = 1.0 # plot all band and spectral data for this star for curtype in self.data.keys(): # do not plot the excluded data type(s) if curtype in exclude: continue x = self.data[curtype].waves.value if wavenum: x = 1.0 / x # replace fluxes by NaNs for wavelength regions that need to be excluded from the plot, to avoid separate regions being connected artificially self.data[curtype].fluxes[self.data[curtype].npts == 0] = np.nan if mlam4: ymult = np.power(self.data[curtype].waves.value, 4.0) else: ymult = np.full((len(self.data[curtype].waves.value)), 1.0) # multiply by the overall normalization ymult /= normval if legend_key == curtype: if legend_label is None: red_name = self.file.replace(".dat", "") red_name = red_name.replace("DAT_files/", "") clabel = "%s / %s" % (red_name, self.sptype) else: clabel = legend_label else: clabel = None yvals = ( self.data[curtype] .fluxes.to( fluxunit, equivalencies=u.spectral_density(self.data[curtype].waves), ) .value ) yuncs = ( self.data[curtype] .uncs.to( fluxunit, equivalencies=u.spectral_density(self.data[curtype].waves), ) .value ) yplotvals = ymult * yvals if yoffset_type == "multiply": yplotvals *= yoffset else: yplotvals += yoffset if curtype == "BAND": # do not plot the excluded band(s) for i, bandname in enumerate(self.data[curtype].get_band_names()): if bandname in exclude: yplotvals[i] = np.nan # plot band data as points with errorbars ax.errorbar( x, yplotvals, yerr=ymult * yuncs, fmt="o", color=pcolor, label=clabel, mfc="white", ) else: ax.plot( x, yplotvals, "-", color=pcolor, label=clabel, ) if curtype == annotate_key: # annotate the spectra waves = self.data[curtype].waves ann_indxs = np.where( (waves >= annotate_wave_range[0]) & (waves <= annotate_wave_range[1]) ) ann_val = np.nanmedian(yplotvals[ann_indxs]) ann_val += (annotate_yoffset,) ann_xval = 0.5 * np.sum(annotate_wave_range.value) if wavenum: ann_xval = 1 / ann_xval ax.text( ann_xval, ann_val, annotate_text, color=annotate_color, horizontalalignment=annotate_alignment, rotation=annotate_rotation, fontsize=fontsize, )