Skip to content

Commit

Permalink
PR #12467 from Eran: misc DDS changes
Browse files Browse the repository at this point in the history
  • Loading branch information
maloel authored Dec 5, 2023
2 parents ea13e47 + 9a86df1 commit 7bc0026
Show file tree
Hide file tree
Showing 23 changed files with 542 additions and 116 deletions.
81 changes: 67 additions & 14 deletions third-party/realdds/include/realdds/dds-guid.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "dds-defines.h"
#include <fastdds/rtps/common/Guid.h>
#include <iosfwd>


namespace realdds {
Expand All @@ -13,25 +14,77 @@ namespace realdds {
static constexpr auto & unknown_guid = eprosima::fastrtps::rtps::c_Guid_Unknown;


// Custom GUID printer: attempts a more succinct representation
// If a base_prefix is provided, will try to minimize a common denominator (vendor, host, etc.) -- you can use your
// participant's guid if you want to shorten
// Convert a prefix to a hex string: not human-readable!
// In one of two formats:
// 223344556677.<0xPID> // eProsima
// Or:
// 001122334455667788990011 // everything else
// Used internally by next function.
//
std::string print( dds_guid const & guid,
dds_guid_prefix const & base_prefix = unknown_guid.guidPrefix,
bool readable_name = true );
struct print_raw_guid_prefix
{
dds_guid_prefix const & _prefix;
explicit print_raw_guid_prefix( dds_guid_prefix const & prefix,
dds_guid_prefix const & /*base_prefix*/ = unknown_guid.guidPrefix )
: _prefix( prefix )
{
}
};
std::ostream & operator<<( std::ostream &, print_raw_guid_prefix const & );


// Same as above, without a base prefix
inline std::string print( dds_guid const & guid, bool readable_name )
// Custom GUID printer: attempts a more succinct representation:
// If the participant is known, its name will be shown instead of the raw bytes.
// <name-or-prefix>.<entity-id-in-hex>
// If a base_prefix is provided, will try to minimize a common denominator -- you can use your
// participant's guid if you want to shorten.
//
struct print_guid
{
return print( guid, unknown_guid.guidPrefix, readable_name );
}
dds_guid const & _guid;
dds_guid_prefix const & _base_prefix;

explicit print_guid( dds_guid const & guid, dds_guid_prefix const & base_prefix = unknown_guid.guidPrefix )
: _guid( guid )
, _base_prefix( base_prefix )
{
}
explicit print_guid( dds_guid const & guid, dds_guid const & base_guid )
: print_guid( guid, base_guid.guidPrefix )
{
}
};
std::ostream & operator<<( std::ostream &, print_guid const & );


// Same as above, with a guid base for flexibility
inline std::string print( dds_guid const & guid, dds_guid const & base_guid, bool readable_name = true )
// Same, except leaves output in raw form (bytes, not name)
// <prefix>.<entity-id-in-hex>
//
struct print_raw_guid
{
return print( guid, base_guid.guidPrefix, readable_name );
}
dds_guid const & _guid;
dds_guid_prefix const & _base_prefix;

explicit print_raw_guid( dds_guid const & guid, dds_guid_prefix const & base_prefix = unknown_guid.guidPrefix )
: _guid( guid )
, _base_prefix( base_prefix )
{
}
explicit print_raw_guid( dds_guid const & guid, dds_guid const & base_guid )
: print_raw_guid( guid, base_guid.guidPrefix )
{
}
};
std::ostream & operator<<( std::ostream &, print_raw_guid const & );


// The reverse: get a guid from a RAW guid string.
// Expecting one of two formats:
// 223344556677.<0xPID>.<0xEID> // eProsima
// Or:
// 001122334455667788990011.<0xEID> // everything else
// Returns unknown_guid if not parseable
dds_guid guid_from_string( std::string const & );


} // namespace realdds
3 changes: 3 additions & 0 deletions third-party/realdds/include/realdds/dds-topic-writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ class dds_topic_writer : protected eprosima::fastdds::dds::DataWriterListener
// The callbacks should be set before we actually create the underlying DDS objects, so the writer does not
void run( qos const & = qos() );

