[ENH] Use log position from sysdb as the offset to pull from log (#2132)

## Description of changes

*Summarize the changes made by this PR.*
 - Improvements & Bug fixes
- Currently, we commit the log position for a collection to both the sysdb and the log service. The commit to the sysdb must succeed before the compacted files become visible to query nodes. The commit to the log service is best effort and can fall behind if a failure happens between the commit to the sysdb and the commit to the log service.
- The log position committed in the log service is used by the compactor to figure out which collections have new data that needs to be compacted and the offsets of the new logs. Since the log position in the log service can lag, we need to use the log position in the sysdb when fetching from the log during compaction.
- This PR compares the log position in the sysdb with the one in the log service and uses the larger of the two (see the sketch after this list). An alternative solution would be to filter out log records whose offset is smaller than the log position in the sysdb; however, that solution would have to inspect every log record fetched from the log service.
 - New functionality.
 
	 - ...
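
A minimal sketch of the offset selection described above, assuming (as the scheduler comments state) that the sysdb position is the last offset already compacted; the function name is illustrative, and the real logic lives in `rust/worker/src/compactor/scheduler.rs`:

```rust
/// Pick the offset to start pulling logs from for a collection.
/// `sysdb_log_position`: last offset already compacted, as recorded in the sysdb.
/// `first_log_offset`: first uncompacted offset reported by the log service.
fn offset_to_pull_from(sysdb_log_position: i64, first_log_offset: i64) -> i64 {
    // The log service may lag behind the sysdb, but it must never be ahead of it,
    // because the sysdb is committed first during registration.
    if sysdb_log_position + 1 < first_log_offset {
        panic!("offset in sysdb is less than offset in log, this should not happen!")
    }
    // Start from the record right after the last compacted offset.
    sysdb_log_position + 1
}

fn main() {
    // The log service lags behind (its offset update failed after the sysdb
    // commit succeeded): the sysdb position wins.
    assert_eq!(offset_to_pull_from(2, 1), 3);
    // Both views agree: same result.
    assert_eq!(offset_to_pull_from(2, 3), 3);
}
```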

## Test plan
*How are these changes tested?*

- [ ] Tests pass locally with `pytest` for python, `yarn test` for js,
`cargo test` for rust

## Documentation Changes
*Are all docstrings for user-facing APIs updated if required? Do we need
to make documentation changes in the [docs
repository](https://github.com/chroma-core/docs)?*
Ishiihara authored May 6, 2024
1 parent 2b38ddc commit 970ac74
Showing 8 changed files with 148 additions and 10 deletions.
4 changes: 2 additions & 2 deletions rust/worker/src/compactor/compaction_manager.rs
@@ -354,7 +354,7 @@ mod tests {
dimension: Some(1),
tenant: tenant_1.clone(),
database: "database_1".to_string(),
log_position: 0,
log_position: -1,
version: 0,
};

@@ -366,7 +366,7 @@
dimension: Some(1),
tenant: tenant_2.clone(),
database: "database_2".to_string(),
log_position: 0,
log_position: -1,
version: 0,
};
sysdb.add_collection(collection_1);
139 changes: 138 additions & 1 deletion rust/worker/src/compactor/scheduler.rs
@@ -82,6 +82,7 @@ impl Scheduler {
}

// TODO: make querying the last compaction time in batch
let log_position_in_collecion = collection[0].log_position;
let tenant_ids = vec![collection[0].tenant.clone()];
let tenant = self.sysdb.get_last_compaction_time(tenant_ids).await;

@@ -96,12 +97,26 @@
}
};

let mut offset = collection_info.first_log_offset;
// offset in log is the first offset in the log that has not been compacted. Note that
// since the offset is the first offset of log we get from the log service, we should
// use this offset to pull data from the log service.
if log_position_in_collecion + 1 < offset {
panic!(
"offset in sysdb is less than offset in log, this should not happen!"
)
} else {
// The offset in sysdb is the last offset that has been compacted.
// We need to start from the next offset.
offset = log_position_in_collecion + 1;
}

collection_records.push(CollectionRecord {
id: collection[0].id,
tenant_id: collection[0].tenant.clone(),
last_compaction_time,
first_record_time: collection_info.first_log_ts,
offset: collection_info.first_log_offset,
offset,
collection_version: collection[0].version,
});
}
@@ -316,4 +331,126 @@ mod tests {
let jobs = scheduler.get_jobs();
assert_eq!(jobs.count(), 1);
}

#[tokio::test]
#[should_panic(
expected = "offset in sysdb is less than offset in log, this should not happen!"
)]
async fn test_scheduler_panic() {
let mut log = Box::new(InMemoryLog::new());

let collection_uuid_1 = Uuid::from_str("00000000-0000-0000-0000-000000000001").unwrap();
log.add_log(
collection_uuid_1.clone(),
Box::new(InternalLogRecord {
collection_id: collection_uuid_1.clone(),
log_offset: 0,
log_ts: 1,
record: LogRecord {
log_offset: 0,
record: OperationRecord {
id: "embedding_id_1".to_string(),
embedding: None,
encoding: None,
metadata: None,
operation: Operation::Add,
},
},
}),
);
log.add_log(
collection_uuid_1.clone(),
Box::new(InternalLogRecord {
collection_id: collection_uuid_1.clone(),
log_offset: 1,
log_ts: 2,
record: LogRecord {
log_offset: 1,
record: OperationRecord {
id: "embedding_id_1".to_string(),
embedding: None,
encoding: None,
metadata: None,
operation: Operation::Add,
},
},
}),
);
log.add_log(
collection_uuid_1.clone(),
Box::new(InternalLogRecord {
collection_id: collection_uuid_1.clone(),
log_offset: 2,
log_ts: 3,
record: LogRecord {
log_offset: 2,
record: OperationRecord {
id: "embedding_id_1".to_string(),
embedding: None,
encoding: None,
metadata: None,
operation: Operation::Add,
},
},
}),
);
log.add_log(
collection_uuid_1.clone(),
Box::new(InternalLogRecord {
collection_id: collection_uuid_1.clone(),
log_offset: 3,
log_ts: 4,
record: LogRecord {
log_offset: 3,
record: OperationRecord {
id: "embedding_id_1".to_string(),
embedding: None,
encoding: None,
metadata: None,
operation: Operation::Add,
},
},
}),
);
let _ = log.update_collection_log_offset(collection_uuid_1, 2).await;

let mut sysdb = Box::new(TestSysDb::new());

let tenant_1 = "tenant_1".to_string();
let collection_1 = Collection {
id: collection_uuid_1,
name: "collection_1".to_string(),
metadata: None,
dimension: Some(1),
tenant: tenant_1.clone(),
database: "database_1".to_string(),
log_position: 0,
version: 0,
};

sysdb.add_collection(collection_1);

let last_compaction_time_1 = 2;
sysdb.add_tenant_last_compaction_time(tenant_1, last_compaction_time_1);

let my_ip = "0.0.0.1".to_string();
let scheduler_policy = Box::new(LasCompactionTimeSchedulerPolicy {});
let max_concurrent_jobs = 1000;

// Set assignment policy
let mut assignment_policy = Box::new(RendezvousHashingAssignmentPolicy::new());
assignment_policy.set_members(vec![my_ip.clone()]);

let mut scheduler = Scheduler::new(
my_ip.clone(),
log,
sysdb.clone(),
scheduler_policy,
max_concurrent_jobs,
assignment_policy,
);

scheduler.set_memberlist(vec![my_ip.clone()]);
scheduler.schedule().await;
}
}
2 changes: 1 addition & 1 deletion rust/worker/src/execution/operators/brute_force_knn.rs
@@ -4,7 +4,7 @@ use crate::{distance::DistanceFunction, execution::operator::Operator};
use async_trait::async_trait;
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use tracing::{debug, trace};
use tracing::trace;

/// The brute force k-nearest neighbors operator is responsible for computing the k-nearest neighbors
/// of a given query vector against a set of vectors using brute force calculation.
2 changes: 0 additions & 2 deletions rust/worker/src/execution/operators/merge_knn_results.rs
@@ -1,5 +1,3 @@
use std::f64::consts::E;

use crate::{
blockstore::provider::BlockfileProvider,
errors::ChromaError,
1 change: 0 additions & 1 deletion rust/worker/src/execution/operators/pull_log.rs
@@ -4,7 +4,6 @@ use crate::log::log::Log;
use crate::log::log::PullLogsError;
use crate::types::LogRecord;
use async_trait::async_trait;
use tracing::debug;
use tracing::trace;
use uuid::Uuid;

4 changes: 4 additions & 0 deletions rust/worker/src/execution/operators/register.rs
@@ -115,6 +115,10 @@ impl Operator<RegisterInput, RegisterOutput> for RegisterOperator {
input.segment_flush_info.clone(),
)
.await;

// We must make sure that the log postion in sysdb is always greater than or equal to the log position
// in the log service. If the log position in sysdb is less than the log position in the log service,
// the we may lose data in compaction.
let sysdb_registration_result = match result {
Ok(response) => response,
Err(error) => return Err(RegisterError::FlushCompactionError(error)),
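
The comment added to `register.rs` above states the invariant that the log position in the sysdb must never fall behind the one in the log service. A hypothetical, self-contained sketch of a registration order that maintains it (commit to the sysdb first, then a best-effort offset update in the log service); the traits and signatures are illustrative stand-ins, not the worker's real APIs:

```rust
#[derive(Debug)]
struct FlushError;

// Illustrative stand-in for the sysdb client.
trait SysDb {
    /// Commit the compacted files plus the new log position; the compacted data
    /// only becomes visible to query nodes after this succeeds.
    fn flush_compaction(&mut self, collection_id: &str, log_position: i64) -> Result<(), FlushError>;
}

// Illustrative stand-in for the log service client.
trait LogService {
    /// Record the new offset in the log service; treated as best effort.
    fn update_collection_log_offset(&mut self, collection_id: &str, log_position: i64) -> Result<(), FlushError>;
}

fn register(
    sysdb: &mut dyn SysDb,
    log: &mut dyn LogService,
    collection_id: &str,
    log_position: i64,
) -> Result<(), FlushError> {
    // 1. Commit to the sysdb first and propagate any failure, so the compaction
    //    is either fully registered or retried.
    sysdb.flush_compaction(collection_id, log_position)?;
    // 2. Then update the log service. A failure here only means the log service
    //    lags behind the sysdb, which the scheduler now tolerates by taking the
    //    larger of the two positions.
    let _ = log.update_collection_log_offset(collection_id, log_position);
    Ok(())
}
```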
5 changes: 2 additions & 3 deletions rust/worker/src/execution/orchestration/compact.rs
@@ -162,7 +162,6 @@ impl CompactOrchestrator {
// TODO: It is possible that the offset_id from the compaction job is wrong since the log service
// can have an outdated view of the offset. We should filter out entries from the log based on the start offset
// of the segment, and not fully respect the offset_id from the compaction job

async fn pull_logs(&mut self, self_address: Box<dyn Receiver<PullLogsResult>>) {
self.state = ExecutionState::PullLogs;
let operator = PullLogsOperator::new(self.log.clone());
@@ -268,7 +267,7 @@ impl CompactOrchestrator {
}
}

async fn flush_sysdb(
async fn register(
&mut self,
log_position: i64,
segment_flush_info: Arc<[SegmentFlushInfo]>,
@@ -522,7 +521,7 @@ impl Handler<FlushS3Result> for CompactOrchestrator {
match message {
Ok(msg) => {
// Unwrap should be safe here as we are guaranteed to have a value by construction
self.flush_sysdb(
self.register(
self.pulled_log_offset.unwrap(),
msg.segment_flush_info,
_ctx.sender.as_receiver(),
1 change: 1 addition & 0 deletions rust/worker/src/log/log.rs
@@ -18,6 +18,7 @@ use uuid::Uuid;
/// - collection_id: the id of the collection that needs to be compacted
/// - first_log_offset: the offset of the first log entry in the collection that needs to be compacted
/// - first_log_ts: the timestamp of the first log entry in the collection that needs to be compacted
#[derive(Debug)]
pub(crate) struct CollectionInfo {
pub(crate) collection_id: String,
pub(crate) first_log_offset: i64,
