Source code for magicctapipe.image.cleaning

"""
Module for cleaning utilities
"""
import copy
import itertools

import numpy as np
from scipy.sparse.csgraph import connected_components

__all__ = [
    "MAGICClean",
    "PixelTreatment",
    "get_num_islands_MAGIC",
]


[docs] class MAGICClean: """Implement the cleaning used by MAGIC. Parameters ---------- camera : ctapipe.instrument.CameraGeometry Camera geometry. configuration : dict Dictionary for the configuration. """ def __init__(self, camera, configuration): """Initiate cleaning class. Parameters ---------- camera : ctapipe.instrument.CameraGeometry Camera geometry. configuration : dict Dictionary for the configuration. """ self.configuration = configuration self.camera = camera if configuration["use_sum"]: self.NN2 = self.GetListOfNN(NN_size=2) self.NN3 = self.GetListOfNN(NN_size=3) self.NN4 = self.GetListOfNN(NN_size=4) # Set the XNN thresholds and windows if they have not already been defined. # The defaults are from the expert values values in the star_MX_OSA.rc file. if "SumThresh2NNPerPixel" in configuration: self.SumThresh2NNPerPixel = configuration["SumThresh2NNPerPixel"] else: self.SumThresh2NNPerPixel = 1.8 if "SumThresh3NNPerPixel" in configuration: self.SumThresh3NNPerPixel = configuration["SumThresh3NNPerPixel"] else: self.SumThresh3NNPerPixel = 1.3 if "SumThresh4NNPerPixel" in configuration: self.SumThresh4NNPerPixel = configuration["SumThresh4NNPerPixel"] else: self.SumThresh4NNPerPixel = 1.0 # default WindowXNN values are converted from time slices to ns if "Window2NN" in configuration: self.Window2NN = configuration["Window2NN"] else: self.Window2NN = 0.82 / 1.639 if "Window3NN" in configuration: self.Window3NN = configuration["Window3NN"] else: self.Window3NN = 1.15 / 1.639 if "Window4NN" in configuration: self.Window4NN = configuration["Window4NN"] else: self.Window4NN = 1.80 / 1.639 if "clipping" in configuration: self.clipping = configuration["clipping"] else: self.clipping = 750.0 if "find_hotpixels" in configuration: self.find_hotpixels = configuration["find_hotpixels"] else: self.find_hotpixels = False if self.find_hotpixels: if "use_interpolation" in configuration: use_interpolation = configuration["use_interpolation"] else: use_interpolation = True if "use_process_pedestal_evt" in configuration: use_process_pedestal_evt = configuration["use_process_pedestal_evt"] else: use_process_pedestal_evt = True if "use_process_times" in configuration: use_process_times = configuration["use_process_times"] else: use_process_times = True if "minimum_number_of_neighbors" in configuration: minimum_number_of_neighbors = configuration[ "minimum_number_of_neighbors" ] else: minimum_number_of_neighbors = 3 if "fast" in configuration: fast = configuration["fast"] else: fast = False treatment_config = dict( use_interpolation=use_interpolation, use_process_pedestal_evt=use_process_pedestal_evt, use_process_times=use_process_times, minimum_number_of_neighbors=minimum_number_of_neighbors, fast=fast, ) self.pixel_treatment = PixelTreatment(self.camera, treatment_config)
[docs] def GetListOfNN(self, NN_size=2, bad_pixels=None): """Get the list of next neighbor pixels. Parameters ---------- NN_size : int, optional Order of next neighbors to find, by default 2. bad_pixels : np.ndarray, optional Array of bad pixels, by default None. Returns ------- np.ndarray Array of neighbor pixels. """ NN = [] pixels = list(range(self.camera.n_pixels)) if bad_pixels is not None: pixels = np.setdiff1d(pixels, bad_pixels) for pixel in pixels: neighbors = np.where(self.camera.neighbor_matrix[pixel])[0] if bad_pixels is not None: neighbors = np.setdiff1d(neighbors, bad_pixels) if len(neighbors) < NN_size: continue combos = list(itertools.combinations(neighbors, NN_size - 1)) for combo in combos: arr = list(combo) arr.append(pixel) if NN_size == 2: NN.append(sorted(arr)) if NN_size == 3: neigh0 = np.where(self.camera.neighbor_matrix[arr[0]])[0] neigh1 = np.where(self.camera.neighbor_matrix[arr[1]])[0] if bad_pixels is not None: neigh0 = np.setdiff1d(neigh0, bad_pixels) neigh1 = np.setdiff1d(neigh1, bad_pixels) neigh_set = np.asarray(list(set(neigh0) & set(neigh1))) if len(neigh_set) != 2: continue shared_pixel = neigh_set[neigh_set != pixel] if shared_pixel in neighbors: continue NN.append(sorted(arr)) if NN_size == 4: neigh0 = np.where(self.camera.neighbor_matrix[arr[0]])[0] neigh1 = np.where(self.camera.neighbor_matrix[arr[1]])[0] neigh2 = np.where(self.camera.neighbor_matrix[arr[2]])[0] if bad_pixels is not None: neigh0 = np.setdiff1d(neigh0, bad_pixels) neigh1 = np.setdiff1d(neigh1, bad_pixels) neigh2 = np.setdiff1d(neigh2, bad_pixels) if ( ((arr[0] in neigh1) or (arr[0] in neigh2)) and ((arr[1] in neigh0) or (arr[1] in neigh2)) and ((arr[2] in neigh0) or (arr[2] in neigh1)) ): pass else: continue NN.append(sorted(arr)) return np.unique(NN, axis=0)
[docs] def clean_image(self, event_image, event_pulse_time, unsuitable_mask=None): """Clean the input image. Parameters ---------- event_image : np.ndarray Input array with event image (charge per each pixel). event_pulse_time : np.ndarray Input array with event times (time per each pixel). unsuitable_mask : np.ndarray, optional Array of unsuitable pixels, by default None. Returns ------- clean_mask : np.ndarray Mask with pixels surviving the cleaning. self.event_image : np.ndarray Image (charges) with only surviving pixels. self.event_pulse_time : np.ndarray Image (times) with only surviving pixels. Raises ------ ValueError Raised if no unsuitable mask is provided but `find_hotpixles` is True. """ if unsuitable_mask is None and self.find_hotpixels: raise ValueError( "find_hotpixels set to %s but not unsuitable_mask provided." % self.find_hotpixels ) if self.find_hotpixels: self.event_image = copy.copy(event_image) ( self.event_image, self.event_pulse_time, self.unsuitable_mask, self.unmapped_mask, ) = self.pixel_treatment.treat( self.event_image, event_pulse_time, unsuitable_mask ) self.event_image[self.unmapped_mask] = 0.0 else: self.event_image = copy.copy(event_image) self.event_pulse_time = copy.copy(event_pulse_time) self.unmapped_mask = [] self.unsuitable_mask = [] # try: clean_mask = np.asarray([False] * self.camera.n_pixels) if self.configuration["use_sum"]: clean_mask = self.magic_clean_step1Sum() else: clean_mask = self.magic_clean_step1() self.mask_step1 = copy.copy(clean_mask) # # clean_mask = self.magic_clean_step2(clean_mask) clean_mask = self.magic_clean_step2b(clean_mask) self.mask_step2 = copy.copy(clean_mask) self.core_pix = np.sum(clean_mask) clean_mask = self.magic_clean_step3b(clean_mask) self.mask_step3 = copy.copy(clean_mask) self.used_pix = np.sum(clean_mask) # clipping self.event_image[np.where(self.event_image > self.clipping)] = self.clipping return clean_mask, self.event_image, self.event_pulse_time
[docs] def group_calculation(self, mask, NN, clipNN, windowNN, thresholdNN): """Calculate pixel groups. Parameters ---------- mask : np.ndarray Mask aray. NN : np.ndarray Neighbor pixels array. clipNN : float Charge clipping value. windowNN : float Time window cut. thresholdNN : float Group charge threshold. Returns ------- np.ndarray Updated mask array. """ meantime = 0.0 totcharge = 0.0 charge = copy.copy(self.event_image[NN]) charge[charge > clipNN] = clipNN totcharge = np.sum(charge, axis=1) meantime = np.sum(charge * self.event_pulse_time[NN], axis=1) / totcharge meantime_proper = np.tile(meantime, (len(NN[0]), 1)).transpose() timeok = np.all( np.fabs(meantime_proper - self.event_pulse_time[NN]) < windowNN, axis=1 ) selection = (timeok) * (totcharge > thresholdNN) mask[NN[selection]] = True return mask, NN[selection]
[docs] def magic_clean_step1Sum(self): """Perform the 1st step of the Sum cleaning. Returns ------- np.ndarray Mask array with excluded pixels. """ sumthresh2NN = ( self.SumThresh2NNPerPixel * 2 * self.configuration["picture_thresh"] ) sumthresh3NN = ( self.SumThresh3NNPerPixel * 3 * self.configuration["picture_thresh"] ) sumthresh4NN = ( self.SumThresh4NNPerPixel * 4 * self.configuration["picture_thresh"] ) clip2NN = 2.2 * sumthresh2NN / 2.0 clip3NN = 1.05 * sumthresh3NN / 3.0 clip4NN = 1.05 * sumthresh4NN / 4.0 mask = np.asarray([False] * len(self.event_image)) if self.find_hotpixels: bad_pixels = np.where(self.unmapped_mask)[0] NN2_mask = np.any(np.isin(self.NN2, bad_pixels), axis=1) NN2 = self.NN2[~NN2_mask] NN3_mask = np.any(np.isin(self.NN3, bad_pixels), axis=1) NN3 = self.NN3[~NN3_mask] NN4_mask = np.any(np.isin(self.NN4, bad_pixels), axis=1) NN4 = self.NN4[~NN4_mask] # NN2 = copy.copy(self.NN2) # NN3 = copy.copy(self.NN3) # NN4 = copy.copy(self.NN4) else: NN2 = copy.copy(self.NN2) NN3 = copy.copy(self.NN3) NN4 = copy.copy(self.NN4) mask, self.fuck2NN = self.group_calculation( mask, NN2, clip2NN, self.Window2NN, sumthresh2NN ) mask, self.fuck3NN = self.group_calculation( mask, NN3, clip3NN, self.Window3NN, sumthresh3NN ) mask, self.fuck4NN = self.group_calculation( mask, NN4, clip4NN, self.Window4NN, sumthresh4NN ) # print("4NN",self.fuck4NN) return np.asarray(mask)
[docs] def magic_clean_step1(self): """Perform the 1st step of the cleaning. Returns ------- np.ndarray Mask array with excluded pixels. """ mask = self.event_image <= self.configuration["picture_thresh"] return ~mask
[docs] def magic_clean_step2(self, mask): """Perform the 2nd step of the cleaning. Parameters ---------- mask : np.ndarray Input mask. Returns ------- np.ndarray Mask array with excluded pixels. """ if np.sum(mask) == 0: return mask else: n = 0 size = 0 if self.configuration["use_time"]: neighbors = copy.copy(self.camera.neighbor_matrix) neighbors[self.unmapped_mask][:, self.unmapped_mask] = False clean_neighbors = neighbors[mask][:, mask] num_islands, labels = connected_components( clean_neighbors, directed=False ) island_ids = np.zeros(self.camera.n_pixels) island_ids[mask] = labels + 1 island_sizes = np.zeros(num_islands) for i in range(num_islands): island_sizes[i] = self.event_image[mask][labels == i].sum() brightest_id = island_sizes.argmax() + 1 nphot = self.event_image[island_ids == brightest_id] meantime = np.sum( self.event_pulse_time[island_ids == brightest_id] * nphot * nphot ) / np.sum(nphot * nphot) for ipixel in range(self.camera.n_pixels): if ipixel in self.unmapped_mask: continue if ( self.event_image[ipixel] > 2 * self.configuration["picture_thresh"] ): if ( np.fabs(self.event_pulse_time[ipixel] - meantime) > 2 * self.configuration["max_time_off"] ): mask[ipixel] = False if ( self.event_image[ipixel] < 2 * self.configuration["picture_thresh"] ): if ( np.fabs(self.event_pulse_time[ipixel] - meantime) > self.configuration["max_time_off"] ): mask[ipixel] = False neighbors = copy.copy(self.camera.neighbor_matrix) for ipixel in range(self.camera.n_pixels): if ipixel in self.unmapped_mask: continue hasNeighbor = False for neigh in np.where(neighbors[ipixel])[0]: if neigh in np.where(mask)[0]: hasNeighbor = True break if hasNeighbor == False: size += self.event_image[ipixel] n += 1 return mask
[docs] def magic_clean_step2b(self, mask): """Perform the 2nd step of the cleaning (b version). Parameters ---------- mask : np.ndarray Input mask. Returns ------- np.ndarray Mask array with excluded pixels. """ if np.sum(mask) == 0: return mask else: pixels_to_remove = [] neighbors = copy.copy(self.camera.neighbor_matrix) neighbors[self.unmapped_mask] = False clean_neighbors = neighbors[mask][:, mask] num_islands, labels = connected_components(clean_neighbors, directed=False) island_ids = np.zeros(self.camera.n_pixels) island_ids[mask] = labels + 1 # Finding the islands "sizes" (total charges) island_sizes = np.zeros(num_islands) for i in range(num_islands): island_sizes[i] = self.event_image[mask][labels == i].sum() # Disabling pixels for all islands save the brightest one brightest_id = island_sizes.argmax() + 1 if self.configuration["use_time"]: brightest_pixel_times = self.event_pulse_time[ mask & (island_ids == brightest_id) ] brightest_pixel_charges = self.event_image[ mask & (island_ids == brightest_id) ] brightest_time = np.sum( brightest_pixel_times * brightest_pixel_charges**2 ) / np.sum(brightest_pixel_charges**2) time_diff = np.abs(self.event_pulse_time - brightest_time) mask[ (self.event_image > 2 * self.configuration["picture_thresh"]) & (time_diff > 2 * self.configuration["max_time_off"]) ] = False mask[ (self.event_image < 2 * self.configuration["picture_thresh"]) & (time_diff > self.configuration["max_time_off"]) ] = False for pix_id in np.where(mask)[0]: if len(set(np.where(neighbors[pix_id] & mask)[0])) == 0: pixels_to_remove.append(pix_id) mask[pixels_to_remove] = False return mask
[docs] def magic_clean_step3(self, mask): """Perform the 3rd step of the cleaning. Parameters ---------- mask : np.ndarray Input mask. Returns ------- np.ndarray Mask array with excluded pixels. """ selection = [] core_mask = mask.copy() pixels_with_picture_neighbors_matrix = copy.copy(self.camera.neighbor_matrix) for pixel in np.where(self.event_image)[0]: if pixel in np.where(core_mask)[0]: continue if self.event_image[pixel] <= self.configuration["boundary_thresh"]: continue hasNeighbor = False if self.configuration["use_time"]: neighbors = np.where(pixels_with_picture_neighbors_matrix[pixel])[0] for neighbor in neighbors: if neighbor not in np.where(core_mask)[0]: continue time_diff = np.abs( self.event_pulse_time[neighbor] - self.event_pulse_time[pixel] ) if time_diff < self.configuration["max_time_diff"]: hasNeighbor = True break # print(pixel, neighbor, time_diff, self.configuration['max_time_diff'], hasNeighbor) if not hasNeighbor: continue if not pixels_with_picture_neighbors_matrix.dot(core_mask)[pixel]: continue selection.append(pixel) mask[selection] = True return mask
[docs] def magic_clean_step3b(self, mask): """Perform the 3rd step of the cleaning (b version). Parameters ---------- mask : np.ndarray Input mask. Returns ------- np.ndarray Mask array with excluded pixels. """ selection = [] core_mask = mask.copy() boundary_mask = ~mask pixels_with_picture_neighbors_matrix = copy.copy(self.camera.neighbor_matrix) boundary_threshold_selection = ( self.event_image > self.configuration["boundary_thresh"] ) boundary_threshold_selection = boundary_threshold_selection * boundary_mask pixels = np.where(boundary_threshold_selection)[0] if self.configuration["use_time"]: boundary_pixels = copy.copy(pixels) neighbors = pixels_with_picture_neighbors_matrix[boundary_pixels] neighbors[:, ~core_mask] = False time_broadcast = np.tile(self.event_pulse_time, (len(self.event_image), 1)) boundary_times = np.transpose(time_broadcast)[boundary_pixels] neighbor_times = time_broadcast[boundary_pixels] time_diff = np.abs(neighbor_times - boundary_times) time_selection = time_diff < self.configuration["max_time_diff"] hasNeighbor = np.minimum( np.sum(time_selection * neighbors, axis=1), 1 ).astype(bool) pixels = boundary_pixels[hasNeighbor] selection = pixels[pixels_with_picture_neighbors_matrix.dot(core_mask)[pixels]] mask[selection] = True return mask
[docs] def single_island(self, neighbors, mask): """Find single islands in the image. Parameters ---------- neighbors : np.ndarray Array with neighbor pixels. mask : np.ndarray Input mask array. Returns ------- np.ndarray Mask of pixels with single islands. """ pixels_to_remove = [] for pix_id in np.where(mask)[0]: if len(set(np.where(neighbors[pix_id] & mask)[0])) == 0: pixels_to_remove.append(pix_id) mask[pixels_to_remove] = False return mask
[docs] class PixelTreatment: """Interpolation for unsuitable pixels. Parameters ---------- camera : ctapipe.instrument.CameraGeometry Camera geometry. configuration : dict Dictionary for the configuration. """ def __init__(self, camera, configuration): """_summary_ Parameters ---------- camera : ctapipe.instrument.CameraGeometry Camera geometry. configuration : dict Dictionary for the configuration. """ self.configuration = configuration self.camera = camera if "use_interpolation" in configuration: self.use_interpolation = configuration["use_interpolation"] else: self.use_interpolation = True if "use_process_pedestal_evt" in configuration: self.use_process_pedestal_evt = configuration["use_process_pedestal_evt"] else: self.use_process_pedestal_evt = True if "use_process_times" in configuration: self.use_process_times = configuration["use_process_times"] else: self.use_process_times = True if "minimum_number_of_neighbors" in configuration: self.minimum_number_of_neighbors = configuration[ "minimum_number_of_neighbors" ] else: self.minimum_number_of_neighbors = 3 if "fast" in configuration: self.fast = configuration["fast"] else: self.fast = False self.neighbors_array = self.camera.neighbor_matrix self.npix = self.camera.n_pixels
[docs] def treat(self, event_image, event_pulse_time, unsuitable_mask): """Interpolate unsuitable pixels (charge and time). Parameters ---------- event_image : np.ndarray Event image (charge). event_pulse_time : np.ndarray Event image (time). unsuitable_mask : np.ndarray Mask array with unsuitable pixels. Returns ------- tuple Tuple with interpolated values for pixels' charge and time. """ self.event_image = event_image self.event_pulse_time = event_pulse_time self.unsuitable_mask = unsuitable_mask self.unmapped_mask = [] self.unsuitable_neighbors = self.neighbors_array[self.unsuitable_mask] self.unsuitable_pixels = np.where(self.unsuitable_mask)[0] if self.use_interpolation: self.interpolate_signals() if self.use_process_pedestal_evt: self.interpolate_pedestals() if self.use_process_times: self.interpolate_times_slow() return ( self.event_image, self.event_pulse_time, self.unsuitable_mask, self.unmapped_mask, )
[docs] def interpolate_signals(self): """Interpolate charge values.""" neighbors_unsuitable = copy.copy(self.unsuitable_neighbors) neighbors_unsuitable[:, self.unsuitable_mask] = False number_of_neighbors = np.sum(neighbors_unsuitable, axis=1) number_of_neighbors_selection = ( number_of_neighbors > self.minimum_number_of_neighbors - 1 ) unsuitable_mask = np.asarray([False] * self.npix) unsuitable_mask[self.unsuitable_pixels[number_of_neighbors_selection]] = True unmapped_mask = np.asarray([False] * self.npix) unmapped_mask[self.unsuitable_pixels[~number_of_neighbors_selection]] = True image_broadcast = np.repeat( self.event_image[np.newaxis, ...], neighbors_unsuitable.shape[0], axis=0 ) image_broadcast[~neighbors_unsuitable] = np.nan self.event_image[self.unsuitable_mask] = np.nanmean(image_broadcast, axis=1) self.unmapped_mask = copy.copy(unmapped_mask) self.unsuitable_mask_new = unsuitable_mask self.unsuitable_pixels_new = np.where(self.unsuitable_mask_new)[0]
[docs] def find_two_closest_times(self, times_arr): """Find two closest times to the one of the current pixel. Parameters ---------- times_arr : np.ndarray Time array. Returns ------- tuple Two closest times. """ n0 = len(times_arr) minval = 1e10 p0 = -1 p1 = -1 for j in range(n0): for k in range(j): diff = np.fabs(times_arr[j] - times_arr[k]) if diff >= minval and diff < 250: continue p0 = j p1 = k minval = diff return p0, p1
[docs] def interpolate_times_slow(self): """Interpolate times (slow version).""" pixel_and_times = zip( self.unsuitable_pixels_new, self.event_pulse_time[self.unsuitable_mask_new] ) for ipixel, event_time in pixel_and_times: times = self.event_pulse_time[ np.logical_and(self.neighbors_array[ipixel][:], ~self.unsuitable_mask) ] p0, p1 = self.find_two_closest_times(times) if p0 >= 0 and p1 >= 0 and np.fabs(times[p0] - times[p1]) < 250: self.event_pulse_time[ipixel] = (times[p0] + times[p1]) / 2.0
[docs] def interpolate_times_fast(self): """Interpolate times (fast version).""" neighbors_unsuitable = self.neighbors_array[self.unsuitable_mask_new] neighbors_unsuitable[:, self.unsuitable_mask] = False times_broadcast = np.repeat( self.event_pulse_time[np.newaxis, ...], neighbors_unsuitable.shape[0], axis=0, ) times_broadcast[~neighbors_unsuitable] = np.nan times_broadcast.sort(axis=1) idx = np.nanargmin(np.fabs(np.diff(times_broadcast, axis=1)), axis=1) for ix, ipixel in enumerate(self.unsuitable_pixels_new): time1 = times_broadcast[ix][idx[ix]] time2 = times_broadcast[ix][idx[ix] + 1] self.event_pulse_time[ipixel] = (time1 + time2) / 2.0
[docs] def interpolate_pedestals(self): """Interpolate pedestals (not implemented).""" pass
[docs] def get_num_islands_MAGIC(camera, clean_mask): """Evaluate the number of islands as in MARS. Parameters ---------- camera : ctapipe.instrument.camera.geometry.CameraGeometry Camera geometry. clean_mask : numpy.ndarray Clean mask. Returns ------- int Number of islands in the image. """ # Identifying connected islands neighbors = camera.neighbor_matrix_sparse clean_neighbors = neighbors[clean_mask][:, clean_mask] num_islands, labels = connected_components(clean_neighbors, directed=False) return num_islands