From 8e22da4da3575c68c9340be20a2d1f66b63531ad Mon Sep 17 00:00:00 2001
From: Martin Charles <martincharles07@gmail.com>
Date: Thu, 5 Sep 2024 21:17:36 -0400
Subject: [PATCH] add information columns (#19)

---
 README.md                            |   2 +
 packages/duckdb_protobuf/src/io.rs   |  26 +++-
 packages/duckdb_protobuf/src/read.rs |   2 +-
 packages/duckdb_protobuf/src/vtab.rs | 201 +++++++++++++++++++++------
 4 files changed, 186 insertions(+), 45 deletions(-)

diff --git a/README.md b/README.md
index a33de1e..ae7fde1 100644
--- a/README.md
+++ b/README.md
@@ -95,6 +95,8 @@ streams with little upfront load complexity or time.
     ([encoding](https://protobuf.dev/programming-guides/encoding/#varints)). 
     files are a sequence of messages
   * `SingleMessagePerFile`: each file contains a single message
+* `filename`, `position` and `size`: boolean values enabling columns which add
+  source information about where the messages originated from
 
 ## features
 
diff --git a/packages/duckdb_protobuf/src/io.rs b/packages/duckdb_protobuf/src/io.rs
index ab1efdd..dc044a4 100644
--- a/packages/duckdb_protobuf/src/io.rs
+++ b/packages/duckdb_protobuf/src/io.rs
@@ -5,6 +5,7 @@ use protobuf::CodedInputStream;
 use std::error::Error;
 use std::fs::File;
 use std::io;
+use std::path::{Path, PathBuf};
 use strum::{AsRefStr, EnumIter, EnumString, IntoEnumIterator};
 
 #[derive(Copy, Clone, EnumString, EnumIter, AsRefStr)]
@@ -39,6 +40,7 @@ pub enum DelimitedLengthKind {
 #[self_referencing]
 pub struct LengthDelimitedRecordsReader {
     length_kind: DelimitedLengthKind,
+    path: PathBuf,
     inner: File,
 
     #[borrows(mut inner)]
@@ -46,19 +48,27 @@ pub struct LengthDelimitedRecordsReader {
     reader: CodedInputStream<'this>,
 }
 
+pub struct Record {
+    pub bytes: Vec<u8>,
+    pub position: u64,
+    pub size: u32,
+}
+
 impl LengthDelimitedRecordsReader {
-    pub fn create(inner: File, length_kind: DelimitedLengthKind) -> Self {
+    pub fn create(inner: File, length_kind: DelimitedLengthKind, path: PathBuf) -> Self {
         LengthDelimitedRecordsReaderBuilder {
             length_kind,
+            path,
             inner,
             reader_builder: |it| CodedInputStream::new(it),
         }
         .build()
     }
 
-    fn get_next(&mut self) -> Result<Vec<u8>, io::Error> {
+    fn get_next(&mut self) -> Result<Record, io::Error> {
         let length_kind = *self.borrow_length_kind();
         Ok(self.with_reader_mut(move |reader| {
+            let position = reader.pos();
             let len = match length_kind {
                 DelimitedLengthKind::BigEndianFixed => reader.read_u32::<BigEndian>()?,
                 DelimitedLengthKind::Varint => reader.read_raw_varint32()?,
@@ -67,15 +77,23 @@ impl LengthDelimitedRecordsReader {
             let mut buf = vec![0; len as usize];
             <CodedInputStream as io::Read>::read_exact(reader, &mut buf)?;
 
-            Ok::<_, io::Error>(buf)
+            Ok::<_, io::Error>(Record {
+                bytes: buf,
+                position,
+                size: len,
+            })
         })?)
     }
 
-    pub fn try_get_next(&mut self) -> Result<Option<Vec<u8>>, io::Error> {
+    pub fn try_get_next(&mut self) -> Result<Option<Record>, io::Error> {
         match self.get_next() {
             Ok(it) => Ok(Some(it)),
             Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => Ok(None),
             Err(err) => Err(err.into()),
         }
     }
+
+    pub fn path(&self) -> &Path {
+        self.borrow_path().as_path()
+    }
 }
diff --git a/packages/duckdb_protobuf/src/read.rs b/packages/duckdb_protobuf/src/read.rs
index 1149397..fb2a6f5 100644
--- a/packages/duckdb_protobuf/src/read.rs
+++ b/packages/duckdb_protobuf/src/read.rs
@@ -52,7 +52,7 @@ pub fn write_message(
     Ok(())
 }
 
-struct MyFlatVector<T> {
+pub struct MyFlatVector<T> {
     _phantom_data: PhantomData<T>,
     ptr: duckdb::ffi::duckdb_vector,
     capacity: usize,
diff --git a/packages/duckdb_protobuf/src/vtab.rs b/packages/duckdb_protobuf/src/vtab.rs
index c0db7f5..1e1bd35 100644
--- a/packages/duckdb_protobuf/src/vtab.rs
+++ b/packages/duckdb_protobuf/src/vtab.rs
@@ -4,15 +4,16 @@ use duckdb::vtab::{
     BindInfo, DataChunk, Free, FunctionInfo, InitInfo, LogicalType, LogicalTypeId, VTab,
     VTabLocalData,
 };
-use prost_reflect::{DescriptorPool, DynamicMessage, MessageDescriptor};
+use prost_reflect::{DescriptorPool, DynamicMessage, MessageDescriptor, ReflectMessage};
 use std::error::Error;
+use std::ffi::CString;
 use std::fs::File;
 use std::io::Read;
 use std::ops::{Deref, DerefMut};
-use std::path::PathBuf;
+use std::path::{Path, PathBuf};
 
-use crate::io::{parse, DelimitedLengthKind, LengthDelimitedRecordsReader, LengthKind};
-use crate::read::write_to_output;
+use crate::io::{parse, DelimitedLengthKind, LengthDelimitedRecordsReader, LengthKind, Record};
+use crate::read::{write_to_output, MyFlatVector, VectorAccessor};
 use crate::types::into_logical_type;
 
 pub struct Parameters {
@@ -21,6 +22,9 @@ pub struct Parameters {
     pub message_name: String,
     pub shared_message_descriptor: MessageDescriptor,
     pub length_kind: LengthKind,
+    pub include_filename: bool,
+    pub include_position: bool,
+    pub include_size: bool,
 }
 
 impl Parameters {
@@ -62,18 +66,35 @@ impl Parameters {
         let length_kind = parse::<LengthKind>(&length_kind.to_string())
             .map_err(|err| format_err!("when parsing parameter delimiter: {}", err))?;
 
+        let include_filename = bind
+            .get_named_parameter("filename")
+            .map(|value| value.to_int64() != 0)
+            .unwrap_or(false);
+
+        let include_position = bind
+            .get_named_parameter("position")
+            .map(|value| value.to_int64() != 0)
+            .unwrap_or(false);
+
+        let include_size = bind
+            .get_named_parameter("size")
+            .map(|value| value.to_int64() != 0)
+            .unwrap_or(false);
+
         Ok(Self {
             files,
             descriptor_bytes,
             message_name,
             shared_message_descriptor: message_descriptor,
             length_kind,
+            include_filename,
+            include_position,
+            include_size,
         })
     }
 
     pub fn message_descriptor(&self) -> Result<MessageDescriptor, anyhow::Error> {
-        let descriptor_pool =
-            DescriptorPool::decode(self.descriptor_bytes.as_slice())?;
+        let descriptor_pool = DescriptorPool::decode(self.descriptor_bytes.as_slice())?;
 
         let message_descriptor = descriptor_pool
             .get_message_by_name(&self.message_name)
@@ -100,10 +121,18 @@ impl Parameters {
                 "delimiter".to_string(),
                 LogicalType::new(LogicalTypeId::Varchar),
             ),
+            (
+                "filename".to_string(),
+                LogicalType::new(LogicalTypeId::Boolean),
+            ),
+            (
+                "position".to_string(),
+                LogicalType::new(LogicalTypeId::Boolean),
+            ),
+            ("size".to_string(), LogicalType::new(LogicalTypeId::Boolean)),
         ]
     }
 }
-
 pub struct GlobalState {
     queue: ArrayQueue<PathBuf>,
 }
@@ -180,6 +209,18 @@ impl ProtobufVTab {
             );
         }
 
+        if params.include_filename {
+            bind.add_result_column("filename", LogicalType::new(LogicalTypeId::Varchar));
+        }
+
+        if params.include_position {
+            bind.add_result_column("position", LogicalType::new(LogicalTypeId::UBigint));
+        }
+
+        if params.include_size {
+            bind.add_result_column("size", LogicalType::new(LogicalTypeId::UBigint));
+        }
+
         data.assign(params);
 
         Ok(())
@@ -221,13 +262,17 @@ impl ProtobufVTab {
         let mut column_information = Default::default();
 
         for output_row_idx in 0..available_chunk_size {
-            let bytes = match state_container.next_message()? {
+            let StateContainerValue {
+                path_reference,
+                size,
+                bytes,
+                position,
+            } = match state_container.next_message()? {
                 None => break,
-                Some(bytes) => bytes,
+                Some(message_info) => message_info,
             };
 
-            let message =
-                DynamicMessage::decode(local_descriptor.clone(), bytes.as_slice())?;
+            let message = DynamicMessage::decode(local_descriptor.clone(), bytes.as_slice())?;
 
             write_to_output(
                 &mut column_information,
@@ -237,6 +282,48 @@ impl ProtobufVTab {
                 output_row_idx,
             )?;
 
+            let mut field_offset = message.descriptor().fields().len();
+
+            if parameters.include_filename {
+                let it = (|| -> Option<CString> {
+                    let value = CString::new(path_reference.path().to_str()?).ok()?;
+                    Some(value)
+                })();
+
+                let column = output.get_vector(field_offset);
+
+                match it {
+                    None => unsafe {
+                        let validity = duckdb::ffi::duckdb_vector_get_validity(column);
+                        duckdb::ffi::duckdb_validity_set_row_invalid(validity, output_row_idx as _);
+                    },
+                    Some(value) => unsafe {
+                        duckdb::ffi::duckdb_vector_assign_string_element(
+                            column,
+                            output_row_idx as _,
+                            value.as_ptr(),
+                        )
+                    },
+                }
+
+                field_offset += 1;
+            }
+
+            if parameters.include_position {
+                let column = output.get_vector(field_offset);
+                let mut vector =
+                    unsafe { MyFlatVector::<u64>::with_capacity(column, available_chunk_size) };
+                vector.as_mut_slice()[output_row_idx] = position as _;
+                field_offset += 1;
+            }
+
+            if parameters.include_size {
+                let column = output.get_vector(field_offset);
+                let mut vector =
+                    unsafe { MyFlatVector::<u64>::with_capacity(column, available_chunk_size) };
+                vector.as_mut_slice()[output_row_idx] = size as _;
+            }
+
             items += 1;
         }
 
@@ -252,47 +339,81 @@ struct StateContainer<'a> {
     parameters: &'a Parameters,
 }
 
-impl StateContainer<'_> {
-    fn next_message(&mut self) -> Result<Option<Vec<u8>>, anyhow::Error> {
-        let file_reader = if let Some(reader) = &mut self.local_state.current {
-            reader
-        } else {
-            let Some(next_file_path) = self.global_state.queue.pop() else {
-                return Ok(None);
-            };
+enum PathReference<'a> {
+    Borrowed(&'a Path),
+    Owned(PathBuf),
+}
+
+impl<'a> PathReference<'a> {
+    pub fn path(&self) -> &Path {
+        match self {
+            PathReference::Borrowed(it) => *it,
+            PathReference::Owned(it) => it.as_path(),
+        }
+    }
+}
+
+struct StateContainerValue<'a> {
+    path_reference: PathReference<'a>,
+    bytes: Vec<u8>,
+    size: usize,
+    position: u64,
+}
 
-            let mut next_file = File::open(&next_file_path)?;
-            match self.parameters.length_kind {
-                LengthKind::BigEndianFixed => {
-                    self.local_state.current = Some(LengthDelimitedRecordsReader::create(
+impl StateContainer<'_> {
+    fn next_message(&mut self) -> Result<Option<StateContainerValue>, anyhow::Error> {
+        let mut value = match self.local_state.current.take() {
+            Some(it) => it,
+            None => {
+                let Some(next_file_path) = self.global_state.queue.pop() else {
+                    return Ok(None);
+                };
+
+                let mut next_file = File::open(&next_file_path)?;
+                match self.parameters.length_kind {
+                    LengthKind::BigEndianFixed => LengthDelimitedRecordsReader::create(
                         next_file,
                         DelimitedLengthKind::BigEndianFixed,
-                    ));
-
-                    self.local_state.current.as_mut().unwrap()
-                }
-                LengthKind::Varint => {
-                    self.local_state.current = Some(LengthDelimitedRecordsReader::create(
+                        next_file_path,
+                    ),
+                    LengthKind::Varint => LengthDelimitedRecordsReader::create(
                         next_file,
                         DelimitedLengthKind::Varint,
-                    ));
-
-                    self.local_state.current.as_mut().unwrap()
-                }
-                LengthKind::SingleMessagePerFile => {
-                    let mut bytes = Vec::new();
-                    next_file.read_to_end(&mut bytes)?;
-                    return Ok(Some(bytes));
+                        next_file_path,
+                    ),
+                    LengthKind::SingleMessagePerFile => {
+                        let mut bytes = Vec::new();
+                        next_file.read_to_end(&mut bytes)?;
+                        let size = bytes.len();
+                        return Ok(Some(StateContainerValue {
+                            bytes,
+                            path_reference: PathReference::Owned(next_file_path),
+                            position: 0,
+                            size,
+                        }));
+                    }
                 }
             }
         };
 
-        let Some(next_message) = file_reader.try_get_next()? else {
-            self.local_state.current = None;
+        let Some(Record {
+            position,
+            size,
+            bytes: next_message,
+        }) = value.try_get_next()?
+        else {
             return Ok(None);
         };
 
-        Ok(Some(next_message))
+        self.local_state.current = Some(value);
+        Ok(Some(StateContainerValue {
+            path_reference: PathReference::Borrowed(
+                self.local_state.current.as_ref().unwrap().path(),
+            ),
+            bytes: next_message,
+            size: size as _,
+            position,
+        }))
     }
 }