#!/usr/bin/env python
# coding: utf-8
"""
This script processes the events of MAGIC calibrated data (*_Y_*.root)
with the MARS-like image cleaning and computes the DL1 parameters, i.e.,
Hillas, timing and leakage parameters. It saves only the events that all
the DL1 parameters are successfully reconstructed.
When the input is real data, it searches for all the subrun files with
the same observation ID and stored in the same directory as the input
subrun file. Then, it reads their drive reports and uses the information
to reconstruct the telescope pointing direction. Since the accuracy of
the reconstruction improves, it is recommended to store all the subrun
files in the same directory.
If the `--process-run` argument is given, it not only reads the drive
reports but also processes all the events of the subrun files at once.
If the `--magic-only` argument is given, the processing is unchanged,
but the subarray will contain only the MAGIC telescopes. This is needed
when performing a MAGIC-only analysis using the pipeline. In all other
cases, LST is included in the subarray.
Please note that it is also possible to process SUM trigger data with
this script, but since the MaTaJu cleaning is not yet implemented in
this pipeline, it applies the standard cleaning instead.
Usage:
$ python magic_calib_to_dl1.py
--input-file calib/20201216_M1_05093711.001_Y_CrabNebula-W0.40+035.root
(--output-dir dl1)
(--config-file config.yaml)
(--magic-only)
(--process-run)
"""
import argparse
import logging
import re
import time
import warnings
from pathlib import Path
import numpy as np
import yaml
from astropy import units as u
from astropy.coordinates import angular_separation
from ctapipe.containers import DL1CameraContainer
from ctapipe.image import (
concentration_parameters,
hillas_parameters,
leakage_parameters,
number_of_islands,
timing_parameters,
)
from ctapipe.instrument import SubarrayDescription
from ctapipe.io import HDF5TableWriter
from ctapipe_io_lst import LSTEventSource
from ctapipe_io_magic import MAGICEventSource
from magicctapipe.image import MAGICClean
from magicctapipe.io import (
RealEventInfoContainer,
SimEventInfoContainer,
check_input_list,
format_object,
)
from magicctapipe.utils import calculate_disp, calculate_impact
__all__ = ["magic_calib_to_dl1"]
logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.INFO)
# Ignore runtime warnings appeared during the image cleaning
warnings.simplefilter("ignore", category=RuntimeWarning)
# The pedestal types to find bad RMS pixels
PEDESTAL_TYPES = ["fundamental", "from_extractor", "from_extractor_rndm"]
TEL_COMBINATIONS = {
"LST1_M1": [1, 2], # combo_type = 0
"LST1_M1_M2": [1, 2, 3], # combo_type = 1
"LST1_M2": [1, 3], # combo_type = 2
"M1_M2": [2, 3], # combo_type = 3
} # TODO: REMOVE WHEN SWITCHING TO THE NEW RFs IMPLEMENTTATION (1 RF PER TELESCOPE)
[docs]
def magic_calib_to_dl1(
input_file, output_dir, config, max_events, magic_only=False, process_run=False
):
"""
Processes the events of MAGIC calibrated data and computes the DL1 parameters.
Parameters
----------
input_file : str
Path to an input MAGIC calibrated data file
output_dir : str
Path to a directory where to save an output DL1 data file
config : dict
Configuration for the LST-1 + MAGIC analysis
max_events : int
Maximum number of events to process
magic_only : bool, optional
If `True`, it will store subarray information only for the MAGIC
telescopes. This is needed if the pipeline will be used for
MAGIC-only analysis.
process_run : bool, optional
If `True`, it processes the events of all the subrun files
found in the same directory of the input subrun file at once
(applicable only to real data)
"""
# Load the input file
logger.info(f"\nInput file: {input_file}")
event_source = MAGICEventSource(
input_file, process_run=process_run, max_events=max_events
)
is_simulation = event_source.is_simulation
logger.info(f"\nIs simulation: {is_simulation}")
obs_id = event_source.obs_ids[0]
tel_id = event_source.telescope
logger.info(f"\nObservation ID: {obs_id}")
logger.info(f"Telescope ID: {tel_id}")
is_stereo_trigger = event_source.is_stereo
is_sum_trigger = event_source.is_sumt
logger.info(f"\nIs stereo trigger: {is_stereo_trigger}")
logger.info(f"Is SUM trigger: {is_sum_trigger}")
if is_sum_trigger:
logger.warning(
"\nWARNING: The MaTaJu cleaning is not yet implemented. "
"Will apply the standard image cleaning instead."
)
if not is_simulation:
logger.info("\nThe following files are found to read drive reports:")
for subrun_file in event_source.file_list_drive:
logger.info(subrun_file)
logger.info(f"\nProcess run: {process_run}")
time_diffs = event_source.event_time_diffs
subarray = event_source.subarray
camera_geom = subarray.tel[tel_id].camera.geometry
tel_position = subarray.positions[tel_id]
# Configure the MAGIC image cleaning
config_clean = config["MAGIC"]["magic_clean"]
logger.info("\nMAGIC image cleaning:")
logger.info(format_object(config_clean))
magic_clean = MAGICClean(camera_geom, config_clean)
if config_clean["find_hotpixels"]:
# Get the index of the pedestal type
i_ped_type = PEDESTAL_TYPES.index(config_clean["pedestal_type"])
# Prepare for saving data to an output file
Path(output_dir).mkdir(exist_ok=True, parents=True)
if is_simulation:
regex = r"GA_M\d_(\S+)_\d_\d+_Y_*"
input_file_name = Path(input_file).name
zenith_range = re.findall(regex, input_file_name)[0]
output_file = f"{output_dir}/dl1_M{tel_id}_GA_{zenith_range}.Run{obs_id}.h5"
else:
if process_run:
output_file = f"{output_dir}/dl1_M{tel_id}.Run{obs_id:08}.h5"
else:
subrun_id = event_source.metadata["subrun_number"][0]
output_file = f"{output_dir}/dl1_M{tel_id}.Run{obs_id:08}.{subrun_id:03}.h5"
assigned_tel_ids = config[
"mc_tel_ids"
] # This variable becomes the dictionary {'LST-1': 1, 'MAGIC-I': 2, 'MAGIC-II': 3} or similar
subarray_lst = LSTEventSource.create_subarray()
# Reset the telescope IDs of the subarray description
if not magic_only:
tel_positions_magic_lst = {
assigned_tel_ids["LST-1"]: [-8.09, 77.13, 0.78] * u.m, # LST-1
assigned_tel_ids["MAGIC-I"]: [39.3, -62.55, -0.97] * u.m, # MAGIC-I
assigned_tel_ids["MAGIC-II"]: [-31.21, -14.57, 0.2] * u.m, # MAGIC-II
}
tel_descriptions_magic_lst = {
# dummy telescope description for LST-1, same as MAGIC-I
assigned_tel_ids["LST-1"]: subarray_lst.tel[1], # LST-1
assigned_tel_ids["MAGIC-I"]: subarray.tel[1], # MAGIC-I
assigned_tel_ids["MAGIC-II"]: subarray.tel[2], # MAGIC-II
}
subarray_magic = SubarrayDescription(
"MAGIC-LST-Array", tel_positions_magic_lst, tel_descriptions_magic_lst
)
else:
tel_positions_magic = {
assigned_tel_ids["MAGIC-I"]: subarray.positions[1], # MAGIC-I
assigned_tel_ids["MAGIC-II"]: subarray.positions[2], # MAGIC-II
}
tel_descriptions_magic = {
assigned_tel_ids["MAGIC-I"]: subarray.tel[1], # MAGIC-I
assigned_tel_ids["MAGIC-II"]: subarray.tel[2], # MAGIC-II
}
subarray_magic = SubarrayDescription(
"MAGIC-Array", tel_positions_magic, tel_descriptions_magic
)
save_images = config.get("save_images", False)
if save_images:
dl1cont = DL1CameraContainer(prefix="")
# Loop over every shower event
logger.info("\nProcessing the events...")
with HDF5TableWriter(
output_file, group_name="events", mode="w", add_prefix=True
) as writer:
for event in event_source:
if event.count % 100 == 0:
logger.info(f"{event.count} events")
if config_clean["find_hotpixels"]:
# Find dead and bad RMS pixels
pixel_status = event.mon.tel[tel_id].pixel_status
dead_pixels = pixel_status.hardware_failing_pixels[0]
badrms_pixels = pixel_status.pedestal_failing_pixels[i_ped_type]
unsuitable_mask = np.logical_or(dead_pixels, badrms_pixels)
else:
unsuitable_mask = None
# Apply the image cleaning
signal_pixels, image, peak_time = magic_clean.clean_image(
event_image=event.dl1.tel[tel_id].image,
event_pulse_time=event.dl1.tel[tel_id].peak_time,
unsuitable_mask=unsuitable_mask,
)
if not any(signal_pixels):
logger.info(
f"--> {event.count} event (event ID: {event.index.event_id}) "
"could not survive the image cleaning. Skipping..."
)
continue
n_pixels = np.count_nonzero(signal_pixels)
n_islands, _ = number_of_islands(camera_geom, signal_pixels)
camera_geom_masked = camera_geom[signal_pixels]
image_masked = image[signal_pixels]
peak_time_masked = peak_time[signal_pixels]
if any(image_masked < 0):
logger.info(
f"--> {event.count} event (event ID: {event.index.event_id}) "
"cannot be parametrized due to the pixels with negative charges. "
"Skipping..."
)
continue
# Parametrize the image
hillas_params = hillas_parameters(camera_geom_masked, image_masked)
timing_params = timing_parameters(
camera_geom_masked, image_masked, peak_time_masked, hillas_params
)
if np.isnan(timing_params.slope):
logger.info(
f"--> {event.count} event (event ID: {event.index.event_id}) "
"failed to extract finite timing parameters. Skipping..."
)
continue
leakage_params = leakage_parameters(camera_geom, image, signal_pixels)
conc_params = concentration_parameters(camera_geom, image, hillas_params)
if is_simulation:
# Calculate additional parameters
true_disp = calculate_disp(
pointing_alt=event.pointing.tel[tel_id].altitude,
pointing_az=event.pointing.tel[tel_id].azimuth,
shower_alt=event.simulation.shower.alt,
shower_az=event.simulation.shower.az,
cog_x=hillas_params.x,
cog_y=hillas_params.y,
camera_frame=camera_geom.frame,
)
true_impact = calculate_impact(
shower_alt=event.simulation.shower.alt,
shower_az=event.simulation.shower.az,
core_x=event.simulation.shower.core_x,
core_y=event.simulation.shower.core_y,
tel_pos_x=tel_position[0],
tel_pos_y=tel_position[1],
tel_pos_z=tel_position[2],
)
off_axis = angular_separation(
lon1=event.pointing.tel[tel_id].azimuth,
lat1=event.pointing.tel[tel_id].altitude,
lon2=event.simulation.shower.az,
lat2=event.simulation.shower.alt,
)
# Set the simulated event information to the container
event_info = SimEventInfoContainer(
obs_id=event.index.obs_id,
event_id=event.index.event_id,
pointing_alt=event.pointing.tel[tel_id].altitude,
pointing_az=event.pointing.tel[tel_id].azimuth,
true_energy=event.simulation.shower.energy,
true_alt=event.simulation.shower.alt,
true_az=event.simulation.shower.az,
true_disp=true_disp,
true_core_x=event.simulation.shower.core_x,
true_core_y=event.simulation.shower.core_y,
true_impact=true_impact,
off_axis=off_axis,
n_pixels=n_pixels,
n_islands=n_islands,
)
else:
# With the UNIX format and the "long" type, we can get a
# timestamp without being affected by the rounding issue
timestamp = event.trigger.tel[tel_id].time.to_value(
format="unix", subfmt="long"
)
# To keep the precision for the coincidence with LST-1,
# we save the integral and fractional parts separately
fractional, integral = np.modf(timestamp)
time_sec = u.Quantity(integral, unit="s", dtype=int)
time_nanosec = u.Quantity(fractional, unit="s").to("ns")
time_nanosec = u.Quantity(time_nanosec.round(), dtype=int)
# Set the real event information to the container
event_info = RealEventInfoContainer(
obs_id=event.index.obs_id,
event_id=event.index.event_id,
pointing_alt=event.pointing.tel[tel_id].altitude,
pointing_az=event.pointing.tel[tel_id].azimuth,
time_sec=time_sec,
time_nanosec=time_nanosec,
time_diff=time_diffs[event.count],
n_pixels=n_pixels,
n_islands=n_islands,
)
tel_ids_new_assignments = {
1: assigned_tel_ids["MAGIC-I"],
2: assigned_tel_ids["MAGIC-II"],
3: assigned_tel_ids["LST-1"],
}
# Reset the telescope IDs
event_info.tel_id = tel_ids_new_assignments[tel_id]
# encode tels_with_trigger as an int value
# that can be decoded later as a binary
# tels_with_trigger = sum_{tel_id} 2**tel_id
# where tel_id is only for those triggered
tels_with_trigger_binary_int = np.sum(
2
** np.array(
[
tel_ids_new_assignments[tel_idx]
for tel_idx in event.trigger.tels_with_trigger
]
)
)
event_info.tels_with_trigger = tels_with_trigger_binary_int
# Save the parameters to the output file
# Setting all the prefixes except of concentration to empty string
event_info.prefix = ""
hillas_params.prefix = ""
timing_params.prefix = ""
leakage_params.prefix = ""
writer.write(
"parameters",
(event_info, hillas_params, timing_params, leakage_params, conc_params),
)
if save_images:
dl1cont.image = image
dl1cont.peak_time = peak_time
dl1cont.image_mask = signal_pixels
dl1cont.is_valid = True
writer.write(table_name="dl1/image", containers=[event_info, dl1cont])
n_events_processed = event.count + 1
logger.info(f"\nIn total {n_events_processed} events are processed.")
# Save the subarray description
subarray_magic.to_hdf(output_file)
if is_simulation:
# Save the simulation configuration
with HDF5TableWriter(output_file, group_name="simulation", mode="a") as writer:
writer.write("config", event_source.simulation_config[obs_id])
logger.info(f"\nOutput file: {output_file}")
def main():
"""Main function."""
start_time = time.time()
parser = argparse.ArgumentParser()
parser.add_argument(
"--input-file",
"-i",
dest="input_file",
type=str,
required=True,
help="Path to an input MAGIC calibrated data file",
)
parser.add_argument(
"--output-dir",
"-o",
dest="output_dir",
type=str,
default="./data",
help="Path to a directory where to save an output DL1 data file",
)
parser.add_argument(
"--config-file",
"-c",
dest="config_file",
type=str,
default="./config.yaml",
help="Path to a configuration file",
)
parser.add_argument(
"--max-evt",
"-m",
dest="max_events",
type=int,
default=None,
help="Max. number of processed showers",
)
parser.add_argument(
"--magic-only",
dest="magic_only",
action="store_true",
help="Process file(s) for MAGIC-only analysis",
)
parser.add_argument(
"--process-run",
dest="process_run",
action="store_true",
help="Process the events of all the subrun files at once",
)
args = parser.parse_args()
with open(args.config_file, "rb") as f:
config = yaml.safe_load(f)
# Checking if the input telescope list is properly organized:
check_input_list(config)
# Process the input data
magic_calib_to_dl1(
args.input_file,
args.output_dir,
config,
args.max_events,
args.magic_only,
args.process_run,
)
logger.info("\nDone.")
process_time = time.time() - start_time
logger.info(f"\nProcess time: {process_time:.0f} [sec]\n")
if __name__ == "__main__":
main()