magic-cta-pipe is under active development. Expect large and rapid changes in functionality.

Source code for magicctapipe.image.cleaning

"""
Module for cleaning utilities
"""
import copy
import itertools

import numpy as np
from scipy.sparse.csgraph import connected_components

__all__ = [
    "MAGICClean",
    "PixelTreatment",
    "get_num_islands_MAGIC",
]


[docs] class MAGICClean: """Implement the cleaning used by MAGIC. Parameters ---------- camera : ctapipe.instrument.CameraGeometry Camera geometry. configuration : dict Dictionary for the configuration. """ def __init__(self, camera, configuration): """Initiate cleaning class. Parameters ---------- camera : ctapipe.instrument.CameraGeometry Camera geometry. configuration : dict Dictionary for the configuration. """ self.configuration = configuration self.camera = camera if configuration["use_sum"]: self.NN2 = self.GetListOfNN(NN_size=2) self.NN3 = self.GetListOfNN(NN_size=3) self.NN4 = self.GetListOfNN(NN_size=4) # Set the XNN thresholds and windows if they have not already been defined. # The defaults are from the expert values values in the star_MX_OSA.rc file. if "SumThresh2NNPerPixel" in configuration: self.SumThresh2NNPerPixel = configuration["SumThresh2NNPerPixel"] else: self.SumThresh2NNPerPixel = 1.8 if "SumThresh3NNPerPixel" in configuration: self.SumThresh3NNPerPixel = configuration["SumThresh3NNPerPixel"] else: self.SumThresh3NNPerPixel = 1.3 if "SumThresh4NNPerPixel" in configuration: self.SumThresh4NNPerPixel = configuration["SumThresh4NNPerPixel"] else: self.SumThresh4NNPerPixel = 1.0 # default WindowXNN values are converted from time slices to ns if "Window2NN" in configuration: self.Window2NN = configuration["Window2NN"] else: self.Window2NN = 0.82 / 1.639 if "Window3NN" in configuration: self.Window3NN = configuration["Window3NN"] else: self.Window3NN = 1.15 / 1.639 if "Window4NN" in configuration: self.Window4NN = configuration["Window4NN"] else: self.Window4NN = 1.80 / 1.639 if "clipping" in configuration: self.clipping = configuration["clipping"] else: self.clipping = 750.0 if "find_hotpixels" in configuration: self.find_hotpixels = configuration["find_hotpixels"] else: self.find_hotpixels = False if self.find_hotpixels: if "use_interpolation" in configuration: use_interpolation = configuration["use_interpolation"] else: use_interpolation = True if "use_process_pedestal_evt" in configuration: use_process_pedestal_evt = configuration["use_process_pedestal_evt"] else: use_process_pedestal_evt = True if "use_process_times" in configuration: use_process_times = configuration["use_process_times"] else: use_process_times = True if "minimum_number_of_neighbors" in configuration: minimum_number_of_neighbors = configuration[ "minimum_number_of_neighbors" ] else: minimum_number_of_neighbors = 3 if "fast" in configuration: fast = configuration["fast"] else: fast = False treatment_config = dict( use_interpolation=use_interpolation, use_process_pedestal_evt=use_process_pedestal_evt, use_process_times=use_process_times, minimum_number_of_neighbors=minimum_number_of_neighbors, fast=fast, ) self.pixel_treatment = PixelTreatment(self.camera, treatment_config)
[docs] def GetListOfNN(self, NN_size=2, bad_pixels=None): """Get the list of next neighbor pixels. Parameters ---------- NN_size : int, optional Order of next neighbors to find, by default 2. bad_pixels : np.ndarray, optional Array of bad pixels, by default None. Returns ------- np.ndarray Array of neighbor pixels. """ NN = [] pixels = list(range(self.camera.n_pixels)) if bad_pixels is not None: pixels = np.setdiff1d(pixels, bad_pixels) for pixel in pixels: neighbors = np.where(self.camera.neighbor_matrix[pixel])[0] if bad_pixels is not None: neighbors = np.setdiff1d(neighbors, bad_pixels) if len(neighbors) < NN_size: continue combos = list(itertools.combinations(neighbors, NN_size - 1)) for combo in combos: arr = list(combo) arr.append(pixel) if NN_size == 2: NN.append(sorted(arr)) if NN_size == 3: neigh0 = np.where(self.camera.neighbor_matrix[arr[0]])[0] neigh1 = np.where(self.camera.neighbor_matrix[arr[1]])[0] if bad_pixels is not None: neigh0 = np.setdiff1d(neigh0, bad_pixels) neigh1 = np.setdiff1d(neigh1, bad_pixels) neigh_set = np.asarray(list(set(neigh0) & set(neigh1))) if len(neigh_set) != 2: continue shared_pixel = neigh_set[neigh_set != pixel] if shared_pixel in neighbors: continue NN.append(sorted(arr)) if NN_size == 4: neigh0 = np.where(self.camera.neighbor_matrix[arr[0]])[0] neigh1 = np.where(self.camera.neighbor_matrix[arr[1]])[0] neigh2 = np.where(self.camera.neighbor_matrix[arr[2]])[0] if bad_pixels is not None: neigh0 = np.setdiff1d(neigh0, bad_pixels) neigh1 = np.setdiff1d(neigh1, bad_pixels) neigh2 = np.setdiff1d(neigh2, bad_pixels) if ( ((arr[0] in neigh1) or (arr[0] in neigh2)) and ((arr[1] in neigh0) or (arr[1] in neigh2)) and ((arr[2] in neigh0) or (arr[2] in neigh1)) ): pass else: continue NN.append(sorted(arr)) return np.unique(NN, axis=0)
[docs] def clean_image(self, event_image, event_pulse_time, unsuitable_mask=None): """Clean the input image. Parameters ---------- event_image : np.ndarray Input array with event image (charge per each pixel). event_pulse_time : np.ndarray Input array with event times (time per each pixel). unsuitable_mask : np.ndarray, optional Array of unsuitable pixels, by default None. Returns ------- clean_mask : np.ndarray Mask with pixels surviving the cleaning. self.event_image : np.ndarray Image (charges) with only surviving pixels. self.event_pulse_time : np.ndarray Image (times) with only surviving pixels. Raises ------ ValueError Raised if no unsuitable mask is provided but `find_hotpixles` is True. """ if unsuitable_mask is None and self.find_hotpixels: raise ValueError( "find_hotpixels set to %s but not unsuitable_mask provided." % self.find_hotpixels ) if self.find_hotpixels: self.event_image = copy.copy(event_image) ( self.event_image, self.event_pulse_time, self.unsuitable_mask, self.unmapped_mask, ) = self.pixel_treatment.treat( self.event_image, event_pulse_time, unsuitable_mask ) self.event_image[self.unmapped_mask] = 0.0 else: self.event_image = copy.copy(event_image) self.event_pulse_time = copy.copy(event_pulse_time) self.unmapped_mask = [] self.unsuitable_mask = [] # try: clean_mask = np.asarray([False] * self.camera.n_pixels) if self.configuration["use_sum"]: clean_mask = self.magic_clean_step1Sum() else: clean_mask = self.magic_clean_step1() self.mask_step1 = copy.copy(clean_mask) # # clean_mask = self.magic_clean_step2(clean_mask) clean_mask = self.magic_clean_step2b(clean_mask) self.mask_step2 = copy.copy(clean_mask) self.core_pix = np.sum(clean_mask) clean_mask = self.magic_clean_step3b(clean_mask) self.mask_step3 = copy.copy(clean_mask) self.used_pix = np.sum(clean_mask) # clipping self.event_image[np.where(self.event_image > self.clipping)] = self.clipping return clean_mask, self.event_image, self.event_pulse_time
[docs] def group_calculation(self, mask, NN, clipNN, windowNN, thresholdNN): """Calculate pixel groups. Parameters ---------- mask : np.ndarray Mask aray. NN : np.ndarray Neighbor pixels array. clipNN : float Charge clipping value. windowNN : float Time window cut. thresholdNN : float Group charge threshold. Returns ------- np.ndarray Updated mask array. """ meantime = 0.0 totcharge = 0.0 charge = copy.copy(self.event_image[NN]) charge[charge > clipNN] = clipNN totcharge = np.sum(charge, axis=1) meantime = np.sum(charge * self.event_pulse_time[NN], axis=1) / totcharge meantime_proper = np.tile(meantime, (len(NN[0]), 1)).transpose() timeok = np.all( np.fabs(meantime_proper - self.event_pulse_time[NN]) < windowNN, axis=1 ) selection = (timeok) * (totcharge > thresholdNN) mask[NN[selection]] = True return mask, NN[selection]
[docs] def magic_clean_step1Sum(self): """Perform the 1st step of the Sum cleaning. Returns ------- np.ndarray Mask array with excluded pixels. """ sumthresh2NN = ( self.SumThresh2NNPerPixel * 2 * self.configuration["picture_thresh"] ) sumthresh3NN = ( self.SumThresh3NNPerPixel * 3 * self.configuration["picture_thresh"] ) sumthresh4NN = ( self.SumThresh4NNPerPixel * 4 * self.configuration["picture_thresh"] ) clip2NN = 2.2 * sumthresh2NN / 2.0 clip3NN = 1.05 * sumthresh3NN / 3.0 clip4NN = 1.05 * sumthresh4NN / 4.0 mask = np.asarray([False] * len(self.event_image)) if self.find_hotpixels: bad_pixels = np.where(self.unmapped_mask)[0] NN2_mask = np.any(np.isin(self.NN2, bad_pixels), axis=1) NN2 = self.NN2[~NN2_mask] NN3_mask = np.any(np.isin(self.NN3, bad_pixels), axis=1) NN3 = self.NN3[~NN3_mask] NN4_mask = np.any(np.isin(self.NN4, bad_pixels), axis=1) NN4 = self.NN4[~NN4_mask] # NN2 = copy.copy(self.NN2) # NN3 = copy.copy(self.NN3) # NN4 = copy.copy(self.NN4) else: NN2 = copy.copy(self.NN2) NN3 = copy.copy(self.NN3) NN4 = copy.copy(self.NN4) mask, self.fuck2NN = self.group_calculation( mask, NN2, clip2NN, self.Window2NN, sumthresh2NN ) mask, self.fuck3NN = self.group_calculation( mask, NN3, clip3NN, self.Window3NN, sumthresh3NN ) mask, self.fuck4NN = self.group_calculation( mask, NN4, clip4NN, self.Window4NN, sumthresh4NN ) # print("4NN",self.fuck4NN) return np.asarray(mask)
[docs] def magic_clean_step1(self): """Perform the 1st step of the cleaning. Returns ------- np.ndarray Mask array with excluded pixels. """ mask = self.event_image <= self.configuration["picture_thresh"] return ~mask
[docs] def magic_clean_step2(self, mask): """Perform the 2nd step of the cleaning. Parameters ---------- mask : np.ndarray Input mask. Returns ------- np.ndarray Mask array with excluded pixels. """ if np.sum(mask) == 0: return mask else: n = 0 size = 0 if self.configuration["use_time"]: neighbors = copy.copy(self.camera.neighbor_matrix) neighbors[self.unmapped_mask][:, self.unmapped_mask] = False clean_neighbors = neighbors[mask][:, mask] num_islands, labels = connected_components( clean_neighbors, directed=False ) island_ids = np.zeros(self.camera.n_pixels) island_ids[mask] = labels + 1 island_sizes = np.zeros(num_islands) for i in range(num_islands): island_sizes[i] = self.event_image[mask][labels == i].sum() brightest_id = island_sizes.argmax() + 1 nphot = self.event_image[island_ids == brightest_id] meantime = np.sum( self.event_pulse_time[island_ids == brightest_id] * nphot * nphot ) / np.sum(nphot * nphot) for ipixel in range(self.camera.n_pixels): if ipixel in self.unmapped_mask: continue if ( self.event_image[ipixel] > 2 * self.configuration["picture_thresh"] ): if ( np.fabs(self.event_pulse_time[ipixel] - meantime) > 2 * self.configuration["max_time_off"] ): mask[ipixel] = False if ( self.event_image[ipixel] < 2 * self.configuration["picture_thresh"] ): if ( np.fabs(self.event_pulse_time[ipixel] - meantime) > self.configuration["max_time_off"] ): mask[ipixel] = False neighbors = copy.copy(self.camera.neighbor_matrix) for ipixel in range(self.camera.n_pixels): if ipixel in self.unmapped_mask: continue hasNeighbor = False for neigh in np.where(neighbors[ipixel])[0]: if neigh in np.where(mask)[0]: hasNeighbor = True break if hasNeighbor == False: size += self.event_image[ipixel] n += 1 return mask
[docs] def magic_clean_step2b(self, mask): """Perform the 2nd step of the cleaning (b version). Parameters ---------- mask : np.ndarray Input mask. Returns ------- np.ndarray Mask array with excluded pixels. """ if np.sum(mask) == 0: return mask else: pixels_to_remove = [] neighbors = copy.copy(self.camera.neighbor_matrix) neighbors[self.unmapped_mask] = False clean_neighbors = neighbors[mask][:, mask] num_islands, labels = connected_components(clean_neighbors, directed=False) island_ids = np.zeros(self.camera.n_pixels) island_ids[mask] = labels + 1 # Finding the islands "sizes" (total charges) island_sizes = np.zeros(num_islands) for i in range(num_islands): island_sizes[i] = self.event_image[mask][labels == i].sum() # Disabling pixels for all islands save the brightest one brightest_id = island_sizes.argmax() + 1 if self.configuration["use_time"]: brightest_pixel_times = self.event_pulse_time[ mask & (island_ids == brightest_id) ] brightest_pixel_charges = self.event_image[ mask & (island_ids == brightest_id) ] brightest_time = np.sum( brightest_pixel_times * brightest_pixel_charges**2 ) / np.sum(brightest_pixel_charges**2) time_diff = np.abs(self.event_pulse_time - brightest_time) mask[ (self.event_image > 2 * self.configuration["picture_thresh"]) & (time_diff > 2 * self.configuration["max_time_off"]) ] = False mask[ (self.event_image < 2 * self.configuration["picture_thresh"]) & (time_diff > self.configuration["max_time_off"]) ] = False for pix_id in np.where(mask)[0]: if len(set(np.where(neighbors[pix_id] & mask)[0])) == 0: pixels_to_remove.append(pix_id) mask[pixels_to_remove] = False return mask
[docs] def magic_clean_step3(self, mask): """Perform the 3rd step of the cleaning. Parameters ---------- mask : np.ndarray Input mask. Returns ------- np.ndarray Mask array with excluded pixels. """ selection = [] core_mask = mask.copy() pixels_with_picture_neighbors_matrix = copy.copy(self.camera.neighbor_matrix) for pixel in np.where(self.event_image)[0]: if pixel in np.where(core_mask)[0]: continue if self.event_image[pixel] <= self.configuration["boundary_thresh"]: continue hasNeighbor = False if self.configuration["use_time"]: neighbors = np.where(pixels_with_picture_neighbors_matrix[pixel])[0] for neighbor in neighbors: if neighbor not in np.where(core_mask)[0]: continue time_diff = np.abs( self.event_pulse_time[neighbor] - self.event_pulse_time[pixel] ) if time_diff < self.configuration["max_time_diff"]: hasNeighbor = True break # print(pixel, neighbor, time_diff, self.configuration['max_time_diff'], hasNeighbor) if not hasNeighbor: continue if not pixels_with_picture_neighbors_matrix.dot(core_mask)[pixel]: continue selection.append(pixel) mask[selection] = True return mask
[docs] def magic_clean_step3b(self, mask): """Perform the 3rd step of the cleaning (b version). Parameters ---------- mask : np.ndarray Input mask. Returns ------- np.ndarray Mask array with excluded pixels. """ selection = [] core_mask = mask.copy() boundary_mask = ~mask pixels_with_picture_neighbors_matrix = copy.copy(self.camera.neighbor_matrix) boundary_threshold_selection = ( self.event_image > self.configuration["boundary_thresh"] ) boundary_threshold_selection = boundary_threshold_selection * boundary_mask pixels = np.where(boundary_threshold_selection)[0] if self.configuration["use_time"]: boundary_pixels = copy.copy(pixels) neighbors = pixels_with_picture_neighbors_matrix[boundary_pixels] neighbors[:, ~core_mask] = False time_broadcast = np.tile(self.event_pulse_time, (len(self.event_image), 1)) boundary_times = np.transpose(time_broadcast)[boundary_pixels] neighbor_times = time_broadcast[boundary_pixels] time_diff = np.abs(neighbor_times - boundary_times) time_selection = time_diff < self.configuration["max_time_diff"] hasNeighbor = np.minimum( np.sum(time_selection * neighbors, axis=1), 1 ).astype(bool) pixels = boundary_pixels[hasNeighbor] selection = pixels[pixels_with_picture_neighbors_matrix.dot(core_mask)[pixels]] mask[selection] = True return mask
[docs] def single_island(self, neighbors, mask): """Find single islands in the image. Parameters ---------- neighbors : np.ndarray Array with neighbor pixels. mask : np.ndarray Input mask array. Returns ------- np.ndarray Mask of pixels with single islands. """ pixels_to_remove = [] for pix_id in np.where(mask)[0]: if len(set(np.where(neighbors[pix_id] & mask)[0])) == 0: pixels_to_remove.append(pix_id) mask[pixels_to_remove] = False return mask
[docs] class PixelTreatment: """Interpolation for unsuitable pixels. Parameters ---------- camera : ctapipe.instrument.CameraGeometry Camera geometry. configuration : dict Dictionary for the configuration. """ def __init__(self, camera, configuration): """_summary_ Parameters ---------- camera : ctapipe.instrument.CameraGeometry Camera geometry. configuration : dict Dictionary for the configuration. """ self.configuration = configuration self.camera = camera if "use_interpolation" in configuration: self.use_interpolation = configuration["use_interpolation"] else: self.use_interpolation = True if "use_process_pedestal_evt" in configuration: self.use_process_pedestal_evt = configuration["use_process_pedestal_evt"] else: self.use_process_pedestal_evt = True if "use_process_times" in configuration: self.use_process_times = configuration["use_process_times"] else: self.use_process_times = True if "minimum_number_of_neighbors" in configuration: self.minimum_number_of_neighbors = configuration[ "minimum_number_of_neighbors" ] else: self.minimum_number_of_neighbors = 3 if "fast" in configuration: self.fast = configuration["fast"] else: self.fast = False self.neighbors_array = self.camera.neighbor_matrix self.npix = self.camera.n_pixels
[docs] def treat(self, event_image, event_pulse_time, unsuitable_mask): """Interpolate unsuitable pixels (charge and time). Parameters ---------- event_image : np.ndarray Event image (charge). event_pulse_time : np.ndarray Event image (time). unsuitable_mask : np.ndarray Mask array with unsuitable pixels. Returns ------- tuple Tuple with interpolated values for pixels' charge and time. """ self.event_image = event_image self.event_pulse_time = event_pulse_time self.unsuitable_mask = unsuitable_mask self.unmapped_mask = [] self.unsuitable_neighbors = self.neighbors_array[self.unsuitable_mask] self.unsuitable_pixels = np.where(self.unsuitable_mask)[0] if self.use_interpolation: self.interpolate_signals() if self.use_process_pedestal_evt: self.interpolate_pedestals() if self.use_process_times: self.interpolate_times_slow() return ( self.event_image, self.event_pulse_time, self.unsuitable_mask, self.unmapped_mask, )
[docs] def interpolate_signals(self): """Interpolate charge values.""" neighbors_unsuitable = copy.copy(self.unsuitable_neighbors) neighbors_unsuitable[:, self.unsuitable_mask] = False number_of_neighbors = np.sum(neighbors_unsuitable, axis=1) number_of_neighbors_selection = ( number_of_neighbors > self.minimum_number_of_neighbors - 1 ) unsuitable_mask = np.asarray([False] * self.npix) unsuitable_mask[self.unsuitable_pixels[number_of_neighbors_selection]] = True unmapped_mask = np.asarray([False] * self.npix) unmapped_mask[self.unsuitable_pixels[~number_of_neighbors_selection]] = True image_broadcast = np.repeat( self.event_image[np.newaxis, ...], neighbors_unsuitable.shape[0], axis=0 ) image_broadcast[~neighbors_unsuitable] = np.nan self.event_image[self.unsuitable_mask] = np.nanmean(image_broadcast, axis=1) self.unmapped_mask = copy.copy(unmapped_mask) self.unsuitable_mask_new = unsuitable_mask self.unsuitable_pixels_new = np.where(self.unsuitable_mask_new)[0]
[docs] def find_two_closest_times(self, times_arr): """Find two closest times to the one of the current pixel. Parameters ---------- times_arr : np.ndarray Time array. Returns ------- tuple Two closest times. """ n0 = len(times_arr) minval = 1e10 p0 = -1 p1 = -1 for j in range(n0): for k in range(j): diff = np.fabs(times_arr[j] - times_arr[k]) if diff >= minval and diff < 250: continue p0 = j p1 = k minval = diff return p0, p1
[docs] def interpolate_times_slow(self): """Interpolate times (slow version).""" pixel_and_times = zip( self.unsuitable_pixels_new, self.event_pulse_time[self.unsuitable_mask_new] ) for ipixel, event_time in pixel_and_times: times = self.event_pulse_time[ np.logical_and(self.neighbors_array[ipixel][:], ~self.unsuitable_mask) ] p0, p1 = self.find_two_closest_times(times) if p0 >= 0 and p1 >= 0 and np.fabs(times[p0] - times[p1]) < 250: self.event_pulse_time[ipixel] = (times[p0] + times[p1]) / 2.0
[docs] def interpolate_times_fast(self): """Interpolate times (fast version).""" neighbors_unsuitable = self.neighbors_array[self.unsuitable_mask_new] neighbors_unsuitable[:, self.unsuitable_mask] = False times_broadcast = np.repeat( self.event_pulse_time[np.newaxis, ...], neighbors_unsuitable.shape[0], axis=0, ) times_broadcast[~neighbors_unsuitable] = np.nan times_broadcast.sort(axis=1) idx = np.nanargmin(np.fabs(np.diff(times_broadcast, axis=1)), axis=1) for ix, ipixel in enumerate(self.unsuitable_pixels_new): time1 = times_broadcast[ix][idx[ix]] time2 = times_broadcast[ix][idx[ix] + 1] self.event_pulse_time[ipixel] = (time1 + time2) / 2.0
[docs] def interpolate_pedestals(self): """Interpolate pedestals (not implemented).""" pass
[docs] def get_num_islands_MAGIC(camera, clean_mask): """Evaluate the number of islands as in MARS. Parameters ---------- camera : ctapipe.instrument.camera.geometry.CameraGeometry Camera geometry. clean_mask : numpy.ndarray Clean mask. Returns ------- int Number of islands in the image. """ # Identifying connected islands neighbors = camera.neighbor_matrix_sparse clean_neighbors = neighbors[clean_mask][:, clean_mask] num_islands, labels = connected_components(clean_neighbors, directed=False) return num_islands