"""
Functions to read and write MINT files.
"""
import pandas as pd
import numpy as np
import io
import logging
import pathlib
from typing import Union, Optional
from pathlib import Path as P
from datetime import date
from pyteomics import mzxml, mzml
# mzMLb support is optional: if the pyteomics.mzmlb module cannot be imported,
# the module still loads and MZMLB_AVAILABLE records that mzMLb files cannot
# be read.
try:
    from pyteomics import mzmlb
except ImportError as e:
    logging.warning(f"Could not import pyteomics.mzmlb:\n{e}")
    MZMLB_AVAILABLE = False
else:
    MZMLB_AVAILABLE = True

# Names and order of the columns of the DataFrames returned by the readers
# in this module.
MS_FILE_COLUMNS = [
    "scan_id",
    "ms_level",
    "polarity",
    "scan_time",
    "mz",
    "intensity",
]
def ms_file_to_df(fn, read_only: bool = False):
    """
    Read MS file and convert it to a pandas.DataFrame.

    The reader is selected from the (case-insensitive) file name ending.
    Supported endings: .mzxml, .mzml, hdf, .feather, .parquet and .mzmlb.

    :param fn: Filename
    :type fn: str or PosixPath
    :param read_only: Whether to skip the conversion to the standard schema
        and return whatever the reader returned (for testing purposes),
        defaults to False
    :type read_only: bool, optional
    :return: MS data as DataFrame, or None if the file type is not supported
        or the reader raised an IndexError
    :rtype: pandas.DataFrame or None
    """
    fn = str(fn)
    fn_lower = fn.lower()
    try:
        if fn_lower.endswith(".mzxml"):
            df = mzxml_to_df(fn, read_only=read_only)
        elif fn_lower.endswith(".mzml"):
            df = mzml_to_df(fn, read_only=read_only)
        elif fn_lower.endswith("hdf"):
            df = pd.read_hdf(fn)
        elif fn_lower.endswith(".feather"):
            df = pd.read_feather(fn)
        elif fn_lower.endswith(".parquet"):
            df = read_parquet(fn, read_only=read_only)
        elif fn_lower.endswith(".mzmlb"):
            df = mzmlb_to_df__pyteomics(fn, read_only=read_only)
        else:
            # No reader matches: report it and stop here. Falling through
            # would hit an unbound ``df`` below.
            logging.error(f"Cannot read file {fn} of type {type(fn)}")
            return None
    except IndexError as e:
        logging.warning(f"{e}: {fn}")
        return None

    if read_only:
        return df

    # Compatibility with old schema
    df = df.rename(
        columns={
            "retentionTime": "scan_time",
            "intensity array": "intensity",
            "m/z array": "mz",
        }
    )
    if "scan_id" not in df.columns:
        df["scan_id"] = 0
    if "ms_level" not in df.columns:
        df["ms_level"] = 1
    # Set datatypes
    set_dtypes(df)
    return df
def mzxml_to_df(
    fn: Union[str, pathlib.Path],
    read_only: bool = False,
    time_unit_in_file: str = "min",
) -> Optional[pd.DataFrame]:
    """
    Read mzXML file and convert it to pandas.DataFrame.

    :param fn: Filename
    :type fn: Union[str, pathlib.Path]
    :param read_only: Whether to only read the file without converting it to a
        dataframe (for testing purposes), defaults to False
    :type read_only: bool, optional
    :param time_unit_in_file: The time unit used in the mzXML file (either 'sec' or 'min'), defaults to 'min'
    :type time_unit_in_file: str, optional
    :return: MS data, or None if read_only is True
    :rtype: Optional[pd.DataFrame]
    """
    assert str(fn).lower().endswith(".mzxml"), fn

    # Hand the reader a plain string so that pathlib.Path inputs are handled
    # the same way as in mzml_to_df.
    with mzxml.MzXML(str(fn)) as ms_data:
        data = list(ms_data)

    if read_only:
        return None

    data = [_extract_mzxml(x) for x in data]
    df = pd.json_normalize(data, sep="_")

    # Convert retention time to seconds
    if time_unit_in_file == "min":
        df["scan_time"] = df["scan_time"].astype(np.float64) * 60.0

    # One row per (mz, intensity) pair instead of one row per scan
    df = df.explode(["mz", "intensity"])
    set_dtypes(df)
    return df.reset_index(drop=True)[MS_FILE_COLUMNS]