// Waits until all changes were acknowledged; return false on timeout
bool wait_for_acks( dds_time timeout );

// DataWriterListener
protected:
// Called when the Publisher is matched (or unmatched) against an endpoint
Expand Down
89 changes: 75 additions & 14 deletions third-party/realdds/py/pyrealdds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@
#include <realdds/dds-stream-sensor-bridge.h>
#include <realdds/dds-metadata-syncer.h>

#include <rsutils/os/special-folder.h>
#include <rsutils/os/executable-name.h>
#include <rsutils/easylogging/easyloggingpp.h>
#include <rsutils/string/from.h>
#include <rsutils/json.h>

#include <fastdds/dds/domain/qos/DomainParticipantQos.hpp>
#include <fastdds/dds/domain/DomainParticipant.hpp>
Expand All @@ -45,12 +49,6 @@

namespace {

std::string to_string( realdds::dds_guid const & guid )
{
return realdds::print( guid );
}


py::list get_vector3( geometry_msgs::msg::Vector3 const & v )
{
py::list obj( 3 );
Expand All @@ -69,6 +67,55 @@ void set_vector3( geometry_msgs::msg::Vector3 & v, std::array< double, 3 > const
}


std::string script_name()
{
// Returns the name of the python script that's currently running us
return rsutils::os::base_name(
py::module_::import( "__main__" ).attr( "__file__" ).cast< std::string >() );
}


nlohmann::json load_rs_settings( nlohmann::json const & local_settings )
{
nlohmann::json config;

// Load the realsense configuration file settings
std::ifstream f( rsutils::os::get_special_folder( rsutils::os::special_folder::app_data ) + "realsense-config.json" );
if( f.good() )
{
try
{
config = nlohmann::json::parse( f );
}
catch( std::exception const & e )
{
throw std::runtime_error( "failed to load configuration file: " + std::string( e.what() ) );
}
}

// Load "python"-specific settings
auto settings = rsutils::json::load_app_settings( config, "python", "context", "config-file" );

// Take the "dds" settings only
settings = rsutils::json::nested( settings, "dds" );

// Patch any script-specific settings
// NOTE: this is also accessed by pyrealsense2, where a "context" hierarchy is still used
auto script = script_name();
if( auto script_settings = rsutils::json::nested( config, script, "context", "dds" ) )
rsutils::json::patch( settings, script_settings, "config-file/" + script + "/context" );

// We should always have DDS enabled
if( settings.is_object() )
settings.erase( "enabled" );

// Patch the given local settings into the configuration
rsutils::json::patch( settings, local_settings, "local settings" );

return settings;
}


} // namespace


Expand All @@ -94,14 +141,19 @@ PYBIND11_MODULE(NAME, m) {
using realdds::dds_guid;
py::class_< dds_guid >( m, "guid" )
.def( py::init<>() )
.def( "__bool__", []( dds_guid const& self ) { return self != dds_guid::unknown(); } )
.def( "__repr__", []( dds_guid const & self ) { return to_string( self ); } )
.def_static( "from_string",
[]( std::string const & raw_guid ) { return realdds::guid_from_string( raw_guid ); } )
.def( "__bool__", []( dds_guid const & self ) { return self != realdds::unknown_guid; } )
.def( "__str__",
[]( dds_guid const & self ) { return rsutils::string::from( realdds::print_guid( (self) ) ).str(); } )
.def( "__repr__",
[]( dds_guid const & self ) { return rsutils::string::from( realdds::print_raw_guid( ( self ) ) ).str(); } )
// Following two (hash and ==) are needed if we want to be able to use guids as dictionary keys
.def( "__hash__",
[]( dds_guid const & self )
{
return std::hash< std::string >{}(
realdds::print( self, false ) ); // use hex; not the human-readable name
rsutils::string::from( realdds::print_raw_guid( self ) ) );
} )
.def( py::self == py::self );

Expand Down Expand Up @@ -149,12 +201,19 @@ PYBIND11_MODULE(NAME, m) {
( char const * topic_name, eprosima::fastrtps::types::DynamicType_ptr dyn_type ),
callback( topic_name, dyn_type->get_name() ); ) );

