Source code for magicctapipe.scripts.lst1_magic.lst1_magic_dl1_stereo_to_dl2

#!/usr/bin/env python
# coding: utf-8

"""
This script processes DL1-stereo events and reconstructs the DL2
parameters, i.e., energy, direction and gammaness, with trained RFs.

For reconstructing the arrival directions, it uses the MARS-like DISP
method, i.e., select the closest combination with which the sum of the
angular distances of all the head and tail candidates becomes minimum.

Usage:
$ python lst1_magic_dl1_stereo_to_dl2.py
--input-file-dl1 dl1_stereo/dl1_stereo_LST-1_MAGIC.Run03265.0040.h5
--input-dir-rfs rfs
(--output-dir dl2)
"""

import argparse
import glob
import itertools
import logging
import time
from pathlib import Path

import numpy as np
import pandas as pd
from astropy import units as u
from astropy.coordinates import AltAz, SkyCoord, angular_separation
from ctapipe.coordinates import TelescopeFrame
from ctapipe.instrument import SubarrayDescription

from magicctapipe.io import get_stereo_events_old, save_pandas_data_in_table
from magicctapipe.reco import DispRegressor, EnergyRegressor, EventClassifier

__all__ = ["apply_rfs", "reconstruct_arrival_direction", "dl1_stereo_to_dl2"]

logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.INFO)
TEL_COMBINATIONS = {
    "LST1_M1": [1, 2],  # combo_type = 0
    "LST1_M1_M2": [1, 2, 3],  # combo_type = 1
    "LST1_M2": [1, 3],  # combo_type = 2
    "M1_M2": [2, 3],  # combo_type = 3
}  # TODO: REMOVE WHEN SWITCHING TO THE NEW RFs IMPLEMENTTATION (1 RF PER TELESCOPE)


def apply_rfs(event_data, estimator):
    """
    Applies trained RFs to DL1-stereo events, whose telescope
    combination type is same as the RFs.

    Parameters
    ----------
    event_data : pandas.core.frame.DataFrame
        Data frame of shower events
    estimator : magicctapipe.reco.estimator
        Trained regressor or classifier

    Returns
    -------
    pandas.core.frame.DataFrame
        Data frame of the shower events with reconstructed parameters
    """

    tel_ids = list(estimator.telescope_rfs.keys())

    # Extract the events of the same telescope combination type
    combo_type = list(TEL_COMBINATIONS.values()).index(tel_ids)
    df_events = event_data.query(f"combo_type == {combo_type}")

    # Apply the RFs
    reco_params = estimator.predict(df_events)

    return reco_params


