From e4cf5f4f39345ee0d2d28c792d1a00e0d6089940 Mon Sep 17 00:00:00 2001 From: jianjunz Date: Tue, 31 Aug 2021 12:59:10 +0800 Subject: [PATCH] Allow data flow from QUIC agent to video agent. (#1065) * WebTransportStream sends frames to internal server. * Allow data flow between QUIC agent and video agent. * Add WebTransportFrameSource to associate multiple streams to a single publication. --- doc/Client-Portal Protocol.md | 13 +- doc/design/pics/.gitkeep | 0 doc/design/pics/quic_agent_data_flow.svg | 397 ++++++++++++++++++ doc/design/quic-agent.md | 115 +++++ doc/design/quic-programming-guide.md | 37 +- doc/design/quic-transport-payload-format.md | 44 -- .../addons/common/MediaFramePipelineWrapper.h | 12 +- .../internalIO/InternalClientWrapper.cc | 41 +- .../internalIO/InternalServerWrapper.cc | 18 +- .../addons/internalIO/InternalServerWrapper.h | 5 +- .../agent/addons/quic/QuicTransportStream.cc | 245 +++++++++-- .../agent/addons/quic/QuicTransportStream.h | 51 ++- .../quic/WebTransportFrameDestination.cc | 48 +++ .../quic/WebTransportFrameDestination.h | 31 ++ .../addons/quic/WebTransportFrameSource.cc | 103 +++++ .../addons/quic/WebTransportFrameSource.h | 53 +++ source/agent/addons/quic/addon.cc | 6 +- source/agent/addons/quic/binding.gyp | 4 +- source/agent/addons/quic/quic_sdk_url | 2 +- source/agent/conference/quicController.js | 27 +- source/agent/connections.js | 6 +- source/agent/internalConnectionRouter.js | 14 +- source/agent/quic/index.js | 64 ++- .../quic/webtransport/quicTransportServer.js | 9 + .../quicTransportStreamPipeline.js | 4 - source/core/owt_base/MediaFramePipeline.h | 12 +- 26 files changed, 1167 insertions(+), 194 deletions(-) delete mode 100644 doc/design/pics/.gitkeep create mode 100644 doc/design/pics/quic_agent_data_flow.svg create mode 100644 doc/design/quic-agent.md delete mode 100644 doc/design/quic-transport-payload-format.md create mode 100644 source/agent/addons/quic/WebTransportFrameDestination.cc create mode 100644 source/agent/addons/quic/WebTransportFrameDestination.h create mode 100644 source/agent/addons/quic/WebTransportFrameSource.cc create mode 100644 source/agent/addons/quic/WebTransportFrameSource.h diff --git a/doc/Client-Portal Protocol.md b/doc/Client-Portal Protocol.md index 02e7a4857..30b8f0eab 100644 --- a/doc/Client-Portal Protocol.md +++ b/doc/Client-Portal Protocol.md @@ -362,36 +362,41 @@ This a format for client reconnects. ``` object(PublicationRequest):: { - media: object(WebRTCMediaOptions) | null, + media: object(MediaOptions) | null, data: true | false, transport: object(TransportOptions), attributes: object(ClientDefinedAttributes) | null } ``` -A publication can send either media or data, but a QUIC *transport* channel can support multiple stream for both media and data. Setting `media:null` and `data:false` is meaningless, so it should be rejected by server. Protocol itself doesn't forbit to create WebRTC connection for data. However, SCTP data channel is not implemented at server side, so currently `data:true` is only support by QUIC transport channels. +A publication can send either media or data. Setting `media:null` and `data:false` is meaningless, so it should be rejected by server. Protocol itself doesn't forbid to create WebRTC connection for data. However, SCTP data channel is not implemented at server side, so currently `data:true` is only support by WebTransport channels. ``` - object(WebRTCMediaOptions):: + object(MediaOptions):: { tracks: [ { type: "audio" | "video", - mid: string(MID), + mid: string(MID) | undefined, /* undefined if transport's type is "quic" */ source: "mic" | "screen-cast" | ... | "encoded-file", + format: object(AudioFormat) | object(VideoFormat) | undefined /* undefined if transport's type is "webrtc" */ } ] } } ``` + **ResponseData**: The PublicationResult object with following definition if **ResponseStatus** is “ok”: +``` object(PublicationResult):: { transportId: string(transportId), // Can be reused in the following publication or subscription. id: string(SessionId) //will be used as the stream id when it gets ready. } +``` + ### 3.3.8 Participant Stops Publishing a Stream to Room **RequestName**: “unpublish”
diff --git a/doc/design/pics/.gitkeep b/doc/design/pics/.gitkeep deleted file mode 100644 index e69de29bb..000000000 diff --git a/doc/design/pics/quic_agent_data_flow.svg b/doc/design/pics/quic_agent_data_flow.svg new file mode 100644 index 000000000..fc9bc7639 --- /dev/null +++ b/doc/design/pics/quic_agent_data_flow.svg @@ -0,0 +1,397 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + Page-1 + + + + + + + + + + + + + + + + Entity 2 + + Sheet.4 + + + + Sheet.5 + QUIC Agent + + + + QUIC Agent + + + Dynamic connector + Stream (audio 1) + + + + + + + + Stream (audio 1) + + Entity 1.12 + Client (receiving 1 stream) + + + + + + + + + + + + + Client(receiving 1 stream) + + Dynamic connector.13 + Stream (audio 2) + + + + + + + + Stream (audio 2) + + Dynamic connector.15 + Datagram (video 1, video 2) + + + + + + + + Datagram (video 1, video 2) + + Rectangle + Internal Connection Module + + + + + + + + Internal Connection Module + + Dynamic connector.18 + Datagram (video 3) + + + + + + + + Datagram (video 3) + + Dynamic connector.19 + Stream (audio 3) + + + + + + + + Stream (audio 3) + + Dynamic connector.20 + Audio 1 and video 1 + + + + + + + + Audio 1 and video 1 + + Dynamic connector.21 + Audio 2 and video 2 + + + + + + + + Audio 2 and video 2 + + Dynamic connector.22 + Audio 3 + + + + + + + + Audio 3 + + Dynamic connector.23 + Video 3 + + + + + + + + Video 3 + + Rectangle.1002 + WebTransportFrameSource + + + + + + + + WebTransportFrameSource + + Rectangle.1003 + WebTransportFrameSource + + + + + + + + WebTransportFrameSource + + Rectangle.1004 + Datagram source + + + + + + + + Datagram source + + Rectangle.1005 + Client (sending 2 streams with audio and video) + + + + + + + + Client(sending 2 streams with audio and video) + + Dynamic connector.1006 + Frames + + + + + + + + Frames + + Dynamic connector.1007 + Frames + + + + + + + + Frames + + Entity 1.1008 + Client (sending 1 data stream and receiving a data stream) + + + + + + + + + + + + + Client(sending 1 data stream and receiving a data stream) + + Rectangle.1009 + WebTransportFrameSource + + + + + + + + WebTransportFrameSource + + Rectangle.1010 + WebTransportFrameDestination + + + + + + + + WebTransportFrameDestination + + Dynamic connector.1011 + Stream1 + + + + + + + + Stream1 + + Dynamic connector.1012 + Stream 2 + + + + + + + + Stream 2 + + Dynamic connector.1013 + Stream 1 + + + + + + + + Stream 1 + + Dynamic connector.1014 + Stream 2 + + + + + + + + Stream 2 + + Rectangle.1015 + WebTransportFrameDestination + + + + + + + + WebTransportFrameDestination + + Rectangle.1016 + Datagram Destination + + + + + + + + Datagram Destination + + Dynamic connector.1017 + Frames + + + + + + + + Frames + + diff --git a/doc/design/quic-agent.md b/doc/design/quic-agent.md new file mode 100644 index 000000000..b50c154ea --- /dev/null +++ b/doc/design/quic-agent.md @@ -0,0 +1,115 @@ +# QUIC agent + +## Overview +QUIC agents are designed for [WebTransport](https://w3c.github.io/webtransport/) over HTTP/3 connections. A WebTransport connection could send and receive arbitrary data, as well as media data encoded or can be decoded by [WebCodecs](https://www.w3.org/TR/webcodecs/). + +## Architecture and dataflow + +![data flow](./pics/quic_agent_data_flow.svg) + +A WebTransportFrameSource handles all audio and video frames for a publication. A WebTransportFrameDestination dispatches audio and video frames to different WebTransport streams or a datagram sender. + +A DatagramSource processes datagrams (RTP packets) received from client side, depacketizes them to create audio or video frames, and dispatches media frames to a WebTransportFrameSource. It also handles FEC and NACK, similar to RTCRtpReceiver in WebRTC. A DatagramDestination is similar to RTCRtpSender. + +## WebTransport payload and message format + +This section defines the payload and message format for data transmitted over WebTransport. + +### Streams + +Both server and client can initialize a stream. When a stream is created, initial side sends a session ID, which is a 128 bit length message to the remote side. Session ID could be a publication ID or subscription ID as defined in [Client-Portal Protocol](https://github.com/open-webrtc-toolkit/owt-server/blob/master/doc/Client-Portal%20Protocol.md). As the session ID issued by server may less than 128 bit right now, fill it with 0 in most significant bits. Session ID 0 is reserved for signaling. When remote side receives the session ID, it should check whether session ID is valid. Terminate the stream if session ID is invalid, or send the same session ID to client if it is valid. Depends on the type of stream it created, one side or both sides are ready to send data. + +### Datagram + +Each package has a 128 bit header for session ID. + +``` + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | | + | Session Identifier | + | .... | + | | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Datagram Data (*) ... + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +``` + +It may increase about 2% network cost. + +### Signaling Session + +After creating a WebTransport, a stream with session 0 should be created for authentication and signaling. Every signaling message is followed by a 32 bit length integer that indicates the body's length. + +``` + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Message length | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Message ... + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +``` + +### Media Stream + +After sending 128 bit length session ID, a 128 bit length track ID is sent to remote side to indicates the track of a stream. Since audio track and video track of a single stream shares the same track ID at this time, track 1 is for audio and track 2 is for video. + +When a WebTransport stream is used for transmitting data of a media stream track (e.g.: H.264 bitstream), a 32 (8+24) bit length header is added to indicate frame size. + +``` + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Reserved | Message length | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Message ... + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +``` + +### Authentication + +If signaling messages are transmitted over WebTransport, authentication follows the regular process defined by [Client-Portal Protocol](https://github.com/open-webrtc-toolkit/owt-server/blob/master/doc/Client-Portal%20Protocol.md). Otherwise, client sends a token for WebTransport as a signaling message. WebTransport token is issued during joining a conference. If the token is valid, server sends a 128 bit length zeros to client. + +## Build conference server with QUIC agent + +Because we don't have a good place to store pre-built QUIC SDK for public access, QUIC agent is not enabled by default. Additional flags are required to enable QUIC agent. + +1. Download QUIC SDK from the URL specified [here](https://github.com/open-webrtc-toolkit/owt-server/blob/master/source/agent/addons/quic/quic_sdk_url). QUIC SDK is hosted on GitHub as an artifact. You will need to follow [this description](https://docs.github.com/en/rest/reference/actions#download-an-artifact) to make a REST request to GitHub. Or you can download the latest QUIC SDK from [GitHub Actions](https://github.com/open-webrtc-toolkit/owt-sdk-quic/actions) tab. Commits pushed to main branch have artifact for downloading. +1. After running `installDeps.sh`, put headers to build/libdeps/build/include, and put libraries(.so file) to build/libdeps/build/lib. +1. Append `-t quic` to the arguments for build.js. +1. Append `-t quic-agent` to the arguments for pack.js. + +## Certificate for QUIC + +OWT Conference Server is using a self-signed certificate during development phase, which would be only valid for 14 days. You can use a CA-signed certificate to avoid refreshing the certificate periodically. A CA-signed certificate is recommended for production environment. WebTransport connection will fail if certificate is not valid or expires. + +### Certificates signed by a trusted CA + +- Copy your PKCS12 format certificate to `quic_agent/cert/` directory to replace the one there. +- Restart Conference Server QUIC agent to apply the change. +- Don't provide any fingerprint in client applications. + +### Generate self-signed certificates + +#### Precondition +- Make sure you are running the tool under Linux and, +- Openssl tool is correctly setup in your system. +- Download the tool under chromium/src/net/tools/quic/certs/ from chromium project ([v93.0.4575.1](https://chromium.googlesource.com/chromium/src/+archive/refs/tags/93.0.4575.1/net/tools/quic/certs.tar.gz.)) to local directory named `tool`. This contains three files: `ca.cnf`, `generate-certs.sh` and `leaf.cnf`. + +#### Certificate Generation + +- Modify leaf.cnf, adding an entry into `other_hosts` section. +- Make sure generate-certs.sh is executable. If not, run `chmod +x generate-certs.sh`; +- Remove the `out` dir in case it exists. +- Under the downloaded tool dir, run `./generate-certs.sh`. It is expected to generate a series of files under out dir. +- Under the downloaded tool dir, run `openssl pkcs12 -inkey out/leaf_cert.key -in out/leaf_cert.pem -export -out out/certificate.pfx`. This will prompt for password for the pfx. Please type the certificate password of your conference server. The default password is `abc123`. +- Under the downloaded tool dir, run `openssl x509 -noout -fingerprint -sha256 -inform pem -in out/leaf_cert.pem`. You will get the fingerprint string in the form of "XX:XX:XX....XX:XX". + +#### Use the Certificate + +- Copy the generated certificate.pfx under `out` dir to `quic_agent/cert/` dir to replace the one there. +- Restart Conference Server QUIC agent to apply the change. +- If you're using JavaScript sample for QUIC, make sure you also update JS sample with the new fingerprint. +- In your native client sample, make sure you include the fingerprint of new cert in the `ConferenceClientConfiguration.trusted_quic_certificate_fingerprints` you passed to `ConferenceClient` ctor. See more details in the conference sample. diff --git a/doc/design/quic-programming-guide.md b/doc/design/quic-programming-guide.md index 129e061b3..f6134e845 100644 --- a/doc/design/quic-programming-guide.md +++ b/doc/design/quic-programming-guide.md @@ -123,15 +123,6 @@ Please see the conference sample application for more detailed usage. Please follow [Conference Server build instructions](https://github.com/open-webrtc-toolkit/owt-server/blob/master/README.md) on how to build and deploy the conference server. -## Build Conference Server with QUIC agent - -Because we don't have a good place to store pre-built QUIC SDK for public access, QUIC agent is not enabled by default. Additional flags are required to enable QUIC agent. - -1. Download QUIC SDK from the URL specified [here](https://github.com/open-webrtc-toolkit/owt-server/blob/master/source/agent/addons/quic/quic_sdk_url). QUIC SDK is hosted on GitHub as an artifact. You will need to follow [this description](https://docs.github.com/en/rest/reference/actions#download-an-artifact) to make a REST request to GitHub. Or you can download the latest QUIC SDK from [GitHub Actions](https://github.com/open-webrtc-toolkit/owt-sdk-quic/actions) tab. Commits pushed to main branch have artifact for downloading. -1. After running `installDeps.sh`, put headers to build/libdeps/build/include, and put libraries(.so file) to build/libdeps/build/lib. -1. Append `-t quic` to the arguments for build.js. -1. Append `-t quic-agent` to the arguments for pack.js. - ## How to use Pre-built Conference Server Binary Steps to run Conference Server with pre-built binary: @@ -146,30 +137,4 @@ Steps to run Conference Server with pre-built binary: # OWT QUIC Windows Sample -The Windows sample will be provided in OWT repo separately. More details will be provided later. - -# How to Replace the Certificate for QUIC - -OWT Conference Server is using a self-signed certificate during development phase, which would be only valid for 14 days. You can use a CA-signed certificate to avoid refreshing the certificate periodically. WebTransport connection will fail if certificate is not valid or expires. - -## Precondition - -- Make sure you are running the tool under Linux and, -- Openssl tool is correctly setup in your system. -- Download the tool under chromium/src/net/tools/quic/certs/ from chromium project to local dir named `tool`. This contains three files: `ca.cnf`, `generate-certs.sh` and `leaf.cnf`. - -## Certificate Generation - -- Modify leaf.cnf, adding an entry into `other_hosts` section. -- Make sure generate-certs.sh is exectuable. If not, run `chmod +x generate-certs.sh`; -- Remove the `out` dir in case it exists. -- Under the downloaded tool dir, run `./generate-certs.sh`. It is expected to generate a series of files under out dir. -- Under the downloaded tool dir, run `openssl pkcs12 -inkey out/leaf_cert.key -in out/leaf_cert.pem -export -out out/certificate.pfx`. This will prompt for password for the pfx. Make sure you always use `abc123` as the password. -- Under the downloaded tool dir, run `openssl x509 -noout -fingerprint -sha256 -inform pem -in out/leaf_cert.pem`. You will get the fingerprint string in the form of "XX:XX:XX....XX:XX". - -## Use the Certificate - -- Copy the generated certificate.pfx under `out` dir to `quic_agent/cert/` dir to replace the one there. -- Restart Conference Server QUIC agent to apply the change. If you're using JS sample for QUIC, make sure you also update JS sample with the new fingerprint. -- In your native client sample, make sure you include the fingerprint of new cert in the `ConferenceClientConfiguration.trusted_quic_certificate_fingerprints` you passed to `ConferenceClient` ctor. See more details in the conference sample. - +The Windows sample will be provided in OWT repo separately. More details will be provided later. \ No newline at end of file diff --git a/doc/design/quic-transport-payload-format.md b/doc/design/quic-transport-payload-format.md deleted file mode 100644 index 6102c5887..000000000 --- a/doc/design/quic-transport-payload-format.md +++ /dev/null @@ -1,44 +0,0 @@ -# QUIC Transport Payload and Message Format - -This post defines the payload and message format for data transmitted over [WebTransport](https://w3c.github.io/webtransport/#web-transport). - -## Streams - -Both server and client can initialize a stream. When a stream is created, initial side sends a session ID, which is a 128 bit length message to the remote side. Session ID could be a publication ID or subscription ID as defined in [Client-Portal Protocol](https://github.com/open-webrtc-toolkit/owt-server/blob/master/doc/Client-Portal%20Protocol.md). As the session ID issued by server may less than 128 bit right now, fill it with 0 in most significant bits. Session ID 0 is reserved for signaling. When remote side receives the session ID, it should check whether session ID is valid. Terminate the stream if session ID is invalid, or send the same session ID to client if it is valid. Depends on the type of stream it created, one side or both sides are ready to send data. - -## Datagram - -Each package has a 128 bit header for session ID. - -``` - 0 1 2 3 - 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | | - | Session Identifier | - | .... | - | | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | Datagram Data (*) ... - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -``` - -It may increase about 2% network cost. - -## Signaling Session - -After creating a WebTransport, a stream with session 0 should be created for authentication and signaling. Every signaling message is followed by a 32 bit length integer that indicates the body's length. - -``` - 0 1 2 3 - 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | Message length | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | Message ... - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -``` - -## Authentication - -If signaling messages are transmitted over WebTransport, authentication follows the regular process defined by [Client-Portal Protocol](https://github.com/open-webrtc-toolkit/owt-server/blob/master/doc/Client-Portal%20Protocol.md). Otherwise, client sends a token for WebTransport as a signaling message. WebTransport token is issued during joining a conference. If the token is valid, server sends a 128 bit length zeros to client. \ No newline at end of file diff --git a/source/agent/addons/common/MediaFramePipelineWrapper.h b/source/agent/addons/common/MediaFramePipelineWrapper.h index 8b3f5ba3f..112714357 100644 --- a/source/agent/addons/common/MediaFramePipelineWrapper.h +++ b/source/agent/addons/common/MediaFramePipelineWrapper.h @@ -6,10 +6,10 @@ #ifndef MEDIAFRAMEPIPELINEWRAPPER_H #define MEDIAFRAMEPIPELINEWRAPPER_H +#include +#include #include #include -#include - /* * Wrapper class of owt_base::FrameDestination @@ -30,5 +30,13 @@ class FrameSource : public node::ObjectWrap{ owt_base::FrameSource* src; }; +/* + * Nan::ObjectWrap of owt_base::FrameSource and owt_base::FrameDestination, represents a node in the media or data pipeline. + */ +class NanFrameNode : public Nan::ObjectWrap { +public: + virtual owt_base::FrameSource* FrameSource() = 0; + virtual owt_base::FrameDestination* FrameDestination() = 0; +}; #endif \ No newline at end of file diff --git a/source/agent/addons/internalIO/InternalClientWrapper.cc b/source/agent/addons/internalIO/InternalClientWrapper.cc index 357844bcc..d0d4d257c 100644 --- a/source/agent/addons/internalIO/InternalClientWrapper.cc +++ b/source/agent/addons/internalIO/InternalClientWrapper.cc @@ -88,23 +88,36 @@ NAN_METHOD(InternalClient::close) { obj->me = nullptr; } -NAN_METHOD(InternalClient::addDestination) { - InternalClient* obj = ObjectWrap::Unwrap(info.Holder()); - owt_base::InternalClient* me = obj->me; +NAN_METHOD(InternalClient::addDestination) +{ + InternalClient* obj = ObjectWrap::Unwrap(info.Holder()); + owt_base::InternalClient* me = obj->me; - Nan::Utf8String param0(Nan::To(info[0]).ToLocalChecked()); - std::string track = std::string(*param0); + Nan::Utf8String param0(Nan::To(info[0]).ToLocalChecked()); + std::string track = std::string(*param0); - FrameDestination* param = - ObjectWrap::Unwrap( - info[1]->ToObject(Nan::GetCurrentContext()).ToLocalChecked()); - owt_base::FrameDestination* dest = param->dest; + bool isNanDestination(false); + if (info.Length() >= 3) { + isNanDestination = info[2]->ToBoolean(Nan::GetCurrentContext()).ToLocalChecked()->Value(); + } - if (track == "audio") { - me->addAudioDestination(dest); - } else if (track == "video") { - me->addVideoDestination(dest); - } + owt_base::FrameDestination* dest(nullptr); + if (isNanDestination) { + NanFrameNode* param = Nan::ObjectWrap::Unwrap(info[1]->ToObject()); + dest = param->FrameDestination(); + } else { + FrameDestination* param = ObjectWrap::Unwrap( + info[1]->ToObject(Nan::GetCurrentContext()).ToLocalChecked()); + dest = param->dest; + } + + if (track == "audio") { + me->addAudioDestination(dest); + } else if (track == "video") { + me->addVideoDestination(dest); + } else if (track == "data") { + me->addDataDestination(dest); + } } NAN_METHOD(InternalClient::removeDestination) { diff --git a/source/agent/addons/internalIO/InternalServerWrapper.cc b/source/agent/addons/internalIO/InternalServerWrapper.cc index 0443a2838..efe1d0cb0 100644 --- a/source/agent/addons/internalIO/InternalServerWrapper.cc +++ b/source/agent/addons/internalIO/InternalServerWrapper.cc @@ -99,10 +99,20 @@ NAN_METHOD(InternalServer::addSource) { Nan::Utf8String param0(Nan::To(info[0]).ToLocalChecked()); std::string streamId = std::string(*param0); - FrameSource* param = - ObjectWrap::Unwrap( - info[1]->ToObject(Nan::GetCurrentContext()).ToLocalChecked()); - owt_base::FrameSource* src = param->src; + bool isNanSource(false); + if (info.Length() >= 3) { + isNanSource = info[2]->ToBoolean(Nan::GetCurrentContext()).ToLocalChecked()->Value(); + } + + owt_base::FrameSource* src(nullptr); + if (isNanSource) { + NanFrameNode* param = Nan::ObjectWrap::Unwrap(info[1]->ToObject()); + src = param->FrameSource(); + } else { + FrameSource* param = ObjectWrap::Unwrap( + info[1]->ToObject(Nan::GetCurrentContext()).ToLocalChecked()); + src = param->src; + } me->addSource(streamId, src); } diff --git a/source/agent/addons/internalIO/InternalServerWrapper.h b/source/agent/addons/internalIO/InternalServerWrapper.h index 65831e980..8ffce22ab 100644 --- a/source/agent/addons/internalIO/InternalServerWrapper.h +++ b/source/agent/addons/internalIO/InternalServerWrapper.h @@ -44,7 +44,10 @@ class InternalServer : public node::ObjectWrap, static NAN_METHOD(close); static NAN_METHOD(getListeningPort); - + // Arguments: + // type: string, type of the source, "audio", "video" or "data". + // source: A node addon object or NAN object. + // isNanObject: indicates whether `source` is a NAN object. static NAN_METHOD(addSource); static NAN_METHOD(removeSource); diff --git a/source/agent/addons/quic/QuicTransportStream.cc b/source/agent/addons/quic/QuicTransportStream.cc index 5a86d8de4..88fc88bf7 100644 --- a/source/agent/addons/quic/QuicTransportStream.cc +++ b/source/agent/addons/quic/QuicTransportStream.cc @@ -5,6 +5,7 @@ */ #include "QuicTransportStream.h" +#include "../common/MediaFramePipelineWrapper.h" using v8::Function; using v8::FunctionTemplate; @@ -18,6 +19,7 @@ DEFINE_LOGGER(QuicTransportStream, "QuicTransportStream"); Nan::Persistent QuicTransportStream::s_constructor; const int uuidSizeInBytes = 16; +const int frameHeaderSize = 4; QuicTransportStream::QuicTransportStream() : QuicTransportStream(nullptr) @@ -25,11 +27,21 @@ QuicTransportStream::QuicTransportStream() } QuicTransportStream::QuicTransportStream(owt::quic::WebTransportStreamInterface* stream) : m_stream(stream) - , m_contentSessionId() - , m_receivedContentSessionId(false) + , m_contentSessionId(uuidSizeInBytes) + , m_receivedContentSessionIdSize(0) + , m_trackId(uuidSizeInBytes) + , m_receivedTrackIdSize(0) + , m_readingTrackId(false) , m_isPiped(false) + , m_hasSink(false) , m_buffer(nullptr) , m_bufferSize(0) + , m_isMedia(false) + , m_readingFrameSize(false) + , m_frameSizeOffset(0) + , m_frameSizeArray(new uint8_t[frameHeaderSize]) + , m_currentFrameSize(0) + , m_receivedFrameOffset(0) { } @@ -38,6 +50,9 @@ QuicTransportStream::~QuicTransportStream() if (!uv_is_closing(reinterpret_cast(&m_asyncOnContentSessionId))) { uv_close(reinterpret_cast(&m_asyncOnContentSessionId), NULL); } + if (!uv_is_closing(reinterpret_cast(&m_asyncOnTrackId))) { + uv_close(reinterpret_cast(&m_asyncOnTrackId), NULL); + } if (!uv_is_closing(reinterpret_cast(&m_asyncOnData))) { uv_close(reinterpret_cast(&m_asyncOnData), NULL); } @@ -47,8 +62,10 @@ QuicTransportStream::~QuicTransportStream() void QuicTransportStream::OnCanRead() { - if (!m_receivedContentSessionId) { - MaybeReadContentSessionId(); + if (m_receivedContentSessionIdSize < uuidSizeInBytes) { + ReadContentSessionId(); + } else if (m_readingTrackId) { + ReadTrackId(); } else { SignalOnData(); } @@ -64,6 +81,37 @@ void QuicTransportStream::OnFinRead() ELOG_DEBUG("On FIN read."); } +void QuicTransportStream::AddedDestination() +{ + m_hasSink = true; + if (m_stream->ReadableBytes() > 0) { + SignalOnData(); + } +} + +void QuicTransportStream::RemovedDestination() +{ + // When all destinations are removed, set m_hasSink to false. +} + +void QuicTransportStream::addAudioDestination(owt_base::FrameDestination* dest) +{ + owt_base::FrameSource::addAudioDestination(dest); + AddedDestination(); +} + +void QuicTransportStream::addVideoDestination(owt_base::FrameDestination* dest) +{ + owt_base::FrameSource::addVideoDestination(dest); + AddedDestination(); +}; + +void QuicTransportStream::addDataDestination(owt_base::FrameDestination* dest) +{ + owt_base::FrameSource::addDataDestination(dest); + AddedDestination(); +}; + NAN_MODULE_INIT(QuicTransportStream::init) { Local tpl = Nan::New(newInstance); @@ -72,7 +120,9 @@ NAN_MODULE_INIT(QuicTransportStream::init) instanceTpl->SetInternalFieldCount(1); Nan::SetPrototypeMethod(tpl, "write", write); + Nan::SetPrototypeMethod(tpl, "readTrackId", readTrackId); Nan::SetPrototypeMethod(tpl, "addDestination", addDestination); + Nan::SetAccessor(instanceTpl, Nan::New("isMedia").ToLocalChecked(), isMediaGetter, isMediaSetter); s_constructor.Reset(Nan::GetFunction(tpl).ToLocalChecked()); Nan::Set(target, Nan::New("QuicTransportStream").ToLocalChecked(), Nan::GetFunction(tpl).ToLocalChecked()); @@ -87,6 +137,7 @@ NAN_METHOD(QuicTransportStream::newInstance) QuicTransportStream* obj = new QuicTransportStream(); obj->Wrap(info.This()); uv_async_init(uv_default_loop(), &obj->m_asyncOnContentSessionId, &QuicTransportStream::onContentSessionId); + uv_async_init(uv_default_loop(), &obj->m_asyncOnTrackId, &QuicTransportStream::onTrackId); uv_async_init(uv_default_loop(), &obj->m_asyncOnData, &QuicTransportStream::onData); info.GetReturnValue().Set(info.This()); } @@ -113,20 +164,54 @@ NAN_METHOD(QuicTransportStream::close) NAN_METHOD(QuicTransportStream::addDestination) { QuicTransportStream* obj = Nan::ObjectWrap::Unwrap(info.Holder()); - if (info.Length() != 2) { + if (info.Length() > 3) { Nan::ThrowTypeError("Invalid argument length for addDestination."); return; } - // TODO: Check if info[0] is an Nan wrapped object. - auto framePtr = Nan::ObjectWrap::Unwrap(info[1]->ToObject()); - // void* ptr = info[0]->ToObject()->GetAlignedPointerFromInternalField(0); - // auto framePtr=static_cast(ptr); - obj->addDataDestination(framePtr); - obj->m_isPiped = true; + Nan::Utf8String param0(Nan::To(info[0]).ToLocalChecked()); + std::string track = std::string(*param0); + bool isNanDestination(false); + if (info.Length() == 3) { + isNanDestination = info[2]->ToBoolean(Nan::GetCurrentContext()).ToLocalChecked()->Value(); + } + owt_base::FrameDestination* dest(nullptr); + if (isNanDestination) { + NanFrameNode* param = Nan::ObjectWrap::Unwrap(info[1]->ToObject()); + dest = param->FrameDestination(); + } else { + ::FrameDestination* param = node::ObjectWrap::Unwrap<::FrameDestination>( + info[1]->ToObject(Nan::GetCurrentContext()).ToLocalChecked()); + dest = param->dest; + } + if (track == "audio") { + obj->addAudioDestination(dest); + } else if (track == "video") { + obj->addVideoDestination(dest); + } else if (track == "data") { + obj->addDataDestination(dest); + } } NAN_METHOD(QuicTransportStream::removeDestination) { + QuicTransportStream* obj = Nan::ObjectWrap::Unwrap(info.Holder()); + obj->m_hasSink = false; +} + +NAN_METHOD(QuicTransportStream::readTrackId){ + QuicTransportStream* obj = Nan::ObjectWrap::Unwrap(info.Holder()); + obj->ReadTrackId(); +} + +NAN_GETTER(QuicTransportStream::isMediaGetter){ + QuicTransportStream* obj = Nan::ObjectWrap::Unwrap(info.Holder()); + info.GetReturnValue().Set(Nan::New(obj->m_isMedia)); +} + +NAN_SETTER(QuicTransportStream::isMediaSetter) +{ + QuicTransportStream* obj = Nan::ObjectWrap::Unwrap(info.Holder()); + obj->m_isMedia = value->ToBoolean(Nan::GetCurrentContext()).ToLocalChecked()->Value(); } v8::Local QuicTransportStream::newInstance(owt::quic::WebTransportStreamInterface* stream) @@ -134,29 +219,51 @@ v8::Local QuicTransportStream::newInstance(owt::quic::WebTransportSt Local streamObject = Nan::NewInstance(Nan::New(QuicTransportStream::s_constructor)).ToLocalChecked(); QuicTransportStream* obj = Nan::ObjectWrap::Unwrap(streamObject); obj->m_stream = stream; - obj->MaybeReadContentSessionId(); + obj->ReadContentSessionId(); return streamObject; } -void QuicTransportStream::MaybeReadContentSessionId() +void QuicTransportStream::ReadTrackId() { - if (!m_receivedContentSessionId && m_stream->ReadableBytes() > 0) { - // Match to a content session. - if (m_stream->ReadableBytes() > 0 && m_stream->ReadableBytes() < uuidSizeInBytes) { - ELOG_ERROR("No enough data to get content session ID."); - m_stream->Close(); - return; - } - uint8_t* data = new uint8_t[uuidSizeInBytes]; - m_stream->Read(data, uuidSizeInBytes); - m_contentSessionId = std::vector(data, data + uuidSizeInBytes); - m_receivedContentSessionId = true; + m_readingTrackId = true; + size_t readSize = std::min(uuidSizeInBytes - m_receivedTrackIdSize, m_stream->ReadableBytes()); + if (readSize == 0) { + return; + } + m_stream->Read(m_trackId.data() + m_receivedTrackIdSize, readSize); + m_receivedTrackIdSize += readSize; + if (m_receivedTrackIdSize == uuidSizeInBytes) { + m_readingTrackId = false; + m_asyncOnTrackId.data = this; + uv_async_send(&m_asyncOnTrackId); + } + if (m_stream->ReadableBytes() > 0) { + OnCanRead(); + } +} + +void QuicTransportStream::ReadContentSessionId() +{ + size_t readSize = std::min(uuidSizeInBytes - m_receivedContentSessionIdSize, m_stream->ReadableBytes()); + if (readSize == 0) { + return; + } + m_stream->Read(m_contentSessionId.data() + m_receivedContentSessionIdSize, readSize); + m_receivedContentSessionIdSize += readSize; + if (m_receivedContentSessionIdSize == uuidSizeInBytes) { m_asyncOnContentSessionId.data = this; uv_async_send(&m_asyncOnContentSessionId); - if (m_stream->ReadableBytes() > 0) { - SignalOnData(); + // Only signaling stream is not piped. + for (uint8_t d : m_contentSessionId) { + if (d != 0) { + m_isPiped = true; + break; + } } } + if (m_stream->ReadableBytes() > 0) { + OnCanRead(); + } } NAUV_WORK_CB(QuicTransportStream::onData) @@ -173,7 +280,6 @@ NAUV_WORK_CB(QuicTransportStream::onData) v8::Local eventCallback = onEventLocal.As(); Nan::AsyncResource* resource = new Nan::AsyncResource(Nan::New("ondata").ToLocalChecked()); auto readableBytes = obj->m_stream->ReadableBytes(); - ELOG_DEBUG("Readable bytes: %d", readableBytes); uint8_t* buffer = new uint8_t[readableBytes]; // Use a shared buffer instead to reduce performance cost on new. obj->m_stream->Read(buffer, readableBytes); Local args[] = { Nan::NewBuffer((char*)buffer, readableBytes).ToLocalChecked() }; @@ -182,6 +288,24 @@ NAUV_WORK_CB(QuicTransportStream::onData) } } +NAUV_WORK_CB(QuicTransportStream::onTrackId){ + Nan::HandleScope scope; + QuicTransportStream* obj = reinterpret_cast(async->data); + if (obj == nullptr) { + return; + } + Nan::MaybeLocal onEvent = Nan::Get(obj->handle(), Nan::New("ontrackid").ToLocalChecked()); + if (!onEvent.IsEmpty()) { + v8::Local onEventLocal = onEvent.ToLocalChecked(); + if (onEventLocal->IsFunction()) { + v8::Local eventCallback = onEventLocal.As(); + Nan::AsyncResource* resource = new Nan::AsyncResource(Nan::New("ontrackid").ToLocalChecked()); + Local args[] = { Nan::CopyBuffer((char*)obj->m_trackId.data(), uuidSizeInBytes).ToLocalChecked() }; + resource->runInAsyncScope(Nan::GetCurrentContext()->Global(), eventCallback, 1, args); + } + } +} + NAUV_WORK_CB(QuicTransportStream::onContentSessionId) { Nan::HandleScope scope; @@ -209,17 +333,68 @@ void QuicTransportStream::SignalOnData() return; } + if (!m_hasSink) { + return; + } + while (m_stream->ReadableBytes() > 0) { auto readableBytes = m_stream->ReadableBytes(); - if (readableBytes > m_bufferSize) { - ReallocateBuffer(readableBytes); + // Future check if it's an audio stream or video stream. Audio is not supported at this time. + if (m_isMedia) { + // A new frame. + if (m_currentFrameSize == 0 && m_receivedFrameOffset == 0 && !m_readingFrameSize) { + m_readingFrameSize = true; + memset(m_frameSizeArray, 0, frameHeaderSize * sizeof(uint8_t)); + m_frameSizeOffset = 0; + } + // Read frame header. + if (m_readingFrameSize) { + size_t readSize = std::min(frameHeaderSize - m_frameSizeOffset, m_stream->ReadableBytes()); + m_stream->Read(m_frameSizeArray + m_frameSizeOffset, readSize); + m_frameSizeOffset += readSize; + if (m_frameSizeOffset == frameHeaderSize) { + // Usually only the last 2 bytes are used. The first two bits could be used for indicating frame size. + for (int i = 0; i < frameHeaderSize; i++) { + m_currentFrameSize <<= 8; + m_currentFrameSize += m_frameSizeArray[i]; + } + if (m_currentFrameSize > m_bufferSize) { + ReallocateBuffer(m_currentFrameSize); + } + m_readingFrameSize = false; + } + continue; + } + // Read frame body. + if (m_receivedFrameOffset < m_currentFrameSize) { + // Append data to current frame. + size_t readBytes = std::min(readableBytes, m_currentFrameSize - m_receivedFrameOffset); + m_stream->Read(m_buffer + m_receivedFrameOffset, readBytes); + m_receivedFrameOffset += readBytes; + } + // Complete frame. + if (m_receivedFrameOffset == m_currentFrameSize) { + owt_base::Frame frame; + frame.format = owt_base::FRAME_FORMAT_I420; + frame.length = m_currentFrameSize; + frame.payload = m_buffer; + // Transport layer doesn't know a frame's type. Video agent is able to parse the type of a frame from bistream. However, video agent doesn't feed the frame to decoder when a key frame is requested. + frame.additionalInfo.video.isKeyFrame = "key"; + deliverFrame(frame); + m_currentFrameSize = 0; + m_receivedFrameOffset = 0; + } + } else { + if (readableBytes > m_bufferSize) { + ReallocateBuffer(readableBytes); + } + owt_base::Frame frame; + frame.format = owt_base::FRAME_FORMAT_DATA; + frame.length = readableBytes; + frame.payload = m_buffer; + m_stream->Read(frame.payload, readableBytes); + deliverFrame(frame); } - owt_base::Frame frame; - frame.format = owt_base::FRAME_FORMAT_DATA; - frame.length = readableBytes; - frame.payload = m_buffer; - m_stream->Read(frame.payload, readableBytes); - deliverFrame(frame); } } diff --git a/source/agent/addons/quic/QuicTransportStream.h b/source/agent/addons/quic/QuicTransportStream.h index 4d51836df..205cb9ef2 100644 --- a/source/agent/addons/quic/QuicTransportStream.h +++ b/source/agent/addons/quic/QuicTransportStream.h @@ -7,14 +7,15 @@ #ifndef QUIC_QUICTRANSPORTSTREAM_H_ #define QUIC_QUICTRANSPORTSTREAM_H_ -#include "owt/quic/web_transport_stream_interface.h" #include "../../core/owt_base/MediaFramePipeline.h" +#include "../common/MediaFramePipelineWrapper.h" +#include "owt/quic/web_transport_stream_interface.h" #include -#include #include +#include #include -class QuicTransportStream : public owt_base::FrameSource, public owt_base::FrameDestination, public Nan::ObjectWrap, public owt::quic::WebTransportStreamInterface::Visitor { +class QuicTransportStream : public owt_base::FrameSource, public owt_base::FrameDestination, public NanFrameNode, public owt::quic::WebTransportStreamInterface::Visitor { DECLARE_LOGGER(); public: @@ -35,8 +36,17 @@ class QuicTransportStream : public owt_base::FrameSource, public owt_base::Frame static NAN_METHOD(close); static NAN_METHOD(addDestination); static NAN_METHOD(removeDestination); + // Read 128 bits after content session ID. Only media streams have track ID. Result will be returned by onData callback. + // TODO: Make this as an async method when it's supported. + static NAN_METHOD(readTrackId); + + static NAN_GETTER(isMediaGetter); + static NAN_SETTER(isMediaSetter); + static NAUV_WORK_CB(onContentSessionId); - static NAUV_WORK_CB(onData); // TODO: Move to pipe. + static NAUV_WORK_CB(onTrackId); + // Only signaling stream (UUID: 0) invokes this method. + static NAUV_WORK_CB(onData); // TODO: Move to pipe. // Overrides owt_base::FrameSource. void onFeedback(const owt_base::FeedbackMsg&) override; @@ -45,6 +55,10 @@ class QuicTransportStream : public owt_base::FrameSource, public owt_base::Frame void onFrame(const owt_base::Frame&) override; void onVideoSourceChanged() override; + // Overrides NanFrameNode. + owt_base::FrameSource* FrameSource() override { return this; } + owt_base::FrameDestination* FrameDestination() override { return this; } + static Nan::Persistent s_constructor; protected: @@ -53,21 +67,44 @@ class QuicTransportStream : public owt_base::FrameSource, public owt_base::Frame void OnCanWrite() override; void OnFinRead() override; + // Overrides owt_base::FrameSource. + void addAudioDestination(owt_base::FrameDestination*) override; + void addVideoDestination(owt_base::FrameDestination*) override; + void addDataDestination(owt_base::FrameDestination*) override; + private: - // Try to read content session ID from data buffered. - void MaybeReadContentSessionId(); + // Read content session ID from data buffered. + void ReadContentSessionId(); + void ReadTrackId(); void SignalOnData(); void ReallocateBuffer(size_t size); + void AddedDestination(); + void RemovedDestination(); owt::quic::WebTransportStreamInterface* m_stream; std::vector m_contentSessionId; - bool m_receivedContentSessionId; + size_t m_receivedContentSessionIdSize; + std::vector m_trackId; + size_t m_receivedTrackIdSize; + bool m_readingTrackId; // True if it's piped to a receiver in C++ layer. In this case, data will not be sent to JavaScript code. bool m_isPiped; + // If a stream is piped but doesn't have a sink, no data from WebTransport stream will be consumed. + bool m_hasSink; uint8_t* m_buffer; size_t m_bufferSize; + // Indicates whether this is a media stream. If this is not a media stream, it can only be piped to another QUIC agent. + bool m_isMedia; + + size_t m_readingFrameSize; + size_t m_frameSizeOffset; + uint8_t* m_frameSizeArray; + size_t m_currentFrameSize; + size_t m_receivedFrameOffset; + uv_async_t m_asyncOnContentSessionId; + uv_async_t m_asyncOnTrackId; uv_async_t m_asyncOnData; }; diff --git a/source/agent/addons/quic/WebTransportFrameDestination.cc b/source/agent/addons/quic/WebTransportFrameDestination.cc new file mode 100644 index 000000000..75c76ac13 --- /dev/null +++ b/source/agent/addons/quic/WebTransportFrameDestination.cc @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2021 Intel Corporation + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "WebTransportFrameDestination.h" + +using v8::Function; +using v8::FunctionTemplate; +using v8::Local; +using v8::Object; +using v8::ObjectTemplate; +using v8::Value; + +DEFINE_LOGGER(WebTransportFrameDestination, "WebTransportFrameDestination"); + +Nan::Persistent WebTransportFrameDestination::s_constructor; + +WebTransportFrameDestination::WebTransportFrameDestination() +{ +} + +WebTransportFrameDestination::~WebTransportFrameDestination() +{ +} + +NAN_MODULE_INIT(WebTransportFrameDestination::init) +{ + Local tpl = Nan::New(newInstance); + tpl->SetClassName(Nan::New("WebTransportFrameDestination").ToLocalChecked()); + Local instanceTpl = tpl->InstanceTemplate(); + instanceTpl->SetInternalFieldCount(1); + + s_constructor.Reset(Nan::GetFunction(tpl).ToLocalChecked()); + Nan::Set(target, Nan::New("WebTransportFrameDestination").ToLocalChecked(), Nan::GetFunction(tpl).ToLocalChecked()); +} + +NAN_METHOD(WebTransportFrameDestination::newInstance) +{ + if (!info.IsConstructCall()) { + ELOG_DEBUG("Not construct call."); + return; + } + WebTransportFrameDestination* obj = new WebTransportFrameDestination(); + obj->Wrap(info.This()); + info.GetReturnValue().Set(info.This()); +} \ No newline at end of file diff --git a/source/agent/addons/quic/WebTransportFrameDestination.h b/source/agent/addons/quic/WebTransportFrameDestination.h new file mode 100644 index 000000000..070892109 --- /dev/null +++ b/source/agent/addons/quic/WebTransportFrameDestination.h @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2021 Intel Corporation + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef QUIC_ADDON_WEB_TRANSPORT_FRAME_DESTINATION_H_ +#define QUIC_ADDON_WEB_TRANSPORT_FRAME_DESTINATION_H_ + +#include "../../core/owt_base/MediaFramePipeline.h" +#include "../common/MediaFramePipelineWrapper.h" +#include "owt/quic/web_transport_stream_interface.h" +#include +#include + +// A WebTransportFrameDestination is a hub for a single InternalIO input to multiple WebTransport outputs. +class WebTransportFrameDestination : public Nan::ObjectWrap { + DECLARE_LOGGER(); + +public: + explicit WebTransportFrameDestination(); + ~WebTransportFrameDestination(); + + static NAN_MODULE_INIT(init); + +private: + static Nan::Persistent s_constructor; + static NAN_METHOD(newInstance); +}; + +#endif \ No newline at end of file diff --git a/source/agent/addons/quic/WebTransportFrameSource.cc b/source/agent/addons/quic/WebTransportFrameSource.cc new file mode 100644 index 000000000..3c2a37589 --- /dev/null +++ b/source/agent/addons/quic/WebTransportFrameSource.cc @@ -0,0 +1,103 @@ +/* + * Copyright (C) 2021 Intel Corporation + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "WebTransportFrameSource.h" + +using v8::Function; +using v8::FunctionTemplate; +using v8::Local; +using v8::Object; +using v8::ObjectTemplate; +using v8::Value; + +DEFINE_LOGGER(WebTransportFrameSource, "WebTransportFrameSource"); + +Nan::Persistent WebTransportFrameSource::s_constructor; + +WebTransportFrameSource::WebTransportFrameSource() + : m_audioStream(nullptr) + , m_videoStream(nullptr) + , m_dataStream(nullptr) +{ +} + +WebTransportFrameSource::~WebTransportFrameSource(){} + +NAN_MODULE_INIT(WebTransportFrameSource::init) +{ + Local tpl = Nan::New(newInstance); + tpl->SetClassName(Nan::New("WebTransportFrameSource").ToLocalChecked()); + Local instanceTpl = tpl->InstanceTemplate(); + instanceTpl->SetInternalFieldCount(1); + + Nan::SetPrototypeMethod(tpl, "addDestination", addDestination); + Nan::SetPrototypeMethod(tpl, "addInputStream", addInputStream); + + s_constructor.Reset(Nan::GetFunction(tpl).ToLocalChecked()); + Nan::Set(target, Nan::New("WebTransportFrameSource").ToLocalChecked(), Nan::GetFunction(tpl).ToLocalChecked()); +} + +NAN_METHOD(WebTransportFrameSource::newInstance) +{ + if (!info.IsConstructCall()) { + ELOG_DEBUG("Not construct call."); + return; + } + WebTransportFrameSource* obj = new WebTransportFrameSource(); + obj->Wrap(info.This()); + info.GetReturnValue().Set(info.This()); +} + +NAN_METHOD(WebTransportFrameSource::addInputStream) +{ + WebTransportFrameSource* obj = Nan::ObjectWrap::Unwrap(info.Holder()); + NanFrameNode* inputStream = Nan::ObjectWrap::Unwrap(info[0]->ToObject()); + inputStream->FrameSource()->addDataDestination(obj); + inputStream->FrameSource()->addAudioDestination(obj); + inputStream->FrameSource()->addVideoDestination(obj); +} + +NAN_METHOD(WebTransportFrameSource::addDestination) +{ + WebTransportFrameSource* obj = Nan::ObjectWrap::Unwrap(info.Holder()); + if (info.Length() > 3) { + Nan::ThrowTypeError("Invalid argument length for addDestination."); + return; + } + Nan::Utf8String param0(Nan::To(info[0]).ToLocalChecked()); + std::string track = std::string(*param0); + bool isNanDestination(false); + if (info.Length() == 3) { + isNanDestination = info[2]->ToBoolean(Nan::GetCurrentContext()).ToLocalChecked()->Value(); + } + owt_base::FrameDestination* dest(nullptr); + if (isNanDestination) { + NanFrameNode* param = Nan::ObjectWrap::Unwrap(info[1]->ToObject()); + dest = param->FrameDestination(); + } else { + ::FrameDestination* param = node::ObjectWrap::Unwrap<::FrameDestination>( + info[1]->ToObject(Nan::GetCurrentContext()).ToLocalChecked()); + dest = param->dest; + } + obj->addAudioDestination(dest); + obj->addVideoDestination(dest); + obj->addDataDestination(dest); +} + +void WebTransportFrameSource::onFeedback(const owt_base::FeedbackMsg&) +{ + // No feedbacks righ now. +} + +void WebTransportFrameSource::onFrame(const owt_base::Frame& frame) +{ + deliverFrame(frame); +} + +void WebTransportFrameSource::onVideoSourceChanged() +{ + // Do nothing. +} \ No newline at end of file diff --git a/source/agent/addons/quic/WebTransportFrameSource.h b/source/agent/addons/quic/WebTransportFrameSource.h new file mode 100644 index 000000000..10bdea857 --- /dev/null +++ b/source/agent/addons/quic/WebTransportFrameSource.h @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2021 Intel Corporation + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef QUIC_ADDON_WEB_TRANSPORT_FRAME_SOURCE_H_ +#define QUIC_ADDON_WEB_TRANSPORT_FRAME_SOURCE_H_ + +#include "../../core/owt_base/MediaFramePipeline.h" +#include "../common/MediaFramePipelineWrapper.h" +#include "owt/quic/web_transport_stream_interface.h" +#include +#include + +// A WebTransportFrameSource is a hub for multiple WebTransport inputs to a single InternalIO output. +class WebTransportFrameSource : public owt_base::FrameSource, public owt_base::FrameDestination, public NanFrameNode { + DECLARE_LOGGER(); + +public: + explicit WebTransportFrameSource(); + ~WebTransportFrameSource(); + + static NAN_MODULE_INIT(init); + + // Overrides owt_base::FrameSource. + void onFeedback(const owt_base::FeedbackMsg&) override; + + // Overrides owt_base::FrameDestination. + void onFrame(const owt_base::Frame&) override; + void onVideoSourceChanged() override; + + // Overrides NanFrameNode. + owt_base::FrameSource* FrameSource() override { return this; } + owt_base::FrameDestination* FrameDestination() override { return this; } + +private: + // new WebTransportFrameSource(contentSessionId, options) + static NAN_METHOD(newInstance); + static NAN_METHOD(addDestination); + static NAN_METHOD(removeDestination); + // addInputStream(stream, kind) + // kind could be "data", "audio" or "video". + static NAN_METHOD(addInputStream); + + static Nan::Persistent s_constructor; + + owt::quic::WebTransportStreamInterface* m_audioStream; + owt::quic::WebTransportStreamInterface* m_videoStream; + owt::quic::WebTransportStreamInterface* m_dataStream; +}; + +#endif \ No newline at end of file diff --git a/source/agent/addons/quic/addon.cc b/source/agent/addons/quic/addon.cc index 74d1fa489..b9d172045 100644 --- a/source/agent/addons/quic/addon.cc +++ b/source/agent/addons/quic/addon.cc @@ -5,6 +5,8 @@ */ #include "QuicTransportServer.h" +#include "WebTransportFrameDestination.h" +#include "WebTransportFrameSource.h" #include using namespace v8; @@ -12,8 +14,10 @@ using namespace v8; NAN_MODULE_INIT(InitAll) { QuicTransportServer::init(target); - QuicTransportConnection::init(target); QuicTransportStream::init(target); + QuicTransportConnection::init(target); + WebTransportFrameSource::init(target); + WebTransportFrameDestination::init(target); } NODE_MODULE(addon, InitAll) diff --git a/source/agent/addons/quic/binding.gyp b/source/agent/addons/quic/binding.gyp index 124f04f2d..75ec3e1e0 100644 --- a/source/agent/addons/quic/binding.gyp +++ b/source/agent/addons/quic/binding.gyp @@ -4,9 +4,11 @@ 'sources':[ 'addon.cc', "QuicFactory.cc", - 'QuicTransportStream.cc', 'QuicTransportServer.cc', 'QuicTransportConnection.cc', + 'QuicTransportStream.cc', + 'WebTransportFrameSource.cc', + 'WebTransportFrameDestination.cc', '../../../core/owt_base/MediaFramePipeline.cpp', '../../../core/owt_base/MediaFrameMulticaster.cpp', '../../../core/owt_base/Utils.cc', diff --git a/source/agent/addons/quic/quic_sdk_url b/source/agent/addons/quic/quic_sdk_url index 51ca18330..9110d882c 100644 --- a/source/agent/addons/quic/quic_sdk_url +++ b/source/agent/addons/quic/quic_sdk_url @@ -1 +1 @@ -https://api.github.com/repos/open-webrtc-toolkit/owt-sdk-quic/actions/artifacts/71814132/zip \ No newline at end of file +https://api.github.com/repos/open-webrtc-toolkit/owt-sdk-quic/actions/artifacts/84639537/zip \ No newline at end of file diff --git a/source/agent/conference/quicController.js b/source/agent/conference/quicController.js index ae66ccfc5..797ee597f 100644 --- a/source/agent/conference/quicController.js +++ b/source/agent/conference/quicController.js @@ -34,15 +34,20 @@ class Transport { class Operation { constructor(id, transport, direction, tracks, data) { - if(tracks){ - throw new Error('QUIC agent does not support media stream tracks so far.'); - } this.id = id; this.transport = transport; this.transportId = transport.id; this.direction = direction; + this.tracks = tracks; this.data = data; this.promise = Promise.resolve(); + this.tracks = this.tracks ? this.tracks.map(t => { + if (t.type === 'video') { + t.format = { codec : 'h264', profile : 'B' }; + } + return t; + }) + : undefined; } } @@ -84,21 +89,6 @@ class QuicController extends EventEmitter { return this.operations.get(operationId); } - onSessionProgress(sessionId, status) - { - if (!status.data) { - log.error('onSessionProgress is called by QUIC connections.'); - return; - } - if (status.type === 'ready') { - if (!this.operations.get(sessionId)) { - log.error('Invalid session ID.'); - return; - } - this.emit('session-established', this.operations.get(sessionId)); - } - } - onSessionProgress(sessionId, status) { if (!status.data) { @@ -168,6 +158,7 @@ class QuicController extends EventEmitter { // Return Promise terminate(sessionId, direction, reason) { + console.trace(); log.debug(`terminate, sessionId: ${sessionId} direction: ${direction}, ${reason}`); if (!this.operations.has(sessionId)) { diff --git a/source/agent/connections.js b/source/agent/connections.js index fd0fd82d2..25e6c8e83 100644 --- a/source/agent/connections.js +++ b/source/agent/connections.js @@ -124,7 +124,11 @@ module.exports = function Connections () { if (!dest) { return Promise.reject({ type : 'failed', reason : 'Destination connection(' + name + ') is not ready' }); } - connections[from].connection.addDestination(name, dest); + let isNanObj=false; + if (dest.constructor.name === 'QuicTransportStream'){ + isNanObj=true; + } + connections[from].connection.addDestination(name, dest, isNanObj); connections[connectionId][name + 'From'] = from; } } diff --git a/source/agent/internalConnectionRouter.js b/source/agent/internalConnectionRouter.js index 4a705f848..98b62d8ad 100644 --- a/source/agent/internalConnectionRouter.js +++ b/source/agent/internalConnectionRouter.js @@ -107,7 +107,11 @@ class StreamSource { addDestination(track, sink) { if (!this.dests[track].has(sink.id)) { this.dests[track].set(sink.id, sink); - this.conn.addDestination(track, sink.conn); + let isNanObject = false; + if (sink instanceof QuicTransportStreamPipeline) { + isNanObject = true; + } + this.conn.addDestination(track, sink.conn, isNanObject); sink._addSource(track, this); } } @@ -167,8 +171,8 @@ class InternalConnectionRouter { * @param {FrameSource} source Wrapper class for FrameSource */ addLocalSource(id, type, source) { - log.debug('addLocalSource:', id, type); - this.internalServer.addSource(id, source); + const isNativeSource = (type === 'quic'); + this.internalServer.addSource(id, source, isNativeSource); return this.connections.addConnection(id, type, '', source, 'in'); } @@ -191,7 +195,11 @@ class InternalConnectionRouter { return this.removeLocalSource(id); } else if (conn.direction === 'out') { return this.removeLocalDestination(id); + } else { + return Promise.reject('Unexpected direction '+conn.direction); } + } else { + return Promise.reject('Cannot find connection.'); } } diff --git a/source/agent/quic/index.js b/source/agent/quic/index.js index 968243a60..cbb060584 100644 --- a/source/agent/quic/index.js +++ b/source/agent/quic/index.js @@ -38,6 +38,10 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) { new Map(); // Key is publication ID, value is stream pipeline. const outgoingStreamPipelines = new Map(); // Key is subscription ID, value is stream pipeline. + const frameSourceMap = + new Map(); // Key is publication ID, value is WebTransportFrameSource. + const publicationOptions = new Map(); + const subscriptionOptions = new Map(); let quicTransportServer; const clusterName = global && global.config && global.config.cluster ? global.config.cluster.name : @@ -101,11 +105,25 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) { log.debug( 'A stream with session ID ' + stream.contentSessionId + ' is added.'); - let pipeline = null; + // Set isMedia=true if the session is for media. if (outgoingStreamPipelines.has(stream.contentSessionId)) { - pipeline = outgoingStreamPipelines.get(stream.contentSessionId); - } else if (incomingStreamPipelines.has(stream.contentSessionId)) { - pipeline = incomingStreamPipelines.get(stream.contentSessionId); + const pipeline = outgoingStreamPipelines.get(stream.contentSessionId); + pipeline.quicStream(stream); + } else if (frameSourceMap.has(stream.contentSessionId)) { + if (!publicationOptions.has(stream.contentSessionId)) { + log.warn( + 'Cannot find publication options for session ' + + stream.contentSessionId); + stream.close(); + } + const options = publicationOptions.get(stream.contentSessionId); + // Only publications for media have tracks. + if (options.tracks && options.tracks.length) { + stream.isMedia = true; + stream.readTrackId(); + } else { + frameSourceMap.get(stream.contentSessionId).addInputStream(stream); + } } else { log.warn( 'Cannot find a pipeline for QUIC stream. Content session ID: ' + @@ -113,7 +131,11 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) { stream.close(); return; } - pipeline.quicStream(stream); + }); + quicTransportServer.on('trackid', (stream) => { + if (frameSourceMap.has(stream.contentSessionId)) { + frameSourceMap.get(stream.contentSessionId).addInputStream(stream); + } }); quicTransportServer.on('connectionadded', (connection) => { log.debug( @@ -122,6 +144,18 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) { }); }); + const createFrameSource = (streamId, options, callback) => { + if (frameSourceMap.has(streamId)) { + callback('callback', { + type: 'failed', + reason: 'Frame source for ' + streamId + ' exists.' + }); + } + const frameSource = new addon.WebTransportFrameSource(streamId); + frameSourceMap.set(streamId, frameSource); + return frameSource; + }; + const createStreamPipeline = function(streamId, direction, options, callback) { // Client is expected to create a QuicTransport before sending publish or @@ -177,7 +211,8 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) { var conn = null; switch (connectionType) { case 'quic': - conn = createStreamPipeline(connectionId, 'in', options, callback); + publicationOptions.set(connectionId, options); + conn = createFrameSource(connectionId, 'in', options, callback); if (!conn) { return; } @@ -189,10 +224,15 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) { log.error('Create connection failed', connectionId, connectionType); return callback('callback', {type: 'failed', reason: 'Create Connection failed'}); } - conn.bindRouterAsSourceCallback = function(stream) { - router.addLocalSource(connectionId, connectionType, stream); - } + router.addLocalSource(connectionId, connectionType, conn); onSuccess(callback)(); + notifyStatus(options.controller, connectionId, 'in', { + type: 'ready', + audio: undefined, + video: undefined, + data: true, + simulcast: false + }); }; that.unpublish = function (connectionId, callback) { @@ -204,6 +244,9 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) { } callback('callback', 'ok'); }, onError(callback)); + if (publicationOptions.has(connectionId)) { + publicationOptions.delete(connectionId); + } }; that.subscribe = function (connectionId, connectionType, options, callback) { @@ -213,9 +256,6 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) { } } log.debug('subscribe, connectionId:', connectionId, 'connectionType:', connectionType, 'options:', options); - if(!options.data){ - log.error('Subscription request does not include data field.'); - } if (router.getConnection(connectionId)) { return callback('callback', {type: 'failed', reason: 'Connection already exists:'+connectionId}); } diff --git a/source/agent/quic/webtransport/quicTransportServer.js b/source/agent/quic/webtransport/quicTransportServer.js index 5278b3e29..c20008d9e 100644 --- a/source/agent/quic/webtransport/quicTransportServer.js +++ b/source/agent/quic/webtransport/quicTransportServer.js @@ -67,6 +67,15 @@ module.exports = class QuicTransportServer extends EventEmitter { stream.write(uuidBytes, uuidBytes.length); } }; + stream.ontrackid = (id) => { + if (stream.trackId) { + log.warn('Duplicate events for track ID.'); + return; + } + const trackId = this._uuidBytesToString(new Uint8Array(id)) + stream.trackId = trackId; + this.emit('trackid', stream); + }; stream.ondata = (data) => { if (stream.contentSessionId === zeroUuid) { // Please refer diff --git a/source/agent/quic/webtransport/quicTransportStreamPipeline.js b/source/agent/quic/webtransport/quicTransportStreamPipeline.js index 8c9549232..2aedf9263 100644 --- a/source/agent/quic/webtransport/quicTransportStreamPipeline.js +++ b/source/agent/quic/webtransport/quicTransportStreamPipeline.js @@ -46,10 +46,6 @@ module.exports = class QuicTransportStreamPipeline { }; this.receiver = function(kind) { - if (kind !== 'data') { - log.error('Unsupported receiver.'); - return null; - } return this._quicStream; }; diff --git a/source/core/owt_base/MediaFramePipeline.h b/source/core/owt_base/MediaFramePipeline.h index a209ac745..137f20915 100644 --- a/source/core/owt_base/MediaFramePipeline.h +++ b/source/core/owt_base/MediaFramePipeline.h @@ -250,14 +250,14 @@ class FrameSource { virtual void onFeedback(const FeedbackMsg&) { }; - void addAudioDestination(FrameDestination*); - void removeAudioDestination(FrameDestination*); + virtual void addAudioDestination(FrameDestination*); + virtual void removeAudioDestination(FrameDestination*); - void addVideoDestination(FrameDestination*); - void removeVideoDestination(FrameDestination*); + virtual void addVideoDestination(FrameDestination*); + virtual void removeVideoDestination(FrameDestination*); - void addDataDestination(FrameDestination*); - void removeDataDestination(FrameDestination*); + virtual void addDataDestination(FrameDestination*); + virtual void removeDataDestination(FrameDestination*); protected: void deliverFrame(const Frame&);