Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open streams working for all stream types + some bug fixes #11045

Merged
merged 25 commits into from
Nov 15, 2022
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
f64defa
Create dds_subscriber and dds_control_server
OhadMeir Oct 30, 2022
e23f732
Add UYVY support to dds_stream_profile
OhadMeir Oct 30, 2022
634aa7b
Fix control topic type
OhadMeir Oct 30, 2022
471e6f2
Add dynamic type support for dds-topic
OhadMeir Oct 30, 2022
63c9544
Fix rs-dds-sniffer reader history mode
OhadMeir Oct 30, 2022
cf0e141
Fix dds-device-server motion profile type
OhadMeir Oct 30, 2022
cab9905
Add type information to dds_stream_profile
OhadMeir Oct 30, 2022
a655f4d
Fix bug with group name at dds_device_server
OhadMeir Oct 30, 2022
7d6185e
Type in stream not profile and other PR#11045 comments
OhadMeir Oct 31, 2022
6493c65
Roll back stream type changes (will get from Eran's PR)
OhadMeir Nov 6, 2022
ac6f8bb
Merge branch 'dds' of https://github.com/IntelRealSense/librealsense …
OhadMeir Nov 6, 2022
cf39725
rs-dds-server open streams logic (not tested)
OhadMeir Nov 7, 2022
f7dbb75
Use dds-topic-reader not dds-control-server
OhadMeir Nov 8, 2022
189ea50
rs-dds-server streams upon request (not automatically)
OhadMeir Nov 8, 2022
9890c59
Merge branch 'dds' of https://github.com/IntelRealSense/librealsense …
OhadMeir Nov 8, 2022
afe730e
support open all stream types
OhadMeir Nov 9, 2022
4f9f8a5
rs-dds-server open all kinds of streams
OhadMeir Nov 9, 2022
4480f74
Fixed bug in rs-dds-server - second participant opened
OhadMeir Nov 10, 2022
a7abce6
Using dispathcer to handle controls
OhadMeir Nov 10, 2022
cc05f8c
Fix for linux compilation
OhadMeir Nov 10, 2022
a2f5fe2
Implement close-streams
OhadMeir Nov 10, 2022
f57ffa1
lrs-device-controller holds device streams
OhadMeir Nov 11, 2022
09443aa
Handle PR#11045 comments
OhadMeir Nov 13, 2022
8b54053
Fix notifications writer history size
OhadMeir Nov 14, 2022
3985402
Update dds-notification-server.cpp
maloel Nov 15, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions src/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ namespace librealsense
if( ! _dds_participant.instance()->is_valid() )
_dds_participant->init( 0, "librealsense" );
_dds_watcher.instance( _dds_participant.get() );
#endif
#endif //BUILD_WITH_DDS
break;
case backend_type::record:
_backend = std::make_shared<platform::record_backend>(platform::create_backend(), filename, section, mode);
Expand Down Expand Up @@ -192,7 +192,7 @@ namespace librealsense
}
//_dds_backend = ...; TODO
}
#endif
#endif //BUILD_WITH_DDS
}


Expand Down Expand Up @@ -710,7 +710,7 @@ namespace librealsense
return platform::backend_device_group{};
}
};
#endif
#endif //BUILD_WITH_DDS

context::~context()
{
Expand All @@ -721,7 +721,7 @@ namespace librealsense
#ifdef BUILD_WITH_DDS
if( _dds_watcher )
_dds_watcher->stop();
#endif
#endif //BUILD_WITH_DDS
}

std::vector<std::shared_ptr<device_info>> context::query_devices(int mask) const
Expand Down Expand Up @@ -777,15 +777,15 @@ namespace librealsense
}
return true;
} );
#endif
#endif //BUILD_WITH_DDS

#ifdef WITH_TRACKING
if (mask & RS2_PRODUCT_LINE_T200)
{
auto tm2_devices = tm2_info::pick_tm2_devices(ctx, devices.usb_devices);
std::copy(begin(tm2_devices), end(tm2_devices), std::back_inserter(list));
}
#endif
#endif //WITH_TRACKING
// Supported recovery devices
if( mask & RS2_PRODUCT_LINE_D400 || mask & RS2_PRODUCT_LINE_SR300 || mask & RS2_PRODUCT_LINE_L500 )
{
Expand Down Expand Up @@ -894,7 +894,7 @@ namespace librealsense
_dds_watcher->on_device_removed( [this]( std::shared_ptr< realdds::dds_device > const & dev ) {} );
_dds_watcher->start();
}
#endif
#endif //BUILD_WITH_DDS

uint64_t context::register_internal_device_callback(devices_changed_callback_ptr callback)
{
Expand All @@ -920,7 +920,7 @@ namespace librealsense
#ifdef BUILD_WITH_DDS
if( _dds_watcher )
_dds_watcher->stop();
#endif
#endif //BUILD_WITH_DDS
}
}

Expand Down
26 changes: 22 additions & 4 deletions third-party/realdds/include/realdds/dds-device-server.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,34 @@

#pragma once

#include <librealsense2/utilities/concurrency/concurrency.h>
#include <third-party/json_fwd.hpp>

#include <unordered_map>
#include <vector>
#include <memory>
#include <string>

#include <functional>

