Source code for magicctapipe.scripts.lst1_magic.lst1_magic_create_irf

#!/usr/bin/env python
# coding: utf-8

"""
This script processes MC DL2 events and creates the IRFs. It can create
two different IRF types, "POINT-LIKE" or "FULL-ENCLOSURE". The effective
area and energy dispersion HDUs are created in case of the "POINT_LIKE"
IRFs, and in addition the PSF table and background HDUs in case of the
"FULL-ENCLOSURE" IRFs.

When the input gamma MC is point-like or ring-wobble data, it creates
one FoV offset bin around the true offset, regardless of the settings in
the configuration file, and creates only the "POINT-LIKE" IRFs. In case
of diffuse data, it creates FoV offset bins based on the configuration
file and creates the "FULL-ENCLOSURE" if the number of FoV offset bins
is more than one. In case the number is one, it creates the "POINT-LIKE"
IRFs, which allows us to perform the 1D spectral analysis even if only
diffuse data is available for test MCs.

There are four different event types with which the IRFs are created.
The "hardware" type is supposed for the hardware trigger between LST-1
and MAGIC, allowing for the events of all the telescope combinations.
The "software(_only_3tel)" types are supposed for the software event
coincidence with LST-mono and MAGIC-stereo observations, allowing for
only the events triggering both M1 and M2. The "software" type allows
for the events of the any 2-tel combinations except the MAGIC-stereo
combination at the moment. The "software_only_3tel" type allows for only
the events of the 3-tel combination. The "magic_only" type allows for
only the events of the MAGIC-stereo combination.

There are two types of gammaness and theta cuts, "global" and "dynamic".
In case of the dynamic cuts, the optimal cut satisfying a given
efficiency will be calculated for every energy bin.

Usage:
$ python lst1_magic_create_irf.py
--input-file-gamma dl2/dl2_gamma_40deg_90deg.h5
(--input-file-proton dl2/dl2_proton_40deg_90deg.h5)
(--input-file-electron dl2/dl2_electron_40deg_90deg.h5)
(--output-dir irf)
(--config-file config.yaml)
"""

import argparse
import logging
import operator
import time
from pathlib import Path

import numpy as np
import yaml
from astropy import units as u
from astropy.io import fits
from astropy.table import QTable, vstack
from pyirf.cuts import calculate_percentile_cut, evaluate_binned_cut
from pyirf.io.gadf import (
    create_aeff2d_hdu,
    create_background_2d_hdu,
    create_energy_dispersion_hdu,
    create_psf_table_hdu,
    create_rad_max_hdu,
)
from pyirf.irf import (
    background_2d,
    effective_area_per_energy,
    effective_area_per_energy_and_fov,
    energy_dispersion,
    psf_table,
)
from pyirf.spectral import (
    IRFDOC_ELECTRON_SPECTRUM,
    IRFDOC_PROTON_SPECTRUM,
    PowerLaw,
    calculate_event_weights,
)

from magicctapipe.io import create_gh_cuts_hdu, format_object, load_mc_dl2_data_file

__all__ = ["create_irf"]

logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.INFO)


