Skip to content

Core classes (Mint, Chromatogram)

Main module of the ms-mint library.

Mint

Main class of the ms_mint package for processing metabolomics files.

This class provides the primary interface for extracting, processing, and analyzing mass spectrometry data for metabolomics analysis.

Attributes:

Name Type Description
verbose

Controls the verbosity level of the instance.

version str

The version of the ms_mint package being used.

progress_callback

Function to update progress information.

plot

Instance of MintPlotter for visualization.

opt

Instance of TargetOptimizer for target optimization.

pca

Instance of PrincipalComponentsAnalyser for PCA analysis.

tqdm

Progress bar utility.

wdir

Working directory for input/output operations.

status str

Current status of processing ('waiting', 'running', 'done').

ms_files List[str]

List of MS files to be processed.

n_files int

Number of MS files currently loaded.

targets DataFrame

DataFrame with target compounds information.

results DataFrame

DataFrame with analysis results.

progress float

Current progress of processing (0-100).

Source code in src/ms_mint/Mint.py
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
class Mint:
    """Main class of the ms_mint package for processing metabolomics files.

    This class provides the primary interface for extracting, processing, and
    analyzing mass spectrometry data for metabolomics analysis.

    Attributes:
        verbose: Controls the verbosity level of the instance.
        version: The version of the ms_mint package being used.
        progress_callback: Function to update progress information.
        plot: Instance of MintPlotter for visualization.
        opt: Instance of TargetOptimizer for target optimization.
        pca: Instance of PrincipalComponentsAnalyser for PCA analysis.
        tqdm: Progress bar utility.
        wdir: Working directory for input/output operations.
        status: Current status of processing ('waiting', 'running', 'done').
        ms_files: List of MS files to be processed.
        n_files: Number of MS files currently loaded.
        targets: DataFrame with target compounds information.
        results: DataFrame with analysis results.
        progress: Current progress of processing (0-100).
    """

    def __init__(
        self,
        verbose: bool = False,
        progress_callback: Optional[Callable[[float], None]] = None,
        time_unit: str = "s",
        wdir: Optional[Union[str, P]] = None,
    ) -> None:
        """Initialize a Mint instance.

        Args:
            verbose: Sets verbosity of the instance.
            progress_callback: A callback function for reporting progress (0-100).
            time_unit: Unit for time measurements.
            wdir: Working directory. If None, uses current directory.
        """
        self.verbose = verbose
        self._version = ms_mint.__version__
        if verbose:
            print(f"Mint version: {self.version}\n")
        self.progress_callback = progress_callback
        self.reset()
        self.plot = MintPlotter(mint=self)
        self.opt = TargetOptimizer(mint=self)
        self.pca = PrincipalComponentsAnalyser(self)
        self.tqdm = tqdm

        # Setup working directory as pathlib.Path
        self.wdir = P(os.getcwd() if wdir is None else wdir)

    @property
    def version(self) -> str:
        """Get the ms-mint version number.

        Returns:
            Version string.
        """
        return self._version

    def reset(self) -> "Mint":
        """Reset Mint instance by removing targets, MS-files and results.

        Returns:
            Self for method chaining.
        """
        self._files: List[str] = []
        self._targets_files: List[str] = []
        self._targets: pd.DataFrame = pd.DataFrame(columns=TARGETS_COLUMNS)
        self._results: pd.DataFrame = pd.DataFrame({i: [] for i in MINT_RESULTS_COLUMNS})
        self._all_df: Optional[pd.DataFrame] = None
        self._progress: float = 0
        self.runtime: Optional[float] = None
        self._status: str = "waiting"
        self._messages: List[str] = []
        self.meta: pd.DataFrame = init_metadata()
        return self

    def clear_targets(self) -> None:
        """Reset target list."""
        self.targets = pd.DataFrame(columns=TARGETS_COLUMNS)

    def clear_results(self) -> None:
        """Reset results."""
        self.results = pd.DataFrame(columns=MINT_RESULTS_COLUMNS)

    def clear_ms_files(self) -> None:
        """Reset MS files."""
        self.ms_files = []

    def run(
        self,
        nthreads: Optional[int] = None,
        rt_margin: float = 0.5,
        mode: str = "standard",
        fn: Optional[str] = None,
        **kwargs,
    ) -> Optional["Mint"]:
        """Run MINT and process MS-files with current target list.

        Args:
            nthreads: Number of cores to use. Options:
                * None - Run with min(n_cpus, n_files) CPUs
                * 1: Run without multiprocessing on one CPU
                * >1: Run with multiprocessing using specified threads
            rt_margin: Margin to add to rt values when rt_min/rt_max not specified.
            mode: Compute mode, one of:
                * 'standard': calculates peak shapes projected to RT dimension
                * 'express': omits calculation of other features, only peak_areas
            fn: Output filename to save results directly to disk instead of memory.
            **kwargs: Additional arguments passed to the processing function.

        Returns:
            Self for method chaining, or None if no files or targets loaded.
        """
        self._status = "running"

        if (self.n_files == 0) or (len(self.targets) == 0):
            return None

        targets = self.targets.reset_index()
        self._set_rt_min_max(targets, rt_margin)

        nthreads = self._determine_nthreads(nthreads)

        if self.verbose:
            print(f"Run MINT with {nthreads} processes:")

        start = time.time()
        if nthreads > 1:
            self._run_parallel(nthreads=nthreads, mode=mode, fn=fn, **kwargs)
        else:
            self._run_sequential(mode=mode, fn=fn, targets=targets)

        self.progress = 100
        self._report_runtime(start)

        self._status = "done"
        assert self.progress == 100
        return self

    def _set_rt_min_max(self, targets: pd.DataFrame, rt_margin: float) -> None:
        """Set retention time min/max values based on rt and margin.

        Args:
            targets: DataFrame containing target information.
            rt_margin: Margin to add/subtract from rt for min/max.
        """
        if "rt" in targets.columns:
            update_rt_min = (targets.rt_min.isna()) & (~targets.rt.isna())
            targets.loc[update_rt_min, "rt_min"] = targets.loc[update_rt_min, "rt"] - rt_margin
            update_rt_max = (targets.rt_max.isna()) & (~targets.rt.isna())
            targets.loc[update_rt_max, "rt_max"] = targets.loc[update_rt_max, "rt"] + rt_margin

    def _determine_nthreads(self, nthreads: Optional[int]) -> int:
        """Determine number of threads to use for parallel processing.

        Args:
            nthreads: Requested number of threads, or None for automatic.

        Returns:
            Number of threads to use.
        """
        if nthreads is None:
            nthreads = min(cpu_count(), self.n_files)
        return nthreads

    def _run_sequential(self, mode: str, fn: Optional[str], targets: pd.DataFrame) -> None:
        """Run processing sequentially (single-threaded).

        Args:
            mode: Processing mode ('standard' or 'express').
            fn: Output filename or None.
            targets: DataFrame of targets to process.
        """
        results = []
        for i, filename in enumerate(self.ms_files):
            args = {
                "filename": filename,
                "targets": targets,
                "q": None,
                "mode": mode,
                "output_fn": None,
            }
            results.append(process_ms1_files_in_parallel(args))
            self.progress = int(100 * (i / self.n_files))
        self.results = pd.concat(results).reset_index(drop=True)

    def _report_runtime(self, start: float) -> None:
        """Report runtime statistics after processing.

        Args:
            start: Start time of processing in seconds.
        """
        end = time.time()
        self.runtime = end - start
        self.runtime_per_file = self.runtime / self.n_files
        self.runtime_per_peak = self.runtime / self.n_files / len(self.targets)

        if self.verbose:
            print(f"Total runtime: {self.runtime:.2f}s")
            print(f"Runtime per file: {self.runtime_per_file:.2f}s")
            print(f"Runtime per peak ({len(self.targets)}): {self.runtime_per_peak:.2f}s\n")
            print("Results:", self.results)

    def _run_parallel(
        self,
        nthreads: int = 1,
        mode: str = "standard",
        maxtasksperchild: Optional[int] = None,
        fn: Optional[str] = None,
    ) -> None:
        """Run processing in parallel using multiple threads.

        Args:
            nthreads: Number of threads to use.
            mode: Processing mode ('standard' or 'express').
            maxtasksperchild: Maximum number of tasks per child process.
            fn: Output filename or None.
        """
        pool = Pool(processes=nthreads, maxtasksperchild=maxtasksperchild)
        m = Manager()
        q = m.Queue()
        args = []

        if fn is not None:
            # Prepare output file (only headers)
            pd.DataFrame(columns=MINT_RESULTS_COLUMNS).to_csv(fn, index=False)

        for filename in self.ms_files:
            args.append(
                {
                    "filename": filename,
                    "targets": self.targets.reset_index(),
                    "queue": q,
                    "mode": mode,
                    "output_fn": fn,
                }
            )

        results = pool.map_async(process_ms1_files_in_parallel, args)
        self._monitor_progress(results, q)

        pool.close()
        pool.join()

        if fn is None:
            results = results.get()
            self.results = pd.concat(results).reset_index(drop=True)

    def _monitor_progress(self, results: Any, q: Any) -> None:
        """Monitor progress of parallel processing.

        Args:
            results: AsyncResult object from parallel processing.
            q: Queue for tracking progress.
        """
        while not results.ready():
            size = q.qsize()
            self.progress = 100 * size / self.n_files
            time.sleep(1)
        self.progress = 100

    @property
    def status(self) -> str:
        """Get current status of Mint instance.

        Returns:
            Status string, one of: 'waiting', 'running', 'done'
        """
        return self._status

    @property
    def ms_files(self) -> List[str]:
        """Get list of MS files to process.

        Returns:
            List of filenames.
        """
        return self._files

    @ms_files.setter
    def ms_files(self, list_of_files: Union[str, List[str]]) -> None:
        """Set MS files to process.

        Args:
            list_of_files: Filename or list of file names of MS-files.
        """
        if isinstance(list_of_files, str):
            list_of_files = [list_of_files]
        list_of_files = [str(P(i)) for i in list_of_files if is_ms_file(i)]
        for f in list_of_files:
            if not os.path.isfile(f):
                logging.warning(f"File not found ({f})")
        self._files = list_of_files
        if self.verbose:
            print("Set files to:\n" + "\n".join(self.ms_files) + "\n")
        self.meta = self.meta.reindex([fn_to_label(fn) for fn in list_of_files])

    @property
    def n_files(self) -> int:
        """Get number of currently stored MS filenames.

        Returns:
            Number of files stored in self.ms_files
        """
        return len(self.ms_files)

    def load_files(self, obj: Union[str, List[str]]) -> "Mint":
        """Load MS files and return self for chaining.

        Args:
            obj: Filename pattern (for glob) or list of file names.

        Returns:
            Self for method chaining.
        """
        if isinstance(obj, str):
            self.ms_files = glob(obj, recursive=True)
        elif isinstance(obj, list):
            self.ms_files = obj
        return self

    def load_targets(self, list_of_files: Union[str, P, List[Union[str, P]]]) -> "Mint":
        """Load targets from file(s) (csv, xlsx).

        Args:
            list_of_files: Filename or list of file names.

        Returns:
            Self for method chaining.

        Raises:
            ValueError: If input is not a list of files.
            AssertionError: If a file is not found.
        """
        if isinstance(list_of_files, str) or isinstance(list_of_files, P):
            list_of_files = [list_of_files]
        if not isinstance(list_of_files, list):
            raise ValueError("Input should be a list of files.")
        for f in list_of_files:
            assert os.path.isfile(f), f"File not found ({f})"
        self._targets_files = list_of_files
        if self.verbose:
            print("Set targets files to:\n" + "\n".join(str(f) for f in self._targets_files) + "\n")
        self.targets = read_targets(list_of_files)
        return self

    @property
    def targets(self) -> pd.DataFrame:
        """Get target list.

        Returns:
            Target list DataFrame.
        """
        return self._targets

    @targets.setter
    def targets(self, targets: pd.DataFrame) -> None:
        """Set target list.

        Args:
            targets: DataFrame containing target information.

        Raises:
            AssertionError: If targets validation fails.
        """
        targets = standardize_targets(targets)
        assert check_targets(targets), check_targets(targets)
        self._targets = targets.set_index("peak_label")
        if self.verbose:
            print("Set targets to:\n", self.targets.to_string(), "\n")

    def get_target_params(self, peak_label: str) -> Tuple[float, float, float, float]:
        """Get target parameters for a specific peak label.

        Args:
            peak_label: Label of the target peak.

        Returns:
            Tuple of (mz_mean, mz_width, rt_min, rt_max).
        """
        target_data = self.targets.loc[peak_label]
        mz_mean, mz_width, rt_min, rt_max = target_data[["mz_mean", "mz_width", "rt_min", "rt_max"]]
        return mz_mean, mz_width, rt_min, rt_max

    @property
    def peak_labels(self) -> List[str]:
        """Get list of peak labels from targets.

        Returns:
            List of peak label strings.
        """
        return self.targets.index.to_list()

    @property
    def results(self) -> pd.DataFrame:
        """Get results DataFrame.

        Returns:
            DataFrame containing analysis results.
        """
        return self._results

    @results.setter
    def results(self, df: pd.DataFrame) -> None:
        """Set results DataFrame.

        Args:
            df: DataFrame with MINT results.
        """
        self._results = df

    def crosstab(
        self,
        var_name: Optional[str] = None,
        index: Optional[Union[str, List[str]]] = None,
        column: Optional[str] = None,
        aggfunc: str = "mean",
        apply: Optional[Callable] = None,
        scaler: Optional[Union[str, Any]] = None,
        groupby: Optional[Union[str, List[str]]] = None,
    ) -> pd.DataFrame:
        """Create condensed representation of the results.

        Creates a cross-table with filenames as index and target labels as columns.
        The values in the cells are determined by var_name.

        Args:
            var_name: Name of the column from results table for cell values.
                Defaults to 'peak_area_top3'.
            index: Column(s) to use as index in the resulting cross-tabulation.
                Defaults to 'ms_file_label'.
            column: Column to use as columns in the resulting cross-tabulation.
                Defaults to 'peak_label'.
            aggfunc: Aggregation function for aggregating values. Defaults to 'mean'.
            apply: Function to apply to the resulting cross-tabulation.
                Options include 'log2p1', 'logp1', or a custom function.
            scaler: Function or name of scaler to scale the data.
                Options include 'standard', 'robust', 'minmax', or a scikit-learn scaler.
            groupby: Column(s) to group data before scaling.

        Returns:
            DataFrame representing the cross-tabulation.

        Raises:
            ValueError: If an unsupported scaler is specified.
        """
        df_meta = pd.merge(self.meta, self.results, left_index=True, right_on="ms_file_label")
        # Remove None if in index
        if isinstance(index, list):
            if None in index:
                index.remove(None)
        if isinstance(groupby, str):
            groupby = [groupby]

        if index is None:
            index = "ms_file_label"
        if column is None:
            column = "peak_label"
        if var_name is None:
            var_name = "peak_area_top3"
        if apply:
            if apply == "log2p1":
                apply = log2p1
            if apply == "logp1":
                apply = np.log1p
            df_meta[var_name] = df_meta[var_name].apply(apply)
        if isinstance(scaler, str):
            scaler_dict = {
                "standard": StandardScaler(),
                "robust": RobustScaler(),
                "minmax": MinMaxScaler(),
            }

            if scaler not in scaler_dict:
                raise ValueError(f"Unsupported scaler: {scaler}")

            scaler = scaler_dict[scaler]

        if scaler:
            if groupby:
                groupby_cols = groupby + [column]
                df_meta[var_name] = df_meta.groupby(groupby_cols)[var_name].transform(
                    lambda x: self._scale_group(x, scaler)
                )
            else:
                df_meta[var_name] = df_meta.groupby(column)[var_name].transform(
                    lambda x: self._scale_group(x, scaler)
                )

        df = pd.pivot_table(
            df_meta,
            index=index,
            columns=column,
            values=var_name,
            aggfunc=aggfunc,
        ).astype(np.float64)
        return df

    @property
    def progress(self) -> float:
        """Get current progress value.

        Returns:
            Current progress value (0-100).
        """
        return self._progress

    @progress.setter
    def progress(self, value: float) -> None:
        """Set progress and call progress callback function.

        Args:
            value: Progress value between 0 and 100.

        Raises:
            AssertionError: If value is outside the range 0-100.
        """
        assert value >= 0, value
        assert value <= 100, value
        self._progress = value
        if self.progress_callback is not None:
            self.progress_callback(value)

    def export(self, fn: Optional[str] = None) -> Optional[BytesIO]:
        """Export current results to file.

        Args:
            fn: Filename to export to. If None, returns file buffer.
                Supported formats: .xlsx, .csv, .parquet

        Returns:
            BytesIO buffer if fn is None, otherwise None.
        """
        if fn is None:
            buffer = export_to_excel(self, fn=fn)
            return buffer
        elif fn.endswith(".xlsx"):
            export_to_excel(self, fn=fn)
        elif fn.endswith(".csv"):
            self.results.to_csv(fn, index=False)
        elif fn.endswith(".parquet"):
            self.results.to_parquet(fn, index=False)
        return None

    def load(self, fn: Union[str, BytesIO]) -> "Mint":
        """Load results into Mint instance.

        Args:
            fn: Filename (csv, xlsx, parquet) or file-like object.

        Returns:
            Self for method chaining.
        """
        if self.verbose:
            print(f"Loading MINT results from {fn}")

        if isinstance(fn, str):
            if fn.endswith("xlsx"):
                results = pd.read_excel(fn, sheet_name="Results")
                self.results = results

            elif fn.endswith(".csv"):
                results = pd.read_csv(fn)
                results["peak_shape_rt"] = results["peak_shape_rt"].fillna("")
                results["peak_shape_int"] = results["peak_shape_int"].fillna("")
                self.results = results

            elif fn.endswith(".parquet"):
                results = pd.read_parquet(fn)
        else:
            results = pd.read_csv(fn)

        # Add file labels if not present already
        if "ms_file_label" not in results.columns:
            results["ms_file_label"] = [fn_to_label(fn) for fn in results.ms_file]

        self.results = results.rename(columns=DEPRECATED_LABELS)
        self.digest_results()
        return self

    def digest_results(self) -> None:
        """Extract MS files and targets from results and set them in the instance."""
        self.ms_files = get_ms_files_from_results(self.results)
        self.targets = get_targets_from_results(self.results)

    def get_chromatograms(
        self,
        fns: Optional[List[str]] = None,
        peak_labels: Optional[List[str]] = None,
        filters: Optional[List[Any]] = None,
        **kwargs,
    ) -> pd.DataFrame:
        """Get chromatograms for specified files and peak labels.

        Args:
            fns: List of filenames to extract chromatograms from. Defaults to all MS files.
            peak_labels: List of peak labels to extract. Defaults to all peak labels.
            filters: List of filters to apply to the chromatograms.
            **kwargs: Additional arguments to pass to the Chromatogram constructor.

        Returns:
            DataFrame containing chromatogram data.
        """
        if fns is None:
            fns = self.ms_files
        if peak_labels is None:
            peak_labels = self.peak_labels
        return self._get_chromatograms(
            fns=tuple(fns),
            peak_labels=tuple(peak_labels),
            filters=tuple(filters) if filters is not None else None,
            **kwargs,
        )

    @lru_cache(1)
    def _get_chromatograms(
        self,
        fns: Optional[Tuple[str, ...]] = None,
        peak_labels: Optional[Tuple[str, ...]] = None,
        filters: Optional[Tuple[Any, ...]] = None,
        **kwargs,
    ) -> pd.DataFrame:
        """Cached implementation of get_chromatograms.

        Args:
            fns: Tuple of filenames to extract chromatograms from.
            peak_labels: Tuple of peak labels to extract.
            filters: Tuple of filters to apply to the chromatograms.
            **kwargs: Additional arguments to pass to the Chromatogram constructor.

        Returns:
            DataFrame containing chromatogram data.
        """
        if isinstance(fns, tuple):
            fns = list(fns)

        if not isinstance(fns, list):
            fns = [fns]

        labels = [fn_to_label(fn) for fn in fns]

        # Need to get the actual file names with get_chromatogramsath
        # in case only ms_file_labels are provided
        fns = [fn for fn in self.ms_files if fn_to_label(fn) in labels]

        data = []

        for fn in self.tqdm(fns, desc="Loading chromatograms"):
            df = ms_file_to_df(fn)
            for label in peak_labels:
                mz_mean, mz_width, rt_min, rt_max = self.get_target_params(label)
                chrom_raw = extract_chromatogram_from_ms1(
                    df, mz_mean=mz_mean, mz_width=mz_width
                ).to_frame()
                if len(chrom_raw) == 0:
                    continue
                chrom = Chromatogram(chrom_raw.index, chrom_raw.values, filters=filters, **kwargs)
                if filters is not None:
                    chrom.apply_filters()
                chrom_data = chrom.data
                chrom_data["ms_file"] = fn
                chrom_data["ms_file_label"] = fn_to_label(fn)
                chrom_data["peak_label"] = label
                chrom_data["rt_min"] = rt_min
                chrom_data["rt_max"] = rt_max
                data.append(chrom_data)

        data = pd.concat(data).reset_index()

        data["ms_file"] = data["ms_file"].apply(lambda x: P(x).with_suffix("").name)
        return data

    def load_metadata(self, fn: Optional[Union[str, P]] = None) -> "Mint":
        """Load metadata from file.

        Args:
            fn: Filename to load metadata from. Defaults to metadata.parquet in working directory.

        Returns:
            Self for method chaining.
        """
        if fn is None:
            fn = self.wdir / METADATA_DEFAUT_FN
        if str(fn).endswith(".csv"):
            self.meta = pd.read_csv(fn, index_col=0)
        elif str(fn).endswith(".parquet"):
            self.meta = pd.read_parquet(fn)
        if "ms_file_label" in self.meta.columns:
            self.meta = self.meta.set_index("ms_file_label")
        return self

    def save_metadata(self, fn: Optional[Union[str, P]] = None) -> "Mint":
        """Save metadata to file.

        Args:
            fn: Filename to save metadata to. Defaults to metadata.parquet in working directory.

        Returns:
            Self for method chaining.
        """
        if fn is None:
            fn = self.wdir / METADATA_DEFAUT_FN
        if str(fn).endswith(".csv"):
            self.meta.to_csv(fn, na_filter=False)
        elif str(fn).endswith(".parquet"):
            self.meta.to_parquet(fn)
        return self

    def _scale_group(self, group: pd.Series, scaler: Any) -> np.ndarray:
        """Scale a group of values using a scaler.

        Args:
            group: Series of values to scale.
            scaler: Scikit-learn scaler with fit_transform method.

        Returns:
            Scaled values as a numpy array.
        """
        return scaler.fit_transform(group.to_numpy().reshape(-1, 1)).flatten()

