Skip to content

Commit

Permalink
e2e working, needs several types of cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
HammadB committed Mar 11, 2024
1 parent 9e8191d commit b5aeb39
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 40 deletions.
6 changes: 5 additions & 1 deletion rust/worker/src/blockstore/arrow_blockfile/block/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,11 @@ impl BlockDeltaInner {
fn get_document_size(&self) -> usize {
self.new_data.iter().fold(0, |acc, (_, value)| match value {
Value::EmbeddingRecordValue(embedding_record) => {
acc + embedding_record.get_document().unwrap().len()
let len = match &embedding_record.get_document() {
Some(document) => document.len(),
None => 0,
};
acc + len
}
_ => 0,
})
Expand Down
66 changes: 48 additions & 18 deletions rust/worker/src/blockstore/arrow_blockfile/block/types.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::blockstore::types::{BlockfileKey, Key, KeyType, Value, ValueType};
use crate::errors::{ChromaError, ErrorCodes};
use crate::types::{MetadataValue, UpdateMetadataValue};
use crate::types::{EmbeddingRecord, MetadataValue, UpdateMetadataValue};
use arrow::array::{
ArrayRef, BinaryArray, BinaryBuilder, BooleanArray, BooleanBuilder, Float32Array,
Float32Builder, GenericByteBuilder, StructArray, StructBuilder, UInt32Array, UInt32Builder,
Expand All @@ -11,6 +11,7 @@ use arrow::{
datatypes::{DataType, Field},
record_batch::RecordBatch,
};
use num_bigint::BigInt;
use parking_lot::RwLock;
use prost_types::Struct;
use rayon::vec;
Expand Down Expand Up @@ -184,6 +185,45 @@ impl Block {
.value(i),
))
}
ValueType::EmbeddingRecord => {
let records =
value.as_any().downcast_ref::<StructArray>().unwrap();
let id = records
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap()
.value(i)
.to_string();
let document = match records
.column(1)
.as_any()
.downcast_ref::<StringArray>()
{
Some(document) => Some(document.value(i).to_string()),
None => None,
};
let metadata = match records
.column(2)
.as_any()
.downcast_ref::<StringArray>()
{
Some(metadata) => Some(metadata.value(i).to_string()),
None => None,
};
return Some(Value::EmbeddingRecordValue(EmbeddingRecord {
id,
seq_id: BigInt::from(0), // TODO: THIS IS WRONG, WE NEED A NEW TYPE
operation: crate::types::Operation::Add, // TODO: THIS IS WRONG, WE NEED A NEW TYPE
embedding: Some(vec![1.0, 2.0, 3.0]), // TODO: populate this
encoding: None, // TODO: populate this
metadata: None,
collection_id: Uuid::parse_str(
"00000000-0000-0000-0000-000000000000",
)
.unwrap(),
}));
}
// TODO: Add support for other types
_ => unimplemented!(),
}
Expand Down Expand Up @@ -592,23 +632,13 @@ impl BlockDataBuilder {
builder
.user_id_builder
.append_value(embedding_record.id.clone());
match embedding_record.get_document() {
Some(document) => {
builder.document_builder.append_value(document);
}
None => {
// TODO: append nulls
}
}
match embedding_record.metadata {
Some(metadata) => {
// TODO: Turn metadata into json
builder.metadata_builder.append_value("HAS");
}
None => {
// TODO: append nulls
}
}
builder
.document_builder
.append_option(embedding_record.get_document());
// TODO: This is a placeholder for the metadata field once we have json + cache support
builder
.metadata_builder
.append_option(Some("has".to_string()));
}
_ => unreachable!("Invalid value type for block"),
},
Expand Down
2 changes: 1 addition & 1 deletion rust/worker/src/blockstore/arrow_blockfile/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
mod block;
mod blockfile;
mod provider;
pub(crate) mod provider;
mod sparse_index;
4 changes: 2 additions & 2 deletions rust/worker/src/blockstore/arrow_blockfile/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ use uuid::Uuid;