m.def( "load_rs_settings", &load_rs_settings, "local-settings"_a = nlohmann::json::object() );
m.def( "script_name", &script_name );

py::class_< dds_participant,
std::shared_ptr< dds_participant > // handled with a shared_ptr
>
participant( m, "participant" );
participant.def( py::init<>() )
.def( "init", &dds_participant::init, "domain-id"_a, "participant-name"_a, "settings"_a = nlohmann::json::object() )
.def( "init",
[]( dds_participant & self, nlohmann::json const & local_settings, realdds::dds_domain_id domain_id )
{ self.init( domain_id, script_name(), local_settings ); },
"local-settings"_a = nlohmann::json::object(), "domain-id"_a = -1 )
.def( "init", &dds_participant::init, "domain-id"_a, "participant-name"_a, "local-settings"_a = nlohmann::json::object() )
.def( "is_valid", &dds_participant::is_valid )
.def( "guid", &dds_participant::guid )
.def( "create_guid", &dds_participant::create_guid )
Expand Down Expand Up @@ -182,7 +241,7 @@ PYBIND11_MODULE(NAME, m) {
eprosima::fastdds::dds::DomainParticipantQos qos;
if( ReturnCode_t::RETCODE_OK == self.get()->get_qos( qos ) )
os << " \"" << qos.name() << "\"";
os << " " << to_string( self.guid() );
os << " " << realdds::print_guid( self.guid() );
}
os << ">";
return os.str();
Expand Down Expand Up @@ -298,8 +357,10 @@ PYBIND11_MODULE(NAME, m) {
callback( self, status.current_count_change ); ) )
.def( "topic", &dds_topic_writer::topic )
.def( "run", &dds_topic_writer::run )
.def( "qos", []() { return writer_qos(); } )
.def( "qos", []( reliability r, durability d ) { return writer_qos( r, d ); } );
.def( "has_readers", &dds_topic_writer::has_readers )
.def( "wait_for_acks", &dds_topic_writer::wait_for_acks )
.def_static( "qos", []() { return writer_qos(); } )
.def_static( "qos", []( reliability r, durability d ) { return writer_qos( r, d ); } );