ms_files: List[str] property writable

Get list of MS files to process.

Returns:

Type Description
List[str]

List of filenames.

n_files: int property

Get number of currently stored MS filenames.

Returns:

Type Description
int

Number of files stored in self.ms_files

peak_labels: List[str] property

Get list of peak labels from targets.

Returns:

Type Description
List[str]

List of peak label strings.

progress: float property writable

Get current progress value.

Returns:

Type Description
float

Current progress value (0-100).

results: pd.DataFrame property writable

Get results DataFrame.

Returns:

Type Description
DataFrame

DataFrame containing analysis results.

status: str property

Get current status of Mint instance.

Returns:

Type Description
str

Status string, one of: 'waiting', 'running', 'done'

targets: pd.DataFrame property writable

Get target list.

Returns:

Type Description
DataFrame

Target list DataFrame.

version: str property

Get the ms-mint version number.

Returns:

Type Description
str

Version string.

__init__(verbose=False, progress_callback=None, time_unit='s', wdir=None)

Initialize a Mint instance.

Parameters:

Name Type Description Default
verbose bool

Sets verbosity of the instance.

False
progress_callback Optional[Callable[[float], None]]

A callback function for reporting progress (0-100).

None
time_unit str

Unit for time measurements.

's'
wdir Optional[Union[str, Path]]

Working directory. If None, uses current directory.

None
Source code in src/ms_mint/Mint.py
def __init__(
    self,
    verbose: bool = False,
    progress_callback: Optional[Callable[[float], None]] = None,
    time_unit: str = "s",
    wdir: Optional[Union[str, P]] = None,
) -> None:
    """Initialize a Mint instance.

    Args:
        verbose: Sets verbosity of the instance.
        progress_callback: A callback function for reporting progress (0-100).
        time_unit: Unit for time measurements.
        wdir: Working directory. If None, uses current directory.
    """
    self.verbose = verbose
    self._version = ms_mint.__version__
    if verbose:
        print(f"Mint version: {self.version}\n")
    self.progress_callback = progress_callback
    self.reset()
    self.plot = MintPlotter(mint=self)
    self.opt = TargetOptimizer(mint=self)
    self.pca = PrincipalComponentsAnalyser(self)
    self.tqdm = tqdm

    # Setup working directory as pathlib.Path
    self.wdir = P(os.getcwd() if wdir is None else wdir)

clear_ms_files()

Reset MS files.

Source code in src/ms_mint/Mint.py
def clear_ms_files(self) -> None:
    """Reset MS files."""
    self.ms_files = []

clear_results()

Reset results.

Source code in src/ms_mint/Mint.py
def clear_results(self) -> None:
    """Reset results."""
    self.results = pd.DataFrame(columns=MINT_RESULTS_COLUMNS)

clear_targets()

Reset target list.

Source code in src/ms_mint/Mint.py
def clear_targets(self) -> None:
    """Reset target list."""
    self.targets = pd.DataFrame(columns=TARGETS_COLUMNS)

crosstab(var_name=None, index=None, column=None, aggfunc='mean', apply=None, scaler=None, groupby=None)

Create condensed representation of the results.

Creates a cross-table with filenames as index and target labels as columns. The values in the cells are determined by var_name.

Parameters:

Name Type Description Default
var_name Optional[str]

Name of the column from results table for cell values. Defaults to 'peak_area_top3'.

None
index Optional[Union[str, List[str]]]

Column(s) to use as index in the resulting cross-tabulation. Defaults to 'ms_file_label'.

None
column Optional[str]

Column to use as columns in the resulting cross-tabulation. Defaults to 'peak_label'.

None
aggfunc str

Aggregation function for aggregating values. Defaults to 'mean'.

'mean'
apply Optional[Callable]

Function to apply to the resulting cross-tabulation. Options include 'log2p1', 'logp1', or a custom function.

None
scaler Optional[Union[str, Any]]

Function or name of scaler to scale the data. Options include 'standard', 'robust', 'minmax', or a scikit-learn scaler.

None
groupby Optional[Union[str, List[str]]]

Column(s) to group data before scaling.

None

Returns:

Type Description
DataFrame

DataFrame representing the cross-tabulation.

Raises:

Type Description
ValueError

If an unsupported scaler is specified.

Source code in src/ms_mint/Mint.py
def crosstab(
    self,
    var_name: Optional[str] = None,
    index: Optional[Union[str, List[str]]] = None,
    column: Optional[str] = None,
    aggfunc: str = "mean",
    apply: Optional[Callable] = None,
    scaler: Optional[Union[str, Any]] = None,
    groupby: Optional[Union[str, List[str]]] = None,
) -> pd.DataFrame:
    """Create condensed representation of the results.

    Creates a cross-table with filenames as index and target labels as columns.
    The values in the cells are determined by var_name.

    Args:
        var_name: Name of the column from results table for cell values.
            Defaults to 'peak_area_top3'.
        index: Column(s) to use as index in the resulting cross-tabulation.
            Defaults to 'ms_file_label'.
        column: Column to use as columns in the resulting cross-tabulation.
            Defaults to 'peak_label'.
        aggfunc: Aggregation function for aggregating values. Defaults to 'mean'.
        apply: Function to apply to the resulting cross-tabulation.
            Options include 'log2p1', 'logp1', or a custom function.
        scaler: Function or name of scaler to scale the data.
            Options include 'standard', 'robust', 'minmax', or a scikit-learn scaler.
        groupby: Column(s) to group data before scaling.

    Returns:
        DataFrame representing the cross-tabulation.

    Raises:
        ValueError: If an unsupported scaler is specified.
    """
    df_meta = pd.merge(self.meta, self.results, left_index=True, right_on="ms_file_label")
    # Remove None if in index
    if isinstance(index, list):
        if None in index:
            index.remove(None)
    if isinstance(groupby, str):
        groupby = [groupby]

    if index is None:
        index = "ms_file_label"
    if column is None:
        column = "peak_label"
    if var_name is None:
        var_name = "peak_area_top3"
    if apply:
        if apply == "log2p1":
            apply = log2p1
        if apply == "logp1":
            apply = np.log1p
        df_meta[var_name] = df_meta[var_name].apply(apply)
    if isinstance(scaler, str):
        scaler_dict = {
            "standard": StandardScaler(),
            "robust": RobustScaler(),
            "minmax": MinMaxScaler(),
        }

        if scaler not in scaler_dict:
            raise ValueError(f"Unsupported scaler: {scaler}")

        scaler = scaler_dict[scaler]

    if scaler:
        if groupby:
            groupby_cols = groupby + [column]
            df_meta[var_name] = df_meta.groupby(groupby_cols)[var_name].transform(
                lambda x: self._scale_group(x, scaler)
            )
        else:
            df_meta[var_name] = df_meta.groupby(column)[var_name].transform(
                lambda x: self._scale_group(x, scaler)
            )

    df = pd.pivot_table(
        df_meta,
        index=index,
        columns=column,
        values=var_name,
        aggfunc=aggfunc,
    ).astype(np.float64)
    return df

digest_results()

Extract MS files and targets from results and set them in the instance.

Source code in src/ms_mint/Mint.py
def digest_results(self) -> None:
    """Extract MS files and targets from results and set them in the instance."""
    self.ms_files = get_ms_files_from_results(self.results)
    self.targets = get_targets_from_results(self.results)

export(fn=None)

Export current results to file.

Parameters:

Name Type Description Default
fn Optional[str]

Filename to export to. If None, returns file buffer. Supported formats: .xlsx, .csv, .parquet

None

Returns:

Type Description
Optional[BytesIO]

BytesIO buffer if fn is None, otherwise None.

Source code in src/ms_mint/Mint.py
def export(self, fn: Optional[str] = None) -> Optional[BytesIO]:
    """Export current results to file.

    Args:
        fn: Filename to export to. If None, returns file buffer.
            Supported formats: .xlsx, .csv, .parquet

    Returns:
        BytesIO buffer if fn is None, otherwise None.
    """
    if fn is None:
        buffer = export_to_excel(self, fn=fn)
        return buffer
    elif fn.endswith(".xlsx"):
        export_to_excel(self, fn=fn)
    elif fn.endswith(".csv"):
        self.results.to_csv(fn, index=False)
    elif fn.endswith(".parquet"):
        self.results.to_parquet(fn, index=False)
    return None

get_chromatograms(fns=None, peak_labels=None, filters=None, **kwargs)

Get chromatograms for specified files and peak labels.

Parameters:

Name Type Description Default
fns Optional[List[str]]

List of filenames to extract chromatograms from. Defaults to all MS files.

None
peak_labels Optional[List[str]]

List of peak labels to extract. Defaults to all peak labels.

None
filters Optional[List[Any]]

List of filters to apply to the chromatograms.

None
**kwargs

Additional arguments to pass to the Chromatogram constructor.

{}

Returns:

Type Description
DataFrame

DataFrame containing chromatogram data.

Source code in src/ms_mint/Mint.py
def get_chromatograms(
    self,
    fns: Optional[List[str]] = None,
    peak_labels: Optional[List[str]] = None,
    filters: Optional[List[Any]] = None,
    **kwargs,
) -> pd.DataFrame:
    """Get chromatograms for specified files and peak labels.

    Args:
        fns: List of filenames to extract chromatograms from. Defaults to all MS files.
        peak_labels: List of peak labels to extract. Defaults to all peak labels.
        filters: List of filters to apply to the chromatograms.
        **kwargs: Additional arguments to pass to the Chromatogram constructor.

    Returns:
        DataFrame containing chromatogram data.
    """
    if fns is None:
        fns = self.ms_files
    if peak_labels is None:
        peak_labels = self.peak_labels
    return self._get_chromatograms(
        fns=tuple(fns),
        peak_labels=tuple(peak_labels),
        filters=tuple(filters) if filters is not None else None,
        **kwargs,
    )

get_target_params(peak_label)

Get target parameters for a specific peak label.

Parameters:

Name Type Description Default
peak_label str

Label of the target peak.

required

Returns:

Type Description
Tuple[float, float, float, float]

Tuple of (mz_mean, mz_width, rt_min, rt_max).

Source code in src/ms_mint/Mint.py
def get_target_params(self, peak_label: str) -> Tuple[float, float, float, float]:
    """Get target parameters for a specific peak label.

    Args:
        peak_label: Label of the target peak.

    Returns:
        Tuple of (mz_mean, mz_width, rt_min, rt_max).
    """
    target_data = self.targets.loc[peak_label]
    mz_mean, mz_width, rt_min, rt_max = target_data[["mz_mean", "mz_width", "rt_min", "rt_max"]]
    return mz_mean, mz_width, rt_min, rt_max

load(fn)

Load results into Mint instance.

Parameters:

Name Type Description Default
fn Union[str, BytesIO]

Filename (csv, xlsx, parquet) or file-like object.

required

Returns:

Type Description
Mint

Self for method chaining.

Source code in src/ms_mint/Mint.py
def load(self, fn: Union[str, BytesIO]) -> "Mint":
    """Load results into Mint instance.

    Args:
        fn: Filename (csv, xlsx, parquet) or file-like object.

    Returns:
        Self for method chaining.
    """
    if self.verbose:
        print(f"Loading MINT results from {fn}")

    if isinstance(fn, str):
        if fn.endswith("xlsx"):
            results = pd.read_excel(fn, sheet_name="Results")
            self.results = results

        elif fn.endswith(".csv"):
            results = pd.read_csv(fn)
            results["peak_shape_rt"] = results["peak_shape_rt"].fillna("")
            results["peak_shape_int"] = results["peak_shape_int"].fillna("")
            self.results = results

        elif fn.endswith(".parquet"):
            results = pd.read_parquet(fn)
    else:
        results = pd.read_csv(fn)

    # Add file labels if not present already
    if "ms_file_label" not in results.columns:
        results["ms_file_label"] = [fn_to_label(fn) for fn in results.ms_file]

    self.results = results.rename(columns=DEPRECATED_LABELS)
    self.digest_results()
    return self

load_files(obj)

Load MS files and return self for chaining.

Parameters:

Name Type Description Default
obj Union[str, List[str]]

Filename pattern (for glob) or list of file names.

required

Returns:

Type Description
Mint

Self for method chaining.

Source code in src/ms_mint/Mint.py
def load_files(self, obj: Union[str, List[str]]) -> "Mint":
    """Load MS files and return self for chaining.

    Args:
        obj: Filename pattern (for glob) or list of file names.

    Returns:
        Self for method chaining.
    """
    if isinstance(obj, str):
        self.ms_files = glob(obj, recursive=True)
    elif isinstance(obj, list):
        self.ms_files = obj
    return self

load_metadata(fn=None)

Load metadata from file.

Parameters:

Name Type Description Default
fn Optional[Union[str, Path]]

Filename to load metadata from. Defaults to metadata.parquet in working directory.

None

Returns:

Type Description
Mint

Self for method chaining.

Source code in src/ms_mint/Mint.py
def load_metadata(self, fn: Optional[Union[str, P]] = None) -> "Mint":
    """Load metadata from file.

    Args:
        fn: Filename to load metadata from. Defaults to metadata.parquet in working directory.

    Returns:
        Self for method chaining.
    """
    if fn is None:
        fn = self.wdir / METADATA_DEFAUT_FN
    if str(fn).endswith(".csv"):
        self.meta = pd.read_csv(fn, index_col=0)
    elif str(fn).endswith(".parquet"):
        self.meta = pd.read_parquet(fn)
    if "ms_file_label" in self.meta.columns:
        self.meta = self.meta.set_index("ms_file_label")
    return self

load_targets(list_of_files)

Load targets from file(s) (csv, xlsx).

Parameters:

Name Type Description Default
list_of_files Union[str, Path, List[Union[str, Path]]]

Filename or list of file names.

required

Returns:

Type Description
Mint

Self for method chaining.

Raises:

Type Description
ValueError

If input is not a list of files.

AssertionError

If a file is not found.

Source code in src/ms_mint/Mint.py
def load_targets(self, list_of_files: Union[str, P, List[Union[str, P]]]) -> "Mint":
    """Load targets from file(s) (csv, xlsx).

    Args:
        list_of_files: Filename or list of file names.

    Returns:
        Self for method chaining.

    Raises:
        ValueError: If input is not a list of files.
        AssertionError: If a file is not found.
    """
    if isinstance(list_of_files, str) or isinstance(list_of_files, P):
        list_of_files = [list_of_files]
    if not isinstance(list_of_files, list):
        raise ValueError("Input should be a list of files.")
    for f in list_of_files:
        assert os.path.isfile(f), f"File not found ({f})"
    self._targets_files = list_of_files
    if self.verbose:
        print("Set targets files to:\n" + "\n".join(str(f) for f in self._targets_files) + "\n")
    self.targets = read_targets(list_of_files)
    return self

reset()

Reset Mint instance by removing targets, MS-files and results.

Returns:

Type Description
Mint

Self for method chaining.

Source code in src/ms_mint/Mint.py
def reset(self) -> "Mint":
    """Reset Mint instance by removing targets, MS-files and results.

    Returns:
        Self for method chaining.
    """
    self._files: List[str] = []
    self._targets_files: List[str] = []
    self._targets: pd.DataFrame = pd.DataFrame(columns=TARGETS_COLUMNS)
    self._results: pd.DataFrame = pd.DataFrame({i: [] for i in MINT_RESULTS_COLUMNS})
    self._all_df: Optional[pd.DataFrame] = None
    self._progress: float = 0
    self.runtime: Optional[float] = None
    self._status: str = "waiting"
    self._messages: List[str] = []
    self.meta: pd.DataFrame = init_metadata()
    return self

run(nthreads=None, rt_margin=0.5, mode='standard', fn=None, **kwargs)

Run MINT and process MS-files with current target list.

Parameters:

Name Type Description Default
nthreads Optional[int]

Number of cores to use. Options: * None - Run with min(n_cpus, n_files) CPUs * 1: Run without multiprocessing on one CPU * >1: Run with multiprocessing using specified threads

None
rt_margin float

Margin to add to rt values when rt_min/rt_max not specified.

0.5
mode str

Compute mode, one of: * 'standard': calculates peak shapes projected to RT dimension * 'express': omits calculation of other features, only peak_areas

'standard'
fn Optional[str]

Output filename to save results directly to disk instead of memory.

None
**kwargs

Additional arguments passed to the processing function.

{}

Returns:

Type Description
Optional[Mint]

Self for method chaining, or None if no files or targets loaded.

Source code in src/ms_mint/Mint.py
def run(
    self,
    nthreads: Optional[int] = None,
    rt_margin: float = 0.5,
    mode: str = "standard",
    fn: Optional[str] = None,
    **kwargs,
) -> Optional["Mint"]:
    """Run MINT and process MS-files with current target list.

    Args:
        nthreads: Number of cores to use. Options:
            * None - Run with min(n_cpus, n_files) CPUs
            * 1: Run without multiprocessing on one CPU
            * >1: Run with multiprocessing using specified threads
        rt_margin: Margin to add to rt values when rt_min/rt_max not specified.
        mode: Compute mode, one of:
            * 'standard': calculates peak shapes projected to RT dimension
            * 'express': omits calculation of other features, only peak_areas
        fn: Output filename to save results directly to disk instead of memory.
        **kwargs: Additional arguments passed to the processing function.

    Returns:
        Self for method chaining, or None if no files or targets loaded.
    """
    self._status = "running"

    if (self.n_files == 0) or (len(self.targets) == 0):
        return None

    targets = self.targets.reset_index()
    self._set_rt_min_max(targets, rt_margin)

    nthreads = self._determine_nthreads(nthreads)

    if self.verbose:
        print(f"Run MINT with {nthreads} processes:")

    start = time.time()
    if nthreads > 1:
        self._run_parallel(nthreads=nthreads, mode=mode, fn=fn, **kwargs)
    else:
        self._run_sequential(mode=mode, fn=fn, targets=targets)

    self.progress = 100
    self._report_runtime(start)

    self._status = "done"
    assert self.progress == 100
    return self

save_metadata(fn=None)

Save metadata to file.

Parameters:

Name Type Description Default
fn Optional[Union[str, Path]]

Filename to save metadata to. Defaults to metadata.parquet in working directory.

None

Returns:

Type Description
Mint

Self for method chaining.

Source code in src/ms_mint/Mint.py
def save_metadata(self, fn: Optional[Union[str, P]] = None) -> "Mint":
    """Save metadata to file.

    Args:
        fn: Filename to save metadata to. Defaults to metadata.parquet in working directory.

    Returns:
        Self for method chaining.
    """
    if fn is None:
        fn = self.wdir / METADATA_DEFAUT_FN
    if str(fn).endswith(".csv"):
        self.meta.to_csv(fn, na_filter=False)
    elif str(fn).endswith(".parquet"):
        self.meta.to_parquet(fn)
    return self

options: show_root_heading: true show_root_full_path: true show_submodules: true members_order: source

Experimental module to run Mint interactively inside the Jupyter notebook.

Example usage
from ms_mint.notebook import Mint

mint = Mint()

mint.display()

Mint

Bases: Mint

Interactive MINT for Jupyter Notebook environment (experimental).

This class extends the base Mint class with interactive widgets and controls for use in Jupyter notebooks, allowing for a graphical user interface to manage MS files, target lists, and process data.

Attributes:

Name Type Description
progress_callback

Function to update progress bar.

ms_storage_path

File chooser widget for MS file directory.

target_files_button

Upload widget for target files.

load_ms_button

Button to load MS files from selected directory.

message_box

Text area for displaying messages.

run_button

Button to start processing.

download_button

Button to export results.

progress_bar

Progress indicator for processing.

layout

Main container for all widgets.

