Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FR] Listener callback: provide group ID #2728

Open
maxsharabayko opened this issue Apr 28, 2023 · 8 comments
Open

[FR] Listener callback: provide group ID #2728

maxsharabayko opened this issue Apr 28, 2023 · 8 comments
Labels
[API] Area: Changes in SRT library API Type: Enhancement Indicates new feature requests
Milestone

Comments

@maxsharabayko
Copy link
Collaborator

Problem Statement

The SRT listener callback currently (SRT v1.5.2) does not provide a way to find out SRT group ID of a socket to be accepted.

typedef int srt_listen_callback_fn(void* opaq, SRTSOCKET ns, int hsversion, const struct sockaddr* peeraddr, const char* streamid);
SRT_API int srt_listen_callback(SRTSOCKET lsn, srt_listen_callback_fn* hook_fn, void* hook_opaque);

Solution 1 (Prefered)

Extend the listener callback with the Group ID value, or better with some key-value array to enable further extensions in the future.

The Group ID value can be used to limit the number of group connections on a listener socket.

Solution 2 (Discouraged)

Calling srt_groupof(ns) returns SRT_INVALID_SOCK because the HS was not yet parsed, and no group has been created/assigned to a socket.
Creating a fake group ID value for the socket ID to be accepted may be a way around it, but would add unnecessary and potentially confusing logic.

@maxsharabayko maxsharabayko added Type: Enhancement Indicates new feature requests [API] Area: Changes in SRT library API labels Apr 28, 2023
@maxsharabayko maxsharabayko added this to the v1.6.0 milestone Apr 28, 2023
@ethouris
Copy link
Collaborator

ethouris commented Oct 5, 2023

Are you sure that it is possible to obtain this number in case when this is the first connection in the group?

@maxsharabayko maxsharabayko modified the milestones: v1.5.4, Major Jun 12, 2024
@jeandube
Copy link
Collaborator

What I really need is to distinguish a new group connection from a new link of an already connected group.

@ethouris
Copy link
Collaborator

Well, the handshake structure is passed to the runAcceptHook method, and this is also used to extract the group type that can be later obtained. Of course, there's only the PEER's group ID available this way, but there can be also called a function to find the group that has that peer ID, and that group ID can be made available through the srt_groupof call, similarly like this is done for the group type. Of course, for the very first call in the group it would return an invalid ID, but I think that's not a problem.

@maxsharabayko
Copy link
Collaborator Author

