Source code for pyretlife.retrieval.radiative_transfer

import os
import sys
from pathlib import Path
from typing import Union, Tuple, Set

import numpy as np
import spectres
from numpy import ndarray
from astropy import constants as const

#from pyretlife.retrieval.atmospheric_variables import (
#    get_mm
#)


[docs] def define_linelists( config: dict, settings: dict, input_opacity_path: Union[str, Path] ) -> Tuple[list, list, list, list]: used_rayleigh_species = [] used_cia_species = [] used_cloud_species = [] species = list(config["CHEMICAL COMPOSITION PARAMETERS"].keys()) #rebin_opacity_linelists(settings, input_opacity_path, pRT) used_line_species = ingest_opacity_linelists(settings, input_opacity_path) tot_mols = [s.split("_")[0] for s in used_line_species] # RAYLEIGH if settings["include_scattering"]["Rayleigh"]: used_rayleigh_species = ingest_rayleigh_linelists(species) tot_mols.extend(used_rayleigh_species) # CIA if settings["include_CIA"]: used_cia_species = ingest_cia_linelists(species, input_opacity_path) tot_mols.extend( [ item for sublist in [key.split("-") for key in used_cia_species] for item in sublist ] ) # CLOUDS if settings["include_scattering"]["clouds"]: cloud_species = set( [ "_".join(key.split("_")[:2]) for key in config["CLOUD PARAMETERS"].keys() if key != "settings_clouds" ] ) used_cloud_species = ingest_cloud_linelists( cloud_species, input_opacity_path ) tot_mols.extend([cloud.split("_", 1)[0] for cloud in cloud_species]) tot_mols = list(set(tot_mols)) print("Used line species:\t\t" + str(used_line_species)) print("Used rayleigh species:\t\t" + str(used_rayleigh_species)) print("Used continuum opacities:\t" + str(used_cia_species)) print("Used cloud species:\t\t" + str(used_cloud_species)) print("Used species *in general*:\t" + str(tot_mols)) print() return ( used_line_species, used_rayleigh_species, used_cia_species, used_cloud_species, )
#def rebin_opacity_linelists(settings: dict, input_opacity_path: Union[str, Path], pRT): # # get all folders and filter out the ones that are not r 1000 # line_species = os.listdir(Path(input_opacity_path)/"opacities"/"lines"/"corr_k") # full_resolution_lists = [] # for line_list in line_species: # if not '_R_' in line_list: # full_resolution_lists += [line_list] # # Load the molecular weights # masses={} # for species in full_resolution_lists: # masses[species.split('_')[0]]=float(get_mm(species)) # print(masses) # # Rebin if necessary # for species in full_resolution_lists: # for resolution in int(settings["resolution"]): # if not os.path.exists(Path(input_opacity_path)+'opacities/lines/corr_k/'+str(species)+"_R_"+str(resolution)): # os.mkdir(Path(input_opacity_path)+'opacities/lines/corr_k/'+str(species)+"_R_"+str(resolution)) # temp_atmosphere = pRT.Radtrans(line_species = [str(species)], wlen_bords_micron = [0.1, 251.]) # temp_atmosphere.write_out_rebin(resolution, path = Path(input_opacity_path)+'opacities/lines/corr_k/', species = [str(species)], masses = masses)
[docs] def ingest_opacity_linelists( settings: dict, input_opacity_path: Union[str, Path] ) -> list: species_at_resolution = [] for line in settings["opacity_linelist"]: species_at_resolution.append(line + ".R" + str(settings["resolution"])) line_species = os.listdir( Path(input_opacity_path) / "opacities" / "lines" / "correlated_k" ) # return list(set(species_at_resolution) & set(line_species)) return list(set(species_at_resolution))
[docs] def ingest_rayleigh_linelists(species: list) -> list: rayleigh_species = [ "H2", "He", "H2O", "CO2", "O2", "N2", "CO", "CH4", "N2", ] return list(set(species) & set(rayleigh_species))
[docs] def ingest_cia_linelists( species: list, input_opacity_path: Union[str, Path] ) -> list: used_cia_species = [] continuum_opacities = os.listdir( Path(input_opacity_path) / "opacities" / "continuum" / "collision_induced_absorptions" ) for cia in continuum_opacities: cia: str cia_components = cia.split("--") if len(cia_components) > 1: if ( species.count(cia_components[0]) + species.count(cia_components[1]) == 2 ): used_cia_species.append(cia) return used_cia_species
[docs] def ingest_cloud_linelists( cloud_species: Set[str], input_opacity_path: Union[str, Path] ) -> list: used_cloud_species = [] cloud_dict = { "a": "/amorphous", "c": "/crystalline", "m": "/mie", "d": "/DHS", } for cloud in cloud_species: cloud_dir = ( str(input_opacity_path) + "/opacities/continuum/clouds/" + cloud.split("_")[0].replace("(c)", "_c") + cloud_dict[cloud[-2]] + cloud_dict[cloud[-1]] ) if not os.path.exists(cloud_dir): sys.exit( "ERROR: No opacities found for the cloud species " + str(cloud) + "." ) used_cloud_species.append(cloud) return used_cloud_species
[docs] def calculate_moon_flux(frequency: ndarray, moon_vars: dict): exponent = const.h.cgs.value * frequency / (const.k_B.cgs.value * moon_vars["T_m"]) blackbody_nu = ( 2 * const.h.cgs.value * frequency ** 3 / const.c.cgs.value ** 2 / (np.exp(exponent) - 1) ) # in erg/cm^2/s/Hz/sr return np.pi * blackbody_nu # in erg/cm^2/s/Hz
[docs] def assign_reflectance_emissivity( scat_vars: dict, frequency: ndarray ) -> Tuple[ndarray, ndarray]: reflectance = scat_vars["reflectance"] * np.ones_like(frequency) emissivity = scat_vars["emissivity"] * np.ones_like(frequency) return reflectance, emissivity
[docs] def calculate_emission_flux( rt_object, settings: dict, temp: ndarray, abundances_MMR: dict, gravity: float, mmw: ndarray, cloud_radii: dict, cloud_lnorm: int, scat_vars: dict, em_contr=False, Pcloud=None, ) -> Tuple[ndarray, ndarray]: """ Creates the pressure-temperature profile for the current atmosphere and calculates the corresponding emitted flux using petitRADTRANS. See the documentation for petitRADTRANS.radtrans.Radtrans.calculate_flux. """ # Calculate the Spectrum of the planet, prevent unnecessary printing # TODO implement better logging # old_stdout = sys.stdout # sys.stdout = open(os.devnull, "w") if not settings["include_scattering"]["direct_light"]: freq, flux, additional_output = rt_object.calculate_flux( temp, abundances_MMR, mmw, gravity, cloud_particles_mean_radii=cloud_radii, cloud_particle_radius_distribution_std=cloud_lnorm, # add_cloud_scat_as_abs=settings["include_scattering"]["clouds"], #Deprecated #TODO: add warning if config has cloud scattering on return_contribution=em_contr, opaque_cloud_top_pressure=Pcloud, frequencies_to_wavelengths=False ) else: if settings['geometry'] == 'quadrature': freq, flux, additional_output = rt_object.calculate_flux( temp, abundances_MMR, mmw, gravity, cloud_particles_mean_radii=cloud_radii, cloud_particle_radius_distribution_std=cloud_lnorm, irradiation_geometry='non-isotropic', star_irradiation_angle=77.756, star_effective_temperature=scat_vars["stellar_temperature"], star_radius=scat_vars["stellar_radius"], orbit_semi_major_axis=scat_vars["semimajor_axis"], # add_cloud_scat_as_abs=settings["include_scattering"]["clouds"], #Deprecated return_contribution=em_contr, opaque_cloud_top_pressure=Pcloud, frequencies_to_wavelengths=False ) else: freq, flux, additional_output = rt_object.calculate_flux( temp, abundances_MMR, mmw, gravity, cloud_particles_mean_radii=cloud_radii, cloud_particle_radius_distribution_std=cloud_lnorm, irradiation_geometry=settings["geometry"], star_effective_temperature=scat_vars["stellar_temperature"], star_radius=scat_vars["stellar_radius"], orbit_semi_major_axis=scat_vars["semimajor_axis"], # add_cloud_scat_as_abs=settings["include_scattering"]["clouds"], #Deprecated return_contribution=em_contr, opaque_cloud_top_pressure=Pcloud, frequencies_to_wavelengths=False ) # sys.stdout = old_stdout if em_contr: return freq, flux, additional_output['emission_contribution'].copy() else: return freq, flux, None
[docs] def scale_flux_to_distance( flux: ndarray, radius: float, distance: float ) -> ndarray: # Scale the fluxes to the desired separation # TODO: add limb darkening factor option flux = flux * radius**2 / distance**2 return flux
[docs] def rebin_spectrum(instrument, wavelength, flux): return spectres.spectres( instrument["wavelength"], wavelength, flux, )