Source code in src/ms_mint/notebook.py
class Mint(_Mint_):
    """Interactive MINT for Jupyter Notebook environment (experimental).

    This class extends the base Mint class with interactive widgets and controls
    for use in Jupyter notebooks, allowing for a graphical user interface to
    manage MS files, target lists, and process data.

    Attributes:
        progress_callback: Function to update progress bar.
        ms_storage_path: File chooser widget for MS file directory.
        target_files_button: Upload widget for target files.
        load_ms_button: Button to load MS files from selected directory.
        message_box: Text area for displaying messages.
        run_button: Button to start processing.
        download_button: Button to export results.
        progress_bar: Progress indicator for processing.
        layout: Main container for all widgets.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialize the interactive Mint instance.

        Args:
            *args: Positional arguments passed to the parent Mint class.
            **kwargs: Keyword arguments passed to the parent Mint class.
        """
        self.progress_callback = self._set_progress_

        super().__init__(progress_callback=self.progress_callback, *args, **kwargs)

        # Initialize file chooser for MS files directory
        fc = FileChooser()
        fc.show_only_dirs = True
        fc.default_path = os.getcwd()
        self.ms_storage_path = fc

        # Target file upload widget
        self.target_files_button = W.FileUpload(
            description="Peaklists", accept="csv,xlsx", multiple=False
        )
        self.target_files_button.observe(self._load_target_from_bytes_, names="value")

        # Button to load MS files
        self.load_ms_button = W.Button(description="Load MS-files")
        self.load_ms_button.on_click(self._search_files_)

        # Message display area
        self.message_box = W.Textarea(
            value="",
            placeholder="Please, select ms-files define a target list.",
            description="",
            disabled=True,
            layout={"width": "90%", "height": "500px", "font_family": "monospace"},
        )

        # Processing buttons
        self.run_button = W.Button(description="Run")
        self.run_button.on_click(self._run_)
        self.run_button.style.button_color = "lightgray"

        self.download_button = W.Button(description="Export")
        self.download_button.on_click(self._export_action_)
        self.download_button.style.button_color = "lightgray"

        # Progress indicator
        self.progress_bar = W.IntProgress(
            min=0,
            max=100,
            layout=W.Layout(width="90%"),
            description="Progress:",
            bar_style="info",
        )

        self.output = W.Output()

        # Create tabs for file selection
        tabs = W.Tab()
        tabs.children = [
            W.HBox([self.ms_storage_path, self.load_ms_button]),
            W.HBox(
                [
                    self.target_files_button,
                ]
            ),
        ]

        tabs.set_title(0, "MS-Files")
        tabs.set_title(1, "Peaklists")

        # Main layout
        self.layout = W.VBox(
            [
                tabs,
                self.message_box,
                W.HBox([self.run_button, self.download_button]),
                self.progress_bar,
            ]
        )

        self.tqdm = tqdm

    def _load_target_from_bytes_(self, value: Dict[str, Any]) -> None:
        """Load target list from uploaded file bytes.

        Args:
            value: Dictionary containing upload widget's value information.
        """
        for data in value["new"].values():
            self.load(io.BytesIO(data["content"]))
        self._message_(f"{len(self.targets)} targets loaded.")

    @property
    def messages(self) -> List[str]:
        """Get the list of messages displayed in the message box.

        Returns:
            List of messages.
        """
        return self._messages

    def _message_(self, text: str) -> None:
        """Add a message to the message box.

        Args:
            text: Message text to add.
        """
        self.message_box.value = f"{text}\n" + self.message_box.value

    def _clear_messages_(self) -> None:
        """Clear all messages from the message box."""
        self.message_box.value = ""

    def _search_files_(self, b: Optional[W.Button] = None) -> None:
        """Search for MS files in the selected directory.

        Args:
            b: Button that triggered the action (not used).
        """
        self.ms_files = (
            glob(os.path.join(self.ms_storage_path.selected_path, "*mzXML"))
            + glob(os.path.join(self.ms_storage_path.selected_path, "*mzML"))
            + glob(os.path.join(self.ms_storage_path.selected_path, "*mzHDF"))
            + glob(os.path.join(self.ms_storage_path.selected_path, "*mzxml"))
            + glob(os.path.join(self.ms_storage_path.selected_path, "*mzml"))
            + glob(os.path.join(self.ms_storage_path.selected_path, "*mzhdf"))
        )
        self.message(
            f"{self.n_files} MS-files loaded."
        )  # This should be self._message_ instead of self.message

    def display(self) -> W.VBox:
        """Display control elements in Jupyter notebook.

        Returns:
            The main widget layout container.
        """
        display(HTML("<style>textarea, input { font-family: monospace; }</style>"))
        return self.layout

    def _run_(self, b: Optional[W.Button] = None, **kwargs: Any) -> None:
        """Run data processing with the current settings.

        Args:
            b: Button that triggered the action (not used).
            **kwargs: Additional keyword arguments passed to the run method.
        """
        self._message_("Start processing...")
        self.progress = 0
        self.run(**kwargs)
        self._message_("...finished processing.")
        if self.results is not None:
            self.download_button.style.button_color = "lightgreen"

    def _set_progress_(self, value: int) -> None:
        """Update the progress bar value.

        Args:
            value: Progress value (0-100).
        """
        self.progress_bar.value = value

    def _export_action_(self, b: Optional[W.Button] = None, filename: Optional[str] = None) -> None:
        """Export results to an Excel file.

        Args:
            b: Button that triggered the action (not used).
            filename: Output filename. If None, uses a default name.
        """
        if filename is None:
            filename = "MINT__results.xlsx"
            filename = os.path.join(os.getcwd(), filename)
        self.export(filename)
        self._message_(f"\nExported results to: {filename}")

messages: List[str] property

Get the list of messages displayed in the message box.

Returns:

Type Description
List[str]

List of messages.

__init__(*args, **kwargs)

Initialize the interactive Mint instance.

Parameters:

Name Type Description Default
*args Any

Positional arguments passed to the parent Mint class.

()
**kwargs Any

Keyword arguments passed to the parent Mint class.

{}
Source code in src/ms_mint/notebook.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """Initialize the interactive Mint instance.

    Args:
        *args: Positional arguments passed to the parent Mint class.
        **kwargs: Keyword arguments passed to the parent Mint class.
    """
    self.progress_callback = self._set_progress_

    super().__init__(progress_callback=self.progress_callback, *args, **kwargs)

    # Initialize file chooser for MS files directory
    fc = FileChooser()
    fc.show_only_dirs = True
    fc.default_path = os.getcwd()
    self.ms_storage_path = fc

    # Target file upload widget
    self.target_files_button = W.FileUpload(
        description="Peaklists", accept="csv,xlsx", multiple=False
    )
    self.target_files_button.observe(self._load_target_from_bytes_, names="value")

    # Button to load MS files
    self.load_ms_button = W.Button(description="Load MS-files")
    self.load_ms_button.on_click(self._search_files_)

    # Message display area
    self.message_box = W.Textarea(
        value="",
        placeholder="Please, select ms-files define a target list.",
        description="",
        disabled=True,
        layout={"width": "90%", "height": "500px", "font_family": "monospace"},
    )

    # Processing buttons
    self.run_button = W.Button(description="Run")
    self.run_button.on_click(self._run_)
    self.run_button.style.button_color = "lightgray"

    self.download_button = W.Button(description="Export")
    self.download_button.on_click(self._export_action_)
    self.download_button.style.button_color = "lightgray"

    # Progress indicator
    self.progress_bar = W.IntProgress(
        min=0,
        max=100,
        layout=W.Layout(width="90%"),
        description="Progress:",
        bar_style="info",
    )

    self.output = W.Output()

    # Create tabs for file selection
    tabs = W.Tab()
    tabs.children = [
        W.HBox([self.ms_storage_path, self.load_ms_button]),
        W.HBox(
            [
                self.target_files_button,
            ]
        ),
    ]

    tabs.set_title(0, "MS-Files")
    tabs.set_title(1, "Peaklists")

    # Main layout
    self.layout = W.VBox(
        [
            tabs,
            self.message_box,
            W.HBox([self.run_button, self.download_button]),
            self.progress_bar,
        ]
    )

    self.tqdm = tqdm

display()

Display control elements in Jupyter notebook.

Returns:

Type Description
VBox

The main widget layout container.

Source code in src/ms_mint/notebook.py
def display(self) -> W.VBox:
    """Display control elements in Jupyter notebook.

    Returns:
        The main widget layout container.
    """
    display(HTML("<style>textarea, input { font-family: monospace; }</style>"))
    return self.layout

options: show_root_heading: true show_root_full_path: true show_submodules: true members_order: source

ms_mint.MintPlotter

MintPlotter

Plot generator for visualizing MS-MINT analysis results.

This class provides various visualization methods for metabolomics data processed by MS-MINT, including heatmaps, chromatograms, peak shapes, and 2D histograms.

Attributes:

Name Type Description
mint

The Mint instance containing data to be visualized.

Source code in src/ms_mint/MintPlotter.py
class MintPlotter:
    """Plot generator for visualizing MS-MINT analysis results.

    This class provides various visualization methods for metabolomics data processed
    by MS-MINT, including heatmaps, chromatograms, peak shapes, and 2D histograms.

    Attributes:
        mint: The Mint instance containing data to be visualized.
    """

    def __init__(self, mint: "ms_mint.Mint.Mint") -> None:
        """Initialize the MintPlotter with a Mint instance.

        Args:
            mint: Mint instance containing the data to visualize.
        """
        self.mint = mint

    def hierarchical_clustering(
        self,
        data: Optional[pd.DataFrame] = None,
        peak_labels: Optional[List[str]] = None,
        ms_files: Optional[List[str]] = None,
        title: Optional[str] = None,
        figsize: Tuple[int, int] = (8, 8),
        targets_var: Optional[str] = None,
        var_name: str = "peak_max",
        vmin: int = -3,
        vmax: int = 3,
        xmaxticks: Optional[int] = None,
        ymaxticks: Optional[int] = None,
        apply: str = "log2p1",
        metric: str = "cosine",
        scaler: str = "standard",
        groupby: Optional[str] = None,
        transposed: bool = False,
        **kwargs,
    ) -> matplotlib.figure.Figure:
        """Perform hierarchical clustering and plot a heatmap.

        If no data is provided, data is taken from self.mint.crosstab(var_name).
        The clustered non-transformed non-scaled data is stored in `self.mint.clustered`.

        Args:
            data: DataFrame with data to be used for clustering. If None, crosstab of
                mint instance is used.
            peak_labels: List of peak labels to include in the analysis.
            ms_files: List of MS files to include in the analysis.
            title: Title for the plot.
            figsize: Tuple of (width, height) in inches for the figure.
            targets_var: Deprecated, use var_name instead.
            var_name: Name of the column from data to be used for cell values in the heatmap.
            vmin: Minimum value for color scaling.
            vmax: Maximum value for color scaling.
            xmaxticks: Maximum number of ticks on x-axis.
            ymaxticks: Maximum number of ticks on y-axis.
            apply: Transformation to be applied on the data. Can be "log1p", "log2p1",
                "log10p1" or None.
            metric: The distance metric to use for the tree. Can be any metric supported
                by scipy.spatial.distance.pdist.
            scaler: Method to scale data along both axes. Can be "standard", "robust" or None.
            groupby: Name of the column to group data before scaling. If None, scaling is
                applied to the whole data, not group-wise.
            transposed: Whether to transpose the figure or not.
            **kwargs: Additional keyword arguments passed to hierarchical_clustering.

        Returns:
            Matplotlib figure representing the clustered heatmap.
        """
        if targets_var is not None:
            warnings.warn("targets_var is deprecated, use var_name instead", DeprecationWarning)
            var_name = targets_var

        warnings.simplefilter("ignore", ClusterWarning)
        if data is None:
            data = self.mint.crosstab(
                var_name=var_name, apply=apply, scaler=scaler, groupby=groupby
            )

        if transposed:
            data = data.T

        _, fig, ndx_x, ndx_y = hierarchical_clustering(
            data,
            vmin=vmin,
            vmax=vmax,
            figsize=figsize,
            xmaxticks=xmaxticks,
            ymaxticks=ymaxticks,
            metric=metric,
            **kwargs,
        )

        self.mint.clustered = data.iloc[ndx_x, ndx_y]

        return fig

    def peak_shapes(
        self,
        fns: Optional[Union[str, List[str]]] = None,
        peak_labels: Optional[Union[str, List[str]]] = None,
        interactive: bool = False,
        **kwargs,
    ) -> Union[sns.axisgrid.FacetGrid, PlotlyFigure]:
        """Plot peak shapes extracted from MS-MINT results.

        Args:
            fns: Filename(s) to include in the plot. If None, all files in results are used.
            peak_labels: Peak label(s) to include in the plot. If None, all peaks are used.
            interactive: If True, returns an interactive Plotly figure instead of a static
                Matplotlib figure.
            **kwargs: Additional keyword arguments passed to the underlying plotting functions.

        Returns:
            Either a seaborn FacetGrid or a Plotly figure depending on the 'interactive' parameter.
        """
        if peak_labels is None:
            peak_labels = self.mint.peak_labels

        if len(self.mint.results) > 0:
            if not interactive:
                return plot_peak_shapes(
                    self.mint.results,
                    mint_metadata=self.mint.meta,
                    fns=fns,
                    peak_labels=peak_labels,
                    **kwargs,
                )
            else:
                return plotly_peak_shapes(
                    self.mint.results,
                    mint_metadata=self.mint.meta,
                    fns=fns,
                    peak_labels=peak_labels,
                    **kwargs,
                )

    def heatmap(
        self,
        col_name: str = "peak_max",
        normed_by_cols: bool = True,
        transposed: bool = False,
        clustered: bool = False,
        add_dendrogram: bool = False,
        name: str = "",
        correlation: bool = False,
        **kwargs,
    ) -> Optional[PlotlyFigure]:
        """Create an interactive heatmap to explore the data.

        Calls mint.crosstab() and then visualizes the result using plotly_heatmap.

        Args:
            col_name: Name of the column in mint.results to be analyzed.
            normed_by_cols: Whether or not to normalize the columns in the crosstab.
            transposed: If True, transpose matrix before plotting.
            clustered: Whether or not to cluster the rows.
            add_dendrogram: Whether or not to replace row labels with a dendrogram.
            name: Label to use for the colorbar.
            correlation: If True, convert data to correlation matrix before plotting.
            **kwargs: Additional keyword arguments passed to plotly_heatmap.

        Returns:
            Interactive Plotly heatmap figure, or None if no results are available.
        """
        data = self.mint.crosstab(col_name)

        # Remove path and suffix from file name.
        transform_filenames_func = lambda x: P(x).with_suffix("").name
        data.index = [transform_filenames_func(i) for i in data.index]

        if len(self.mint.results) > 0:
            return plotly_heatmap(
                data,
                normed_by_cols=normed_by_cols,
                transposed=transposed,
                clustered=clustered,
                add_dendrogram=add_dendrogram,
                name=col_name if not name else name,
                correlation=correlation,
                **kwargs,
            )
        return None

    def histogram_2d(
        self,
        fn: str,
        peak_label: Optional[str] = None,
        rt_margin: float = 0,
        mz_margin: float = 0,
        **kwargs,
    ) -> matplotlib.figure.Figure:
        """Create a 2D histogram of an MS file.

        Args:
            fn: File name of the MS file to visualize.
            peak_label: Target to focus. If provided, the plot will highlight the region
                defined by the target parameters.
            rt_margin: Margin in retention time dimension to add around the target region.
            mz_margin: Margin in m/z dimension to add around the target region.
            **kwargs: Additional keyword arguments passed to plot_metabolomics_hist2d.

        Returns:
            Matplotlib Figure containing the 2D histogram.
        """
        df = ms_file_to_df(fn)
        mz_range, rt_range, rt_min, rt_max = None, None, None, None
        mz_min, mz_max = None, None

        if peak_label is not None:
            target_data = self.mint.targets.loc[peak_label]
            mz_mean, mz_width, rt_min, rt_max = target_data[
                ["mz_mean", "mz_width", "rt_min", "rt_max"]
            ]
            mz_min, mz_max = mz_mean_width_to_min_max(mz_mean, mz_width)
            mz_range = (mz_min - mz_margin, mz_max + mz_margin)
            rt_range = (rt_min - rt_margin, rt_max + rt_margin)

        fig = plot_metabolomics_hist2d(df, mz_range=mz_range, rt_range=rt_range, **kwargs)

        if rt_min is not None and mz_min is not None:
            plt.plot(
                [rt_min, rt_max, rt_max, rt_min, rt_min],
                [mz_min, mz_min, mz_max, mz_max, mz_min],
                color="w",
                ls="--",
                lw=0.5,
            )
        if peak_label is None:
            plt.title(f"{P(fn).with_suffix('').name}")
        else:
            plt.title(f"{P(fn).with_suffix('').name}\n{peak_label}")
        return fig

    def chromatogram(
        self,
        fns: Optional[Union[str, List[str]]] = None,
        peak_labels: Optional[Union[str, List[str]]] = None,
        interactive: bool = False,
        filters: Optional[List[Any]] = None,
        ax: Optional[plt.Axes] = None,
        **kwargs,
    ) -> Union[sns.axisgrid.FacetGrid, sns.axes._base.AxesBase, PlotlyFigure]:
        """Plot chromatograms extracted from one or more files.

        Args:
            fns: File name(s) to extract chromatograms from. If None, all files are used.
            peak_labels: Target(s) from Mint.targets.peak_label to use for extraction parameters.
                If None, all targets are used.
            interactive: If True, returns an interactive Plotly figure instead of a static Matplotlib figure.
            filters: List of filters to apply to the chromatograms before plotting.
            ax: Matplotlib axes to plot on. If None, a new figure is created.
            **kwargs: Additional keyword arguments passed to the underlying plotting functions.

        Returns:
            Either a seaborn FacetGrid, a single Axes, or a Plotly figure depending on
            the 'interactive' parameter and whether an 'ax' is provided.
        """
        if isinstance(fns, str):
            fns = [fns]

        if fns is not None:
            fns = tuple(fns)

        if isinstance(peak_labels, str):
            peak_labels = [peak_labels]

        if peak_labels is None:
            peak_labels = self.mint.peak_labels

        if peak_labels is not None:
            peak_labels = tuple(peak_labels)

        data = self.mint.get_chromatograms(fns=fns, peak_labels=peak_labels, filters=filters)

        if not interactive:
            params = dict(
                x="scan_time",
                y="intensity",
                col="peak_label",
                col_wrap=1,
                col_order=peak_labels,
                height=1.5,
                aspect=5,
                hue="ms_file_label",
                facet_kws=dict(sharey=False),
                marker=".",
                linewidth=0,
            )
            params.update(kwargs)

            if ax is None:
                g = sns.relplot(data=data, **params)

                for peak_label, ax in zip(peak_labels, g.axes.flatten()):
                    _, _, rt_min, rt_max = self.mint.get_target_params(peak_label)
                    if rt_min is not None and rt_max is not None:
                        ax.axvspan(rt_min, rt_max, color="lightgreen", alpha=0.5, zorder=-1)
                    ax.ticklabel_format(style="sci", axis="y", useOffset=False, scilimits=(0, 0))
                g.set_titles(template="{col_name}")

            else:
                g = sns.lineplot(
                    data=data, x="scan_time", y="intensity", hue="ms_file_label", ax=ax, **kwargs
                )
            return g

        else:
            g = px.line(
                data_frame=data,
                x="scan_time",
                y="intensity",
                facet_col="peak_label",
                color="ms_file_label",
                height=700,
                facet_col_wrap=1,
            )
            g.update_xaxes(matches=None)
            g.update_yaxes(matches=None)
            return g

