diff --git a/crates/store/re_grpc_client/src/lib.rs b/crates/store/re_grpc_client/src/lib.rs index 5b6f553ee428..f324862dc3a8 100644 --- a/crates/store/re_grpc_client/src/lib.rs +++ b/crates/store/re_grpc_client/src/lib.rs @@ -5,6 +5,7 @@ mod address; pub use address::{InvalidRedapAddress, RedapAddress}; use re_chunk::external::arrow2; use re_log_types::external::re_types_core::ComponentDescriptor; +use re_protos::remote_store::v0::CatalogFilter; use re_types::blueprint::archetypes::{ContainerBlueprint, ViewportBlueprint}; use re_types::blueprint::archetypes::{ViewBlueprint, ViewContents}; use re_types::blueprint::components::{ContainerKind, RootContainer}; @@ -16,6 +17,7 @@ use url::Url; // ---------------------------------------------------------------------------- use std::error::Error; +use std::sync::Arc; use arrow2::array::Utf8Array as Arrow2Utf8Array; use arrow2::datatypes::Field as Arrow2Field; @@ -175,6 +177,43 @@ async fn stream_recording_async( StorageNodeClient::new(tonic_client).max_decoding_message_size(usize::MAX) }; + re_log::debug!("Fetching catalog data for {recording_id}…"); + + let resp = client + .query_catalog(QueryCatalogRequest { + column_projection: None, // fetch all columns + filter: Some(CatalogFilter { + recording_ids: vec![RecordingId { + id: recording_id.clone(), + }], + }), + }) + .await + .map_err(TonicStatusError)? + .into_inner() + .filter_map(|resp| { + resp.and_then(|r| { + decode(r.encoder_version(), &r.payload) + .map_err(|err| tonic::Status::internal(err.to_string())) + }) + .transpose() + }) + .collect::, tonic::Status>>() + .await + .map_err(TonicStatusError)?; + + if resp.len() != 1 || resp[0].num_rows() != 1 { + return Err(StreamError::ChunkError(re_chunk::ChunkError::Malformed { + reason: format!( + "expected exactly one recording with id {recording_id}, got {}", + resp.len() + ), + })); + } + + let store_info = store_info_from_catalog_chunk(&resp[0], &recording_id)?; + let store_id = store_info.store_id.clone(); + re_log::debug!("Fetching {recording_id}…"); let mut resp = client @@ -196,19 +235,6 @@ async fn stream_recording_async( drop(client); - // TODO(zehiko) - we need a separate gRPC endpoint for fetching Store info REDAP #85 - let store_id = StoreId::from_string(StoreKind::Recording, recording_id.clone()); - - let store_info = StoreInfo { - application_id: ApplicationId::from("redap_recording"), - store_id: store_id.clone(), - cloned_from: None, - is_official_example: false, - started: Time::now(), - store_source: StoreSource::Unknown, - store_version: None, - }; - // We need a whole StoreInfo here. if tx .send(LogMsg::SetStoreInfo(SetStoreInfo { @@ -242,6 +268,51 @@ async fn stream_recording_async( Ok(()) } +fn store_info_from_catalog_chunk( + tc: &TransportChunk, + recording_id: &str, +) -> Result { + let store_id = StoreId::from_string(StoreKind::Recording, recording_id.to_owned()); + + let (_field, data) = tc + .components() + .find(|(f, _)| f.name == "application_id") + .ok_or(StreamError::ChunkError(re_chunk::ChunkError::Malformed { + reason: "no application_id field found".to_owned(), + }))?; + let app_id = data + .as_any() + .downcast_ref::>() + .ok_or(StreamError::ChunkError(re_chunk::ChunkError::Malformed { + reason: format!("application_id must be a utf8 array: {:?}", tc.schema), + }))? + .value(0); + + let (_field, data) = tc + .components() + .find(|(f, _)| f.name == "start_time") + .ok_or(StreamError::ChunkError(re_chunk::ChunkError::Malformed { + reason: "no start_time field found".to_owned(), + }))?; + let start_time = data + .as_any() + .downcast_ref::() + .ok_or(StreamError::ChunkError(re_chunk::ChunkError::Malformed { + reason: format!("start_time must be an int64 array: {:?}", tc.schema), + }))? + .value(0); + + Ok(StoreInfo { + application_id: ApplicationId::from(app_id), + store_id: store_id.clone(), + cloned_from: None, + is_official_example: false, + started: Time::from_ns_since_epoch(start_time), + store_source: StoreSource::Unknown, + store_version: None, + }) +} + async fn stream_catalog_async( tx: re_smart_channel::Sender, redap_endpoint: Url, @@ -318,16 +389,88 @@ async fn stream_catalog_async( re_log::info!("Starting to read..."); while let Some(result) = resp.next().await { - let mut tc = result.map_err(TonicStatusError)?; - // received TransportChunk doesn't have ChunkId, hence we need to add it before converting - // to Chunk + let input = result.map_err(TonicStatusError)?; + + // Catalog received from the ReDap server isn't suitable for direct conversion to a Rerun Chunk: + // - conversion expects "data" columns to be ListArrays, hence we need to convert any individual row column data to ListArray + // - conversion expects the input TransportChunk to have a ChunkId so we need to add that piece of metadata + + let mut fields = Vec::new(); + let mut arrays = Vec::new(); + // add the (row id) control field + let (row_id_field, row_id_data) = input.controls().next().ok_or( + StreamError::ChunkError(re_chunk::ChunkError::Malformed { + reason: "no control field found".to_owned(), + }), + )?; + + fields.push( + Arrow2Field::new( + RowId::name().to_string(), // need to rename to Rerun Chunk expected control field + row_id_field.data_type().clone(), + false, /* not nullable */ + ) + .with_metadata(TransportChunk::field_metadata_control_column()), + ); + arrays.push(row_id_data.clone()); + + // next add any timeline field + for (field, data) in input.timelines() { + fields.push(field.clone()); + arrays.push(data.clone()); + } + + // now add all the 'data' fields - we slice each column array into individual arrays and then convert the whole lot into a ListArray + for (field, data) in input.components() { + let data_field_inner = + Arrow2Field::new("item", field.data_type().clone(), true /* nullable */); + + let data_field = Arrow2Field::new( + field.name.clone(), + arrow2::datatypes::DataType::List(Arc::new(data_field_inner.clone())), + false, /* not nullable */ + ) + .with_metadata(TransportChunk::field_metadata_data_column()); + + let mut sliced: Vec> = Vec::new(); + for idx in 0..data.len() { + let mut array = data.clone(); + array.slice(idx, 1); + sliced.push(array); + } + + let data_arrays = sliced.iter().map(|e| Some(e.as_ref())).collect::>(); + #[allow(clippy::unwrap_used)] // we know we've given the right field type + let data_field_array: arrow2::array::ListArray = + re_chunk::util::arrays_to_list_array( + data_field_inner.data_type().clone(), + &data_arrays, + ) + .unwrap(); + + fields.push(data_field); + arrays.push(Box::new(data_field_array)); + } + + let mut schema = arrow2::datatypes::Schema::from(fields); + schema.metadata.insert( + TransportChunk::CHUNK_METADATA_KEY_ENTITY_PATH.to_owned(), + "catalog".to_owned(), + ); + + // modified and enriched TransportChunk + let mut tc = TransportChunk { + schema, + data: arrow2::chunk::Chunk::new(arrays), + }; + tc.schema.metadata.insert( TransportChunk::CHUNK_METADATA_KEY_ID.to_owned(), ChunkId::new().to_string(), ); let mut chunk = Chunk::from_transport(&tc)?; - // enrich catalog data with RecordingUri that's based on the ReDap endpoint (that we know) + // finally, enrich catalog data with RecordingUri that's based on the ReDap endpoint (that we know) // and the recording id (that we have in the catalog data) let host = redap_endpoint .host()