Source code for magicctapipe.scripts.lst1_magic.lst1_magic_stereo_reco

#!/usr/bin/env python
# coding: utf-8

"""
This script processes DL1 events and reconstructs the geometrical stereo
parameters with more than one telescope information. The quality cuts
specified in the configuration file are applied to the events before the
reconstruction.

When the input is real data containing LST and MAGIC events, it checks
the angular distances of their pointing directions and excludes the
events taken with larger distances than the limit specified in the
configuration file. This is in principle to avoid the reconstruction of
the events taken in too-mispointing situations. For example, DL1 data
may contain the coincident events taken with different wobble offsets
between the systems.

If the `--magic-only` argument is given, it reconstructs the stereo
parameters using only MAGIC events.

Usage:
$ python lst1_magic_stereo_reco.py
--input-file dl1_coincidence/dl1_LST-1_MAGIC.Run03265.0040.h5
(--output-dir dl1_stereo)
(--config-file config.yaml)
(--magic-only)
"""

import argparse
import logging
import sys
import time
from pathlib import Path

import numpy as np
import pandas as pd
import yaml
from astropy import units as u
from astropy.coordinates import Angle, angular_separation
from ctapipe.containers import (
    ArrayEventContainer,
    CameraHillasParametersContainer,
    ImageParametersContainer,
    LeakageContainer,
    MorphologyContainer,
    TimingParametersContainer,
)
from ctapipe.instrument import SubarrayDescription
from ctapipe.io import read_table, write_table
from ctapipe.reco import HillasReconstructor
from lstchain.io import HDF5_ZSTD_FILTERS
from numpy.linalg import LinAlgError

from magicctapipe.io import (
    check_input_list,
    format_object,
    get_stereo_events,
    save_pandas_data_in_table,
)
from magicctapipe.utils import (
    NO_EVENTS_WITHIN_MAXIMUM_DISTANCE,
    calculate_mean_direction,
)

__all__ = ["calculate_pointing_separation", "stereo_reconstruction"]

logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.INFO)


def calculate_pointing_separation(event_data, config):
    """
    Calculates the angular distance of the LST and MAGIC pointing
    directions.

    Parameters
    ----------
    event_data : pandas.core.frame.DataFrame
        Data frame of LST and MAGIC events
    config : dict
        Configuration for the LST + MAGIC analysis

    Returns
    -------
    pandas.core.series.Series
        Angular distance of the LST array and MAGIC pointing directions
        in units of degree
    """

    assigned_tel_ids = config[
        "mc_tel_ids"
    ]  # This variable becomes a dictionary, e.g.: {'LST-1': 1, 'LST-2': 0, 'LST-3': 0, 'LST-4': 0, 'MAGIC-I': 2, 'MAGIC-II': 3}
    LSTs_IDs = np.asarray(list(assigned_tel_ids.values())[0:4])
    LSTs_IDs = list(LSTs_IDs[LSTs_IDs > 0])  # Here we list only the LSTs in use
    MAGICs_IDs = np.asarray(list(assigned_tel_ids.values())[4:6])
    MAGICs_IDs = list(MAGICs_IDs[MAGICs_IDs > 0])  # Here we list only the MAGICs in use

    # Extract LST events
    df_lst = event_data.query(f"tel_id == {LSTs_IDs}")

    # Extract the coincident events observed by both MAGICs and LSTs
    df_magic = event_data.query(f"tel_id == {MAGICs_IDs}")
    df_magic = df_magic.loc[df_lst.index]

    # Calculate the mean of the LSTs, and also of the M1 and M2 pointing directions
    pnt_az_LST, pnt_alt_LST = calculate_mean_direction(
        lon=df_lst["pointing_az"], lat=df_lst["pointing_alt"], unit="rad"
    )
    pnt_az_magic, pnt_alt_magic = calculate_mean_direction(
        lon=df_magic["pointing_az"], lat=df_magic["pointing_alt"], unit="rad"
    )

    # Calculate the angular distance of their pointing directions
    theta = angular_separation(
        lon1=u.Quantity(pnt_az_LST, unit="rad"),
        lat1=u.Quantity(pnt_alt_LST, unit="rad"),
        lon2=u.Quantity(pnt_az_magic, unit="rad"),
        lat2=u.Quantity(pnt_alt_magic, unit="rad"),
    )

    theta = pd.Series(data=theta.to_value("deg"), index=df_lst.index)

    return theta


[docs] def stereo_reconstruction(input_file, output_dir, config, magic_only_analysis=False): """ Processes DL1 events and reconstructs the geometrical stereo parameters with more than one telescope information. Parameters ---------- input_file : str Path to an input DL1 data file output_dir : str Path to a directory where to save an output DL1-stereo data file config : dict Configuration file for the stereo LST + MAGIC analysis, i.e. config_stereo.yaml magic_only_analysis : bool, optional If `True`, it reconstructs the stereo parameters using only MAGIC events """ config_stereo = config["stereo_reco"] assigned_tel_ids = config[ "mc_tel_ids" ] # This variable becomes a dictionary, e.g.: {'LST-1': 1, 'LST-2': 0, 'LST-3': 0, 'LST-4': 0, 'MAGIC-I': 2, 'MAGIC-II': 3} save_images = config.get("save_images", False) # Load the input file logger.info(f"\nInput file: {input_file}") event_data = pd.read_hdf(input_file, key="events/parameters") # It sometimes happens that there are MAGIC events whose event and # telescope IDs are duplicated, so here we exclude those events event_data.drop_duplicates( subset=["obs_id", "event_id", "tel_id"], keep=False, inplace=True ) event_data.set_index(["obs_id", "event_id", "tel_id"], inplace=True) event_data.sort_index(inplace=True) is_simulation = "true_energy" in event_data.columns logger.info(f"\nIs simulation: {is_simulation}") subarray = SubarrayDescription.from_hdf(input_file) tel_positions = subarray.positions logger.info("\nTelescope positions:") logger.info(format_object(tel_positions)) # Apply the event cuts logger.info(f"\nMAGIC-only analysis: {magic_only_analysis}") LSTs_IDs = np.asarray(list(assigned_tel_ids.values())[0:4]) if magic_only_analysis: tel_id = np.asarray(list(assigned_tel_ids.values())[:]) used_id = tel_id[tel_id != 0] magic_ids = [item for item in used_id if item not in LSTs_IDs] event_data.query( f"tel_id in {magic_ids}", inplace=True ) # Here we select only the events with the MAGIC tel_ids logger.info(f"\nQuality cuts: {config_stereo['quality_cuts']}") event_data = get_stereo_events( event_data, config=config, quality_cuts=config_stereo["quality_cuts"] ) # Check the angular distance of the LST and MAGIC pointing directions tel_ids = np.unique(event_data.index.get_level_values("tel_id")).tolist() Number_of_LSTs_in_use = len(LSTs_IDs[LSTs_IDs > 0]) MAGICs_IDs = np.asarray(list(assigned_tel_ids.values())[4:6]) Number_of_MAGICs_in_use = len(MAGICs_IDs[MAGICs_IDs > 0]) Two_arrays_are_used = Number_of_LSTs_in_use * Number_of_MAGICs_in_use > 0 if (not is_simulation) and (Two_arrays_are_used): logger.info( "\nChecking the angular distances of " "the LST and MAGIC pointing directions..." ) event_data.reset_index(level="tel_id", inplace=True) # Calculate the angular distance theta = calculate_pointing_separation(event_data, config) theta_uplim = u.Quantity(config_stereo["theta_uplim"]) mask = u.Quantity(theta, unit="deg") < theta_uplim try: if all(mask): logger.info( "--> All the events were taken with smaller angular distances " f"than the limit {theta_uplim}." ) elif not any(mask): raise Exception( f"--> All the events were taken with larger angular distances " f"than the limit {theta_uplim}. Exiting..." ) else: logger.info( f"--> Exclude {np.count_nonzero(mask)} stereo events whose " f"angular distances are larger than the limit {theta_uplim}." ) event_data = event_data.loc[theta[mask].index] except Exception as e: logger.error(f"{e}") sys.exit(NO_EVENTS_WITHIN_MAXIMUM_DISTANCE) event_data.set_index("tel_id", append=True, inplace=True) # Configure the HillasReconstructor hillas_reconstructor = HillasReconstructor(subarray) # Calculate the mean pointing direction pnt_az_mean, pnt_alt_mean = calculate_mean_direction( lon=event_data["pointing_az"], lat=event_data["pointing_alt"], unit="rad" ) # Loop over every shower event logger.info("\nReconstructing the stereo parameters...") multi_indices = event_data.groupby(["obs_id", "event_id"]).size().index for i_evt, (obs_id, event_id) in enumerate(multi_indices): if i_evt % 100 == 0: logger.info(f"{i_evt} events") # Create an array event container event = ArrayEventContainer() # Assign the mean pointing direction event.pointing.array_altitude = pnt_alt_mean.loc[(obs_id, event_id)] * u.rad event.pointing.array_azimuth = pnt_az_mean.loc[(obs_id, event_id)] * u.rad # Extract the data frame of the shower event df_evt = event_data.loc[(obs_id, event_id, slice(None))] # Loop over every telescope tel_ids = df_evt.index.get_level_values("tel_id").values event.trigger.tels_with_trigger = tel_ids for tel_id in tel_ids: df_tel = df_evt.loc[tel_id] # Assign the telescope information event.pointing.tel[tel_id].altitude = df_tel["pointing_alt"] * u.rad event.pointing.tel[tel_id].azimuth = df_tel["pointing_az"] * u.rad hillas_params = CameraHillasParametersContainer( intensity=float(df_tel["intensity"]), x=u.Quantity(df_tel["x"], unit="m"), y=u.Quantity(df_tel["y"], unit="m"), r=u.Quantity(df_tel["r"], unit="m"), phi=Angle(df_tel["phi"], unit="deg"), psi=Angle(df_tel["psi"], unit="deg"), length=u.Quantity(df_tel["length"], unit="m"), length_uncertainty=u.Quantity(df_tel["length_uncertainty"], unit="m"), width=u.Quantity(df_tel["width"], unit="m"), width_uncertainty=u.Quantity(df_tel["width_uncertainty"], unit="m"), skewness=float(df_tel["skewness"]), kurtosis=float(df_tel["kurtosis"]), ) time_par = TimingParametersContainer( slope=u.Quantity(df_tel["slope"], unit="1/deg"), intercept=float(df_tel["intercept"]), ) leak_par = LeakageContainer( intensity_width_1=float(df_tel["intensity_width_1"]), intensity_width_2=float(df_tel["intensity_width_2"]), pixels_width_1=float(df_tel["pixels_width_1"]), pixels_width_2=float(df_tel["pixels_width_2"]), ) morph_par = MorphologyContainer( n_islands=int(df_tel["n_islands"]), # n_large_islands=int(df_morph_tel["n_large_islands"]), # n_medium_islands=int(df_morph_tel["n_medium_islands"]), # n_small_islands=int(df_morph_tel["n_small_islands"]), n_large_islands=np.nan, n_medium_islands=np.nan, n_small_islands=np.nan, n_pixels=int(df_tel["n_pixels"]), ) event.dl1.tel[tel_id].parameters = ImageParametersContainer( hillas=hillas_params, timing=time_par, leakage=leak_par, morphology=morph_par, ) # event.dl1.tel[tel_id].is_valid=True # Reconstruct the stereo parameters try: hillas_reconstructor(event) except LinAlgError: logger.info( f"--> event {i_evt} (event ID {event_id}) failed to reconstruct valid " "stereo parameters, caught LinAlgError exception. Skipping..." ) continue reconstructor_name = "HillasReconstructor" stereo_params = event.dl2.stereo.geometry[reconstructor_name] if not stereo_params.is_valid: logger.info( f"--> event {i_evt} (event ID {event_id}) failed to reconstruct valid " "stereo parameters, maybe due to the images of zero width. Skipping..." ) continue stereo_params.az.wrap_at("360 deg", inplace=True) for tel_id in tel_ids: # Set the stereo parameters to the data frame params = { "alt": stereo_params.alt.to_value("deg"), "alt_uncert": stereo_params.alt_uncert.to_value("deg"), "az": stereo_params.az.to_value("deg"), "az_uncert": stereo_params.az_uncert.to_value("deg"), "core_x": stereo_params.core_x.to_value("m"), "core_y": stereo_params.core_y.to_value("m"), "impact": event.dl2.tel[tel_id] .impact[reconstructor_name] .distance.to_value("m"), "h_max": stereo_params.h_max.to_value("m"), "is_valid": int(stereo_params.is_valid), } event_data.loc[(obs_id, event_id, tel_id), params.keys()] = params.values() n_events_processed = i_evt + 1 logger.info(f"{n_events_processed} events") event_data.reset_index(inplace=True) # Save the data in an output file Path(output_dir).mkdir(exist_ok=True, parents=True) input_file_name = Path(input_file).name if magic_only_analysis: output_file_name = input_file_name.replace("dl1", "dl1_stereo_magic_only") else: output_file_name = input_file_name.replace("dl1", "dl1_stereo") output_file = f"{output_dir}/{output_file_name}" save_pandas_data_in_table( event_data, output_file, group_name="/events", table_name="parameters" ) if save_images: tel_ids = np.unique(event_data["tel_id"]).tolist() for tel_id in tel_ids: key = f"/events/dl1/image_{tel_id}" image = read_table(input_file, key) mask = np.zeros(len(image), dtype=bool) tag = "lst" if tel_id in LSTs_IDs else "magic" obs_evt_ids = event_data.query(f"tel_id=={tel_id}")[ [f"obs_id_{tag}", f"event_id_{tag}"] ].to_numpy() for obs_id, evt_id in obs_evt_ids: this_mask = np.logical_and( image["obs_id"] == obs_id, image["event_id"] == evt_id ) mask = np.logical_or(mask, this_mask) logger.info(f"found {sum(mask)} images of tel_id={tel_id}") write_table( image[mask], output_file, key, overwrite=True, filters=HDF5_ZSTD_FILTERS, ) # Save the subarray description subarray.to_hdf(output_file) if is_simulation: # Save the simulation configuration sim_config = pd.read_hdf(input_file, key="simulation/config") save_pandas_data_in_table( input_data=sim_config, output_file=output_file, group_name="/simulation", table_name="config", mode="a", ) logger.info(f"\nOutput file: {output_file}")
def main(): """Main function.""" start_time = time.time() parser = argparse.ArgumentParser() parser.add_argument( "--input-file", "-i", dest="input_file", type=str, required=True, help="Path to an input DL1 data file", ) parser.add_argument( "--output-dir", "-o", dest="output_dir", type=str, default="./data", help="Path to a directory where to save an output DL1-stereo data file", ) parser.add_argument( "--config-file", "-c", dest="config_file", type=str, default="./config.yaml", help="Path to a configuration file", ) parser.add_argument( "--magic-only", dest="magic_only", action="store_true", help="Reconstruct the stereo parameters using only MAGIC events", ) args = parser.parse_args() with open(args.config_file, "rb") as f: config = yaml.safe_load(f) # Checking if the input telescope list is properly organized: check_input_list(config) # Process the input data stereo_reconstruction(args.input_file, args.output_dir, config, args.magic_only) logger.info("\nDone.") process_time = time.time() - start_time logger.info(f"\nProcess time: {process_time:.0f} [sec]\n") if __name__ == "__main__": main()