# ms_mint/processing.py
import os
import pandas as pd
import numpy as np
import logging
from pathlib import Path as P
from .tools import lock, mz_mean_width_to_min_max
from .io import ms_file_to_df
from .standards import RESULTS_COLUMNS, MINT_RESULTS_COLUMNS
def extract_chromatogram_from_ms1(df, mz_mean, mz_width=10):
"""
Extract single chromatogram of specific m/z value from MS-data.
:param df: MS-data
:type df: pandas.DataFrame with columns ['rt', 'm/z', 'intensity']
:param mz_mean: Target m/z value
:type mz_mean: float
:param mz_width: m/z width in ppm, defaults to 10
:type mz_width: float
:return: Chromatogram
:rtype: pandas.DataFrame
"""
mz_min, mz_max = mz_mean_width_to_min_max(mz_mean, mz_width)
chrom = df[(df.mz >= mz_min) & (df.mz <= mz_max)].copy()
chrom["scan_time"] = chrom["scan_time"].round(3)
chrom = chrom.groupby("scan_time").max()
return chrom["intensity"]
[docs]
def process_ms1_files_in_parallel(args):
"""
Pickleable function for (parallel) peak integration.
"""
filename = args["filename"]
targets = args["targets"]
output_fn = args["output_fn"]
if "queue" in args.keys():
q = args["queue"]
q.put("filename")
try:
results = process_ms1_file(filename=filename, targets=targets)
except Exception as e:
logging.error(f"process_ms1_files_in_parallel(): {e}")
results = pd.DataFrame()
if (output_fn is not None) and (len(results) > 0):
append_results(results, output_fn)
return None
return results
[docs]
def append_results(results, fn):
"""
Appends results to file.
:param results: New results.
:type results: pandas.DataFrame
:param fn: Filename to append to.
:type fn: str
"""
with lock(fn):
results.to_csv(fn, mode="a", header=False, index=False)
[docs]
def process_ms1_file(filename, targets):
"""
Peak integration using a filename as input.
:param filename: Path to mzxml or mzml filename
:type filename: str or PosixPath
:param targets: DataFrame in target list format.
:type targets: pandas.DataFrame
:return: DataFrame with processd peak intensities.
:rtype: pandas.DataFrame
"""
df = ms_file_to_df(filename)
results = process_ms1(df, targets)
results["total_intensity"] = df["intensity"].sum()
results["ms_file"] = filename
results["ms_file_label"] = P(filename).with_suffix('').name
results["ms_file_size_MB"] = os.path.getsize(filename) / 1024 / 1024
results["peak_score"] = 0 # score_peaks(results)
return results[MINT_RESULTS_COLUMNS]
[docs]
def process_ms1(df, targets):
"""
Process MS-1 data with a target list.
:param df: MS-1 data.
:type df: pandas.DataFrame
:param targets: Target list
:type targets: pandas.DataFrame
:return: Mint results.
:rtype: pandas.DataFrame
"""
results = _process_ms1_from_df_(df, targets)
results = pd.DataFrame(results, columns=["peak_label"] + RESULTS_COLUMNS)
results = pd.merge(targets, results, on=["peak_label"])
results = results.reset_index(drop=True)
return results
def _process_ms1_from_df_(df, targets):
peak_cols = [
"mz_mean",
"mz_width",
"rt_min",
"rt_max",
"intensity_threshold",
"peak_label",
]
array_peaks = targets[peak_cols].values
array_data = df[["scan_time", "mz", "intensity"]].values
result = process_ms1_from_numpy(array_data, array_peaks)
return result
[docs]
def process_ms1_from_numpy(array, peaks):
"""
Process MS1 data in numpy array format.
:param array: Input data.
:type array: numpy.Array
:param peaks: Peak data np.array([[mz_mean_1, mz_width_1, rt_min_1, rt_max_1, intensity_threshold_1, peak_label_1], ...])
:type peaks: numpy.Array
:return: Extracted data.
:rtype: list
"""
results = []
for mz_mean, mz_width, rt_min, rt_max, intensity_threshold, peak_label in peaks:
props = _process_ms1_from_numpy(
array,
mz_mean=mz_mean,
mz_width=mz_width,
rt_min=rt_min,
rt_max=rt_max,
intensity_threshold=intensity_threshold,
peak_label=peak_label,
)
if props is None:
continue
results.append([props[col] for col in ["peak_label"] + RESULTS_COLUMNS])
return results
def _process_ms1_from_numpy(
array, mz_mean, mz_width, rt_min, rt_max, intensity_threshold, peak_label=None
):
_slice = slice_ms1_array(
array=array,
mz_mean=mz_mean,
mz_width=mz_width,
rt_min=rt_min,
rt_max=rt_max,
intensity_threshold=intensity_threshold,
)
props = extract_ms1_properties(_slice, mz_mean)
if props is None:
return
if peak_label is not None:
props["peak_label"] = peak_label
return props
[docs]
def slice_ms1_array(
array: np.array, rt_min, rt_max, mz_mean, mz_width, intensity_threshold
):
"""
Slice MS1 data by m/z, mz_width, rt_min, rt_max
:param array: Input MS-1 data.
:type array: np.array
:param rt_min: Minimum retention time for slice
:type rt_min: float
:param rt_max: Maximum retention time for slice
:type rt_max: float
:param mz_mean: Mean m/z value for slice
:type mz_mean: float
:param mz_width: Width of slice in [ppm] of mz_mean
:type mz_width: float (>0)
:param intensity_threshold: Noise filter value
:type intensity_threshold: float (>0)
:return: Slice of numpy array
:rtype: np.Array
"""
delta_mass = mz_width * mz_mean * 1e-6
array = array[(array[:, 0] >= rt_min)]
array = array[(array[:, 0] <= rt_max)]
array = array[(np.abs(array[:, 1] - mz_mean) <= delta_mass)]
array = array[(array[:, 2] >= intensity_threshold)]
return array
[docs]
def score_peaks(mint_results):
"""
Score the peak quality (experimental).
1 - means a good shape
0 - means a bad shape
:param mint_results: DataFrame in ms_mint results format.
:type mint_results: pandas.DataFrame
:return: Score
:rtype: float
"""
R = mint_results.copy()
scores = (
((1 - R.peak_delta_int.apply(abs) / R.peak_max))
* (np.tanh(R.peak_n_datapoints / 20))
* (1 / (1 + abs(R.peak_rt_of_max - R[["rt_min", "rt_max"]].mean(axis=1))))
)
return scores
[docs]
def get_chromatogram_from_ms_file(ms_file: str, mz_mean: float, mz_width: float = 10) -> pd.DataFrame:
"""
Get chromatogram data from an MS file.
:param ms_file: Path to the MS file.
:param mz_mean: Mean m/z value.
:param mz_width: Width around the mean m/z to extract.
:return: Chromatogram data as a DataFrame.
"""
df = ms_file_to_df(ms_file)
chrom = extract_chromatogram_from_ms1(df, mz_mean, mz_width=mz_width)
return chrom