| |
|
| |
|
| | from dataclasses import dataclass |
| | import numpy as np |
| | import scipy.linalg as la |
| | from scipy.signal import find_peaks |
| | from math import ceil |
| |
|
| |
|
| |
|
| |
|
def thin_peaks(peak_list, dmin=10, voxel_size=(1,1,1), return_larger_peaks=False):
    """
    Remove peaks within a specified distance of each other, retaining the peak with the highest intensity.

    Args:
    - peak_list (list of PeakData): Each element exposes:
        - pos (sequence of float): 3D coordinates of the peak.
        - intensity (float): The intensity value of the peak.
        - key (tuple): A unique, hashable identifier for the peak (#trace, #peak).
    - dmin (float, optional): Minimum distance between peaks. Peaks closer than this threshold
      are thinned. Defaults to 10.
    - voxel_size (sequence of float, optional): Per-axis factor the coordinate difference is
      multiplied by before the distance is taken. Defaults to (1, 1, 1).
    - return_larger_peaks (bool, optional): Also return, for each removed peak, the key of the
      peak that caused its removal. Defaults to False.

    Returns:
    - list of tuples: Keys of the removed peaks, in order of removal.
    if return_larger_peaks
    - list of tuples: Keys of the larger peaks, aligned element-wise with the first list.

    Notes:
    - The distance is the L2 norm of the voxel-scaled coordinate difference.
    - When two peaks are closer than `dmin`, the one with the lower intensity is removed.
      On equal intensities the peak visited later (inner loop) is removed.
    - A peak that has been removed no longer screens other peaks.
    """
    scale = np.array(voxel_size)
    # Convert each position once instead of once per pair.
    positions = [np.array(peak.pos) for peak in peak_list]

    removed_peaks = []
    removed_larger_peaks = []
    # Set mirror of removed_peaks: O(1) membership tests while the lists keep removal order.
    removed_keys = set()

    for i, peak_i in enumerate(peak_list):
        if peak_i.key in removed_keys:
            continue
        for j, peak_j in enumerate(peak_list):
            if i == j or peak_j.key in removed_keys:
                continue
            d = la.norm((positions[i] - positions[j])*scale)
            if d < dmin:
                if peak_i.intensity < peak_j.intensity:
                    removed_peaks.append(peak_i.key)
                    removed_larger_peaks.append(peak_j.key)
                    removed_keys.add(peak_i.key)
                    # peak_i is gone, so it must not screen any further peaks.
                    break
                else:
                    removed_peaks.append(peak_j.key)
                    removed_larger_peaks.append(peak_i.key)
                    removed_keys.add(peak_j.key)

    if return_larger_peaks:
        return removed_peaks, removed_larger_peaks
    return removed_peaks
| |
|
| |
|
@dataclass
class CellData:
    """Container for everything measured on a single cell.

    Attributes:
        pathdata_list (list): One PathData object per path (trace) belonging to the cell.
    """
    pathdata_list: list
| |
|
@dataclass
class RemovedPeakData:
    """Record of a peak that was discarded in favour of a nearby larger peak.

    Attributes:
        idx (int): Index of the removed peak along its own path.
        screening_peak (tuple): (path_idx, position along path) of the peak that screened it.
    """
    idx: int
    screening_peak: tuple
| |
|
@dataclass
class PathData:
    """Peak, geometry and fluorescence information for one path of a cell.

    Attributes:
        peaks (list): Retained peaks on the path, stored as indices into `points` / `o_intensity`.
        removed_peaks (list): Peaks on the path that were removed because of a nearby larger peak.
        points (list): Points defining the path.
        o_intensity (list): (Unnormalized) fluorescence intensity values along the path.
        SC_length (float): Length of the path.
    """
    peaks: list
    removed_peaks: list
    points: list
    o_intensity: list
    SC_length: float
| |
|
@dataclass
class PeakData:
    """A single peak as handed to `thin_peaks`.

    Attributes:
        pos (tuple): Coordinates of the peak.
        intensity (float): Intensity value of the peak.
        key (tuple): Identifier of the peak, (#trace, #peak).
    """
    pos: tuple
    intensity: float
    key: tuple
| |
|
| |
|
def find_peaks2(v, distance=5, prominence=0.5):
    """
    Find peaks in a 1D array with extended boundary handling.

    The input is padded at both ends with its own minimum value so that maxima sitting on the
    first or last sample can be detected. Peaks are found in the padded array and their
    indices are mapped back to the original input.

    Args:
    - v (array-like): 1D input in which to find peaks.
    - distance (int, optional): Minimum number of array elements that separate two peaks. Defaults to 5.
    - prominence (float, optional): Minimum prominence required for a peak to be identified. Defaults to 0.5.

    Returns:
    - list of int: Indices of the identified peaks in the original input array.
    - dict: Peak properties exactly as returned by scipy.signal.find_peaks for the padded array;
      any index-valued entries therefore refer to the padded array, not to `v`.

    Raises:
    - RuntimeError: If a peak is reported inside the padding (sanity check).
    """
    v = np.asarray(v)
    pad = int(ceil(distance)) + 1
    # Pad with the global minimum so the padding itself can never form a peak.
    padding = np.full((pad,), np.min(v), dtype=v.dtype)
    v_ext = np.concatenate([padding, v, padding])

    peaks, properties = find_peaks(v_ext, distance=distance, prominence=prominence)
    peaks = peaks - pad

    if np.any((peaks < 0) | (peaks >= len(v))):
        raise RuntimeError("find_peaks reported a peak inside the padded boundary region")
    return list(peaks), properties
