Source code for magicctapipe.scripts.lst1_magic.lst1_magic_mc_dl0_to_dl1

#!/usr/bin/env python
# coding: utf-8

"""
This script processes LST and MAGIC events of simtel MC DL0 data
(*.simtel.gz) and computes the DL1 parameters, i.e., Hillas, timing and
leakage parameters. It saves only the events where all the DL1 parameters
are successfully reconstructed.

Since it cannot identify the telescopes from the input file, please assign
the correct telescope ID to each telescope in the configuration file.

The telescope coordinates will be converted to those
relative to the center of the LST and MAGIC positions (including the
altitude) for the convenience of the geometrical stereo reconstruction.

Usage:
$ python lst1_magic_mc_dl0_to_dl1.py
--input-file dl0/gamma_40deg_90deg_run1.simtel.gz
(--output-dir dl1)
(--config-file config_step1.yaml)
"""

import argparse  # Parser for command-line options, arguments etc
import logging  # Used to manage the log file
import re
import time
from pathlib import Path

import numpy as np
import yaml
from astropy.coordinates import Angle, angular_separation
from ctapipe.calib import CameraCalibrator
from ctapipe.image import (
    concentration_parameters,
    hillas_parameters,
    leakage_parameters,
    number_of_islands,
    timing_parameters,
)
from ctapipe.instrument import FocalLengthKind
from ctapipe.io import EventSource, HDF5TableWriter
from traitlets.config import Config

from magicctapipe.image import MAGICClean
from magicctapipe.image.calib import calibrate
from magicctapipe.io import SimEventInfoContainer, check_input_list, format_object
from magicctapipe.utils import calculate_disp, calculate_impact

__all__ = ["mc_dl0_to_dl1"]

logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.INFO)

# The CORSIKA particle types #CORSIKA simulates Cherenkov light
PARTICLE_TYPES = {1: "gamma", 3: "electron", 14: "proton", 402: "helium"}


