Skip to content

Commit

Permalink
add information columns (#19)
Browse files Browse the repository at this point in the history
  • Loading branch information
0xcaff authored Sep 6, 2024
1 parent 79a3f52 commit 8e22da4
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 45 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ streams with little upfront load complexity or time.
([encoding](https://protobuf.dev/programming-guides/encoding/#varints)).
files are a sequence of messages
* `SingleMessagePerFile`: each file contains a single message
* `filename`, `position` and `size`: boolean values that enable extra columns containing
source information about where each message originated

## features

Expand Down
26 changes: 22 additions & 4 deletions packages/duckdb_protobuf/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use protobuf::CodedInputStream;
use std::error::Error;
use std::fs::File;
use std::io;
use std::path::{Path, PathBuf};
use strum::{AsRefStr, EnumIter, EnumString, IntoEnumIterator};

#[derive(Copy, Clone, EnumString, EnumIter, AsRefStr)]
Expand Down Expand Up @@ -39,26 +40,35 @@ pub enum DelimitedLengthKind {
// Reader over a file made of length-prefixed records.
//
// The struct is declared `#[self_referencing]`: the `reader` field holds a
// mutable borrow of the `inner` field stored next to it (see `#[borrows(mut inner)]`).
#[self_referencing]
pub struct LengthDelimitedRecordsReader {
// Selects how `get_next` decodes each record's length prefix.
length_kind: DelimitedLengthKind,
// Path handed to `create`; exposed read-only through `path()`.
path: PathBuf,
// The open file that `reader` is built over.
inner: File,

// Coded input stream constructed from `inner` in `create`.
#[borrows(mut inner)]
#[not_covariant]
reader: CodedInputStream<'this>,
}

/// A single length-delimited record together with where it was found.
///
/// Produced by `LengthDelimitedRecordsReader::get_next` / `try_get_next`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Record {
    /// Payload bytes of the record (the length prefix itself is not included).
    pub bytes: Vec<u8>,
    /// Value of the reader's `pos()` sampled just before the length prefix was read.
    pub position: u64,
    /// Payload length as decoded from the length prefix.
    pub size: u32,
}

impl LengthDelimitedRecordsReader {
pub fn create(inner: File, length_kind: DelimitedLengthKind) -> Self {
/// Builds a reader over `inner`.
///
/// * `inner` - the opened file to read records from.
/// * `length_kind` - how each record's length prefix is encoded.
/// * `path` - stored alongside the reader and returned by `path()`.
pub fn create(inner: File, length_kind: DelimitedLengthKind, path: PathBuf) -> Self {
    let builder = LengthDelimitedRecordsReaderBuilder {
        length_kind,
        path,
        inner,
        // The stream borrows the `inner` file owned by the struct being built.
        reader_builder: |file| CodedInputStream::new(file),
    };

    builder.build()
}

fn get_next(&mut self) -> Result<Vec<u8>, io::Error> {
fn get_next(&mut self) -> Result<Record, io::Error> {
let length_kind = *self.borrow_length_kind();
Ok(self.with_reader_mut(move |reader| {
let position = reader.pos();
let len = match length_kind {
DelimitedLengthKind::BigEndianFixed => reader.read_u32::<BigEndian>()?,
DelimitedLengthKind::Varint => reader.read_raw_varint32()?,
Expand All @@ -67,15 +77,23 @@ impl LengthDelimitedRecordsReader {
let mut buf = vec![0; len as usize];
<CodedInputStream as io::Read>::read_exact(reader, &mut buf)?;

Ok::<_, io::Error>(buf)
Ok::<_, io::Error>(Record {
bytes: buf,
position,
size: len,
})
})?)
}

pub fn try_get_next(&mut self) -> Result<Option<Vec<u8>>, io::Error> {
/// Reads the next record, mapping end-of-input to `Ok(None)`.
///
/// Returns `Ok(Some(record))` when `get_next` succeeds, `Ok(None)` when it
/// fails with `io::ErrorKind::UnexpectedEof`, and any other error unchanged.
///
/// NOTE(review): `get_next` fills the payload with `read_exact`, so a record
/// cut short at the end of the file presumably also surfaces as
/// `UnexpectedEof` and is reported here as a clean end — confirm this is intended.
pub fn try_get_next(&mut self) -> Result<Option<Record>, io::Error> {
    let failure = match self.get_next() {
        Ok(record) => return Ok(Some(record)),
        Err(failure) => failure,
    };

    if failure.kind() == io::ErrorKind::UnexpectedEof {
        Ok(None)
    } else {
        Err(failure)
    }
}

/// Returns the path that was passed to `create` for this reader.
pub fn path(&self) -> &Path {
    let stored = self.borrow_path();
    stored.as_path()
}
}
2 changes: 1 addition & 1 deletion packages/duckdb_protobuf/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub fn write_message(
Ok(())
}

struct MyFlatVector<T> {
pub struct MyFlatVector<T> {
_phantom_data: PhantomData<T>,
ptr: duckdb::ffi::duckdb_vector,
capacity: usize,
Expand Down
201 changes: 161 additions & 40 deletions packages/duckdb_protobuf/src/vtab.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@ use duckdb::vtab::{
BindInfo, DataChunk, Free, FunctionInfo, InitInfo, LogicalType, LogicalTypeId, VTab,
VTabLocalData,
};
use prost_reflect::{DescriptorPool, DynamicMessage, MessageDescriptor};
use prost_reflect::{DescriptorPool, DynamicMessage, MessageDescriptor, ReflectMessage};
use std::error::Error;
use std::ffi::CString;
use std::fs::File;
use std::io::Read;
use std::ops::{Deref, DerefMut};
use std::path::PathBuf;
use std::path::{Path, PathBuf};

use crate::io::{parse, DelimitedLengthKind, LengthDelimitedRecordsReader, LengthKind};
use crate::read::write_to_output;
use crate::io::{parse, DelimitedLengthKind, LengthDelimitedRecordsReader, LengthKind, Record};
use crate::read::{write_to_output, MyFlatVector, VectorAccessor};
use crate::types::into_logical_type;

pub struct Parameters {
Expand All @@ -21,6 +22,9 @@ pub struct Parameters {
pub message_name: String,
pub shared_message_descriptor: MessageDescriptor,
pub length_kind: LengthKind,
pub include_filename: bool,
pub include_position: bool,
pub include_size: bool,
}

impl Parameters {
Expand Down Expand Up @@ -62,18 +66,35 @@ impl Parameters {
let length_kind = parse::<LengthKind>(&length_kind.to_string())
.map_err(|err| format_err!("when parsing parameter delimiter: {}", err))?;

let include_filename = bind
.get_named_parameter("filename")
.map(|value| value.to_int64() != 0)
.unwrap_or(false);

let include_position = bind
.get_named_parameter("position")
.map(|value| value.to_int64() != 0)
.unwrap_or(false);

let include_size = bind
.get_named_parameter("size")
.map(|value| value.to_int64() != 0)
.unwrap_or(false);

Ok(Self {
files,
descriptor_bytes,
message_name,
shared_message_descriptor: message_descriptor,
length_kind,
include_filename,
include_position,
include_size,
})
}

pub fn message_descriptor(&self) -> Result<MessageDescriptor, anyhow::Error> {
let descriptor_pool =
DescriptorPool::decode(self.descriptor_bytes.as_slice())?;
let descriptor_pool = DescriptorPool::decode(self.descriptor_bytes.as_slice())?;

let message_descriptor = descriptor_pool
.get_message_by_name(&self.message_name)
Expand All @@ -100,10 +121,18 @@ impl Parameters {
"delimiter".to_string(),
LogicalType::new(LogicalTypeId::Varchar),
),
(
"filename".to_string(),
LogicalType::new(LogicalTypeId::Boolean),
),
(
"position".to_string(),
LogicalType::new(LogicalTypeId::Boolean),
),
("size".to_string(), LogicalType::new(LogicalTypeId::Boolean)),
]
}
}

/// State holding a queue of input file paths.
///
/// `StateContainer::next_message` pops a path from `global_state.queue` when it
/// has no current reader; `global_state` is presumably an instance of this
/// struct — confirm against the `StateContainer` definition.
pub struct GlobalState {
/// File paths waiting to be opened.
queue: ArrayQueue<PathBuf>,
}
Expand Down Expand Up @@ -180,6 +209,18 @@ impl ProtobufVTab {
);
}

if params.include_filename {
bind.add_result_column("filename", LogicalType::new(LogicalTypeId::Varchar));
}

if params.include_position {
bind.add_result_column("position", LogicalType::new(LogicalTypeId::UBigint));
}

if params.include_size {
bind.add_result_column("size", LogicalType::new(LogicalTypeId::UBigint));
}

data.assign(params);

Ok(())
Expand Down Expand Up @@ -221,13 +262,17 @@ impl ProtobufVTab {
let mut column_information = Default::default();

for output_row_idx in 0..available_chunk_size {
let bytes = match state_container.next_message()? {
let StateContainerValue {
path_reference,
size,
bytes,
position,
} = match state_container.next_message()? {
None => break,
Some(bytes) => bytes,
Some(message_info) => message_info,
};

let message =
DynamicMessage::decode(local_descriptor.clone(), bytes.as_slice())?;
let message = DynamicMessage::decode(local_descriptor.clone(), bytes.as_slice())?;

write_to_output(
&mut column_information,
Expand All @@ -237,6 +282,48 @@ impl ProtobufVTab {
output_row_idx,
)?;

let mut field_offset = message.descriptor().fields().len();

if parameters.include_filename {
let it = (|| -> Option<CString> {
let value = CString::new(path_reference.path().to_str()?).ok()?;
Some(value)
})();

let column = output.get_vector(field_offset);

match it {
None => unsafe {
let validity = duckdb::ffi::duckdb_vector_get_validity(column);
duckdb::ffi::duckdb_validity_set_row_invalid(validity, output_row_idx as _);
},
Some(value) => unsafe {
duckdb::ffi::duckdb_vector_assign_string_element(
column,
output_row_idx as _,
value.as_ptr(),
)
},
}

field_offset += 1;
}

if parameters.include_position {
let column = output.get_vector(field_offset);
let mut vector =
unsafe { MyFlatVector::<u64>::with_capacity(column, available_chunk_size) };
vector.as_mut_slice()[output_row_idx] = position as _;
field_offset += 1;
}

if parameters.include_size {
let column = output.get_vector(field_offset);
let mut vector =
unsafe { MyFlatVector::<u64>::with_capacity(column, available_chunk_size) };
vector.as_mut_slice()[output_row_idx] = size as _;
}

items += 1;
}

Expand All @@ -252,47 +339,81 @@ struct StateContainer<'a> {
parameters: &'a Parameters,
}

impl StateContainer<'_> {
fn next_message(&mut self) -> Result<Option<Vec<u8>>, anyhow::Error> {
let file_reader = if let Some(reader) = &mut self.local_state.current {
reader
} else {
let Some(next_file_path) = self.global_state.queue.pop() else {
return Ok(None);
};
/// A file path that is either borrowed from elsewhere or owned outright.
enum PathReference<'a> {
    /// Path borrowed for `'a`.
    Borrowed(&'a Path),
    /// Path owned by this value.
    Owned(PathBuf),
}

impl PathReference<'_> {
    /// Returns the underlying path, whichever way it is held.
    pub fn path(&self) -> &Path {
        match *self {
            PathReference::Owned(ref buf) => buf.as_path(),
            PathReference::Borrowed(path) => path,
        }
    }
}

/// One message returned by `StateContainer::next_message`, plus information
/// about where it came from.
struct StateContainerValue<'a> {
/// Path of the file the message was read from.
path_reference: PathReference<'a>,
/// Raw message bytes.
bytes: Vec<u8>,
/// Message length: `bytes.len()` for single-message files, otherwise the
/// record's `size` cast to `usize`.
size: usize,
/// `0` for single-message files, otherwise the record's `position`.
position: u64,
}

let mut next_file = File::open(&next_file_path)?;
match self.parameters.length_kind {
LengthKind::BigEndianFixed => {
self.local_state.current = Some(LengthDelimitedRecordsReader::create(
impl StateContainer<'_> {
/// Returns the next message along with its source information, or `Ok(None)`
/// when nothing was read on this call.
///
/// If `local_state.current` holds a reader it is taken and used; otherwise the
/// next path is popped from `global_state.queue` and opened. For
/// `LengthKind::SingleMessagePerFile` the whole file is read and returned
/// immediately. Errors from opening or reading the file are propagated with `?`.
fn next_message(&mut self) -> Result<Option<StateContainerValue>, anyhow::Error> {
// Move the current reader out of the local state; it is stored back below
// only after a record has been read from it.
let mut value = match self.local_state.current.take() {
Some(it) => it,
None => {
// No reader in progress: fetch the next queued file, or stop if the queue is empty.
let Some(next_file_path) = self.global_state.queue.pop() else {
return Ok(None);
};

let mut next_file = File::open(&next_file_path)?;
match self.parameters.length_kind {
LengthKind::BigEndianFixed => LengthDelimitedRecordsReader::create(
next_file,
DelimitedLengthKind::BigEndianFixed,
));

self.local_state.current.as_mut().unwrap()
}
LengthKind::Varint => {
self.local_state.current = Some(LengthDelimitedRecordsReader::create(
next_file_path,
),
LengthKind::Varint => LengthDelimitedRecordsReader::create(
next_file,
DelimitedLengthKind::Varint,
));

self.local_state.current.as_mut().unwrap()
}
LengthKind::SingleMessagePerFile => {
let mut bytes = Vec::new();
next_file.read_to_end(&mut bytes)?;
return Ok(Some(bytes));
next_file_path,
),
// The file is the message: no reader is kept, the path is handed over by
// value, the position is 0 and the size is the file's byte count.
LengthKind::SingleMessagePerFile => {
let mut bytes = Vec::new();
next_file.read_to_end(&mut bytes)?;
let size = bytes.len();
return Ok(Some(StateContainerValue {
bytes,
path_reference: PathReference::Owned(next_file_path),
position: 0,
size,
}));
}
}
}
};

let Some(next_message) = file_reader.try_get_next()? else {
self.local_state.current = None;
let Some(Record {
position,
size,
bytes: next_message,
}) = value.try_get_next()?
else {
// NOTE(review): the reader is exhausted; `value` is dropped here, so
// `local_state.current` stays empty and the next queued file is only opened
// by a later call. Confirm the caller does not treat this `None` as the end
// of the whole scan while files remain in the queue.
return Ok(None);
};

Ok(Some(next_message))
// Put the reader back first so the returned path can borrow from the stored reader.
self.local_state.current = Some(value);
Ok(Some(StateContainerValue {
path_reference: PathReference::Borrowed(
self.local_state.current.as_ref().unwrap().path(),
),
bytes: next_message,
size: size as _,
position,
}))
}
}

Expand Down

0 comments on commit 8e22da4

Please sign in to comment.