| | |
| |
|
def process_cell_traces(all_paths, path_lengths, measured_trace_fluorescence, dmin=10):
    """
    Process traces of cells to extract peak information and organize the data.

    For every path the fluorescence is z-score normalized and peaks are located with
    `find_peaks2`. The peaks of all paths are then thinned together with `thin_peaks`, so a
    brighter peak (possibly on another path) closer than `dmin` screens a dimmer one. The
    retained and removed peaks are finally packed into PathData / CellData objects.

    Args:
        all_paths (list of list of tuples): A list containing paths, where each path is
            represented as a list of 3D coordinate tuples.
        path_lengths (list of float): List of path lengths corresponding to the provided paths.
        measured_trace_fluorescence (list of list of float): Fluorescence values, one per
            path point, for each path.
        dmin (float): Distance below which brighter peaks screen less bright ones.

    Returns:
        CellData: An object containing organized peak and path data for a given cell.

    Note:
        - The function assumes that each path and its corresponding length and fluorescence data
          are positioned at the same index in their respective lists.
        - Peak intensities used for thinning are the raw (unnormalized) fluorescence values.
    """
    # Per path: (peak indices, peak coordinates, raw peak intensities).
    cell_peaks = []
    for points, o_intensity in zip(all_paths, measured_trace_fluorescence):
        intensity_normalized = (o_intensity - np.mean(o_intensity))/np.std(o_intensity)

        p, _ = find_peaks2(intensity_normalized, distance=5, prominence=0.5*np.std(intensity_normalized))
        peaks = np.array(p, dtype=np.int32)

        peak_heights = [o_intensity[u] for u in peaks]
        peak_points = [points[u] for u in peaks]
        cell_peaks.append((peaks, peak_points, peak_heights))

    # Flatten all peaks of the cell; the key (path index, peak number) identifies each one.
    to_thin = [
        PeakData(pos=pos, intensity=height, key=(k, u))
        for k, (_, peak_points, peak_heights) in enumerate(cell_peaks)
        for u, (pos, height) in enumerate(zip(peak_points, peak_heights))
    ]

    removed_keys, screening_keys = thin_peaks(to_thin, return_larger_peaks=True, dmin=dmin)
    # Removed key -> key of the larger peak that screened it.
    screened_by = dict(zip(removed_keys, screening_keys))

    retained_cell_peaks = []
    removed_cell_peaks = []
    for path_idx, (path_peaks, _, _) in enumerate(cell_peaks):
        path_retained_peaks = []
        path_removed_peaks = []
        for peak_idx, peak in enumerate(path_peaks):
            key = (path_idx, peak_idx)
            if key not in screened_by:
                path_retained_peaks.append(peak)
            else:
                larger_path, larger_idx = screened_by[key]
                # Store the screening peak as (path index, index along that path).
                path_removed_peaks.append(
                    RemovedPeakData(
                        idx=peak,
                        screening_peak=(larger_path, cell_peaks[larger_path][0][larger_idx]),
                    )
                )
        retained_cell_peaks.append(path_retained_peaks)
        removed_cell_peaks.append(path_removed_peaks)

    pd_list = []
    for k, points in enumerate(all_paths):
        pd_list.append(
            PathData(
                peaks=retained_cell_peaks[k],
                removed_peaks=removed_cell_peaks[k],
                points=points,
                o_intensity=measured_trace_fluorescence[k],
                SC_length=path_lengths[k],
            )
        )

    return CellData(pathdata_list=pd_list)
| |
|
| |
|
alpha_max = 0.4


def focus_criterion(pos, v, alpha=alpha_max):
    """
    Select the positions whose value reaches a fraction of the maximum value.

    The cut-off is `alpha` times the maximum of `v`; a position is kept when its value is
    greater than or equal to that cut-off.

    Args:
    - pos (numpy.ndarray): Array of positions.
    - v (numpy.ndarray): 1D array of values, e.g., intensities, aligned with `pos`.
    - alpha (float, optional): Scaling factor for the cut-off. Defaults to `alpha_max`.

    Returns:
    - numpy.ndarray: Positions whose values pass the cut-off; an empty int32 array when `v` is empty.
    """
    if not len(v):
        return np.array([], dtype=np.int32)

    cutoff = alpha*np.max(v)
    keep = v >= cutoff
    return np.array(pos[keep])
