Source code for pyretlife.retrieval.units

import astropy.units as u
import astropy.constants as const
import numpy as np


# -----------------------------------------------------------------------------
# DEFINITIONS
# -----------------------------------------------------------------------------


[docs] class UnitsUtil: def __init__(self): # Standard units for input of the retrieval. All parameters not # mentioned here are assumed to be dimensionless self.std_input_units = { # GENERAL "pressure": u.bar, "temperature": u.K, # LAMBDA RANGE "WMIN": u.micron, "WMAX": u.micron, # PHYSICAL PARAMETERS "P0": u.bar, "d_syst": self.custom_unit("pRT_pc", const.pc.cgs), "R_pl": self.custom_unit("pRT_R_earth", const.R_earth.cgs), "M_pl": self.custom_unit("pRT_M_earth", const.M_earth.cgs), # SCATTERING PARAMETERS "stellar_temperature": u.K, "stellar_radius": self.custom_unit("pRT_R_sun", const.R_sun.cgs), "semimajor_axis": self.custom_unit("pRT_AU", const.au.cgs), # MOON PARAMETERS "T_m": u.K, "R_m": u.pRT_R_earth, # CLOUD PARAMETERS "H2SO484(c)_am_top_pressure": u.bar, "H2SO484(c)_am_thickness": u.bar, "H2SO484(c)_am_particle_radius": u.cm, 'Pcloud':u.bar, # Input Files "wavelength": u.micron, "flux": u.erg / u.s / u.Hz / u.m**2, } # Units for the retrieval (mostly cgs). All parameters not # mentioned here are assumed to be dimensionless self.retrieval_units = { # GENERAL "pressure": u.bar, "temperature": u.K, # LAMBDA RANGE "WMIN": u.micron, "WMAX": u.micron, # PHYSICAL PARAMETERS "P0": u.bar, "d_syst": u.m, "R_pl": u.cm, "M_pl": u.g, # SCATTERING PARAMETERS "stellar_temperature": u.K, "stellar_radius": u.cm, "semimajor_axis": u.cm, # MOON PARAMETERS "T_m": u.K, "R_m": u.cm, # CLOUD PARAMETERS "H2SO484(c)_am_top_pressure": u.bar, "H2SO484(c)_am_thickness": u.bar, "H2SO484(c)_am_particle_radius": u.cm, 'Pcloud':u.bar, # Input Files "wavelength": u.micron, "flux": u.erg / u.s / u.Hz / u.m**2, }
[docs] @staticmethod def custom_unit(name, astropy_conv): temp_unit = u.def_unit(name, astropy_conv) setattr(u, name, temp_unit) u.add_enabled_units([getattr(u, name)]) return getattr(u, name)
[docs] @staticmethod def return_units(key, units): try: return units[key] except KeyError: return u.dimensionless_unscaled
[docs] @staticmethod def prior_unit_conversion( key, input_unit, target_unit, prior, printing=True ) -> dict: converted_prior = {} # Conversion of LogUniform priors if prior["kind"] == "log-uniform": for spec in prior["prior_specs"]: converted_prior[spec] = np.log10( (10 ** prior["prior_specs"][spec] * input_unit) .to(target_unit) .value ) elif prior["kind"] == "log-gaussian": # only translate the mean but leave the sigma the same (in log space) converted_prior["log_mean"] = np.log10( (10 ** prior["prior_specs"]["log_mean"] * input_unit) .to(target_unit) .value ) converted_prior["log_sigma"] = prior["prior_specs"]["log_sigma"] else: for spec in prior["prior_specs"]: converted_prior[spec] = ( (prior["prior_specs"][spec] * input_unit) .to(target_unit) .value ) # G and U priors are transformed easily as they do not contain a # logarithm they are just scaled # ULU and FU (only used for abundances and PT parameters in the # polynomial profile. no special treatment) # If a conversion was performed print it as a check if (target_unit != input_unit) and printing: print( "Conversion performed for prior of " + key + ". Prior kind: " + prior["kind"] ) print("Input values:", prior["prior_specs"], input_unit) print("Converted value:", converted_prior, target_unit) print() return converted_prior
[docs] @staticmethod def truth_unit_conversion( key, input_unit, target_unit, input_truth, printing=True, ): if input_truth is not None: converted_truth = (input_truth * input_unit).to(target_unit).value # If a conversion was performed print it as a check if (target_unit != input_unit) and printing: print("Conversion performed for truth value of " + key + ".") print("Input value:", input_truth, input_unit) print("Converted value:", converted_truth, target_unit) print() return converted_truth else: return None
[docs] @staticmethod def unit_spectrum_conversion( key, input_unit, target_unit, input_data, printing=True ): conv_data = np.zeros_like(input_data) conv_data[:, 0] = ( (input_data[:, 0] * input_unit[0]) .to(target_unit[0], equivalencies=u.spectral()) .value ) conv_data[:, 1] = ( (input_data[:, 1] * input_unit[1]) .to( target_unit[1], equivalencies=u.spectral_density( input_data[:, 0] * input_unit[0] ), ) .value ) conv_data[:, 2] = ( (input_data[:, 2] * input_unit[1]) .to( target_unit[1], equivalencies=u.spectral_density( input_data[:, 0] * input_unit[0] ), ) .value ) if (target_unit != input_unit) and printing: print("Conversion performed for input spectrum " + key + ".") print( "Input wavelength unit:", input_unit[0], "\tConverted wavelength unit:", target_unit[0], ) print( "Input flux unit:", input_unit[1], "\tConverted flux unit:", target_unit[1], ) print() return conv_data
[docs] @staticmethod def unit_spectrum_cube(input_unit, target_unit, input_wl, input_flux): conv_wl = ( (input_wl * input_unit[0]) .to(target_unit[0], equivalencies=u.spectral()) .value ) conv_flux = ( (input_flux * input_unit[1]) .to( target_unit[1], equivalencies=u.spectral_density(input_wl * input_unit[0]), ) .value ) return conv_wl, conv_flux
[docs] def convert_spectrum(instrument: dict, units: UnitsUtil) -> dict: for data_file in instrument.keys(): converted_unit_wavelength = units.return_units( "wavelength", units.retrieval_units ) converted_unit_flux = units.return_units("flux", units.retrieval_units) converted_data = units.unit_spectrum_conversion( data_file, [ instrument[data_file]["input_unit_wavelength"], instrument[data_file]["input_unit_flux"], ], [converted_unit_wavelength, converted_unit_flux], instrument[data_file]["input_data"], ) instrument[data_file] = { "wavelength": converted_data[:, 0], "flux": converted_data[:, 1], "error": converted_data[:, 2], "unit_wavelength": converted_unit_wavelength, "unit_flux": converted_unit_flux, "input_wavelength": instrument[data_file]["input_data"][:, 0], "input_flux": instrument[data_file]["input_data"][:, 1], "input_error": instrument[data_file]["input_data"][:, 2], "input_unit_wavelength": instrument[data_file]["input_unit_wavelength"], "input_unit_flux": instrument[data_file]["input_unit_flux"], } return instrument
[docs] def convert_knowns_and_parameters(dictionary: dict, units: UnitsUtil) -> dict: for section in dictionary.keys(): refined_dict = {} # Convert the input to retrieval units converted_unit = units.return_units(section, units.retrieval_units) refined_dict["input_unit"] = dictionary[section]["unit"] refined_dict["unit"] = converted_unit if "truth" in dictionary[section].keys(): converted_truth = units.truth_unit_conversion( section, dictionary[section]["unit"], converted_unit, dictionary[section]["truth"], ) refined_dict["input_truth"] = dictionary[section]["truth"] refined_dict["truth"] = converted_truth if "prior" in dictionary[section].keys(): converted_prior = units.prior_unit_conversion( section, dictionary[section]["unit"], converted_unit, dictionary[section]["prior"], ) refined_dict["prior"] = dictionary[section]["prior"] refined_dict["prior"]["input_prior_specs"] = dictionary[section][ "prior" ]["prior_specs"] refined_dict["prior"]["prior_specs"] = converted_prior refined_dict["type"] = dictionary[section]["type"] dictionary[section] = refined_dict return dictionary