Source code for magicctapipe.utils.badpixels

"""
MAGIC bad pixels calculation
"""
import numpy as np
from astropy.time import Time
from ctapipe.instrument import CameraGeometry

__all__ = [
    "MAGICBadPixelsCalc",
]


[docs] class MAGICBadPixelsCalc: """Calculate bad pixels for MAGIC camera. Parameters ---------- is_simulation : bool Flag for simulation or real data camera : ctapipe.instrument.camera.geometry.CameraGeometry, optional Camera geometry, by default None config : dict, optional Configuration dictionary, by default None """ def __init__(self, is_simulation, camera=None, config=None): """Initialize class. Parameters ---------- is_simulation : bool Flag for simulation or real data camera : ctapipe.instrument.camera.geometry.CameraGeometry, optional Camera geometry, by default None config : dict, optional Configuration dictionary, by default None Raises ------ ValueError Error if wrong keys. """ # MAGIC telescope description if camera is None: camera = CameraGeometry.from_name("MAGICCam") self.n_camera_pixels = camera.n_pixels # initialize bad pixel mask. Will updated for each telescope/sample by # self._check_pedvar_fields() self.badrmspixel_mask = np.zeros(self.n_camera_pixels, dtype=np.bool) self.config = config if "pedestalLevel" in config: self.pedestalLevel = config["pedestalLevel"] else: self.pedestalLevel = 400.0 if "pedestalLevelVariance" in config: self.pedestalLevelVariance = config["pedestalLevelVariance"] else: self.pedestalLevelVariance = 4.5 if "pedestalType" in config: pedestalTypeName = config["pedestalType"] if pedestalTypeName == "Fundamental": self.pedestalType = 0 elif pedestalTypeName == "FromExtractor": self.pedestalType = 1 elif pedestalTypeName == "FromExtractorRndm": self.pedestalType = 2 else: raise ValueError( "pedestalType must be chosen from 'Fundamental', 'FromExtractor', or 'FromExtractorRndm'" ) else: self.pedestalType = 2 self.current_obs_id = -1 # Pedestal sample times and outlier masks are reduced to the unique # outlier masks: In MARS files, mayn duplicate entries are present, # and removing them onces significantly speeds up searching the correct # mask for a given event. self.sample_times_ped = [[], []] self.n_samples_ped = np.zeros(2, dtype=np.int16) - 1 self.charge_std_outliers = [[], []] # self.charge_std = [[], []] # Dead pixels masks change for every subrun and are directly used from # the MARS data self.sample_ranges_dead = [None, None] self.n_samples_dead = np.zeros(2, dtype=np.int16) - 1 # Allow processing of MCs (do nothing, but also don't crash) self.is_mc = is_simulation def _check_new_run(self, event): """Initializes or resets for each new run subrun-wise dead pixel samples or pedestal info with computed outlier masks. Parameters ---------- event : ctapipe.containers.ArrayEventContainer Event container. """ if event.index.obs_id != self.current_obs_id: self.sample_times_ped = [[], []] self.n_samples_ped = np.zeros(2, dtype=np.int16) - 1 self.charge_std_outliers = [[], []] # self.charge_std = [[], []] self.sample_ranges_dead = [None, None] self.n_samples_dead = np.zeros(2, dtype=np.int16) - 1 self.current_obs_id = event.index.obs_id def _check_pedestal_rms(self, charge_std): """ This internal method calculates the pedestal outlier pixels depending on the values in self.config. Corresponds to mbadpixels/MBadPixelsCalc::CheckPedestalRms() Parameters ---------- charge_std : np.ndarray Charge standard deviation. Returns ------- bool Mask with the Pedestal RMS outliers. """ if (len(charge_std)) != self.n_camera_pixels: print(len(charge_std)) print(self.n_camera_pixels) raise ValueError( "charge_std must be an array of length equal to number of MAGIC camera pixels" ) meanrms = 0.0 npix = 0 for i in range(self.n_camera_pixels): if charge_std[i] <= 0 or charge_std[i] >= 200 * self._getpixratiosqrt(i): continue # const Byte_t aidx = (*fGeomCam)[i].GetAidx(); meanrms += charge_std[i] npix += 1 # if no pixel has a minimum signal, return if meanrms == 0: return False meanrms /= npix meanrms2 = 0.0 varrms2 = 0.0 npix = 0 for i in range(self.n_camera_pixels): # Calculate the corrected means: if charge_std[i] <= 0.5 * meanrms or charge_std[i] >= 1.5 * meanrms: continue meanrms2 += charge_std[i] varrms2 += charge_std[i] ** 2 npix += 1 # if no pixel has a minimum signal, return lolim1 = 0 lolim2 = 0 # Precalcualtion of limits uplim1 = 0 uplim2 = 0 # for speeed reasons if npix == 0 or meanrms2 == 0: return False meanrms2 /= npix if self.pedestalLevel > 0: lolim1 = meanrms2 / self.pedestalLevel uplim1 = meanrms2 * self.pedestalLevel if self.pedestalLevelVariance > 0: varrms2 /= npix varrms2 = np.sqrt(varrms2 - meanrms2 * meanrms2) lolim2 = meanrms2 - self.pedestalLevelVariance * varrms2 uplim2 = meanrms2 + self.pedestalLevelVariance * varrms2 bads = 0 # Blind the Bad Pixels for i in range(self.n_camera_pixels): if ( self.pedestalLevel <= 0 or (charge_std[i] > lolim1 and charge_std[i] <= uplim1) ) and ( self.pedestalLevelVariance <= 0 or (charge_std[i] > lolim2 and charge_std[i] <= uplim2) ): continue self.badrmspixel_mask[i] = True # if (charge_std[i] <= lolim1 or charge_std[i] <= lolim2): # self.coldpixels[i] = True # elif (charge_std[i] > uplim1 or charge_std[i] > uplim2): # self.hotpixels[i] = True bads += 1 return True def _getpixratiosqrt(self, i_pix): """Return pixels ration Parameters ---------- i_pix : int Pixel index Returns ------- float Pixel ratio. """ # i_pixzero = np.where(self.geom.pix_id == 0)[0][0] # return np.sqrt(self.geom.pix_area[i_pix] / self.geom.pix_area[i_pixzero]) return 1.0
[docs] def get_badrmspixel_mask(self, event): """ Fetch the bad RMS pixel mask for a given event, that is the event time. Parameters ---------- event : ctapipe.containers.ArrayEventContainer Event container. Returns ------- bool Has two dimensions: masks for M1 and/or M2. """ badrmspixel_mask = [None, None] if self.is_mc: for tel_id in event.trigger.tels_with_trigger: badrmspixel_mask[tel_id - 1] = np.zeros( self.n_camera_pixels, dtype=np.bool ) return badrmspixel_mask self._check_new_run(event) event_time = event.trigger.time.unix for tel_id in event.trigger.tels_with_trigger: self._check_pedvar_fields(tel_id, event) # now find monitoring data sample matching to this event by time stamp: if event_time <= self.sample_times_ped[tel_id - 1][0]: i_min = 0 else: i_min = np.where(event_time > self.sample_times_ped[tel_id - 1])[0][-1] badrmspixel_mask[tel_id - 1] = self.charge_std_outliers[tel_id - 1][i_min] return badrmspixel_mask
[docs] def get_badrmspixel_indices(self, event): """ Quick workaround to get the pixel IDs and not the mask Parameters ---------- event : ctapipe.containers.ArrayEventContainer Event container. Returns ------- list Bad pixels indices. """ badrmspixel_indices = [[None], [None]] if self.is_mc: return badrmspixel_indices badrmspixel_mask = self.get_badrmspixel_mask(self, event) for i, badrmspixelmask_tel_i in enumerate(badrmspixel_mask): if badrmspixelmask_tel_i != [None]: badrmspixel_indices[i] = np.where(badrmspixelmask_tel_i)[0] return badrmspixel_indices
def _check_pedvar_fields(self, tel_id, event): """ Update the pedestal RMS outliers. Does it only once after initializing the class. and reduces the samples to the unique ones. Parameters ---------- tel_id : int Telescope ID event : ctapipe.containers.ArrayEventContainer Event container. """ if self.n_samples_ped[tel_id - 1] == -1: self.n_samples_ped[tel_id - 1] = len( event.mon.tel[tel_id].pedestal.sample_time ) # calculate only once the hot pixel array of the monitoring data: print("Update hot pixels for M%d..." % tel_id, end=" ") event.mon.tel[tel_id].pedestal.charge_std_outliers = [] for i_sample in range(self.n_samples_ped[tel_id - 1]): charge_std = event.mon.tel[tel_id].pedestal.charge_std[ self.pedestalType ][i_sample] if i_sample == 0: charge_std_last = None else: charge_std_last = event.mon.tel[tel_id].pedestal.charge_std[ self.pedestalType ][i_sample - 1] if not np.array_equal(charge_std, charge_std_last): self.sample_times_ped[tel_id - 1].append( event.mon.tel[tel_id].pedestal.sample_time[i_sample].unix ) self.badrmspixel_mask = np.zeros( self.n_camera_pixels, dtype=np.bool ) self._check_pedestal_rms(charge_std) self.charge_std_outliers[tel_id - 1].append(self.badrmspixel_mask) # self.charge_std[tel_id - 1].append(charge_std) self.sample_times_ped[tel_id - 1] = np.array( self.sample_times_ped[tel_id - 1] ) self.charge_std_outliers[tel_id - 1] = np.array( self.charge_std_outliers[tel_id - 1], dtype=np.bool ) # self.charge_std[tel_id - 1] = np.array(self.charge_std[tel_id - 1]) print("done.")
[docs] def get_deadpixel_mask(self, event): """ Fetch the subrun-wise defined dead pixels for a given event, that is the event time. Parameters ---------- event : ctapipe.containers.ArrayEventContainer Event container. Returns ------- bool Has two dimensions: masks for M1 and/or M2. """ deadpixel_mask = [[None], [None]] if self.is_mc: for tel_id in event.trigger.tels_with_trigger: deadpixel_mask[tel_id - 1] = np.zeros( self.n_camera_pixels, dtype=np.bool ) return deadpixel_mask self._check_new_run(event) event_time = event.trigger.time.unix sample_time_range = Time( [ [ event.mon.tel[1].pedestal.sample_time[0].unix, event.mon.tel[1].pedestal.sample_time[-1].unix, ] ], format="unix", scale="utc", ) for tel_id in event.trigger.tels_with_trigger: if self.n_samples_dead[tel_id - 1] == -1: # self.n_samples_dead[tel_id - 1] = len(event.mon.tel[tel_id].pixel_status.sample_time_range) self.n_samples_dead[tel_id - 1] = len(sample_time_range) self.sample_ranges_dead[tel_id - 1] = np.zeros( shape=(self.n_samples_dead[tel_id - 1], 2) ) for i in range(self.n_samples_dead[tel_id - 1]): # self.sample_ranges_dead[tel_id - 1][i,0] = event.mon.tel[tel_id].pixel_status.sample_time_range[i][0].unix # self.sample_ranges_dead[tel_id - 1][i,1] = event.mon.tel[tel_id].pixel_status.sample_time_range[i][1].unix self.sample_ranges_dead[tel_id - 1][i, 0] = sample_time_range[i][ 0 ].unix self.sample_ranges_dead[tel_id - 1][i, 1] = sample_time_range[i][ 1 ].unix # now find sample: indices_time_dead = np.where( event_time >= self.sample_ranges_dead[tel_id - 1][:, 0] )[0] if indices_time_dead.size: i_min_dead = indices_time_dead[-1] else: i_min_dead = 0 deadpixel_mask[tel_id - 1] = event.mon.tel[ tel_id ].pixel_status.hardware_failing_pixels[i_min_dead] return deadpixel_mask
[docs] def get_badpixel_mask(self, event): """ Fetch the union of bad RMS and bad pixels for a given event, that is the event time. Parameters ---------- event : ctapipe.containers.ArrayEventContainer Event container. Returns ------- bool Has two dimensions: masks for M1 and/or M2. """ badpixel_mask = [[None], [None]] if self.is_mc: for tel_id in event.trigger.tels_with_trigger: badpixel_mask[tel_id - 1] = np.zeros( self.n_camera_pixels, dtype=np.bool ) return badpixel_mask badrmspixel_mask = self.get_badrmspixel_mask(event) deadpixel_mask = self.get_deadpixel_mask(event) for tel_id in event.trigger.tels_with_trigger: badpixel_mask[tel_id - 1] = np.logical_or( badrmspixel_mask[tel_id - 1], deadpixel_mask[tel_id - 1] ) return badpixel_mask
# def get_charge_std(self, event): # """ # Fetch the pedestal RMS pixel values for a given event, that is the event time. # # Returns # ------- # charge_std: has two dimensions: M1 and/or M2. # """ # # charge_std = [None, None] # event_time = event.trigger.time.unix # # for tel_id in event.trigger.tels_with_trigger: # # self._check_pedvar_fields(tel_id, event) # # # now find monitoring data sample matching to this event by time stamp: # if event_time <= self.sample_times_ped[tel_id - 1][0]: # i_min = 0 # else: # i_min = np.where(event_time > self.sample_times_ped[tel_id - 1])[0][-1] # # charge_std[tel_id - 1] = self.charge_std[tel_id - 1][i_min] # # return charge_std