| |
|
def analyse_celldata(cell_data, config):
    """
    Analyse the provided cell data to extract focus-related information.

    Args:
        cell_data (CellData): Object whose `pathdata_list` holds one PathData per trace.
        config (dictionary): Configuration dictionary containing 'peak_threshold' and 'threshold_type'
            'peak_threshold' (float) - threshold for calling peaks as foci
            'threshold_type' (str) - 'per-trace' or 'per-cell'
                'per-trace': a peak is a focus if
                    intensity - trace_mean >= peak_threshold * (trace_max_peak_intensity - trace_mean)
                'per-cell': a peak is a focus if
                    intensity - trace_mean > peak_threshold * max over all traces of (intensity - trace_mean)

    Returns:
        tuple: A tuple containing, with one entry per trace in each list:
            - foci_abs_intensity (list): Raw intensities of the detected foci.
            - foci_pos (list): Positions of the detected foci along the path
              (index / number of points * SC_length).
            - foci_pos_index (list): Indices of the detected foci.
            - screened_foci_data (list): RemovedPeakData of removed peaks whose raw intensity is
              above the trace threshold.
            - trace_median_intensities (list): Per-trace median intensity.
            - trace_thresholds (list): Per-trace absolute threshold for calling peaks as foci
              (None for a 'per-trace' trace without retained peaks).

    Raises:
        NotImplementedError: If 'threshold_type' is neither 'per-trace' nor 'per-cell'.
    """
    foci_abs_intensity = []
    foci_pos = []
    foci_pos_index = []
    screened_foci_data = []
    trace_median_intensities = []
    trace_thresholds = []

    peak_threshold = config['peak_threshold']
    threshold_type = config['threshold_type']

    if threshold_type not in ('per-trace', 'per-cell'):
        raise NotImplementedError(
            f"Unsupported threshold_type {threshold_type!r}; expected 'per-trace' or 'per-cell'"
        )

    if threshold_type == 'per-cell':
        # Largest mean-subtracted intensity over all traces of the cell.
        max_cell_intensity = float("-inf")
        for path_data in cell_data.pathdata_list:
            h = np.array(path_data.o_intensity)
            h = h - np.mean(h)
            max_cell_intensity = max(max_cell_intensity, np.max(h))

    for path_data in cell_data.pathdata_list:
        peaks = np.array(path_data.peaks, dtype=np.int32)
        intensity = np.array(path_data.o_intensity)
        trace_mean = np.mean(intensity)
        centred = intensity - trace_mean

        if threshold_type == 'per-trace':
            h = centred/np.std(centred)
            foci_idx = focus_criterion(peaks, h[peaks], peak_threshold)

            if len(peaks):
                # Same criterion as above, expressed in raw intensity units.
                threshold = (1-peak_threshold)*trace_mean + peak_threshold*np.max(intensity[peaks])
                trace_thresholds.append(threshold)
            else:
                # No retained peak to compare against: every removed peak counts as screened focus.
                threshold = float('-inf')
                trace_thresholds.append(None)
        else:
            foci_idx = peaks[centred[peaks] > peak_threshold*max_cell_intensity]
            threshold = trace_mean + peak_threshold*max_cell_intensity
            trace_thresholds.append(threshold)

        removed_peaks = path_data.removed_peaks
        if len(removed_peaks):
            removed_peaks_idx = np.array([u.idx for u in removed_peaks], dtype=np.int32)
            removed_peak_heights = intensity[removed_peaks_idx]
            screened_foci_idx = np.where(removed_peak_heights > threshold)[0]
            screened_foci_data.append([removed_peaks[i] for i in screened_foci_idx])
        else:
            screened_foci_data.append([])

        pos_abs = (foci_idx/len(path_data.points))*path_data.SC_length
        foci_pos.append(pos_abs)
        foci_abs_intensity.append(intensity[foci_idx])

        foci_pos_index.append(foci_idx)
        trace_median_intensities.append(np.median(path_data.o_intensity))

    return foci_abs_intensity, foci_pos, foci_pos_index, screened_foci_data, trace_median_intensities, trace_thresholds
| |
|
def analyse_traces(all_paths, path_lengths, measured_trace_fluorescence, config):
    """
    Run peak extraction and foci calling for one cell in a single call.

    The traces are first turned into a CellData object by `process_cell_traces`, using
    config['screening_distance'] as the screening distance `dmin`; that object is then passed,
    together with `config`, to `analyse_celldata`.

    Args:
        all_paths: Paths of the cell, forwarded to `process_cell_traces`.
        path_lengths: Length of each path, forwarded to `process_cell_traces`.
        measured_trace_fluorescence: Fluorescence values per path, forwarded to `process_cell_traces`.
        config (dictionary): Must contain 'screening_distance' plus the keys read by
            `analyse_celldata` ('peak_threshold', 'threshold_type').

    Returns:
        The tuple returned by `analyse_celldata`.
    """
    return analyse_celldata(
        process_cell_traces(
            all_paths,
            path_lengths,
            measured_trace_fluorescence,
            dmin=config['screening_distance'],
        ),
        config,
    )
| |
|
| | |
| |
|
| |
|
| |
|