Skip to content

Commit

Permalink
Significantly improve C++ logging performance by using C FFI instead …
Browse files Browse the repository at this point in the history
…of arrow IPC (#4273)

### What

* Fixes #4255 
* Depends on  #4282

So far we've been using the arrow IPC format to hand data from C++ to
C/Rust. It turned out that this incurred a major performance cost on our
api. Instead, using the C FFI interface (which still isn't exactly free)
gives us a major performance boost in all of today's logging benchmarks:

large point cloud: `0.58s -> 0.15s`
many points: `17.10 -> 7.52s`
large images: `2.90s -> 0.57s`
_Execute times without prepare, single sample on before, median of three
on after, all ran in a single process_

for comparison, numbers on main for Rust:

large point clouds: `0.82s`
many points: `3.87s`
large images: `1.00s`
_Execute times without prepare via Puffin, single sample, all ran in a
single process. As always these comparisons are very tricky. Also
there's lots of noise!_

There's still some significant discrepancy on `many points`. One likely
source of this is repeated schema transfer. Need to do some more
profiling before continuing.


This PR also simplifies the serialization pipeline a little bit by
removing `SerializedComponentBatch`, replacing it in favor of the
existing `rerun::DataCell` in a few places. Left a comment in the code
on the next steps towards evolving it towards an interface that's more
similar to how things work in Rust.


### Checklist
* [x] I have read and agree to [Contributor
Guide](https://github.com/rerun-io/rerun/blob/main/CONTRIBUTING.md) and
the [Code of
Conduct](https://github.com/rerun-io/rerun/blob/main/CODE_OF_CONDUCT.md)
* [x] I've included a screenshot or gif (if applicable)
* [x] I have tested [demo.rerun.io](https://demo.rerun.io/pr/4273) (if
applicable)
* [x] The PR title and labels are set such as to maximize their
usefulness for the next release's CHANGELOG

- [PR Build Summary](https://build.rerun.io/pr/4273)
- [Docs
preview](https://rerun.io/preview/cb476d8f4fc9025855a0dd58c12c953c4741cf77/docs)
<!--DOCS-PREVIEW-->
- [Examples
preview](https://rerun.io/preview/cb476d8f4fc9025855a0dd58c12c953c4741cf77/examples)
<!--EXAMPLES-PREVIEW-->
- [Recent benchmark results](https://build.rerun.io/graphs/crates.html)
- [Wasm size tracking](https://build.rerun.io/graphs/sizes.html)
  • Loading branch information
Wumpf authored Nov 21, 2023
1 parent 16b4c3c commit b5918ff
Show file tree
Hide file tree
Showing 127 changed files with 1,093 additions and 1,278 deletions.
12 changes: 12 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,18 @@
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Lauch C++ minimal example",
"type": "cppdbg",
"request": "launch",
"cwd": "${workspaceFolder}",
"program": "${workspaceFolder}/build/examples/cpp/minimal/example_minimal",
"args": [],
"stopAtEntry": false,
"environment": [],
"externalConsole": false,
"MIMode": "lldb"
},
{
"name": "Launch tests",
"type": "lldb",
Expand Down
29 changes: 15 additions & 14 deletions crates/re_types_builder/src/codegen/cpp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1281,7 +1281,11 @@ fn component_to_data_cell_method(
ARROW_RETURN_NOT_OK(builder->Finish(&array));
#NEWLINE_TOKEN
#NEWLINE_TOKEN
return rerun::DataCell::create(#type_ident::NAME, #type_ident::arrow_datatype(), std::move(array));
DataCell cell;
cell.num_instances = num_instances;
cell.component_name = #type_ident::NAME;
cell.array = std::move(array);
return cell;
},
inline: false,
}
Expand All @@ -1290,10 +1294,10 @@ fn component_to_data_cell_method(
fn archetype_serialize(type_ident: &Ident, obj: &Object, hpp_includes: &mut Includes) -> Method {
hpp_includes.insert_rerun("data_cell.hpp");
hpp_includes.insert_rerun("collection.hpp");
hpp_includes.insert_rerun("serialized_component_batch.hpp");
hpp_includes.insert_rerun("data_cell.hpp");
hpp_includes.insert_system("vector"); // std::vector

let num_fields = quote_integer(obj.fields.len());
let num_fields = quote_integer(obj.fields.len() + 1); // Plus one for the indicator.
let push_batches = obj.fields.iter().map(|field| {
let field_name = format_ident!("{}", field.name);
let field_accessor = quote!(archetype.#field_name);
Expand All @@ -1307,7 +1311,7 @@ fn archetype_serialize(type_ident: &Ident, obj: &Object, hpp_includes: &mut Incl

let emplace_back = quote! {
RR_RETURN_NOT_OK(result.error);
cells.emplace_back(std::move(result.value), size);
cells.emplace_back(std::move(result.value));
};


Expand All @@ -1316,15 +1320,13 @@ fn archetype_serialize(type_ident: &Ident, obj: &Object, hpp_includes: &mut Incl
if field.is_nullable {
quote! {
if (#field_accessor.has_value()) {
const size_t size = #field_accessor.value().size();
auto result = #field_type::to_data_cell(#field_accessor.value().data(), size);
auto result = #field_type::to_data_cell(#field_accessor.value().data(), #field_accessor.value().size());
#emplace_back
}
}
} else {
quote! {
{
const size_t size = #field_accessor.size();
auto result = #field_type::to_data_cell(#field_accessor.data(), #field_accessor.size());
#emplace_back
}
Expand All @@ -1333,16 +1335,14 @@ fn archetype_serialize(type_ident: &Ident, obj: &Object, hpp_includes: &mut Incl
} else if field.is_nullable {
quote! {
if (#field_accessor.has_value()) {
const size_t size = 1;
auto result = #field_type::to_data_cell(&#field_accessor.value(), size);
auto result = #field_type::to_data_cell(&#field_accessor.value(), 1);
#emplace_back
}
}
} else {
quote! {
{
const size_t size = 1;
auto result = #field_type::to_data_cell(&#field_accessor, size);
auto result = #field_type::to_data_cell(&#field_accessor, 1);
#emplace_back
}
}
Expand All @@ -1353,13 +1353,14 @@ fn archetype_serialize(type_ident: &Ident, obj: &Object, hpp_includes: &mut Incl
docs: "Serialize all set component batches.".into(),
declaration: MethodDeclaration {
is_static: true,
return_type: quote!(Result<std::vector<SerializedComponentBatch>>),
// TODO(andreas): Use a rerun::Collection here as well.
return_type: quote!(Result<std::vector<DataCell>>),
name_and_parameters: quote!(serialize(const archetypes::#type_ident& archetype)),
},
definition_body: quote! {
using namespace archetypes;
#NEWLINE_TOKEN
std::vector<SerializedComponentBatch> cells;
std::vector<DataCell> cells;
cells.reserve(#num_fields);
#NEWLINE_TOKEN
#NEWLINE_TOKEN
Expand All @@ -1368,7 +1369,7 @@ fn archetype_serialize(type_ident: &Ident, obj: &Object, hpp_includes: &mut Incl
auto indicator = #type_ident::IndicatorComponent();
auto result = #type_ident::IndicatorComponent::to_data_cell(&indicator, 1);
RR_RETURN_NOT_OK(result.error);
cells.emplace_back(std::move(result.value), 1);
cells.emplace_back(std::move(result.value));
}
#NEWLINE_TOKEN
#NEWLINE_TOKEN
Expand Down
140 changes: 43 additions & 97 deletions crates/rerun_c/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,19 +112,18 @@ pub struct CStoreInfo {
pub struct CDataCell {
pub component_name: CStringView,

/// Length of [`Self::bytes`].
pub num_bytes: u64,
pub array: arrow2::ffi::ArrowArray,

/// Data in the Arrow IPC encapsulated message format.
pub bytes: *const u8,
/// TODO(andreas): Use a schema registry.
pub schema: arrow2::ffi::ArrowSchema,
}

#[repr(C)]
pub struct CDataRow {
pub entity_path: CStringView,
pub num_instances: u32,
pub num_data_cells: u32,
pub data_cells: *const CDataCell,
pub data_cells: *mut CDataCell,
}

#[repr(u32)]
Expand All @@ -145,7 +144,8 @@ pub enum CErrorCode {
RecordingStreamSpawnFailure,

_CategoryArrow = 0x0000_1000,
ArrowIpcMessageParsingFailure,
ArrowFfiSchemaImportError,
ArrowFfiArrayImportError,
ArrowDataCellError,

Unknown = 0xFFFF_FFFF,
Expand Down Expand Up @@ -559,55 +559,75 @@ pub extern "C" fn rr_recording_stream_reset_time(stream: CRecordingStream) {

#[allow(unsafe_code)]
#[allow(clippy::result_large_err)]
#[allow(clippy::needless_pass_by_value)] // Conceptually we're consuming the data_row, as we take ownership of data it points to.
fn rr_log_impl(
stream: CRecordingStream,
data_row: *const CDataRow,
data_row: CDataRow,
inject_time: bool,
) -> Result<(), CError> {
let stream = recording_stream(stream)?;

let data_row = ptr::try_ptr_as_ref(data_row, "data_row")?;

let CDataRow {
entity_path,
num_instances,
num_data_cells,
data_cells,
} = *data_row;
} = data_row;

let entity_path = entity_path.as_str("entity_path")?;
let entity_path = EntityPath::parse_forgiving(entity_path);

let num_data_cells = num_data_cells as usize;
re_log::debug!(
"rerun_log {entity_path:?}, num_instances: {num_instances}, num_data_cells: {num_data_cells}",
);

let mut cells = re_log_types::DataCellVec::default();
cells.reserve(num_data_cells as usize);
for i in 0..num_data_cells {
let data_cell: &CDataCell = unsafe { &*data_cells.wrapping_add(i as _) };
cells.reserve(num_data_cells);

let data_cells = unsafe { std::slice::from_raw_parts_mut(data_cells, num_data_cells) };

for data_cell in data_cells {
// Arrow2 implements drop for ArrowArray and ArrowSchema.
//
// Therefore, for things to work correctly we have to take ownership of the data cell!
// The C interface is documented to take ownership of the data cell - the user should NOT call `release`.
// This makes sense because from here on out we want to manage the lifetime of the underlying schema and array data:
// the schema won't survive a loop iteration since it's reference passed for import, whereas the ArrowArray lives
// on a longer within the resulting arrow::Array.
let CDataCell {
component_name,
num_bytes,
bytes,
} = *data_cell;
array,
schema,
} = unsafe { std::ptr::read(data_cell) };

// It would be nice to now mark the data_cell as "consumed" by setting the original release method to nullptr.
// This would signifies to the calling code that the data_cell is no longer owned.
// However, Arrow2 doesn't allow us to access the fields of the ArrowArray and ArrowSchema structs.

let component_name = component_name.as_str("data_cells[i].component_name")?;
let component_name = ComponentName::from(component_name);

let bytes = unsafe { std::slice::from_raw_parts(bytes, num_bytes as usize) };
let array = parse_arrow_ipc_encapsulated_message(bytes).map_err(|err| {
let field = unsafe { arrow2::ffi::import_field_from_c(&schema) }.map_err(|err| {
CError::new(
CErrorCode::ArrowIpcMessageParsingFailure,
&format!("Failed to parse Arrow IPC encapsulated message: {err}"),
CErrorCode::ArrowFfiSchemaImportError,
&format!("Failed to import ffi schema: {err}"),
)
})?;

let values =
unsafe { arrow2::ffi::import_array_from_c(array, field.data_type) }.map_err(|err| {
CError::new(
CErrorCode::ArrowFfiArrayImportError,
&format!("Failed to import ffi array: {err}"),
)
})?;

cells.push(
DataCell::try_from_arrow(component_name, array).map_err(|err| {
DataCell::try_from_arrow(component_name, values).map_err(|err| {
CError::new(
CErrorCode::ArrowDataCellError,
&format!("Failed to create arrow datacell from message: {err}"),
&format!("Failed to create arrow datacell: {err}"),
)
})?,
);
Expand Down Expand Up @@ -636,7 +656,7 @@ fn rr_log_impl(
#[no_mangle]
pub unsafe extern "C" fn rr_recording_stream_log(
stream: CRecordingStream,
data_row: *const CDataRow,
data_row: CDataRow,
inject_time: bool,
error: *mut CError,
) {
Expand All @@ -655,77 +675,3 @@ fn initialize_logging() {
re_log::setup_native_logging();
});
}

fn parse_arrow_ipc_encapsulated_message(
bytes: &[u8],
) -> Result<Box<dyn arrow2::array::Array>, String> {
re_log::debug!(
"parse_arrow_ipc_encapsulated_message: {} bytes",
bytes.len()
);

use arrow2::io::ipc::read::{read_stream_metadata, StreamReader, StreamState};

let mut cursor = std::io::Cursor::new(bytes);
let metadata = match read_stream_metadata(&mut cursor) {
Ok(metadata) => metadata,
Err(err) => return Err(format!("Failed to read stream metadata: {err}")),
};

// This IPC message represents the contents of a single DataCell, thus we should have a single
// field.
if metadata.schema.fields.len() != 1 {
return Err(format!(
"Found {} fields in stream metadata - expected exactly one.",
metadata.schema.fields.len(),
));
}
// Might need that later if it turns out we don't have any data to log.
let datatype = metadata.schema.fields[0].data_type().clone();

let stream = StreamReader::new(cursor, metadata, None);
let chunks: Result<Vec<_>, _> = stream
.map(|state| match state {
Ok(StreamState::Some(chunk)) => Ok(chunk),
Ok(StreamState::Waiting) => {
unreachable!("cannot be waiting on a fixed buffer")
}
Err(err) => Err(err),
})
.collect();

let chunks = chunks.map_err(|err| format!("Arrow error: {err}"))?;

// We're not sending a `DataCellColumn`'s (i.e. `List<DataCell>`) worth of data like we normally do
// here, rather we're sending a single, independent `DataCell`'s worth of data.
//
// This distinction is crucial:
// - The data for a `DataCellColumn` containing a single empty `DataCell` is a unit-length list-array whose
// first and only entry is an empty array (`ListArray[[]]`). There's actually data there (as
// in bytes).
// - The data for a standalone empty `DataCell`, on the other hand, is literally nothing. It's
// zero bytes.
//
// Where there's no data whatsoever, the chunk gets optimized out, which is why logging an
// empty array in C++ ends up hitting this path.
if chunks.is_empty() {
// The fix is simple: craft an empty array with the correct datatype.
return Ok(arrow2::array::new_empty_array(datatype));
}

if chunks.len() > 1 {
return Err(format!(
"Found {} chunks in stream - expected just one.",
chunks.len()
));
}
let chunk = chunks.into_iter().next().unwrap();

let arrays = chunk.into_arrays();

if arrays.len() != 1 {
return Err(format!("Expected one array, got {}", arrays.len()));
}

Ok(arrays.into_iter().next().unwrap())
}
29 changes: 15 additions & 14 deletions crates/rerun_c/src/rerun.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ extern "C" {

#include <stdbool.h>
#include <stdint.h>
#include "arrow_c_data_interface.h"

// ----------------------------------------------------------------------------
// Types:
Expand Down Expand Up @@ -131,22 +132,17 @@ typedef struct rr_store_info {
rr_store_kind store_kind;
} rr_store_info;

/// Arrow-encoded data of a single component for a single entity.
/// Arrow-encoded data of a single batch components for a single entity.
typedef struct rr_data_cell {
/// The name of the component, e.g. `position`.
rr_string component_name;

/// The number of bytes in the `bytes` field.
/// Must be a multiple of 8.
uint64_t num_bytes;
/// A batch of instances of this component serialized into an arrow array.
ArrowArray array;

/// Data in the Arrow IPC encapsulated message format.
///
/// There must be exactly one chunk of data.
///
/// * <https://arrow.apache.org/docs/format/Columnar.html#format-ipc>
/// * <https://wesm.github.io/arrow-site-test/format/IPC.html#encapsulated-message-format>
const uint8_t* bytes;
/// The schema used for the arrow array.
/// TODO(andreas): Use a schema registry that identifies this and the component name with a unique schema ID.
ArrowSchema schema;
} rr_data_cell;

/// Arrow-encoded log data for a single entity.
Expand All @@ -163,7 +159,7 @@ typedef struct {
uint32_t num_data_cells;

/// One for each component.
const rr_data_cell* data_cells;
rr_data_cell* data_cells;
} rr_data_row;

/// Error codes returned by the Rerun C SDK as part of `rr_error`.
Expand All @@ -190,7 +186,8 @@ enum {

// Arrow data processing errors.
_RR_ERROR_CODE_CATEGORY_ARROW = 0x000001000,
RR_ERROR_CODE_ARROW_IPC_MESSAGE_PARSING_FAILURE,
RR_ERROR_CODE_ARROW_FFI_SCHEMA_IMPORT_ERROR,
RR_ERROR_CODE_ARROW_FFI_ARRAY_IMPORT_ERROR,
RR_ERROR_CODE_ARROW_DATA_CELL_ERROR,

// Generic errors.
Expand Down Expand Up @@ -363,8 +360,12 @@ extern void rr_recording_stream_reset_time(rr_recording_stream stream);
///
/// If `inject_time` is set to `true`, the row's timestamp data will be
/// overridden using the recording streams internal clock.
///
/// Takes ownership of the passed data cells and will release underlying
/// arrow data once it is no longer needed.
/// Any pointers passed via `rr_string` can be safely freed after this call.
extern void rr_recording_stream_log(
rr_recording_stream stream, const rr_data_row* data_row, bool inject_time, rr_error* error
rr_recording_stream stream, rr_data_row data_row, bool inject_time, rr_error* error
);

// ----------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit b5918ff

Please sign in to comment.