def _extract_mzxml(data):
    """
    Pick the fields needed by MINT from one mzXML scan record.

    :param data: Scan record with the keys "num", "msLevel", "polarity",
        "retentionTime", "m/z array" and "intensity array"
    :type data: dict
    :return: Record with the keys scan_id, ms_level, polarity, scan_time,
        mz and intensity; mz and intensity are numpy arrays
    :rtype: dict
    """
    scalar_fields = (
        ("scan_id", "num"),
        ("ms_level", "msLevel"),
        ("polarity", "polarity"),
        ("scan_time", "retentionTime"),
    )
    record = {target: data[source] for target, source in scalar_fields}
    record["mz"] = np.array(data["m/z array"])
    record["intensity"] = np.array(data["intensity array"])
    return record
def mzml_to_pandas_df_pyteomics(fn, **kwargs):
    """
    Alias of mzml_to_df() kept under its former name.

    :param fn: Filename
    :type fn: str or PosixPath
    :param kwargs: Keyword arguments forwarded unchanged to mzml_to_df()
    :return: Whatever mzml_to_df() returns
    """
    forwarded = mzml_to_df(fn, **kwargs)
    return forwarded
def mzml_to_df(fn, read_only=False):
    """
    Reads mzML file and returns a pandas.DataFrame
    using the pyteomics library.

    :param fn: Filename
    :type fn: str or PosixPath
    :param read_only: Whether to only iterate over the file without building
        a dataframe (for testing purposes), defaults to False
    :type read_only: bool, optional
    :return: MS data, or None if read_only is True
    :rtype: pandas.DataFrame or None
    """
    assert str(fn).lower().endswith(".mzml"), fn

    # One DataFrame per spectrum; concatenated after the loop
    slices = []

    # Read mzML file using pyteomics
    with mzml.read(str(fn)) as reader:
        for spectrum in reader:
            scan_start = spectrum["scanList"]["scan"][0]["scan start time"]
            time_unit = scan_start.unit_info
            if read_only:
                continue

            # The scan number is the part of the spectrum id after the last "="
            scan_id = int(spectrum["id"].split("=")[-1])

            # Scan times are stored in seconds
            rt = scan_start
            if time_unit == "minute":
                rt = rt * 60.0

            mz = np.array(spectrum["m/z array"], dtype=np.float64)
            intensity = np.array(spectrum["intensity array"], dtype=np.float64)

            if "positive scan" in spectrum:
                polarity = "+"
            elif "negative scan" in spectrum:
                polarity = "-"
            else:
                polarity = None

            ms_level = spectrum["ms level"]

            # Scalars are broadcast along the mz/intensity arrays
            slices.append(
                pd.DataFrame(
                    {
                        "scan_id": scan_id,
                        "mz": mz,
                        "intensity": intensity,
                        "polarity": polarity,
                        "ms_level": ms_level,
                        "scan_time": rt,
                    }
                )
            )

    if read_only:
        return None

    df = pd.concat(slices)
    # Explicit 64-bit cast: plain ``int`` is platform dependent and can be
    # 32-bit, which is too small for large intensity values.
    df["intensity"] = df["intensity"].astype(np.int64)
    df = df[MS_FILE_COLUMNS].reset_index(drop=True)
    set_dtypes(df)
    return df
def set_dtypes(df):
    """
    Cast the columns mz, scan_id, ms_level, scan_time and intensity of a
    DataFrame to the data types used by MINT.

    The DataFrame is modified in place; columns that already have the
    wanted data type are left untouched.

    :param df: MS data containing all five columns
    :type df: pandas.DataFrame
    :return: The same DataFrame object that was passed in
    :rtype: pandas.DataFrame
    """
    target_dtypes = {
        "mz": np.float32,
        "scan_id": np.int64,
        "ms_level": np.int8,
        "scan_time": np.float32,
        "intensity": np.int64,
    }
    for column, wanted in target_dtypes.items():
        if df[column].dtype != wanted:
            df[column] = df[column].astype(wanted)
    return df