__init__(mint)

Initialize the MintPlotter with a Mint instance.

Parameters:

Name Type Description Default
mint 'ms_mint.Mint.Mint'

Mint instance containing the data to visualize.

required
Source code in src/ms_mint/MintPlotter.py
def __init__(self, mint: "ms_mint.Mint.Mint") -> None:
    """Initialize the MintPlotter with a Mint instance.

    Args:
        mint: Mint instance containing the data to visualize.
    """
    self.mint = mint

hierarchical_clustering(data=None, peak_labels=None, ms_files=None, title=None, figsize=(8, 8), targets_var=None, var_name='peak_max', vmin=-3, vmax=3, xmaxticks=None, ymaxticks=None, apply='log2p1', metric='cosine', scaler='standard', groupby=None, transposed=False, **kwargs)

Perform hierarchical clustering and plot a heatmap.

If no data is provided, data is taken from self.mint.crosstab(var_name). The clustered non-transformed non-scaled data is stored in self.mint.clustered.

Parameters:

Name Type Description Default
data Optional[DataFrame]

DataFrame with data to be used for clustering. If None, crosstab of mint instance is used.

None
peak_labels Optional[List[str]]

List of peak labels to include in the analysis.

None
ms_files Optional[List[str]]

List of MS files to include in the analysis.

None
title Optional[str]

Title for the plot.

None
figsize Tuple[int, int]

Tuple of (width, height) in inches for the figure.

(8, 8)
targets_var Optional[str]

Deprecated, use var_name instead.

None
var_name str

Name of the column from data to be used for cell values in the heatmap.

'peak_max'
vmin int

Minimum value for color scaling.

-3
vmax int

Maximum value for color scaling.

3
xmaxticks Optional[int]

Maximum number of ticks on x-axis.

None
ymaxticks Optional[int]

Maximum number of ticks on y-axis.

None
apply str

Transformation to be applied on the data. Can be "log1p", "log2p1", "log10p1" or None.

'log2p1'
metric str

The distance metric to use for the tree. Can be any metric supported by scipy.spatial.distance.pdist.

'cosine'
scaler str

Method to scale data along both axes. Can be "standard", "robust" or None.

'standard'
groupby Optional[str]

Name of the column to group data before scaling. If None, scaling is applied to the whole data, not group-wise.

None
transposed bool

Whether to transpose the figure or not.

False
**kwargs

Additional keyword arguments passed to hierarchical_clustering.

{}

Returns:

Type Description
Figure

Matplotlib figure representing the clustered heatmap.

Source code in src/ms_mint/MintPlotter.py
def hierarchical_clustering(
    self,
    data: Optional[pd.DataFrame] = None,
    peak_labels: Optional[List[str]] = None,
    ms_files: Optional[List[str]] = None,
    title: Optional[str] = None,
    figsize: Tuple[int, int] = (8, 8),
    targets_var: Optional[str] = None,
    var_name: str = "peak_max",
    vmin: int = -3,
    vmax: int = 3,
    xmaxticks: Optional[int] = None,
    ymaxticks: Optional[int] = None,
    apply: str = "log2p1",
    metric: str = "cosine",
    scaler: str = "standard",
    groupby: Optional[str] = None,
    transposed: bool = False,
    **kwargs,
) -> matplotlib.figure.Figure:
    """Perform hierarchical clustering and plot a heatmap.

    If no data is provided, data is taken from self.mint.crosstab(var_name).
    The clustered non-transformed non-scaled data is stored in `self.mint.clustered`.

    Args:
        data: DataFrame with data to be used for clustering. If None, crosstab of
            mint instance is used.
        peak_labels: List of peak labels to include in the analysis.
        ms_files: List of MS files to include in the analysis.
        title: Title for the plot.
        figsize: Tuple of (width, height) in inches for the figure.
        targets_var: Deprecated, use var_name instead.
        var_name: Name of the column from data to be used for cell values in the heatmap.
        vmin: Minimum value for color scaling.
        vmax: Maximum value for color scaling.
        xmaxticks: Maximum number of ticks on x-axis.
        ymaxticks: Maximum number of ticks on y-axis.
        apply: Transformation to be applied on the data. Can be "log1p", "log2p1",
            "log10p1" or None.
        metric: The distance metric to use for the tree. Can be any metric supported
            by scipy.spatial.distance.pdist.
        scaler: Method to scale data along both axes. Can be "standard", "robust" or None.
        groupby: Name of the column to group data before scaling. If None, scaling is
            applied to the whole data, not group-wise.
        transposed: Whether to transpose the figure or not.
        **kwargs: Additional keyword arguments passed to hierarchical_clustering.

    Returns:
        Matplotlib figure representing the clustered heatmap.
    """
    if targets_var is not None:
        warnings.warn("targets_var is deprecated, use var_name instead", DeprecationWarning)
        var_name = targets_var

    warnings.simplefilter("ignore", ClusterWarning)
    if data is None:
        data = self.mint.crosstab(
            var_name=var_name, apply=apply, scaler=scaler, groupby=groupby
        )

    if transposed:
        data = data.T

    _, fig, ndx_x, ndx_y = hierarchical_clustering(
        data,
        vmin=vmin,
        vmax=vmax,
        figsize=figsize,
        xmaxticks=xmaxticks,
        ymaxticks=ymaxticks,
        metric=metric,
        **kwargs,
    )

    self.mint.clustered = data.iloc[ndx_x, ndx_y]

    return fig

peak_shapes(fns=None, peak_labels=None, interactive=False, **kwargs)

Plot peak shapes extracted from MS-MINT results.

Parameters:

Name Type Description Default
fns Optional[Union[str, List[str]]]

Filename(s) to include in the plot. If None, all files in results are used.

None
peak_labels Optional[Union[str, List[str]]]

Peak label(s) to include in the plot. If None, all peaks are used.

None
interactive bool

If True, returns an interactive Plotly figure instead of a static Matplotlib figure.

False
**kwargs

Additional keyword arguments passed to the underlying plotting functions.

{}

Returns:

Type Description
Union[FacetGrid, Figure]

Either a seaborn FacetGrid or a Plotly figure depending on the 'interactive' parameter.

Source code in src/ms_mint/MintPlotter.py
def peak_shapes(
    self,
    fns: Optional[Union[str, List[str]]] = None,
    peak_labels: Optional[Union[str, List[str]]] = None,
    interactive: bool = False,
    **kwargs,
) -> Union[sns.axisgrid.FacetGrid, PlotlyFigure]:
    """Plot peak shapes extracted from MS-MINT results.

    Args:
        fns: Filename(s) to include in the plot. If None, all files in results are used.
        peak_labels: Peak label(s) to include in the plot. If None, all peaks are used.
        interactive: If True, returns an interactive Plotly figure instead of a static
            Matplotlib figure.
        **kwargs: Additional keyword arguments passed to the underlying plotting functions.

    Returns:
        Either a seaborn FacetGrid or a Plotly figure depending on the 'interactive' parameter.
    """
    if peak_labels is None:
        peak_labels = self.mint.peak_labels

    if len(self.mint.results) > 0:
        if not interactive:
            return plot_peak_shapes(
                self.mint.results,
                mint_metadata=self.mint.meta,
                fns=fns,
                peak_labels=peak_labels,
                **kwargs,
            )
        else:
            return plotly_peak_shapes(
                self.mint.results,
                mint_metadata=self.mint.meta,
                fns=fns,
                peak_labels=peak_labels,
                **kwargs,
            )

heatmap(col_name='peak_max', normed_by_cols=True, transposed=False, clustered=False, add_dendrogram=False, name='', correlation=False, **kwargs)

Create an interactive heatmap to explore the data.

Calls mint.crosstab() and then visualizes the result using plotly_heatmap.

Parameters:

Name Type Description Default
col_name str

Name of the column in mint.results to be analyzed.

'peak_max'
normed_by_cols bool

Whether or not to normalize the columns in the crosstab.

True
transposed bool

If True, transpose matrix before plotting.

False
clustered bool

Whether or not to cluster the rows.

False
add_dendrogram bool

Whether or not to replace row labels with a dendrogram.

False
name str

Label to use for the colorbar.

''
correlation bool

If True, convert data to correlation matrix before plotting.

False
**kwargs

Additional keyword arguments passed to plotly_heatmap.

{}

Returns:

Type Description
Optional[Figure]

Interactive Plotly heatmap figure, or None if no results are available.

Source code in src/ms_mint/MintPlotter.py
def heatmap(
    self,
    col_name: str = "peak_max",
    normed_by_cols: bool = True,
    transposed: bool = False,
    clustered: bool = False,
    add_dendrogram: bool = False,
    name: str = "",
    correlation: bool = False,
    **kwargs,
) -> Optional[PlotlyFigure]:
    """Create an interactive heatmap to explore the data.

    Calls mint.crosstab() and then visualizes the result using plotly_heatmap.

    Args:
        col_name: Name of the column in mint.results to be analyzed.
        normed_by_cols: Whether or not to normalize the columns in the crosstab.
        transposed: If True, transpose matrix before plotting.
        clustered: Whether or not to cluster the rows.
        add_dendrogram: Whether or not to replace row labels with a dendrogram.
        name: Label to use for the colorbar.
        correlation: If True, convert data to correlation matrix before plotting.
        **kwargs: Additional keyword arguments passed to plotly_heatmap.

    Returns:
        Interactive Plotly heatmap figure, or None if no results are available.
    """
    data = self.mint.crosstab(col_name)

    # Remove path and suffix from file name.
    transform_filenames_func = lambda x: P(x).with_suffix("").name
    data.index = [transform_filenames_func(i) for i in data.index]

    if len(self.mint.results) > 0:
        return plotly_heatmap(
            data,
            normed_by_cols=normed_by_cols,
            transposed=transposed,
            clustered=clustered,
            add_dendrogram=add_dendrogram,
            name=col_name if not name else name,
            correlation=correlation,
            **kwargs,
        )
    return None

histogram_2d(fn, peak_label=None, rt_margin=0, mz_margin=0, **kwargs)

Create a 2D histogram of an MS file.

Parameters:

Name Type Description Default
fn str

File name of the MS file to visualize.

required
peak_label Optional[str]

Target to focus. If provided, the plot will highlight the region defined by the target parameters.

None
rt_margin float

Margin in retention time dimension to add around the target region.

0
mz_margin float

Margin in m/z dimension to add around the target region.

0
**kwargs

Additional keyword arguments passed to plot_metabolomics_hist2d.

{}

Returns:

Type Description
Figure

Matplotlib Figure containing the 2D histogram.

Source code in src/ms_mint/MintPlotter.py
def histogram_2d(
    self,
    fn: str,
    peak_label: Optional[str] = None,
    rt_margin: float = 0,
    mz_margin: float = 0,
    **kwargs,
) -> matplotlib.figure.Figure:
    """Create a 2D histogram of an MS file.

    Args:
        fn: File name of the MS file to visualize.
        peak_label: Target to focus. If provided, the plot will highlight the region
            defined by the target parameters.
        rt_margin: Margin in retention time dimension to add around the target region.
        mz_margin: Margin in m/z dimension to add around the target region.
        **kwargs: Additional keyword arguments passed to plot_metabolomics_hist2d.

    Returns:
        Matplotlib Figure containing the 2D histogram.
    """
    df = ms_file_to_df(fn)
    mz_range, rt_range, rt_min, rt_max = None, None, None, None
    mz_min, mz_max = None, None

    if peak_label is not None:
        target_data = self.mint.targets.loc[peak_label]
        mz_mean, mz_width, rt_min, rt_max = target_data[
            ["mz_mean", "mz_width", "rt_min", "rt_max"]
        ]
        mz_min, mz_max = mz_mean_width_to_min_max(mz_mean, mz_width)
        mz_range = (mz_min - mz_margin, mz_max + mz_margin)
        rt_range = (rt_min - rt_margin, rt_max + rt_margin)

    fig = plot_metabolomics_hist2d(df, mz_range=mz_range, rt_range=rt_range, **kwargs)

    if rt_min is not None and mz_min is not None:
        plt.plot(
            [rt_min, rt_max, rt_max, rt_min, rt_min],
            [mz_min, mz_min, mz_max, mz_max, mz_min],
            color="w",
            ls="--",
            lw=0.5,
        )
    if peak_label is None:
        plt.title(f"{P(fn).with_suffix('').name}")
    else:
        plt.title(f"{P(fn).with_suffix('').name}\n{peak_label}")
    return fig

chromatogram(fns=None, peak_labels=None, interactive=False, filters=None, ax=None, **kwargs)

Plot chromatograms extracted from one or more files.

Parameters:

Name Type Description Default
fns Optional[Union[str, List[str]]]

File name(s) to extract chromatograms from. If None, all files are used.

None
peak_labels Optional[Union[str, List[str]]]

Target(s) from Mint.targets.peak_label to use for extraction parameters. If None, all targets are used.

None
interactive bool

If True, returns an interactive Plotly figure instead of a static Matplotlib figure.

False
filters Optional[List[Any]]

List of filters to apply to the chromatograms before plotting.

None
ax Optional[Axes]

Matplotlib axes to plot on. If None, a new figure is created.

None
**kwargs

