import numpy as np
import pandas as pd
from numba import njit
from pandas.api.types import CategoricalDtype
from tardis.transport.montecarlo.packets.radiative_packet import (
InteractionType,
PacketStatus,
)
from tardis.transport.montecarlo.packets.trackers.tracker_full import (
TrackerFull,
)
[docs]
@njit
def trackers_full_list_to_arrays(trackers) -> tuple:
"""
Convert a list of TrackerFull instances to concatenated arrays efficiently.
Parameters
----------
trackers : numba.typed.typedlist.List
List of TrackerFull instances.
Returns
-------
tuple
Tuple containing:
- packet_id (np.int64[:]) : Packet ID for each event
- event_id (np.int64[:]) : Event ID within each packet
- r (np.float64[:]) : Radius at each event
- before_shell_id (np.int64[:]) : Shell ID before each event
- after_shell_id (np.int64[:]) : Shell ID after each event
- interaction_type (np.int64[:]) : Interaction type at each event
- status (np.int64[:]) : Status at each event
- before_nu (np.float64[:]) : Frequency before interaction
- before_mu (np.float64[:]) : Mu before interaction
- before_energy (np.float64[:]) : Energy before interaction
- after_nu (np.float64[:]) : Frequency after interaction
- after_mu (np.float64[:]) : Mu after interaction
- after_energy (np.float64[:]) : Energy after interaction
- line_absorb_id (np.int64[:]) : Line absorb ID
- line_emit_id (np.int64[:]) : Line emit ID
"""
# Calculate total array size
total_events = 0
for tracker in trackers:
total_events += len(tracker.radius)
# Pre-allocate all arrays
packet_id = np.empty(total_events, dtype=np.int64)
event_id = np.empty(total_events, dtype=np.int64)
radius = np.empty(total_events, dtype=np.float64)
before_shell_id = np.empty(total_events, dtype=np.int64)
after_shell_id = np.empty(total_events, dtype=np.int64)
interaction_type = np.empty(total_events, dtype=np.int64)
status = np.empty(total_events, dtype=np.int64)
before_nu = np.empty(total_events, dtype=np.float64)
before_mu = np.empty(total_events, dtype=np.float64)
before_energy = np.empty(total_events, dtype=np.float64)
after_nu = np.empty(total_events, dtype=np.float64)
after_mu = np.empty(total_events, dtype=np.float64)
after_energy = np.empty(total_events, dtype=np.float64)
line_absorb_id = np.empty(total_events, dtype=np.int64)
line_emit_id = np.empty(total_events, dtype=np.int64)
# Concatenate arrays from all trackers
offset = 0
for packet_idx, tracker in enumerate(trackers):
n_events = len(tracker.radius)
end_idx = offset + n_events
# Simple array concatenation
packet_id[offset:end_idx] = packet_idx
event_id[offset:end_idx] = np.arange(n_events, dtype=np.int64)
radius[offset:end_idx] = tracker.radius
before_shell_id[offset:end_idx] = tracker.before_shell_id
after_shell_id[offset:end_idx] = tracker.after_shell_id
interaction_type[offset:end_idx] = tracker.interaction_type
status[offset:end_idx] = tracker.status
before_nu[offset:end_idx] = tracker.before_nu
before_mu[offset:end_idx] = tracker.before_mu
before_energy[offset:end_idx] = tracker.before_energy
after_nu[offset:end_idx] = tracker.after_nu
after_mu[offset:end_idx] = tracker.after_mu
after_energy[offset:end_idx] = tracker.after_energy
line_absorb_id[offset:end_idx] = tracker.line_absorb_id
line_emit_id[offset:end_idx] = tracker.line_emit_id
offset = end_idx
return (
packet_id,
event_id,
radius,
before_shell_id,
after_shell_id,
interaction_type,
status,
before_nu,
before_mu,
before_energy,
after_nu,
after_mu,
after_energy,
line_absorb_id,
line_emit_id,
)
[docs]
def trackers_full_to_df(trackers_full_list) -> pd.DataFrame:
"""
Convert a list of RPacketTracker instances to a comprehensive DataFrame.
Parameters
----------
rpacket_trackers : numba.typed.typedlist.List
List of RPacketTracker instances.
Returns
-------
pd.DataFrame
DataFrame containing all event data including line interaction details,
indexed by packet_id and event_id. Line interaction columns have default
values (-1 for IDs, NaN for physical quantities) for non-interaction events.
"""
# Use the fast array extraction function
(
packet_id,
event_id,
r,
before_shell_id,
after_shell_id,
interaction_type,
status,
before_nu,
before_mu,
before_energy,
after_nu,
after_mu,
after_energy,
line_absorb_id,
line_emit_id,
) = trackers_full_list_to_arrays(trackers_full_list)
# Create categorical data types
interaction_type_dtype = CategoricalDtype(
categories=[member.name for member in InteractionType], ordered=False
)
status_dtype = CategoricalDtype(
categories=[member.name for member in PacketStatus], ordered=False
)
# Convert enum values to their string names and create categorical
interaction_type_labels = [
InteractionType(int(int_type)).name for int_type in interaction_type
]
interaction_type_categorical = pd.Categorical(
interaction_type_labels, dtype=interaction_type_dtype
)
# Convert status values to their string names
status_labels = [
PacketStatus(int(int_status)).name for int_status in status
]
status_categorical = pd.Categorical(status_labels, dtype=status_dtype)
# Create comprehensive event DataFrame with specific column order
event_data = {
"interaction_type": interaction_type_categorical,
"status": status_categorical,
"radius": r,
"before_shell_id": before_shell_id,
"after_shell_id": after_shell_id,
"before_nu": before_nu,
"before_mu": before_mu,
"before_energy": before_energy,
"after_nu": after_nu,
"after_mu": after_mu,
"after_energy": after_energy,
"line_absorb_id": pd.array(line_absorb_id, dtype="int64"),
"line_emit_id": pd.array(line_emit_id, dtype="int64"),
}
# Create multi-index with packet_id and event_id
multi_index = pd.MultiIndex.from_arrays(
[packet_id, event_id], names=["packet_id", "event_id"]
)
return pd.DataFrame(event_data, index=multi_index)
[docs]
def tracker_full_df2tracker_last_interaction_df(
full_df: pd.DataFrame,
) -> pd.DataFrame:
"""
Convert a full tracking dataframe to show only the last interaction for each packet.
Takes a full event tracking dataframe (from trackers_full_to_dataframe) and
filters it to show only the last physics interaction each packet experienced.
Parameters
----------
full_df : pd.DataFrame
Full event tracking dataframe with multi-index (packet_id, event_id) and
columns ['radius', 'before_shell_id', 'after_shell_id', 'interaction_type'].
Returns
-------
pd.DataFrame
DataFrame containing last interaction data for each packet,
indexed by packet_id with columns ['radius', 'shell_id', 'last_interaction_type',
'line_absorb_id', 'line_emit_id', 'before_nu', 'before_mu', 'before_energy',
'after_nu', 'after_mu', 'after_energy']. Note: shell_id comes from before_shell_id.
"""
if full_df.empty:
return pd.DataFrame()
# Reset index to work with packet_id and event_id as columns
df = full_df.reset_index()
# Get all unique packet IDs to ensure we include packets with no physics interactions
all_packet_ids = df["packet_id"].unique()
# Filter out boundary events to only get actual interactions
physics_interactions = df[
df["interaction_type"].isin(
["LINE", "ESCATTERING", "CONTINUUM_PROCESS"]
)
]
if physics_interactions.empty:
# No physics interactions - create DataFrame with NO_INTERACTION for all packets
no_interaction_df = pd.DataFrame(
{
"event_id": np.full(len(all_packet_ids), 0),
"last_interaction_type": pd.Categorical(
["NO_INTERACTION"] * len(all_packet_ids)
),
"status": pd.Categorical(["IN_PROCESS"] * len(all_packet_ids)),
"radius": np.full(len(all_packet_ids), np.nan),
"shell_id": np.full(len(all_packet_ids), -1),
"before_nu": np.full(len(all_packet_ids), np.nan),
"before_mu": np.full(len(all_packet_ids), np.nan),
"before_energy": np.full(len(all_packet_ids), np.nan),
"after_nu": np.full(len(all_packet_ids), np.nan),
"after_mu": np.full(len(all_packet_ids), np.nan),
"after_energy": np.full(len(all_packet_ids), np.nan),
"line_absorb_id": pd.array(
np.full(len(all_packet_ids), -1), dtype="int64"
),
"line_emit_id": pd.array(
np.full(len(all_packet_ids), -1), dtype="int64"
),
},
index=pd.Index(all_packet_ids, name="packet_id"),
)
return no_interaction_df
# Get the last interaction for each packet
last_interactions = physics_interactions.groupby("packet_id").last()
# Rename columns to match last interaction format
last_interaction_df = last_interactions.copy()
last_interaction_df.rename(
columns={
"interaction_type": "last_interaction_type",
"before_shell_id": "shell_id",
},
inplace=True,
)
# Drop after_shell_id column
last_interaction_df = last_interaction_df.drop(columns=["after_shell_id"])
# Handle packets that had no physics interactions - add them with NO_INTERACTION
packets_with_interactions = set(last_interaction_df.index)
packets_without_interactions = (
set(all_packet_ids) - packets_with_interactions
)
if packets_without_interactions:
# Create DataFrame for packets with no interactions
no_interaction_data = {
"event_id": np.full(len(packets_without_interactions), 0),
"last_interaction_type": pd.Categorical(
["NO_INTERACTION"] * len(packets_without_interactions)
),
"status": pd.Categorical(
["IN_PROCESS"] * len(packets_without_interactions)
),
"radius": np.full(len(packets_without_interactions), np.nan),
"shell_id": np.full(len(packets_without_interactions), -1),
"before_nu": np.full(len(packets_without_interactions), np.nan),
"before_mu": np.full(len(packets_without_interactions), np.nan),
"before_energy": np.full(len(packets_without_interactions), np.nan),
"after_nu": np.full(len(packets_without_interactions), np.nan),
"after_mu": np.full(len(packets_without_interactions), np.nan),
"after_energy": np.full(len(packets_without_interactions), np.nan),
"line_absorb_id": pd.array(
np.full(len(packets_without_interactions), -1), dtype="int64"
),
"line_emit_id": pd.array(
np.full(len(packets_without_interactions), -1), dtype="int64"
),
}
no_interaction_df = pd.DataFrame(
no_interaction_data,
index=pd.Index(
list(packets_without_interactions), name="packet_id"
),
)
# Combine the DataFrames
last_interaction_df = pd.concat(
[last_interaction_df, no_interaction_df], ignore_index=False
)
last_interaction_df.sort_index(inplace=True)
return last_interaction_df
[docs]
@njit
def generate_tracker_full_list(
no_of_packets: int, length: int, extend_factor: int = 2
) -> list:
"""
Generate list of RPacketTracker instances.
Parameters
----------
no_of_packets : int
The count of RPackets that are sent in the ejecta.
length : int
Initial length of the tracking array.
Returns
-------
list
A list containing RPacketTracker for each RPacket.
"""
rpacket_trackers = []
for i in range(no_of_packets):
rpacket_trackers.append(
TrackerFull(length, extend_factor)
)
return rpacket_trackers