def _extract_mzml(data, time_unit):
    """
    Turn one spectrum object into a flat record of scan attributes.

    NOTE(review): the accessors used here (``scan_time_in_minutes()``,
    ``peaks()``, ``ms_level``) differ from the dict-style access used by the
    other readers in this module; presumably a different spectrum class is
    expected. Confirm before use.

    :param data: Spectrum object
    :param time_unit: Not used by this function
    :return: Record with the keys scan_id, ms_level, polarity, scan_time,
        mz and intensity
    :rtype: dict
    """
    # scan_time_in_minutes() is converted to seconds
    scan_time = data.scan_time_in_minutes() * 60
    centroids = data.peaks("centroided")

    scan_id = data["id"]
    ms_level = data.ms_level
    if data["positive scan"]:
        polarity = "+"
    else:
        polarity = "-"

    # First column of the peak table is used as mz, second as intensity
    return dict(
        scan_id=scan_id,
        ms_level=ms_level,
        polarity=polarity,
        scan_time=scan_time,
        mz=centroids[:, 0].astype("float64"),
        intensity=centroids[:, 1].astype("int64"),
    )


# Element-wise version of _extract_mzml for array-like inputs
extract_mzml = np.vectorize(_extract_mzml)
def read_parquet(fn, read_only=False):
    """
    Reads parquet file and returns a pandas.DataFrame.

    Files whose columns are exactly MS_FILE_COLUMNS are returned as read;
    any other layout is passed through
    format_thermo_raw_file_reader_parquet().

    :param fn: Filename
    :type fn: str or PosixPath
    :param read_only: Whether to return the file content as read, without
        any reformatting, defaults to False
    :type read_only: bool, optional
    :return: MS data
    :rtype: pandas.DataFrame
    """
    frame = pd.read_parquet(fn)

    if read_only:
        return frame

    same_width = len(frame.columns) == len(MS_FILE_COLUMNS)
    if same_width and all(frame.columns == MS_FILE_COLUMNS):
        return frame

    return format_thermo_raw_file_reader_parquet(frame)
def mzmlb_to_df__pyteomics(fn, read_only=False):
    """
    Reads mzMLb file and returns a pandas.DataFrame
    using the pyteomics library.

    :param fn: Filename
    :type fn: str or PosixPath
    :param read_only: Whether to only read the file without converting it to
        a dataframe, defaults to False
    :type read_only: bool, optional
    :return: MS data, or None if read_only is True
    :rtype: pandas.DataFrame or None
    :raises ImportError: If pyteomics.mzmlb could not be imported
    """
    # The mzmlb import at module level is optional; fail with a clear message
    # instead of a NameError when it is missing.
    if not MZMLB_AVAILABLE:
        raise ImportError(
            "pyteomics.mzmlb could not be imported, mzMLb files cannot be read."
        )

    with mzmlb.MzMLb(str(fn)) as ms_data:
        data = list(ms_data)

    if read_only:
        return None

    records = [_extract_mzmlb(scan) for scan in data]

    df = (
        pd.DataFrame.from_dict(records)
        .set_index(["index", "retentionTime", "polarity"])
        # One row per (mz, intensity) pair instead of one row per scan
        .apply(pd.Series.explode)
        .reset_index()
        .rename(
            columns={
                "index": "scan_id",
                "retentionTime": "scan_time",
                "m/z array": "mz",
                "ms level": "ms_level",
                "intensity array": "intensity",
            }
        )
    )

    # mzMLb starts scan index with 0
    df["scan_id"] = df["scan_id"] + 1
    df = df[MS_FILE_COLUMNS]
    return df
