-
Notifications
You must be signed in to change notification settings - Fork 374
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Introduce codec for the gRPC QueryResponse payload (#7733)
Initial version of codec for gRPC query response payload ### What gRPC layer ``QueryResponse`` contains raw bytes as the payload. We keep interpretation of the payload to our own encoder which this PR introduces. V0 codec is simple and stateless and doesn't deal with the fact every serialized record batch has schema included. As for the encoding, we simply rely on arrow2 serialization and deserialization constructs for the record batch and we have our own header to differentiate between "no data" and a "record batch" in the QueryResponse. ### Checklist * [x] I have read and agree to [Contributor Guide](https://github.com/rerun-io/rerun/blob/main/CONTRIBUTING.md) and the [Code of Conduct](https://github.com/rerun-io/rerun/blob/main/CODE_OF_CONDUCT.md) * [x] I've included a screenshot or gif (if applicable) * [x] I have tested the web demo (if applicable): * Using examples from latest `main` build: [rerun.io/viewer](https://rerun.io/viewer/pr/7733?manifest_url=https://app.rerun.io/version/main/examples_manifest.json) * Using full set of examples from `nightly` build: [rerun.io/viewer](https://rerun.io/viewer/pr/7733?manifest_url=https://app.rerun.io/version/nightly/examples_manifest.json) * [x] The PR title and labels are set such as to maximize their usefulness for the next release's CHANGELOG * [x] If applicable, add a new check to the [release checklist](https://github.com/rerun-io/rerun/blob/main/tests/python/release_checklist)! * [x] I have noted any breaking changes to the log API in `CHANGELOG.md` and the migration guide - [PR Build Summary](https://build.rerun.io/pr/7733) - [Recent benchmark results](https://build.rerun.io/graphs/crates.html) - [Wasm size tracking](https://build.rerun.io/graphs/sizes.html) To run all checks from `main`, comment on the PR with `@rerun-bot full-check`.
- Loading branch information
Showing
7 changed files
with
303 additions
and
8 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,253 @@ | ||
use arrow2::error::Error as ArrowError; | ||
use arrow2::io::ipc::{read, write}; | ||
use re_dataframe::TransportChunk; | ||
|
||
use crate::v0::EncoderVersion; | ||
|
||
#[derive(Debug, thiserror::Error)] | ||
pub enum CodecError { | ||
#[error("Arrow serialization error: {0}")] | ||
ArrowSerialization(ArrowError), | ||
|
||
#[error("Failed to decode message header {0}")] | ||
HeaderDecoding(std::io::Error), | ||
|
||
#[error("Failed to encode message header {0}")] | ||
HeaderEncoding(std::io::Error), | ||
|
||
#[error("Missing record batch")] | ||
MissingRecordBatch, | ||
|
||
#[error("Unexpected stream state")] | ||
UnexpectedStreamState, | ||
|
||
#[error("Unknown message header")] | ||
UnknownMessageHeader, | ||
} | ||
|
||
// NOTE(review): "MessageHader" looks like a typo for "MessageHeader"; renaming
// would change the public API, so it's left as-is and only flagged here.
/// One-byte header prepended to every encoded `QueryResponse` payload,
/// discriminating between the message kinds below.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Default)]
pub struct MessageHader(pub u8);

impl MessageHader {
    /// Header value signalling that the payload carries no data.
    pub const NO_DATA: Self = Self(1);
    /// Header value signalling that the payload carries an Arrow record batch.
    pub const RECORD_BATCH: Self = Self(2);

    /// Size of the header on the wire, in bytes.
    pub const SIZE_BYTES: usize = 1;
}
|
||
impl MessageHader { | ||
fn decode(read: &mut impl std::io::Read) -> Result<Self, CodecError> { | ||
let mut buffer = [0_u8; Self::SIZE_BYTES]; | ||
read.read_exact(&mut buffer) | ||
.map_err(CodecError::HeaderDecoding)?; | ||
|
||
let header = u8::from_le(buffer[0]); | ||
|
||
Ok(Self(header)) | ||
} | ||
|
||
fn encode(&self, write: &mut impl std::io::Write) -> Result<(), CodecError> { | ||
write | ||
.write_all(&[self.0]) | ||
.map_err(CodecError::HeaderEncoding)?; | ||
|
||
Ok(()) | ||
} | ||
} | ||
|
||
/// A v0 transport-level message: either "no data available" or a single
/// Arrow record batch carried as a [`TransportChunk`].
#[derive(Debug)]
pub enum TransportMessageV0 {
    /// The producer has no data to return for this query.
    NoData,
    /// A single record batch (schema + data).
    RecordBatch(TransportChunk),
}
|
||
impl TransportMessageV0 { | ||
fn to_bytes(&self) -> Result<Vec<u8>, CodecError> { | ||
match self { | ||
Self::NoData => { | ||
let mut data: Vec<u8> = Vec::new(); | ||
MessageHader::NO_DATA.encode(&mut data)?; | ||
Ok(data) | ||
} | ||
Self::RecordBatch(chunk) => { | ||
let mut data: Vec<u8> = Vec::new(); | ||
MessageHader::RECORD_BATCH.encode(&mut data)?; | ||
|
||
let options = write::WriteOptions { compression: None }; | ||
let mut sw = write::StreamWriter::new(&mut data, options); | ||
|
||
sw.start(&chunk.schema, None) | ||
.map_err(CodecError::ArrowSerialization)?; | ||
sw.write(&chunk.data, None) | ||
.map_err(CodecError::ArrowSerialization)?; | ||
sw.finish().map_err(CodecError::ArrowSerialization)?; | ||
|
||
Ok(data) | ||
} | ||
} | ||
} | ||
|
||
fn from_bytes(data: &[u8]) -> Result<Self, CodecError> { | ||
let mut reader = std::io::Cursor::new(data); | ||
let header = MessageHader::decode(&mut reader)?; | ||
|
||
match header { | ||
MessageHader::NO_DATA => Ok(Self::NoData), | ||
MessageHader::RECORD_BATCH => { | ||
let metadata = read::read_stream_metadata(&mut reader) | ||
.map_err(CodecError::ArrowSerialization)?; | ||
let mut stream = read::StreamReader::new(&mut reader, metadata, None); | ||
|
||
let schema = stream.schema().clone(); | ||
// there should be at least one record batch in the stream | ||
// TODO(zehiko) isn't there a "read one record batch from bytes" arrow2 function?? | ||
let stream_state = stream | ||
.next() | ||
.ok_or(CodecError::MissingRecordBatch)? | ||
.map_err(CodecError::ArrowSerialization)?; | ||
|
||
match stream_state { | ||
read::StreamState::Waiting => Err(CodecError::UnexpectedStreamState), | ||
read::StreamState::Some(chunk) => { | ||
let tc = TransportChunk { | ||
schema: schema.clone(), | ||
data: chunk, | ||
}; | ||
|
||
Ok(Self::RecordBatch(tc)) | ||
} | ||
} | ||
} | ||
_ => Err(CodecError::UnknownMessageHeader), | ||
} | ||
} | ||
} | ||
|
||
// TODO(zehiko) add support for separately encoding schema from the record batch to get rid of overhead | ||
// of sending schema in each transport message for the same stream of batches. This will require codec | ||
// to become stateful and keep track if schema was sent / received. | ||
/// Encode a transport chunk into a byte stream. | ||
pub fn encode(version: EncoderVersion, chunk: TransportChunk) -> Result<Vec<u8>, CodecError> { | ||
match version { | ||
EncoderVersion::V0 => TransportMessageV0::RecordBatch(chunk).to_bytes(), | ||
} | ||
} | ||
|
||
/// Encode a `NoData` message into a byte stream. This can be used by the remote store | ||
/// (i.e. data producer) to signal back to the client that there's no data available. | ||
pub fn no_data(version: EncoderVersion) -> Result<Vec<u8>, CodecError> { | ||
match version { | ||
EncoderVersion::V0 => TransportMessageV0::NoData.to_bytes(), | ||
} | ||
} | ||
|
||
/// Decode transport data from a byte stream - if there's a record batch present, return it, otherwise return `None`. | ||
pub fn decode(version: EncoderVersion, data: &[u8]) -> Result<Option<TransportChunk>, CodecError> { | ||
match version { | ||
EncoderVersion::V0 => { | ||
let msg = TransportMessageV0::from_bytes(data)?; | ||
match msg { | ||
TransportMessageV0::RecordBatch(chunk) => Ok(Some(chunk)), | ||
TransportMessageV0::NoData => Ok(None), | ||
} | ||
} | ||
} | ||
} | ||
|
||
#[cfg(test)]
mod tests {
    use re_dataframe::external::re_chunk::{Chunk, RowId};
    use re_log_types::{example_components::MyPoint, Timeline};

    use crate::{
        codec::{decode, encode, CodecError, TransportMessageV0},
        v0::EncoderVersion,
    };

    /// Builds a small two-row `MyPoint` chunk used by the round-trip tests below.
    fn get_test_chunk() -> Chunk {
        let row_id1 = RowId::new();
        let row_id2 = RowId::new();

        let timepoint1 = [
            (Timeline::log_time(), 100),
            (Timeline::new_sequence("frame"), 1),
        ];
        let timepoint2 = [
            (Timeline::log_time(), 104),
            (Timeline::new_sequence("frame"), 1),
        ];

        let points1 = &[MyPoint::new(1.0, 1.0)];
        let points2 = &[MyPoint::new(2.0, 2.0)];

        Chunk::builder("mypoints".into())
            .with_component_batches(row_id1, timepoint1, [points1 as _])
            .with_component_batches(row_id2, timepoint2, [points2 as _])
            .build()
            .unwrap()
    }

    /// `NoData` must round-trip through its byte representation.
    #[test]
    fn test_message_v0_no_data() {
        let msg = TransportMessageV0::NoData;
        let data = msg.to_bytes().unwrap();
        let decoded = TransportMessageV0::from_bytes(&data).unwrap();
        assert!(matches!(decoded, TransportMessageV0::NoData));
    }

    /// A record batch must round-trip through its byte representation unchanged.
    #[test]
    fn test_message_v0_record_batch() {
        let expected_chunk = get_test_chunk();

        let msg = TransportMessageV0::RecordBatch(expected_chunk.clone().to_transport().unwrap());
        let data = msg.to_bytes().unwrap();
        let decoded = TransportMessageV0::from_bytes(&data).unwrap();

        #[allow(clippy::match_wildcard_for_single_variants)]
        match decoded {
            TransportMessageV0::RecordBatch(transport) => {
                let decoded_chunk = Chunk::from_transport(&transport).unwrap();
                assert_eq!(expected_chunk, decoded_chunk);
            }
            _ => panic!("unexpected message type"),
        }
    }

    /// A valid `RECORD_BATCH` header followed by garbage must surface as an
    /// Arrow deserialization error rather than a panic.
    #[test]
    fn test_invalid_batch_data() {
        let data = vec![2, 3, 4]; // header byte '2' = RECORD_BATCH; the remaining bytes are not valid Arrow IPC data
        let decoded = TransportMessageV0::from_bytes(&data);

        assert!(matches!(
            decoded.err().unwrap(),
            CodecError::ArrowSerialization(_)
        ));
    }

    /// A header byte outside the known set must be rejected.
    #[test]
    fn test_unknown_header() {
        let data = vec![3];
        let decoded = TransportMessageV0::from_bytes(&data);
        assert!(decoded.is_err());

        assert!(matches!(
            decoded.err().unwrap(),
            CodecError::UnknownMessageHeader
        ));
    }

    /// End-to-end: `encode` followed by `decode` must reproduce the chunk.
    #[test]
    fn test_v0_codec() {
        let expected_chunk = get_test_chunk();

        let encoded = encode(
            EncoderVersion::V0,
            expected_chunk.clone().to_transport().unwrap(),
        )
        .unwrap();
        let decoded = decode(EncoderVersion::V0, &encoded).unwrap().unwrap();
        let decoded_chunk = Chunk::from_transport(&decoded).unwrap();

        assert_eq!(expected_chunk, decoded_chunk);
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters