Source code for tardis.plasma.properties.ion_population

import logging
import sys
import warnings

import numpy as np
import pandas as pd
from scipy import interpolate

from tardis.plasma.exceptions import PlasmaIonizationError
from tardis.plasma.properties.base import ProcessingPlasmaProperty
from tardis.plasma.properties.continuum_processes.rates import (
    get_ion_multi_index,
)

logger = logging.getLogger(__name__)

ION_ZERO_THRESHOLD = 1e-20

__all__ = [
    "PhiSahaNebular",
    "PhiSahaLTE",
    "RadiationFieldCorrection",
    "IonNumberDensity",
    "IonNumberDensityHeNLTE",
    "SahaFactor",
    "ThermalPhiSahaLTE",
]


def calculate_block_ids_from_dataframe(dataframe):
    block_start_id = (
        np.where(np.diff(dataframe.index.get_level_values(0)) != 0.0)[0] + 1
    )
    return np.hstack(([0], block_start_id, [len(dataframe)]))


[docs] class PhiSahaLTE(ProcessingPlasmaProperty): """ Attributes ---------- phi : pandas.DataFrame, dtype float Used for LTE ionization (at the radiation temperature). Indexed by atomic number, ion number. Columns are zones. """ outputs = ("phi",) latex_name = (r"\Phi",) latex_formula = ( r"\dfrac{2Z_{i,j+1}}{Z_{i,j}}\big( \ \dfrac{2\pi m_{e}/\beta_{\textrm{rad}}}{h^2} \ \big)^{3/2}e^{\dfrac{-\chi_{i,j}}{kT_{\textrm{rad}}}}", ) broadcast_ionization_energy = None
[docs] @staticmethod def calculate(g_electron, beta_rad, partition_function, ionization_data): phis = np.empty( ( partition_function.shape[0] - partition_function.index.get_level_values(0).unique().size, partition_function.shape[1], ) ) block_ids = calculate_block_ids_from_dataframe(partition_function) for i, start_id in enumerate(block_ids[:-1]): end_id = block_ids[i + 1] current_block = partition_function.values[start_id:end_id] current_phis = current_block[1:] / current_block[:-1] phis[start_id - i : end_id - i - 1] = current_phis broadcast_ionization_energy = ionization_data.reindex( partition_function.index ).dropna() phi_index = broadcast_ionization_energy.index broadcast_ionization_energy = broadcast_ionization_energy.values phi_coefficient = ( 2 * g_electron * np.exp(np.outer(broadcast_ionization_energy, -beta_rad)) ) return pd.DataFrame(phis * phi_coefficient, index=phi_index)
@staticmethod def _calculate_block_ids(partition_function): partition_function.index.get_level_values(0).unique()
[docs] class ThermalPhiSahaLTE(PhiSahaLTE): """ Attributes ---------- phi : pandas.DataFrame, dtype float Used for LTE ionization (at the electron temperature). Indexed by atomic number, ion number. Columns are zones. """ outputs = ("thermal_phi_lte",) latex_name = (r"\Phi^{*}(T_\mathrm{e})",) latex_formula = ( r"\dfrac{2Z_{i,j+1}}{Z_{i,j}}\big( \ \dfrac{2\pi m_{e}/\beta_{\textrm{electron}}}{h^2} \ \big)^{3/2}e^{\dfrac{-\chi_{i,j}}{kT_{\textrm{electron}}}}", )
[docs] @staticmethod def calculate( thermal_g_electron, beta_electron, thermal_lte_partition_function, ionization_data, ): return super(ThermalPhiSahaLTE, ThermalPhiSahaLTE).calculate( thermal_g_electron, beta_electron, thermal_lte_partition_function, ionization_data, )
[docs] class PhiSahaNebular(ProcessingPlasmaProperty): """ Attributes ---------- phi : pandas.DataFrame, dtype float Used for nebular ionization. Indexed by atomic number, ion number. Columns are zones. """ outputs = ("phi",) latex_name = (r"\Phi",) latex_formula = ( r"W(\delta\zeta_{i,j}+W(1-\zeta_{i,j}))\left( \ \dfrac{T_{\textrm{electron}}}{T_{\textrm{rad}}}\right)^{1/2}", )
[docs] @staticmethod def calculate( t_rad, w, zeta_data, t_electrons, delta, g_electron, beta_rad, partition_function, ionization_data, ): phi_lte = PhiSahaLTE.calculate( g_electron, beta_rad, partition_function, ionization_data ) zeta = PhiSahaNebular.get_zeta_values(zeta_data, phi_lte.index, t_rad) phis = ( phi_lte * w * ((zeta * delta) + w * (1 - zeta)) * (t_electrons / t_rad) ** 0.5 ) return phis
[docs] @staticmethod def get_zeta_values(zeta_data, ion_index, t_rad): zeta_t_rad = zeta_data.columns.values.astype(np.float64) zeta_values = zeta_data.loc[ion_index].values.astype(np.float64) zeta = interpolate.interp1d( zeta_t_rad, zeta_values, bounds_error=False, fill_value=np.nan )(t_rad) zeta = zeta.astype(float) if np.any(np.isnan(zeta)): warnings.warn( f"t_rads outside of zeta factor interpolation" f" zeta_min={zeta_data.columns.values.min():.2f} zeta_max={zeta_data.columns.values.max():.2f} " f"- replacing with {t_rad}" ) zeta[np.isnan(zeta)] = 1.0 return zeta
[docs] class RadiationFieldCorrection(ProcessingPlasmaProperty): """ Attributes ---------- delta : pandas.DataFrame, dtype float Calculates the radiation field correction (see Mazzali & Lucy, 1993) if not given as input in the config. file. The default chi_0_species is Ca II, which is good for type Ia supernovae. For type II supernovae, (1, 1) should be used. Indexed by atomic number, ion number. The columns are zones. """ outputs = ("delta",) latex_name = (r"\delta",) def __init__( self, plasma_parent=None, departure_coefficient=None, chi_0_species=(20, 2), delta_treatment=None, ): super(RadiationFieldCorrection, self).__init__(plasma_parent) self.departure_coefficient = departure_coefficient self.delta_treatment = delta_treatment self.chi_0_species = chi_0_species def _set_chi_0(self, ionization_data): if self.chi_0_species == (20, 2): self.chi_0 = 1.9020591570241798e-11 else: self.chi_0 = ionization_data.loc[self.chi_0_species]
[docs] def calculate( self, w, ionization_data, beta_rad, t_electrons, t_rad, beta_electron ): if getattr(self, "chi_0", None) is None: self._set_chi_0(ionization_data) if self.delta_treatment is None: if self.departure_coefficient is None: departure_coefficient = ( 1.0 / w ) # see Equation 13 and explanations on page 451 lower right in ML 93 else: departure_coefficient = self.departure_coefficient radiation_field_correction = -np.ones( (len(ionization_data), len(beta_rad)) ) less_than_chi_0 = (ionization_data < self.chi_0).values factor_a = t_electrons / (departure_coefficient * w * t_rad) radiation_field_correction[~less_than_chi_0] = factor_a * np.exp( np.outer( ionization_data.values[~less_than_chi_0], beta_rad - beta_electron, ) ) radiation_field_correction[less_than_chi_0] = 1 - np.exp( np.outer(ionization_data.values[less_than_chi_0], beta_rad) - beta_rad * self.chi_0 ) radiation_field_correction[less_than_chi_0] += factor_a * np.exp( np.outer(ionization_data.values[less_than_chi_0], beta_rad) - self.chi_0 * beta_electron ) else: radiation_field_correction = ( np.ones((len(ionization_data), len(beta_rad))) * self.delta_treatment ) delta = pd.DataFrame( radiation_field_correction, columns=np.arange(len(t_rad)), index=ionization_data.index, ) return delta
[docs] class IonNumberDensity(ProcessingPlasmaProperty): """ Convergence process to find the correct solution. A trial value for the electron density is initiated in a particular zone. The ion number densities are then calculated using the Saha equation. The electron density is then re-calculated by using the ion number densities to sum over the number of free electrons. If the two values for the electron densities are not similar to within the threshold value, a new guess for the value of the electron density is chosen and the process is repeated. Attributes ---------- ion_number_density : pandas.DataFrame, dtype float Index atom number, ion number. Columns zones. electron_densities : numpy.ndarray, dtype float """ outputs = ("ion_number_density", "electron_densities") latex_name = ( "N_{i,j}", "n_{e}", ) def __init__( self, plasma_parent, ion_zero_threshold=ION_ZERO_THRESHOLD, electron_densities=None, ): super(IonNumberDensity, self).__init__(plasma_parent) self.ion_zero_threshold = ion_zero_threshold self.block_ids = None self._electron_densities = electron_densities
[docs] @staticmethod def calculate_with_n_electron( phi, partition_function, number_density, n_electron, block_ids, ion_zero_threshold, ): if block_ids is None: block_ids = IonNumberDensity._calculate_block_ids(phi) ion_populations = np.empty_like(partition_function.values) phi_electron = np.nan_to_num(phi.values / n_electron.values) for i, start_id in enumerate(block_ids[:-1]): end_id = block_ids[i + 1] current_phis = phi_electron[start_id:end_id] phis_product = np.cumprod(current_phis, 0) tmp_ion_populations = np.empty( (current_phis.shape[0] + 1, current_phis.shape[1]) ) tmp_ion_populations[0] = number_density.values[i] / ( 1 + np.sum(phis_product, axis=0) ) tmp_ion_populations[1:] = tmp_ion_populations[0] * phis_product ion_populations[start_id + i : end_id + 1 + i] = tmp_ion_populations ion_populations[ion_populations < ion_zero_threshold] = 0.0 return ( pd.DataFrame(data=ion_populations, index=partition_function.index), block_ids, )
@staticmethod def _calculate_block_ids(phi): return calculate_block_ids_from_dataframe(phi)
[docs] def calculate(self, phi, partition_function, number_density): if self._electron_densities is None: n_e_convergence_threshold = 0.05 n_electron = number_density.sum(axis=0) n_electron_iterations = 0 while True: ( ion_number_density, self.block_ids, ) = self.calculate_with_n_electron( phi, partition_function, number_density, n_electron, self.block_ids, self.ion_zero_threshold, ) ion_numbers = ion_number_density.index.get_level_values( 1 ).values ion_numbers = ion_numbers.reshape((ion_numbers.shape[0], 1)) new_n_electron = (ion_number_density.values * ion_numbers).sum( axis=0 ) if np.any(np.isnan(new_n_electron)): raise PlasmaIonizationError( 'n_electron just turned "nan" -' " aborting" ) n_electron_iterations += 1 if n_electron_iterations > 100: logger.warning( f"n_electron iterations above 100 ({n_electron_iterations}) -" f" something is probably wrong" ) if np.all( np.abs(new_n_electron - n_electron) / n_electron < n_e_convergence_threshold ): break n_electron = 0.5 * (new_n_electron + n_electron) else: n_electron = self._electron_densities ion_number_density, self.block_ids = self.calculate_with_n_electron( phi, partition_function, number_density, n_electron, self.block_ids, self.ion_zero_threshold, ) return ion_number_density, n_electron
[docs] class IonNumberDensityHeNLTE(ProcessingPlasmaProperty): """ Convergence process to find the correct solution. A trial value for the electron density is initiated in a particular zone. The ion number densities are then calculated using the Saha equation. The electron density is then re-calculated by using the ion number densities to sum over the number of free electrons. If the two values for the electron densities are not similar to within the threshold value, a new guess for the value of the electron density is chosen and the process is repeated. Attributes ---------- ion_number_density : pandas.DataFrame, dtype float Index atom number, ion number. Columns zones. electron_densities : numpy.ndarray, dtype float """ outputs = ( "ion_number_density", "electron_densities", "helium_population_updated", ) latex_name = ( "N_{i,j}", "n_{e}", ) def __init__( self, plasma_parent, ion_zero_threshold=1e-20, electron_densities=None ): super(IonNumberDensityHeNLTE, self).__init__(plasma_parent) self.ion_zero_threshold = ion_zero_threshold self.block_ids = None self._electron_densities = electron_densities
[docs] def update_he_population( self, helium_population, n_electron, number_density ): helium_population_updated = helium_population.copy() he_one_population = helium_population_updated.loc[0].mul(n_electron) he_three_population = helium_population_updated.loc[2].mul( 1.0 / n_electron ) helium_population_updated.loc[ 0, helium_population_updated.columns ] = he_one_population.values helium_population_updated.loc[ 2, helium_population_updated.columns ] = he_three_population.values unnormalised = helium_population_updated.sum() normalised = helium_population_updated.mul( number_density.loc[2] / unnormalised ) helium_population_updated.update(normalised) return helium_population_updated
[docs] def calculate( self, phi, partition_function, number_density, helium_population ): if self._electron_densities is None: n_e_convergence_threshold = 0.05 n_electron = number_density.sum(axis=0) n_electron_iterations = 0 while True: ( ion_number_density, self.block_ids, ) = IonNumberDensity.calculate_with_n_electron( phi, partition_function, number_density, n_electron, self.block_ids, self.ion_zero_threshold, ) helium_population_updated = self.update_he_population( helium_population, n_electron, number_density ) ion_number_density.loc[2, 0] = helium_population_updated.loc[ 0 ].sum(axis=0) ion_number_density.loc[2, 1] = helium_population_updated.loc[ 1 ].sum(axis=0) ion_number_density.loc[2, 2] = helium_population_updated.loc[ 2, 0 ] ion_numbers = ion_number_density.index.get_level_values( 1 ).values ion_numbers = ion_numbers.reshape((ion_numbers.shape[0], 1)) new_n_electron = (ion_number_density.values * ion_numbers).sum( axis=0 ) if np.any(np.isnan(new_n_electron)): raise PlasmaIonizationError( 'n_electron just turned "nan" -' " aborting" ) n_electron_iterations += 1 if n_electron_iterations > 100: logger.warning( f"n_electron iterations above 100 ({n_electron_iterations}) -" f" something is probably wrong" ) if np.all( np.abs(new_n_electron - n_electron) / n_electron < n_e_convergence_threshold ): break n_electron = 0.5 * (new_n_electron + n_electron) else: n_electron = self._electron_densities ( ion_number_density, self.block_ids, ) = IonNumberDensity.calculate_with_n_electron( phi, partition_function, number_density, n_electron, self.block_ids, self.ion_zero_threshold, ) helium_population_updated = self.update_he_population( helium_population, n_electron, number_density ) ion_number_density.loc[2, 0].update( helium_population_updated.loc[0].sum(axis=0) ) ion_number_density.loc[2, 1].update( helium_population_updated.loc[1].sum(axis=0) ) ion_number_density.loc[2, 2].update( helium_population_updated.loc[2, 0] ) return ion_number_density, n_electron, helium_population_updated
[docs] class SahaFactor(ProcessingPlasmaProperty): """ Calculates the 'Saha factor' Phi_ik = n_i* / (n_k* n_e), i.e., the ratio of the LTE level population n_i*, and the product of the LTE ion density n_k* and the actual electron density n_e. Attributes ---------- phi_ik : pandas.DataFrame, dtype float Indexed by atom number, ion number, level number. Columns are zones. """ outputs = ("phi_ik",) latex_name = (r"\Phi_{i,\kappa}",)
[docs] def calculate( self, thermal_phi_lte, thermal_lte_level_boltzmann_factor, thermal_lte_partition_function, ): boltzmann_factor = self._prepare_boltzmann_factor( thermal_lte_level_boltzmann_factor ) phi_saha_index = get_ion_multi_index(boltzmann_factor.index) partition_function_index = get_ion_multi_index( boltzmann_factor.index, next_higher=False ) phi_saha = thermal_phi_lte.loc[phi_saha_index].values # Replace zero values in phi_saha to avoid zero division in Saha factor phi_saha[phi_saha == 0.0] = sys.float_info.min partition_function = thermal_lte_partition_function.loc[ partition_function_index ].values return boltzmann_factor / (phi_saha * partition_function)
@staticmethod def _prepare_boltzmann_factor(boltzmann_factor): atomic_number = boltzmann_factor.index.get_level_values(0) ion_number = boltzmann_factor.index.get_level_values(1) selected_ions_mask = atomic_number != ion_number return boltzmann_factor[selected_ions_mask]