Skip to content

Commit

Permalink
Merge pull request #987 from tensorlakeai/seriousben/correctly-delete…
Browse files Browse the repository at this point in the history
…-all-compute-graphs-dependencies

bug(store): deleting compute graphs will now delete all dependencies
  • Loading branch information
seriousben authored Oct 29, 2024
2 parents 6645d22 + 7673e9d commit 1b5ce58
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 3 deletions.
2 changes: 1 addition & 1 deletion server/src/routes/internal_ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ pub async fn ingest_files_from_executor(
output_objects.push(res.clone());
} else if diagnostics_keys.iter().any(|e| name_ref.contains(e)) {
let task_result = task_result.as_ref().ok_or_else(|| {
IndexifyAPIError::bad_request("task_result is required before node_outputs")
IndexifyAPIError::bad_request("task_result is required before diagnostics")
})?;
let file_name = format!(
"{}.{}.{}.{}.{}",
Expand Down
50 changes: 48 additions & 2 deletions server/state_store/src/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,53 @@ pub fn delete_compute_graph(
prefix.as_bytes(),
)?;

for iter in make_prefix_iterator(
txn,
&IndexifyObjectsColumns::Tasks.cf_db(&db),
prefix.as_bytes(),
&None,
) {
let (key, value) = iter?;
let value = JsonEncoder::decode::<Task>(&value)?;

// mark all diagnostics urls for gc.
match &value.diagnostics {
Some(diagnostics) => {
[
diagnostics.exception.clone(),
diagnostics.stdout.clone(),
diagnostics.stderr.clone(),
]
.iter()
.flatten()
.try_for_each(|data| -> Result<()> {
txn.put_cf(
&IndexifyObjectsColumns::GcUrls.cf_db(&db),
data.path.as_bytes(),
[],
)?;

Ok(())
})?;
}
None => {}
}
txn.delete_cf(&IndexifyObjectsColumns::Tasks.cf_db(&db), &key)?;

delete_cf_prefix(
txn,
&IndexifyObjectsColumns::TaskOutputs.cf_db(&db),
format!("{}|{}", namespace, value.id).as_bytes(),
)?;
}

delete_cf_prefix(
txn,
&IndexifyObjectsColumns::UnallocatedTasks.cf_db(&db),
prefix.as_bytes(),
)?;

// mark all fn output urls for gc.
for iter in make_prefix_iterator(
txn,
&IndexifyObjectsColumns::FnOutputs.cf_db(&db),
Expand All @@ -454,11 +501,10 @@ pub fn delete_compute_graph(
match &value.payload {
OutputPayload::Router(_) => {}
OutputPayload::Fn(payload) => {
println!("delete_compute_graph: {:?}", value.clone());
txn.put_cf(
&IndexifyObjectsColumns::GcUrls.cf_db(&db),
payload.path.as_bytes(),
&[],
[],
)?;
}
}
Expand Down

0 comments on commit 1b5ce58

Please sign in to comment.