Additional keyword arguments passed to the underlying plotting functions.

{}

Returns:

Type Description
Union[FacetGrid, AxesBase, Figure]

Either a seaborn FacetGrid, a single Axes, or a Plotly figure depending on

Union[FacetGrid, AxesBase, Figure]

the 'interactive' parameter and whether an 'ax' is provided.

Source code in src/ms_mint/MintPlotter.py
def chromatogram(
    self,
    fns: Optional[Union[str, List[str]]] = None,
    peak_labels: Optional[Union[str, List[str]]] = None,
    interactive: bool = False,
    filters: Optional[List[Any]] = None,
    ax: Optional[plt.Axes] = None,
    **kwargs,
) -> Union[sns.axisgrid.FacetGrid, sns.axes._base.AxesBase, PlotlyFigure]:
    """Plot chromatograms extracted from one or more files.

    Args:
        fns: File name(s) to extract chromatograms from. If None, all files are used.
        peak_labels: Target(s) from Mint.targets.peak_label to use for extraction parameters.
            If None, all targets are used.
        interactive: If True, returns an interactive Plotly figure instead of a static Matplotlib figure.
        filters: List of filters to apply to the chromatograms before plotting.
        ax: Matplotlib axes to plot on. If None, a new figure is created.
        **kwargs: Additional keyword arguments passed to the underlying plotting functions.

    Returns:
        Either a seaborn FacetGrid, a single Axes, or a Plotly figure depending on
        the 'interactive' parameter and whether an 'ax' is provided.
    """
    if isinstance(fns, str):
        fns = [fns]

    if fns is not None:
        fns = tuple(fns)

    if isinstance(peak_labels, str):
        peak_labels = [peak_labels]

    if peak_labels is None:
        peak_labels = self.mint.peak_labels

    if peak_labels is not None:
        peak_labels = tuple(peak_labels)

    data = self.mint.get_chromatograms(fns=fns, peak_labels=peak_labels, filters=filters)

    if not interactive:
        params = dict(
            x="scan_time",
            y="intensity",
            col="peak_label",
            col_wrap=1,
            col_order=peak_labels,
            height=1.5,
            aspect=5,
            hue="ms_file_label",
            facet_kws=dict(sharey=False),
            marker=".",
            linewidth=0,
        )
        params.update(kwargs)

        if ax is None:
            g = sns.relplot(data=data, **params)

            for peak_label, ax in zip(peak_labels, g.axes.flatten()):
                _, _, rt_min, rt_max = self.mint.get_target_params(peak_label)
                if rt_min is not None and rt_max is not None:
                    ax.axvspan(rt_min, rt_max, color="lightgreen", alpha=0.5, zorder=-1)
                ax.ticklabel_format(style="sci", axis="y", useOffset=False, scilimits=(0, 0))
            g.set_titles(template="{col_name}")

        else:
            g = sns.lineplot(
                data=data, x="scan_time", y="intensity", hue="ms_file_label", ax=ax, **kwargs
            )
        return g

    else:
        g = px.line(
            data_frame=data,
            x="scan_time",
            y="intensity",
            facet_col="peak_label",
            color="ms_file_label",
            height=700,
            facet_col_wrap=1,
        )
        g.update_xaxes(matches=None)
        g.update_yaxes(matches=None)
        return g

ms_mint.TargetOptimizer

TargetOptimizer

Optimizer for MS-MINT target lists.

This class provides methods to optimize retention time parameters in target lists based on actual data from MS files.

Attributes:

Name Type Description
mint

Mint instance to optimize.

results

Results of the most recent optimization.

Source code in src/ms_mint/TargetOptimizer.py
class TargetOptimizer:
    """Optimizer for MS-MINT target lists.

    This class provides methods to optimize retention time parameters
    in target lists based on actual data from MS files.

    Attributes:
        mint: Mint instance to optimize.
        results: Results of the most recent optimization.
    """

    def __init__(self, mint: Optional["ms_mint.Mint.Mint"] = None) -> None:
        """Initialize a TargetOptimizer instance.

        Args:
            mint: Mint instance to optimize.
        """
        self.mint = mint
        self.reset()

    def reset(self) -> "TargetOptimizer":
        """Reset the optimizer results.

        Returns:
            Self for method chaining.
        """
        self.results: Optional[pd.DataFrame] = None
        return self

    def rt_min_max(
        self,
        fns: Optional[List[Union[str, P]]] = None,
        targets: Optional[pd.DataFrame] = None,
        peak_labels: Optional[List[str]] = None,
        minimum_intensity: float = 1e4,
        plot: bool = False,
        sigma: float = 20,
        filters: Optional[List[Any]] = None,
        post_opt: bool = False,
        post_opt_kwargs: Optional[Dict[str, Any]] = None,
        rel_height: float = 0.9,
        height: int = 3,
        aspect: int = 2,
        col_wrap: int = 3,
        **kwargs,
    ) -> Union[Tuple["ms_mint.Mint.Mint", Figure], "ms_mint.Mint.Mint"]:
        """Optimize rt_min and rt_max values based on expected retention times.

        For this optimization all rt values in the target list must be present.
        This method analyzes chromatograms to find peaks around expected retention
        times and sets optimal rt_min and rt_max values.

        Args:
            fns: List of filenames to use for optimization. If None, uses all files in mint.
            targets: Target list to optimize. If None, uses mint.targets.
            peak_labels: Subset of peak_labels to optimize. If None, optimizes all targets.
            minimum_intensity: Minimum intensity required, otherwise skip target.
            plot: Whether to plot optimizations (up to 1000 plots).
            sigma: Sigma value for peak selection (Gaussian weighting parameter).
            filters: Filter instances to apply in respective order.
            post_opt: Whether to optimize retention times after peak selection.
            post_opt_kwargs: Parameters for post-optimization.
            rel_height: Relative height for peak width determination.
            height: Height of each subplot in inches.
            aspect: Width-to-height ratio of each subplot.
            col_wrap: Maximum number of columns in the plot.
            **kwargs: Additional parameters passed to find_peaks method.

        Returns:
            If plot=True, returns a tuple of (mint instance, matplotlib figure).
            If plot=False, returns only the mint instance.
        """
        if targets is None:
            targets = self.mint.targets.reset_index()

        if fns is None:
            fns = self.mint.ms_files

        if peak_labels is None:
            peak_labels = targets.peak_label.values

        _targets = targets.set_index("peak_label").copy()

        ms1 = pd.concat(
            [ms_file_to_df(fn) for fn in self.mint.tqdm(fns, desc="Reading files")]
        ).sort_values(["scan_time", "mz"])

        if plot:
            n_rows = int(np.ceil(len(peak_labels) / col_wrap))
            fig = plt.figure(figsize=(col_wrap * height * aspect, n_rows * height))

        i = 0
        for peak_label, row in self.mint.tqdm(
            _targets.iterrows(), total=len(targets), desc="Optimizing targets"
        ):
            if peak_label not in peak_labels:
                logging.warning(f"{peak_label} not in {peak_labels}")
                continue

            mz = row.mz_mean
            rt = row.rt

            _slice = extract_chromatogram_from_ms1(ms1, mz).groupby("scan_time").sum()

            chrom = Chromatogram(_slice.index, _slice.values, expected_rt=rt, filters=filters)

            if chrom.x.max() < minimum_intensity:
                logging.warning(
                    f"Peak intensity for {peak_label} below threshold ({minimum_intensity})"
                )
                continue

            chrom.apply_filters()
            chrom.find_peaks(rel_height=rel_height, **kwargs)
            chrom.select_peak_with_gaussian_weight(rt, sigma)

            if post_opt:
                if post_opt_kwargs is None:
                    post_opt_kwargs = {}
                chrom.optimise_peak_times_with_diff(**post_opt_kwargs)

            if chrom.selected_peak_ndxs is None or len(chrom.selected_peak_ndxs) == 0:
                logging.warning(f"No peaks detected for {peak_label}")
                continue

            ndx = chrom.selected_peak_ndxs[0]
            rt_min = chrom.peaks.at[ndx, "rt_min"]
            rt_max = chrom.peaks.at[ndx, "rt_max"]

            _targets.loc[peak_label, ["rt_min", "rt_max"]] = rt_min, rt_max

            if plot:
                i += 1

                if i <= 1000:
                    plt.subplot(n_rows, col_wrap, i)
                    chrom.plot()
                    plt.gca().get_legend().remove()
                    plt.title(f"{peak_label}\nm/z={mz:.3f}")

        self.results = _targets.reset_index()

        if self.mint is not None:
            self.mint.targets = self.results

        if plot:
            plt.tight_layout()
            return self.mint, fig
        else:
            return self.mint

    def detect_largest_peak_rt(
        self,
        fns: Optional[List[Union[str, P]]] = None,
        targets: Optional[pd.DataFrame] = None,
        peak_labels: Optional[List[str]] = None,
        minimum_intensity: float = 1e4,
        plot: bool = False,
        height: int = 3,
        aspect: int = 2,
        col_wrap: int = 3,
        **kwargs,
    ) -> Union[Tuple["ms_mint.Mint.Mint", Figure], "ms_mint.Mint.Mint"]:
        """Detect the largest peak and set the RT value (not RT_min and RT_max).

        Uses a simple maximum intensity approach rather than complex peak detection
        to find the retention time of the most intense peak for each target.

        Args:
            fns: List of filenames to use for peak detection. If None, uses all files in mint.
            targets: Target list to update. If None, uses mint.targets.
            peak_labels: Subset of peak_labels to update. If None, updates all targets.
            minimum_intensity: Minimum intensity required, otherwise skip target.
            plot: Whether to plot results (up to 100 plots).
            height: Height of each subplot in inches.
            aspect: Width-to-height ratio of each subplot.
            col_wrap: Maximum number of columns in the plot.
            **kwargs: Additional parameters (not used but accepted for compatibility).

        Returns:
            If plot=True, returns a tuple of (mint instance, matplotlib figure).
            If plot=False, returns only the mint instance.
        """
        if targets is None:
            targets = self.mint.targets.reset_index()

        if fns is None:
            fns = self.mint.ms_files

        if peak_labels is None:
            peak_labels = targets.peak_label.values

        _targets = targets.set_index("peak_label").copy()

        ms1 = pd.concat(
            [ms_file_to_df(fn) for fn in self.mint.tqdm(fns, desc="Reading files")]
        ).sort_values(["scan_time", "mz"])

        if plot:
            n_rows = int(np.ceil(min(len(peak_labels), 100) / col_wrap))
            fig = plt.figure(figsize=(col_wrap * height * aspect, n_rows * height))

        i = 0
        for peak_label, row in self.mint.tqdm(
            _targets.iterrows(), total=len(targets), desc="Detecting largest peaks"
        ):
            if peak_label not in peak_labels:
                logging.warning(f"{peak_label} not in {peak_labels}")
                continue

            mz = row.mz_mean
            mz_width = row.mz_width if "mz_width" in row else 0.01  # Default width if not present

            # Extract chromatogram
            try:
                _slice = extract_chromatogram_from_ms1(
                    ms1, mz, mz_width if "mz_width" in row else None
                )
                if len(_slice) == 0:
                    logging.warning(f"No data points found for {peak_label}")
                    continue

                chrom_data = _slice.groupby("scan_time").sum()

                # Simple approach: find the scan time with maximum intensity
                if chrom_data.values.max() < minimum_intensity:
                    logging.warning(
                        f"Peak intensity for {peak_label} below threshold ({minimum_intensity})"
                    )
                    continue

                # Get the retention time with the maximum intensity
                max_intensity_idx = chrom_data.values.argmax()
                new_rt = chrom_data.index[max_intensity_idx]

                # Update only the RT value
                _targets.loc[peak_label, "rt"] = new_rt

                if plot and i < 100:  # Only plot first 100
                    i += 1
                    plt.subplot(n_rows, col_wrap, i)
                    plt.plot(chrom_data.index, chrom_data.values)
                    plt.axvline(new_rt, color="red", linestyle="--")
                    plt.title(f"{peak_label}\nm/z={mz:.3f}\nRT={new_rt:.1f}")

            except Exception as e:
                logging.error(f"Error processing {peak_label}: {str(e)}")
                continue

        self.results = _targets.reset_index()

        if self.mint is not None:
            self.mint.targets = self.results

        if plot:
            plt.tight_layout()
            return self.mint, fig
        else:
            return self.mint

__init__(mint=None)

Initialize a TargetOptimizer instance.

Parameters:

Name Type Description Default
mint Optional['ms_mint.Mint.Mint']

Mint instance to optimize.

None
Source code in src/ms_mint/TargetOptimizer.py
def __init__(self, mint: Optional["ms_mint.Mint.Mint"] = None) -> None:
    """Initialize a TargetOptimizer instance.

    Args:
        mint: Mint instance to optimize.
    """
    self.mint = mint
    self.reset()

reset()

Reset the optimizer results.

Returns:

Type Description
'TargetOptimizer'

Self for method chaining.

Source code in src/ms_mint/TargetOptimizer.py
def reset(self) -> "TargetOptimizer":
    """Reset the optimizer results.

    Returns:
        Self for method chaining.
    """
    self.results: Optional[pd.DataFrame] = None
    return self

rt_min_max(fns=None, targets=None, peak_labels=None, minimum_intensity=10000.0, plot=False, sigma=20, filters=None, post_opt=False, post_opt_kwargs=None, rel_height=0.9, height=3, aspect=2, col_wrap=3, **kwargs)

Optimize rt_min and rt_max values based on expected retention times.

For this optimization all rt values in the target list must be present. This method analyzes chromatograms to find peaks around expected retention times and sets optimal rt_min and rt_max values.

Parameters:

Name Type Description Default
fns Optional[List[Union[str, Path]]]

List of filenames to use for optimization. If None, uses all files in mint.

None
targets Optional[DataFrame]

Target list to optimize. If None, uses mint.targets.

None
peak_labels Optional[List[str]]

Subset of peak_labels to optimize. If None, optimizes all targets.

None
minimum_intensity float

Minimum intensity required, otherwise skip target.

10000.0
plot bool

Whether to plot optimizations (up to 1000 plots).

False
sigma float

Sigma value for peak selection (Gaussian weighting parameter).

20
filters Optional[List[Any]]

Filter instances to apply in respective order.

None
post_opt bool

Whether to optimize retention times after peak selection.

False
post_opt_kwargs Optional[Dict[str, Any]]

Parameters for post-optimization.

None
rel_height float

Relative height for peak width determination.

0.9
height int

Height of each subplot in inches.

3
aspect int

Width-to-height ratio of each subplot.

2
col_wrap int

Maximum number of columns in the plot.

3
**kwargs

Additional parameters passed to find_peaks method.

{}

Returns:

Type Description
Union[Tuple['ms_mint.Mint.Mint', Figure], 'ms_mint.Mint.Mint']

If plot=True, returns a tuple of (mint instance, matplotlib figure).

Union[Tuple['ms_mint.Mint.Mint', Figure], 'ms_mint.Mint.Mint']

If plot=False, returns only the mint instance.

