From 7cd83187bc40b26cbdef3a57594a1036a524e948 Mon Sep 17 00:00:00 2001 From: hammadb Date: Tue, 2 Apr 2024 14:51:26 -0700 Subject: [PATCH] wip --- .../src/blockstore/arrow_blockfile/blockfile.rs | 12 ++++++++---- rust/worker/src/execution/data/data_chunk.rs | 4 ++-- rust/worker/src/segment/types.rs | 15 +++++++++------ 3 files changed, 19 insertions(+), 12 deletions(-) diff --git a/rust/worker/src/blockstore/arrow_blockfile/blockfile.rs b/rust/worker/src/blockstore/arrow_blockfile/blockfile.rs index 93404e90e0b..73b095f0ce9 100644 --- a/rust/worker/src/blockstore/arrow_blockfile/blockfile.rs +++ b/rust/worker/src/blockstore/arrow_blockfile/blockfile.rs @@ -99,7 +99,7 @@ impl Blockfile for ArrowBlockfile { } } - fn delete(&mut self, key: BlockfileKey) -> Result<(), Box> { + fn delete(&mut self, key: BlockfileKey) -> Result<(), Box> { if !self.in_transaction() { return Err(Box::new(BlockfileError::TransactionNotInProgress)); } @@ -128,7 +128,11 @@ impl Blockfile for ArrowBlockfile { let delta = match transaction_state.get_delta_for_block(&target_block_id) { None => { let target_block = match self.block_provider.get_block(&target_block_id) { - None => return Err(Box::new(ArrowBlockfileError::BlockNotFoundError)), + None => { + return Err(Box::new(BlockfileError::Other(Box::new( + ArrowBlockfileError::BlockNotFoundError, + )))); + } Some(block) => block, }; let delta = BlockDelta::from(target_block); @@ -138,7 +142,7 @@ impl Blockfile for ArrowBlockfile { Some(delta) => delta, }; - delta.delete(key).into_result() + delta.delete(key) } fn get_by_prefix( @@ -435,7 +439,7 @@ impl ArrowBlockfile { self.transaction_state.is_some() } - fn validate_key(&self, key: &BlockfileKey) -> Result<(), Box> { + fn validate_key(&self, key: &BlockfileKey) -> Result<(), Box> { match key.key { Key::String(_) => { if self.key_type != KeyType::String { diff --git a/rust/worker/src/execution/data/data_chunk.rs b/rust/worker/src/execution/data/data_chunk.rs index 176adc40f0b..4ff4bb77bfd 100644 --- a/rust/worker/src/execution/data/data_chunk.rs +++ b/rust/worker/src/execution/data/data_chunk.rs @@ -1,4 +1,4 @@ -use crate::types::{LogRecord, OperationRecord}; +use crate::types::LogRecord; use std::sync::Arc; #[derive(Clone, Debug)] @@ -108,8 +108,8 @@ impl<'a> Iterator for DataChunkIteraror<'a> { #[cfg(test)] mod tests { use super::*; - use crate::types::LogRecord; use crate::types::Operation; + use crate::types::{LogRecord, OperationRecord}; #[test] fn test_data_chunk() { diff --git a/rust/worker/src/segment/types.rs b/rust/worker/src/segment/types.rs index c7b041ee8e3..c2855455414 100644 --- a/rust/worker/src/segment/types.rs +++ b/rust/worker/src/segment/types.rs @@ -11,13 +11,16 @@ use crate::{ /// A MaterializedLogRecord would have the metadata fully reconciled in its materialized_metadata /// field. pub(super) struct MaterializedLogRecord<'a> { - segment_offset_id: u32, + segment_offset_id: Option, // If the record is new, this is the offset id assigned to it record: &'a LogRecord, - new_embedding: Option>, + old_embedding: Option>, new_metadata: Option, - new_document: Option, + old_metadata: Option, + old_document: Option, } +// In order to update full text search we need to know the old document so we can remove it + pub(super) trait SegmentWriter { fn begin_transaction(&mut self); fn write_records(&mut self, records: Vec>, offset_ids: Vec>); @@ -26,10 +29,10 @@ pub(super) trait SegmentWriter { } pub(super) trait OffsetIdAssigner: SegmentWriter { - fn assign_offset_ids(&mut self, records: Vec>) -> Vec>; + fn assign_offset_ids(&mut self, records: DataChunk) -> Vec>; } -pub(super) fn data_chunk() { +pub(super) async fn data_chunk() { let data = vec![ LogRecord { log_offset: 1, @@ -68,7 +71,7 @@ pub(super) fn data_chunk() { println!("Record: {:?}", stored_record.record); } let arc_stored: Arc<[MaterializedLogRecord]> = stored_record_vec.into(); - test_store_fn(arc_stored); + test_store_fn(arc_stored).await; } pub(super) async fn test_store_fn(arc_stored: Arc<[MaterializedLogRecord<'_>]>) {