Skip to content

Commit

Permalink
[eclipse-iceoryx#532] Add builder argument to define the allocation s…
Browse files Browse the repository at this point in the history
…trategy of the publisher; rename max_slice_len into initial_max_slice_len
  • Loading branch information
elfenpiff committed Nov 27, 2024
1 parent f656be3 commit 8e94f4b
Show file tree
Hide file tree
Showing 12 changed files with 61 additions and 44 deletions.
4 changes: 2 additions & 2 deletions benchmarks/publish-subscribe/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ fn perform_benchmark<T: Service>(args: &Args) -> Result<(), Box<dyn std::error::
.spawn(|| {
let sender_a2b = service_a2b
.publisher_builder()
.max_slice_len(args.payload_size)
.initial_max_slice_len(args.payload_size)
.create()
.unwrap();
let receiver_b2a = service_b2a.subscriber_builder().create().unwrap();
Expand Down Expand Up @@ -93,7 +93,7 @@ fn perform_benchmark<T: Service>(args: &Args) -> Result<(), Box<dyn std::error::
.spawn(|| {
let sender_b2a = service_b2a
.publisher_builder()
.max_slice_len(args.payload_size)
.initial_max_slice_len(args.payload_size)
.create()
.unwrap();
let receiver_a2b = service_a2b.subscriber_builder().create().unwrap();
Expand Down
6 changes: 4 additions & 2 deletions examples/cxx/publish_subscribe_dynamic_data/src/publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ auto main() -> int {
// Since the payload type is uint8_t, this number is the same as the number of bytes in the payload.
// For other types, number of bytes used by the payload will be max_slice_len * sizeof(Payload::ValueType)
const uint64_t maximum_elements = 1024; // NOLINT
auto publisher =
service.publisher_builder().max_slice_len(maximum_elements).create().expect("successful publisher creation");
auto publisher = service.publisher_builder()
.initial_max_slice_len(maximum_elements)
.create()
.expect("successful publisher creation");

auto counter = 0;

Expand Down
2 changes: 1 addition & 1 deletion examples/rust/publish_subscribe_dynamic_data/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let maximum_elements = 1024;
let publisher = service
.publisher_builder()
.max_slice_len(maximum_elements)
.initial_max_slice_len(maximum_elements)
.create()?;

let mut counter = 0;
Expand Down
9 changes: 5 additions & 4 deletions iceoryx2-ffi/cxx/include/iox2/port_factory_publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class PortFactoryPublisher {
/// Sets the maximum slice length that a user can allocate with
/// [`Publisher::loan_slice()`] or [`Publisher::loan_slice_uninit()`].
template <typename T = Payload, typename = std::enable_if_t<iox::IsSlice<T>::VALUE, void>>
auto max_slice_len(uint64_t value) && -> PortFactoryPublisher&&;
auto initial_max_slice_len(uint64_t value) && -> PortFactoryPublisher&&;

/// Creates a new [`Publisher`] or returns a [`PublisherCreateError`] on failure.
auto create() && -> iox::expected<Publisher<S, Payload, UserHeader>, PublisherCreateError>;
Expand All @@ -68,7 +68,8 @@ inline PortFactoryPublisher<S, Payload, UserHeader>::PortFactoryPublisher(iox2_p

template <ServiceType S, typename Payload, typename UserHeader>
template <typename T, typename>
inline auto PortFactoryPublisher<S, Payload, UserHeader>::max_slice_len(uint64_t value) && -> PortFactoryPublisher&& {
inline auto
PortFactoryPublisher<S, Payload, UserHeader>::initial_max_slice_len(uint64_t value) && -> PortFactoryPublisher&& {
m_max_slice_len.emplace(value);
return std::move(*this);
}
Expand All @@ -82,8 +83,8 @@ PortFactoryPublisher<S, Payload, UserHeader>::create() && -> iox::expected<Publi
&m_handle, static_cast<iox2_unable_to_deliver_strategy_e>(iox::into<int>(value)));
});
m_max_slice_len
.and_then([&](auto value) { iox2_port_factory_publisher_builder_set_max_slice_len(&m_handle, value); })
.or_else([&]() { iox2_port_factory_publisher_builder_set_max_slice_len(&m_handle, 1); });
.and_then([&](auto value) { iox2_port_factory_publisher_builder_set_initial_max_slice_len(&m_handle, value); })
.or_else([&]() { iox2_port_factory_publisher_builder_set_initial_max_slice_len(&m_handle, 1); });
m_max_loaned_samples.and_then(
[&](auto value) { iox2_port_factory_publisher_builder_set_max_loaned_samples(&m_handle, value); });

Expand Down
6 changes: 3 additions & 3 deletions iceoryx2-ffi/cxx/include/iox2/publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class Publisher {

/// Returns the maximum number of elements that can be loaned in a slice.
template <typename T = Payload, typename = std::enable_if_t<iox::IsSlice<T>::VALUE, void>>
auto max_slice_len() const -> uint64_t;
auto initial_max_slice_len() const -> uint64_t;

/// Copies the input `value` into a [`SampleMut`] and delivers it.
/// On success it returns the number of [`Subscriber`]s that received
Expand Down Expand Up @@ -154,8 +154,8 @@ inline auto Publisher<S, Payload, UserHeader>::unable_to_deliver_strategy() cons

template <ServiceType S, typename Payload, typename UserHeader>
template <typename T, typename>
inline auto Publisher<S, Payload, UserHeader>::max_slice_len() const -> uint64_t {
return iox2_publisher_max_slice_len(&m_handle);
inline auto Publisher<S, Payload, UserHeader>::initial_max_slice_len() const -> uint64_t {
return iox2_publisher_initial_max_slice_len(&m_handle);
}

template <ServiceType S, typename Payload, typename UserHeader>
Expand Down
16 changes: 8 additions & 8 deletions iceoryx2-ffi/cxx/tests/src/service_publish_subscribe_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ TYPED_TEST(ServicePublishSubscribeTest, slice_copy_send_receive_works) {
auto service =
node.service_builder(service_name).template publish_subscribe<iox::Slice<DummyData>>().create().expect("");

auto sut_publisher = service.publisher_builder().max_slice_len(SLICE_MAX_LENGTH).create().expect("");
auto sut_publisher = service.publisher_builder().initial_max_slice_len(SLICE_MAX_LENGTH).create().expect("");
auto sut_subscriber = service.subscriber_builder().create().expect("");

iox::UninitializedArray<DummyData, SLICE_MAX_LENGTH, iox::ZeroedBuffer> elements;
Expand Down Expand Up @@ -279,7 +279,7 @@ TYPED_TEST(ServicePublishSubscribeTest, loan_slice_send_receive_works) {
.create()
.expect("");

auto sut_publisher = service.publisher_builder().max_slice_len(SLICE_MAX_LENGTH).create().expect("");
auto sut_publisher = service.publisher_builder().initial_max_slice_len(SLICE_MAX_LENGTH).create().expect("");
auto sut_subscriber = service.subscriber_builder().create().expect("");

auto send_sample = sut_publisher.loan_slice(SLICE_MAX_LENGTH).expect("");
Expand Down Expand Up @@ -316,7 +316,7 @@ TYPED_TEST(ServicePublishSubscribeTest, loan_slice_uninit_send_receive_works) {
.create()
.expect("");

auto sut_publisher = service.publisher_builder().max_slice_len(SLICE_MAX_LENGTH).create().expect("");
auto sut_publisher = service.publisher_builder().initial_max_slice_len(SLICE_MAX_LENGTH).create().expect("");
auto sut_subscriber = service.subscriber_builder().create().expect("");

auto send_sample = sut_publisher.loan_slice_uninit(SLICE_MAX_LENGTH).expect("");
Expand Down Expand Up @@ -359,7 +359,7 @@ TYPED_TEST(ServicePublishSubscribeTest, loan_slice_uninit_with_bytes_send_receiv
.create()
.expect("");

auto sut_publisher = service.publisher_builder().max_slice_len(sizeof(DummyData)).create().expect("");
auto sut_publisher = service.publisher_builder().initial_max_slice_len(sizeof(DummyData)).create().expect("");
auto sut_subscriber = service.subscriber_builder().create().expect("");

auto send_sample = sut_publisher.loan_slice_uninit(sizeof(DummyData)).expect("");
Expand Down Expand Up @@ -389,7 +389,7 @@ TYPED_TEST(ServicePublishSubscribeTest, write_from_fn_send_receive_works) {
auto service =
node.service_builder(service_name).template publish_subscribe<iox::Slice<DummyData>>().create().expect("");

auto sut_publisher = service.publisher_builder().max_slice_len(SLICE_MAX_LENGTH).create().expect("");
auto sut_publisher = service.publisher_builder().initial_max_slice_len(SLICE_MAX_LENGTH).create().expect("");
auto sut_subscriber = service.subscriber_builder().create().expect("");

auto send_sample = sut_publisher.loan_slice_uninit(SLICE_MAX_LENGTH).expect("");
Expand Down Expand Up @@ -423,7 +423,7 @@ TYPED_TEST(ServicePublishSubscribeTest, write_from_slice_send_receive_works) {
auto service =
node.service_builder(service_name).template publish_subscribe<iox::Slice<DummyData>>().create().expect("");

auto sut_publisher = service.publisher_builder().max_slice_len(SLICE_MAX_LENGTH).create().expect("");
auto sut_publisher = service.publisher_builder().initial_max_slice_len(SLICE_MAX_LENGTH).create().expect("");
auto sut_subscriber = service.subscriber_builder().create().expect("");

iox::UninitializedArray<DummyData, SLICE_MAX_LENGTH, iox::ZeroedBuffer> elements;
Expand Down Expand Up @@ -607,9 +607,9 @@ TYPED_TEST(ServicePublishSubscribeTest, publisher_applies_max_slice_len) {
auto service =
node.service_builder(service_name).template publish_subscribe<iox::Slice<ValueType>>().create().expect("");

auto sut = service.publisher_builder().max_slice_len(DESIRED_MAX_SLICE_LEN).create().expect("");
auto sut = service.publisher_builder().initial_max_slice_len(DESIRED_MAX_SLICE_LEN).create().expect("");

ASSERT_THAT(sut.max_slice_len(), Eq(DESIRED_MAX_SLICE_LEN));
ASSERT_THAT(sut.initial_max_slice_len(), Eq(DESIRED_MAX_SLICE_LEN));
}

TYPED_TEST(ServicePublishSubscribeTest, send_receive_with_user_header_works) {
Expand Down
6 changes: 3 additions & 3 deletions iceoryx2-ffi/ffi/src/api/port_factory_publisher_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ impl HandleToType for iox2_port_factory_publisher_builder_h_ref {
///
/// * `port_factory_handle` must be valid handles
#[no_mangle]
pub unsafe extern "C" fn iox2_port_factory_publisher_builder_set_max_slice_len(
pub unsafe extern "C" fn iox2_port_factory_publisher_builder_set_initial_max_slice_len(
port_factory_handle: iox2_port_factory_publisher_builder_h_ref,
value: c_size_t,
) {
Expand All @@ -197,14 +197,14 @@ pub unsafe extern "C" fn iox2_port_factory_publisher_builder_set_max_slice_len(
let port_factory = ManuallyDrop::take(&mut port_factory_struct.value.as_mut().ipc);

port_factory_struct.set(PortFactoryPublisherBuilderUnion::new_ipc(
port_factory.max_slice_len(value),
port_factory.initial_max_slice_len(value),
));
}
iox2_service_type_e::LOCAL => {
let port_factory = ManuallyDrop::take(&mut port_factory_struct.value.as_mut().local);

port_factory_struct.set(PortFactoryPublisherBuilderUnion::new_local(
port_factory.max_slice_len(value),
port_factory.initial_max_slice_len(value),
));
}
}
Expand Down
8 changes: 5 additions & 3 deletions iceoryx2-ffi/ffi/src/api/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,15 +293,17 @@ pub unsafe extern "C" fn iox2_publisher_unable_to_deliver_strategy(
///
/// * `publisher_handle` is valid and non-null
#[no_mangle]
pub unsafe extern "C" fn iox2_publisher_max_slice_len(
pub unsafe extern "C" fn iox2_publisher_initial_max_slice_len(
publisher_handle: iox2_publisher_h_ref,
) -> c_int {
publisher_handle.assert_non_null();

let publisher = &mut *publisher_handle.as_type();
match publisher.service_type {
iox2_service_type_e::IPC => publisher.value.as_mut().ipc.max_slice_len() as c_int,
iox2_service_type_e::LOCAL => publisher.value.as_mut().local.max_slice_len() as c_int,
iox2_service_type_e::IPC => publisher.value.as_mut().ipc.initial_max_slice_len() as c_int,
iox2_service_type_e::LOCAL => {
publisher.value.as_mut().local.initial_max_slice_len() as c_int
}
}
}

Expand Down
16 changes: 8 additions & 8 deletions iceoryx2/src/port/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ impl<Service: service::Service> DataSegment<Service> {
match self.subscriber_connections.create(
i,
*subscriber_details,
self.config.max_slice_len,
self.config.initial_max_slice_len,
) {
Ok(()) => match &self.subscriber_connections.get(i) {
Some(connection) => self.deliver_sample_history(connection),
Expand Down Expand Up @@ -587,17 +587,17 @@ impl<Service: service::Service, Payload: Debug + ?Sized, UserHeader: Debug>
with PublisherCreateError::UnableToCreateDataSegment,
"{} since the data segment could not be acquired.", msg);

let max_slice_len = config.max_slice_len;
let max_slice_len = config.initial_max_slice_len;
let data_segment = Arc::new(DataSegment {
is_active: IoxAtomicBool::new(true),
memory: data_segment,
payload_size: static_config
.message_type_details()
.sample_layout(config.max_slice_len)
.sample_layout(config.initial_max_slice_len)
.size(),
payload_type_layout: static_config
.message_type_details()
.payload_layout(config.max_slice_len),
.payload_layout(config.initial_max_slice_len),
sample_reference_counter: {
let mut v = Vec::with_capacity(number_of_samples);
for _ in 0..number_of_samples {
Expand Down Expand Up @@ -680,7 +680,7 @@ impl<Service: service::Service, Payload: Debug + ?Sized, UserHeader: Debug>
) -> Result<Service::SharedMemory, SharedMemoryCreateError> {
let l = static_config
.message_type_details
.sample_layout(config.max_slice_len);
.sample_layout(config.initial_max_slice_len);
let allocator_config = shm_allocator::pool_allocator::Config { bucket_layout: l };

Ok(fail!(from "Publisher::create_data_segment()",
Expand All @@ -705,8 +705,8 @@ impl<Service: service::Service, Payload: Debug + ?Sized, UserHeader: Debug>
}

/// Returns the maximum slice length configured for this [`Publisher`].
pub fn max_slice_len(&self) -> usize {
self.data_segment.config.max_slice_len
pub fn initial_max_slice_len(&self) -> usize {
self.data_segment.config.initial_max_slice_len
}

fn allocate(&self, layout: Layout) -> Result<ShmPointer, PublisherLoanError> {
Expand Down Expand Up @@ -995,7 +995,7 @@ impl<Service: service::Service, Payload: Debug, UserHeader: Debug>
underlying_number_of_slice_elements: usize,
) -> Result<SampleMutUninit<Service, [MaybeUninit<Payload>], UserHeader>, PublisherLoanError>
{
let max_slice_len = self.data_segment.config.max_slice_len;
let max_slice_len = self.data_segment.config.initial_max_slice_len;
if max_slice_len < slice_len {
fail!(from self, with PublisherLoanError::ExceedsMaxLoanSize,
"Unable to loan slice with {} elements since it would exceed the max supported slice length of {}.",
Expand Down
20 changes: 16 additions & 4 deletions iceoryx2/src/service/port_factory/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
use std::fmt::Debug;

use iceoryx2_bb_log::fail;
use iceoryx2_cal::shm_allocator::AllocationStrategy;
use serde::{de::Visitor, Deserialize, Serialize};

use super::publish_subscribe::PortFactory;
Expand Down Expand Up @@ -130,7 +131,8 @@ pub(crate) struct LocalPublisherConfig {
pub(crate) max_loaned_samples: usize,
pub(crate) unable_to_deliver_strategy: UnableToDeliverStrategy,
pub(crate) degration_callback: Option<DegrationCallback<'static>>,
pub(crate) max_slice_len: usize,
pub(crate) initial_max_slice_len: usize,
pub(crate) allocation_strategy: AllocationStrategy,
}

/// Factory to create a new [`Publisher`] port/endpoint for
Expand All @@ -153,8 +155,9 @@ impl<'factory, Service: service::Service, Payload: Debug + ?Sized, UserHeader: D
pub(crate) fn new(factory: &'factory PortFactory<Service, Payload, UserHeader>) -> Self {
Self {
config: LocalPublisherConfig {
allocation_strategy: AllocationStrategy::Static,
degration_callback: None,
max_slice_len: 1,
initial_max_slice_len: 1,
max_loaned_samples: factory
.service
.__internal_state()
Expand Down Expand Up @@ -227,8 +230,17 @@ impl<'factory, Service: service::Service, Payload: Debug, UserHeader: Debug>
{
/// Sets the maximum slice length that a user can allocate with
/// [`Publisher::loan_slice()`] or [`Publisher::loan_slice_uninit()`].
pub fn max_slice_len(mut self, value: usize) -> Self {
self.config.max_slice_len = value;
pub fn initial_max_slice_len(mut self, value: usize) -> Self {
self.config.initial_max_slice_len = value;
self
}

/// Defines the allocation strategy that is used when the provided
/// [`PortFactoryPublisher::initial_max_slice_len()`] is exhausted. This happens when the user
/// acquires a more than max slice len in [`Publisher::loan_slice()`] or
/// [`Publisher::loan_slice_uninit()`].
pub fn allocation_strategy(mut self, value: AllocationStrategy) -> Self {
self.config.allocation_strategy = value;
self
}
}
6 changes: 3 additions & 3 deletions iceoryx2/tests/publisher_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ mod publisher {

let publisher = service
.publisher_builder()
.max_slice_len(NUMBER_OF_ELEMENTS)
.initial_max_slice_len(NUMBER_OF_ELEMENTS)
.create()?;
let sut = publisher.loan_slice(NUMBER_OF_ELEMENTS)?;

Expand All @@ -129,7 +129,7 @@ mod publisher {

let publisher = service
.publisher_builder()
.max_slice_len(NUMBER_OF_ELEMENTS)
.initial_max_slice_len(NUMBER_OF_ELEMENTS)
.create()?;

for i in 0..NUMBER_OF_ELEMENTS {
Expand All @@ -153,7 +153,7 @@ mod publisher {

let publisher = service
.publisher_builder()
.max_slice_len(NUMBER_OF_ELEMENTS)
.initial_max_slice_len(NUMBER_OF_ELEMENTS)
.create()?;

let sut = publisher.loan_slice(NUMBER_OF_ELEMENTS + 1);
Expand Down
6 changes: 3 additions & 3 deletions iceoryx2/tests/service_publish_subscribe_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2474,7 +2474,7 @@ mod service_publish_subscribe {

let publisher = sut
.publisher_builder()
.max_slice_len(MAX_ELEMENTS)
.initial_max_slice_len(MAX_ELEMENTS)
.create()
.unwrap();
let subscriber = sut.subscriber_builder().create().unwrap();
Expand Down Expand Up @@ -2517,7 +2517,7 @@ mod service_publish_subscribe {

let publisher = service_pub
.publisher_builder()
.max_slice_len(MAX_ELEMENTS)
.initial_max_slice_len(MAX_ELEMENTS)
.create()
.unwrap();
let subscriber = service_sub.subscriber_builder().create().unwrap();
Expand Down Expand Up @@ -2958,7 +2958,7 @@ mod service_publish_subscribe {

let publisher = sut
.publisher_builder()
.max_slice_len(NUMBER_OF_ELEMENTS)
.initial_max_slice_len(NUMBER_OF_ELEMENTS)
.create()
.unwrap();
let subscriber = sut.subscriber_builder().create().unwrap();
Expand Down

0 comments on commit 8e94f4b

Please sign in to comment.