Source code for magicctapipe.scripts.lst1_magic.lst1_magic_dl2_to_dl3

#!/usr/bin/env python
# coding: utf-8

"""
This script processes DL2 events and creates a DL3 data file with the
IRFs. At first it reads the configurations of the IRFs and checks the
consistency, and then applies the same condition cuts to DL2 events.

There are three methods for the interpolation of the IRFs, "nearest",
"linear" and "cubic", which can be specified in the configuration file.
The "nearest" method just selects the IRFs of the closest pointing
direction in (cos(Zd), Az), and the other methods work only when there
are multiple IRFs available from different pointing directions.

Usage:
$ python lst1_magic_dl2_to_dl3.py
--input-file-dl2 dl2_LST-1_MAGIC.Run03265.h5
--input-dir-irf irf
(--output-dir dl3)
(--config-file config.yaml)
"""

import argparse
import logging
import operator
import time
from pathlib import Path

import numpy as np
import yaml
from astropy import units as u
from astropy.coordinates import Angle
from astropy.coordinates.angle_utilities import angular_separation
from astropy.io import fits
from astropy.table import QTable
from pyirf.cuts import evaluate_binned_cut
from pyirf.interpolation import GridDataInterpolator
from pyirf.io import (
    create_aeff2d_hdu,
    create_background_2d_hdu,
    create_energy_dispersion_hdu,
    create_psf_table_hdu,
    create_rad_max_hdu,
)
from pyirf.utils import cone_solid_angle
from scipy.interpolate import griddata

from magicctapipe.io import (
    create_event_hdu,
    create_gh_cuts_hdu,
    create_gti_hdu,
    create_pointing_hdu,
    format_object,
    load_dl2_data_file,
    load_irf_files,
)

__all__ = ["dl2_to_dl3"]

logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.INFO)