namespace realdds {


// Forward declaration
namespace topics {
class flexible_msg;
class device_info;
namespace raw {
class device_info;
} // namespace raw
class device_info;
} // namespace topics


class dds_participant;
class dds_publisher;
class dds_subscriber;
class dds_stream_server;
class dds_notification_server;
class dds_topic_reader;
struct image_header;


Expand Down Expand Up @@ -57,16 +62,29 @@ class dds_device_server
bool is_valid() const { return( nullptr != _notification_server.get() ); }
bool operator!() const { return ! is_valid(); }

void start_streaming( const std::string & stream_name, const image_header & header );

void start_streaming( const std::vector< std::pair < std::string, image_header > > & ); //< stream_name, header > pairs
void stop_streaming( const std::vector< std::string > & stream_to_close );

void publish_image( const std::string & stream_name, const uint8_t * data, size_t size );
void publish_notification( topics::flexible_msg && );

typedef std::function< void( const nlohmann::json & msg ) > control_callback;
void on_open_streams( control_callback callback ) { _open_streams_callback = std::move( callback ); }
void on_close_streams( control_callback callback ) { _close_streams_callback = std::move( callback ); }

private:
void on_control_message_received();
void handle_control_message( topics::flexible_msg control_message );

std::shared_ptr< dds_publisher > _publisher;
std::shared_ptr< dds_subscriber > _subscriber;
std::string _topic_root;
std::unordered_map<std::string, std::shared_ptr<dds_stream_server>> _stream_name_to_server;
std::shared_ptr< dds_notification_server > _notification_server;
std::shared_ptr< dds_topic_reader > _control_reader;
dispatcher _control_dispatcher;
control_callback _open_streams_callback = nullptr;
control_callback _close_streams_callback = nullptr;
}; // class dds_device_server


Expand Down
7 changes: 4 additions & 3 deletions third-party/realdds/include/realdds/dds-stream-server.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ class dds_stream_profile;
struct image_header
{
int format;
int height = 0;
int width = 0;
int height = -1;
maloel marked this conversation as resolved.
Show resolved Hide resolved
int width = -1;

bool is_valid() const { return width != 0 && height != 0; }
bool is_valid() const { return width != -1 && height != -1; }
void invalidate() { width = -1; height = -1; }
};


Expand Down
43 changes: 43 additions & 0 deletions third-party/realdds/include/realdds/dds-subscriber.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// License: Apache 2.0. See LICENSE file in root directory.
// Copyright(c) 2022 Intel Corporation. All Rights Reserved.

#pragma once

#include <memory>


namespace eprosima {
namespace fastdds {
namespace dds {
class Subscriber;
} // namespace dds
} // namespace fastdds
} // namespace eprosima


namespace realdds {


class dds_participant;


// The Subscriber manages the activities of several dds_topic_reader (DataReader) entities
//
class dds_subscriber
{
std::shared_ptr< dds_participant > _participant;

eprosima::fastdds::dds::Subscriber * _subscriber;

public:
dds_subscriber( std::shared_ptr< dds_participant > const & participant );
~dds_subscriber();

eprosima::fastdds::dds::Subscriber * get() const { return _subscriber; }
eprosima::fastdds::dds::Subscriber * operator->() const { return get(); }

std::shared_ptr< dds_participant > const & get_participant() const { return _participant; }
};


} // namespace realdds
8 changes: 6 additions & 2 deletions third-party/realdds/include/realdds/dds-topic-reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,24 @@ namespace realdds {


class dds_topic;

class dds_subscriber;

// The 'reader' is the entity used to subscribe to updated values of data in a topic. It is bound at creation to this
// topic.
//
// You may choose to create one via a 'subscriber' that manages the activities of several readers.
// on_data_available callback will be called when a sample is received.
//
class dds_topic_reader : public eprosima::fastdds::dds::DataReaderListener
{
std::shared_ptr< dds_topic > const _topic;
std::shared_ptr < dds_subscriber > const _subscriber;

eprosima::fastdds::dds::Subscriber * _subscriber = nullptr;
eprosima::fastdds::dds::DataReader * _reader = nullptr;

public:
dds_topic_reader( std::shared_ptr< dds_topic > const & topic );
dds_topic_reader( std::shared_ptr< dds_topic > const & topic, std::shared_ptr< dds_subscriber > const & subscriber );
~dds_topic_reader();

eprosima::fastdds::dds::DataReader * get() const { return _reader; }
Expand Down
13 changes: 7 additions & 6 deletions third-party/realdds/src/dds-device-impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,11 @@ bool dds_device::impl::init()
if( ! notification.is_valid() )
continue;
auto j = notification.json_data();
auto id = j["id"].get< std::string >();
auto id = utilities::json::get< std::string >( j, "id" );
if( state_type::WAIT_FOR_DEVICE_HEADER == state && id == "device-header" )
{
n_streams_expected = utilities::json::get< size_t >( j, "n-streams" );
LOG_INFO( "... device-header: " << n_streams_expected << " streams expected" );
LOG_DEBUG( "... device-header: " << n_streams_expected << " streams expected" );
if( n_streams_expected )
state = state_type::WAIT_FOR_PROFILES;
else
Expand Down Expand Up @@ -184,8 +184,8 @@ bool dds_device::impl::init()
"failed to instantiate stream type '" + stream_type + "' (instead, got '"
+ stream->type_string() + "')" );
stream->init_profiles( profiles, default_profile_index );
LOG_INFO( "... stream '" << stream_name << "' (" << _streams.size() << "/" << n_streams_expected
<< ") received with " << profiles.size() << " profiles" );
LOG_DEBUG( "... stream '" << stream_name << "' (" << _streams.size() << "/" << n_streams_expected
<< ") received with " << profiles.size() << " profiles" );
if( _streams.size() >= n_streams_expected )
state = state_type::DONE;
}
Expand All @@ -196,8 +196,9 @@ bool dds_device::impl::init()
}
}
}
if( state_type::DONE != state )
LOG_DEBUG( "timed out; state is " << state );

LOG_DEBUG( "... " << ( state_type::DONE == state ? "" : "timed out; state is " ) << state );

return ( state_type::DONE == state );
}

Expand Down
Loading