// The actual types are declared as functions and not classes: the py::init<> inheritance rules are pretty strict
Expand Down Expand Up @@ -350,7 +411,7 @@ PYBIND11_MODULE(NAME, m) {
[]( SampleIdentity const & self )
{
std::ostringstream os;
os << to_string( self.writer_guid() );
os << realdds::print_guid( self.writer_guid() );
os << '.';
os << self.sequence_number();
return os.str();
Expand Down
4 changes: 2 additions & 2 deletions third-party/realdds/scripts/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def domain_arg(x):
if t <= 0 or t > 232:
raise ValueError( f'--domain should be [0,232]' )
return t
args.add_argument( '--domain', metavar='<0-232>', type=domain_arg, default=0, help='DDS domain to use (default=0)' )
args.add_argument( '--domain', metavar='<0-232>', type=domain_arg, default=-1, help='DDS domain to use (default=0)' )
args = args.parse_args()


Expand All @@ -36,7 +36,7 @@ def e( *a, **kw ):
dds.debug( args.debug )

participant = dds.participant()
participant.init( args.domain, 'devices' )
participant.init( dds.load_rs_settings( settings ), args.domain )

watcher = dds.device_watcher( participant )
watcher.start()
Expand Down
4 changes: 2 additions & 2 deletions third-party/realdds/scripts/fps.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def domain_arg(x):
if t <= 0 or t > 232:
raise ValueError( f'--domain should be [0,232]' )
return t
args.add_argument( '--domain', metavar='<0-232>', type=domain_arg, default=0, help='DDS domain to use (default=0)' )
args.add_argument( '--domain', metavar='<0-232>', type=domain_arg, default=-1, help='DDS domain to use (default=0)' )
args.add_argument( '--with-metadata', action='store_true', help='stream with metadata, if available (default off)' )
args = args.parse_args()

Expand All @@ -43,7 +43,7 @@ def e( *a, **kw ):
settings['device'] = { 'metadata' : False };

participant = dds.participant()
participant.init( args.domain, 'fps', settings )
participant.init( dds.load_rs_settings( settings ), args.domain )

# Most important is the topic-root: this assumes we know it in advance and do not have to
# wait for a device-info message (which would complicate the code here).
Expand Down
14 changes: 12 additions & 2 deletions third-party/realdds/scripts/topic-send.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ def json_arg(x):
except Exception as e:
raise ArgumentError( str(e) )
args.add_argument( '--message', metavar='<json>', type=json_arg, help='a message to send', default='{"id":"ping","message":"some message"}' )
args.add_argument( '--ack', action='store_true', help='wait for acks' )
def domain_arg(x):
t = int(x)
if t <= 0 or t > 232:
raise ArgumentError( f'--domain should be [0-232]' )
return t
args.add_argument( '--domain', metavar='<0-232>', type=domain_arg, default=0, help='DDS domain to use (default=0)' )
args.add_argument( '--domain', metavar='<0-232>', type=domain_arg, default=-1, help='DDS domain to use (default=0)' )
args = args.parse_args()


Expand All @@ -43,7 +44,7 @@ def e( *a, **kw ):
settings = {}

participant = dds.participant()
participant.init( args.domain, 'topic-send', settings )
participant.init( dds.load_rs_settings( settings ), args.domain )

message = args.message

Expand Down Expand Up @@ -78,7 +79,16 @@ def e( *a, **kw ):
writer.run( dds.topic_writer.qos() )
# Let the client pick up on the new entity - if we send it too quickly, they won't see it before we disappear...
time.sleep( 1 )
if not writer.has_readers():
e( 'No readers exist on topic:', topic_path )
sys.exit( 1 )
start = dds.now()
dds.message.flexible( message ).write_to( writer )
i( f'Sent {message} on {topic_path}' )
if args.ack:
if not writer.wait_for_acks( dds.time( 5. ) ): # seconds
e( 'Timeout waiting for ack' )
sys.exit( 1 )
i( f'Acknowledged ({dds.timestr( dds.now(), start )})' )


6 changes: 3 additions & 3 deletions third-party/realdds/src/dds-device-impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,9 @@ void dds_device::impl::handle_notification( nlohmann::json const & j,
if( sample.size() == 2 && sample.is_array() )
{
// We have to be the ones who sent the control!
auto const guid_string = rsutils::json::get< std::string >( sample, 0 );
auto const control_guid_string = realdds::print( _control_writer->get()->guid(), false ); // raw guid
if( guid_string == control_guid_string )
auto const reply_guid = guid_from_string( rsutils::json::get< std::string >( sample, 0 ) );
auto const control_guid = _control_writer->get()->guid();
if( reply_guid == control_guid )
{
auto const sequence_number = rsutils::json::get< uint64_t >( sample, 1 );
std::unique_lock< std::mutex > lock( _replies_mutex );
Expand Down
2 changes: 1 addition & 1 deletion third-party/realdds/src/dds-device-server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ void dds_device_server::on_control_message_received()
{
json reply;
reply[sample_key] = json::array( {
realdds::print( sample.sample_identity.writer_guid(), false ), // raw guid
rsutils::string::from( realdds::print_raw_guid( sample.sample_identity.writer_guid() ) ),
sample.sample_identity.sequence_number().to64long(),
} );
try
Expand Down
2 changes: 1 addition & 1 deletion third-party/realdds/src/dds-device-watcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ void dds_device_watcher::start()
init();
}
LOG_DEBUG( "DDS device watcher started on '" << _participant->get()->get_qos().name() << "' "
<< realdds::print( _participant->guid() ) );
<< realdds::print_guid( _participant->guid() ) );
}

void dds_device_watcher::stop()
Expand Down
Loading

0 comments on commit 7bc0026

Please sign in to comment.