def reconstruct_arrival_direction(event_data, tel_descriptions):
    """
    Reconstructs the arrival directions of shower events with the
    MARS-like DISP method.

    Parameters
    ----------
    event_data : pandas.core.frame.DataFrame
        Data frame of shower events
    tel_descriptions : dict
        Telescope descriptions

    Returns
    -------
    pandas.core.frame.DataFrame
        Data frame of the shower events with reconstructed directions
    """

    params_with_flips = pd.DataFrame()

    # First of all, we reconstruct the directions of all the head and
    # tail candidates for every telescope image, i.e., the directions
    # separated by the DISP parameter from the image CoG along the
    # shower main axis. The `flip` parameter distinguishes them.

    tel_ids = np.unique(event_data.index.get_level_values("tel_id"))

    for tel_id in tel_ids:
        df_events = event_data.query(f"tel_id == {tel_id}")

        tel_pointing = AltAz(
            alt=u.Quantity(df_events["pointing_alt"], unit="rad"),
            az=u.Quantity(df_events["pointing_az"], unit="rad"),
        )

        tel_frame = TelescopeFrame(telescope_pointing=tel_pointing)

        cog_coord = SkyCoord(
            u.Quantity(df_events["x"], unit="m"),
            u.Quantity(df_events["y"], unit="m"),
            frame=tel_descriptions[tel_id].camera.geometry.frame,
        )

        cog_coord = cog_coord.transform_to(tel_frame)

        for flip in [0, 1]:
            psi_flipped = df_events["psi"] + 180 * flip

            event_coord = cog_coord.directional_offset_by(
                position_angle=u.Quantity(psi_flipped, unit="deg"),
                separation=u.Quantity(df_events["reco_disp"], unit="deg"),
            )

            event_coord = event_coord.altaz

            df_altaz = pd.DataFrame(
                data={
                    "flip": flip,
                    "reco_alt": event_coord.alt.to_value("deg"),
                    "reco_az": event_coord.az.to_value("deg"),
                    "combo_type": df_events["combo_type"],
                },
                index=df_events.index,
            )

            params_with_flips = pd.concat([params_with_flips, df_altaz])

    params_with_flips.set_index("flip", append=True, inplace=True)
    params_with_flips.sort_index(inplace=True)

    # Then, we get the flip combination minimizing the angular distances
    # of the head and tail candidates for every shower event. In order
    # to speed up the calculations, here we process the events for every
    # telescope combination types.

    reco_params = pd.DataFrame()

    for combo_type, tel_ids in enumerate(TEL_COMBINATIONS.values()):
        df_events = params_with_flips.query(f"combo_type == {combo_type}")

        n_events = len(df_events.groupby(["obs_id", "event_id"]).size())

        if n_events == 0:
            print(f"Telescope combination {combo_type} has no events. Skipping.")
            continue

        # Here we first define all the possible flip combinations. For
        # example, in case that we have two telescope images, in total
        # 4 combinations are defined as follows:
        #   [(head, head), (head, tail), (tail, head), (tail, tail)]
        # where the i-th element of each tuple means the i-th telescope
        # image. In case of 3 images we have in total 8 combinations.

        flip_combinations = np.array(
            list(itertools.product([0, 1], repeat=len(tel_ids)))
        )

        # Next, we define all the possible 2 telescopes combinations.
        # For example, in case of 3 telescopes, in total 3 combinations
        # are defined as follows:
        #                 [(1, 2), (1, 3), (2, 3)]
        # where the elements of the tuples mean the telescope IDs. In
        # case of 2 telescopes there is only one combination.

        tel_any2_combinations = list(itertools.combinations(tel_ids, 2))

        distances = np.zeros((len(flip_combinations), n_events))

        # Loop over every flip combination
        for i_flip, flip_combo in enumerate(flip_combinations):
            container = {}

            # Set the directions of a given flip combination
            for tel_id, flip in zip(tel_ids, flip_combo):
                container[tel_id] = df_events.query(
                    f"(tel_id == {tel_id}) & (flip == {flip})"
                )

            for tel_id_1, tel_id_2 in tel_any2_combinations:
                # Calculate the distance of the 2-tel combination
                theta = angular_separation(
                    lon1=u.Quantity(container[tel_id_1]["reco_az"], unit="deg"),
                    lat1=u.Quantity(container[tel_id_1]["reco_alt"], unit="deg"),
                    lon2=u.Quantity(container[tel_id_2]["reco_az"], unit="deg"),
                    lat2=u.Quantity(container[tel_id_2]["reco_alt"], unit="deg"),
                )

                # Sum up the distance
                distances[i_flip] += theta.to_value("deg")

        # Extracts the minimum distances and their flip combinations
        distances_min = distances.min(axis=0)
        indices_at_min = distances.argmin(axis=0)

        flips = flip_combinations[indices_at_min].ravel()

        group_size = df_events.groupby(["obs_id", "event_id", "tel_id"]).size()

        obs_ids = group_size.index.get_level_values("obs_id")
        event_ids = group_size.index.get_level_values("event_id")
        tel_ids = group_size.index.get_level_values("tel_id")

        multi_indices = pd.MultiIndex.from_arrays(
            arrays=[obs_ids, event_ids, tel_ids, flips], names=df_events.index.names
        )

        # Keep only the information of the closest combinations
        df_events = df_events.loc[multi_indices]

        # Add the minimum angular distances to the output data frame,
        # since they are useful to separate gamma and hadron events
        # (hadron events tend to have larger distances than gammas)
        df_disp_diffs = pd.DataFrame(
            data={
                "disp_diff_sum": distances_min,
                "disp_diff_mean": distances_min / len(tel_any2_combinations),
            },
            index=df_events.groupby(["obs_id", "event_id"]).size().index,
        )

        df_events = df_events.join(df_disp_diffs)

        reco_params = pd.concat([reco_params, df_events])

    reco_params.reset_index(level="flip", inplace=True)
    reco_params.drop(["flip", "combo_type"], axis=1, inplace=True)
    reco_params.sort_index(inplace=True)

    return reco_params


