Skip to content

Commit

Permalink
refactor to LogRecord
Browse files Browse the repository at this point in the history
  • Loading branch information
HammadB committed Mar 29, 2024
1 parent f2e37a3 commit 9420030
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 77 deletions.
27 changes: 14 additions & 13 deletions rust/worker/src/blockstore/arrow_blockfile/block/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ impl BlockDeltaInner {
fn get_metadata_size(&self) -> usize {
self.new_data.iter().fold(0, |acc, (_, value)| match value {
Value::EmbeddingRecordValue(embedding_record) => {
match &embedding_record.metadata {
match &embedding_record.record.metadata {
Some(metadata) => {
// TODO: cache this
let as_proto: chroma_proto::UpdateMetadata = metadata.clone().into();
Expand All @@ -255,7 +255,7 @@ impl BlockDeltaInner {

// Folds over `new_data`, adding the user id's `len()` for each
// `Value::EmbeddingRecordValue` entry to a running total that starts at 0.
fn get_user_id_size(&self) -> usize {
self.new_data.iter().fold(0, |acc, (_, value)| match value {
Value::EmbeddingRecordValue(embedding_record) => acc + embedding_record.id.len(),
Value::EmbeddingRecordValue(embedding_record) => acc + embedding_record.record.id.len(),
// NOTE(review): this arm yields 0 instead of `acc`, so any entry that is not an
// `EmbeddingRecordValue` resets the running total accumulated so far.
// Presumably `_ => acc` was intended -- confirm before changing.
_ => 0,
})
}
Expand Down Expand Up @@ -442,9 +442,10 @@ impl From<Arc<Block>> for BlockDelta {
mod test {
use super::*;
use crate::blockstore::types::{Key, KeyType, ValueType};
use crate::types::{EmbeddingRecord, ScalarEncoding, UpdateMetadataValue};
use crate::types::{
LogRecord, Operation, OperationRecord, ScalarEncoding, UpdateMetadataValue,
};
use arrow::array::Int32Array;
use num_bigint::BigInt;
use rand::{random, Rng};
use std::collections::HashMap;
use std::str::FromStr;
Expand Down Expand Up @@ -562,15 +563,15 @@ mod test {
"random_float".to_string(),
UpdateMetadataValue::Float(random::<f64>()),
);
let value = Value::EmbeddingRecordValue(EmbeddingRecord {
seq_id: BigInt::from(0),
embedding: Some(vec![1.0, 2.0, 3.0]),
id: "test".to_string(),
encoding: Some(ScalarEncoding::FLOAT32),
metadata: Some(metadata),
operation: crate::types::Operation::Add,
collection_id: uuid::Uuid::from_str("00000000-0000-0000-0000-000000000000")
.unwrap(),
let value = Value::EmbeddingRecordValue(LogRecord {
log_offset: 0,
record: OperationRecord {
id: "test".to_string(),
embedding: Some(vec![1.0, 2.0, 3.0]),
encoding: Some(ScalarEncoding::FLOAT32),
metadata: Some(metadata),
operation: Operation::Add,
},
});
delta.add(key, value);
}
Expand Down
32 changes: 17 additions & 15 deletions rust/worker/src/blockstore/arrow_blockfile/block/types.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use crate::blockstore::types::{BlockfileKey, Key, KeyType, Value, ValueType};
use crate::chroma_proto;
use crate::errors::{ChromaError, ErrorCodes};
use crate::types::{EmbeddingRecord, MetadataValue, UpdateMetadata, UpdateMetadataValue};
use crate::types::{
LogRecord, MetadataValue, Operation, OperationRecord, UpdateMetadata, UpdateMetadataValue,
};
use arrow::array::{
ArrayRef, BinaryArray, BinaryBuilder, BooleanArray, BooleanBuilder, Float32Array,
Float32Builder, GenericByteBuilder, StructArray, StructBuilder, UInt32Array, UInt32Builder,
Expand All @@ -13,7 +15,6 @@ use arrow::{
datatypes::{DataType, Field},
record_batch::RecordBatch,
};
use num_bigint::BigInt;
use parking_lot::RwLock;
use prost::Message;
use std::io::Error;
Expand Down Expand Up @@ -226,17 +227,18 @@ impl Block {
}
None => None,
};
return Some(Value::EmbeddingRecordValue(EmbeddingRecord {
id,
seq_id: BigInt::from(0), // TODO: THIS IS WRONG, WE NEED A NEW TYPE
operation: crate::types::Operation::Add, // TODO: THIS IS WRONG, WE NEED A NEW TYPE
embedding: Some(vec![1.0, 2.0, 3.0]), // TODO: populate this
encoding: None, // TODO: populate this
metadata,
collection_id: Uuid::parse_str(
"00000000-0000-0000-0000-000000000000",
)
.unwrap(),
// TODO: Replace with DataRecord abstraction,
// i.e. the materialized state of a record,
// not a log record.
return Some(Value::EmbeddingRecordValue(LogRecord {
log_offset: 0,
record: OperationRecord {
id,
embedding: None,
encoding: None,
metadata,
operation: Operation::Add,
},
}));
}
// TODO: Add support for other types
Expand Down Expand Up @@ -648,11 +650,11 @@ impl BlockDataBuilder {
Value::EmbeddingRecordValue(embedding_record) => {
builder
.user_id_builder
.append_value(embedding_record.id.clone());
.append_value(embedding_record.record.id.clone());
builder
.document_builder
.append_option(embedding_record.get_document());
match embedding_record.metadata {
match embedding_record.record.metadata {
Some(metadata) => {
let proto: chroma_proto::UpdateMetadata = metadata.into();
let bytes = proto.encode_to_vec();
Expand Down
24 changes: 12 additions & 12 deletions rust/worker/src/blockstore/arrow_blockfile/blockfile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,11 +419,10 @@ impl ArrowBlockfile {
mod tests {
use std::collections::HashMap;

use crate::types::{EmbeddingRecord, Operation, UpdateMetadataValue};
use crate::types::{LogRecord, Operation, OperationRecord, UpdateMetadataValue};

use super::*;
use arrow::array::Int32Array;
use num_bigint::BigInt;

#[test]
fn test_blockfile() {
Expand Down Expand Up @@ -655,14 +654,15 @@ mod tests {
blockfile
.set(
key,
Value::EmbeddingRecordValue(EmbeddingRecord {
id: format!("{:04}", i),
seq_id: BigInt::from(i),
embedding: None,
encoding: None,
metadata: Some(metadata),
operation: Operation::Add,
collection_id: Uuid::new_v4(),
Value::EmbeddingRecordValue(LogRecord {
log_offset: i,
record: OperationRecord {
id: format!("{:04}", i),
embedding: None,
encoding: None,
metadata: Some(metadata),
operation: Operation::Add,
},
}),
)
.unwrap();
Expand All @@ -674,8 +674,8 @@ mod tests {
let res = blockfile.get(key).unwrap();
match res {
Value::EmbeddingRecordValue(record) => {
let metadata = record.metadata.unwrap();
assert_eq!(record.id, format!("{:04}", i));
let metadata = record.record.metadata.unwrap();
assert_eq!(record.record.id, format!("{:04}", i));
assert_eq!(metadata.len(), 2);
assert_eq!(
metadata.get("chroma:document").unwrap(),
Expand Down
10 changes: 5 additions & 5 deletions rust/worker/src/blockstore/types.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::positional_posting_list_value::PositionalPostingList;
use crate::chroma_proto;
use crate::errors::{ChromaError, ErrorCodes};
use crate::types::EmbeddingRecord;
use crate::types::LogRecord;
use arrow::array::{Array, Int32Array};
use parking_lot::RwLock;
use prost::Message;
Expand Down Expand Up @@ -188,7 +188,7 @@ pub(crate) enum Value {
IntValue(i32),
UintValue(u32),
RoaringBitmapValue(RoaringBitmap),
EmbeddingRecordValue(EmbeddingRecord),
EmbeddingRecordValue(LogRecord),
}

impl Clone for Value {
Expand Down Expand Up @@ -231,12 +231,12 @@ impl Value {
unimplemented!("Size of positional posting list")
}
Value::EmbeddingRecordValue(record) => {
let user_id_size = record.id.len();
let embedding_size = match &record.embedding {
let user_id_size = record.record.id.len();
let embedding_size = match &record.record.embedding {
Some(embedding) => embedding.len(),
None => 0,
};
let metadata_size = match &record.metadata {
let metadata_size = match &record.record.metadata {
Some(metadata) => {
let as_proto: chroma_proto::UpdateMetadata = metadata.clone().into();
as_proto.encoded_len()
Expand Down
53 changes: 26 additions & 27 deletions rust/worker/src/segment/record_segment.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::types::{OffsetIdAssigner, SegmentWriter};
use crate::blockstore::{provider::BlockfileProvider, Blockfile};
use crate::blockstore::{BlockfileKey, Key, KeyType, Value, ValueType};
use crate::types::{EmbeddingRecord, Operation, Segment};
use crate::types::{LogRecord, Operation, Segment};
use std::sync::atomic::AtomicU32;

struct RecordSegment {
Expand All @@ -21,7 +21,7 @@ struct RecordSegment {

struct StoredRecord<'a> {
segment_offset_id: u32,
record: &'a Option<EmbeddingRecord>,
record: &'a Option<LogRecord>,
embedding: Option<Vec<f32>>,
metadata: Option<crate::types::Metadata>,
document: Option<String>,
Expand Down Expand Up @@ -69,20 +69,19 @@ impl SegmentWriter for RecordSegment {
}
}

fn write_records(
&mut self,
mut records: Vec<Box<EmbeddingRecord>>,
mut offset_ids: Vec<Option<u32>>,
) {
fn write_records(&mut self, records: Vec<Box<LogRecord>>, offset_ids: Vec<Option<u32>>) {
// TODO: this should not be mut
let mut records = records;
let mut offset_ids = offset_ids;
// TODO: Once this uses log chunk, we should expect invalid ADDs to already be filtered out
// we also then can assume that UPSERTs have been converted to ADDs or UPDATEs
for (record, offset_id) in records.drain(..).zip(offset_ids.drain(..)) {
match record.operation {
match record.record.operation {
Operation::Add => {
// Check if the key already exists
let res = self.user_id_to_id.get(BlockfileKey::new(
"".to_string(),
Key::String(record.id.clone()),
Key::String(record.record.id.clone()),
));
// See if it's a KeyNotFound error
match res {
Expand All @@ -101,13 +100,13 @@ impl SegmentWriter for RecordSegment {
let id = offset_id.unwrap();
// TODO: Support empty prefixes in blockfile keys
let res = self.user_id_to_id.set(
BlockfileKey::new("".to_string(), Key::String(record.id.clone())),
BlockfileKey::new("".to_string(), Key::String(record.record.id.clone())),
Value::UintValue(id),
);
// TODO: use the res
let res = self.id_to_user_id.set(
BlockfileKey::new("".to_string(), Key::Uint(id)),
Value::StringValue(record.id.clone()),
Value::StringValue(record.record.id.clone()),
);
let res = self.records.set(
BlockfileKey::new("".to_string(), Key::Uint(id)),
Expand All @@ -119,7 +118,7 @@ impl SegmentWriter for RecordSegment {
// Check if the key already exists
let res = self.user_id_to_id.get(BlockfileKey::new(
"".to_string(),
Key::String(record.id.clone()),
Key::String(record.record.id.clone()),
));
// See if it's a KeyNotFound error
match res {
Expand Down Expand Up @@ -156,23 +155,23 @@ impl SegmentWriter for RecordSegment {
);
}

fn rollback_transaction(&self) {
fn rollback_transaction(&mut self) {
todo!()
}
}

impl OffsetIdAssigner for RecordSegment {
fn assign_offset_ids(&self, records: Vec<Box<EmbeddingRecord>>) -> Vec<Option<u32>> {
fn assign_offset_ids(&mut self, records: Vec<Box<LogRecord>>) -> Vec<Option<u32>> {
// TODO: this should happen in a transaction
let mut offset_ids = Vec::new();
for record in records {
// Only ADD and UPSERT assign an offset id if the key doesn't exist
let id = match record.operation {
let id = match record.record.operation {
Operation::Add | Operation::Upsert => {
// Check if the key already exists
let res = self.user_id_to_id.get(BlockfileKey::new(
"".to_string(),
Key::String(record.id.clone()),
Key::String(record.record.id.clone()),
));
// See if it's a KeyNotFound error
match res {
Expand Down Expand Up @@ -201,8 +200,7 @@ impl OffsetIdAssigner for RecordSegment {
#[cfg(test)]
mod tests {
use super::*;
use crate::types::ScalarEncoding;
use num_bigint::BigInt;
use crate::types::{OperationRecord, ScalarEncoding};
use uuid::Uuid;

// RESUME POINT: STORE METADATA AS PROTO AND ADD A RECORD TYPE FOR INTERNAL USE. THIS RECORD TYPE IS AN OPERATION, NOT A VALUE
Expand All @@ -213,14 +211,15 @@ mod tests {
Box::new(crate::blockstore::arrow_blockfile::provider::ArrowBlockfileProvider::new());
let mut segment = RecordSegment::new(blockfile_provider);
segment.begin_transaction();
let record = Box::new(EmbeddingRecord {
id: "test".to_string(),
operation: Operation::Add,
embedding: Some(vec![1.0, 2.0, 3.0]),
seq_id: BigInt::from(0),
encoding: Some(ScalarEncoding::FLOAT32),
metadata: None,
collection_id: Uuid::parse_str("00000000-0000-0000-0000-000000000000").unwrap(),
let record = Box::new(LogRecord {
log_offset: 1,
record: OperationRecord {
id: "test".to_string(),
embedding: Some(vec![1.0, 2.0, 3.0]),
encoding: Some(ScalarEncoding::FLOAT32),
metadata: None,
operation: Operation::Add,
},
});
let records = vec![record];
let offset_ids = segment.assign_offset_ids(records.clone());
Expand All @@ -235,7 +234,7 @@ mod tests {
println!("{:?}", res);
match res {
Value::EmbeddingRecordValue(record) => {
assert_eq!(record.id, "test");
assert_eq!(record.record.id, "test");
}
_ => panic!("Wrong value type"),
}
Expand Down
10 changes: 5 additions & 5 deletions rust/worker/src/segment/types.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use crate::types::LogRecord;

pub(super) trait SegmentWriter {
fn begin_transaction(&self);
fn write_records(&self, records: Vec<Box<LogRecord>>, offset_ids: Vec<u32>);
fn commit_transaction(&self);
fn rollback_transaction(&self);
fn begin_transaction(&mut self);
fn write_records(&mut self, records: Vec<Box<LogRecord>>, offset_ids: Vec<Option<u32>>);
fn commit_transaction(&mut self);
fn rollback_transaction(&mut self);
}

pub(super) trait OffsetIdAssigner: SegmentWriter {
fn assign_offset_ids(&self, records: Vec<Box<LogRecord>>) -> Vec<u32>;
fn assign_offset_ids(&mut self, records: Vec<Box<LogRecord>>) -> Vec<Option<u32>>;
}

0 comments on commit 9420030

Please sign in to comment.