/// A BlockFileProvider that creates ArrowBlockfiles (Arrow-backed blockfiles used for production).
/// For now, it keeps a simple local cache of blockfiles.
pub(super) struct ArrowBlockfileProvider {
pub(crate) struct ArrowBlockfileProvider {
// NOTE(review): presumably the block-level provider backing the blockfiles
// created here — confirm against the methods of this type.
block_provider: ArrowBlockProvider,
// The local blockfile cache mentioned above, keyed by a `String`
// (presumably the blockfile name — TODO confirm).
files: HashMap<String, Box<dyn Blockfile>>,
}

impl ArrowBlockfileProvider {
pub(super) fn new() -> Self {
pub(crate) fn new() -> Self {
Self {
block_provider: ArrowBlockProvider::new(),
files: HashMap::new(),
Expand Down
2 changes: 1 addition & 1 deletion rust/worker/src/blockstore/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
mod arrow_blockfile;
mod positional_posting_list_value;
mod types;

pub(crate) mod arrow_blockfile;
pub(crate) mod provider;

pub(crate) use positional_posting_list_value::*;
Expand Down
2 changes: 1 addition & 1 deletion rust/worker/src/blockstore/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ impl From<&Value> for ValueType {
Value::Int32ArrayValue(_) => ValueType::Int32Array,
Value::PositionalPostingListValue(_) => ValueType::PositionalPostingList,
Value::RoaringBitmapValue(_) => ValueType::RoaringBitmap,
Value::EmbeddingRecordValue(_) => unimplemented!(),
Value::EmbeddingRecordValue(_) => ValueType::EmbeddingRecord,
Value::StringValue(_) => ValueType::String,
Value::IntValue(_) => ValueType::Int,
Value::UintValue(_) => ValueType::Uint,
Expand Down
133 changes: 121 additions & 12 deletions rust/worker/src/segment/record_segment.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::types::{OffsetIdAssigner, SegmentWriter};
use crate::blockstore::{provider::BlockfileProvider, Blockfile};
use crate::blockstore::{KeyType, ValueType};
use crate::types::{EmbeddingRecord, Segment};
use crate::blockstore::{BlockfileKey, Key, KeyType, Value, ValueType};
use crate::types::{EmbeddingRecord, Operation, Segment};
use std::sync::atomic::AtomicU32;

struct RecordSegment {
Expand All @@ -15,6 +15,7 @@ struct RecordSegment {
- in a separate file (bad)
- special prefix in the blockfile (meh)
*/
commited_max_offset_id: AtomicU32,
current_max_offset_id: AtomicU32,
}

Expand All @@ -37,6 +38,7 @@ impl RecordSegment {
id_to_user_id,
records,
current_max_offset_id: AtomicU32::new(0),
commited_max_offset_id: AtomicU32::new(0),
},
// TODO: prefer to error out here
_ => panic!("Failed to create blockfiles"),
Expand All @@ -49,20 +51,59 @@ impl RecordSegment {
}

impl SegmentWriter for RecordSegment {
fn begin_transaction(&self) {
todo!()
fn begin_transaction(&mut self) {
let t1 = self.user_id_to_id.begin_transaction();
let t2 = self.id_to_user_id.begin_transaction();
let t3 = self.records.begin_transaction();
match (t1, t2, t3) {
(Ok(()), Ok(()), Ok(())) => {}
// TODO: handle error better and add error to interface
_ => panic!("Failed to begin transaction"),
}
}

/// Writes `records` into the segment's blockfiles, pairing each record with
/// the offset id found at the same position in `offset_ids`.
///
/// Only `Operation::Add` is implemented: it stores the user id -> offset id
/// mapping, the offset id -> user id mapping, and the record itself keyed by
/// offset id. `Upsert`, `Update` and `Delete` are currently no-ops.
///
/// The two vectors are zipped, so surplus entries in the longer one are
/// ignored.
///
/// # Panics
/// Panics if an `Add` record has no offset id, or if any of the three
/// blockfile writes for a record fails. Previously the write results were
/// dropped, which could leave the three blockfiles silently out of sync.
fn write_records(
    &mut self,
    records: Vec<Box<EmbeddingRecord>>,
    offset_ids: Vec<Option<u32>>,
) {
    // Both vectors are owned, so consume them directly instead of draining.
    for (record, offset_id) in records.into_iter().zip(offset_ids) {
        match record.operation {
            Operation::Add => {
                // TODO: surface this as an error once the interface allows it
                let id = offset_id.expect("Add operation requires an assigned offset id");

                // TODO: Support empty prefixes in blockfile keys
                let forward_mapping = self.user_id_to_id.set(
                    BlockfileKey::new("".to_string(), Key::String(record.id.clone())),
                    Value::UintValue(id),
                );
                if forward_mapping.is_err() {
                    panic!("Failed to write user id to offset id mapping");
                }

                let reverse_mapping = self.id_to_user_id.set(
                    BlockfileKey::new("".to_string(), Key::Uint(id)),
                    Value::StringValue(record.id.clone()),
                );
                if reverse_mapping.is_err() {
                    panic!("Failed to write offset id to user id mapping");
                }

                // The record is moved into the blockfile last, after its id
                // has been cloned for the two mappings above.
                let stored_record = self.records.set(
                    BlockfileKey::new("".to_string(), Key::Uint(id)),
                    Value::EmbeddingRecordValue(*record),
                );
                if stored_record.is_err() {
                    panic!("Failed to write embedding record");
                }
            }
            // TODO: support other operations
            Operation::Upsert => {}
            Operation::Update => {}
            Operation::Delete => {}
        }
    }
}

fn commit_transaction(&self) {
todo!()
fn commit_transaction(&mut self) {
let t1 = self.user_id_to_id.commit_transaction();
let t2 = self.id_to_user_id.commit_transaction();
let t3 = self.records.commit_transaction();
match (t1, t2, t3) {
(Ok(()), Ok(()), Ok(())) => {}
// TODO: handle errors
_ => panic!("Failed to commit transaction"),
}
}

fn rollback_transaction(&self) {
Expand All @@ -71,7 +112,75 @@ impl SegmentWriter for RecordSegment {
}

impl OffsetIdAssigner for RecordSegment {
fn assign_offset_ids(&self, records: Vec<Box<EmbeddingRecord>>) -> Vec<u32> {
todo!()
fn assign_offset_ids(&self, records: Vec<Box<EmbeddingRecord>>) -> Vec<Option<u32>> {
// TODO: this should happen in a transaction
let mut offset_ids = Vec::new();
for record in records {
// Only ADD and UPSERT (if an add) assign an offset id
let id = match record.operation {
Operation::Add => Some(
self.current_max_offset_id
.fetch_add(1, std::sync::atomic::Ordering::SeqCst),
),
Operation::Upsert => {
// TODO: support empty prefixes in blockfile keys
let exists = self
.user_id_to_id
.get(BlockfileKey::new("".to_string(), Key::String(record.id)));
// TODO: I think not-found should be a None not an error
match exists {
Ok(_) => None,
Err(_) => Some(
self.current_max_offset_id
.fetch_add(1, std::sync::atomic::Ordering::SeqCst),
),
}
}
_ => None,
};
offset_ids.push(id);
}
offset_ids
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::ScalarEncoding;
    use num_bigint::BigInt;
    use uuid::Uuid;

    /// Writes a single `Add` record through a full begin/assign/write/commit
    /// cycle and checks that it can be read back from the `records`
    /// blockfile under offset id 0.
    #[test]
    fn can_write_to_segment() {
        let provider =
            Box::new(crate::blockstore::arrow_blockfile::provider::ArrowBlockfileProvider::new());
        let mut segment = RecordSegment::new(provider);
        segment.begin_transaction();

        let records = vec![Box::new(EmbeddingRecord {
            id: "test".to_string(),
            operation: Operation::Add,
            embedding: Some(vec![1.0, 2.0, 3.0]),
            seq_id: BigInt::from(0),
            encoding: Some(ScalarEncoding::FLOAT32),
            metadata: None,
            collection_id: Uuid::parse_str("00000000-0000-0000-0000-000000000000").unwrap(),
        })];

        // The assigner consumes its input, so it gets a copy and the
        // originals go to the writer.
        let offset_ids = segment.assign_offset_ids(records.clone());
        segment.write_records(records, offset_ids);
        segment.commit_transaction();

        let first_record_key = BlockfileKey::new("".to_string(), Key::Uint(0));
        let lookup = segment.records.get(first_record_key);
        assert!(lookup.is_ok());
        match lookup.unwrap() {
            Value::EmbeddingRecordValue(stored) => {
                assert_eq!(stored.id, "test");
            }
            _ => panic!("Wrong value type"),
        }
    }
}
8 changes: 4 additions & 4 deletions rust/worker/src/segment/types.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use crate::types::EmbeddingRecord;

/// Write-side interface for a segment, exposing transaction control
/// (`begin_transaction`, `commit_transaction`, `rollback_transaction`) and
/// `write_records`.
///
/// NOTE(review): the expected call order is presumably begin -> write ->
/// commit (or rollback) — confirm against the implementors.
pub(super) trait SegmentWriter {
fn begin_transaction(&self);
fn write_records(&self, records: Vec<Box<EmbeddingRecord>>, offset_ids: Vec<u32>);
fn commit_transaction(&self);
fn begin_transaction(&mut self);
// `offset_ids` carries one optional id per record; presumably `None` means
// no offset id was assigned to that record — TODO confirm.
fn write_records(&mut self, records: Vec<Box<EmbeddingRecord>>, offset_ids: Vec<Option<u32>>);
fn commit_transaction(&mut self);
fn rollback_transaction(&self);
}

/// A `SegmentWriter` that can also hand out offset ids for a batch of
/// records before they are written.
pub(super) trait OffsetIdAssigner: SegmentWriter {
fn assign_offset_ids(&self, records: Vec<Box<EmbeddingRecord>>) -> Vec<u32>;
// Returns a vector of optional offset ids for `records`; presumably one
// entry per record in input order, with `None` where no id is assigned —
// TODO confirm against the implementors.
fn assign_offset_ids(&self, records: Vec<Box<EmbeddingRecord>>) -> Vec<Option<u32>>;
}

0 comments on commit b5aeb39

Please sign in to comment.