Source code for tardis.model.parse_input

import logging
import os

import numpy as np
import pandas as pd
from astropy import units as u

from tardis import constants as const
from tardis.io.model.parse_density_configuration import (
    calculate_density_after_time,
    parse_config_v1_density,
    parse_csvy_density,
)
from tardis.io.model.readers.base import read_abundances_file, read_density_file
from tardis.io.model.readers.csvy import parse_csv_abundances
from tardis.io.model.readers.generic_readers import read_uniform_abundances
from tardis.model.geometry.radial1d import HomologousRadial1DGeometry
from tardis.model.matter.composition import Composition
from tardis.model.matter.decay import IsotopicMassFraction
from tardis.model.radiation_field_state import (
    DiluteBlackBodyRadiationFieldState,
)
from tardis.transport.montecarlo.packet_source import (
    BlackBodySimpleSource,
    BlackBodySimpleSourceRelativistic,
)
from tardis.util.base import quantity_linspace

logger = logging.getLogger(__name__)


[docs]def parse_structure_config(config, time_explosion, enable_homology=True): """ Parse the structure configuration data. Parameters ---------- config : object The configuration data. time_explosion : float The time of the explosion. enable_homology : bool, optional Whether to enable homology (default is True). Returns ------- electron_densities : object The parsed electron densities. temperature : object The parsed temperature. geometry : object The parsed geometry. density : object The parsed density. Raises ------ NotImplementedError If the structure configuration type is not supported. Notes ----- This function parses the structure configuration data and returns the parsed electron densities, temperature, geometry, and density. The structure configuration can be of type 'specific' or 'file'. If it is of type 'specific', the velocity and density are parsed from the configuration. If it is of type 'file', the velocity and density are read from a file. The parsed data is used to create a homologous radial 1D geometry object. """ electron_densities = None temperature = None structure_config = config.model.structure if structure_config.type == "specific": velocity = quantity_linspace( structure_config.velocity.start, structure_config.velocity.stop, structure_config.velocity.num + 1, ).cgs density = parse_config_v1_density(config) elif structure_config.type == "file": if os.path.isabs(structure_config.filename): structure_config_fname = structure_config.filename else: structure_config_fname = os.path.join( config.config_dirname, structure_config.filename ) ( time_0, velocity, density_0, electron_densities, temperature, ) = read_density_file(structure_config_fname, structure_config.filetype) density_0 = density_0.insert(0, 0) density = calculate_density_after_time( density_0, time_0, time_explosion ) else: raise NotImplementedError # Note: This is the number of shells *without* taking in mind the # v boundaries. if len(density) == len(velocity): logger.warning( "Number of density points larger than number of shells. Assuming inner point irrelevant" ) density = density[1:] geometry = HomologousRadial1DGeometry( velocity[:-1], # v_inner velocity[1:], # v_outer v_inner_boundary=structure_config.get("v_inner_boundary", None), v_outer_boundary=structure_config.get("v_outer_boundary", None), time_explosion=time_explosion, ) return electron_densities, temperature, geometry, density
[docs]def parse_csvy_geometry( config, csvy_model_config, csvy_model_data, time_explosion ): """ Parse the geometry data from a CSVY model. Parameters ---------- config : object The configuration data. csvy_model_config : object The configuration data of the CSVY model. csvy_model_data : object The data of the CSVY model. time_explosion : float The time of the explosion. Returns ------- geometry : object The parsed geometry. Raises ------ None. Notes ----- This function parses the geometry data from a CSVY model. It extracts the velocity information from the CSVY model configuration or data. The parsed velocity data is used to create a homologous radial 1D geometry object, which is returned. """ if hasattr(config, "model"): if hasattr(config.model, "v_inner_boundary"): v_boundary_inner = config.model.v_inner_boundary else: v_boundary_inner = None if hasattr(config.model, "v_outer_boundary"): v_boundary_outer = config.model.v_outer_boundary else: v_boundary_outer = None else: v_boundary_inner = None v_boundary_outer = None if hasattr(csvy_model_config, "velocity"): velocity = quantity_linspace( csvy_model_config.velocity.start, csvy_model_config.velocity.stop, csvy_model_config.velocity.num + 1, ).cgs else: velocity_field_index = [ field["name"] for field in csvy_model_config.datatype.fields ].index("velocity") velocity_unit = u.Unit( csvy_model_config.datatype.fields[velocity_field_index]["unit"] ) velocity = csvy_model_data["velocity"].values * velocity_unit velocity = velocity.to("cm/s") geometry = HomologousRadial1DGeometry( velocity[:-1], # v_inner velocity[1:], # v_outer v_inner_boundary=v_boundary_inner, v_outer_boundary=v_boundary_outer, time_explosion=time_explosion, ) return geometry
[docs]def parse_abundance_config(config, geometry, time_explosion): """ Parse the abundance configuration data. Parameters ---------- config : object The configuration data. geometry : object The geometry of the model. time_explosion : float The time of the explosion. Returns ------- nuclide_mass_fraction : object The parsed nuclide mass fraction. raw_isotope_abundance : object The parsed raw isotope abundance. This is the isotope abundance data before decay. Raises ------ None. Notes ----- This function parses the abundance configuration data and returns the parsed nuclide mass fraction. The abundance configuration can be of type 'uniform' or 'file'. If it is of type 'uniform', the abundance and isotope abundance are read using the 'read_uniform_abundances' function. If it is of type 'file', the abundance and isotope abundance are read from a file using the 'read_abundances_file' function. The parsed data is then processed to replace NaN values with 0.0, remove rows with zero sum, and normalize the data if necessary. The resulting nuclide mass fraction is returned. """ abundances_section = config.model.abundances isotope_abundance = pd.DataFrame() if abundances_section.type == "uniform": abundance, isotope_abundance = read_uniform_abundances( abundances_section, geometry.no_of_shells ) elif abundances_section.type == "file": if os.path.isabs(abundances_section.filename): abundances_fname = abundances_section.filename else: abundances_fname = os.path.join( config.config_dirname, abundances_section.filename ) index, abundance, isotope_abundance = read_abundances_file( abundances_fname, abundances_section.filetype ) abundance = abundance.replace(np.nan, 0.0) abundance = abundance[abundance.sum(axis=1) > 0] norm_factor = abundance.sum(axis=0) + isotope_abundance.sum(axis=0) if np.any(np.abs(norm_factor - 1) > 1e-12): logger.warning( "Abundances have not been normalized to 1. - normalizing" ) abundance /= norm_factor isotope_abundance /= norm_factor # The next line is if the abundances are given via dict # and not gone through the schema validator raw_isotope_abundance = isotope_abundance model_isotope_time_0 = config.model.abundances.get( "model_isotope_time_0", 0.0 * u.day ) isotope_abundance = IsotopicMassFraction( isotope_abundance, time_0=model_isotope_time_0 ).decay(time_explosion) nuclide_mass_fraction = convert_to_nuclide_mass_fraction( isotope_abundance, abundance ) return nuclide_mass_fraction, raw_isotope_abundance
[docs]def convert_to_nuclide_mass_fraction(isotopic_mass_fraction, mass_fraction): """ Convert the abundance and isotope abundance data to nuclide mass fraction. Parameters ---------- isotope_abundance : pandas.DataFrame The isotope abundance data. abundance : pandas.DataFrame The abundance data. Returns ------- nuclide_mass_fraction : pandas.DataFrame The converted nuclide mass fraction. Raises ------ None. Notes ----- This function converts the abundance and isotope abundance data to nuclide mass fraction. If the abundance data is not None, it is converted to nuclide mass fraction by mapping the abundance index to nuclide indices using the 'convert_element2nuclide_index' function. The resulting abundance data is then concatenated with the isotope abundance data to obtain the final nuclide mass fraction. """ nuclide_mass_fraction = pd.DataFrame() if mass_fraction is not None: mass_fraction.index = Composition.convert_element2nuclide_index( mass_fraction.index ) nuclide_mass_fraction = mass_fraction else: nuclide_mass_fraction = pd.DataFrame() if isotopic_mass_fraction is not None: nuclide_mass_fraction = pd.concat( [nuclide_mass_fraction, isotopic_mass_fraction] ) return nuclide_mass_fraction
[docs]def parse_csvy_composition( atom_data, csvy_model_config, csvy_model_data, time_explosion, geometry ): """ Parse the composition data from a CSVY model. Parameters ---------- atom_data : object The atom data used for parsing. csvy_model_config : object The configuration data of the CSVY model. csvy_model_data : object The data of the CSVY model. time_explosion : float The time of the explosion. geometry : object The geometry of the model. Returns ------- density : object The parsed density data. abundance : object The parsed abundance data. isotope_abundance : object The parsed isotope abundance data. elemental_mass : object The elemental mass data. Raises ------ None. Notes ----- This function parses the composition data from a CSVY model. It calls the 'parse_density_csvy' function to parse the density data, and the 'parse_abundance_csvy' function to parse the abundance and isotope abundance data. The parsed data is returned as density, abundance, isotope_abundance, and elemental_mass. """ density = parse_density_csvy( csvy_model_config, csvy_model_data, time_explosion ) nuclide_mass_fraction, raw_isotope_mass_fraction = parse_abundance_csvy( csvy_model_config, csvy_model_data, geometry, time_explosion ) return Composition( density, nuclide_mass_fraction, raw_isotope_mass_fraction, atom_data.atom_data.mass.copy(), )
[docs]def parse_abundance_csvy( csvy_model_config, csvy_model_data, geometry, time_explosion ): """ Parse the abundance data from a CSVY model. Parameters ---------- csvy_model_config : object The configuration data of the CSVY model. csvy_model_data : object The data of the CSVY model. geometry : object The geometry of the model. Returns ------- abundance : pd.DataFrame The parsed abundance data. isotope_abundance : pandas.DataFrame The parsed isotope abundance data. Raises ------ None. Notes ----- This function parses the abundance data from a CSVY model. If the CSVY model configuration contains an 'abundance' attribute, it uses the 'read_uniform_abundances' function to parse the abundance and isotope abundance data. Otherwise, it uses the 'parse_csv_abundances' function to parse the data. The parsed data is then processed to replace NaN values with 0.0, remove rows with zero sum, and normalize the data if necessary. The resulting abundance and isotope abundance arrays are returned. """ if hasattr(csvy_model_config, "abundance"): abundances_section = csvy_model_config.abundance mass_fraction, isotope_mass_fraction = read_uniform_abundances( abundances_section, geometry.no_of_shells ) else: _, mass_fraction, isotope_mass_fraction = parse_csv_abundances( csvy_model_data ) mass_fraction = mass_fraction.loc[:, 1:] mass_fraction.columns = np.arange(mass_fraction.shape[1]) isotope_mass_fraction = isotope_mass_fraction.loc[:, 1:] isotope_mass_fraction.columns = np.arange( isotope_mass_fraction.shape[1] ) mass_fraction = mass_fraction.replace(np.nan, 0.0) mass_fraction = mass_fraction[mass_fraction.sum(axis=1) > 0] isotope_mass_fraction = isotope_mass_fraction.replace(np.nan, 0.0) isotope_mass_fraction = isotope_mass_fraction[ isotope_mass_fraction.sum(axis=1) > 0 ] norm_factor = mass_fraction.sum(axis=0) + isotope_mass_fraction.sum(axis=0) if np.any(np.abs(norm_factor - 1) > 1e-12): logger.warning( "Abundances have not been normalized to 1. - normalizing" ) mass_fraction /= norm_factor isotope_mass_fraction /= norm_factor raw_isotope_mass_fraction = isotope_mass_fraction isotope_mass_fraction = IsotopicMassFraction( isotope_mass_fraction, time_0=csvy_model_config.model_isotope_time_0 ).decay(time_explosion) return ( convert_to_nuclide_mass_fraction(isotope_mass_fraction, mass_fraction), raw_isotope_mass_fraction, )
[docs]def parse_density_csvy(csvy_model_config, csvy_model_data, time_explosion): """ Parse the density data from a CSVY model. Parameters ---------- csvy_model_config : object The configuration data of the CSVY model. csvy_model_data : object The data of the CSVY model. time_explosion : float The time of the explosion. Returns ------- density : object The parsed density data. Raises ------ None. Notes ----- This function parses the density data from a CSVY model. If the CSVY model configuration contains a 'density' attribute, it uses the 'parse_csvy_density' function to parse the density data. Otherwise, it calculates the density data using the 'calculate_density_after_time' function. The parsed density data is returned. """ if hasattr(csvy_model_config, "density"): density = parse_csvy_density(csvy_model_config, time_explosion) else: time_0 = csvy_model_config.model_density_time_0 density_field_index = [ field["name"] for field in csvy_model_config.datatype.fields ].index("density") density_unit = u.Unit( csvy_model_config.datatype.fields[density_field_index]["unit"] ) density_0 = csvy_model_data["density"].values * density_unit # Removing as thee new architecture removes the 0th shell already # density_0 = density_0.to("g/cm^3")[1:] # density_0 = density_0.insert(0, 0) density = calculate_density_after_time( density_0, time_0, time_explosion ) return density
[docs]def parse_radiation_field_state( config, t_radiative, geometry, dilution_factor=None, packet_source=None ): """ Parses the radiation field state based on the provided configuration, radiative temperature, geometry, dilution factor, and packet source. Parameters ---------- config : Config The configuration object. t_radiative : {None, Quantity}, optional The radiative temperature. If None, it is calculated based on the initial_t_rad value in the plasma configuration. geometry : Geometry The geometry object. dilution_factor : {None, ndarray}, optional The dilution factor. If None, it is calculated based on the geometry. packet_source : {None, PacketSource}, optional The packet source object. Returns ------- DiluteThermalRadiationFieldState The parsed radiation field state. Raises ------ AssertionError If the length of t_radiative or dilution_factor is not compatible with the geometry. """ if t_radiative is None: if config.plasma.initial_t_rad > 0 * u.K: t_radiative = ( np.ones(geometry.no_of_shells) * config.plasma.initial_t_rad ) else: t_radiative = calculate_t_radiative_from_t_inner( geometry, packet_source ) assert len(t_radiative) == geometry.no_of_shells if dilution_factor is None: dilution_factor = calculate_geometric_dilution_factor(geometry) assert len(dilution_factor) == geometry.no_of_shells return DiluteBlackBodyRadiationFieldState( t_radiative, dilution_factor, geometry )
[docs]def initialize_packet_source( config, geometry, packet_source, legacy_mode_enabled ): """ Initialize the packet source based on config and geometry Parameters ---------- config : Config The configuration object containing the supernova and plasma settings. geometry : Geometry The geometry object containing the inner radius information. packet_source : BasePacketSource The packet source object based on the configuration and geometry. Returns ------- packet_source : BasePacketSource The packet source object based on the configuration and geometry. Raises ------ ValueError If both t_inner and luminosity_requested are None. """ if config.montecarlo.enable_full_relativity: packet_source = BlackBodySimpleSourceRelativistic( base_seed=config.montecarlo.seed, time_explosion=config.supernova.time_explosion, legacy_mode_enabled=legacy_mode_enabled, ) else: packet_source = BlackBodySimpleSource( base_seed=config.montecarlo.seed, legacy_mode_enabled=legacy_mode_enabled, ) luminosity_requested = config.supernova.luminosity_requested if config.plasma.initial_t_inner > 0.0 * u.K: packet_source.radius = geometry.r_inner_active[0] packet_source.temperature = config.plasma.initial_t_inner elif (config.plasma.initial_t_inner < 0.0 * u.K) and ( luminosity_requested is not None ): packet_source.radius = geometry.r_inner_active[0] packet_source.set_temperature_from_luminosity(luminosity_requested) else: raise ValueError( "Both t_inner and luminosity_requested cannot be None." ) return packet_source
[docs]def parse_packet_source(config, geometry, legacy_mode_enabled): """ Parse the packet source based on the given configuration and geometry. Parameters ---------- config : Config The configuration object containing the supernova and plasma settings. geometry : Geometry The geometry object containing the inner radius information. Returns ------- packet_source : BlackBodySimpleSource The packet source object based on the configuration and geometry. """ if config.montecarlo.enable_full_relativity: packet_source = BlackBodySimpleSourceRelativistic( base_seed=config.montecarlo.seed, time_explosion=config.supernova.time_explosion, legacy_mode_enabled=legacy_mode_enabled, ) else: packet_source = BlackBodySimpleSource( base_seed=config.montecarlo.seed, legacy_mode_enabled=legacy_mode_enabled, ) return initialize_packet_source( config, geometry, packet_source, legacy_mode_enabled )
[docs]def parse_csvy_radiation_field_state( config, csvy_model_config, csvy_model_data, geometry, packet_source ): t_radiative = None dilution_factor = None if hasattr(csvy_model_data, "columns") and ( "t_rad" in csvy_model_data.columns ): t_rad_field_index = [ field["name"] for field in csvy_model_config.datatype.fields ].index("t_rad") t_rad_unit = u.Unit( csvy_model_config.datatype.fields[t_rad_field_index]["unit"] ) t_radiative = csvy_model_data["t_rad"].iloc[1:].values * t_rad_unit elif config.plasma.initial_t_rad > 0 * u.K: t_radiative = ( np.ones(geometry.no_of_shells) * config.plasma.initial_t_rad ) else: t_radiative = calculate_t_radiative_from_t_inner( geometry, packet_source ) if np.any(t_radiative < 1000 * u.K): logging.critical( "Radiative temperature is too low in some of the shells, temperatures below 1000K " f"(e.g., T_rad = {t_radiative[np.argmin(t_radiative)]} in shell {np.argmin(t_radiative)} in your model) " "are not accurately handled by TARDIS.", ) if hasattr(csvy_model_data, "columns") and ( "dilution_factor" in csvy_model_data.columns ): dilution_factor = csvy_model_data["dilution_factor"].iloc[1:].values else: dilution_factor = calculate_geometric_dilution_factor(geometry) return DiluteBlackBodyRadiationFieldState( t_radiative, dilution_factor, geometry )
[docs]def calculate_t_radiative_from_t_inner(geometry, packet_source): """ Calculates the radiative temperature based on the inner temperature and the geometry of the system. Parameters ---------- geometry : Geometry The geometry object. packet_source : PacketSource The packet source object. Returns ------- Quantity The calculated radiative temperature. """ lambda_wien_inner = const.b_wien / packet_source.temperature t_radiative = const.b_wien / ( lambda_wien_inner * (1 + (geometry.v_middle - geometry.v_inner_boundary) / const.c) ) return t_radiative
[docs]def calculate_geometric_dilution_factor(geometry): return 0.5 * ( 1 - np.sqrt( 1 - ( geometry.r_inner[geometry.v_inner_boundary_index] ** 2 / geometry.r_middle**2 ) .to(1) .value ) )