Source code for tardis.io.model.arepo.utils

"""Utility functions for reprojecting 3D Arepo data to 1D profiles."""

from pathlib import Path
from typing import TYPE_CHECKING

import matplotlib.pyplot as plt
import numpy as np
from astropy import units as u
from scipy import stats

from tardis.configuration.sorting_globals import SORTING_ALGORITHM
from tardis.io.model.arepo.data import ArepoData

if TYPE_CHECKING:
    from matplotlib.figure import Figure


[docs] def create_cone_profile( data: ArepoData, opening_angle: float = 20.0, inner_radius: float | None = None, outer_radius: float | None = None, ) -> tuple[ np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, dict, dict, ]: """ Create 1D profiles from Arepo data using a cone selection. Extracts data from within a cone around the x-axis, creating separate profiles for the positive and negative directions. Parameters ---------- data : ArepoData The Arepo snapshot data. opening_angle : float, optional Total opening angle of the cone in degrees. Default: 20.0 inner_radius : float, optional Inner radius cutoff in cm. Default: None outer_radius : float, optional Outer radius cutoff in cm. Default: None Returns ------- pos_prof_p : numpy.ndarray Position profile in positive direction (cm). pos_prof_n : numpy.ndarray Position profile in negative direction (cm). vel_prof_p : numpy.ndarray Velocity profile in positive direction (cm/s). vel_prof_n : numpy.ndarray Velocity profile in negative direction (cm/s). rho_prof_p : numpy.ndarray Density profile in positive direction (g/cm^3). rho_prof_n : numpy.ndarray Density profile in negative direction (g/cm^3). mass_prof_p : numpy.ndarray Mass profile in positive direction (g). mass_prof_n : numpy.ndarray Mass profile in negative direction (g). xnuc_prof_p : dict Nuclear fraction profiles in positive direction. xnuc_prof_n : dict Nuclear fraction profiles in negative direction. Raises ------ ValueError If no points remain after applying radius cuts. """ # Convert Cartesian to cylindrical coordinates cyl = np.array( [ data.position[0], np.sqrt(data.position[1] ** 2 + data.position[2] ** 2), np.arctan(data.position[2] / data.position[1]), ] ) # Maximum allowed r for points in cone dist = np.tan(np.radians(opening_angle) / 2) * np.abs(cyl[0]) # Create masks for positive/negative directions cmask_p = np.logical_and(cyl[0] > 0, cyl[1] <= dist) cmask_n = np.logical_and(cyl[0] < 0, cyl[1] <= dist) # Apply masks to get radial distances pos_p = np.sqrt( data.position[0][cmask_p] ** 2 + data.position[1][cmask_p] ** 2 + data.position[2][cmask_p] ** 2 ) pos_n = np.sqrt( data.position[0][cmask_n] ** 2 + data.position[1][cmask_n] ** 2 + data.position[2][cmask_n] ** 2 ) # Velocities vel_p = np.sqrt( data.velocities[0][cmask_p] ** 2 + data.velocities[1][cmask_p] ** 2 + data.velocities[2][cmask_p] ** 2 ) vel_n = np.sqrt( data.velocities[0][cmask_n] ** 2 + data.velocities[1][cmask_n] ** 2 + data.velocities[2][cmask_n] ** 2 ) mass_p = data.mass[cmask_p] mass_n = data.mass[cmask_n] rho_p = data.densities[cmask_p] rho_n = data.densities[cmask_n] # Nuclear fractions spec_p = {} spec_n = {} for spec in data.species: spec_p[spec] = data.isotope_dict[spec][cmask_p] spec_n[spec] = data.isotope_dict[spec][cmask_n] # Sort by position pos_prof_p = np.sort(pos_p, kind=SORTING_ALGORITHM) pos_prof_n = np.sort(pos_n, kind=SORTING_ALGORITHM) # Apply radius cuts maxradius_p = max(pos_prof_p) if outer_radius is None else outer_radius maxradius_n = max(pos_prof_n) if outer_radius is None else outer_radius minradius_p = min(pos_prof_p) if inner_radius is None else inner_radius minradius_n = min(pos_prof_n) if inner_radius is None else inner_radius mask_p = np.logical_and( pos_prof_p >= minradius_p, pos_prof_p <= maxradius_p ) mask_n = np.logical_and( pos_prof_n >= minradius_n, pos_prof_n <= maxradius_n ) if not mask_p.any() or not mask_n.any(): raise ValueError("No points left between inner and outer radius.") # Sort all quantities by position mass_prof_p = np.array( [x for _, x in sorted(zip(pos_p, mass_p), key=lambda pair: pair[0])] )[mask_p] mass_prof_n = np.array( [x for _, x in sorted(zip(pos_n, mass_n), key=lambda pair: pair[0])] )[mask_n] rho_prof_p = np.array( [x for _, x in sorted(zip(pos_p, rho_p), key=lambda pair: pair[0])] )[mask_p] rho_prof_n = np.array( [x for _, x in sorted(zip(pos_n, rho_n), key=lambda pair: pair[0])] )[mask_n] vel_prof_p = np.array( [x for _, x in sorted(zip(pos_p, vel_p), key=lambda pair: pair[0])] )[mask_p] vel_prof_n = np.array( [x for _, x in sorted(zip(pos_n, vel_n), key=lambda pair: pair[0])] )[mask_n] xnuc_prof_p = {} xnuc_prof_n = {} for spec in data.species: xnuc_prof_p[spec] = np.array( [ x for _, x in sorted( zip(pos_p, spec_p[spec]), key=lambda pair: pair[0] ) ] )[mask_p] xnuc_prof_n[spec] = np.array( [ x for _, x in sorted( zip(pos_n, spec_n[spec]), key=lambda pair: pair[0] ) ] )[mask_n] pos_prof_p = pos_prof_p[mask_p] pos_prof_n = pos_prof_n[mask_n] return ( pos_prof_p, pos_prof_n, vel_prof_p, vel_prof_n, rho_prof_p, rho_prof_n, mass_prof_p, mass_prof_n, xnuc_prof_p, xnuc_prof_n, )
[docs] def create_full_profile( data: ArepoData, inner_radius: float | None = None, outer_radius: float | None = None, ) -> tuple[ np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, dict, dict, ]: """ Create 1D profiles from full Arepo snapshot (angle-averaged). Creates angle-averaged profiles from all data. Positive and negative direction profiles are identical in this case. Parameters ---------- data : ArepoData The Arepo snapshot data. inner_radius : float, optional Inner radius cutoff in cm. Default: None outer_radius : float, optional Outer radius cutoff in cm. Default: None Returns ------- pos_prof_p : numpy.ndarray Position profile in positive direction (cm). pos_prof_n : numpy.ndarray Position profile in negative direction (cm). vel_prof_p : numpy.ndarray Velocity profile in positive direction (cm/s). vel_prof_n : numpy.ndarray Velocity profile in negative direction (cm/s). rho_prof_p : numpy.ndarray Density profile in positive direction (g/cm^3). rho_prof_n : numpy.ndarray Density profile in negative direction (g/cm^3). mass_prof_p : numpy.ndarray Mass profile in positive direction (g). mass_prof_n : numpy.ndarray Mass profile in negative direction (g). xnuc_prof_p : dict Nuclear fraction profiles in positive direction. xnuc_prof_n : dict Nuclear fraction profiles in negative direction. Raises ------ ValueError If no points remain after applying radius cuts. """ # Compute radial distances pos_p = np.sqrt( data.position[0] ** 2 + data.position[1] ** 2 + data.position[2] ** 2 ).flatten() pos_n = pos_p.copy() vel_p = np.sqrt( data.velocities[0] ** 2 + data.velocities[1] ** 2 + data.velocities[2] ** 2 ).flatten() vel_n = vel_p.copy() mass_p = data.mass.flatten() mass_n = mass_p.copy() rho_p = data.densities.flatten() rho_n = rho_p.copy() spec_p = {} spec_n = {} for spec in data.species: spec_p[spec] = data.isotope_dict[spec].flatten() spec_n[spec] = spec_p[spec].copy() # Sort by position pos_prof_p = np.sort(pos_p, kind=SORTING_ALGORITHM) pos_prof_n = np.sort(pos_n, kind=SORTING_ALGORITHM) # Apply radius cuts maxradius_p = max(pos_prof_p) if outer_radius is None else outer_radius maxradius_n = max(pos_prof_n) if outer_radius is None else outer_radius minradius_p = min(pos_prof_p) if inner_radius is None else inner_radius minradius_n = min(pos_prof_n) if inner_radius is None else inner_radius mask_p = np.logical_and( pos_prof_p >= minradius_p, pos_prof_p <= maxradius_p ) mask_n = np.logical_and( pos_prof_n >= minradius_n, pos_prof_n <= maxradius_n ) if not mask_p.any() or not mask_n.any(): raise ValueError("No points left between inner and outer radius.") # Sort all quantities by position mass_prof_p = np.array( [x for _, x in sorted(zip(pos_p, mass_p), key=lambda pair: pair[0])] )[mask_p] mass_prof_n = np.array( [x for _, x in sorted(zip(pos_n, mass_n), key=lambda pair: pair[0])] )[mask_n] rho_prof_p = np.array( [x for _, x in sorted(zip(pos_p, rho_p), key=lambda pair: pair[0])] )[mask_p] rho_prof_n = np.array( [x for _, x in sorted(zip(pos_n, rho_n), key=lambda pair: pair[0])] )[mask_n] vel_prof_p = np.array( [x for _, x in sorted(zip(pos_p, vel_p), key=lambda pair: pair[0])] )[mask_p] vel_prof_n = np.array( [x for _, x in sorted(zip(pos_n, vel_n), key=lambda pair: pair[0])] )[mask_n] xnuc_prof_p = {} xnuc_prof_n = {} for spec in data.species: xnuc_prof_p[spec] = np.array( [ x for _, x in sorted( zip(pos_p, spec_p[spec]), key=lambda pair: pair[0] ) ] )[mask_p] xnuc_prof_n[spec] = np.array( [ x for _, x in sorted( zip(pos_n, spec_n[spec]), key=lambda pair: pair[0] ) ] )[mask_n] pos_prof_p = pos_prof_p[mask_p] pos_prof_n = pos_prof_n[mask_n] return ( pos_prof_p, pos_prof_n, vel_prof_p, vel_prof_n, rho_prof_p, rho_prof_n, mass_prof_p, mass_prof_n, xnuc_prof_p, xnuc_prof_n, )
[docs] def rebin_profile( pos_prof_p: np.ndarray, pos_prof_n: np.ndarray, vel_prof_p: np.ndarray, vel_prof_n: np.ndarray, rho_prof_p: np.ndarray, rho_prof_n: np.ndarray, mass_prof_p: np.ndarray, mass_prof_n: np.ndarray, xnuc_prof_p: dict, xnuc_prof_n: dict, nshells: int, ) -> tuple[ np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, dict, dict, ]: """ Rebin profile data to specified number of shells. Uses scipy.stats.binned_statistic to bin the data into nshells bins. Parameters ---------- pos_prof_p : numpy.ndarray Position profile in positive direction. pos_prof_n : numpy.ndarray Position profile in negative direction. vel_prof_p : numpy.ndarray Velocity profile in positive direction. vel_prof_n : numpy.ndarray Velocity profile in negative direction. rho_prof_p : numpy.ndarray Density profile in positive direction. rho_prof_n : numpy.ndarray Density profile in negative direction. mass_prof_p : numpy.ndarray Mass profile in positive direction. mass_prof_n : numpy.ndarray Mass profile in negative direction. xnuc_prof_p : dict Nuclear fraction profiles in positive direction. xnuc_prof_n : dict Nuclear fraction profiles in negative direction. nshells : int Number of bins for rebinned data. Returns ------- pos_prof_p : numpy.ndarray Rebinned position profile in positive direction. pos_prof_n : numpy.ndarray Rebinned position profile in negative direction. vel_prof_p : numpy.ndarray Rebinned velocity profile in positive direction. vel_prof_n : numpy.ndarray Rebinned velocity profile in negative direction. rho_prof_p : numpy.ndarray Rebinned density profile in positive direction. rho_prof_n : numpy.ndarray Rebinned density profile in negative direction. mass_prof_p : numpy.ndarray Rebinned mass profile in positive direction. mass_prof_n : numpy.ndarray Rebinned mass profile in negative direction. xnuc_prof_p : dict Rebinned nuclear fraction profiles in positive direction. xnuc_prof_n : dict Rebinned nuclear fraction profiles in negative direction. """ species = list(xnuc_prof_p.keys()) # Bin velocities (mass-weighted mean) vel_prof_p_new, bins_p = stats.binned_statistic( pos_prof_p, vel_prof_p * mass_prof_p, statistic="mean", bins=nshells, )[:2] vel_prof_p_new /= stats.binned_statistic( pos_prof_p, mass_prof_p, statistic="mean", bins=nshells )[0] vel_prof_n_new, bins_n = stats.binned_statistic( pos_prof_n, vel_prof_n * mass_prof_n, statistic="mean", bins=nshells, )[:2] vel_prof_n_new /= stats.binned_statistic( pos_prof_n, mass_prof_n, statistic="mean", bins=nshells )[0] # Bin nuclear fractions (mass-weighted mean) xnuc_prof_p_new = {} xnuc_prof_n_new = {} for spec in species: xnuc_prof_p_new[spec] = ( stats.binned_statistic( pos_prof_p, xnuc_prof_p[spec] * mass_prof_p, statistic="mean", bins=nshells, )[0] / stats.binned_statistic( pos_prof_p, mass_prof_p, statistic="mean", bins=nshells )[0] ) xnuc_prof_n_new[spec] = ( stats.binned_statistic( pos_prof_n, xnuc_prof_n[spec] * mass_prof_n, statistic="mean", bins=nshells, )[0] / stats.binned_statistic( pos_prof_n, mass_prof_n, statistic="mean", bins=nshells )[0] ) # Calculate bin volumes vol_prof_p_new = np.array( [ 4 / 3 * np.pi * (bins_p[i + 1] ** 3 - bins_p[i] ** 3) for i in range(len(bins_p) - 1) ] ) vol_prof_n_new = np.array( [ 4 / 3 * np.pi * (bins_n[i + 1] ** 3 - bins_n[i] ** 3) for i in range(len(bins_n) - 1) ] ) # Sum masses in bins mass_prof_p_new = stats.binned_statistic( pos_prof_p, mass_prof_p, statistic="sum", bins=nshells )[0] mass_prof_n_new = stats.binned_statistic( pos_prof_n, mass_prof_n, statistic="sum", bins=nshells )[0] # Calculate densities rho_prof_p_new = mass_prof_p_new / vol_prof_p_new rho_prof_n_new = mass_prof_n_new / vol_prof_n_new # Bin centers pos_prof_p_new = np.array( [(bins_p[i] + bins_p[i + 1]) / 2 for i in range(len(bins_p) - 1)] ) pos_prof_n_new = np.array( [(bins_n[i] + bins_n[i + 1]) / 2 for i in range(len(bins_n) - 1)] ) return ( pos_prof_p_new, pos_prof_n_new, vel_prof_p_new, vel_prof_n_new, rho_prof_p_new, rho_prof_n_new, mass_prof_p_new, mass_prof_n_new, xnuc_prof_p_new, xnuc_prof_n_new, )
[docs] def export_profile_to_csvy( pos_prof: np.ndarray, vel_prof: np.ndarray, rho_prof: np.ndarray, mass_prof: np.ndarray, xnuc_prof: dict, time: u.Quantity, filename: str | Path, nshells: int, overwrite: bool = False, ) -> str: """ Export a 1D profile to CSVY format for TARDIS. Parameters ---------- pos_prof : numpy.ndarray Position profile (cm). vel_prof : numpy.ndarray Velocity profile (cm/s). rho_prof : numpy.ndarray Density profile (g/cm^3). mass_prof : numpy.ndarray Mass profile (g). xnuc_prof : dict Nuclear fraction profiles keyed by species name. time : astropy.units.Quantity Time of the snapshot. filename : str or pathlib.Path Name of the exported file. nshells : int Number of shells to export. overwrite : bool, optional If True, will overwrite existing files. Default: False Returns ------- str Actual filename of the saved file. """ filename = Path(filename) species = list(xnuc_prof.keys()) # Handle filename collision if filename.suffix == ".csvy": filename = filename.with_suffix("") if filename.with_suffix(".csvy").exists() and not overwrite: i = 0 while ( filename.with_name(f"{filename.stem}_{i}") .with_suffix(".csvy") .exists() ): i += 1 filename = filename.with_name(f"{filename.stem}_{i}") filename = filename.with_suffix(".csvy") with open(filename, "w") as f: # Write YAML header f.write( "".join( [ "---\n", "name: csvy_full\n", f"model_density_time_0: {time.to(u.day):g}\n", f"model_isotope_time_0: {time.to(u.day):g}\n", "description: Config file for TARDIS from Arepo snapshot.\n", "tardis_model_config_version: v1.0\n", "datatype:\n", " fields:\n", " - name: velocity\n", " unit: cm/s\n", " desc: velocities of shell outer bounderies.\n", " - name: density\n", " unit: g/cm^3\n", " desc: density of shell.\n", ] ) ) for spec in species: f.write( "".join( [ f" - name: {spec.capitalize()}\n", f" desc: fractional {spec.capitalize()} abundance.\n", ] ) ) f.write("\n---\n") # Write CSV header datastring = ["velocity,", "density,"] for spec in species[:-1]: datastring.append(f"{spec.capitalize()},") datastring.append(f"{species[-1].capitalize()}") f.write("".join(datastring)) # Prepare data arrays exp = [vel_prof, rho_prof] for spec in species: exp.append(xnuc_prof[spec]) # Write data rows inds = np.linspace(0, len(exp[0]) - 1, num=nshells, dtype=int) for i in inds: f.write("\n") for ii in range(len(exp) - 1): f.write(f"{exp[ii][i]:g},") f.write(f"{exp[-1][i]:g}") return str(filename)
[docs] def plot_profile( pos_prof_p: np.ndarray, pos_prof_n: np.ndarray, vel_prof_p: np.ndarray, vel_prof_n: np.ndarray, rho_prof_p: np.ndarray, rho_prof_n: np.ndarray, xnuc_prof_p: dict, xnuc_prof_n: dict, time: u.Quantity, save: str | Path | None = None, dpi: int = 600, **kwargs, ) -> "Figure": """ Plot 1D profiles for both positive and negative directions. Parameters ---------- pos_prof_p : numpy.ndarray Position profile in positive direction. pos_prof_n : numpy.ndarray Position profile in negative direction. vel_prof_p : numpy.ndarray Velocity profile in positive direction. vel_prof_n : numpy.ndarray Velocity profile in negative direction. rho_prof_p : numpy.ndarray Density profile in positive direction. rho_prof_n : numpy.ndarray Density profile in negative direction. xnuc_prof_p : dict Nuclear fraction profiles in positive direction. xnuc_prof_n : dict Nuclear fraction profiles in negative direction. time : astropy.units.Quantity Time of the snapshot. save : str or pathlib.Path, optional Path to save the figure. Default: None dpi : int, optional DPI for saved figure. Default: 600 **kwargs Additional keyword arguments passed to matplotlib.pyplot.plot() Returns ------- matplotlib.figure.Figure The generated figure object. """ species = list(xnuc_prof_p.keys()) fig, (ax1, ax2) = plt.subplots(2, 1, figsize=[9.8, 9.6]) # Positive direction plots ax1.plot( pos_prof_p, rho_prof_p / max(rho_prof_p), label="Density", **kwargs, ) ax1.plot( pos_prof_p, vel_prof_p / max(vel_prof_p), label="Velocity", **kwargs, ) for spec in species: ax1.plot( pos_prof_p, xnuc_prof_p[spec], label=spec.capitalize(), **kwargs, ) ax1.grid() ax1.set_ylabel("Profile (arb. unit)") ax1.set_title("Profiles along the positive axis") # Negative direction plots ax2.plot( pos_prof_n, rho_prof_n / max(rho_prof_n), label="Density", **kwargs, ) ax2.plot( pos_prof_n, vel_prof_n / max(vel_prof_n), label="Velocity", **kwargs, ) for spec in species: ax2.plot( pos_prof_n, xnuc_prof_n[spec], label=spec.capitalize(), **kwargs, ) ax2.grid() ax2.set_ylabel("Profile (arb. unit)") ax2.set_xlabel("Radial position (cm)") ax2.set_title("Profiles along the negative axis") # Styling fig.tight_layout() handles, labels = ax1.get_legend_handles_labels() ax1.legend( handles, labels, loc="upper left", bbox_to_anchor=(1.05, 1.05), title=f"Time = {time}", ) if save is not None: plt.savefig(save, bbox_inches="tight", dpi=dpi) return fig