Add Parquet reading and writing for efficient storage of PSM lists by RalfG · Pull Request #81 · CompOmics/psm_utils · GitHub

Add Parquet reading and writing for efficient storage of PSM lists #81


Merged · 6 commits · May 1, 2024
13 changes: 13 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,19 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.9.0] - 2024-05-01

### Added

- `io`: Read and write support for Apache Parquet for efficient storage of PSM lists.
- `io.sage`: Support for Sage results in Parquet format (new `SageParquetReader`, renamed `SageReader` to `SageTSVReader`).

### Changed

- Upgrade the Pydantic dependency to v2. The PSM `spectrum_id` field is now always coerced to a string (see the sketch after this diff).
- `io.proteoscape`: Use pyarrow to iteratively read from Parquet instead of first reading an entire dataframe with Pandas.
- `io.sage`: Update compatibility to Sage v0.14.

## [0.8.3] - 2024-04-16

### Added
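As context for the Pydantic v2 note above, a minimal sketch of the documented `spectrum_id` coercion; the example peptidoform and identifier are illustrative:

```python
# Per the changelog entry above, an integer spectrum identifier is coerced
# to a string on the PSM model after the Pydantic v2 upgrade.
from psm_utils import PSM

psm = PSM(peptidoform="ACDE", spectrum_id=0)
print(type(psm.spectrum_id).__name__, psm.spectrum_id)  # str 0
```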
4 changes: 3 additions & 1 deletion README.rst
@@ -94,11 +94,13 @@ Supported file formats
`MaxQuant msms.txt <https://www.maxquant.org/>`_ ``msms`` ✅ ❌
`MS Amanda CSV <https://ms.imp.ac.at/?goto=msamanda>`_ ``msamanda`` ✅ ❌
`mzIdentML <https://psidev.info/mzidentml>`_ ``mzid`` ✅ ✅
`Parquet <https://psm-utils.readthedocs.io/en/stable/api/psm_utils.io#module-psm_utils.io.parquet>`_ ``parquet`` ✅ ✅
`Peptide Record <https://psm-utils.readthedocs.io/en/stable/api/psm_utils.io/#module-psm_utils.io.peptide_record>`_ ``peprec`` ✅ ✅
`pepXML <http://tools.proteomecenter.org/wiki/index.php?title=Formats:pepXML>`_ ``pepxml`` ✅ ❌
`Percolator tab <https://github.com/percolator/percolator/wiki/Interface>`_ ``percolator`` ✅ ✅
Proteome Discoverer MSF ``proteome_discoverer`` ✅ ❌
`Sage <https://github.com/lazear/sage/blob/v0.12.0/DOCS.md#interpreting-sage-output>`_ ``sage`` ✅ ❌
`Sage Parquet <https://github.com/lazear/sage/blob/v0.14.7/DOCS.md#interpreting-sage-output>`_ ``sage_parquet`` ✅ ❌
`Sage TSV <https://github.com/lazear/sage/blob/v0.14.7/DOCS.md#interpreting-sage-output>`_ ``sage_tsv`` ✅ ❌
ProteoScape Parquet ``proteoscape`` ✅ ❌
`TSV <https://psm-utils.readthedocs.io/en/stable/api/psm_utils.io/#module-psm_utils.io.tsv>`_ ``tsv`` ✅ ✅
`X!Tandem XML <https://www.thegpm.org/tandem/>`_ ``xtandem`` ✅ ❌
2 changes: 1 addition & 1 deletion psm_utils/__init__.py
@@ -1,6 +1,6 @@
"""Common utilities for parsing and handling PSMs, and search engine results."""

__version__ = "0.8.3"
__version__ = "0.9.0"
__all__ = ["Peptidoform", "PSM", "PSMList"]

from functools import lru_cache
36 changes: 26 additions & 10 deletions psm_utils/io/__init__.py
@@ -13,6 +13,7 @@
import psm_utils.io.maxquant as maxquant
import psm_utils.io.msamanda as msamanda
import psm_utils.io.mzid as mzid
import psm_utils.io.parquet as parquet
import psm_utils.io.peptide_record as peptide_record
import psm_utils.io.pepxml as pepxml
import psm_utils.io.percolator as percolator
@@ -75,12 +76,6 @@
"extension": ".parquet",
"filename_pattern": r"^.*\.candidates\.parquet$",
},
"tsv": {
"reader": tsv.TSVReader,
"writer": tsv.TSVWriter,
"extension": ".tsv",
"filename_pattern": r"^.*\.tsv$",
},
"xtandem": {
"reader": xtandem.XTandemReader,
"writer": None,
@@ -93,19 +88,40 @@
"extension": ".csv",
"filename_pattern": r"^.*(?:_|\.)msamanda.csv$",
},
"sage": {
"reader": sage.SageReader,
"sage_tsv": {
"reader": sage.SageTSVReader,
"writer": None,
"extension": ".tsv",
"filename_pattern": r"^.*(?:_|\.).sage.tsv$",
},
"sage_parquet": {
"reader": sage.SageParquetReader,
"writer": None,
"extension": ".parquet",
"filename_pattern": r"^.*(?:_|\.).sage.parquet$",
},
"ionbot": {
"reader": ionbot.IonbotReader,
"writer": None,
"extension": "ionbot.first.csv",
"filename_pattern": r"^ionbot.first.csv$",
},
"parquet": { # List after proteoscape and sage to avoid extension matching conflicts
"reader": parquet.ParquetReader,
"writer": parquet.ParquetWriter,
"extension": ".parquet",
"filename_pattern": r"^.*\.parquet$",
},
"tsv": { # List after sage to avoid extension matching conflicts
"reader": tsv.TSVReader,
"writer": tsv.TSVWriter,
"extension": ".tsv",
"filename_pattern": r"^.*\.tsv$",
},
}

FILETYPES["sage"] = FILETYPES["sage_tsv"] # Alias for backwards compatibility

READERS = {k: v["reader"] for k, v in FILETYPES.items() if v["reader"]}
WRITERS = {k: v["writer"] for k, v in FILETYPES.items() if v["writer"]}

@@ -124,10 +140,10 @@
with NamedTemporaryFile(delete=False) as temp_file:
temp_file.close()
Path(temp_file.name).unlink()
example_psm = PSM(peptidoform="ACDE", spectrum_id=0)
example_psm = PSM(peptidoform="ACDE", spectrum_id="0")

try:
with writer(temp_file.name, example_psm=example_psm) as writer_instance:
writer_instance.write_psm(None)
writer_instance.write_psm(example_psm)

except NotImplementedError:
supports_write_psm = False
except AttributeError: # `None` is not valid PSM
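To illustrate why the registration order in `FILETYPES` matters, a hedged sketch of filetype inference; it assumes `read_file`'s default `filetype="infer"` tries the `filename_pattern` entries in insertion order, as the comments in the diff above suggest, and the file names are illustrative:

```python
from psm_utils.io import read_file

# "results.sage.tsv" matches the specific "sage_tsv" pattern before the
# catch-all "tsv" pattern, which is deliberately registered last.
psm_list = read_file("results.sage.tsv")

# A generic Parquet file falls through to the new "parquet" entry.
psm_list = read_file("psms.parquet")

# Passing filetype explicitly bypasses pattern matching altogether.
psm_list = read_file("psms.parquet", filetype="parquet")
```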
1 change: 0 additions & 1 deletion psm_utils/io/_utils.py
@@ -10,7 +10,6 @@ def set_csv_field_size_limit():
This function should be called before reading any CSV files to ensure that the field size
limit is properly set.


"""
max_int = sys.maxsize

130 changes: 130 additions & 0 deletions psm_utils/io/parquet.py
@@ -0,0 +1,130 @@
"""
Reader and writer for a simple, lossless psm_utils Parquet format.

Similar to the :py:mod:`psm_utils.io.tsv` module, this module provides a reader and writer
for :py:class:`~psm_utils.psm_list.PSMList` objects in a lossless manner. However, Parquet provides
better performance and storage efficiency compared to TSV, and is recommended for large datasets.

"""

from __future__ import annotations

from pathlib import Path
from typing import Union

import pyarrow as pa
import pyarrow.parquet as pq
from pydantic import ValidationError

from psm_utils.io._base_classes import ReaderBase, WriterBase
from psm_utils.io.exceptions import PSMUtilsIOException
from psm_utils.psm import PSM
from psm_utils.psm_list import PSMList


class ParquetReader(ReaderBase):
def __init__(self, path: Union[str, Path], *args, **kwargs):
"""
Reader for Parquet files.

Parameters
----------
path : Union[str, Path]
Path to the Parquet file.

"""
self.path = path

def __iter__(self):
with pq.ParquetFile(self.path) as reader:
for batch in reader.iter_batches():
for row in batch.to_pylist():
# Convert map columns (rendered as lists of tuples) to dictionaries
row["metadata"] = dict(row["metadata"] or {})
row["provenance_data"] = dict(row["provenance_data"] or {})
row["rescoring_features"] = dict(row["rescoring_features"] or {})

# Convert to PSM object and yield
try:
yield PSM(**row)
except ValidationError as e:
raise PSMUtilsIOException(f"Error while parsing row {row}:\n{e}")



class ParquetWriter(WriterBase):
def __init__(self, path: Union[str, Path], chunk_size: int = int(1e6), *args, **kwargs):
"""
Writer for Parquet files.

Parameters
----------
path : Union[str, Path]
Path to the Parquet file.
chunk_size : int
Number of PSMs to write in a single batch. Default is 1e6.

"""
self.path = path
self.chunk_size = chunk_size

self._writer = None
self._psm_cache = []

def __enter__(self):
self._writer = pq.ParquetWriter(self.path, schema=SCHEMA)
return self

def __exit__(self, *args, **kwargs):
self._flush()
self._writer.close()

def write_psm(self, psm: PSM):
"""Write a single PSM to the Parquet file."""
self._psm_cache.append(self._psm_to_entry(psm))
if len(self._psm_cache) > self.chunk_size:
self._flush()


def write_file(self, psm_list: PSMList):
"""Write a list of PSMs to the Parquet file."""
with self:
for psm in psm_list:
self.write_psm(psm)

@staticmethod
def _psm_to_entry(psm: PSM) -> dict:
"""Convert a PSM object to a dictionary suitable for writing to Parquet."""
psm_dict = dict(psm)
psm_dict["peptidoform"] = str(psm.peptidoform)
return psm_dict

def _flush(self):
"""Write the cached PSMs to the Parquet file."""
if not self._psm_cache:
return
table = pa.Table.from_pylist(self._psm_cache, schema=SCHEMA)
self._writer.write_table(table)
self._psm_cache = []


SCHEMA = pa.schema(
[
("peptidoform", pa.string()),
("spectrum_id", pa.string()),
("run", pa.string()),
("collection", pa.string()),
("spectrum", pa.string()),
("is_decoy", pa.bool_()),
("score", pa.float32()),
("qvalue", pa.float32()),
("pep", pa.float32()),
("precursor_mz", pa.float32()),
("retention_time", pa.float32()),
("ion_mobility", pa.float32()),
("protein_list", pa.list_(pa.string())),
("rank", pa.int32()),
("source", pa.string()),
("provenance_data", pa.map_(pa.string(), pa.string())),
("metadata", pa.map_(pa.string(), pa.string())),
("rescoring_features", pa.map_(pa.string(), pa.float32())),
]
)
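To make the new module concrete, a minimal usage sketch of the reader and writer defined above; the file name and PSM values are illustrative:

```python
from psm_utils import PSM, PSMList
from psm_utils.io.parquet import ParquetReader, ParquetWriter

psm_list = PSMList(
    psm_list=[
        PSM(peptidoform="ACDE/2", spectrum_id="1", score=140.2, is_decoy=False),
        PSM(peptidoform="ACDEFGR/3", spectrum_id="2", score=98.7, is_decoy=True),
    ]
)

# write_file opens the writer as a context manager and flushes cached rows
# to the Parquet file in batches of `chunk_size` PSMs.
ParquetWriter("psms.parquet").write_file(psm_list)

# The reader streams record batches and yields PSM objects one at a time,
# so the whole file never has to be held in memory.
for psm in ParquetReader("psms.parquet"):
    print(psm.spectrum_id, psm.score)
```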
93 changes: 49 additions & 44 deletions psm_utils/io/proteoscape.py
@@ -4,13 +4,15 @@
import re
from pathlib import Path
from typing import Union
from collections import namedtuple

import numpy as np
import pandas as pd
import pyarrow.parquet as pq

from psm_utils import PSM
from psm_utils.psm import PSM
from psm_utils.psm_list import PSMList
from psm_utils.io._base_classes import ReaderBase
from psm_utils.io.exceptions import PSMUtilsIOException
from psm_utils.peptidoform import format_number_as_string

logger = logging.getLogger(__name__)
@@ -36,31 +38,31 @@
Path to MSF file.

"""
if isinstance(filename, pd.DataFrame):
self.data = filename
else:
super().__init__(filename, *args, **kwargs)
self.data = pd.read_parquet(self.filename)

self._Row = namedtuple("Row", self.data.columns)
self.filename = filename


def __len__(self):
"""Return number of PSMs in file."""
return len(self.data)
return pq.read_metadata(self.filename).num_rows


def __iter__(self):
"""Iterate over file and return PSMs one-by-one."""
for entry in self.data.itertuples():
yield _parse_entry(entry)

def __getitem__(self, index):
"""Return PSM at index."""
return _parse_entry(self._Row(*self.data.iloc[index]))
with pq.ParquetFile(self.filename) as reader:
for batch in reader.iter_batches():
for row in batch.to_pylist():
try:
yield _parse_entry(row)
except Exception as e:
raise PSMUtilsIOException(f"Error while parsing row {row}:\n{e}") from e


@classmethod
def from_dataframe(cls, dataframe: pd.DataFrame, *args, **kwargs):
"""Create a ProteoScapeReader from a DataFrame."""
return cls(dataframe, *args, **kwargs)
def from_dataframe(cls, dataframe: pd.DataFrame) -> PSMList:
"""Create a PSMList from a ProteoScape Pandas DataFrame."""
return PSMList(

psm_list=[
_parse_entry(entry)
for entry in dataframe.to_dict(orient="records")
]
)


def _parse_peptidoform(
@@ -81,40 +83,43 @@
return f"{n_term}{''.join(peptidoform)}{c_term}/{precursor_charge}"


def _parse_entry(entry) -> PSM:
def _parse_entry(entry: dict) -> PSM:
"""Parse a single entry from ProteoScape Parquet file to PSM object."""
return PSM(
peptidoform=_parse_peptidoform(
entry.stripped_peptide, entry.ptms, entry.ptm_locations, entry.precursor_charge
entry["stripped_peptide"],
entry["ptms"],
entry["ptm_locations"],
entry["precursor_charge"],
),
spectrum_id=entry.ms2_id,
run=getattr(entry, "run", None),
is_decoy=all(DECOY_PATTERN.match(p) for p in entry.locus_name),
score=entry.x_corr_score,
precursor_mz=entry.precursor_mz,
retention_time=entry.rt,
ion_mobility=entry.ook0,
protein_list=list(entry.locus_name),
rank=entry.rank,
spectrum_id=entry["ms2_id"],
run=entry.get("run", None),
is_decoy=all(DECOY_PATTERN.match(p) for p in entry["locus_name"]),
score=entry["x_corr_score"],
precursor_mz=entry["precursor_mz"],
retention_time=entry["rt"],
ion_mobility=entry["ook0"],
protein_list=list(entry["locus_name"]),
rank=entry["rank"],
source="ProteoScape",
provenance_data={
"candidate_id": str(entry.candidate_id),
"ms2_id": str(entry.ms2_id),
"parent_id": str(entry.parent_id),
"candidate_id": str(entry["candidate_id"]),
"ms2_id": str(entry["ms2_id"]),
"parent_id": str(entry["parent_id"]),
},
metadata={
"leading_aa": str(entry.leading_aa),
"trailing_aa": str(entry.trailing_aa),
"corrected_ook0": str(entry.corrected_ook0),
"leading_aa": str(entry["leading_aa"]),
"trailing_aa": str(entry["trailing_aa"]),
"corrected_ook0": str(entry["corrected_ook0"]),
},
rescoring_features={
"tims_score": float(entry.tims_score),
"x_corr_score": float(entry.x_corr_score),
"delta_cn_score": float(entry.delta_cn_score),
"ppm_error": float(entry.ppm_error),
"number_matched_ions": float(entry.number_matched_ions),
"number_expected_ions": float(entry.number_expected_ions),
"ion_proportion": float(entry.ion_proportion),
"spectrum_total_ion_intensity": float(entry.spectrum_total_ion_intensity),
"tims_score": float(entry["tims_score"]),
"x_corr_score": float(entry["x_corr_score"]),
"delta_cn_score": float(entry["delta_cn_score"]),
"ppm_error": float(entry["ppm_error"]),
"number_matched_ions": float(entry["number_matched_ions"]),
"number_expected_ions": float(entry["number_expected_ions"]),
"ion_proportion": float(entry["ion_proportion"]),
"spectrum_total_ion_intensity": float(entry["spectrum_total_ion_intensity"]),
},
)
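The rewritten reader above follows the changelog note about iterating with pyarrow instead of first loading a full DataFrame. A hedged sketch of the general pattern, with an illustrative file name, an arbitrary batch size, and a hypothetical `process` handler standing in for `_parse_entry`:

```python
import pyarrow.parquet as pq

def process(row: dict) -> None:
    """Hypothetical per-row handler; _parse_entry plays this role above."""
    print(row.get("ms2_id"))

# Stream record batches so memory use stays bounded by the batch size,
# rather than reading the entire Parquet file into a Pandas DataFrame.
with pq.ParquetFile("results.candidates.parquet") as reader:
    for batch in reader.iter_batches(batch_size=65_536):
        for row in batch.to_pylist():  # each row is a plain dict
            process(row)
```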