diff --git a/server/src/routes/internal_ingest.rs b/server/src/routes/internal_ingest.rs index 98ccbbd8c..1b0593461 100644 --- a/server/src/routes/internal_ingest.rs +++ b/server/src/routes/internal_ingest.rs @@ -130,7 +130,7 @@ pub async fn ingest_files_from_executor( output_objects.push(res.clone()); } else if diagnostics_keys.iter().any(|e| name_ref.contains(e)) { let task_result = task_result.as_ref().ok_or_else(|| { - IndexifyAPIError::bad_request("task_result is required before node_outputs") + IndexifyAPIError::bad_request("task_result is required before diagnostics") })?; let file_name = format!( "{}.{}.{}.{}.{}", diff --git a/server/state_store/src/state_machine.rs b/server/state_store/src/state_machine.rs index ead1417de..6109bd10a 100644 --- a/server/state_store/src/state_machine.rs +++ b/server/state_store/src/state_machine.rs @@ -443,6 +443,53 @@ pub fn delete_compute_graph( prefix.as_bytes(), )?; + for iter in make_prefix_iterator( + txn, + &IndexifyObjectsColumns::Tasks.cf_db(&db), + prefix.as_bytes(), + &None, + ) { + let (key, value) = iter?; + let value = JsonEncoder::decode::(&value)?; + + // mark all diagnostics urls for gc. + match &value.diagnostics { + Some(diagnostics) => { + [ + diagnostics.exception.clone(), + diagnostics.stdout.clone(), + diagnostics.stderr.clone(), + ] + .iter() + .flatten() + .try_for_each(|data| -> Result<()> { + txn.put_cf( + &IndexifyObjectsColumns::GcUrls.cf_db(&db), + data.path.as_bytes(), + [], + )?; + + Ok(()) + })?; + } + None => {} + } + txn.delete_cf(&IndexifyObjectsColumns::Tasks.cf_db(&db), &key)?; + + delete_cf_prefix( + txn, + &IndexifyObjectsColumns::TaskOutputs.cf_db(&db), + format!("{}|{}", namespace, value.id).as_bytes(), + )?; + } + + delete_cf_prefix( + txn, + &IndexifyObjectsColumns::UnallocatedTasks.cf_db(&db), + prefix.as_bytes(), + )?; + + // mark all fn output urls for gc. for iter in make_prefix_iterator( txn, &IndexifyObjectsColumns::FnOutputs.cf_db(&db), @@ -454,11 +501,10 @@ pub fn delete_compute_graph( match &value.payload { OutputPayload::Router(_) => {} OutputPayload::Fn(payload) => { - println!("delete_compute_graph: {:?}", value.clone()); txn.put_cf( &IndexifyObjectsColumns::GcUrls.cf_db(&db), payload.path.as_bytes(), - &[], + [], )?; } }