Source code for magicctapipe.image.calib

"""
Module for calibration
"""
import numpy as np
from ctapipe.image import apply_time_delta_cleaning, number_of_islands, tailcuts_clean
from ctapipe.instrument import CameraGeometry
from lstchain.image.cleaning import apply_dynamic_cleaning
from lstchain.image.modifier import (
    add_noise_in_pixels,
    random_psf_smearer,
    set_numba_seed,
)

from .cleaning import MAGICClean

__all__ = ["calibrate"]


[docs] def calibrate( event, tel_id, config, calibrator, is_lst, obs_id=None, camera_geoms=None, magic_clean=None, ): """ This function calibrates the camera image for a single event of a telescope Parameters ---------- event : ctapipe.containers.ArrayEventContainer From an EventSource tel_id : int Telescope ID config : dict Parameters for image extraction and calibration calibrator : ctapipe.calib.CameraCalibrator `ctapipe` object needed to calibrate the camera is_lst : bool Whether the telescope is a LST obs_id : int, optional Observation ID. Unused in case of LST telescope, by default None camera_geoms : ctapipe.instrument.camera.geometry.CameraGeometry, optional Camera geometry. Used in case of LST telescope, by default None magic_clean : dict, optional Each entry is a MAGICClean object using the telescope camera geometry. Used in case of MAGIC telescope, by default None Returns ------- tuple Mask of the pixels selected by the cleaning, array of number of p.e. in the camera pixels, array of the signal peak time in the camera pixels """ if (not is_lst) and (magic_clean is None): raise ValueError( "Check the provided parameters and the telescope type; MAGIC calibration not possible if magic_clean not provided" ) if (is_lst) and (obs_id is None): raise ValueError( "Check the provided parameters and the telescope type; LST calibration not possible if obs_id not provided" ) if (is_lst) and (camera_geoms is None): raise ValueError( "Check the provided parameters and the telescope type; LST calibration not possible if gamera_geoms not provided" ) if (not is_lst) and (type(magic_clean[tel_id]) != MAGICClean): raise ValueError( "Check the provided magic_clean parameter; MAGIC calibration not possible if magic_clean not a dictionary of MAGICClean objects" ) if (is_lst) and (type(camera_geoms[tel_id]) != CameraGeometry): raise ValueError( "Check the provided camera_geoms parameter; LST calibration not possible if camera_geoms not a dictionary of CameraGeometry objects" ) calibrator._calibrate_dl0(event, tel_id) calibrator._calibrate_dl1(event, tel_id) image = event.dl1.tel[tel_id].image.astype(np.float64) peak_time = event.dl1.tel[tel_id].peak_time.astype(np.float64) if not is_lst: use_charge_correction = config["charge_correction"]["use"] if use_charge_correction: # Scale the charges by the correction factor image *= config["charge_correction"]["factor"] # Apply the image cleaning signal_pixels, image, peak_time = magic_clean[tel_id].clean_image( event_image=image, event_pulse_time=peak_time ) nsb_dict = "increase_nsb" if not is_lst: if config["mc_tel_ids"]["MAGIC-I"] == tel_id: nsb_dict += "_m1" if config["mc_tel_ids"]["MAGIC-II"] == tel_id: nsb_dict += "_m2" if nsb_dict in config: increase_nsb = config[nsb_dict].pop("use") if increase_nsb: rng = np.random.default_rng(event.index.event_id) # Add extra noise in pixels image = add_noise_in_pixels(rng, image, **config[nsb_dict]) config[nsb_dict]["use"] = increase_nsb if is_lst: increase_psf = config["increase_psf"]["use"] use_time_delta_cleaning = config["time_delta_cleaning"].pop("use") use_dynamic_cleaning = config["dynamic_cleaning"].pop("use") use_only_main_island = config["use_only_main_island"] if increase_psf: set_numba_seed(obs_id) # Smear the image image = random_psf_smearer( image=image, fraction=config["increase_psf"]["fraction"], indices=camera_geoms[tel_id].neighbor_matrix_sparse.indices, indptr=camera_geoms[tel_id].neighbor_matrix_sparse.indptr, ) # Apply the image cleaning signal_pixels = tailcuts_clean( camera_geoms[tel_id], image, **config["tailcuts_clean"] ) if use_time_delta_cleaning: signal_pixels = apply_time_delta_cleaning( geom=camera_geoms[tel_id], mask=signal_pixels, arrival_times=peak_time, **config["time_delta_cleaning"], ) if use_dynamic_cleaning: signal_pixels = apply_dynamic_cleaning( image, signal_pixels, **config["dynamic_cleaning"] ) if use_only_main_island: _, island_labels = number_of_islands(camera_geoms[tel_id], signal_pixels) n_pixels_on_island = np.bincount(island_labels.astype(np.int64)) # The first index means the pixels not surviving # the cleaning, so should not be considered n_pixels_on_island[0] = 0 max_island_label = np.argmax(n_pixels_on_island) signal_pixels[island_labels != max_island_label] = False config["time_delta_cleaning"]["use"] = use_time_delta_cleaning config["dynamic_cleaning"]["use"] = use_dynamic_cleaning return signal_pixels, image, peak_time