Source code in src/ms_mint/TargetOptimizer.py
def rt_min_max(
    self,
    fns: Optional[List[Union[str, P]]] = None,
    targets: Optional[pd.DataFrame] = None,
    peak_labels: Optional[List[str]] = None,
    minimum_intensity: float = 1e4,
    plot: bool = False,
    sigma: float = 20,
    filters: Optional[List[Any]] = None,
    post_opt: bool = False,
    post_opt_kwargs: Optional[Dict[str, Any]] = None,
    rel_height: float = 0.9,
    height: int = 3,
    aspect: int = 2,
    col_wrap: int = 3,
    **kwargs,
) -> Union[Tuple["ms_mint.Mint.Mint", Figure], "ms_mint.Mint.Mint"]:
    """Optimize rt_min and rt_max values based on expected retention times.

    For this optimization all rt values in the target list must be present.
    This method analyzes chromatograms to find peaks around expected retention
    times and sets optimal rt_min and rt_max values.

    Args:
        fns: List of filenames to use for optimization. If None, uses all files in mint.
        targets: Target list to optimize. If None, uses mint.targets.
        peak_labels: Subset of peak_labels to optimize. If None, optimizes all targets.
        minimum_intensity: Minimum intensity required, otherwise skip target.
        plot: Whether to plot optimizations (up to 1000 plots).
        sigma: Sigma value for peak selection (Gaussian weighting parameter).
        filters: Filter instances to apply in respective order.
        post_opt: Whether to optimize retention times after peak selection.
        post_opt_kwargs: Parameters for post-optimization.
        rel_height: Relative height for peak width determination.
        height: Height of each subplot in inches.
        aspect: Width-to-height ratio of each subplot.
        col_wrap: Maximum number of columns in the plot.
        **kwargs: Additional parameters passed to find_peaks method.

    Returns:
        If plot=True, returns a tuple of (mint instance, matplotlib figure).
        If plot=False, returns only the mint instance.
    """
    if targets is None:
        targets = self.mint.targets.reset_index()

    if fns is None:
        fns = self.mint.ms_files

    if peak_labels is None:
        peak_labels = targets.peak_label.values

    _targets = targets.set_index("peak_label").copy()

    ms1 = pd.concat(
        [ms_file_to_df(fn) for fn in self.mint.tqdm(fns, desc="Reading files")]
    ).sort_values(["scan_time", "mz"])

    if plot:
        n_rows = int(np.ceil(len(peak_labels) / col_wrap))
        fig = plt.figure(figsize=(col_wrap * height * aspect, n_rows * height))

    i = 0
    for peak_label, row in self.mint.tqdm(
        _targets.iterrows(), total=len(targets), desc="Optimizing targets"
    ):
        if peak_label not in peak_labels:
            logging.warning(f"{peak_label} not in {peak_labels}")
            continue

        mz = row.mz_mean
        rt = row.rt

        _slice = extract_chromatogram_from_ms1(ms1, mz).groupby("scan_time").sum()

        chrom = Chromatogram(_slice.index, _slice.values, expected_rt=rt, filters=filters)

        if chrom.x.max() < minimum_intensity:
            logging.warning(
                f"Peak intensity for {peak_label} below threshold ({minimum_intensity})"
            )
            continue

        chrom.apply_filters()
        chrom.find_peaks(rel_height=rel_height, **kwargs)
        chrom.select_peak_with_gaussian_weight(rt, sigma)

        if post_opt:
            if post_opt_kwargs is None:
                post_opt_kwargs = {}
            chrom.optimise_peak_times_with_diff(**post_opt_kwargs)

        if chrom.selected_peak_ndxs is None or len(chrom.selected_peak_ndxs) == 0:
            logging.warning(f"No peaks detected for {peak_label}")
            continue

        ndx = chrom.selected_peak_ndxs[0]
        rt_min = chrom.peaks.at[ndx, "rt_min"]
        rt_max = chrom.peaks.at[ndx, "rt_max"]

        _targets.loc[peak_label, ["rt_min", "rt_max"]] = rt_min, rt_max

        if plot:
            i += 1

            if i <= 1000:
                plt.subplot(n_rows, col_wrap, i)
                chrom.plot()
                plt.gca().get_legend().remove()
                plt.title(f"{peak_label}\nm/z={mz:.3f}")

    self.results = _targets.reset_index()

    if self.mint is not None:
        self.mint.targets = self.results

    if plot:
        plt.tight_layout()
        return self.mint, fig
    else:
        return self.mint

detect_largest_peak_rt(fns=None, targets=None, peak_labels=None, minimum_intensity=10000.0, plot=False, height=3, aspect=2, col_wrap=3, **kwargs)

Detect the largest peak and set the RT value (not RT_min and RT_max).

Uses a simple maximum intensity approach rather than complex peak detection to find the retention time of the most intense peak for each target.

Parameters:

Name Type Description Default
fns Optional[List[Union[str, Path]]]

List of filenames to use for peak detection. If None, uses all files in mint.

None
targets Optional[DataFrame]

Target list to update. If None, uses mint.targets.

None
peak_labels Optional[List[str]]

Subset of peak_labels to update. If None, updates all targets.

None
minimum_intensity float

Minimum intensity required, otherwise skip target.

10000.0
plot bool

Whether to plot results (up to 100 plots).

False
height int

Height of each subplot in inches.

3
aspect int

Width-to-height ratio of each subplot.

2
col_wrap int

Maximum number of columns in the plot.

3
**kwargs

Additional parameters (not used but accepted for compatibility).

{}

Returns:

Type Description
Union[Tuple['ms_mint.Mint.Mint', Figure], 'ms_mint.Mint.Mint']

If plot=True, returns a tuple of (mint instance, matplotlib figure).

Union[Tuple['ms_mint.Mint.Mint', Figure], 'ms_mint.Mint.Mint']

If plot=False, returns only the mint instance.

Source code in src/ms_mint/TargetOptimizer.py
def detect_largest_peak_rt(
    self,
    fns: Optional[List[Union[str, P]]] = None,
    targets: Optional[pd.DataFrame] = None,
    peak_labels: Optional[List[str]] = None,
    minimum_intensity: float = 1e4,
    plot: bool = False,
    height: int = 3,
    aspect: int = 2,
    col_wrap: int = 3,
    **kwargs,
) -> Union[Tuple["ms_mint.Mint.Mint", Figure], "ms_mint.Mint.Mint"]:
    """Detect the largest peak and set the RT value (not RT_min and RT_max).

    Uses a simple maximum intensity approach rather than complex peak detection
    to find the retention time of the most intense peak for each target.

    Args:
        fns: List of filenames to use for peak detection. If None, uses all files in mint.
        targets: Target list to update. If None, uses mint.targets.
        peak_labels: Subset of peak_labels to update. If None, updates all targets.
        minimum_intensity: Minimum intensity required, otherwise skip target.
        plot: Whether to plot results (up to 100 plots).
        height: Height of each subplot in inches.
        aspect: Width-to-height ratio of each subplot.
        col_wrap: Maximum number of columns in the plot.
        **kwargs: Additional parameters (not used but accepted for compatibility).

    Returns:
        If plot=True, returns a tuple of (mint instance, matplotlib figure).
        If plot=False, returns only the mint instance.
    """
    if targets is None:
        targets = self.mint.targets.reset_index()

    if fns is None:
        fns = self.mint.ms_files

    if peak_labels is None:
        peak_labels = targets.peak_label.values

    _targets = targets.set_index("peak_label").copy()

    ms1 = pd.concat(
        [ms_file_to_df(fn) for fn in self.mint.tqdm(fns, desc="Reading files")]
    ).sort_values(["scan_time", "mz"])

    if plot:
        n_rows = int(np.ceil(min(len(peak_labels), 100) / col_wrap))
        fig = plt.figure(figsize=(col_wrap * height * aspect, n_rows * height))

    i = 0
    for peak_label, row in self.mint.tqdm(
        _targets.iterrows(), total=len(targets), desc="Detecting largest peaks"
    ):
        if peak_label not in peak_labels:
            logging.warning(f"{peak_label} not in {peak_labels}")
            continue

        mz = row.mz_mean
        mz_width = row.mz_width if "mz_width" in row else 0.01  # Default width if not present

        # Extract chromatogram
        try:
            _slice = extract_chromatogram_from_ms1(
                ms1, mz, mz_width if "mz_width" in row else None
            )
            if len(_slice) == 0:
                logging.warning(f"No data points found for {peak_label}")
                continue

            chrom_data = _slice.groupby("scan_time").sum()

            # Simple approach: find the scan time with maximum intensity
            if chrom_data.values.max() < minimum_intensity:
                logging.warning(
                    f"Peak intensity for {peak_label} below threshold ({minimum_intensity})"
                )
                continue

            # Get the retention time with the maximum intensity
            max_intensity_idx = chrom_data.values.argmax()
            new_rt = chrom_data.index[max_intensity_idx]

            # Update only the RT value
            _targets.loc[peak_label, "rt"] = new_rt

            if plot and i < 100:  # Only plot first 100
                i += 1
                plt.subplot(n_rows, col_wrap, i)
                plt.plot(chrom_data.index, chrom_data.values)
                plt.axvline(new_rt, color="red", linestyle="--")
                plt.title(f"{peak_label}\nm/z={mz:.3f}\nRT={new_rt:.1f}")

        except Exception as e:
            logging.error(f"Error processing {peak_label}: {str(e)}")
            continue

    self.results = _targets.reset_index()

    if self.mint is not None:
        self.mint.targets = self.results

    if plot:
        plt.tight_layout()
        return self.mint, fig
    else:
        return self.mint

ms_mint.Chromatogram

Chromatogram

A class for handling chromatogram data extraction and processing.

This class provides functionality to extract, process, and analyze chromatogram data from mass spectrometry files, including peak detection and visualization capabilities.

Attributes:

Name Type Description
t ndarray

Array of scan times.

x ndarray

Array of intensity values.

noise_level Optional[float]

Estimated noise level of the chromatogram.

filters List[Filter]

List of filters to be applied to the chromatogram.

peaks Optional[DataFrame]

DataFrame containing detected peaks information.

selected_peak_ndxs Optional[List[int]]

Indices of selected peaks.

expected_rt Optional[float]

Expected retention time.

weights Optional[ndarray]

Weighting values for peak selection.

Source code in src/ms_mint/Chromatogram.py
class Chromatogram:
    """A class for handling chromatogram data extraction and processing.

    This class provides functionality to extract, process, and analyze chromatogram data
    from mass spectrometry files, including peak detection and visualization capabilities.

    Attributes:
        t: Array of scan times.
        x: Array of intensity values.
        noise_level: Estimated noise level of the chromatogram.
        filters: List of filters to be applied to the chromatogram.
        peaks: DataFrame containing detected peaks information.
        selected_peak_ndxs: Indices of selected peaks.
        expected_rt: Expected retention time.
        weights: Weighting values for peak selection.
    """

    def __init__(
        self,
        scan_times: Optional[Union[List[float], np.ndarray]] = None,
        intensities: Optional[Union[List[float], np.ndarray]] = None,
        filters: Optional[List[Filter]] = None,
        expected_rt: Optional[float] = None,
    ) -> None:
        """Initialize a Chromatogram object.

        Args:
            scan_times: Array-like object containing the scan times.
            intensities: Array-like object containing the intensities.
            filters: List of filters to be applied.
            expected_rt: Expected retention time in seconds.
        """
        # Initialize empty arrays for scan_times and intensities
        self.t: np.ndarray = (
            np.array([]) if scan_times is None or 0 in scan_times else np.array([0])
        )
        self.x: np.ndarray = (
            np.array([]) if intensities is None or 0 in scan_times else np.array([0])
        )

        # Update scan_times and intensities if provided
        if scan_times is not None:
            self.t = np.append(self.t, scan_times)
        if intensities is not None:
            self.x = np.append(self.x, intensities)

        # Initialize other attributes
        self.noise_level: Optional[float] = None
        self.filters: List[Filter] = filters or [Resampler(), GaussFilter(), Smoother()]
        self.peaks: Optional[pd.DataFrame] = None
        self.selected_peak_ndxs: Optional[List[int]] = None
        self.expected_rt: Optional[float] = expected_rt
        self.weights: Optional[np.ndarray] = None

    def from_file(
        self, fn: str, mz_mean: float, mz_width: float = 10, expected_rt: Optional[float] = None
    ) -> None:
        """Load chromatogram data from a mass spectrometry file.

        Args:
            fn: Filename of the mass spectrometry file.
            mz_mean: Mean m/z value to extract.
            mz_width: Width of the m/z window to extract.
            expected_rt: Expected retention time in seconds.
        """
        chrom = get_chromatogram_from_ms_file(fn, mz_mean=mz_mean, mz_width=mz_width)
        self.t = np.append(self.t, chrom.index)
        self.x = np.append(self.x, chrom.values)
        if expected_rt is not None:
            self.expected_rt = expected_rt

    def estimate_noise_level(self, window: int = 20) -> None:
        """Estimate the noise level of the chromatogram.

        Uses a rolling window standard deviation approach to estimate the baseline noise.

        Args:
            window: Size of the rolling window for noise estimation.
        """
        data = pd.Series(index=self.t, data=self.x)
        self.noise_level = data.rolling(window, center=True).std().median()

    def apply_filters(self) -> None:
        """Apply all filters in the filter list to the chromatogram data."""
        for filt in self.filters:
            self.t, self.x = filt.transform(self.t, self.x)

    def find_peaks(
        self, prominence: Optional[float] = None, rel_height: float = 0.9, **kwargs
    ) -> None:
        """Find peaks in the chromatogram.

        Args:
            prominence: Minimum prominence of peaks. If None, estimated from noise level.
            rel_height: Relative height for determining peak width.
            **kwargs: Additional keyword arguments to pass to the peak finding function.
        """
        self.estimate_noise_level()
        if prominence is None:
            prominence = self.noise_level * 5
        self.peaks = find_peaks_in_timeseries(
            self.data.intensity, prominence=prominence, rel_height=rel_height, **kwargs
        )

    def optimise_peak_times_with_diff(self, rolling_window: int = 20, plot: bool = False) -> None:
        """Optimize peak start and end times using the derivative.

        Uses the first derivative of the chromatogram to more accurately determine
        peak start and end times.

        Args:
            rolling_window: Window size for rolling mean calculation of the derivative.
            plot: Whether to plot the results of peak detection on the derivative.
        """
        peaks = self.peaks
        diff = (
            (self.data - self.data.shift(1)).rolling(rolling_window, center=True).mean().fillna(0)
        )
        prominence = 0

        peak_startings = find_peaks_in_timeseries(
            diff.fillna(0).intensity, prominence=prominence, plot=plot
        )
        if plot:
            plt.show()

        peak_endings = find_peaks_in_timeseries(
            -diff.fillna(0).intensity, prominence=prominence, plot=plot
        )
        if plot:
            plt.show()

        for ndx, row in peaks.iterrows():
            new_rt_min = row.rt_min
            new_rt_max = row.rt_max

            candidates_rt_min = peak_startings[peak_startings.rt <= new_rt_min]
            candidates_rt_max = peak_endings[peak_endings.rt >= new_rt_max]

            if len(candidates_rt_min) > 0:
                new_rt_min = candidates_rt_min.tail(1).rt.values[0]

            if len(candidates_rt_max) > 0:
                new_rt_max = candidates_rt_max.head(1).rt.values[0]

            peaks.loc[ndx, ["rt_min", "rt_max"]] = new_rt_min, new_rt_max

    def select_peak_by_rt(self, expected_rt: Optional[float] = None) -> pd.DataFrame:
        """Select the peak closest to the expected retention time.

        Args:
            expected_rt: Expected retention time in seconds. If None, uses the stored expected_rt.

        Returns:
            DataFrame containing the selected peak information.
        """
        peaks = self.peaks
        if expected_rt is None:
            expected_rt = self.expected_rt
        else:
            self.expected_rt = expected_rt
        selected_ndx = (peaks.rt - expected_rt).abs().sort_values().index[0]
        self.selected_peak_ndxs = [selected_ndx]
        return self.selected_peaks

    def select_peak_by_highest_intensity(self) -> pd.DataFrame:
        """Select the peak with the highest intensity.

        Returns:
            DataFrame containing the selected peak information.
        """
        peaks = self.peaks
        selected_ndx = peaks.sort_values("peak_height", ascending=False).index.values[0]
        self.selected_peak_ndxs = [selected_ndx]
        return self.selected_peaks

    def select_peak_with_gaussian_weight(
        self, expected_rt: Optional[float] = None, sigma: float = 50
    ) -> Optional[pd.DataFrame]:
        """Select peak using Gaussian weighting around expected retention time.

        This method applies a Gaussian weighting centered at the expected retention time
        to favor peaks close to the expected time while still considering peak height.

        Args:
            expected_rt: Expected retention time in seconds. If None, uses the stored expected_rt.
            sigma: Standard deviation of the Gaussian weight function in seconds.

        Returns:
            DataFrame containing the selected peak information, or None if no peaks available.
        """
        peaks = self.peaks
        if expected_rt is None:
            expected_rt = self.expected_rt
        else:
            self.expected_rt = expected_rt
        if peaks is None or len(peaks) == 0:
            logging.warning("No peaks available to select.")
            return None
        weights = gaussian(peaks.rt, expected_rt, sigma)
        weighted_peaks = weights * peaks.peak_height
        x = np.arange(int(self.t.min()), int(self.t.max()))
        self.weights = max(peaks.peak_height) * gaussian(x, expected_rt, sigma)
        selected_ndx = weighted_peaks.sort_values(ascending=False).index.values[0]
        self.selected_peak_ndxs = [selected_ndx]
        return self.selected_peaks

    @property
    def selected_peaks(self) -> pd.DataFrame:
        """Get DataFrame of the currently selected peaks.

        Returns:
            DataFrame containing information about the selected peaks.
        """
        return self.peaks.loc[self.selected_peak_ndxs]

    @property
    def data(self) -> pd.DataFrame:
        """Get chromatogram data as a DataFrame.

        Returns:
            DataFrame with scan times as index and intensity as a column.
        """
        df = pd.DataFrame(index=self.t, data={"intensity": self.x})
        df.index.name = "scan_time"
        return df

    def plot(self, label: Optional[str] = None, **kwargs) -> Figure:
        """Plot the chromatogram with detected peaks.

        Args:
            label: Label for the plot.
            **kwargs: Additional keyword arguments to pass to the plotting function.

        Returns:
            Matplotlib Figure object.
        """
        series = self.data
        peaks = self.peaks
        selected_peak_ndxs = self.selected_peak_ndxs
        weights = self.weights
        fig = plot_peaks(
            series,
            peaks,
            label=label,
            highlight=selected_peak_ndxs,
            expected_rt=self.expected_rt,
            weights=weights,
            **kwargs,
        )
        return fig

