Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
HammadB committed Apr 2, 2024
1 parent 4c2c28a commit 7cd8318
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 12 deletions.
12 changes: 8 additions & 4 deletions rust/worker/src/blockstore/arrow_blockfile/blockfile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl Blockfile for ArrowBlockfile {
}
}

fn delete(&mut self, key: BlockfileKey) -> Result<(), Box<dyn ChromaError>> {
fn delete(&mut self, key: BlockfileKey) -> Result<(), Box<BlockfileError>> {
if !self.in_transaction() {
return Err(Box::new(BlockfileError::TransactionNotInProgress));
}
Expand Down Expand Up @@ -128,7 +128,11 @@ impl Blockfile for ArrowBlockfile {
let delta = match transaction_state.get_delta_for_block(&target_block_id) {
None => {
let target_block = match self.block_provider.get_block(&target_block_id) {
None => return Err(Box::new(ArrowBlockfileError::BlockNotFoundError)),
None => {
return Err(Box::new(BlockfileError::Other(Box::new(
ArrowBlockfileError::BlockNotFoundError,
))));
}
Some(block) => block,
};
let delta = BlockDelta::from(target_block);
Expand All @@ -138,7 +142,7 @@ impl Blockfile for ArrowBlockfile {
Some(delta) => delta,
};

delta.delete(key).into_result()
delta.delete(key)
}

fn get_by_prefix(
Expand Down Expand Up @@ -435,7 +439,7 @@ impl ArrowBlockfile {
self.transaction_state.is_some()
}

fn validate_key(&self, key: &BlockfileKey) -> Result<(), Box<dyn ChromaError>> {
fn validate_key(&self, key: &BlockfileKey) -> Result<(), Box<BlockfileError>> {
match key.key {
Key::String(_) => {
if self.key_type != KeyType::String {
Expand Down
4 changes: 2 additions & 2 deletions rust/worker/src/execution/data/data_chunk.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::types::{LogRecord, OperationRecord};
use crate::types::LogRecord;
use std::sync::Arc;

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -108,8 +108,8 @@ impl<'a> Iterator for DataChunkIteraror<'a> {
#[cfg(test)]
mod tests {
use super::*;
use crate::types::LogRecord;
use crate::types::Operation;
use crate::types::{LogRecord, OperationRecord};

#[test]
fn test_data_chunk() {
Expand Down
15 changes: 9 additions & 6 deletions rust/worker/src/segment/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@ use crate::{
/// A MaterializedLogRecord would have the metadata fully reconciled in its materialized_metadata
/// field.
pub(super) struct MaterializedLogRecord<'a> {
segment_offset_id: u32,
segment_offset_id: Option<u32>, // If the record is new, this is the offset id assigned to it
record: &'a LogRecord,
new_embedding: Option<Vec<f32>>,
old_embedding: Option<Vec<f32>>,
new_metadata: Option<crate::types::Metadata>,
new_document: Option<String>,
old_metadata: Option<crate::types::Metadata>,
old_document: Option<String>,
}

// In order to update full text search we need to know the old document so we can remove it

pub(super) trait SegmentWriter {
fn begin_transaction(&mut self);
fn write_records(&mut self, records: Vec<Box<LogRecord>>, offset_ids: Vec<Option<u32>>);
Expand All @@ -26,10 +29,10 @@ pub(super) trait SegmentWriter {
}

pub(super) trait OffsetIdAssigner: SegmentWriter {
fn assign_offset_ids(&mut self, records: Vec<Box<LogRecord>>) -> Vec<Option<u32>>;
fn assign_offset_ids(&mut self, records: DataChunk) -> Vec<Option<u32>>;
}

pub(super) fn data_chunk() {
pub(super) async fn data_chunk() {
let data = vec![
LogRecord {
log_offset: 1,
Expand Down Expand Up @@ -68,7 +71,7 @@ pub(super) fn data_chunk() {
println!("Record: {:?}", stored_record.record);
}
let arc_stored: Arc<[MaterializedLogRecord]> = stored_record_vec.into();
test_store_fn(arc_stored);
test_store_fn(arc_stored).await;
}

pub(super) async fn test_store_fn(arc_stored: Arc<[MaterializedLogRecord<'_>]>) {
Expand Down

0 comments on commit 7cd8318

Please sign in to comment.