diff --git a/Cargo.lock b/Cargo.lock
index dda16c809997..b3d0df50da3a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5626,6 +5626,7 @@ name = "re_remote_store_types"
 version = "0.20.0-alpha.1+dev"
 dependencies = [
  "prost 0.13.3",
+ "re_arrow2",
  "re_dataframe",
  "re_log_types",
  "thiserror",
diff --git a/crates/store/re_dataframe/src/lib.rs b/crates/store/re_dataframe/src/lib.rs
index 0816e6707803..ec3fab58bc5b 100644
--- a/crates/store/re_dataframe/src/lib.rs
+++ b/crates/store/re_dataframe/src/lib.rs
@@ -9,7 +9,7 @@ pub use self::query::QueryHandle;
 #[doc(no_inline)]
 pub use self::external::arrow2::chunk::Chunk as ArrowChunk;
 #[doc(no_inline)]
-pub use self::external::re_chunk::util::concatenate_record_batches;
+pub use self::external::re_chunk::{util::concatenate_record_batches, TransportChunk};
 #[doc(no_inline)]
 pub use self::external::re_chunk_store::{
     ColumnSelector, ComponentColumnSelector, Index, IndexRange, IndexValue, QueryExpression,
diff --git a/crates/store/re_remote_store_types/Cargo.toml b/crates/store/re_remote_store_types/Cargo.toml
index 933745526dfb..b342fa235da9 100644
--- a/crates/store/re_remote_store_types/Cargo.toml
+++ b/crates/store/re_remote_store_types/Cargo.toml
@@ -14,6 +14,7 @@ re_log_types.workspace = true
 re_dataframe.workspace = true
 
 # External
+arrow2 = { workspace = true, features = ["io_ipc"] }
 prost.workspace = true
 thiserror.workspace = true
 tonic.workspace = true
diff --git a/crates/store/re_remote_store_types/proto/rerun/v0/remote_store.proto b/crates/store/re_remote_store_types/proto/rerun/v0/remote_store.proto
index 55822e1fbd37..4fe3b74c2e95 100644
--- a/crates/store/re_remote_store_types/proto/rerun/v0/remote_store.proto
+++ b/crates/store/re_remote_store_types/proto/rerun/v0/remote_store.proto
@@ -61,12 +61,21 @@ message QueryRequest {
   // unique identifier of the recording
   RecordingId recording_id = 1;
   // query to execute
-  Query query = 2;
+  Query query = 3;
 }
 
 message QueryResponse {
-  // single record batch (encoding TBD - TODO).
-  bytes record_batch = 1;
+  // TODO(zehiko) we need to expand this to become something like 'encoder options',
+  // as we will need to specify additional options such as compression, whether the
+  // schema is included in the payload, etc.
+  EncoderVersion encoder_version = 1;
+  // payload is raw bytes that the relevant codec can interpret
+  bytes payload = 2;
+}
+
+
+enum EncoderVersion {
+  V0 = 0;
 }
diff --git a/crates/store/re_remote_store_types/src/codec.rs b/crates/store/re_remote_store_types/src/codec.rs
new file mode 100644
index 000000000000..42acc31c9734
--- /dev/null
+++ b/crates/store/re_remote_store_types/src/codec.rs
@@ -0,0 +1,253 @@
+use arrow2::error::Error as ArrowError;
+use arrow2::io::ipc::{read, write};
+use re_dataframe::TransportChunk;
+
+use crate::v0::EncoderVersion;
+
+#[derive(Debug, thiserror::Error)]
+pub enum CodecError {
+    #[error("Arrow serialization error: {0}")]
+    ArrowSerialization(ArrowError),
+
+    #[error("Failed to decode message header: {0}")]
+    HeaderDecoding(std::io::Error),
+
+    #[error("Failed to encode message header: {0}")]
+    HeaderEncoding(std::io::Error),
+
+    #[error("Missing record batch")]
+    MissingRecordBatch,
+
+    #[error("Unexpected stream state")]
+    UnexpectedStreamState,
+
+    #[error("Unknown message header")]
+    UnknownMessageHeader,
+}
+
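+/// One-byte header that prefixes every encoded message.
+///
+/// Wire format (as implemented by `TransportMessageV0` below): a single header byte,
+/// followed - for `RECORD_BATCH` messages - by an Arrow IPC stream that carries the
+/// schema and exactly one record batch.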
+#[derive(Clone, Copy, PartialEq, Eq, Hash, Default)]
+pub struct MessageHeader(pub u8);
+
+impl MessageHeader {
+    pub const NO_DATA: Self = Self(1);
+    pub const RECORD_BATCH: Self = Self(2);
+
+    pub const SIZE_BYTES: usize = 1;
+}
+
+impl MessageHeader {
+    fn decode(read: &mut impl std::io::Read) -> Result<Self, CodecError> {
+        let mut buffer = [0_u8; Self::SIZE_BYTES];
+        read.read_exact(&mut buffer)
+            .map_err(CodecError::HeaderDecoding)?;
+
+        let header = u8::from_le(buffer[0]);
+
+        Ok(Self(header))
+    }
+
+    fn encode(&self, write: &mut impl std::io::Write) -> Result<(), CodecError> {
+        write
+            .write_all(&[self.0])
+            .map_err(CodecError::HeaderEncoding)?;
+
+        Ok(())
+    }
+}
+
+#[derive(Debug)]
+pub enum TransportMessageV0 {
+    NoData,
+    RecordBatch(TransportChunk),
+}
+
+impl TransportMessageV0 {
+    fn to_bytes(&self) -> Result<Vec<u8>, CodecError> {
+        match self {
+            Self::NoData => {
+                let mut data: Vec<u8> = Vec::new();
+                MessageHeader::NO_DATA.encode(&mut data)?;
+                Ok(data)
+            }
+            Self::RecordBatch(chunk) => {
+                let mut data: Vec<u8> = Vec::new();
+                MessageHeader::RECORD_BATCH.encode(&mut data)?;
+
+                let options = write::WriteOptions { compression: None };
+                let mut sw = write::StreamWriter::new(&mut data, options);
+
+                sw.start(&chunk.schema, None)
+                    .map_err(CodecError::ArrowSerialization)?;
+                sw.write(&chunk.data, None)
+                    .map_err(CodecError::ArrowSerialization)?;
+                sw.finish().map_err(CodecError::ArrowSerialization)?;
+
+                Ok(data)
+            }
+        }
+    }
+
+    fn from_bytes(data: &[u8]) -> Result<Self, CodecError> {
+        let mut reader = std::io::Cursor::new(data);
+        let header = MessageHeader::decode(&mut reader)?;
+
+        match header {
+            MessageHeader::NO_DATA => Ok(Self::NoData),
+            MessageHeader::RECORD_BATCH => {
+                let metadata = read::read_stream_metadata(&mut reader)
+                    .map_err(CodecError::ArrowSerialization)?;
+                let mut stream = read::StreamReader::new(&mut reader, metadata, None);
+
+                let schema = stream.schema().clone();
+                // there should be at least one record batch in the stream
+                // TODO(zehiko) isn't there a "read one record batch from bytes" arrow2 function??
+                let stream_state = stream
+                    .next()
+                    .ok_or(CodecError::MissingRecordBatch)?
+                    .map_err(CodecError::ArrowSerialization)?;
+
+                match stream_state {
+                    read::StreamState::Waiting => Err(CodecError::UnexpectedStreamState),
+                    read::StreamState::Some(chunk) => {
+                        let tc = TransportChunk {
+                            schema: schema.clone(),
+                            data: chunk,
+                        };
+
+                        Ok(Self::RecordBatch(tc))
+                    }
+                }
+            }
+            _ => Err(CodecError::UnknownMessageHeader),
+        }
+    }
+}
+
+// TODO(zehiko) add support for encoding the schema separately from the record batch, to get
+// rid of the overhead of sending the schema in every transport message for the same stream of
+// batches. This will require the codec to become stateful and keep track of whether the schema
+// was already sent / received.
+/// Encode a transport chunk into a byte stream.
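+///
+/// A minimal round-trip sketch (it mirrors `test_v0_codec` below; `chunk` is any
+/// `re_chunk::Chunk`):
+///
+/// ```ignore
+/// let bytes = encode(EncoderVersion::V0, chunk.to_transport()?)?;
+/// let transport = decode(EncoderVersion::V0, &bytes)?.expect("expected a record batch");
+/// let roundtripped = Chunk::from_transport(&transport)?;
+/// ```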
+pub fn encode(version: EncoderVersion, chunk: TransportChunk) -> Result<Vec<u8>, CodecError> {
+    match version {
+        EncoderVersion::V0 => TransportMessageV0::RecordBatch(chunk).to_bytes(),
+    }
+}
+
+/// Encode a `NoData` message into a byte stream. This can be used by the remote store
+/// (i.e. the data producer) to signal back to the client that there's no data available.
+pub fn no_data(version: EncoderVersion) -> Result<Vec<u8>, CodecError> {
+    match version {
+        EncoderVersion::V0 => TransportMessageV0::NoData.to_bytes(),
+    }
+}
+
+/// Decode transport data from a byte stream: if there's a record batch present, return it,
+/// otherwise return `None`.
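+///
+/// For example, a `NoData` payload produced by [`no_data`] decodes to `Ok(None)` (a
+/// sketch, following the same pattern as the tests below):
+///
+/// ```ignore
+/// let payload = no_data(EncoderVersion::V0)?;
+/// assert!(decode(EncoderVersion::V0, &payload)?.is_none());
+/// ```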
+pub fn decode(version: EncoderVersion, data: &[u8]) -> Result<Option<TransportChunk>, CodecError> {
+    match version {
+        EncoderVersion::V0 => {
+            let msg = TransportMessageV0::from_bytes(data)?;
+            match msg {
+                TransportMessageV0::RecordBatch(chunk) => Ok(Some(chunk)),
+                TransportMessageV0::NoData => Ok(None),
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use re_dataframe::external::re_chunk::{Chunk, RowId};
+    use re_log_types::{example_components::MyPoint, Timeline};
+
+    use crate::{
+        codec::{decode, encode, CodecError, TransportMessageV0},
+        v0::EncoderVersion,
+    };
+
+    fn get_test_chunk() -> Chunk {
+        let row_id1 = RowId::new();
+        let row_id2 = RowId::new();
+
+        let timepoint1 = [
+            (Timeline::log_time(), 100),
+            (Timeline::new_sequence("frame"), 1),
+        ];
+        let timepoint2 = [
+            (Timeline::log_time(), 104),
+            (Timeline::new_sequence("frame"), 1),
+        ];
+
+        let points1 = &[MyPoint::new(1.0, 1.0)];
+        let points2 = &[MyPoint::new(2.0, 2.0)];
+
+        Chunk::builder("mypoints".into())
+            .with_component_batches(row_id1, timepoint1, [points1 as _])
+            .with_component_batches(row_id2, timepoint2, [points2 as _])
+            .build()
+            .unwrap()
+    }
+
+    #[test]
+    fn test_message_v0_no_data() {
+        let msg = TransportMessageV0::NoData;
+        let data = msg.to_bytes().unwrap();
+        let decoded = TransportMessageV0::from_bytes(&data).unwrap();
+        assert!(matches!(decoded, TransportMessageV0::NoData));
+    }
+
+    #[test]
+    fn test_message_v0_record_batch() {
+        let expected_chunk = get_test_chunk();
+
+        let msg = TransportMessageV0::RecordBatch(expected_chunk.clone().to_transport().unwrap());
+        let data = msg.to_bytes().unwrap();
+        let decoded = TransportMessageV0::from_bytes(&data).unwrap();
+
+        #[allow(clippy::match_wildcard_for_single_variants)]
+        match decoded {
+            TransportMessageV0::RecordBatch(transport) => {
+                let decoded_chunk = Chunk::from_transport(&transport).unwrap();
+                assert_eq!(expected_chunk, decoded_chunk);
+            }
+            _ => panic!("unexpected message type"),
+        }
+    }
+
+    #[test]
+    fn test_invalid_batch_data() {
+        let data = vec![2, 3, 4]; // '2' is the RECORD_BATCH header; the rest is not a valid Arrow payload
+        let decoded = TransportMessageV0::from_bytes(&data);
+
+        assert!(matches!(
+            decoded.err().unwrap(),
+            CodecError::ArrowSerialization(_)
+        ));
+    }
+
+    #[test]
+    fn test_unknown_header() {
+        let data = vec![3];
+        let decoded = TransportMessageV0::from_bytes(&data);
+        assert!(decoded.is_err());
+
+        assert!(matches!(
+            decoded.err().unwrap(),
+            CodecError::UnknownMessageHeader
+        ));
+    }
+
+    #[test]
+    fn test_v0_codec() {
+        let expected_chunk = get_test_chunk();
+
+        let encoded = encode(
+            EncoderVersion::V0,
+            expected_chunk.clone().to_transport().unwrap(),
+        )
+        .unwrap();
+        let decoded = decode(EncoderVersion::V0, &encoded).unwrap().unwrap();
+        let decoded_chunk = Chunk::from_transport(&decoded).unwrap();
+
+        assert_eq!(expected_chunk, decoded_chunk);
+    }
+}
diff --git a/crates/store/re_remote_store_types/src/lib.rs b/crates/store/re_remote_store_types/src/lib.rs
index 6edddad23c91..86126cf614ba 100644
--- a/crates/store/re_remote_store_types/src/lib.rs
+++ b/crates/store/re_remote_store_types/src/lib.rs
@@ -6,6 +6,9 @@
 //! necessary conversion code (in the form of `From` and `TryFrom` traits) in this crate.
 //!
 
+/// Codec for serializing and deserializing query response (record batch) data.
+pub mod codec;
+
 /// Generated types for the remote store gRPC service API v0.
 pub mod v0 {
     // Ignoring all warnings for the auto-generated code.
diff --git a/crates/store/re_remote_store_types/src/v0/rerun.remote_store.v0.rs b/crates/store/re_remote_store_types/src/v0/rerun.remote_store.v0.rs
index 4b6342b7e00e..9414f0fe4a41 100644
--- a/crates/store/re_remote_store_types/src/v0/rerun.remote_store.v0.rs
+++ b/crates/store/re_remote_store_types/src/v0/rerun.remote_store.v0.rs
@@ -267,14 +267,19 @@ pub struct QueryRequest {
     #[prost(message, optional, tag = "1")]
     pub recording_id: ::core::option::Option<RecordingId>,
     /// query to execute
-    #[prost(message, optional, tag = "2")]
+    #[prost(message, optional, tag = "3")]
     pub query: ::core::option::Option<Query>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct QueryResponse {
-    /// single record batch (encoding TBD - TODO).
-    #[prost(bytes = "vec", tag = "1")]
-    pub record_batch: ::prost::alloc::vec::Vec<u8>,
+    /// TODO(zehiko) we need to expand this to become something like 'encoder options',
+    /// as we will need to specify additional options such as compression, whether the
+    /// schema is included in the payload, etc.
+    #[prost(enumeration = "EncoderVersion", tag = "1")]
+    pub encoder_version: i32,
+    /// payload is raw bytes that the relevant codec can interpret
+    #[prost(bytes = "vec", tag = "2")]
+    pub payload: ::prost::alloc::vec::Vec<u8>,
 }
 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
 pub struct ListRecordingsRequest {}
@@ -298,6 +303,29 @@ pub struct RecordingInfo {
 }
 #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
 #[repr(i32)]
+pub enum EncoderVersion {
+    V0 = 0,
+}
+impl EncoderVersion {
+    /// String value of the enum field names used in the ProtoBuf definition.
+    ///
+    /// The values are not transformed in any way and thus are considered stable
+    /// (if the ProtoBuf definition does not change) and safe for programmatic use.
+    pub fn as_str_name(&self) -> &'static str {
+        match self {
+            Self::V0 => "V0",
+        }
+    }
+    /// Creates an enum from field names used in the ProtoBuf definition.
+    pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
+        match value {
+            "V0" => Some(Self::V0),
+            _ => None,
+        }
+    }
+}
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
+#[repr(i32)]
 pub enum RecordingType {
     Rrd = 0,
 }