Skip to content

Commit

Permalink
enable multi-threading (#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
0xcaff authored Sep 5, 2024
1 parent fe43a19 commit 37998a0
Show file tree
Hide file tree
Showing 8 changed files with 282 additions and 97 deletions.
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ endif
packages/vendor/duckdb:
mkdir -p packages/vendor/duckdb
curl -L https://crates.io/api/v1/crates/duckdb/1.0.0/download | tar --strip-components=1 -xz -C packages/vendor/duckdb
patch --strip=1 --directory=packages/vendor/duckdb < patches/duckdb+1.0.0.patch

packages/vendor/duckdb-loadable-macros:
mkdir -p packages/vendor/duckdb-loadable-macros
Expand Down
3 changes: 0 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,6 @@ streams with little upfront load complexity or time.

* doesn't support a few types (bytes, maps, {s,}fixed{32,64}, sint{32,64});
contributions, and even feedback that these field types are used, are welcome!
* execution is single threaded (limitations of the rust bindings)
* no community plugin due to lack of out of tree build support from duckdb
community repository

i'm releasing this to understand how other folks are using protobuf streams and
duckdb. i'm open to PRs, issues and other feedback.
Expand Down
1 change: 1 addition & 0 deletions packages/duckdb_protobuf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ byteorder = "1.5.0"
log = "0.4.21"
ouroboros = "0.18.4"
strum = { version = "0.26.3", features = ["derive"] }
crossbeam = "0.8.4"

[dev-dependencies]
anyhow = "1.0"
Expand Down
70 changes: 4 additions & 66 deletions packages/duckdb_protobuf/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,68 +7,6 @@ use std::fs::File;
use std::io;
use strum::{AsRefStr, EnumIter, EnumString, IntoEnumIterator};

use crate::vtab::Parameters;

/// Yields raw message bytes read from the files matched by a glob pattern.
///
/// Built by `RecordsReader::new` from the table function's `Parameters`;
/// messages are pulled one at a time with `next_message`.
pub struct RecordsReader {
/// Remaining file paths, produced by `glob::glob` over `params.files`.
files_iterator: glob::Paths,
/// Copied from `params.length_kind`; decides how each opened file is read
/// (length-delimited stream vs. one whole-file message).
length_kind: LengthKind,
/// Reader over the length-delimited file currently being consumed.
/// `None` before the first file is opened and after the current one runs out.
current_file: Option<LengthDelimitedRecordsReader>,
}

impl RecordsReader {
    /// Creates a reader over every file matching the glob pattern in
    /// `params.files`, framed according to `params.length_kind`.
    ///
    /// # Errors
    ///
    /// Returns an error if the glob pattern is invalid.
    pub fn new(params: &Parameters) -> Result<RecordsReader, anyhow::Error> {
        Ok(RecordsReader {
            files_iterator: glob::glob(params.files.as_str())?,
            length_kind: params.length_kind,
            current_file: None,
        })
    }

    /// Returns the bytes of the next message, or `Ok(None)` once every
    /// matched file has been fully consumed.
    ///
    /// Files are opened lazily, one at a time. When a length-delimited file
    /// runs out of records the reader moves straight on to the next matched
    /// file instead of reporting `Ok(None)`, so `Ok(None)` unambiguously
    /// means "no more input" (an empty file or a file boundary can no longer
    /// be mistaken for the end of the scan).
    ///
    /// # Errors
    ///
    /// Returns an error if a matched path cannot be read by the glob
    /// iterator, a file cannot be opened, or reading from a file fails.
    pub fn next_message(&mut self) -> Result<Option<Vec<u8>>, anyhow::Error> {
        loop {
            // Drain the file that is already open before touching the glob.
            if let Some(reader) = &mut self.current_file {
                if let Some(message) = reader.try_get_next()? {
                    return Ok(Some(message));
                }

                // Current file is exhausted; fall through to open the next one.
                self.current_file = None;
            }

            let Some(next_file_path) = self.files_iterator.next() else {
                return Ok(None);
            };

            let next_file_path = next_file_path?;
            let mut next_file = File::open(&next_file_path)?;

            let delimited_kind = match self.length_kind {
                LengthKind::BigEndianFixed => DelimitedLengthKind::BigEndianFixed,
                LengthKind::Varint => DelimitedLengthKind::Varint,
                LengthKind::SingleMessagePerFile => {
                    // The whole file is one message; no per-file reader state is kept.
                    let mut bytes = Vec::new();
                    <File as io::Read>::read_to_end(&mut next_file, &mut bytes)?;
                    return Ok(Some(bytes));
                }
            };

            self.current_file = Some(LengthDelimitedRecordsReader::create(
                next_file,
                delimited_kind,
            ));
        }
    }
}

#[derive(Copy, Clone, EnumString, EnumIter, AsRefStr)]
pub enum LengthKind {
BigEndianFixed,
Expand All @@ -83,7 +21,7 @@ pub fn parse<T: std::str::FromStr<Err = impl Error> + IntoEnumIterator + AsRef<s
format_err!(
"{}: expected one of: {}, got: {}",
err,
LengthKind::iter()
T::iter()
.map(|it| format!("{}", it.as_ref()))
.collect::<Vec<_>>()
.join(", "),
Expand All @@ -93,7 +31,7 @@ pub fn parse<T: std::str::FromStr<Err = impl Error> + IntoEnumIterator + AsRef<s
}

#[derive(Copy, Clone)]
enum DelimitedLengthKind {
pub enum DelimitedLengthKind {
BigEndianFixed,
Varint,
}
Expand All @@ -109,7 +47,7 @@ pub struct LengthDelimitedRecordsReader {
}

impl LengthDelimitedRecordsReader {
fn create(inner: File, length_kind: DelimitedLengthKind) -> Self {
pub fn create(inner: File, length_kind: DelimitedLengthKind) -> Self {
LengthDelimitedRecordsReaderBuilder {
length_kind,
inner,
Expand All @@ -133,7 +71,7 @@ impl LengthDelimitedRecordsReader {
})?)
}

fn try_get_next(&mut self) -> Result<Option<Vec<u8>>, io::Error> {
pub fn try_get_next(&mut self) -> Result<Option<Vec<u8>>, io::Error> {
match self.get_next() {
Ok(it) => Ok(Some(it)),
Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => Ok(None),
Expand Down
2 changes: 1 addition & 1 deletion packages/duckdb_protobuf/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use duckdb_loadable_macros::duckdb_entrypoint;

#[duckdb_entrypoint]
fn protobuf_init(conn: Connection) -> Result<(), Box<dyn Error>> {
conn.register_table_function::<ProtobufVTab>("protobuf")?;
conn.register_table_function_local_init::<ProtobufVTab>("protobuf")?;

Ok(())
}
Loading

0 comments on commit 37998a0

Please sign in to comment.