Source code for tardis.scripts.convert_atomic_data

"""
Quick-and-dirty script for converting an older atomic data format to one compatible with modern TARDIS.

Arguments are
{old_atomdata_to_convert} {old_photoionization_data_to_convert}
{new_atom_data_as_template} {output_atomdata}
"""

import argparse
import hashlib
import pickle
import platform
import uuid
from datetime import datetime
from pathlib import Path

import h5py
import numpy as np
import pandas as pd
import pytz


[docs] def serialize_pandas_object(pd_object): """ Serialize Pandas objects with Pickle. Parameters ---------- pd_object : pandas.Series or pandas.DataFrame Pandas object to be serialized with Pickle. Returns ------- Pickle serialized Python object. """ return pickle.dumps(pd_object)
[docs] def hash_pandas_object(pd_object, algorithm="md5"): """ Hash Pandas objects. Parameters ---------- pd_object : pandas.Series or pandas.DataFrame Pandas object to be hashed. algorithm : str, optional Algorithm available in `hashlib`, by default "md5" Returns ------- str Hash values. Raises ------ ValueError If `algorithm` is not available in `hashlib`. """ algorithm = algorithm.lower() if hasattr(hashlib, algorithm): hash_func = getattr(hashlib, algorithm) else: raise ValueError('algorithm not supported') return hash_func(serialize_pandas_object(pd_object)).hexdigest()
[docs] def multiindex_port(olddata, templatedata, templatekey, oldkey=None): if oldkey is None: oldkey = templatekey # Get the format of the multi-index from the desired template index_names = templatedata[templatekey].index.names # Attempt conversion of the old structured array to a pd DataFrame newdata = pd.DataFrame(olddata[oldkey][:]).set_index(index_names) # Check datatypes of columns, convert if necessary if hasattr(templatedata[templatekey], "columns"): for col in templatedata[templatekey].columns: desired_dtype = templatedata[templatekey][col].dtype if desired_dtype == np.dtype("object"): desired_dtype = str newdata[col] = newdata[col].astype(desired_dtype) else: col = [templatedata[templatekey].name] newdata[col] = newdata[col].astype(templatedata[templatekey].dtype) # Convert datatypes of each level of the multi-index if not isinstance(newdata.index, pd.MultiIndex): template_ind_dtype = templatedata[templatekey].index.dtype newdata.index = newdata.index.astype(template_ind_dtype) else: for i, indname in enumerate(newdata.index.names): template_ind_dtype = templatedata[templatekey].index.dtypes[indname] converted_index = newdata.index.levels[i].astype(template_ind_dtype) newdata.index = newdata.index.set_levels(converted_index, level=i) return newdata
[docs] def simple_port(olddata, templatedata, templatekey, oldkey=None): if oldkey is None: oldkey = templatekey newdata = pd.DataFrame(olddata[oldkey][:]) # Check datatypes of columns, convert if necessary for col in templatedata[templatekey].columns: newdata[col] = newdata[col].astype(templatedata[templatekey][col].dtype) return newdata
## Files used to convert Christian's atomic data: #default_oldatomdata = "merged_mod_20SNG_forbidden_yg_fix_H30_cmfgen_yg.h5" #default_pi_filename = "photoionization_data_H30_He.h5" #default_template = "/home/connor/tardis-regression-data/atom_data/nlte_atom_data/TestNLTE_He_Ti.h5" #default_new = "test.h5" if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("oldatomdata", type=str)#, default=default_oldatomdata) parser.add_argument("oldPIdata", type=str)#, default=default_pi_filename) parser.add_argument("template", type=str)#, default=default_template) parser.add_argument("newatomdata", type=str)#, default=default_new) args = parser.parse_args() # Atomic data in old format - "/Y/g" seems to be the only accessible dataset when # using pandas for some reason (contains collision data) if Path(args.oldatomdata).is_file(): old_df = pd.HDFStore(args.oldatomdata) else: raise FileNotFoundError() # Photoionization data stored in separate file in Christian's version if Path(args.oldPIdata).is_file(): pi_data = pd.HDFStore(args.oldPIdata) else: raise FileNotFoundError() # Reference atomic data file for the format we want to convert to if Path(args.template).is_file(): template = pd.HDFStore(args.template) else: raise FileNotFoundError() # Open up a new pandas HDFStore to port the old data into if Path(args.newatomdata).is_file(): raise FileExistsError(f"Destination file {args.newatomdata} already exists. Delete it or specify a different destination.") else: new = pd.HDFStore(args.newatomdata) ### COLLISIONS DATA multiindex_cols = list(old_df["/Y/g"].columns[:4]) new["collisions_data"] = old_df["/Y/g"].set_index(multiindex_cols) tempcols = list(new['collisions_data'].columns) new['collisions_data'] = new['collisions_data'].rename(lambda f: tempcols.index(f), axis=1) # We've gotten all useful info out of the old dataframe using pandas # so now load the same file with h5py directly to get the rest old_df.close() old = h5py.File(args.oldatomdata) ### COLLISIONS METADATA new_metadata = template["collisions_metadata"].copy() new_metadata["temperatures"] = np.array(tempcols, dtype=np.int64) new["collisions_metadata"] = new_metadata ### ATOM DATA new_atom_data = multiindex_port( old, template, "atom_data", oldkey="basic_atom_data" ) new["atom_data"] = new_atom_data ### IONIZATION DATA new["ionization_data"] = multiindex_port(old, template, "ionization_data") ### LEVELS DATA new["levels_data"] = multiindex_port(old, template, "levels_data") ### LINES DATA new["lines_data"] = multiindex_port(old, template, "lines_data") ### LINES METADATA new["lines_metadata"] = template["lines_metadata"].copy() ### MACRO ATOM DATA new["macro_atom_data"] = simple_port(old, template, "macro_atom_data") ### MACRO ATOM REFERENCES new["macro_atom_references"] = multiindex_port( old, template, "macro_atom_references" ) ### PHOTOIONIZATION DATA ### Note this comes from a DIFFERENT file! new["photoionization_data"] = pi_data["photoionization_data"].copy() ### ZETA DATA zeta_index = pd.MultiIndex.from_arrays( old["zeta_data"][:, :2].T.astype(np.int64), names=template["zeta_data"].index.names, ) zeta_temps = pd.Index( old["zeta_data"].attrs["t_rad"].astype(np.float64), name="temp" ) new_zeta_data = pd.DataFrame( old["zeta_data"][:, 2:], index=zeta_index, columns=zeta_temps ) new["zeta_data"] = new_zeta_data ### METADATA # Copied over from Andrew's notebook demonstrating how to do this meta = [] meta.append(("format", "version", "1.0")) total_checksum = hashlib.md5() for key in new.keys(): # update the total checksum to sign the file total_checksum.update(serialize_pandas_object(new[key])) # save individual DataFrame/Series checksum checksum = hash_pandas_object(new[key]) meta.append(("md5sum", key.lstrip("/"), checksum)) # relevant package versions meta.append(("software", "python", platform.python_version())) imports = [ "carsus", "astropy", "numpy", "pandas", "tables", "ChiantiPy", ] for package in imports: meta.append(("software", package, __import__(package).__version__)) meta_df = pd.DataFrame.from_records( meta, columns=["field", "key", "value"], index=["field", "key"] ) uuid1 = uuid.uuid1().hex new.root._v_attrs["MD5"] = total_checksum.hexdigest() new.root._v_attrs["UUID1"] = uuid1 new.root._v_attrs["FORMAT_VERSION"] = "1.0" tz = pytz.timezone("UTC") date = datetime.now(tz).isoformat() new.root._v_attrs["DATE"] = date new.put("/metadata", meta_df) # Close / write out files old.close() template.close() pi_data.close() new.close()