magic-cta-pipe is under active development. Expect large and rapid changes in functionality.

Source code for magicctapipe.utils.gti

"""
GTIs calculation
"""
import datetime

import pandas
import scipy
import uproot

__all__ = [
    "info_message",
    "identify_time_edges",
    "intersect_time_intervals",
    "GTIGenerator",
]


[docs] def info_message(text, prefix="info"): """Prints the specified text with the prefix of the current date Parameters ---------- text : str Text prefix : str, optional Prefix, by default 'info' """ date_str = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S") print(f"({prefix:s}) {date_str:s}: {text:s}")
[docs] def identify_time_edges(times, criterion, max_time_diff=6.9e-4): """ Identifies the time interval edges, corresponding to the True state of the specified condition. Neighbouring time intervals, separated by not more than max_time_diff are joined together. Parameters ---------- times : np.ndarray Array of the time data points. criterion : np.ndarray Array of True/False values, indicating the goodness of the corresponding data points. max_time_diff : float, optional Maximal time difference between the time intervals, below which they are joined into one. Returns ------- list List of start/stop pairs, describing the identified time intervals. """ times = scipy.array(times) wh = scipy.where(criterion == True) if len(wh[0]) == 0: print("No time intervals excluded!") return [[0, 0]] # The above function will result in indicies of non-zero bins. # But we want indicies of their _edges_. non_zero_bin_edges = [] for i in wh[0]: if i - 1 >= 0: if abs(times[i] - times[i - 1]) < max_time_diff: non_zero_bin_edges.append(i - 1) non_zero_bin_edges.append(i) if i + 1 < len(times): if abs(times[i] - times[i + 1]) < max_time_diff: non_zero_bin_edges.append(i + 1) non_zero_bin_edges = scipy.unique(non_zero_bin_edges) if len(non_zero_bin_edges) > 2: # Finding the elements, that separate the observational time intervals # During one time interval diff should return 1 (current bin has index i, the index of the next time bin is i+1). # Here we're looking for diffs that are not equal to 1. # division_indicies = (scipy.diff(non_zero_bin_edges[1:-1])-1).nonzero() # division_indicies = division_indicies[0] cond_id = scipy.diff(non_zero_bin_edges[1:-1]) > 1 cond_time = scipy.diff(times[non_zero_bin_edges[1:-1]]) > max_time_diff division_indicies = scipy.where(cond_id | cond_time) division_indicies = division_indicies[0] # Concatenating to the found elements the beginning and the end of the observational time. # Also adding i+1 elements, to correctly switch to the next observational time interval. parts_edges_idx = scipy.concatenate( ( [non_zero_bin_edges[0]], non_zero_bin_edges[1:-1][division_indicies], non_zero_bin_edges[1:-1][division_indicies + 1], [non_zero_bin_edges[-1]], ) ) else: parts_edges_idx = scipy.array(non_zero_bin_edges) parts_edges_idx.sort() # Transorming edges indicies to the real values and transforming them to the [start, stop] list. parts_edges = times[parts_edges_idx] parts_edges = parts_edges.reshape(-1, 2) return parts_edges
[docs] def intersect_time_intervals(intervals1, intervals2): """ Intersects two lists of (TStart, TStop) pairs. Returned list contains the start/stop invervals, common in both input lists. Parameters ---------- intervals1 : list First list of (TStart, TStop) lists (or tuples). intervals2 : list Second list of (TStart, TStop) lists (or tuples). Returns ------- list A list of (TStart, TStop) lists, representing the start/stop invervals, common in both input lists. """ joined_intervals = [] # Comparing 1st to 2nd for interv1 in intervals1: tstart = None tstop = None for interv2 in intervals2: if (interv2[0] >= interv1[0]) and (interv2[0] <= interv1[1]): tstart = interv2[0] tstop = min(interv2[1], interv1[1]) elif (interv2[1] >= interv1[0]) and (interv2[1] <= interv1[1]): tstart = max(interv2[0], interv1[0]) tstop = interv2[1] elif (interv2[0] < interv1[0]) and (interv2[1] > interv1[1]): tstart = interv1[0] tstop = interv1[1] if tstart and tstop: joined_intervals.append([tstart, tstop]) return joined_intervals
[docs] class GTIGenerator: """Generate good time intervals (GTI). Parameters ---------- config : dict, optional Configuration, by default None verbose : bool, optional Verbose flag, by default False """ def __init__(self, config=None, verbose=False): """Initialize class. Parameters ---------- config : dict, optional Configuration, by default None verbose : bool, optional Verbose flag, by default False """ self._config = config self.verbose = verbose @property def config(self): """Copy configuration. Returns ------- dict Configuration. """ return self._config.copy() @config.setter def config(self, new_config): """Set new configuration. Parameters ---------- new_config : dict New configuration. Raises ------ ValueError Error if `event_list` key is not present in configuration. """ if "event_list" not in new_config: raise ValueError( 'GTI generator error: the configuration dict is missing the "event_list" section.' ) self._config = new_config.copy() def _identify_data_taking_time_edges(self, file_list, max_tdiff=1): """Identify data taking time edges. Parameters ---------- file_list : list List of files. max_tdiff : int, optional Maximum time difference, by default 1 Returns ------- list List of data taking time edges. Raises ------ ValueError Error if no files to process. """ if not file_list: raise ValueError("GTI generator: no files to process") mjd = scipy.zeros(0) tdiff = scipy.zeros(0) if self.verbose: info_message("identifying data taking time edges", "GTI generator") for fnum, fname in enumerate(file_list): with uproot.open(fname) as input_stream: if self.verbose: info_message( f"processing file {fnum+1} / {len(file_list)}", "GTI generator" ) _tdiff = input_stream["Events"]["MRawEvtHeader.fTimeDiff"].array( library="np" ) _mjd = input_stream["Events"]["MTime.fMjd"].array(library="np") _millisec = input_stream["Events"]["MTime.fTime.fMilliSec"].array( library="np" ) _nanosec = input_stream["Events"]["MTime.fNanoSec"].array(library="np") _mjd = _mjd + (_millisec / 1e3 + _nanosec / 1e9) / 86400.0 mjd = scipy.concatenate((mjd, _mjd)) tdiff = scipy.concatenate((tdiff, _tdiff)) sort_args = mjd.argsort() not_edge = tdiff < max_tdiff time_intervals = identify_time_edges( mjd[sort_args], not_edge[sort_args], max_time_diff=max_tdiff / 86400.0 ) return time_intervals def _identify_dc_time_edges(self, file_list): """Identifies time edges after DC cuts. Parameters ---------- file_list : list File list. Returns ------- list List of time edges after DC cuts. Raises ------ ValueError Error if no files to process. ValueError Error if no DC cuts given. """ if not file_list: raise ValueError("GTI generator: no files to process") if "dc" not in self.config["event_list"]["cuts"]["quality"]: raise ValueError("GTI generator: no DC cuts given") if self.verbose: info_message("identifying DC time edges", "GTI generator") df = pandas.DataFrame() # Looping over the data files for fnum, file_name in enumerate(file_list): if self.verbose: info_message( f"processing file {fnum+1} / {len(file_list)}", "GTI generator" ) with uproot.open(file_name) as input_stream: mjd = input_stream["Camera"]["MTimeCamera.fMjd"].array(library="np") millisec = input_stream["Camera"]["MTimeCamera.fTime.fMilliSec"].array( library="np" ) nanosec = input_stream["Camera"]["MTimeCamera.fNanoSec"].array( library="np" ) df_ = pandas.DataFrame() df_["mjd"] = mjd + (millisec / 1e3 + nanosec / 1e9) / 86400 df_["value"] = input_stream["Camera"]["MReportCamera.fMedianDC"].array( library="np" ) df = df.append(df_) df = df.sort_values(by=["mjd"]) cut = self.config["event_list"]["cuts"]["quality"]["dc"] selection = df.query(cut) _, idx, _ = scipy.intersect1d(df["mjd"], selection["mjd"], return_indices=True) criterion = scipy.repeat(False, len(df["mjd"])) criterion[idx] = True time_intervals = identify_time_edges( df["mjd"], criterion, max_time_diff=self.config["event_list"]["max_time_diff"], ) return time_intervals def _identify_l3rate_time_edges(self, file_list): """Identifies time edges after L3 rate cuts. Parameters ---------- file_list : list File list. Returns ------- list List of time edges after L3 rate cuts. Raises ------ ValueError Error if no files to process. ValueError Error if no L3 rate cuts given. """ if not file_list: raise ValueError("GTI generator: no files to process") if "l3rate" not in self.config["event_list"]["cuts"]["quality"]: raise ValueError("GTI generator: no L3 rate cuts given") if self.verbose: info_message("identifying L3 rate time edges", "GTI generator") df = pandas.DataFrame() # Looping over the data files for fnum, file_name in enumerate(file_list): info_message( f"processing file {fnum+1} / {len(file_list)}", "GTI generator" ) with uproot.open(file_name) as input_stream: mjd = input_stream["Trigger"]["MTimeTrigger.fMjd"].array(library="np") millisec = input_stream["Trigger"][ "MTimeTrigger.fTime.fMilliSec" ].array(library="np") nanosec = input_stream["Trigger"]["MTimeTrigger.fNanoSec"].array( library="np" ) df_ = pandas.DataFrame() df_["mjd"] = mjd + (millisec / 1e3 + nanosec / 1e9) / 86400 df_["value"] = input_stream["Trigger"]["MReportTrigger.fL3Rate"].array( library="np" ) df = df.append(df_) df = df.sort_values(by=["mjd"]) cut = self.config["event_list"]["cuts"]["quality"]["l3rate"] selection = df.query(cut) _, idx, _ = scipy.intersect1d(df["mjd"], selection["mjd"], return_indices=True) criterion = scipy.repeat(False, len(df["mjd"])) criterion[idx] = True time_intervals = identify_time_edges( df["mjd"], criterion, max_time_diff=self.config["event_list"]["max_time_diff"], ) return time_intervals
[docs] def process_files(self, file_list): """ GTI list generator. Parameters ---------- file_list : list File list. Returns ------- list A list of (TStart, TStop) lists, representing the identified GTIs. """ if not self.config: raise ValueError("GTIGenerator: configuration is not set") # # Identifying the files to read # info_message("looking for the files", "GTI generator") # file_list = glob.glob(file_mask) # if not file_list: # raise ValueError("No files to process") # # Containers for the data points # dfs = { # 'dc': pandas.DataFrame(), # 'l3rate': pandas.DataFrame() # } # # Removing the containers, not specified in the configuration card # if "l3rate" not in config['event_list']['cuts']: # del dfs['l3rate'] # if "dc" not in config['event_list']['cuts']: # del dfs['dc'] # # Looping over the data files # for fnum, file_name in enumerate(file_list): # info_message(f"processing file {fnum+1} / {len(file_list)}", "GTI generator") # with uproot.open(file_name) as input_stream: # # --- DC --- # if "dc" in dfs: # mjd = input_stream["Camera"]["MTimeCamera.fMjd"].array() # millisec = input_stream["Camera"]["MTimeCamera.fTime.fMilliSec"].array() # nanosec = input_stream["Camera"]["MTimeCamera.fNanoSec"].array() # df_ = pandas.DataFrame() # df_['mjd'] = mjd + (millisec / 1e3 + nanosec / 1e9) / 86400 # df_['value'] = input_stream["Camera"]["MReportCamera.fMedianDC"].array() # dfs['dc'] = dfs['dc'].append(df_) # # --- L3 rate --- # if "l3rate" in dfs: # mjd = input_stream["Trigger"]["MTimeTrigger.fMjd"].array() # millisec = input_stream["Trigger"]["MTimeTrigger.fTime.fMilliSec"].array() # nanosec = input_stream["Trigger"]["MTimeTrigger.fNanoSec"].array() # df_ = pandas.DataFrame() # df_['mjd'] = mjd + (millisec / 1e3 + nanosec / 1e9) / 86400 # df_['value'] = input_stream["Trigger"]["MReportTrigger.fL3Rate"].array() # dfs['l3rate'] = dfs['l3rate'].append(df_) # # Sorting data points by date is needed for the time intervals identification # for key in dfs: # dfs[key] = dfs[key].sort_values(by=['mjd']) # info_message("identifying GTIs", "GTI generator") # time_intervals_list = [] # # Identifying DC-related GTIs # if "dc" in dfs: # cut = config['event_list']['cuts']['dc'] # selection = dfs['dc'].query(cut) # _, idx, _ = scipy.intersect1d(dfs['dc']["mjd"], selection["mjd"], return_indices=True) # criterion = scipy.repeat(False, len(dfs['dc']["mjd"])) # criterion[idx] = True # time_intervals = identify_time_edges(dfs['dc']["mjd"], criterion, max_time_diff=config['event_list']['max_time_diff']) # time_intervals_list.append(time_intervals) # # Identifying L3-related GTIs # if "l3rate" in dfs: # cut = config['event_list']['cuts']['l3rate'] # selection = dfs['l3rate'].query(cut) # _, idx, _ = scipy.intersect1d(dfs['l3rate']["mjd"], selection["mjd"], return_indices=True) # criterion = scipy.repeat(False, len(dfs['l3rate']["mjd"])) # criterion[idx] = True # time_intervals = identify_time_edges(dfs['l3rate']["mjd"], criterion, max_time_diff=config['event_list']['max_time_diff']) # time_intervals_list.append(time_intervals) time_intervals_list = [ self._identify_data_taking_time_edges(file_list), self._identify_dc_time_edges(file_list), self._identify_l3rate_time_edges(file_list), ] joint_intervals = time_intervals_list[0] # Joining all found GTIs for i in range(1, len(time_intervals_list)): joint_intervals = intersect_time_intervals( joint_intervals, time_intervals_list[i] ) return joint_intervals