[docs] def create_irf( input_file_gamma, input_file_proton, input_file_electron, output_dir, config ): """ Processes MC DL2 events and creates the IRFs. Parameters ---------- input_file_gamma : str Path to an input gamma MC DL2 data file input_file_proton : str Path to an input proton MC DL2 data file input_file_electron : str Path to an input electron MC DL2 data file output_dir : str Path to a directory where to save an output IRF file config : dict Configuration for the LST-1 + MAGIC analysis Raises ------ RuntimeError If the pointing direction does not match between the input MCs ValueError If the input type of gammaness or theta cut is not known """ config_irf = config["create_irf"] quality_cuts = config_irf["quality_cuts"] event_type = config_irf["event_type"] weight_type_dl2 = config_irf["weight_type_dl2"] logger.info(f"\nQuality cuts: {quality_cuts}") logger.info(f"Event type: {event_type}") logger.info(f"DL2 weight type: {weight_type_dl2}") # Load the input gamma MC DL2 data file logger.info(f"\nInput gamma MC DL2 data file: {input_file_gamma}") event_table_gamma, pnt_gamma, sim_info_gamma = load_mc_dl2_data_file( input_file_gamma, quality_cuts, event_type, weight_type_dl2 ) viewcone = sim_info_gamma.viewcone_max.to_value( "deg" ) - sim_info_gamma.viewcone_min.to_value("deg") is_diffuse_mc = viewcone > 0 logger.info(f"\nIs diffuse MC: {is_diffuse_mc}") if is_diffuse_mc: # Create FoV offset bins based on the configuration config_fov_bins = config_irf["fov_offset_bins"] logger.info("\nFov offset bins (linear scale):") logger.info(format_object(config_fov_bins)) fov_bins_start = u.Quantity(config_fov_bins["start"]) fov_bins_stop = u.Quantity(config_fov_bins["stop"]) fov_offset_bins = u.deg * np.linspace( start=fov_bins_start.to_value("deg").round(1), stop=fov_bins_stop.to_value("deg").round(1), num=config_fov_bins["n_edges"], ) else: # Create one FoV offset bin around the true offset true_fov_offset = event_table_gamma["true_source_fov_offset"].to("deg") mean_true_fov_offset = true_fov_offset.mean().round(1) fov_offset_bins = mean_true_fov_offset + [-0.1, 0.1] * u.deg logger.info(f"\nMean true FoV offset: {mean_true_fov_offset}") logger.info(f"--> FoV offset bin: {fov_offset_bins}") # Here we decide the IRF type based on the number of FoV offset # bins - "POINT-LIKE" in case of 1 bin and "FULL-ENCLOSURE" in the # other cases. It allows for creating the "POINT-LIKE" IRFs with # diffuse gamma MCs by selecting a given FoV offset region. n_fov_offset_bins = len(fov_offset_bins) - 1 is_point_like = n_fov_offset_bins == 1 if is_point_like: logger.info("\nIRF type: POINT-LIKE") else: logger.info("\nIRF type: FULL-ENCLOSURE") # Check the existence of background MC data is_proton_mc = input_file_proton is not None is_electron_mc = input_file_electron is not None logger.info(f"\nIs proton MC: {is_proton_mc}") logger.info(f"Is electron MC: {is_electron_mc}") is_bkg_mc = all([is_proton_mc, is_electron_mc]) if is_point_like and is_bkg_mc: logger.warning( "\nWARNING: Will skip the creation of a background model, " "since it is not needed for the 'POINT-LIKE' IRFs." ) if (not is_point_like) and (not is_bkg_mc): logger.warning( "\nWARNING: Will skip the creation of a background model, " "since both or either of background MCs are missing." ) event_table_bkg = QTable() if not is_point_like and is_bkg_mc: # Load the input proton MC DL2 data file logger.info(f"\nInput proton MC DL2 data file: {input_file_proton}") event_table_proton, pnt_proton, sim_info_proton = load_mc_dl2_data_file( input_file_proton, quality_cuts, event_type, weight_type_dl2 ) if any(pnt_proton != pnt_gamma): raise RuntimeError( f"Pointing direction of the proton MC {pnt_proton.tolist()} deg " f"does not match with that of the gamma MC {pnt_gamma.tolist()} deg." ) # Load the input electron MC DL2 data file logger.info(f"\nInput electron MC DL2 data file: {input_file_electron}") event_table_electron, pnt_electron, sim_info_electron = load_mc_dl2_data_file( input_file_electron, quality_cuts, event_type, weight_type_dl2 ) if any(pnt_electron != pnt_gamma): raise RuntimeError( f"Pointing direction of the electron MC {pnt_electron.tolist()} deg " f"does not match with that of the gamma MC {pnt_gamma.tolist()} deg." ) # Calculate event weights obs_time = config_irf["obs_time_irf"] logger.info(f"\nIRF observation time: {obs_time}") obs_time = u.Quantity(obs_time) sim_spectrum_proton = PowerLaw.from_simulation(sim_info_proton, obs_time) sim_spectrum_electron = PowerLaw.from_simulation(sim_info_electron, obs_time) event_table_proton["weight"] = calculate_event_weights( true_energy=event_table_proton["true_energy"], target_spectrum=IRFDOC_PROTON_SPECTRUM, simulated_spectrum=sim_spectrum_proton, ) event_table_electron["weight"] = calculate_event_weights( true_energy=event_table_electron["true_energy"], target_spectrum=IRFDOC_ELECTRON_SPECTRUM, simulated_spectrum=sim_spectrum_electron, ) # Combine the background MCs event_table_bkg = vstack([event_table_proton, event_table_electron]) # Prepare for creating IRFs config_eng_bins = config_irf["energy_bins"] config_mig_bins = config_irf["migration_bins"] logger.info("\nEnergy bins (log space):") logger.info(format_object(config_eng_bins)) eng_bins_start = u.Quantity(config_eng_bins["start"]) eng_bins_stop = u.Quantity(config_eng_bins["stop"]) energy_bins = u.TeV * np.geomspace( start=eng_bins_start.to_value("TeV").round(3), stop=eng_bins_stop.to_value("TeV").round(3), num=config_eng_bins["n_edges"], ) logger.info("\nMigration bins (log space):") logger.info(format_object(config_mig_bins)) migration_bins = np.geomspace( config_mig_bins["start"], config_mig_bins["stop"], config_mig_bins["n_edges"] ) if not is_point_like: config_src_bins = config_irf["source_offset_bins"] logger.info("\nSource offset bins (linear space):") logger.info(format_object(config_src_bins)) src_bins_start = u.Quantity(config_src_bins["start"]) src_bins_stop = u.Quantity(config_src_bins["stop"]) source_offset_bins = u.deg * np.linspace( start=src_bins_start.to_value("deg").round(1), stop=src_bins_stop.to_value("deg").round(1), num=config_src_bins["n_edges"], ) if is_bkg_mc: config_bkg_bins = config_irf["bkg_fov_offset_bins"] logger.info("\nBackground FoV offset bins (linear space):") logger.info(format_object(config_bkg_bins)) bkg_bins_start = u.Quantity(config_bkg_bins["start"]) bkg_bins_stop = u.Quantity(config_bkg_bins["stop"]) bkg_fov_offset_bins = u.deg * np.linspace( start=bkg_bins_start.to_value("deg").round(1), stop=bkg_bins_stop.to_value("deg").round(1), num=config_bkg_bins["n_edges"], ) extra_header = { "TELESCOP": "CTA-N", "INSTRUME": "LST-1_MAGIC", "FOVALIGN": "RADEC", "PNT_ZD": (pnt_gamma[0], "deg"), "PNT_AZ": (pnt_gamma[1], "deg"), "EVT_TYPE": event_type, "DL2_WEIG": weight_type_dl2, } if quality_cuts is not None: extra_header["QUAL_CUT"] = quality_cuts if is_bkg_mc: extra_header["IRF_OBST"] = (obs_time.to_value("h"), "h") irf_hdus = fits.HDUList([fits.PrimaryHDU()]) # Apply the gammaness cut config_gh_cuts = config_irf["gammaness"] cut_type_gh = config_gh_cuts.pop("cut_type") if cut_type_gh == "global": cut_value_gh = config_gh_cuts["global_cut_value"] logger.info(f"\nGlobal gammaness cut: {cut_value_gh}") extra_header["GH_CUT"] = cut_value_gh output_suffix = f"gh_glob{cut_value_gh}" # Apply the global gammaness cut mask_gh = event_table_gamma["gammaness"] > cut_value_gh event_table_gamma = event_table_gamma[mask_gh] if is_bkg_mc: mask_gh = event_table_bkg["gammaness"] > cut_value_gh event_table_bkg = event_table_bkg[mask_gh] elif cut_type_gh == "dynamic": config_gh_cuts.pop("global_cut_value", None) logger.info("\nDynamic gammaness cuts:") logger.info(format_object(config_gh_cuts)) gh_efficiency = config_gh_cuts["efficiency"] gh_cut_min = config_gh_cuts["min_cut"] gh_cut_max = config_gh_cuts["max_cut"] extra_header["GH_EFF"] = gh_efficiency extra_header["GH_MIN"] = gh_cut_min extra_header["GH_MAX"] = gh_cut_max output_suffix = f"gh_dyn{gh_efficiency}" # Calculate dynamic gammaness cuts gh_percentile = 100 * (1 - gh_efficiency) cut_table_gh = calculate_percentile_cut( values=event_table_gamma["gammaness"], bin_values=event_table_gamma["reco_energy"], bins=energy_bins, fill_value=gh_cut_min, percentile=gh_percentile, min_value=gh_cut_min, max_value=gh_cut_max, ) logger.info(f"\nGammaness-cut table:\n\n{cut_table_gh}") # Apply the dynamic gammaness cuts mask_gh = evaluate_binned_cut( values=event_table_gamma["gammaness"], bin_values=event_table_gamma["reco_energy"], cut_table=cut_table_gh, op=operator.ge, ) event_table_gamma = event_table_gamma[mask_gh] if is_bkg_mc: mask_gh = evaluate_binned_cut( values=event_table_bkg["gammaness"], bin_values=event_table_bkg["reco_energy"], cut_table=cut_table_gh, op=operator.ge, ) event_table_bkg = event_table_bkg[mask_gh] # Add one dimension for the FoV offset bin gh_cuts = cut_table_gh["cut"][:, np.newaxis] # Create a gammaness-cut HDU logger.info("\nCreating a gammaness-cut HDU...") hdu_gh_cuts = create_gh_cuts_hdu( gh_cuts=gh_cuts, reco_energy_bins=energy_bins, fov_offset_bins=fov_offset_bins, **extra_header, ) irf_hdus.append(hdu_gh_cuts) else: raise ValueError(f"Unknown gammaness-cut type '{cut_type_gh}'.") if is_point_like: # Apply the theta cut config_theta_cuts = config_irf["theta"] cut_type_theta = config_theta_cuts.pop("cut_type") if cut_type_theta == "global": cut_value_theta = config_theta_cuts["global_cut_value"] logger.info(f"\nGlobal theta cut: {cut_value_theta}") cut_value_theta = u.Quantity(cut_value_theta).to_value("deg") extra_header["RAD_MAX"] = (cut_value_theta, "deg") output_suffix += f"_theta_glob{cut_value_theta}deg" # Apply the global theta cut mask_theta = event_table_gamma["theta"].to_value("deg") < cut_value_theta event_table_gamma = event_table_gamma[mask_theta] elif cut_type_theta == "dynamic": config_theta_cuts.pop("global_cut_value", None) logger.info("\nDynamic theta cuts:") logger.info(format_object(config_theta_cuts)) theta_efficiency = config_theta_cuts["efficiency"] theta_cut_min = u.Quantity(config_theta_cuts["min_cut"]) theta_cut_max = u.Quantity(config_theta_cuts["max_cut"]) extra_header["TH_EFF"] = theta_efficiency extra_header["TH_MIN"] = (theta_cut_min.to_value("deg"), "deg") extra_header["TH_MAX"] = (theta_cut_max.to_value("deg"), "deg") output_suffix += f"_theta_dyn{theta_efficiency}" # Calculate dynamic theta cuts theta_percentile = 100 * theta_efficiency cut_table_theta = calculate_percentile_cut( values=event_table_gamma["theta"], bin_values=event_table_gamma["reco_energy"], bins=energy_bins, fill_value=theta_cut_max, percentile=theta_percentile, min_value=theta_cut_min, max_value=theta_cut_max, ) logger.info(f"\nTheta-cut table:\n\n{cut_table_theta}") # Apply the dynamic theta cuts mask_theta = evaluate_binned_cut( values=event_table_gamma["theta"], bin_values=event_table_gamma["reco_energy"], cut_table=cut_table_theta, op=operator.le, ) event_table_gamma = event_table_gamma[mask_theta] # Add one dimension for the FoV offset bin theta_cuts = cut_table_theta["cut"][:, np.newaxis] # Create a rad-max HDU logger.info("\nCreating a rad-max HDU...") hdu_rad_max = create_rad_max_hdu( rad_max=theta_cuts, reco_energy_bins=energy_bins, fov_offset_bins=fov_offset_bins, extname="RAD_MAX", **extra_header, ) irf_hdus.append(hdu_rad_max) else: raise ValueError(f"Unknown theta-cut type '{cut_type_theta}'.") # Create an effective-area HDU logger.info("\nCreating an effective-area HDU...") with np.errstate(invalid="ignore", divide="ignore"): if is_diffuse_mc: aeff = effective_area_per_energy_and_fov( selected_events=event_table_gamma, simulation_info=sim_info_gamma, true_energy_bins=energy_bins, fov_offset_bins=fov_offset_bins, ) else: aeff = effective_area_per_energy( selected_events=event_table_gamma, simulation_info=sim_info_gamma, true_energy_bins=energy_bins, ) # Add one dimension for the FoV offset bin aeff = aeff[:, np.newaxis] aeff_hdu = create_aeff2d_hdu( effective_area=aeff, true_energy_bins=energy_bins, fov_offset_bins=fov_offset_bins, point_like=is_point_like, extname="EFFECTIVE AREA", **extra_header, ) irf_hdus.append(aeff_hdu) # Create an energy-dispersion HDU logger.info("Creating an energy-dispersion HDU...") edisp = energy_dispersion( selected_events=event_table_gamma, true_energy_bins=energy_bins, fov_offset_bins=fov_offset_bins, migration_bins=migration_bins, ) edisp_hdu = create_energy_dispersion_hdu( energy_dispersion=edisp, true_energy_bins=energy_bins, migration_bins=migration_bins, fov_offset_bins=fov_offset_bins, point_like=is_point_like, extname="ENERGY DISPERSION", **extra_header, ) irf_hdus.append(edisp_hdu) if not is_point_like: # Create a PSF table HDU logger.info("Creating a PSF table HDU...") psf = psf_table( events=event_table_gamma, true_energy_bins=energy_bins, source_offset_bins=source_offset_bins, fov_offset_bins=fov_offset_bins, ) psf_hdu = create_psf_table_hdu( psf=psf, true_energy_bins=energy_bins, source_offset_bins=source_offset_bins, fov_offset_bins=fov_offset_bins, extname="PSF", **extra_header, ) irf_hdus.append(psf_hdu) if is_bkg_mc: # Create a background HDU logger.info("Creating a background HDU...") bkg = background_2d( events=event_table_bkg, reco_energy_bins=energy_bins, fov_offset_bins=bkg_fov_offset_bins, t_obs=obs_time, ) bkg_hdu = create_background_2d_hdu( background_2d=bkg, reco_energy_bins=energy_bins, fov_offset_bins=bkg_fov_offset_bins, extname="BACKGROUND", **extra_header, ) irf_hdus.append(bkg_hdu) # Save the data in an output file Path(output_dir).mkdir(exist_ok=True, parents=True) output_file = ( f"{output_dir}/irf_zd_{pnt_gamma[0]}deg_az_{pnt_gamma[1]}deg_" f"{event_type}_{output_suffix}.fits.gz" ) irf_hdus.writeto(output_file, overwrite=True) logger.info(f"\nOutput file: {output_file}")
def main(): """Main function.""" start_time = time.time() parser = argparse.ArgumentParser() parser.add_argument( "--input-file-gamma", "-g", dest="input_file_gamma", type=str, required=True, help="Path to an input gamma MC DL2 data file", ) parser.add_argument( "--input-file-proton", "-p", dest="input_file_proton", type=str, help="Path to an input proton MC DL2 data file", ) parser.add_argument( "--input-file-electron", "-e", dest="input_file_electron", type=str, help="Path to an input electron MC DL2 data file", ) parser.add_argument( "--output-dir", "-o", dest="output_dir", type=str, default="./data", help="Path to a directory where to save an output IRF file", ) parser.add_argument( "--config-file", "-c", dest="config_file", type=str, default="./config.yaml", help="Path to a configuration file", ) args = parser.parse_args() with open(args.config_file, "rb") as f: config = yaml.safe_load(f) # Create the IRFs create_irf( input_file_gamma=args.input_file_gamma, input_file_proton=args.input_file_proton, input_file_electron=args.input_file_electron, output_dir=args.output_dir, config=config, ) logger.info("\nDone.") process_time = time.time() - start_time logger.info(f"\nProcess time: {process_time:.0f} [sec]\n") if __name__ == "__main__": main()