Source code for ms_mint.Chromatogram

import numpy as np
import pandas as pd
import logging

from matplotlib import pyplot as plt

from .tools import find_peaks_in_timeseries, gaussian, mz_mean_width_to_min_max
from .io import ms_file_to_df
from .filters import Filter, Resampler, Smoother, GaussFilter
from .matplotlib_tools import plot_peaks
from .processing import get_chromatogram_from_ms_file

from typing import Optional, Union, List, Any


[docs] class Chromatogram:
[docs] def __init__( self, scan_times: Optional[Union[List[float], np.ndarray]] = None, intensities: Optional[Union[List[float], np.ndarray]] = None, filters: Optional[List[Filter]] = None, expected_rt: Optional[float] = None ): """ Initialize a Chromatogram object. :param scan_times: Array-like object containing the scan times. :param intensities: Array-like object containing the intensities. :param filters: List of filters to be applied. :param expected_rt: Expected retention time. """ # Initialize empty arrays for scan_times and intensities # Initialize scan_times and intensities as empty lists or arrays self.t = np.array([]) if scan_times is None or 0 in scan_times else np.array([0]) self.x = np.array([]) if intensities is None or 0 in scan_times else np.array([0]) # Update scan_times and intensities if provided if scan_times is not None: self.t = np.append(self.t, scan_times) if intensities is not None: self.x = np.append(self.x, intensities) # Initialize other attributes self.noise_level: Optional[float] = None self.filters: List[Any] = filters or [Resampler(), GaussFilter(), Smoother()] self.peaks: Optional[pd.DataFrame] = None self.selected_peak_ndxs: Optional[List[int]] = None self.expected_rt = expected_rt self.weights: Optional[Union[List[float], np.ndarray]] = None
[docs] def from_file(self, fn, mz_mean, mz_width=10, expected_rt=None): chrom = get_chromatogram_from_ms_file(fn, mz_mean=mz_mean, mz_width=mz_width) self.t = np.append(self.t, chrom.index) self.x = np.append(self.x, chrom.values) if expected_rt is not None: self.expected_rt = expected_rt
[docs] def estimate_noise_level(self, window=20): data = pd.Series(index=self.t, data=self.x) self.noise_level = data.rolling(window, center=True).std().median()
[docs] def apply_filters(self): for filt in self.filters: self.t, self.x = filt.transform(self.t, self.x)
[docs] def find_peaks(self, prominence=None, rel_height=0.9, **kwargs): self.estimate_noise_level() if prominence is None: prominence = self.noise_level * 5 self.peaks = find_peaks_in_timeseries( self.data.intensity, prominence=prominence, rel_height=rel_height, **kwargs )
[docs] def optimise_peak_times_with_diff(self, rolling_window=20, plot=False): peaks = self.peaks diff = ( (self.data - self.data.shift(1)) .rolling(rolling_window, center=True) .mean() .fillna(0) ) prominence = 0 peak_startings = find_peaks_in_timeseries( diff.fillna(0).intensity, prominence=prominence, plot=plot ) if plot: plt.show() peak_endings = find_peaks_in_timeseries( -diff.fillna(0).intensity, prominence=prominence, plot=plot ) if plot: plt.show() for ndx, row in peaks.iterrows(): new_rt_min = row.rt_min new_rt_max = row.rt_max candidates_rt_min = peak_startings[peak_startings.rt <= new_rt_min] candidates_rt_max = peak_endings[peak_endings.rt >= new_rt_max] if len(candidates_rt_min) > 0: new_rt_min = candidates_rt_min.tail(1).rt.values[0] if len(candidates_rt_max) > 0: new_rt_max = candidates_rt_max.head(1).rt.values[0] peaks.loc[ndx, ["rt_min", "rt_max"]] = new_rt_min, new_rt_max
[docs] def select_peak_by_rt(self, expected_rt=None): peaks = self.peaks if expected_rt is None: expected_rt = self.expected_rt else: self.expected_rt = expected_rt selected_ndx = (peaks.rt - expected_rt).abs().sort_values().index[0] self.selected_peak_ndxs = [selected_ndx] return self.selected_peaks
[docs] def select_peak_by_highest_intensity(self): peaks = self.peaks selected_ndx = peaks.sort_values("peak_height", ascending=False).index.values[0] self.selected_peak_ndxs = [selected_ndx] return self.selected_peaks
[docs] def select_peak_with_gaussian_weight(self, expected_rt=None, sigma=50): peaks = self.peaks if expected_rt is None: expected_rt = self.expected_rt else: self.expected_rt = expected_rt if peaks is None or len(peaks) == 0: logging.warning("No peaks available to select.") return None weights = gaussian(peaks.rt, expected_rt, sigma) weighted_peaks = weights * peaks.peak_height x = np.arange(int(self.t.min()), int(self.t.max())) self.weights = max(peaks.peak_height) * gaussian(x, expected_rt, sigma) selected_ndx = weighted_peaks.sort_values(ascending=False).index.values[0] self.selected_peak_ndxs = [selected_ndx] return self.selected_peaks
@property def selected_peaks(self): return self.peaks.loc[self.selected_peak_ndxs] @property def data(self): df = pd.DataFrame(index=self.t, data={"intensity": self.x}) df.index.name = "scan_time" return df
[docs] def plot(self, label=None, **kwargs): series = self.data peaks = self.peaks selected_peak_ndxs = self.selected_peak_ndxs weights = self.weights fig = plot_peaks( series, peaks, label=label, highlight=selected_peak_ndxs, expected_rt=self.expected_rt, weights=weights, **kwargs ) return fig