def _extract_mzmlb(data):
    """
    Pick the fields needed by MINT from one mzMLb spectrum record.

    The input record is not modified.

    :param data: Spectrum record with the keys "index", "ms level",
        "scanList", "m/z array", "intensity array" and optionally
        "positive scan" or "negative scan"
    :type data: dict
    :return: Record with the keys index, ms level, polarity, retentionTime,
        m/z array and intensity array; retentionTime is in seconds
    :rtype: dict
    """
    scan_start = data["scanList"]["scan"][0]["scan start time"]

    # Values explicitly tagged as seconds are taken as they are. Everything
    # else (minutes, or no unit information at all) is treated as minutes.
    if getattr(scan_start, "unit_info", None) == "second":
        scan_time = float(scan_start)
    else:
        scan_time = scan_start * 60

    if "positive scan" in data:
        polarity = "+"
    elif "negative scan" in data:
        polarity = "-"
    else:
        polarity = None

    return {
        "index": data["index"],
        "ms level": data["ms level"],
        "polarity": polarity,
        "retentionTime": scan_time,
        "m/z array": data["m/z array"],
        "intensity array": data["intensity array"],
    }


# Element-wise version of _extract_mzmlb for array-like inputs
extract_mzmlb = np.vectorize(_extract_mzmlb)
def df_to_numeric(df):
    """
    Converts the columns of a dataframe to numeric types if possible.

    The dataframe is modified in place. Columns that cannot be converted
    are left unchanged.

    :param df: Dataframe to convert
    :type df: pandas.DataFrame
    :return: None
    """
    for col in df.columns:
        try:
            converted = pd.to_numeric(df[col])
        except (ValueError, TypeError):
            # Not convertible: keep the column as it is
            continue
        # Replace the whole column so that the new dtype is adopted
        df[col] = converted
def export_to_excel(mint, fn=None):
    """
    Export MINT state to Excel file.

    Three sheets are written: "Targets" (mint.targets), "Results"
    (mint.results) and "Metadata" (mint.version and today's date).

    :param mint: Mint instance
    :type mint: ms_mint.Mint.Mint
    :param fn: Output filename; if None the workbook is written to an
        in-memory buffer instead, defaults to None
    :type fn: str or PosixPath, optional
    :return: None, or file buffer (if fn is None)
    :rtype: None or io.BytesIO
    """
    date_string = str(date.today())
    meta = pd.DataFrame({"MINT_version": [mint.version], "Date": [date_string]}).T[0]

    file_buffer = io.BytesIO() if fn is None else None
    target = file_buffer if fn is None else fn

    # The context manager closes the writer even if writing a sheet fails.
    # sheet_name is passed by keyword because passing it positionally is
    # deprecated in pandas.
    with pd.ExcelWriter(target) as writer:
        mint.targets.to_excel(writer, sheet_name="Targets", index=False)
        mint.results.to_excel(writer, sheet_name="Results", index=False)
        meta.to_excel(writer, sheet_name="Metadata", index=True, header=False)

    if fn is None:
        # Rewind so that the caller can read the buffer from the start
        file_buffer.seek(0)
        return file_buffer
    return None
def convert_ms_file_to_feather(fn, fn_out=None):
    """
    Read an MS file and store its content in feather format.

    :param fn: Filename to convert
    :type fn: str or PosixPath
    :param fn_out: Output filename; if None the input filename with the
        suffix replaced by ".feather" is used, defaults to None
    :type fn_out: str or PosixPath, optional
    :return: Filename of generated file (fn_out as passed in, or the derived
        path object)
    """
    source = P(fn)
    target = source.with_suffix(".feather") if fn_out is None else fn_out
    frame = ms_file_to_df(source)
    frame = frame.reset_index(drop=True)
    frame.to_feather(target)
    return target
def convert_ms_file_to_parquet(fn, fn_out=None):
    """
    Read an MS file and store its content in parquet format.

    :param fn: Filename to convert
    :type fn: str or PosixPath
    :param fn_out: Output filename; if None the input filename with the
        suffix replaced by ".parquet" is used, defaults to None
    :type fn_out: str or PosixPath, optional
    :return: Filename of generated file (fn_out as passed in, or the derived
        path object)
    """
    source = P(fn)
    target = source.with_suffix(".parquet") if fn_out is None else fn_out
    frame = ms_file_to_df(source)
    frame = frame.reset_index(drop=True)
    frame.to_parquet(target)
    return target