Skip to content

Commit

Permalink
Merge branch 'main' into emilk/fix-ci-target
Browse files Browse the repository at this point in the history
  • Loading branch information
emilk committed Apr 6, 2023
2 parents 854e21c + aedf1c0 commit e719abb
Show file tree
Hide file tree
Showing 13 changed files with 205 additions and 126 deletions.
13 changes: 10 additions & 3 deletions crates/re_data_store/examples/memory_usage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ fn live_bytes() -> usize {

// ----------------------------------------------------------------------------

use re_log_types::{entity_path, DataRow, MsgId};
use re_log_types::{entity_path, DataRow, MsgId, RecordingId};

fn main() {
log_messages();
Expand Down Expand Up @@ -91,6 +91,7 @@ fn log_messages() {

const NUM_POINTS: usize = 1_000;

let recording_id = RecordingId::random();
let timeline = Timeline::new_sequence("frame_nr");
let mut time_point = TimePoint::default();
time_point.insert(timeline, TimeInt::from(0));
Expand All @@ -116,7 +117,10 @@ fn log_messages() {
.into_table(),
);
let table_bytes = live_bytes() - used_bytes_start;
let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(&*table).unwrap()));
let log_msg = Box::new(LogMsg::ArrowMsg(
recording_id,
ArrowMsg::try_from(&*table).unwrap(),
));
let log_msg_bytes = live_bytes() - used_bytes_start;
println!("Arrow payload containing a Pos2 uses {table_bytes} bytes in RAM");
let encoded = encode_log_msg(&log_msg);
Expand All @@ -139,7 +143,10 @@ fn log_messages() {
.into_table(),
);
let table_bytes = live_bytes() - used_bytes_start;
let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(&*table).unwrap()));
let log_msg = Box::new(LogMsg::ArrowMsg(
recording_id,
ArrowMsg::try_from(&*table).unwrap(),
));
let log_msg_bytes = live_bytes() - used_bytes_start;
println!("Arrow payload containing a Pos2 uses {table_bytes} bytes in RAM");
let encoded = encode_log_msg(&log_msg);
Expand Down
4 changes: 2 additions & 2 deletions crates/re_data_store/src/log_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,15 +235,15 @@ impl LogDb {

match &msg {
LogMsg::BeginRecordingMsg(msg) => self.add_begin_recording_msg(msg),
LogMsg::EntityPathOpMsg(msg) => {
LogMsg::EntityPathOpMsg(_, msg) => {
let EntityPathOpMsg {
msg_id,
time_point,
path_op,
} = msg;
self.entity_db.add_path_op(*msg_id, time_point, path_op);
}
LogMsg::ArrowMsg(inner) => self.entity_db.try_add_arrow_msg(inner)?,
LogMsg::ArrowMsg(_, inner) => self.entity_db.try_add_arrow_msg(inner)?,
LogMsg::Goodbye(_) => {}
}

Expand Down
29 changes: 16 additions & 13 deletions crates/re_log_encoding/benches/msg_encode_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;

use re_log_types::{
datagen::{build_frame_nr, build_some_colors, build_some_point2d},
entity_path, ArrowMsg, DataRow, DataTable, Index, LogMsg, MsgId,
entity_path, ArrowMsg, DataRow, DataTable, Index, LogMsg, MsgId, RecordingId,
};

use criterion::{criterion_group, criterion_main, Criterion};
Expand Down Expand Up @@ -42,18 +42,18 @@ fn decode_log_msgs(mut bytes: &[u8]) -> Vec<LogMsg> {
messages
}

fn generate_messages(tables: &[DataTable]) -> Vec<LogMsg> {
fn generate_messages(recording_id: RecordingId, tables: &[DataTable]) -> Vec<LogMsg> {
tables
.iter()
.map(|table| LogMsg::ArrowMsg(ArrowMsg::try_from(table).unwrap()))
.map(|table| LogMsg::ArrowMsg(recording_id, ArrowMsg::try_from(table).unwrap()))
.collect()
}

fn decode_tables(messages: &[LogMsg]) -> Vec<DataTable> {
messages
.iter()
.map(|log_msg| {
if let LogMsg::ArrowMsg(arrow_msg) = log_msg {
if let LogMsg::ArrowMsg(_, arrow_msg) = log_msg {
DataTable::try_from(arrow_msg).unwrap()
} else {
unreachable!()
Expand Down Expand Up @@ -81,21 +81,22 @@ fn mono_points_arrow(c: &mut Criterion) {
}

{
let recording_id = RecordingId::random();
let mut group = c.benchmark_group("mono_points_arrow");
group.throughput(criterion::Throughput::Elements(NUM_POINTS as _));
group.bench_function("generate_message_bundles", |b| {
b.iter(generate_tables);
});
let tables = generate_tables();
group.bench_function("generate_messages", |b| {
b.iter(|| generate_messages(&tables));
b.iter(|| generate_messages(recording_id, &tables));
});
let messages = generate_messages(&tables);
let messages = generate_messages(recording_id, &tables);
group.bench_function("encode_log_msg", |b| {
b.iter(|| encode_log_msgs(&messages));
});
group.bench_function("encode_total", |b| {
b.iter(|| encode_log_msgs(&generate_messages(&generate_tables())));
b.iter(|| encode_log_msgs(&generate_messages(recording_id, &generate_tables())));
});

let encoded = encode_log_msgs(&messages);
Expand Down Expand Up @@ -136,21 +137,22 @@ fn mono_points_arrow_batched(c: &mut Criterion) {
}

{
let recording_id = RecordingId::random();
let mut group = c.benchmark_group("mono_points_arrow_batched");
group.throughput(criterion::Throughput::Elements(NUM_POINTS as _));
group.bench_function("generate_message_bundles", |b| {
b.iter(generate_table);
});
let tables = [generate_table()];
group.bench_function("generate_messages", |b| {
b.iter(|| generate_messages(&tables));
b.iter(|| generate_messages(recording_id, &tables));
});
let messages = generate_messages(&tables);
let messages = generate_messages(recording_id, &tables);
group.bench_function("encode_log_msg", |b| {
b.iter(|| encode_log_msgs(&messages));
});
group.bench_function("encode_total", |b| {
b.iter(|| encode_log_msgs(&generate_messages(&[generate_table()])));
b.iter(|| encode_log_msgs(&generate_messages(recording_id, &[generate_table()])));
});

let encoded = encode_log_msgs(&messages);
Expand Down Expand Up @@ -192,21 +194,22 @@ fn batch_points_arrow(c: &mut Criterion) {
}

{
let recording_id = RecordingId::random();
let mut group = c.benchmark_group("batch_points_arrow");
group.throughput(criterion::Throughput::Elements(NUM_POINTS as _));
group.bench_function("generate_message_bundles", |b| {
b.iter(generate_tables);
});
let tables = generate_tables();
group.bench_function("generate_messages", |b| {
b.iter(|| generate_messages(&tables));
b.iter(|| generate_messages(recording_id, &tables));
});
let messages = generate_messages(&tables);
let messages = generate_messages(recording_id, &tables);
group.bench_function("encode_log_msg", |b| {
b.iter(|| encode_log_msgs(&messages));
});
group.bench_function("encode_total", |b| {
b.iter(|| encode_log_msgs(&generate_messages(&generate_tables())));
b.iter(|| encode_log_msgs(&generate_messages(recording_id, &generate_tables())));
});

let encoded = encode_log_msgs(&messages);
Expand Down
20 changes: 14 additions & 6 deletions crates/re_log_types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,10 @@ pub enum LogMsg {
BeginRecordingMsg(BeginRecordingMsg),

/// Server-backed operation on an [`EntityPath`].
EntityPathOpMsg(EntityPathOpMsg),
EntityPathOpMsg(RecordingId, EntityPathOpMsg),

/// Log an entity using an [`ArrowMsg`].
ArrowMsg(ArrowMsg),
ArrowMsg(RecordingId, ArrowMsg),

/// Sent when the client shuts down the connection.
Goodbye(MsgId),
Expand All @@ -182,19 +182,27 @@ impl LogMsg {
pub fn id(&self) -> MsgId {
match self {
Self::BeginRecordingMsg(msg) => msg.msg_id,
Self::EntityPathOpMsg(msg) => msg.msg_id,
Self::EntityPathOpMsg(_, msg) => msg.msg_id,
Self::Goodbye(msg_id) => *msg_id,
// TODO(#1619): the following only makes sense because, while we support sending and
// receiving batches, we don't actually do so yet.
// We need to stop storing raw `LogMsg`s before we can benefit from our batching.
Self::ArrowMsg(msg) => msg.table_id,
Self::ArrowMsg(_, msg) => msg.table_id,
}
}

pub fn recording_id(&self) -> Option<&RecordingId> {
match self {
Self::BeginRecordingMsg(msg) => Some(&msg.info.recording_id),
Self::EntityPathOpMsg(recording_id, _) | Self::ArrowMsg(recording_id, _) => {
Some(recording_id)
}
Self::Goodbye(_) => None,
}
}
}

impl_into_enum!(BeginRecordingMsg, LogMsg, BeginRecordingMsg);
impl_into_enum!(EntityPathOpMsg, LogMsg, EntityPathOpMsg);
impl_into_enum!(ArrowMsg, LogMsg, ArrowMsg);

// ----------------------------------------------------------------------------

Expand Down
31 changes: 23 additions & 8 deletions crates/re_sdk/src/msg_sender.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use re_log_types::{component_types::InstanceKey, DataRow, DataTableError};
use std::borrow::Borrow;

use re_log_types::{component_types::InstanceKey, DataRow, DataTableError, RecordingId};

use crate::{
components::Transform,
log::{DataCell, LogMsg, MsgId},
sink::LogSink,
time::{Time, TimeInt, TimePoint, Timeline},
Component, EntityPath, SerializableComponent,
Component, EntityPath, SerializableComponent, Session,
};

// TODO(#1619): Rust SDK batching
Expand Down Expand Up @@ -229,29 +231,42 @@ impl MsgSender {

/// Consumes, packs, sanity checks and finally sends the message to the currently configured
/// target of the SDK.
pub fn send(self, sink: &impl std::borrow::Borrow<dyn LogSink>) -> Result<(), DataTableError> {
self.send_to_sink(sink.borrow())
pub fn send(self, session: &Session) -> Result<(), DataTableError> {
self.send_to_sink(session.recording_id(), session.borrow())
}

/// Consumes, packs, sanity checks and finally sends the message to the currently configured
/// target of the SDK.
fn send_to_sink(self, sink: &dyn LogSink) -> Result<(), DataTableError> {
fn send_to_sink(
self,
recording_id: RecordingId,
sink: &dyn LogSink,
) -> Result<(), DataTableError> {
if !sink.is_enabled() {
return Ok(()); // silently drop the message
}

let [row_standard, row_transforms, row_splats] = self.into_rows();

if let Some(row_transforms) = row_transforms {
sink.send(LogMsg::ArrowMsg((&row_transforms.into_table()).try_into()?));
sink.send(LogMsg::ArrowMsg(
recording_id,
(&row_transforms.into_table()).try_into()?,
));
}
if let Some(row_splats) = row_splats {
sink.send(LogMsg::ArrowMsg((&row_splats.into_table()).try_into()?));
sink.send(LogMsg::ArrowMsg(
recording_id,
(&row_splats.into_table()).try_into()?,
));
}
// Always the primary component last so range-based queries will include the other data.
// Since the primary component can't be splatted it must be in msg_standard, see(#1215).
if let Some(row_standard) = row_standard {
sink.send(LogMsg::ArrowMsg((&row_standard.into_table()).try_into()?));
sink.send(LogMsg::ArrowMsg(
recording_id,
(&row_standard.into_table()).try_into()?,
));
}

Ok(())
Expand Down
36 changes: 29 additions & 7 deletions crates/re_sdk/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ impl SessionBuilder {
#[must_use]
#[derive(Clone)]
pub struct Session {
recording_info: RecordingInfo,
sink: Arc<dyn LogSink>,
// TODO(emilk): add convenience `TimePoint` here so that users can
// do things like `session.set_time_sequence("frame", frame_idx);`
Expand Down Expand Up @@ -222,20 +223,33 @@ impl Session {
sink.send(
re_log_types::BeginRecordingMsg {
msg_id: re_log_types::MsgId::random(),
info: recording_info,
info: recording_info.clone(),
}
.into(),
);
}

Self { sink: sink.into() }
Self {
recording_info,
sink: sink.into(),
}
}

/// Construct a new session with a disabled "dummy" sink that drops all logging messages.
///
/// [`Self::is_enabled`] will return `false`.
pub fn disabled() -> Self {
Self {
recording_info: RecordingInfo {
application_id: ApplicationId::unknown(),
recording_id: Default::default(),
is_official_example: crate::called_from_official_rust_example(),
started: Time::now(),
recording_source: RecordingSource::RustSdk {
rustc_version: env!("RE_BUILD_RUSTC_VERSION").into(),
llvm_version: env!("RE_BUILD_LLVM_VERSION").into(),
},
},
sink: crate::sink::disabled().into(),
}
}
Expand Down Expand Up @@ -272,17 +286,25 @@ impl Session {
time_point: &re_log_types::TimePoint,
path_op: re_log_types::PathOp,
) {
self.send(LogMsg::EntityPathOpMsg(re_log_types::EntityPathOpMsg {
msg_id: re_log_types::MsgId::random(),
time_point: time_point.clone(),
path_op,
}));
self.send(LogMsg::EntityPathOpMsg(
self.recording_id(),
re_log_types::EntityPathOpMsg {
msg_id: re_log_types::MsgId::random(),
time_point: time_point.clone(),
path_op,
},
));
}

/// Drain all buffered [`LogMsg`]es and return them.
pub fn drain_backlog(&self) -> Vec<LogMsg> {
self.sink.drain_backlog()
}

/// The current [`RecordingId`].
pub fn recording_id(&self) -> RecordingId {
self.recording_info.recording_id
}
}

impl AsRef<dyn LogSink> for Session {
Expand Down
6 changes: 4 additions & 2 deletions crates/re_sdk_comms/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,11 @@ impl CongestionManager {
#[allow(clippy::match_same_arms)]
match msg {
// we don't want to drop any of these
LogMsg::BeginRecordingMsg(_) | LogMsg::EntityPathOpMsg(_) | LogMsg::Goodbye(_) => true,
LogMsg::BeginRecordingMsg(_) | LogMsg::EntityPathOpMsg(_, _) | LogMsg::Goodbye(_) => {
true
}

LogMsg::ArrowMsg(arrow_msg) => self.should_send_time_point(&arrow_msg.timepoint_max),
LogMsg::ArrowMsg(_, arrow_msg) => self.should_send_time_point(&arrow_msg.timepoint_max),
}
}

Expand Down
Loading

0 comments on commit e719abb

Please sign in to comment.