(Elaborating on @ethouris's comment)
As a hack, a group connection may be distinguished from a regular connection attempt via the SRTO_GROUPTYPE option. A regular connection will have the value 0, while a group member connection will have a non-zero group type value.

Regarding the group ID. At the point of the listener callback, we only know the group ID of a peer, but not the local one unless a local group already exists. There is a corner case though when both members connect over separate listener ports in parallel. Then there would be two listener callback calls both BEFORE a local group is created, meaning both would be treated as a new group connection request.

SRT.cn: @600335716: runAcceptHook: peer group ID @1332354199 (does not exist).
SRT.cn: @600335715: runAcceptHook: peer group ID @1332354199 (does not exist).
SRT.gm: addGroup: @1674077536  <--- Local group has been created!!!

Example 2: one member connects a bit later.

SRT.cn: @803205604: runAcceptHook: remote group ID 1904656706 (does not exist).
SRT.gm: addGroup: @1876947425 <--- Local group has been created!!!
SRT.cn: @803205603: runAcceptHook: remote group ID 1904656706 (exists) <--- Second member connects

@jeandube
Copy link
Collaborator

The group type does not distinguish an added link from a new group. The peer group ID is a good solution and probably better than the local one due to the listen_callback race condition. It tells that only one group will be created. It help count both group links and accepted connections.

@maxsharabayko
Copy link
Collaborator Author

maxsharabayko commented Jun 25, 2024

Proposal: Adding local and remote group IDs to the listen callback.

Could be added using the #ifdef SRT_160_API_PREVIEW in the next release, and replace the previous callback in v1.6.0.

Drawbacks:

  • Changes the API. Might be better to create, cache and sync new group ID before actually creating a group. Making the srt_groupod(SRTSOCKET) operational from the listen callback.
  • Simultaneous connections attempts on two or more listeners would both be identified as a new group connection, because the group is to be created AFTER the listen callback.
  • What if more info would be needed later on, e.g. member weight? Extend again?
Git diff: listener callback with group IDs (click to expand)
diff --git a/srtcore/core.cpp b/srtcore/core.cpp
index 1612830..4d8b388 100644
--- a/srtcore/core.cpp
+++ b/srtcore/core.cpp
@@ -11850,6 +11850,8 @@ bool srt::CUDT::runAcceptHook(CUDT *acore, const sockaddr* peer, const CHandShak
     bool have_group = false;
     SRT_GROUP_TYPE gt = SRT_GTYPE_UNDEFINED;
 #endif
+   SRTSOCKET grpid = SRT_INVALID_SOCK;
+   SRTSOCKET peergrpid = SRT_INVALID_SOCK;
 
     // This tests if there are any extensions.
     if (hspkt.getLength() > CHandShake::m_iContentSize + 4 && IsSet(ext_flags, CHandShake::HS_EXT_CONFIG))
@@ -11890,6 +11892,11 @@ bool srt::CUDT::runAcceptHook(CUDT *acore, const sockaddr* peer, const CHandShak
                 {
                     uint32_t gd = groupdata[GRPD_GROUPDATA];
                     gt = SRT_GROUP_TYPE(SrtHSRequest::HS_GROUP_TYPE::unwrap(gd));
+
+                    peergrpid = groupdata[GRPD_GROUPID];
+                    CUDTGroup* gp = uglobal().findPeerGroup_LOCKED(peergrpid);
+                    if (gp)
+                        grpid = gp->id();
                 }
             }
 #endif
@@ -11922,7 +11929,7 @@ bool srt::CUDT::runAcceptHook(CUDT *acore, const sockaddr* peer, const CHandShak
     acore->m_RejectReason = SRT_REJX_FALLBACK;
     try
     {
-        int result = CALLBACK_CALL(m_cbAcceptHook, acore->m_SocketID, hs.m_iVersion, peer, target);
+        int result = CALLBACK_CALL(m_cbAcceptHook, acore->m_SocketID, grpid, peergrpid, hs.m_iVersion, peer, target);
         if (result == -1)
             return false;
     }
diff --git a/srtcore/srt.h b/srtcore/srt.h
index 614a85a..00c97d1 100644
--- a/srtcore/srt.h
+++ b/srtcore/srt.h
@@ -762,8 +762,15 @@ static inline int srt_bind_peerof  (SRTSOCKET u, UDPSOCKET sys_udp_sock) { retur
 SRT_API       int srt_listen       (SRTSOCKET u, int backlog);
 SRT_API SRTSOCKET srt_accept       (SRTSOCKET u, struct sockaddr* addr, int* addrlen);
 SRT_API SRTSOCKET srt_accept_bond  (const SRTSOCKET listeners[], int lsize, int64_t msTimeOut);
-typedef int srt_listen_callback_fn   (void* opaq, SRTSOCKET ns, int hsversion, const struct sockaddr* peeraddr, const char* streamid);
+
+//#ifdef SRT_160_API_PREVIEW
+typedef int srt_listen_callback_fn(void* opaq, SRTSOCKET ns, SRTSOCKET grpid, SRTSOCKET peergrpid, int hsversion, const struct sockaddr* peeraddr, const char* streamid);
 SRT_API       int srt_listen_callback(SRTSOCKET lsn, srt_listen_callback_fn* hook_fn, void* hook_opaque);
+//#else
+//typedef int srt_listen_callback_fn   (void* opaq, SRTSOCKET ns, int hsversion, const struct sockaddr* peeraddr, const char* streamid);
+//SRT_API       int srt_listen_callback(SRTSOCKET lsn, srt_listen_callback_fn* hook_fn, void* hook_opaque);
+//#endif
+
 typedef void srt_connect_callback_fn  (void* opaq, SRTSOCKET ns, int errorcode, const struct sockaddr* peeraddr, int token);
 SRT_API       int srt_connect_callback(SRTSOCKET clr, srt_connect_callback_fn* hook_fn, void* hook_opaque);
 SRT_API       int srt_connect      (SRTSOCKET u, const struct sockaddr* name, int namelen);
srt-xtransmit diff
diff --git a/submodule/srt b/submodule/srt
index cf13200..72303d7 160000
--- a/submodule/srt
+++ b/submodule/srt
@@ -1 +1 @@
-Subproject commit cf132005044232689cdc890a266b976e74333ca6
+Subproject commit 72303d7934f9c6b1cbe23c438672f0eba0f318cb-dirty
diff --git a/xtransmit/misc.cpp b/xtransmit/misc.cpp
index 1bf6a55..11420fc 100644
--- a/xtransmit/misc.cpp
+++ b/xtransmit/misc.cpp
@@ -30,7 +30,7 @@ shared_sock_t create_connection(const vector<UriParser>& parsed_urls, shared_soc
                        listening_sock = make_shared<socket::srt_group>(parsed_urls);
                socket::srt_group* s = dynamic_cast<socket::srt_group*>(listening_sock.get());
                const bool  accept = s->mode() == socket::srt_group::LISTENER;
-               if (accept) {
+               if (accept && !is_listening) {
                        s->listen();
                }
                shared_sock_t connection = accept ? s->accept() : s->connect();
diff --git a/xtransmit/srt_socket_group.cpp b/xtransmit/srt_socket_group.cpp
index ddca1bb..f0ca3a2 100644
--- a/xtransmit/srt_socket_group.cpp
+++ b/xtransmit/srt_socket_group.cpp
@@ -425,7 +425,7 @@ int socket::srt_group::on_listen_callback(SRTSOCKET sock)
        return 0;
 }

-int socket::srt_group::listen_callback_fn(void* opaq, SRTSOCKET sock, int hsversion,
+int socket::srt_group::listen_callback_fn(void* opaq, SRTSOCKET sock, SRTSOCKET grpid, int hsversion,
        const struct sockaddr* peeraddr, const char* streamid)
 {
        if (opaq == nullptr)
@@ -434,16 +434,21 @@ int socket::srt_group::listen_callback_fn(void* opaq, SRTSOCKET sock, int hsvers
                return 0;
        }

+       int optval = 0;
+       int optlen = sizeof optval;
+       srt_getsockflag(sock, SRTO_GROUPTYPE, (void*) &optval, &optlen);
+
        netaddr_any sa(peeraddr);

-       sockaddr host_sa = {};
-       int host_sa_len = sizeof host_sa;
-       srt_getsockname(sock, &host_sa, &host_sa_len);
-       netaddr_any host(&host_sa, host_sa_len);
-       spdlog::trace(LOG_SRT_GROUP "Accepted member socket @{}, host IP {}, remote IP {}", sock, host.str(), sa.str());
+       netaddr_any host_sa;
+       int host_sa_len = host_sa.storage_size();
+       srt_getsockname(sock, host_sa.get(), &host_sa_len);
+       netaddr_any host(host_sa.get(), host_sa_len);

        // TODO: this group may no longer exist. Use some global array to track valid groups.
        socket::srt_group* group = reinterpret_cast<socket::srt_group*>(opaq);
+       spdlog::trace(LOG_SRT_GROUP "@{} Accepted member socket @{}, host IP {}, remote IP {}, GT {}.", grpid, sock, host.str(), sa.str(), optval);
+
        return group->on_listen_callback(sock);
 }

diff --git a/xtransmit/srt_socket_group.hpp b/xtransmit/srt_socket_group.hpp
index 9fbf3f4..bf294dc 100644
--- a/xtransmit/srt_socket_group.hpp
+++ b/xtransmit/srt_socket_group.hpp
@@ -70,7 +70,7 @@ private:
        void on_connect_callback(SRTSOCKET sock, int error, const sockaddr*, int token);
        static void connect_callback_fn(void* opaq, SRTSOCKET sock, int error, const sockaddr* peer, int token);
        int on_listen_callback(SRTSOCKET sock);
-       static int listen_callback_fn(void* opaq, SRTSOCKET sock, int hsversion,
+       static int listen_callback_fn(void* opaq, SRTSOCKET sock, SRTSOCKET grpid, int hsversion,
                const struct sockaddr* peeraddr, const char* streamid);

        using options = std::map<string, string>;

@ethouris
Copy link
Collaborator

I think there's not much we can do about that corner case when connections to the same group are being added perfectly simultaneously. Although I think that the probability of the number of simultaneous connections reporting simultaneously decreases with the number of connections.

Note that you can request yourself to be notified with the new member connection and you can simply forcefully break the connection if it exceeds the limit.

@ethouris
Copy link
Collaborator

BTW Changes in the callback function signature is a bad idea. It introduces hard backward ABI incompatibility.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
[API] Area: Changes in SRT library API Type: Enhancement Indicates new feature requests
Projects
None yet
Development

No branches or pull requests

3 participants