Skip to content

Commit

Permalink
Merge pull request eclipse-iceoryx#860 from ApexAI/iox-#415-make-serv…
Browse files Browse the repository at this point in the history
…ice-registry-aware-of-complete-service-description

Iox eclipse-iceoryx#415 Make `ServiceRegistry` aware of complete `ServiceDescription`
  • Loading branch information
mossmaurice authored Aug 30, 2021
2 parents 6930b6d + 78d5952 commit 816270e
Show file tree
Hide file tree
Showing 18 changed files with 676 additions and 292 deletions.
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,20 @@ ServiceDescription("First", "Second", "DontCare") myServiceDescription2;
ServiceDescription("Foo", "Bar", "Baz") myServiceDescription3;
```

The service-related methods have been moved from `PoshRuntime` to a separate class (TBD):

```cpp
// before
poshRuntime.offerService(myServiceDescription);
poshRuntime.stopOfferService(myServiceDescription);
poshRuntime.findService({"ServiceA", iox::capro::AnyInstanceString});

// after
discoveryInfo.offerService(myServiceDescription);
discoveryInfo.stopOfferService(myServiceDescription);
discoveryInfo.findService("ServiceA", Wildcard);
```
## [v1.0.1](https://github.com/eclipse-iceoryx/iceoryx/tree/v1.0.0) (2021-06-15)
[Full Changelog](https://github.com/eclipse-iceoryx/iceoryx/compare/v1.0.0...v1.0.1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ namespace iox
error(POSH__RUNTIME_NAME_EMPTY) \
error(POSH__RUNTIME_LEADING_SLASH_PROVIDED) \
error(POSH__PORT_MANAGER_PUBLISHERPORT_NOT_UNIQUE) \
error(POSH__PORT_MANAGER_COULD_NOT_ADD_SERVICE_TO_REGISTRY) \
error(POSH__MEMPOOL_POSSIBLE_DOUBLE_FREE) \
error(POSH__RECEIVERPORT_DELIVERYFIFO_OVERFLOW) \
error(POSH__SENDERPORT_SAMPLE_SIZE_CHANGED_FOR_ACTIVE_PORT) \
Expand Down
11 changes: 8 additions & 3 deletions iceoryx_posh/include/iceoryx_posh/iceoryx_posh_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ class PublisherPortUser;
class SubscriberPortRouDi;
class SubscriberPortUser;
} // namespace popo
namespace capro
{
class ServiceDescription;
}

using PublisherPortRouDiType = iox::popo::PublisherPortRouDi;
using PublisherPortUserType = iox::popo::PublisherPortUser;
Expand Down Expand Up @@ -131,10 +135,11 @@ constexpr uint32_t APP_MESSAGE_SIZE = 512U;

// Processes
constexpr uint32_t MAX_PROCESS_NUMBER = 300U;
/// Maximum number of instances of a given service, which can be found.
/// Maximum number of services, which can be found.
/// This limitation is coming due to the fixed capacity of the cxx::vector (This doesn't limit the offered number of
/// instances)
constexpr uint32_t MAX_NUMBER_OF_INSTANCES = 50U;
/// @todo #415 increase number back to 50 once service registry is available via shared memory
constexpr uint32_t MAX_NUMBER_OF_SERVICES = 10U;

// Nodes
constexpr uint32_t MAX_NODE_NUMBER = 1000U;
Expand Down Expand Up @@ -249,7 +254,7 @@ using TimePointNs_t = std::chrono::time_point<BaseClock_t, DurationNs_t>;

namespace runtime
{
using InstanceContainer = iox::cxx::vector<capro::IdString_t, MAX_NUMBER_OF_INSTANCES>;
using ServiceContainer = iox::cxx::vector<capro::ServiceDescription, MAX_NUMBER_OF_SERVICES>;
using namespace units::duration_literals;
constexpr units::Duration PROCESS_WAITING_FOR_ROUDI_TIMEOUT = 60_s;
constexpr units::Duration PROCESS_KEEP_ALIVE_INTERVAL = 3 * roudi::DISCOVERY_INTERVAL; // > DISCOVERY_INTERVAL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ class PortManager

void sendToAllMatchingInterfacePorts(const capro::CaproMessage& message) noexcept;

void addEntryToServiceRegistry(const capro::IdString_t& service, const capro::IdString_t& instance) noexcept;
void removeEntryFromServiceRegistry(const capro::IdString_t& service, const capro::IdString_t& instance) noexcept;
void addEntryToServiceRegistry(const capro::ServiceDescription& service) noexcept;
void removeEntryFromServiceRegistry(const capro::ServiceDescription& service) noexcept;

template <typename T, std::enable_if_t<std::is_same<T, iox::build::OneToManyPolicy>::value>* = nullptr>
cxx::optional<RuntimeName_t>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@
#ifndef IOX_POSH_ROUDI_SERVICE_REGISTRY_HPP
#define IOX_POSH_ROUDI_SERVICE_REGISTRY_HPP

#include "iceoryx_hoofs/cxx/expected.hpp"
#include "iceoryx_hoofs/cxx/vector.hpp"
#include "iceoryx_hoofs/internal/cxx/set.hpp"
#include "iceoryx_posh/capro/service_description.hpp"

#include <cstdint>
#include <map>
#include <utility>

namespace iox
{
Expand All @@ -32,23 +33,50 @@ static const capro::IdString_t Wildcard{"*"};
class ServiceRegistry
{
public:
static constexpr uint32_t MAX_INSTANCES_PER_SERVICE = 100u;
using InstanceSet_t = cxx::vector<capro::IdString_t, MAX_INSTANCES_PER_SERVICE>;
struct instance_t
enum class Error
{
InstanceSet_t instanceSet;
INVALID_STATE,
SERVICE_REGISTRY_FULL,
};
using serviceMap_t = std::map<capro::IdString_t, instance_t>;

void add(const capro::IdString_t& service, const capro::IdString_t& instance);
void remove(const capro::IdString_t& service, const capro::IdString_t& instance);
void find(InstanceSet_t& instances,
const capro::IdString_t& service,
const capro::IdString_t& instance = Wildcard) const;
const serviceMap_t& getServiceMap() const;
using ReferenceCounter_t = uint64_t;
struct ServiceDescriptionEntry
{
capro::ServiceDescription serviceDescription{};
ReferenceCounter_t referenceCounter = 0U;
};

/// @todo #415 should be connected with iox::MAX_NUMBER_OF_SERVICES
static constexpr uint32_t MAX_SERVICE_DESCRIPTIONS = 100U;
using ServiceDescriptionVector_t = cxx::vector<ServiceDescriptionEntry, MAX_SERVICE_DESCRIPTIONS>;

/// @brief Adds given service description to registry
/// @param[in] serviceDescription, service to be added
/// @return ServiceRegistryError, error wrapped in cxx::expected
cxx::expected<Error> add(const capro::ServiceDescription& serviceDescription) noexcept;

/// @brief Removes given service description from registry
/// @param[in] serviceDescription, service to be removed
/// @return true, if service description was removed, false otherwise
bool remove(const capro::ServiceDescription& serviceDescription) noexcept;

/// @brief Removes given service description from registry
/// @param[in] searchResult, reference to the vector which will be filled with the results
/// @param[in] service, string or wildcard to search for
/// @param[in] instance, string or wildcard to search for
void find(ServiceDescriptionVector_t& searchResult,
const capro::IdString_t& service = Wildcard,
const capro::IdString_t& instance = Wildcard) const noexcept;

/// @brief Returns all service descriptions as copy
/// @return ServiceDescriptionVector_t, copy of complete service registry
const ServiceDescriptionVector_t getServices() const noexcept;

private:
mutable serviceMap_t m_serviceMap;
/// @todo #859 replace std::multimap with prefix tree
::std::multimap<capro::IdString_t, uint64_t> m_serviceMap;
::std::multimap<capro::IdString_t, uint64_t> m_instanceMap;
ServiceDescriptionVector_t m_serviceDescriptionVector;
};
} // namespace roudi
} // namespace iox
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ class PoshRuntimeImpl : public PoshRuntime
virtual ~PoshRuntimeImpl() noexcept;

/// @copydoc PoshRuntime::findService
cxx::expected<InstanceContainer, FindServiceError>
findService(const cxx::variant<Any_t, capro::IdString_t> service,
const cxx::variant<Any_t, capro::IdString_t> instance) noexcept override;
cxx::expected<ServiceContainer, FindServiceError>
findService(const cxx::variant<Wildcard_t, capro::IdString_t> service,
const cxx::variant<Wildcard_t, capro::IdString_t> instance) noexcept override;

/// @copydoc PoshRuntime::offerService
bool offerService(const capro::ServiceDescription& serviceDescription) noexcept override;
Expand Down
18 changes: 9 additions & 9 deletions iceoryx_posh/include/iceoryx_posh/runtime/posh_runtime.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ class NodeData;
enum class FindServiceError
{
INVALID_STATE,
UNABLE_TO_WRITE_TO_ROUDI_CHANNEL,
INSTANCE_CONTAINER_OVERFLOW
UNABLE_TO_WRITE_TO_ROUDI_CHANNEL, /// @todo #415 remove as IPC channel won't be used
INSTANCE_CONTAINER_OVERFLOW /// @todo #415 set container to iox::MAX_NUMBER_OF_SERVICES and remove error
};

/// @brief Used to search for any string (wildcard)
struct Any_t
/// @brief Used to search for any string
struct Wildcard_t
{
};

Expand Down Expand Up @@ -89,12 +89,12 @@ class PoshRuntime
/// @brief find all services that match the provided service description
/// @param[in] service service string to search for (wildcards allowed)
/// @param[in] instance instance string to search for (wildcards allowed)
/// @return cxx::expected<InstanceContainer, FindServiceError>
/// InstanceContainer: on success, container that is filled with all matching instances
/// @return cxx::expected<ServiceContainer, FindServiceError>
/// ServiceContainer: on success, container that is filled with all matching instances
/// FindServiceError: if any, encountered during the operation
virtual cxx::expected<InstanceContainer, FindServiceError>
findService(const cxx::variant<Any_t, capro::IdString_t> service,
const cxx::variant<Any_t, capro::IdString_t> instance) noexcept = 0;
virtual cxx::expected<ServiceContainer, FindServiceError>
findService(const cxx::variant<Wildcard_t, capro::IdString_t> service,
const cxx::variant<Wildcard_t, capro::IdString_t> instance) noexcept = 0;

/// @brief offer the provided service, sends the offer from application to RouDi daemon
/// @param[in] service valid ServiceDescription to offer
Expand Down
5 changes: 0 additions & 5 deletions iceoryx_posh/source/capro/service_description.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,6 @@ ServiceDescription::ServiceDescription(const IdString_t& service,

bool ServiceDescription::operator==(const ServiceDescription& rhs) const
{
if (!isValid() || !rhs.isValid())
{
return false;
}

if (m_serviceString != rhs.m_serviceString)
{
return false;
Expand Down
59 changes: 26 additions & 33 deletions iceoryx_posh/source/roudi/port_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,11 @@ void PortManager::doDiscoveryForPublisherPort(PublisherPortRouDiType& publisherP
m_portIntrospection.reportMessage(caproMessage);
if (capro::CaproMessageType::OFFER == caproMessage.m_type)
{
this->addEntryToServiceRegistry(caproMessage.m_serviceDescription.getServiceIDString(),
caproMessage.m_serviceDescription.getInstanceIDString());
this->addEntryToServiceRegistry(caproMessage.m_serviceDescription);
}
else if (capro::CaproMessageType::STOP_OFFER == caproMessage.m_type)
{
this->removeEntryFromServiceRegistry(caproMessage.m_serviceDescription.getServiceIDString(),
caproMessage.m_serviceDescription.getInstanceIDString());
this->removeEntryFromServiceRegistry(caproMessage.m_serviceDescription);
}
else
{
Expand Down Expand Up @@ -268,21 +266,19 @@ void PortManager::handleInterfaces() noexcept
}
}
// also forward services from service registry
auto serviceMap = m_serviceRegistry.getServiceMap();
/// @todo #415 do we still need this? yes but return a copy here to be stored in shared memory via new
/// StatusPort's
auto serviceVector = m_serviceRegistry.getServices();

caproMessage.m_subType = capro::CaproMessageSubType::SERVICE;

for (auto const& x : serviceMap)
for (auto const& element : serviceVector)
{
for (auto& instance : x.second.instanceSet)
{
caproMessage.m_serviceDescription = capro::ServiceDescription(x.first, instance, roudi::Wildcard);
caproMessage.m_serviceDescription = element.serviceDescription;

for (auto& interfacePortData : interfacePortsForInitialForwarding)
{
auto interfacePort = popo::InterfacePort(interfacePortData);
interfacePort.dispatchCaProMessage(caproMessage);
}
for (auto& interfacePortData : interfacePortsForInitialForwarding)
{
popo::InterfacePort(interfacePortData).dispatchCaProMessage(caproMessage);
}
}
}
Expand All @@ -306,14 +302,12 @@ void PortManager::handleApplications() noexcept
{
case capro::CaproMessageType::OFFER:
{
addEntryToServiceRegistry(serviceDescription.getServiceIDString(),
serviceDescription.getInstanceIDString());
addEntryToServiceRegistry(caproMessage.m_serviceDescription);
break;
}
case capro::CaproMessageType::STOP_OFFER:
{
removeEntryFromServiceRegistry(serviceDescription.getServiceIDString(),
serviceDescription.getInstanceIDString());
removeEntryFromServiceRegistry(caproMessage.m_serviceDescription);
break;
}
default:
Expand Down Expand Up @@ -550,8 +544,7 @@ void PortManager::destroyPublisherPort(PublisherPortRouDiType::MemberType_t* con
cxx::Ensures(caproMessage.m_type == capro::CaproMessageType::STOP_OFFER);

m_portIntrospection.reportMessage(caproMessage);
this->removeEntryFromServiceRegistry(caproMessage.m_serviceDescription.getServiceIDString(),
caproMessage.m_serviceDescription.getInstanceIDString());
this->removeEntryFromServiceRegistry(caproMessage.m_serviceDescription);
this->sendToAllMatchingSubscriberPorts(caproMessage, publisherPortRoudi);
this->sendToAllMatchingInterfacePorts(caproMessage);
});
Expand Down Expand Up @@ -600,17 +593,16 @@ runtime::IpcMessage PortManager::findService(const capro::IdString_t& service,
interfacePort.dispatchCaProMessage(caproMessage);
}

// add all found instances to instanceString
runtime::IpcMessage instanceMessage;
runtime::IpcMessage response;

ServiceRegistry::InstanceSet_t instances;
m_serviceRegistry.find(instances, service, instance);
for (auto& instance : instances)
ServiceRegistry::ServiceDescriptionVector_t searchResult;
m_serviceRegistry.find(searchResult, service, instance);
for (auto& service : searchResult)
{
instanceMessage << instance;
response << static_cast<cxx::Serialization>(service.serviceDescription).toString();
}

return instanceMessage;
return response;
}

const std::atomic<uint64_t>* PortManager::serviceRegistryChangeCounter() noexcept
Expand Down Expand Up @@ -721,17 +713,18 @@ popo::ApplicationPortData* PortManager::acquireApplicationPortData(const Runtime
}
}

void PortManager::addEntryToServiceRegistry(const capro::IdString_t& service,
const capro::IdString_t& instance) noexcept
void PortManager::addEntryToServiceRegistry(const capro::ServiceDescription& service) noexcept
{
m_serviceRegistry.add(service, instance);
m_serviceRegistry.add(service).or_else([&](auto&) {
LogWarn() << "Could not add service " << service.getServiceIDString() << " to service registry!";
errorHandler(Error::kPOSH__PORT_MANAGER_COULD_NOT_ADD_SERVICE_TO_REGISTRY, nullptr, ErrorLevel::MODERATE);
});
m_portPool->serviceRegistryChangeCounter()->fetch_add(1, std::memory_order_relaxed);
}

void PortManager::removeEntryFromServiceRegistry(const capro::IdString_t& service,
const capro::IdString_t& instance) noexcept
void PortManager::removeEntryFromServiceRegistry(const capro::ServiceDescription& service) noexcept
{
m_serviceRegistry.remove(service, instance);
m_serviceRegistry.remove(service);
m_portPool->serviceRegistryChangeCounter()->fetch_add(1, std::memory_order_relaxed);
}

Expand Down
7 changes: 3 additions & 4 deletions iceoryx_posh/source/roudi/process_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -352,11 +352,10 @@ void ProcessManager::findServiceForProcess(const RuntimeName_t& name,
searchForProcessAndThen(
name,
[&](Process& process) {
runtime::IpcMessage instanceString({m_portManager.findService(service, instance)});
process.sendViaIpcChannel(instanceString);
LogDebug() << "Sent InstanceString to application " << name;
process.sendViaIpcChannel({m_portManager.findService(service, instance)});
LogDebug() << "Sent all found services to application " << name;
},
[&]() { LogWarn() << "Unknown process " << name << " requested an InstanceString."; });
[&]() { LogWarn() << "Unknown process " << name << " requested to find services."; });
}

void ProcessManager::addInterfaceForProcess(const RuntimeName_t& name,
Expand Down
Loading

0 comments on commit 816270e

Please sign in to comment.