[docs] def dl1_stereo_to_dl2(input_file_dl1, input_dir_rfs, output_dir): """ Processes DL1-stereo events and reconstructs the DL2 parameters with trained RFs. Parameters ---------- input_file_dl1 : str Path to an input DL1-stereo data file input_dir_rfs : str Path to a directory where trained RFs are stored output_dir : str Path to a directory where to save an output DL2 data file """ # Load the input DL1-stereo data file logger.info(f"\nInput DL1-stereo data file: {input_file_dl1}") event_data = pd.read_hdf(input_file_dl1, key="events/parameters") event_data.set_index(["obs_id", "event_id", "tel_id"], inplace=True) event_data.sort_index(inplace=True) is_simulation = "true_energy" in event_data.columns logger.info(f"\nIs simulation: {is_simulation}") event_data = get_stereo_events_old(event_data) subarray = SubarrayDescription.from_hdf(input_file_dl1) tel_descriptions = subarray.tel logger.info(f"\nInput RF directory: {input_dir_rfs}") mask_energy_regressor = f"{input_dir_rfs}/energy_regressors_*.joblib" mask_disp_regressor = f"{input_dir_rfs}/disp_regressors_*.joblib" mask_event_classifier = f"{input_dir_rfs}/event_classifiers_*.joblib" # Find the energy regressors input_files_energy = glob.glob(mask_energy_regressor) input_files_energy.sort() n_files_energy = len(input_files_energy) if n_files_energy > 0: logger.info(f"\nIn total {n_files_energy} energy regressor files are found:") for input_file_energy in input_files_energy: logger.info(f"Applying {input_file_energy}...") energy_regressor = EnergyRegressor() energy_regressor.load(input_file_energy) # Apply the RFs reco_params = apply_rfs(event_data, energy_regressor) event_data.loc[reco_params.index, reco_params.columns] = reco_params del energy_regressor # Find the DISP regressors input_files_dips = glob.glob(mask_disp_regressor) input_files_dips.sort() n_files_disp = len(input_files_dips) if n_files_disp > 0: logger.info(f"\nIn total {n_files_disp} DISP regressor files are found:") for input_file_disp in input_files_dips: logger.info(f"Applying {input_file_disp}...") disp_regressor = DispRegressor() disp_regressor.load(input_file_disp) # Apply the RFs reco_params = apply_rfs(event_data, disp_regressor) event_data.loc[reco_params.index, reco_params.columns] = reco_params # Reconstruct the arrival directions with the DISP method logger.info("\nReconstructing the arrival directions...") reco_params = reconstruct_arrival_direction(event_data, tel_descriptions) event_data.loc[reco_params.index, reco_params.columns] = reco_params del disp_regressor # Find the event classifiers input_files_class = glob.glob(mask_event_classifier) input_files_class.sort() n_files_class = len(input_files_class) if n_files_class > 0: logger.info(f"\nIn total {n_files_class} event classifier files are found:") for input_file_class in input_files_class: logger.info(f"Applying {input_file_class}...") event_classifier = EventClassifier() event_classifier.load(input_file_class) # Apply the RFs reco_params = apply_rfs(event_data, event_classifier) event_data.loc[reco_params.index, reco_params.columns] = reco_params del event_classifier # In case of MAGIC-only analyses, here we drop `time_sec` and # `time_nanosec` but instead set `timestamp`, since the precise # timestamps are not needed anymore if "time_sec" in event_data.columns: time_sec = u.Quantity(event_data["time_sec"], unit="s") time_nanosec = u.Quantity(event_data["time_nanosec"], unit="ns") timestamps = time_sec + time_nanosec event_data["timestamp"] = timestamps.to_value("s") event_data.drop(columns=["time_sec", "time_nanosec"], inplace=True) event_data.reset_index(inplace=True) # Save the data in an output file Path(output_dir).mkdir(exist_ok=True, parents=True) input_file_name = Path(input_file_dl1).name output_file_name = input_file_name.replace("dl1_stereo", "dl2") output_file = f"{output_dir}/{output_file_name}" save_pandas_data_in_table( event_data, output_file, group_name="/events", table_name="parameters" ) # Save the subarray description subarray.to_hdf(output_file) if is_simulation: # Save the simulation configuration sim_config = pd.read_hdf(input_file_dl1, key="simulation/config") save_pandas_data_in_table( input_data=sim_config, output_file=output_file, group_name="/simulation", table_name="config", mode="a", ) logger.info(f"\nOutput file: {output_file}")
def main(): """Main function.""" start_time = time.time() parser = argparse.ArgumentParser() parser.add_argument( "--input-file-dl1", "-d", dest="input_file_dl1", type=str, required=True, help="Path to an input DL1-stereo data file", ) parser.add_argument( "--input-dir-rfs", "-r", dest="input_dir_rfs", type=str, required=True, help="Path to a directory where trained RFs are stored", ) parser.add_argument( "--output-dir", "-o", dest="output_dir", type=str, default="./data", help="Path to a directory where to save an output DL2 data file", ) args = parser.parse_args() # Process the input data dl1_stereo_to_dl2(args.input_file_dl1, args.input_dir_rfs, args.output_dir) logger.info("\nDone.") process_time = time.time() - start_time logger.info(f"\nProcess time: {process_time:.0f} [sec]\n") if __name__ == "__main__": main()