selected_peaks: pd.DataFrame property

Get DataFrame of the currently selected peaks.

Returns:

Type Description
DataFrame

DataFrame containing information about the selected peaks.

data: pd.DataFrame property

Get chromatogram data as a DataFrame.

Returns:

Type Description
DataFrame

DataFrame with scan times as index and intensity as a column.

__init__(scan_times=None, intensities=None, filters=None, expected_rt=None)

Initialize a Chromatogram object.

Parameters:

Name Type Description Default
scan_times Optional[Union[List[float], ndarray]]

Array-like object containing the scan times.

None
intensities Optional[Union[List[float], ndarray]]

Array-like object containing the intensities.

None
filters Optional[List[Filter]]

List of filters to be applied.

None
expected_rt Optional[float]

Expected retention time in seconds.

None
Source code in src/ms_mint/Chromatogram.py
def __init__(
    self,
    scan_times: Optional[Union[List[float], np.ndarray]] = None,
    intensities: Optional[Union[List[float], np.ndarray]] = None,
    filters: Optional[List[Filter]] = None,
    expected_rt: Optional[float] = None,
) -> None:
    """Initialize a Chromatogram object.

    Args:
        scan_times: Array-like object containing the scan times.
        intensities: Array-like object containing the intensities.
        filters: List of filters to be applied.
        expected_rt: Expected retention time in seconds.
    """
    # Initialize empty arrays for scan_times and intensities
    self.t: np.ndarray = (
        np.array([]) if scan_times is None or 0 in scan_times else np.array([0])
    )
    self.x: np.ndarray = (
        np.array([]) if intensities is None or 0 in scan_times else np.array([0])
    )

    # Update scan_times and intensities if provided
    if scan_times is not None:
        self.t = np.append(self.t, scan_times)
    if intensities is not None:
        self.x = np.append(self.x, intensities)

    # Initialize other attributes
    self.noise_level: Optional[float] = None
    self.filters: List[Filter] = filters or [Resampler(), GaussFilter(), Smoother()]
    self.peaks: Optional[pd.DataFrame] = None
    self.selected_peak_ndxs: Optional[List[int]] = None
    self.expected_rt: Optional[float] = expected_rt
    self.weights: Optional[np.ndarray] = None

from_file(fn, mz_mean, mz_width=10, expected_rt=None)

Load chromatogram data from a mass spectrometry file.

Parameters:

Name Type Description Default
fn str

Filename of the mass spectrometry file.

required
mz_mean float

Mean m/z value to extract.

required
mz_width float

Width of the m/z window to extract.

10
expected_rt Optional[float]

Expected retention time in seconds.

None
Source code in src/ms_mint/Chromatogram.py
def from_file(
    self, fn: str, mz_mean: float, mz_width: float = 10, expected_rt: Optional[float] = None
) -> None:
    """Load chromatogram data from a mass spectrometry file.

    Args:
        fn: Filename of the mass spectrometry file.
        mz_mean: Mean m/z value to extract.
        mz_width: Width of the m/z window to extract.
        expected_rt: Expected retention time in seconds.
    """
    chrom = get_chromatogram_from_ms_file(fn, mz_mean=mz_mean, mz_width=mz_width)
    self.t = np.append(self.t, chrom.index)
    self.x = np.append(self.x, chrom.values)
    if expected_rt is not None:
        self.expected_rt = expected_rt

estimate_noise_level(window=20)

Estimate the noise level of the chromatogram.

Uses a rolling window standard deviation approach to estimate the baseline noise.

Parameters:

Name Type Description Default
window int

Size of the rolling window for noise estimation.

20
Source code in src/ms_mint/Chromatogram.py
def estimate_noise_level(self, window: int = 20) -> None:
    """Estimate the noise level of the chromatogram.

    Uses a rolling window standard deviation approach to estimate the baseline noise.

    Args:
        window: Size of the rolling window for noise estimation.
    """
    data = pd.Series(index=self.t, data=self.x)
    self.noise_level = data.rolling(window, center=True).std().median()

apply_filters()

Apply all filters in the filter list to the chromatogram data.

Source code in src/ms_mint/Chromatogram.py
def apply_filters(self) -> None:
    """Apply all filters in the filter list to the chromatogram data."""
    for filt in self.filters:
        self.t, self.x = filt.transform(self.t, self.x)

find_peaks(prominence=None, rel_height=0.9, **kwargs)

Find peaks in the chromatogram.

Parameters:

Name Type Description Default
prominence Optional[float]

Minimum prominence of peaks. If None, estimated from noise level.

None
rel_height float

Relative height for determining peak width.

0.9
**kwargs

Additional keyword arguments to pass to the peak finding function.

{}
Source code in src/ms_mint/Chromatogram.py
def find_peaks(
    self, prominence: Optional[float] = None, rel_height: float = 0.9, **kwargs
) -> None:
    """Find peaks in the chromatogram.

    Args:
        prominence: Minimum prominence of peaks. If None, estimated from noise level.
        rel_height: Relative height for determining peak width.
        **kwargs: Additional keyword arguments to pass to the peak finding function.
    """
    self.estimate_noise_level()
    if prominence is None:
        prominence = self.noise_level * 5
    self.peaks = find_peaks_in_timeseries(
        self.data.intensity, prominence=prominence, rel_height=rel_height, **kwargs
    )

optimise_peak_times_with_diff(rolling_window=20, plot=False)

Optimize peak start and end times using the derivative.

Uses the first derivative of the chromatogram to more accurately determine peak start and end times.

Parameters:

Name Type Description Default
rolling_window int

Window size for rolling mean calculation of the derivative.

20
plot bool

Whether to plot the results of peak detection on the derivative.

False
Source code in src/ms_mint/Chromatogram.py
def optimise_peak_times_with_diff(self, rolling_window: int = 20, plot: bool = False) -> None:
    """Optimize peak start and end times using the derivative.

    Uses the first derivative of the chromatogram to more accurately determine
    peak start and end times.

    Args:
        rolling_window: Window size for rolling mean calculation of the derivative.
        plot: Whether to plot the results of peak detection on the derivative.
    """
    peaks = self.peaks
    diff = (
        (self.data - self.data.shift(1)).rolling(rolling_window, center=True).mean().fillna(0)
    )
    prominence = 0

    peak_startings = find_peaks_in_timeseries(
        diff.fillna(0).intensity, prominence=prominence, plot=plot
    )
    if plot:
        plt.show()

    peak_endings = find_peaks_in_timeseries(
        -diff.fillna(0).intensity, prominence=prominence, plot=plot
    )
    if plot:
        plt.show()

    for ndx, row in peaks.iterrows():
        new_rt_min = row.rt_min
        new_rt_max = row.rt_max

        candidates_rt_min = peak_startings[peak_startings.rt <= new_rt_min]
        candidates_rt_max = peak_endings[peak_endings.rt >= new_rt_max]

        if len(candidates_rt_min) > 0:
            new_rt_min = candidates_rt_min.tail(1).rt.values[0]

        if len(candidates_rt_max) > 0:
            new_rt_max = candidates_rt_max.head(1).rt.values[0]

        peaks.loc[ndx, ["rt_min", "rt_max"]] = new_rt_min, new_rt_max

select_peak_by_rt(expected_rt=None)

Select the peak closest to the expected retention time.

Parameters:

Name Type Description Default
expected_rt Optional[float]

Expected retention time in seconds. If None, uses the stored expected_rt.

None

Returns:

Type Description
DataFrame

DataFrame containing the selected peak information.

Source code in src/ms_mint/Chromatogram.py
def select_peak_by_rt(self, expected_rt: Optional[float] = None) -> pd.DataFrame:
    """Select the peak closest to the expected retention time.

    Args:
        expected_rt: Expected retention time in seconds. If None, uses the stored expected_rt.

    Returns:
        DataFrame containing the selected peak information.
    """
    peaks = self.peaks
    if expected_rt is None:
        expected_rt = self.expected_rt
    else:
        self.expected_rt = expected_rt
    selected_ndx = (peaks.rt - expected_rt).abs().sort_values().index[0]
    self.selected_peak_ndxs = [selected_ndx]
    return self.selected_peaks

select_peak_by_highest_intensity()

Select the peak with the highest intensity.

Returns:

Type Description
DataFrame

DataFrame containing the selected peak information.

Source code in src/ms_mint/Chromatogram.py
def select_peak_by_highest_intensity(self) -> pd.DataFrame:
    """Select the peak with the highest intensity.

    Returns:
        DataFrame containing the selected peak information.
    """
    peaks = self.peaks
    selected_ndx = peaks.sort_values("peak_height", ascending=False).index.values[0]
    self.selected_peak_ndxs = [selected_ndx]
    return self.selected_peaks

select_peak_with_gaussian_weight(expected_rt=None, sigma=50)

Select peak using Gaussian weighting around expected retention time.

This method applies a Gaussian weighting centered at the expected retention time to favor peaks close to the expected time while still considering peak height.

Parameters:

Name Type Description Default
expected_rt Optional[float]

Expected retention time in seconds. If None, uses the stored expected_rt.

None
sigma float

Standard deviation of the Gaussian weight function in seconds.

50

Returns:

Type Description
Optional[DataFrame]

DataFrame containing the selected peak information, or None if no peaks available.

Source code in src/ms_mint/Chromatogram.py
def select_peak_with_gaussian_weight(
    self, expected_rt: Optional[float] = None, sigma: float = 50
) -> Optional[pd.DataFrame]:
    """Select peak using Gaussian weighting around expected retention time.

    This method applies a Gaussian weighting centered at the expected retention time
    to favor peaks close to the expected time while still considering peak height.

    Args:
        expected_rt: Expected retention time in seconds. If None, uses the stored expected_rt.
        sigma: Standard deviation of the Gaussian weight function in seconds.

    Returns:
        DataFrame containing the selected peak information, or None if no peaks available.
    """
    peaks = self.peaks
    if expected_rt is None:
        expected_rt = self.expected_rt
    else:
        self.expected_rt = expected_rt
    if peaks is None or len(peaks) == 0:
        logging.warning("No peaks available to select.")
        return None
    weights = gaussian(peaks.rt, expected_rt, sigma)
    weighted_peaks = weights * peaks.peak_height
    x = np.arange(int(self.t.min()), int(self.t.max()))
    self.weights = max(peaks.peak_height) * gaussian(x, expected_rt, sigma)
    selected_ndx = weighted_peaks.sort_values(ascending=False).index.values[0]
    self.selected_peak_ndxs = [selected_ndx]
    return self.selected_peaks

plot(label=None, **kwargs)

Plot the chromatogram with detected peaks.

Parameters:

Name Type Description Default
label Optional[str]

Label for the plot.

None
**kwargs

Additional keyword arguments to pass to the plotting function.

{}

Returns:

Type Description
Figure

Matplotlib Figure object.

Source code in src/ms_mint/Chromatogram.py
def plot(self, label: Optional[str] = None, **kwargs) -> Figure:
    """Plot the chromatogram with detected peaks.

    Args:
        label: Label for the plot.
        **kwargs: Additional keyword arguments to pass to the plotting function.

    Returns:
        Matplotlib Figure object.
    """
    series = self.data
    peaks = self.peaks
    selected_peak_ndxs = self.selected_peak_ndxs
    weights = self.weights
    fig = plot_peaks(
        series,
        peaks,
        label=label,
        highlight=selected_peak_ndxs,
        expected_rt=self.expected_rt,
        weights=weights,
        **kwargs,
    )
    return fig