[docs] def mc_dl0_to_dl1(input_file, output_dir, config, focal_length): """ Processes LST-1 and MAGIC events of simtel MC DL0 data and computes the DL1 parameters. Parameters ---------- input_file : str Path to an input simtel MC DL0 data file output_dir : str Path to a directory where to save an output DL1 data file config : dict Configuration for the LST-1 + MAGIC analysis focal_length : str Focal length choice, effective or equivalent """ assigned_tel_ids = config[ "mc_tel_ids" ] # This variable becomes the dictionary {'LST-1': 1, 'MAGIC-I': 2, 'MAGIC-II': 3} logger.info( "\nAssigned telescope IDs:" ) # Here we are just adding infos to the log file logger.info(format_object(assigned_tel_ids)) # Load the input file logger.info(f"\nInput file: {input_file}") if focal_length == "effective": focal_length = FocalLengthKind.EFFECTIVE elif focal_length == "equivalent": focal_length = FocalLengthKind.EQUIVALENT else: raise ValueError( f"Accepted choices for focal_length are 'effective' or 'equivalent'.\n" f"Chosen value: {focal_length}." ) event_source = EventSource( input_file, allowed_tels=list( filter(lambda check_id: check_id > 0, assigned_tel_ids.values()) ), # Here we load the events for all telescopes with ID > 0. focal_length_choice=focal_length, ) obs_id = event_source.obs_ids[0] subarray = event_source.subarray tel_descriptions = subarray.tel tel_positions = subarray.positions logger.info("\nSubarray description:") logger.info(format_object(tel_descriptions)) camera_geoms = {} for tel_id, telescope in tel_descriptions.items(): camera_geoms[tel_id] = telescope.camera.geometry # Configure the LST event processors config_lst = config["LST"] config_lst["mc_tel_ids"] = assigned_tel_ids logger.info("\nLST image extractor:") logger.info(format_object(config_lst["image_extractor"])) extractor_type_lst = config_lst["image_extractor"].pop("type") config_extractor_lst = {extractor_type_lst: config_lst["image_extractor"]} calibrator_lst = CameraCalibrator( image_extractor_type=extractor_type_lst, config=Config(config_extractor_lst), subarray=subarray, ) logger.info("\nLST NSB modifier:") logger.info(format_object(config_lst["increase_nsb"])) logger.info("\nLST PSF modifier:") logger.info(format_object(config_lst["increase_psf"])) logger.info("\nLST tailcuts cleaning:") logger.info(format_object(config_lst["tailcuts_clean"])) logger.info("\nLST time delta cleaning:") logger.info(format_object(config_lst["time_delta_cleaning"])) logger.info("\nLST dynamic cleaning:") logger.info(format_object(config_lst["dynamic_cleaning"])) use_only_main_island = config_lst["use_only_main_island"] logger.info(f"\nLST use only main island: {use_only_main_island}") # Configure the MAGIC event processors config_magic = config["MAGIC"] config_magic["mc_tel_ids"] = assigned_tel_ids logger.info("\nMAGIC image extractor:") logger.info(format_object(config_magic["image_extractor"])) for imagic in [1, 2]: if f"increase_nsb_m{imagic}" in config_magic: logger.info("\nMAGIC-" + imagic * "I" + " NSB modifier:") logger.info(format_object(config_magic[f"increase_nsb_m{imagic}"])) extractor_type_magic = config_magic["image_extractor"].pop("type") config_extractor_magic = {extractor_type_magic: config_magic["image_extractor"]} calibrator_magic = CameraCalibrator( image_extractor_type=extractor_type_magic, config=Config(config_extractor_magic), subarray=subarray, ) logger.info("\nMAGIC charge correction:") logger.info(format_object(config_magic["charge_correction"])) if config_magic["magic_clean"]["find_hotpixels"]: logger.warning( "\nWARNING: Hot pixels do not exist in a simulation. " "Setting the `find_hotpixels` option to False..." ) config_magic["magic_clean"].update({"find_hotpixels": False}) logger.info("\nMAGIC image cleaning:") logger.info(format_object(config_magic["magic_clean"])) # Prepare for saving data to an output file Path(output_dir).mkdir(exist_ok=True, parents=True) sim_config = event_source.simulation_config[obs_id] corsika_inputcard = event_source.file_.corsika_inputcards[0].decode() regex = r".*\nPRMPAR\s+(\d+)\s+.*" particle_id = int(re.findall(regex, corsika_inputcard)[0]) particle_type = PARTICLE_TYPES.get(particle_id, "unknown") zenith = 90 - sim_config["max_alt"].to_value("deg") azimuth = Angle(sim_config["max_az"]).wrap_at("360 deg").degree logger.info(np.asarray(list(assigned_tel_ids.values()))) LSTs_IDs = np.asarray(list(assigned_tel_ids.values())[0:4]) LSTs_in_use = ( np.where(LSTs_IDs > 0)[0] + 1 ) # Here we select which LSTs are/is in use if len(LSTs_in_use) == 0: LSTs_in_use = "".join(str(k) for k in LSTs_in_use) elif len(LSTs_in_use) > 0: LSTs_in_use = "LST" + "_LST".join(str(k) for k in LSTs_in_use) MAGICs_IDs = np.asarray(list(assigned_tel_ids.values())[4:6]) MAGICs_in_use = ( np.where(MAGICs_IDs > 0)[0] + 1 ) # Here we select which MAGICs are/is in use if len(MAGICs_in_use) == 0: MAGICs_in_use = "".join(str(k) for k in MAGICs_in_use) elif len(MAGICs_in_use) > 0: MAGICs_in_use = "MAGIC" + "_MAGIC".join(str(k) for k in MAGICs_in_use) magic_clean = {} for k in MAGICs_IDs: if k > 0: magic_clean[k] = MAGICClean(camera_geoms[k], config_magic["magic_clean"]) output_file = ( f"{output_dir}/dl1_{particle_type}_zd_{zenith.round(3)}deg_" f"az_{azimuth.round(3)}deg_{LSTs_in_use}_{MAGICs_in_use}_run{obs_id}.h5" ) # The files are saved with the names of all telescopes involved # Loop over every shower event logger.info("\nProcessing the events...") with HDF5TableWriter( output_file, group_name="events", mode="w", add_prefix=True ) as writer: for event in event_source: if event.count % 100 == 0: logger.info(f"{event.count} events") tels_with_trigger = event.trigger.tels_with_trigger # Check if the event triggers both M1 and M2 or not magic_stereo = (set(MAGICs_IDs).issubset(set(tels_with_trigger))) and ( MAGICs_in_use == "MAGIC1_MAGIC2" ) # If both have trigger, then magic_stereo = True for tel_id in tels_with_trigger: if ( tel_id in LSTs_IDs ): # If the ID is in the LST list, we call calibrate on the LST() # Calibrate the LST-1 event signal_pixels, image, peak_time = calibrate( event=event, tel_id=tel_id, obs_id=obs_id, config=config_lst, camera_geoms=camera_geoms, calibrator=calibrator_lst, is_lst=True, ) elif tel_id in MAGICs_IDs: # Calibrate the MAGIC event signal_pixels, image, peak_time = calibrate( event=event, tel_id=tel_id, obs_id=obs_id, config=config_magic, magic_clean=magic_clean, calibrator=calibrator_magic, is_lst=False, ) else: logger.info( f"--> Telescope ID {tel_id} not in LST list or MAGIC list. Please check if the IDs are OK in the configuration file" ) if not any( signal_pixels ): # So: if there is no event, we skip it and go back to the loop in the next event logger.info( f"--> {event.count} event (event ID: {event.index.event_id}, " f"telescope {tel_id}) could not survive the image cleaning. " "Skipping..." ) continue n_pixels = np.count_nonzero(signal_pixels) n_islands, _ = number_of_islands(camera_geoms[tel_id], signal_pixels) camera_geom_masked = camera_geoms[tel_id][signal_pixels] image_masked = image[signal_pixels] peak_time_masked = peak_time[signal_pixels] if any(image_masked < 0): logger.info( f"--> {event.count} event (event ID: {event.index.event_id}, " f"telescope {tel_id}) cannot be parametrized due to the pixels " "with negative charges. Skipping..." ) continue # Parametrize the image hillas_params = hillas_parameters(camera_geom_masked, image_masked) # if any(np.isnan(value) for value in hillas_params.values()): logger.info( f"--> {event.count} event (event ID: {event.index.event_id}, " f"telescope {tel_id}): non-valid Hillas parameters. Skipping..." ) continue timing_params = timing_parameters( camera_geom_masked, image_masked, peak_time_masked, hillas_params ) if np.isnan(timing_params.slope): logger.info( f"--> {event.count} event (event ID: {event.index.event_id}, " f"telescope {tel_id}) failed to extract finite timing " "parameters. Skipping..." ) continue leakage_params = leakage_parameters( camera_geoms[tel_id], image, signal_pixels ) conc_params = concentration_parameters( camera_geoms[tel_id], image, hillas_params ) # Calculate additional parameters true_disp = calculate_disp( pointing_alt=event.pointing.tel[tel_id].altitude, pointing_az=event.pointing.tel[tel_id].azimuth, shower_alt=event.simulation.shower.alt, shower_az=event.simulation.shower.az, cog_x=hillas_params.x, cog_y=hillas_params.y, camera_frame=camera_geoms[tel_id].frame, ) true_impact = calculate_impact( shower_alt=event.simulation.shower.alt, shower_az=event.simulation.shower.az, core_x=event.simulation.shower.core_x, core_y=event.simulation.shower.core_y, tel_pos_x=tel_positions[tel_id][0], tel_pos_y=tel_positions[tel_id][1], tel_pos_z=tel_positions[tel_id][2], ) off_axis = angular_separation( lon1=event.pointing.tel[tel_id].azimuth, lat1=event.pointing.tel[tel_id].altitude, lon2=event.simulation.shower.az, lat2=event.simulation.shower.alt, ) tels_with_trigger = np.intersect1d( tels_with_trigger, np.concatenate( (LSTs_IDs[LSTs_IDs != 0], MAGICs_IDs[MAGICs_IDs != 0]) ), assume_unique=True, ).tolist() # encode tels_with_trigger as an int value # that can be decoded later as a binary # tels_with_trigger = sum_{tel_id} 2**tel_id # where tel_id is only for those triggered tels_with_trigger_binary_int = np.array( [2 ** (tel_id) for tel_id in tels_with_trigger] ).sum() # Set the event information event_info = SimEventInfoContainer( obs_id=event.index.obs_id, event_id=event.index.event_id, pointing_alt=event.pointing.tel[tel_id].altitude, pointing_az=event.pointing.tel[tel_id].azimuth, true_energy=event.simulation.shower.energy, true_alt=event.simulation.shower.alt, true_az=event.simulation.shower.az, true_disp=true_disp, true_core_x=event.simulation.shower.core_x, true_core_y=event.simulation.shower.core_y, true_impact=true_impact, off_axis=off_axis, n_pixels=n_pixels, n_islands=n_islands, magic_stereo=magic_stereo, tels_with_trigger=tels_with_trigger_binary_int, ) # Reset the telescope IDs event_info.tel_id = tel_id # Save the parameters to the output file # Setting all the prefixes except of concentration to empty string event_info.prefix = "" hillas_params.prefix = "" timing_params.prefix = "" leakage_params.prefix = "" writer.write( "parameters", ( event_info, hillas_params, timing_params, leakage_params, conc_params, ), ) n_events_processed = event.count + 1 logger.info(f"\nIn total {n_events_processed} events are processed.") # Save the subarray description subarray.to_hdf(output_file) # Save the simulation configuration with HDF5TableWriter(output_file, group_name="simulation", mode="a") as writer: writer.write("config", sim_config) logger.info(f"\nOutput file: {output_file}")
def main(): """Main function.""" start_time = time.time() parser = argparse.ArgumentParser() # Here we are simply collecting the parameters from the command line, as input file, output directory, and configuration file parser.add_argument( "--input-file", "-i", dest="input_file", type=str, required=True, help="Path to an input simtel MC DL0 data file", ) parser.add_argument( "--output-dir", "-o", dest="output_dir", type=str, default="./data", help="Path to a directory where to save an output DL1 data file", ) parser.add_argument( "--config-file", "-c", dest="config_file", type=str, default="./config.yaml", help="Path to a configuration file", ) parser.add_argument( "--focal_length_choice", "-f", dest="focal_length_choice", type=str, choices=["equivalent", "effective"], default="effective", help='Choice of focal length, either "effective" or "equivalent". The default (and standard) value is "effective"', ) args = parser.parse_args() # Here we select all 3 parameters collected above with open( args.config_file, "rb" ) as f: # "rb" mode opens the file in binary format for reading config = yaml.safe_load( f ) # Here we collect the inputs from the configuration file # Checking if the input telescope list is properly organized: check_input_list(config) config["mc_tel_ids"] = dict( sorted(config["mc_tel_ids"].items()) ) # Sorting needed to correctly name the output file # Process the input data mc_dl0_to_dl1(args.input_file, args.output_dir, config, args.focal_length_choice) logger.info("\nDone.") process_time = time.time() - start_time logger.info(f"\nProcess time: {process_time:.0f} [sec]\n") if __name__ == "__main__": main()