[docs] def dl2_to_dl3(input_file_dl2, input_dir_irf, output_dir, config): """ Processes DL2 events and creates a DL3 data file with the IRFs. Parameters ---------- input_file_dl2 : str Path to an input DL2 data file input_dir_irf : str Path to a directory where input IRF files are stored output_dir : str Path to a directory where to save an output DL3 data file config : dict Configuration for the LST-1 + MAGIC analysis """ config_dl3 = config["dl2_to_dl3"] # Load the input IRF data files logger.info(f"\nInput IRF directory: {input_dir_irf}") irf_data, extra_header = load_irf_files(input_dir_irf) logger.info("\nGrid points in (cos(Zd), Az):") logger.info(format_object(irf_data["grid_points"])) logger.info("\nExtra header:") logger.info(format_object(extra_header)) # Load the input DL2 data file logger.info(f"\nInput DL2 data file: {input_file_dl2}") quality_cuts = extra_header.get("QUAL_CUT") event_type = extra_header["EVT_TYPE"] dl2_weight_type = extra_header["DL2_WEIG"] event_table, on_time, deadc = load_dl2_data_file( input_file_dl2, quality_cuts, event_type, dl2_weight_type ) # Calculate the mean pointing direction for the target point of the # IRF interpolation. Please note that the azimuth could make a full # 2 pi turn, whose mean angle may indicate an opposite direction. # Thus, here we calculate the STDs of the azimuth angles with two # ranges, i.e., 0 <= az < 360 deg and -180 <= az < 180 deg, and then # calculate the mean with the range of smaller STD. pnt_coszd_mean = np.sin(event_table["pointing_alt"]).mean().value pnt_az_wrap_360deg = Angle(event_table["pointing_az"]).wrap_at("360 deg") pnt_az_wrap_180deg = Angle(event_table["pointing_az"]).wrap_at("180 deg") if pnt_az_wrap_360deg.std() <= pnt_az_wrap_180deg.std(): pnt_az_mean = pnt_az_wrap_360deg.mean().to_value("rad") else: pnt_az_mean = pnt_az_wrap_180deg.mean().wrap_at("360 deg").to_value("rad") distances = ( angular_separation( pnt_az_mean, np.pi / 2 - np.arccos(pnt_coszd_mean), irf_data["grid_points"][:, 1], np.pi / 2 - np.arccos(irf_data["grid_points"][:, 0]), ) * u.rad ).to("deg") scheme = config_dl3.pop("interpolation_scheme") if scheme == "cosZdAz": target_point = np.array([pnt_coszd_mean, pnt_az_mean]) elif scheme == "cosZd": target_point = np.array([pnt_coszd_mean]) irf_data["grid_points"] = irf_data["grid_points"][:, 0] else: logger.error(f"Not recognized interpolation scheme: {scheme}, exiting") exit(1) logger.info( f"\nTarget point: {target_point.round(5).tolist()} with scheme: {scheme}" ) if "max_distance" in config_dl3: max_distance = u.Quantity(config_dl3.pop("max_distance")) logger.info(f"selecting only nodes up to {max_distance} from the data") idx = np.where(distances < max_distance) keys = [s for s in irf_data.keys() if "_bins" not in s] for key in keys: irf_data[key] = irf_data[key][idx] for i in range(len(idx[0])): logger.info(f"{irf_data['file_names'][i]}: {distances[idx][i]:.2f}") # Prepare for the IRF interpolations interpolation_method = config_dl3.pop("interpolation_method") logger.info(f"\nInterpolation method: {interpolation_method}") extra_header["IRF_INTP"] = interpolation_method hdus = fits.HDUList([fits.PrimaryHDU()]) # Interpolate the effective area logger.info("\nInterpolating the effective area...") if len(irf_data["grid_points"]) > 2: effective_area = irf_data["effective_area"].to_value("m2") # due to large changes in Aeff we will interpolate in log space, # meaning that we need to set a minimal value (1 m^2) effective_area[effective_area < 1] = 1 interpolator = GridDataInterpolator( grid_points=irf_data["grid_points"], params=np.log(effective_area), method=interpolation_method, ) aeff_interp = np.exp(interpolator.interpolate(target_point)[0]) # setting values below the minimal value back to 0, allowing for 10% error margin aeff_interp[aeff_interp < 1.1] = 0 aeff_interp *= u.Unit("m2") else: aeff_interp = irf_data["effective_area"][0] print("skipping interpolation since only one point is given") aeff_hdu = create_aeff2d_hdu( effective_area=aeff_interp, true_energy_bins=irf_data["energy_bins"], fov_offset_bins=irf_data["fov_offset_bins"], point_like=True, extname="EFFECTIVE AREA", **extra_header, ) hdus.append(aeff_hdu) # Interpolate the energy dispersion with a custom way, # TBD: use pyirf quantile interpolation instead logger.info("Interpolating the energy dispersion...") if len(irf_data["grid_points"]) > 2: edisp_interp = griddata( points=irf_data["grid_points"], values=irf_data["energy_dispersion"], xi=target_point, method=interpolation_method, ) else: edisp_interp = irf_data["energy_dispersion"] edisp_interp = edisp_interp[0] # Remove the dimension of the grid points norm = np.sum(edisp_interp, axis=1, keepdims=True) # Along the migration axis mask_zeros = norm != 0 edisp_interp = np.divide( edisp_interp, norm, out=np.zeros_like(edisp_interp), where=mask_zeros ) # according to GDAF standard migration matrix is normalized to integral of bins widths = np.diff(irf_data["migration_bins"]) edisp_interp /= widths[np.newaxis, :, np.newaxis] edisp_hdu = create_energy_dispersion_hdu( energy_dispersion=edisp_interp, true_energy_bins=irf_data["energy_bins"], migration_bins=irf_data["migration_bins"], fov_offset_bins=irf_data["fov_offset_bins"], point_like=True, extname="ENERGY DISPERSION", ) hdus.append(edisp_hdu) if "psf_table" in irf_data: # Interpolate the PSF table with a custom way, since there is a # bug in the function of pyirf v0.6.0 about the renormalization logger.info("Interpolating the PSF table...") psf_interp = griddata( points=irf_data["grid_points"], values=irf_data["psf_table"].to_value("sr-1"), xi=target_point, method=interpolation_method, ) # Remove the dimension of the grid points and add the unit psf_interp = psf_interp[0] * u.Unit("sr-1") # Re-normalize along the source offset axis omegas = np.diff(cone_solid_angle(irf_data["source_offset_bins"])) norm = np.sum(psf_interp * omegas, axis=2, keepdims=True) mask_zeros = norm != 0 psf_interp = np.divide( psf_interp, norm, out=np.zeros_like(psf_interp), where=mask_zeros ) # Create a PSF table HDU psf_hdu = create_psf_table_hdu( psf=psf_interp, true_energy_bins=irf_data["energy_bins"], source_offset_bins=irf_data["source_offset_bins"], fov_offset_bins=irf_data["fov_offset_bins"], extname="PSF", **extra_header, ) hdus.append(psf_hdu) if "background" in irf_data: # Interpolate the background model logger.info("Interpolating the background model...") bkg = griddata( points=irf_data["grid_points"], values=irf_data["background"].to_value("MeV-1 s-1 sr-1"), xi=target_point, method=interpolation_method, ) # Remove the dimension of the grid points and add the unit bkg = bkg[0] * u.Unit("MeV-1 s-1 sr-1") bkg_hdu = create_background_2d_hdu( background_2d=bkg, reco_energy_bins=irf_data["energy_bins"], fov_offset_bins=irf_data["fov_offset_bins"], extname="BACKGROUND", ) hdus.append(bkg_hdu) if "gh_cuts" in irf_data: # Interpolate the dynamic gammaness cuts logger.info("Interpolating the dynamic gammaness cuts...") if len(irf_data["grid_points"]) > 2: gh_cuts_interp = griddata( points=irf_data["grid_points"], values=irf_data["gh_cuts"], xi=target_point, method=interpolation_method, ) else: gh_cuts_interp = irf_data["gh_cuts"] # Remove the dimension of the grid points gh_cuts_interp = gh_cuts_interp[0] gh_cuts_hdu = create_gh_cuts_hdu( gh_cuts=gh_cuts_interp, reco_energy_bins=irf_data["energy_bins"], fov_offset_bins=irf_data["fov_offset_bins"], **extra_header, ) hdus.append(gh_cuts_hdu) if "rad_max" in irf_data: # Interpolate the dynamic theta cuts logger.info("Interpolating the dynamic theta cuts...") rad_max_interp = griddata( points=irf_data["grid_points"], values=irf_data["rad_max"].to_value("deg"), xi=target_point, method=interpolation_method, ) # Remove the dimension of the grid points and add the unit rad_max_interp = rad_max_interp[0] * u.deg rad_max_hdu = create_rad_max_hdu( rad_max=rad_max_interp, reco_energy_bins=irf_data["energy_bins"], fov_offset_bins=irf_data["fov_offset_bins"], point_like=True, extname="RAD_MAX", **extra_header, ) hdus.append(rad_max_hdu) if "GH_CUT" in extra_header: # Apply the global gammaness cut mask_gh = event_table["gammaness"] > extra_header["GH_CUT"] event_table = event_table[mask_gh] else: # Apply the dynamic gammaness cuts gh_cut_table = QTable( data={ "low": irf_data["energy_bins"][:-1], "high": irf_data["energy_bins"][1:], "cut": gh_cuts_interp.T[0], } ) logger.info(f"\nGammaness cut table:\n\n{gh_cut_table}") mask_gh = evaluate_binned_cut( values=event_table["gammaness"], bin_values=event_table["reco_energy"], cut_table=gh_cut_table, op=operator.ge, ) event_table = event_table[mask_gh] # Create an event HDU logger.info("\nCreating an event HDU...") event_hdu = create_event_hdu(event_table, on_time, deadc, **config_dl3) hdus.append(event_hdu) # Create a GTI table logger.info("Creating a GTI HDU...") gti_hdu = create_gti_hdu(event_table) hdus.append(gti_hdu) # Create a pointing table logger.info("Creating a pointing HDU...") pnt_hdu = create_pointing_hdu(event_table) hdus.append(pnt_hdu) # Save the data in an output file Path(output_dir).mkdir(exist_ok=True, parents=True) input_file_name = Path(input_file_dl2).name output_file_name = input_file_name.replace("dl2", "dl3").replace(".h5", ".fits.gz") output_file = f"{output_dir}/{output_file_name}" hdus.writeto(output_file, overwrite=True) logger.info(f"\nOutput file: {output_file}")
def main(): """Main function.""" start_time = time.time() parser = argparse.ArgumentParser() parser.add_argument( "--input-file-dl2", "-d", dest="input_file_dl2", type=str, required=True, help="Path to an input DL2 data file", ) parser.add_argument( "--input-dir-irf", "-i", dest="input_dir_irf", type=str, required=True, help="Path to a directory where input IRF files are stored", ) parser.add_argument( "--output-dir", "-o", dest="output_dir", type=str, default="./data", help="Path to a directory where to save an output DL3 data file", ) parser.add_argument( "--config-file", "-c", dest="config_file", type=str, default="./config.yaml", help="Path to a configuration file", ) args = parser.parse_args() with open(args.config_file, "rb") as f: config = yaml.safe_load(f) # Process the input data dl2_to_dl3(args.input_file_dl2, args.input_dir_irf, args.output_dir, config) logger.info("\nDone.") process_time = time.time() - start_time logger.info(f"\nProcess time: {process_time:.0f} [sec]\n") if __name__ == "__main__": main()