Skip to content

Commit

Permalink
Make file reading thread-safe, by having SubFile use read_at/seek_rea…
Browse files Browse the repository at this point in the history
…d rather than seek and read.
  • Loading branch information
Tim Evans committed Jun 19, 2024
1 parent 72c0190 commit f67715e
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 100 deletions.
9 changes: 3 additions & 6 deletions src/data/iterators.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{fmt::Display, fs::File};
use std::fmt::Display;

use chrono::{DateTime, NaiveDate, Utc};

Expand Down Expand Up @@ -241,14 +241,11 @@ impl Iterator for Vectors {
#[derive(Debug)]
pub struct GenericBoundaries<T: NumberType> {
value: BoundaryValues<T>,
inclusive: SimpleIter<bool, SubFile<File>>,
inclusive: SimpleIter<bool, SubFile>,
}

impl<T: NumberType> GenericBoundaries<T> {
pub fn new(
value: SimpleIter<T, SubFile<File>>,
inclusive: SimpleIter<bool, SubFile<File>>,
) -> Self {
pub fn new(value: SimpleIter<T, SubFile>, inclusive: SimpleIter<bool, SubFile>) -> Self {
Self {
value: BoundaryValues::new(value),
inclusive,
Expand Down
50 changes: 25 additions & 25 deletions src/data/read_checks.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashSet, fs::File};
use std::collections::HashSet;

use crate::{
array::Constraint,
Expand All @@ -18,12 +18,12 @@ use super::{
/// When used for tensor block model sizes this checks that all sizes are greater than zero.
#[derive(Debug)]
pub struct GenericScalars<T: FloatType> {
inner: SimpleIter<T, SubFile<File>>,
inner: SimpleIter<T, SubFile>,
is_size: bool,
}

impl<T: FloatType> GenericScalars<T> {
pub(crate) fn new(inner: SimpleIter<T, SubFile<File>>, constraint: &Constraint) -> Self {
pub(crate) fn new(inner: SimpleIter<T, SubFile>, constraint: &Constraint) -> Self {
Self {
inner,
is_size: matches!(constraint, Constraint::Size),
Expand Down Expand Up @@ -55,12 +55,12 @@ impl<T: FloatType> Iterator for GenericScalars<T> {
/// Checks that all indices are within range.
#[derive(Debug)]
pub struct Indices {
inner: NullableIter<u32, SubFile<File>>,
inner: NullableIter<u32, SubFile>,
category_count: u64,
}

impl Indices {
pub(crate) fn new(inner: NullableIter<u32, SubFile<File>>, constraint: &Constraint) -> Self {
pub(crate) fn new(inner: NullableIter<u32, SubFile>, constraint: &Constraint) -> Self {
let &Constraint::Index(category_count) = constraint else {
panic!("invalid constraint");
};
Expand Down Expand Up @@ -95,12 +95,12 @@ impl Iterator for Indices {
/// Checks that all vertex indices are within range.
#[derive(Debug)]
pub struct GenericPrimitives<const N: usize> {
inner: MultiIter<u32, SubFile<File>, N>,
inner: MultiIter<u32, SubFile, N>,
vertex_count: u64,
}

impl<const N: usize> GenericPrimitives<N> {
pub(crate) fn new(inner: MultiIter<u32, SubFile<File>, N>, constraint: &Constraint) -> Self {
pub(crate) fn new(inner: MultiIter<u32, SubFile, N>, constraint: &Constraint) -> Self {
let vertex_count = match constraint {
Constraint::Segment(n) | Constraint::Triangle(n) => *n,
_ => panic!("invalid constraint"),
Expand Down Expand Up @@ -138,12 +138,12 @@ impl<const N: usize> Iterator for GenericPrimitives<N> {
/// Checks that the values are increasing.
#[derive(Debug)]
pub(super) struct BoundaryValues<T: NumberType> {
inner: SimpleIter<T, SubFile<File>>,
inner: SimpleIter<T, SubFile>,
previous: Option<T>,
}

impl<T: NumberType> BoundaryValues<T> {
pub(crate) fn new(inner: SimpleIter<T, SubFile<File>>) -> Self {
pub(crate) fn new(inner: SimpleIter<T, SubFile>) -> Self {
Self {
inner,
previous: None,
Expand All @@ -170,15 +170,15 @@ impl<T: NumberType> Iterator for BoundaryValues<T> {
/// Checks that the parent index and corners are all valid.
#[derive(Debug)]
pub struct GenericFreeformSubblocks<T: FloatType> {
parents: MultiIter<u32, SubFile<File>, 3>,
corners: MultiIter<T, SubFile<File>, 6>,
parents: MultiIter<u32, SubFile, 3>,
corners: MultiIter<T, SubFile, 6>,
block_count: [u32; 3],
}

impl<T: FloatType> GenericFreeformSubblocks<T> {
pub(crate) fn new(
parents: MultiIter<u32, SubFile<File>, 3>,
corners: MultiIter<T, SubFile<File>, 6>,
parents: MultiIter<u32, SubFile, 3>,
corners: MultiIter<T, SubFile, 6>,
constraint: &Constraint,
) -> Self {
let block_count = match constraint {
Expand Down Expand Up @@ -219,17 +219,17 @@ impl<T: FloatType> Iterator for GenericFreeformSubblocks<T> {
/// Checks that the parent index and corners are all valid.
#[derive(Debug)]
pub struct RegularSubblocks {
parents: MultiIter<u32, SubFile<File>, 3>,
corners: MultiIter<u32, SubFile<File>, 6>,
parents: MultiIter<u32, SubFile, 3>,
corners: MultiIter<u32, SubFile, 6>,
block_count: [u32; 3],
subblock_count: [u32; 3],
mode: Option<(SubblockMode, HashSet<[u32; 3]>)>,
}

impl RegularSubblocks {
pub(crate) fn new(
parents: MultiIter<u32, SubFile<File>, 3>,
corners: MultiIter<u32, SubFile<File>, 6>,
parents: MultiIter<u32, SubFile, 3>,
corners: MultiIter<u32, SubFile, 6>,
constraint: &Constraint,
) -> Self {
let &Constraint::RegularSubblock {
Expand Down Expand Up @@ -350,7 +350,7 @@ fn check_freeform_corners<T: FloatType>(corners: [T; 6]) -> Result<(), Error> {

/// Iterator for reading numbers, supporting `f32`, `f64`, `i64`, date, and date-time generically.
#[derive(Debug)]
pub struct GenericNumbers<T: NumberType>(pub(crate) NullableIter<T, SubFile<File>>);
pub struct GenericNumbers<T: NumberType>(pub(crate) NullableIter<T, SubFile>);

impl<T: NumberType> Iterator for GenericNumbers<T> {
type Item = Result<Option<T>, Error>;
Expand All @@ -362,7 +362,7 @@ impl<T: NumberType> Iterator for GenericNumbers<T> {

/// Iterator for reading small fixed-size arrays, like vertices and texture coordinates.
#[derive(Debug)]
pub struct GenericArrays<T: NumberType, const N: usize>(pub(crate) MultiIter<T, SubFile<File>, N>);
pub struct GenericArrays<T: NumberType, const N: usize>(pub(crate) MultiIter<T, SubFile, N>);

impl<T: NumberType, const N: usize> Iterator for GenericArrays<T, N> {
type Item = Result<[T; N], Error>;
Expand All @@ -374,7 +374,7 @@ impl<T: NumberType, const N: usize> Iterator for GenericArrays<T, N> {

/// Iterator for reading non-nullable colors, such as category legends.
#[derive(Debug)]
pub struct Gradient(pub(crate) MultiIter<u8, SubFile<File>, 4>);
pub struct Gradient(pub(crate) MultiIter<u8, SubFile, 4>);

impl Iterator for Gradient {
type Item = Result<[u8; 4], Error>;
Expand All @@ -387,7 +387,7 @@ impl Iterator for Gradient {
/// Iterator for reading small optional fixed-size arrays, like 2D and 3D vectors.
#[derive(Debug)]
pub struct GenericOptionalArrays<T: NumberType, const N: usize>(
pub(crate) NullableGroupIter<T, SubFile<File>, N>,
pub(crate) NullableGroupIter<T, SubFile, N>,
);

impl<T: NumberType, const N: usize> Iterator for GenericOptionalArrays<T, N> {
Expand All @@ -400,7 +400,7 @@ impl<T: NumberType, const N: usize> Iterator for GenericOptionalArrays<T, N> {

/// Iterator for reading nullable colors.
#[derive(Debug)]
pub struct Colors(pub(crate) NullableGroupIter<u8, SubFile<File>, 4>);
pub struct Colors(pub(crate) NullableGroupIter<u8, SubFile, 4>);

impl Iterator for Colors {
type Item = Result<Option<[u8; 4]>, Error>;
Expand All @@ -412,7 +412,7 @@ impl Iterator for Colors {

/// Iterator for reading nullable booleans.
#[derive(Debug)]
pub struct Booleans(pub(crate) NullableIter<bool, SubFile<File>>);
pub struct Booleans(pub(crate) NullableIter<bool, SubFile>);

impl Iterator for Booleans {
type Item = Result<Option<bool>, Error>;
Expand All @@ -424,7 +424,7 @@ impl Iterator for Booleans {

/// Iterator for reading non-nullable strings, such as category names.
#[derive(Debug)]
pub struct Names(pub(crate) SimpleIter<String, SubFile<File>>);
pub struct Names(pub(crate) SimpleIter<String, SubFile>);

impl Iterator for Names {
type Item = Result<String, Error>;
Expand All @@ -436,7 +436,7 @@ impl Iterator for Names {

/// Iterator for reading nullable strings.
#[derive(Debug)]
pub struct Text(pub(crate) NullableIter<String, SubFile<File>>);
pub struct Text(pub(crate) NullableIter<String, SubFile>);

impl Iterator for Text {
type Item = Result<Option<String>, Error>;
Expand Down
7 changes: 1 addition & 6 deletions src/file/parquet/reader.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::fs::File;

use crate::{
array_type,
data::*,
Expand All @@ -12,10 +10,7 @@ use crate::{
use super::{super::Reader, schemas};

impl Reader {
fn array_reader(
&self,
array: &Array<impl ArrayType>,
) -> Result<PqArrayReader<SubFile<File>>, Error> {
fn array_reader(&self, array: &Array<impl ArrayType>) -> Result<PqArrayReader<SubFile>, Error> {
let f = self.array_bytes_reader(array)?;
let reader = PqArrayReader::new(f)?;
if array.item_count() != reader.len() {
Expand Down
4 changes: 2 additions & 2 deletions src/file/parquet/schemas.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{fs::File, sync::OnceLock};
use std::sync::OnceLock;

use crate::{
file::SubFile,
Expand All @@ -23,7 +23,7 @@ macro_rules! declare_schema {
}

pub fn check(
reader: &PqArrayReader<SubFile<File>>,
reader: &PqArrayReader<SubFile>,
) -> Result<$name, crate::error::Error> {
reader.matches(Self::matcher())
}
Expand Down
3 changes: 1 addition & 2 deletions src/file/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ impl Default for Limits {
/// > where data is maliciously crafted to expand to an excessive size when decompressed,
/// > leading to a potential denial of service attack.
/// > Use the limits provided check arrays sizes before allocating memory.

pub struct Reader {
archive: Archive,
version: [u32; 2],
Expand Down Expand Up @@ -175,7 +174,7 @@ impl Reader {
pub fn array_bytes_reader(
&self,
array: &array::Array<impl array::ArrayType>,
) -> Result<SubFile<File>, Error> {
) -> Result<SubFile, Error> {
array.constraint(); // Check that validation has been done.
self.archive.open(array.filename())
}
Expand Down
Loading

0 comments on commit f67715e

Please sign in to comment.