magic-cta-pipe is under active development. Expect large and rapid changes in functionality.

Source code for magicctapipe.utils.badpixels

"""
MAGIC bad pixels calculation
"""
import numpy as np
from astropy.time import Time
from ctapipe.instrument import CameraGeometry

__all__ = [
    "MAGICBadPixelsCalc",
]


[docs] class MAGICBadPixelsCalc: """Calculate bad pixels for MAGIC camera. Parameters ---------- is_simulation : bool Flag for simulation or real data camera : ctapipe.instrument.camera.geometry.CameraGeometry, optional Camera geometry, by default None config : dict, optional Configuration dictionary, by default None """ def __init__(self, is_simulation, camera=None, config=None): """Initialize class. Parameters ---------- is_simulation : bool Flag for simulation or real data camera : ctapipe.instrument.camera.geometry.CameraGeometry, optional Camera geometry, by default None config : dict, optional Configuration dictionary, by default None Raises ------ ValueError Error if wrong keys. """ # MAGIC telescope description if camera is None: camera = CameraGeometry.from_name("MAGICCam") self.n_camera_pixels = camera.n_pixels # initialize bad pixel mask. Will updated for each telescope/sample by # self._check_pedvar_fields() self.badrmspixel_mask = np.zeros(self.n_camera_pixels, dtype=np.bool) self.config = config if "pedestalLevel" in config: self.pedestalLevel = config["pedestalLevel"] else: self.pedestalLevel = 400.0 if "pedestalLevelVariance" in config: self.pedestalLevelVariance = config["pedestalLevelVariance"] else: self.pedestalLevelVariance = 4.5 if "pedestalType" in config: pedestalTypeName = config["pedestalType"] if pedestalTypeName == "Fundamental": self.pedestalType = 0 elif pedestalTypeName == "FromExtractor": self.pedestalType = 1 elif pedestalTypeName == "FromExtractorRndm": self.pedestalType = 2 else: raise ValueError( "pedestalType must be chosen from 'Fundamental', 'FromExtractor', or 'FromExtractorRndm'" ) else: self.pedestalType = 2 self.current_obs_id = -1 # Pedestal sample times and outlier masks are reduced to the unique # outlier masks: In MARS files, mayn duplicate entries are present, # and removing them onces significantly speeds up searching the correct # mask for a given event. self.sample_times_ped = [[], []] self.n_samples_ped = np.zeros(2, dtype=np.int16) - 1 self.charge_std_outliers = [[], []] # self.charge_std = [[], []] # Dead pixels masks change for every subrun and are directly used from # the MARS data self.sample_ranges_dead = [None, None] self.n_samples_dead = np.zeros(2, dtype=np.int16) - 1 # Allow processing of MCs (do nothing, but also don't crash) self.is_mc = is_simulation def _check_new_run(self, event): """Initializes or resets for each new run subrun-wise dead pixel samples or pedestal info with computed outlier masks. Parameters ---------- event : ctapipe.containers.ArrayEventContainer Event container. """ if event.index.obs_id != self.current_obs_id: self.sample_times_ped = [[], []] self.n_samples_ped = np.zeros(2, dtype=np.int16) - 1 self.charge_std_outliers = [[], []] # self.charge_std = [[], []] self.sample_ranges_dead = [None, None] self.n_samples_dead = np.zeros(2, dtype=np.int16) - 1 self.current_obs_id = event.index.obs_id def _check_pedestal_rms(self, charge_std): """ This internal method calculates the pedestal outlier pixels depending on the values in self.config. Corresponds to mbadpixels/MBadPixelsCalc::CheckPedestalRms() Parameters ---------- charge_std : np.ndarray Charge standard deviation. Returns ------- bool Mask with the Pedestal RMS outliers. """ if (len(charge_std)) != self.n_camera_pixels: print(len(charge_std)) print(self.n_camera_pixels) raise ValueError( "charge_std must be an array of length equal to number of MAGIC camera pixels" ) meanrms = 0.0 npix = 0 for i in range(self.n_camera_pixels): if charge_std[i] <= 0 or charge_std[i] >= 200 * self._getpixratiosqrt(i): continue # const Byte_t aidx = (*fGeomCam)[i].GetAidx(); meanrms += charge_std[i] npix += 1 # if no pixel has a minimum signal, return if meanrms == 0: return False meanrms /= npix meanrms2 = 0.0 varrms2 = 0.0 npix = 0 for i in range(self.n_camera_pixels): # Calculate the corrected means: if charge_std[i] <= 0.5 * meanrms or charge_std[i] >= 1.5 * meanrms: continue meanrms2 += charge_std[i] varrms2 += charge_std[i] ** 2 npix += 1 # if no pixel has a minimum signal, return lolim1 = 0 lolim2 = 0 # Precalcualtion of limits uplim1 = 0 uplim2 = 0 # for speeed reasons if npix == 0 or meanrms2 == 0: return False meanrms2 /= npix if self.pedestalLevel > 0: lolim1 = meanrms2 / self.pedestalLevel uplim1 = meanrms2 * self.pedestalLevel if self.pedestalLevelVariance > 0: varrms2 /= npix varrms2 = np.sqrt(varrms2 - meanrms2 * meanrms2) lolim2 = meanrms2 - self.pedestalLevelVariance * varrms2 uplim2 = meanrms2 + self.pedestalLevelVariance * varrms2 bads = 0 # Blind the Bad Pixels for i in range(self.n_camera_pixels): if ( self.pedestalLevel <= 0 or (charge_std[i] > lolim1 and charge_std[i] <= uplim1) ) and ( self.pedestalLevelVariance <= 0 or (charge_std[i] > lolim2 and charge_std[i] <= uplim2) ): continue self.badrmspixel_mask[i] = True # if (charge_std[i] <= lolim1 or charge_std[i] <= lolim2): # self.coldpixels[i] = True # elif (charge_std[i] > uplim1 or charge_std[i] > uplim2): # self.hotpixels[i] = True bads += 1 return True def _getpixratiosqrt(self, i_pix): """Return pixels ration Parameters ---------- i_pix : int Pixel index Returns ------- float Pixel ratio. """ # i_pixzero = np.where(self.geom.pix_id == 0)[0][0] # return np.sqrt(self.geom.pix_area[i_pix] / self.geom.pix_area[i_pixzero]) return 1.0
[docs] def get_badrmspixel_mask(self, event): """ Fetch the bad RMS pixel mask for a given event, that is the event time. Parameters ---------- event : ctapipe.containers.ArrayEventContainer Event container. Returns ------- bool Has two dimensions: masks for M1 and/or M2. """ badrmspixel_mask = [None, None] if self.is_mc: for tel_id in event.trigger.tels_with_trigger: badrmspixel_mask[tel_id - 1] = np.zeros( self.n_camera_pixels, dtype=np.bool ) return badrmspixel_mask self._check_new_run(event) event_time = event.trigger.time.unix for tel_id in event.trigger.tels_with_trigger: self._check_pedvar_fields(tel_id, event) # now find monitoring data sample matching to this event by time stamp: if event_time <= self.sample_times_ped[tel_id - 1][0]: i_min = 0 else: i_min = np.where(event_time > self.sample_times_ped[tel_id - 1])[0][-1] badrmspixel_mask[tel_id - 1] = self.charge_std_outliers[tel_id - 1][i_min] return badrmspixel_mask
[docs] def get_badrmspixel_indices(self, event): """ Quick workaround to get the pixel IDs and not the mask Parameters ---------- event : ctapipe.containers.ArrayEventContainer Event container. Returns ------- list Bad pixels indices. """ badrmspixel_indices = [[None], [None]] if self.is_mc: return badrmspixel_indices badrmspixel_mask = self.get_badrmspixel_mask(self, event) for i, badrmspixelmask_tel_i in enumerate(badrmspixel_mask): if badrmspixelmask_tel_i != [None]: badrmspixel_indices[i] = np.where(badrmspixelmask_tel_i)[0] return badrmspixel_indices
def _check_pedvar_fields(self, tel_id, event): """ Update the pedestal RMS outliers. Does it only once after initializing the class. and reduces the samples to the unique ones. Parameters ---------- tel_id : int Telescope ID event : ctapipe.containers.ArrayEventContainer Event container. """ if self.n_samples_ped[tel_id - 1] == -1: self.n_samples_ped[tel_id - 1] = len( event.mon.tel[tel_id].pedestal.sample_time ) # calculate only once the hot pixel array of the monitoring data: print("Update hot pixels for M%d..." % tel_id, end=" ") event.mon.tel[tel_id].pedestal.charge_std_outliers = [] for i_sample in range(self.n_samples_ped[tel_id - 1]): charge_std = event.mon.tel[tel_id].pedestal.charge_std[ self.pedestalType ][i_sample] if i_sample == 0: charge_std_last = None else: charge_std_last = event.mon.tel[tel_id].pedestal.charge_std[ self.pedestalType ][i_sample - 1] if not np.array_equal(charge_std, charge_std_last): self.sample_times_ped[tel_id - 1].append( event.mon.tel[tel_id].pedestal.sample_time[i_sample].unix ) self.badrmspixel_mask = np.zeros( self.n_camera_pixels, dtype=np.bool ) self._check_pedestal_rms(charge_std) self.charge_std_outliers[tel_id - 1].append(self.badrmspixel_mask) # self.charge_std[tel_id - 1].append(charge_std) self.sample_times_ped[tel_id - 1] = np.array( self.sample_times_ped[tel_id - 1] ) self.charge_std_outliers[tel_id - 1] = np.array( self.charge_std_outliers[tel_id - 1], dtype=np.bool ) # self.charge_std[tel_id - 1] = np.array(self.charge_std[tel_id - 1]) print("done.")
[docs] def get_deadpixel_mask(self, event): """ Fetch the subrun-wise defined dead pixels for a given event, that is the event time. Parameters ---------- event : ctapipe.containers.ArrayEventContainer Event container. Returns ------- bool Has two dimensions: masks for M1 and/or M2. """ deadpixel_mask = [[None], [None]] if self.is_mc: for tel_id in event.trigger.tels_with_trigger: deadpixel_mask[tel_id - 1] = np.zeros( self.n_camera_pixels, dtype=np.bool ) return deadpixel_mask self._check_new_run(event) event_time = event.trigger.time.unix sample_time_range = Time( [ [ event.mon.tel[1].pedestal.sample_time[0].unix, event.mon.tel[1].pedestal.sample_time[-1].unix, ] ], format="unix", scale="utc", ) for tel_id in event.trigger.tels_with_trigger: if self.n_samples_dead[tel_id - 1] == -1: # self.n_samples_dead[tel_id - 1] = len(event.mon.tel[tel_id].pixel_status.sample_time_range) self.n_samples_dead[tel_id - 1] = len(sample_time_range) self.sample_ranges_dead[tel_id - 1] = np.zeros( shape=(self.n_samples_dead[tel_id - 1], 2) ) for i in range(self.n_samples_dead[tel_id - 1]): # self.sample_ranges_dead[tel_id - 1][i,0] = event.mon.tel[tel_id].pixel_status.sample_time_range[i][0].unix # self.sample_ranges_dead[tel_id - 1][i,1] = event.mon.tel[tel_id].pixel_status.sample_time_range[i][1].unix self.sample_ranges_dead[tel_id - 1][i, 0] = sample_time_range[i][ 0 ].unix self.sample_ranges_dead[tel_id - 1][i, 1] = sample_time_range[i][ 1 ].unix # now find sample: indices_time_dead = np.where( event_time >= self.sample_ranges_dead[tel_id - 1][:, 0] )[0] if indices_time_dead.size: i_min_dead = indices_time_dead[-1] else: i_min_dead = 0 deadpixel_mask[tel_id - 1] = event.mon.tel[ tel_id ].pixel_status.hardware_failing_pixels[i_min_dead] return deadpixel_mask
[docs] def get_badpixel_mask(self, event): """ Fetch the union of bad RMS and bad pixels for a given event, that is the event time. Parameters ---------- event : ctapipe.containers.ArrayEventContainer Event container. Returns ------- bool Has two dimensions: masks for M1 and/or M2. """ badpixel_mask = [[None], [None]] if self.is_mc: for tel_id in event.trigger.tels_with_trigger: badpixel_mask[tel_id - 1] = np.zeros( self.n_camera_pixels, dtype=np.bool ) return badpixel_mask badrmspixel_mask = self.get_badrmspixel_mask(event) deadpixel_mask = self.get_deadpixel_mask(event) for tel_id in event.trigger.tels_with_trigger: badpixel_mask[tel_id - 1] = np.logical_or( badrmspixel_mask[tel_id - 1], deadpixel_mask[tel_id - 1] ) return badpixel_mask
# def get_charge_std(self, event): # """ # Fetch the pedestal RMS pixel values for a given event, that is the event time. # # Returns # ------- # charge_std: has two dimensions: M1 and/or M2. # """ # # charge_std = [None, None] # event_time = event.trigger.time.unix # # for tel_id in event.trigger.tels_with_trigger: # # self._check_pedvar_fields(tel_id, event) # # # now find monitoring data sample matching to this event by time stamp: # if event_time <= self.sample_times_ped[tel_id - 1][0]: # i_min = 0 # else: # i_min = np.where(event_time > self.sample_times_ped[tel_id - 1])[0][-1] # # charge_std[tel_id - 1] = self.charge_std[tel_id - 1][i_min] # # return charge_std