From b5aeb39878815f3a3d6fd8fa02f49f2dd4a8f608 Mon Sep 17 00:00:00 2001
From: hammadb
Date: Mon, 11 Mar 2024 16:58:28 -0700
Subject: [PATCH] e2e working, needs several types of cleanup

---
 .../blockstore/arrow_blockfile/block/delta.rs |   6 +-
 .../blockstore/arrow_blockfile/block/types.rs |  66 ++++++---
 .../src/blockstore/arrow_blockfile/mod.rs     |   2 +-
 .../blockstore/arrow_blockfile/provider.rs    |   4 +-
 rust/worker/src/blockstore/mod.rs             |   2 +-
 rust/worker/src/blockstore/types.rs           |   2 +-
 rust/worker/src/segment/record_segment.rs     | 133 ++++++++++++++++--
 rust/worker/src/segment/types.rs              |   8 +-
 8 files changed, 183 insertions(+), 40 deletions(-)

diff --git a/rust/worker/src/blockstore/arrow_blockfile/block/delta.rs b/rust/worker/src/blockstore/arrow_blockfile/block/delta.rs
index 4ea32c815628..d1327324c497 100644
--- a/rust/worker/src/blockstore/arrow_blockfile/block/delta.rs
+++ b/rust/worker/src/blockstore/arrow_blockfile/block/delta.rs
@@ -206,7 +206,11 @@ impl BlockDeltaInner {
     fn get_document_size(&self) -> usize {
         self.new_data.iter().fold(0, |acc, (_, value)| match value {
             Value::EmbeddingRecordValue(embedding_record) => {
-                acc + embedding_record.get_document().unwrap().len()
+                let len = match &embedding_record.get_document() {
+                    Some(document) => document.len(),
+                    None => 0,
+                };
+                acc + len
             }
             _ => 0,
         })
diff --git a/rust/worker/src/blockstore/arrow_blockfile/block/types.rs b/rust/worker/src/blockstore/arrow_blockfile/block/types.rs
index 16f0552f2e22..10bfb947e4a9 100644
--- a/rust/worker/src/blockstore/arrow_blockfile/block/types.rs
+++ b/rust/worker/src/blockstore/arrow_blockfile/block/types.rs
@@ -1,6 +1,6 @@
 use crate::blockstore::types::{BlockfileKey, Key, KeyType, Value, ValueType};
 use crate::errors::{ChromaError, ErrorCodes};
-use crate::types::{MetadataValue, UpdateMetadataValue};
+use crate::types::{EmbeddingRecord, MetadataValue, UpdateMetadataValue};
 use arrow::array::{
     ArrayRef, BinaryArray, BinaryBuilder, BooleanArray, BooleanBuilder, Float32Array,
     Float32Builder, GenericByteBuilder, StructArray, StructBuilder, UInt32Array, UInt32Builder,
@@ -11,6 +11,7 @@ use arrow::{
     datatypes::{DataType, Field},
     record_batch::RecordBatch,
 };
+use num_bigint::BigInt;
 use parking_lot::RwLock;
 use prost_types::Struct;
 use rayon::vec;
@@ -184,6 +185,45 @@ impl Block {
                             .value(i),
                         ))
                     }
+                    ValueType::EmbeddingRecord => {
+                        let records =
+                            value.as_any().downcast_ref::<StructArray>().unwrap();
+                        let id = records
+                            .column(0)
+                            .as_any()
+                            .downcast_ref::<StringArray>()
+                            .unwrap()
+                            .value(i)
+                            .to_string();
+                        let document = match records
+                            .column(1)
+                            .as_any()
+                            .downcast_ref::<StringArray>()
+                        {
+                            Some(document) => Some(document.value(i).to_string()),
+                            None => None,
+                        };
+                        let metadata = match records
+                            .column(2)
+                            .as_any()
+                            .downcast_ref::<StringArray>()
+                        {
+                            Some(metadata) => Some(metadata.value(i).to_string()),
+                            None => None,
+                        };
+                        return Some(Value::EmbeddingRecordValue(EmbeddingRecord {
+                            id,
+                            seq_id: BigInt::from(0), // TODO: THIS IS WRONG, WE NEED A NEW TYPE
+                            operation: crate::types::Operation::Add, // TODO: THIS IS WRONG, WE NEED A NEW TYPE
+                            embedding: Some(vec![1.0, 2.0, 3.0]), // TODO: populate this
+                            encoding: None, // TODO: populate this
+                            metadata: None,
+                            collection_id: Uuid::parse_str(
+                                "00000000-0000-0000-0000-000000000000",
+                            )
+                            .unwrap(),
+                        }));
+                    }
                     // TODO: Add support for other types
                     _ => unimplemented!(),
                 }
@@ -592,23 +632,13 @@ impl BlockDataBuilder {
                     builder
                         .user_id_builder
                         .append_value(embedding_record.id.clone());
-                    match embedding_record.get_document() {
-                        Some(document) => {
-                            builder.document_builder.append_value(document);
-                        }
-                        None => {
-                            // TODO: append nulls
-                        }
-                    }
-                    match embedding_record.metadata {
-                        Some(metadata) => {
-                            // TODO: Turn metadata into json
-                            builder.metadata_builder.append_value("HAS");
-                        }
-                        None => {
-                            // TODO: append nulls
-                        }
-                    }
+                    builder
+                        .document_builder
+                        .append_option(embedding_record.get_document());
+                    // TODO: This is a placeholder for the metadata field once we have json + cache support
+                    builder
+                        .metadata_builder
+                        .append_option(Some("has".to_string()));
                 }
                 _ => unreachable!("Invalid value type for block"),
             },
diff --git a/rust/worker/src/blockstore/arrow_blockfile/mod.rs b/rust/worker/src/blockstore/arrow_blockfile/mod.rs
index fdff38999eb4..b8049b331f93 100644
--- a/rust/worker/src/blockstore/arrow_blockfile/mod.rs
+++ b/rust/worker/src/blockstore/arrow_blockfile/mod.rs
@@ -1,4 +1,4 @@
 mod block;
 mod blockfile;
-mod provider;
+pub(crate) mod provider;
 mod sparse_index;
diff --git a/rust/worker/src/blockstore/arrow_blockfile/provider.rs b/rust/worker/src/blockstore/arrow_blockfile/provider.rs
index 33013ec31416..e31f81d876e3 100644
--- a/rust/worker/src/blockstore/arrow_blockfile/provider.rs
+++ b/rust/worker/src/blockstore/arrow_blockfile/provider.rs
@@ -9,13 +9,13 @@
 use uuid::Uuid;
 /// A BlockFileProvider that creates ArrowBlockfiles (Arrow-backed blockfiles used for production).
 /// For now, it keeps a simple local cache of blockfiles.
-pub(super) struct ArrowBlockfileProvider {
+pub(crate) struct ArrowBlockfileProvider {
     block_provider: ArrowBlockProvider,
     files: HashMap<String, Box<dyn Blockfile>>,
 }
 
 impl ArrowBlockfileProvider {
-    pub(super) fn new() -> Self {
+    pub(crate) fn new() -> Self {
         Self {
             block_provider: ArrowBlockProvider::new(),
             files: HashMap::new(),
diff --git a/rust/worker/src/blockstore/mod.rs b/rust/worker/src/blockstore/mod.rs
index 4facc35f85d4..92a4e0cd2990 100644
--- a/rust/worker/src/blockstore/mod.rs
+++ b/rust/worker/src/blockstore/mod.rs
@@ -1,7 +1,7 @@
-mod arrow_blockfile;
 mod positional_posting_list_value;
 mod types;
 
+pub(crate) mod arrow_blockfile;
 pub(crate) mod provider;
 
 pub(crate) use positional_posting_list_value::*;
diff --git a/rust/worker/src/blockstore/types.rs b/rust/worker/src/blockstore/types.rs
index b161235a9bfd..048a8453958e 100644
--- a/rust/worker/src/blockstore/types.rs
+++ b/rust/worker/src/blockstore/types.rs
@@ -253,7 +253,7 @@ impl From<&Value> for ValueType {
             Value::Int32ArrayValue(_) => ValueType::Int32Array,
             Value::PositionalPostingListValue(_) => ValueType::PositionalPostingList,
             Value::RoaringBitmapValue(_) => ValueType::RoaringBitmap,
-            Value::EmbeddingRecordValue(_) => unimplemented!(),
+            Value::EmbeddingRecordValue(_) => ValueType::EmbeddingRecord,
             Value::StringValue(_) => ValueType::String,
             Value::IntValue(_) => ValueType::Int,
             Value::UintValue(_) => ValueType::Uint,
diff --git a/rust/worker/src/segment/record_segment.rs b/rust/worker/src/segment/record_segment.rs
index e9916da216cf..9842536bf9e4 100644
--- a/rust/worker/src/segment/record_segment.rs
+++ b/rust/worker/src/segment/record_segment.rs
@@ -1,7 +1,7 @@
 use super::types::{OffsetIdAssigner, SegmentWriter};
 use crate::blockstore::{provider::BlockfileProvider, Blockfile};
-use crate::blockstore::{KeyType, ValueType};
-use crate::types::{EmbeddingRecord, Segment};
+use crate::blockstore::{BlockfileKey, Key, KeyType, Value, ValueType};
+use crate::types::{EmbeddingRecord, Operation, Segment};
 use std::sync::atomic::AtomicU32;
 
 struct RecordSegment {
@@ -15,6 +15,7 @@ struct RecordSegment {
         - in a separate file (bad)
         - special prefix in the blockfile (meh)
     */
+    commited_max_offset_id: AtomicU32,
     current_max_offset_id: AtomicU32,
 }
 
@@ -37,6 +38,7 @@ impl RecordSegment {
                 id_to_user_id,
                 records,
                 current_max_offset_id: AtomicU32::new(0),
+                commited_max_offset_id: AtomicU32::new(0),
             },
             // TODO: prefer to error out here
             _ => panic!("Failed to create blockfiles"),
@@ -49,20 +51,59 @@ impl RecordSegment {
 }
 
 impl SegmentWriter for RecordSegment {
-    fn begin_transaction(&self) {
-        todo!()
+    fn begin_transaction(&mut self) {
+        let t1 = self.user_id_to_id.begin_transaction();
+        let t2 = self.id_to_user_id.begin_transaction();
+        let t3 = self.records.begin_transaction();
+        match (t1, t2, t3) {
+            (Ok(()), Ok(()), Ok(())) => {}
+            // TODO: handle error better and add error to interface
+            _ => panic!("Failed to begin transaction"),
+        }
     }
 
     fn write_records(
-        &self,
-        records: Vec<Box<EmbeddingRecord>>,
-        offset_ids: Vec<u32>,
+        &mut self,
+        mut records: Vec<Box<EmbeddingRecord>>,
+        mut offset_ids: Vec<Option<u32>>,
     ) {
-        todo!()
+        for (record, offset_id) in records.drain(..).zip(offset_ids.drain(..)) {
+            match record.operation {
+                Operation::Add => {
+                    // TODO: error handling
+                    let id = offset_id.unwrap();
+                    // TODO: Support empty prefixes in blockfile keys
+                    let res = self.user_id_to_id.set(
+                        BlockfileKey::new("".to_string(), Key::String(record.id.clone())),
+                        Value::UintValue(id),
+                    );
+                    // TODO: use the res
+                    let res = self.id_to_user_id.set(
+                        BlockfileKey::new("".to_string(), Key::Uint(id)),
+                        Value::StringValue(record.id.clone()),
+                    );
+                    let res = self.records.set(
+                        BlockfileKey::new("".to_string(), Key::Uint(id)),
+                        Value::EmbeddingRecordValue(*record),
+                    );
+                }
+                // TODO: support other operations
+                Operation::Upsert => {}
+                Operation::Update => {}
+                Operation::Delete => {}
+            }
+        }
     }
 
-    fn commit_transaction(&self) {
-        todo!()
+    fn commit_transaction(&mut self) {
+        let t1 = self.user_id_to_id.commit_transaction();
+        let t2 = self.id_to_user_id.commit_transaction();
+        let t3 = self.records.commit_transaction();
+        match (t1, t2, t3) {
+            (Ok(()), Ok(()), Ok(())) => {}
+            // TODO: handle errors
+            _ => panic!("Failed to commit transaction"),
+        }
     }
 
     fn rollback_transaction(&self) {
@@ -71,7 +112,75 @@ impl SegmentWriter for RecordSegment {
 }
 
 impl OffsetIdAssigner for RecordSegment {
-    fn assign_offset_ids(&self, records: Vec<Box<EmbeddingRecord>>) -> Vec<u32> {
-        todo!()
+    fn assign_offset_ids(&self, records: Vec<Box<EmbeddingRecord>>) -> Vec<Option<u32>> {
+        // TODO: this should happen in a transaction
+        let mut offset_ids = Vec::new();
+        for record in records {
+            // Only ADD and UPSERT (if an add) assign an offset id
+            let id = match record.operation {
+                Operation::Add => Some(
+                    self.current_max_offset_id
+                        .fetch_add(1, std::sync::atomic::Ordering::SeqCst),
+                ),
+                Operation::Upsert => {
+                    // TODO: support empty prefixes in blockfile keys
+                    let exists = self
+                        .user_id_to_id
+                        .get(BlockfileKey::new("".to_string(), Key::String(record.id)));
+                    // TODO: I think not-found should be a None not an error
+                    match exists {
+                        Ok(_) => None,
+                        Err(_) => Some(
+                            self.current_max_offset_id
+                                .fetch_add(1, std::sync::atomic::Ordering::SeqCst),
+                        ),
+                    }
+                }
+                _ => None,
+            };
+            offset_ids.push(id);
+        }
+        offset_ids
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::types::ScalarEncoding;
+    use num_bigint::BigInt;
+    use uuid::Uuid;
+
+    #[test]
+    fn can_write_to_segment() {
+        let blockfile_provider =
+            Box::new(crate::blockstore::arrow_blockfile::provider::ArrowBlockfileProvider::new());
+        let mut segment = RecordSegment::new(blockfile_provider);
+        segment.begin_transaction();
+        let record = Box::new(EmbeddingRecord {
+            id: "test".to_string(),
+            operation: Operation::Add,
+            embedding: Some(vec![1.0, 2.0, 3.0]),
+            seq_id: BigInt::from(0),
+            encoding: Some(ScalarEncoding::FLOAT32),
+            metadata: None,
+            collection_id: Uuid::parse_str("00000000-0000-0000-0000-000000000000").unwrap(),
+        });
+        let records = vec![record];
+        let offset_ids = segment.assign_offset_ids(records.clone());
+        segment.write_records(records, offset_ids);
+        segment.commit_transaction();
+
+        let res = segment
+            .records
+            .get(BlockfileKey::new("".to_string(), Key::Uint(0)));
+        assert!(res.is_ok());
+        let res = res.unwrap();
+        match res {
+            Value::EmbeddingRecordValue(record) => {
+                assert_eq!(record.id, "test");
+            }
+            _ => panic!("Wrong value type"),
+        }
     }
 }
diff --git a/rust/worker/src/segment/types.rs b/rust/worker/src/segment/types.rs
index 4f55e51660fc..342291d6e5c7 100644
--- a/rust/worker/src/segment/types.rs
+++ b/rust/worker/src/segment/types.rs
@@ -1,12 +1,12 @@
 use crate::types::EmbeddingRecord;
 
 pub(super) trait SegmentWriter {
-    fn begin_transaction(&self);
-    fn write_records(&self, records: Vec<Box<EmbeddingRecord>>, offset_ids: Vec<u32>);
-    fn commit_transaction(&self);
+    fn begin_transaction(&mut self);
+    fn write_records(&mut self, records: Vec<Box<EmbeddingRecord>>, offset_ids: Vec<Option<u32>>);
+    fn commit_transaction(&mut self);
     fn rollback_transaction(&self);
 }
 
 pub(super) trait OffsetIdAssigner: SegmentWriter {
-    fn assign_offset_ids(&self, records: Vec<Box<EmbeddingRecord>>) -> Vec<u32>;
+    fn assign_offset_ids(&self, records: Vec<Box<EmbeddingRecord>>) -> Vec<Option<u32>>;
 }
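
Illustrative sketch, not part of the commit: the `can_write_to_segment` test above is the reference for how the new traits are meant to be driven. Assuming the `RecordSegment`, `SegmentWriter`, and `OffsetIdAssigner` definitions from this patch, a caller-side batch write looks roughly like the following; the `flush_batch` helper and its signature are hypothetical names, not code from the repository.

    // Hypothetical helper sketching the write path introduced in this patch.
    // It mirrors the sequence used by `can_write_to_segment` and assumes it
    // lives next to `RecordSegment` in rust/worker/src/segment/record_segment.rs.
    fn flush_batch(segment: &mut RecordSegment, records: Vec<Box<EmbeddingRecord>>) {
        // Open a transaction on all three underlying blockfiles.
        segment.begin_transaction();
        // Assign offset ids; only Add (and an Upsert that turns out to be an add)
        // records receive Some(id).
        let offset_ids = segment.assign_offset_ids(records.clone());
        // Persist the user id <-> offset id mappings and the records themselves.
        segment.write_records(records, offset_ids);
        // Commit the three blockfile transactions together.
        segment.commit_transaction();
    }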