Skip to content

Commit

Permalink
v0.2
Browse files Browse the repository at this point in the history
  • Loading branch information
lazear committed Sep 27, 2024
1 parent eb0a4ba commit f49f68f
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 32 deletions.
5 changes: 1 addition & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@ The Apache Parquet file format has emerged as the default file format for data s

However, the most critical part is that Parquet is widely supported in the data science ecosystem - supported out of the box by pandas, polars, and provided bindings for most programming languages. Parquet is also supported by a variety of databases and query engines (Athena, datafusion, Spark, BigQuery, etc), enabling direct SQL queries over petabyte-scale mass spectrometry data.

The proposed file format stores the most important data for analyzing MS data in individual columns (scan identifiers, ms_level, precursor list, scan_start_time, m/z, intensity, ion mobility, etc), and has a final column for storing lists of key-value pairs (cvParams in the mzML spec).


Prior work: https://github.com/compomics/ThermoRawFileParser - this would probably be the best way to integrate mzparquet! There is already an existing prototype parquet implementation, but it is different from the one proposed here.
The format specified in this repository is a minimal set of columns needed to successfully process data - peak lists, isolation windows, and RT/ion mobility values. Each individual ion in an acquisition has its own row in the mzparquet file (long format).

## Example of querying mzparquet files

Expand Down
128 changes: 100 additions & 28 deletions src/write_long.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use crate::mzml::RawSpectrum;
use parquet::{
basic::ZstdLevel,
column::reader::get_typed_column_reader,
data_type::{FloatType, Int32Type},
file::{
metadata::KeyValue,
properties::WriterProperties,
reader::FileReader,
writer::{
SerializedFileWriter, SerializedPageWriter, SerializedRowGroupWriter, TrackedWrite,
},
Expand All @@ -15,24 +14,86 @@ use parquet::{
use std::{collections::HashMap, io::Write, sync::Arc};

/// Builds the Parquet schema for long-format mzparquet output: one row per
/// ion, with required scan/level/rt/mz/intensity columns and optional
/// ion-mobility, isolation-window, and precursor columns.
///
/// # Errors
/// Returns a `parquet::errors::ParquetError` if any column or the group
/// type fails to build.
pub fn build_schema() -> parquet::errors::Result<Type> {
    use parquet::basic::{LogicalType, Repetition, Type as PhysicalType};
    use parquet::schema::types::Type;

    // All integer columns share a physical INT32 with an unsigned 32-bit
    // logical annotation (scan numbers, MS levels, intensities, and charges
    // are written as non-negative values elsewhere in this file).
    let uint32 = |name: &str, repetition: Repetition| {
        Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_repetition(repetition)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 32,
                is_signed: false,
            }))
            .build()
    };

    // Plain 32-bit float columns carry no logical-type annotation.
    let float = |name: &str, repetition: Repetition| {
        Type::primitive_type_builder(name, PhysicalType::FLOAT)
            .with_repetition(repetition)
            .build()
    };

    // NOTE: field order must stay in sync with the positional
    // `descr.column(i)` indices used when constructing the column writers.
    Type::group_type_builder("schema")
        .with_fields(vec![
            Arc::new(uint32("scan", Repetition::REQUIRED)?),
            Arc::new(uint32("level", Repetition::REQUIRED)?),
            Arc::new(float("rt", Repetition::REQUIRED)?),
            // BUG FIX: this column was previously built with the name "rt",
            // which yielded two "rt" columns and no "mz" column in the file.
            Arc::new(float("mz", Repetition::REQUIRED)?),
            Arc::new(uint32("intensity", Repetition::REQUIRED)?),
            Arc::new(float("ion_mobility", Repetition::OPTIONAL)?),
            Arc::new(float("isolation_lower", Repetition::OPTIONAL)?),
            Arc::new(float("isolation_upper", Repetition::OPTIONAL)?),
            Arc::new(uint32("precursor_scan", Repetition::OPTIONAL)?),
            Arc::new(float("precursor_mz", Repetition::OPTIONAL)?),
            Arc::new(uint32("precursor_charge", Repetition::OPTIONAL)?),
        ])
        .build()
}

pub struct ColumnWriter<T: parquet::data_type::DataType, const NULLABLE: bool = false> {
Expand Down Expand Up @@ -117,8 +178,8 @@ where
level: ColumnWriter<Int32Type>,
rt: ColumnWriter<FloatType>,
mz: ColumnWriter<FloatType>,
int: ColumnWriter<FloatType>,
ionmobility: ColumnWriter<FloatType, true>,
int: ColumnWriter<Int32Type>,
ion_mobility: ColumnWriter<FloatType, true>,
lo: ColumnWriter<FloatType, true>,
hi: ColumnWriter<FloatType, true>,
pscan: ColumnWriter<Int32Type, true>,
Expand Down Expand Up @@ -147,7 +208,7 @@ where
rt: ColumnWriter::new(descr.column(2), options.clone()),
mz: ColumnWriter::new(descr.column(3), options.clone()),
int: ColumnWriter::new(descr.column(4), options.clone()),
ionmobility: ColumnWriter::new(descr.column(5), options.clone()),
ion_mobility: ColumnWriter::new(descr.column(5), options.clone()),
lo: ColumnWriter::new(descr.column(6), options.clone()),
hi: ColumnWriter::new(descr.column(7), options.clone()),
pscan: ColumnWriter::new(descr.column(8), options.clone()),
Expand All @@ -164,14 +225,15 @@ where
.insert(spectrum.id.clone(), self.scans_written as u32);

self.scan
.extend(std::iter::repeat(self.scans_written as i32).take(n));
.extend(std::iter::repeat(self.scans_written as u32 as i32).take(n));
self.level
.extend(std::iter::repeat(spectrum.ms_level as i32).take(n));
.extend(std::iter::repeat(spectrum.ms_level as u32 as i32).take(n));
self.rt
.extend(std::iter::repeat(spectrum.scan_start_time).take(n));
self.mz.extend(spectrum.mz.iter().copied());
self.int.extend(spectrum.intensity.iter().copied());
self.ionmobility
self.int
.extend(spectrum.intensity.iter().map(|n| *n as u32 as i32));
self.ion_mobility
.extend(std::iter::repeat(spectrum.inverse_ion_mobility).take(n));

if let Some(precursor) = spectrum.precursors.get(0) {
Expand Down Expand Up @@ -229,7 +291,7 @@ where
self.rt.write_and_flush(&mut rg)?;
self.mz.write_and_flush(&mut rg)?;
self.int.write_and_flush(&mut rg)?;
self.ionmobility.write_and_flush(&mut rg)?;
self.ion_mobility.write_and_flush(&mut rg)?;
self.lo.write_and_flush(&mut rg)?;
self.hi.write_and_flush(&mut rg)?;
self.pscan.write_and_flush(&mut rg)?;
Expand All @@ -253,6 +315,16 @@ pub fn serialize_to_parquet<W: Write + Send>(w: W, spectra: &[RawSpectrum]) -> a
WriterProperties::builder()
.set_compression(parquet::basic::Compression::ZSTD(ZstdLevel::try_new(3)?))
.set_dictionary_enabled(false)
.set_key_value_metadata(Some(vec![
KeyValue {
key: "version".into(),
value: Some("0.2".into()),
},
KeyValue {
key: "writer".into(),
value: Some("github.com/lazear/mz_parquet".into()),
},
]))
.build(),
);

Expand Down

0 comments on commit f49f68f

Please sign in to comment.