From 40fbfd85606338048906986063758aa15bdc0555 Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 17 Feb 2015 16:28:28 +0800 Subject: [PATCH] for #133, rtsp extract tcp/udp listener. --- trunk/conf/full.conf | 5 + trunk/configure | 2 +- trunk/ide/srs_upp/srs_upp.upp | 2 + trunk/ide/srs_vs2010/srs.vcxproj | 2 + trunk/ide/srs_vs2010/srs.vcxproj.filters | 6 + trunk/src/app/srs_app_config.cpp | 26 ++- trunk/src/app/srs_app_config.hpp | 8 + trunk/src/app/srs_app_listener.cpp | 272 +++++++++++++++++++++++ trunk/src/app/srs_app_listener.hpp | 125 +++++++++++ trunk/src/app/srs_app_mpegts_udp.cpp | 8 - trunk/src/app/srs_app_mpegts_udp.hpp | 22 +- trunk/src/app/srs_app_rtsp.cpp | 38 +++- trunk/src/app/srs_app_rtsp.hpp | 15 +- trunk/src/app/srs_app_server.cpp | 240 ++++++-------------- trunk/src/app/srs_app_server.hpp | 51 +++-- trunk/src/protocol/srs_rtsp_stack.cpp | 163 ++++++++++++-- trunk/src/protocol/srs_rtsp_stack.hpp | 183 +++++++++++---- 17 files changed, 872 insertions(+), 296 deletions(-) create mode 100644 trunk/src/app/srs_app_listener.cpp create mode 100644 trunk/src/app/srs_app_listener.hpp diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 31d6def3cd..779a8c2521 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -160,6 +160,11 @@ stream_caster { # for mpegts_over_udp caster, listen at udp port. for example, 8935. # for rtsp caster, listen at tcp port. for example, 554. listen 8935; + # for the rtsp caster, the rtp server local port over udp, + # which reply the rtsp setup request message, the port will be used: + # [rtp_port_min, rtp_port_max) + rtp_port_min 57200; + rtp_port_max 57300; } stream_caster { enabled off; diff --git a/trunk/configure b/trunk/configure index b1e79e28fe..44bb1916a1 100755 --- a/trunk/configure +++ b/trunk/configure @@ -392,7 +392,7 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then "srs_app_json" "srs_app_ingest" "srs_app_ffmpeg" "srs_app_utility" "srs_app_dvr" "srs_app_edge" "srs_app_kbps" "srs_app_heartbeat" "srs_app_empty" "srs_app_http_client" "srs_app_recv_thread" "srs_app_security" "srs_app_statistic" - "srs_app_mpegts_udp" "srs_app_rtsp") + "srs_app_mpegts_udp" "srs_app_rtsp" "srs_app_listener") APP_INCS="src/app"; MODULE_DIR=${APP_INCS} . auto/modules.sh APP_OBJS="${MODULE_OBJS[@]}" fi diff --git a/trunk/ide/srs_upp/srs_upp.upp b/trunk/ide/srs_upp/srs_upp.upp index 245dd7fbd0..a27b7b6355 100755 --- a/trunk/ide/srs_upp/srs_upp.upp +++ b/trunk/ide/srs_upp/srs_upp.upp @@ -102,6 +102,8 @@ file ../../src/app/srs_app_json.cpp, ../../src/app/srs_app_kbps.hpp, ../../src/app/srs_app_kbps.cpp, + ../../src/app/srs_app_listener.hpp, + ../../src/app/srs_app_listener.cpp, ../../src/app/srs_app_log.hpp, ../../src/app/srs_app_log.cpp, ../../src/app/srs_app_mpegts_udp.hpp, diff --git a/trunk/ide/srs_vs2010/srs.vcxproj b/trunk/ide/srs_vs2010/srs.vcxproj index 3013b0eaeb..c9050517a7 100755 --- a/trunk/ide/srs_vs2010/srs.vcxproj +++ b/trunk/ide/srs_vs2010/srs.vcxproj @@ -82,6 +82,7 @@ + @@ -161,6 +162,7 @@ + diff --git a/trunk/ide/srs_vs2010/srs.vcxproj.filters b/trunk/ide/srs_vs2010/srs.vcxproj.filters index 1c1ad77401..5a60d8b869 100755 --- a/trunk/ide/srs_vs2010/srs.vcxproj.filters +++ b/trunk/ide/srs_vs2010/srs.vcxproj.filters @@ -235,6 +235,9 @@ srs + + srs + @@ -432,6 +435,9 @@ srs + + srs + diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index a769bbf1d3..9c486fa54c 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -1395,7 +1395,7 @@ int SrsConfig::check_config() SrsConfDirective* conf = stream_caster->at(i); string n = conf->name; if (n != "enabled" && n != "caster" && n != "output" - && n != "listen" + && n != "listen" && n != "rtp_port_min" && n != "rtp_port_max" ) { ret = ERROR_SYSTEM_CONFIG_INVALID; srs_error("unsupported stream_caster directive %s, ret=%d", n.c_str(), ret); @@ -2065,6 +2065,30 @@ int SrsConfig::get_stream_caster_listen(SrsConfDirective* sc) return ::atoi(conf->arg0().c_str()); } +int SrsConfig::get_stream_caster_rtp_port_min(SrsConfDirective* sc) +{ + srs_assert(sc); + + SrsConfDirective* conf = sc->get("rtp_port_min"); + if (!conf) { + return 0; + } + + return ::atoi(conf->arg0().c_str()); +} + +int SrsConfig::get_stream_caster_rtp_port_max(SrsConfDirective* sc) +{ + srs_assert(sc); + + SrsConfDirective* conf = sc->get("rtp_port_max"); + if (!conf) { + return 0; + } + + return ::atoi(conf->arg0().c_str()); +} + SrsConfDirective* SrsConfig::get_vhost(string vhost) { srs_assert(root); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 36807ea5e3..8eed103d7c 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -481,6 +481,14 @@ class SrsConfig * get the listen port of stream caster. */ virtual int get_stream_caster_listen(SrsConfDirective* sc); + /** + * get the min udp port for rtp of stream caster rtsp. + */ + virtual int get_stream_caster_rtp_port_min(SrsConfDirective* sc); + /** + * get the max udp port for rtp of stream caster rtsp. + */ + virtual int get_stream_caster_rtp_port_max(SrsConfDirective* sc); // vhost specified section public: /** diff --git a/trunk/src/app/srs_app_listener.cpp b/trunk/src/app/srs_app_listener.cpp new file mode 100644 index 0000000000..5f536857d0 --- /dev/null +++ b/trunk/src/app/srs_app_listener.cpp @@ -0,0 +1,272 @@ +/* +The MIT License (MIT) + +Copyright (c) 2013-2015 winlin + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +// set the max packet size. +#define SRS_UDP_MAX_PACKET_SIZE 65535 + +// sleep in ms for udp recv packet. +#define SRS_UDP_PACKET_RECV_CYCLE_INTERVAL_MS 0 + +// nginx also set to 512 +#define SERVER_LISTEN_BACKLOG 512 + +ISrsUdpHandler::ISrsUdpHandler() +{ +} + +ISrsUdpHandler::~ISrsUdpHandler() +{ +} + +ISrsTcpHandler::ISrsTcpHandler() +{ +} + +ISrsTcpHandler::~ISrsTcpHandler() +{ +} + +SrsUdpListener::SrsUdpListener(ISrsUdpHandler* h, int p) +{ + handler = h; + port = p; + + _fd = -1; + stfd = NULL; + + nb_buf = SRS_UDP_MAX_PACKET_SIZE; + buf = new char[nb_buf]; + + pthread = new SrsThread("udp", this, 0, true); +} + +SrsUdpListener::~SrsUdpListener() +{ + srs_close_stfd(stfd); + + pthread->stop(); + srs_freep(pthread); + + // st does not close it sometimes, + // close it manually. + close(_fd); + + srs_freep(buf); +} + +int SrsUdpListener::fd() +{ + return _fd; +} + +int SrsUdpListener::listen() +{ + int ret = ERROR_SUCCESS; + + if ((_fd = socket(AF_INET, SOCK_DGRAM, 0)) == -1) { + ret = ERROR_SOCKET_CREATE; + srs_error("create linux socket error. port=%d, ret=%d", port, ret); + return ret; + } + srs_verbose("create linux socket success. port=%d, fd=%d", port, _fd); + + int reuse_socket = 1; + if (setsockopt(_fd, SOL_SOCKET, SO_REUSEADDR, &reuse_socket, sizeof(int)) == -1) { + ret = ERROR_SOCKET_SETREUSE; + srs_error("setsockopt reuse-addr error. port=%d, ret=%d", port, ret); + return ret; + } + srs_verbose("setsockopt reuse-addr success. port=%d, fd=%d", port, _fd); + + sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + addr.sin_addr.s_addr = INADDR_ANY; + if (bind(_fd, (const sockaddr*)&addr, sizeof(sockaddr_in)) == -1) { + ret = ERROR_SOCKET_BIND; + srs_error("bind socket error. port=%d, ret=%d", port, ret); + return ret; + } + srs_verbose("bind socket success. port=%d, fd=%d", port, _fd); + + if ((stfd = st_netfd_open_socket(_fd)) == NULL){ + ret = ERROR_ST_OPEN_SOCKET; + srs_error("st_netfd_open_socket open socket failed. port=%d, ret=%d", port, ret); + return ret; + } + srs_verbose("st open socket success. port=%d, fd=%d", port, _fd); + + if ((ret = pthread->start()) != ERROR_SUCCESS) { + srs_error("st_thread_create listen thread error. port=%d, ret=%d", port, ret); + return ret; + } + srs_verbose("create st listen thread success, port=%d", port); + + return ret; +} + +int SrsUdpListener::cycle() +{ + int ret = ERROR_SUCCESS; + + for (;;) { + // TODO: FIXME: support ipv6, @see man 7 ipv6 + sockaddr_in from; + int nb_from = sizeof(sockaddr_in); + int nread = 0; + + if ((nread = st_recvfrom(stfd, buf, nb_buf, (sockaddr*)&from, &nb_from, ST_UTIME_NO_TIMEOUT)) <= 0) { + srs_warn("ignore recv udp packet failed, nread=%d", nread); + continue; + } + + if ((ret = handler->on_udp_packet(&from, buf, nread)) != ERROR_SUCCESS) { + srs_warn("handle udp packet failed. ret=%d", ret); + continue; + } + + if (SRS_UDP_PACKET_RECV_CYCLE_INTERVAL_MS > 0) { + st_usleep(SRS_UDP_PACKET_RECV_CYCLE_INTERVAL_MS * 1000); + } + } + + return ret; +} + +SrsTcpListener::SrsTcpListener(ISrsTcpHandler* h, int p) +{ + handler = h; + port = p; + + _fd = -1; + stfd = NULL; + + pthread = new SrsThread("tcp", this, 0, true); +} + +SrsTcpListener::~SrsTcpListener() +{ + srs_close_stfd(stfd); + + pthread->stop(); + srs_freep(pthread); + + // st does not close it sometimes, + // close it manually. + close(_fd); +} + +int SrsTcpListener::fd() +{ + return _fd; +} + +int SrsTcpListener::listen() +{ + int ret = ERROR_SUCCESS; + + if ((_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { + ret = ERROR_SOCKET_CREATE; + srs_error("create linux socket error. port=%d, ret=%d", port, ret); + return ret; + } + srs_verbose("create linux socket success. port=%d, fd=%d", port, _fd); + + int reuse_socket = 1; + if (setsockopt(_fd, SOL_SOCKET, SO_REUSEADDR, &reuse_socket, sizeof(int)) == -1) { + ret = ERROR_SOCKET_SETREUSE; + srs_error("setsockopt reuse-addr error. port=%d, ret=%d", port, ret); + return ret; + } + srs_verbose("setsockopt reuse-addr success. port=%d, fd=%d", port, _fd); + + sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + addr.sin_addr.s_addr = INADDR_ANY; + if (bind(_fd, (const sockaddr*)&addr, sizeof(sockaddr_in)) == -1) { + ret = ERROR_SOCKET_BIND; + srs_error("bind socket error. port=%d, ret=%d", port, ret); + return ret; + } + srs_verbose("bind socket success. port=%d, fd=%d", port, _fd); + + if (::listen(_fd, SERVER_LISTEN_BACKLOG) == -1) { + ret = ERROR_SOCKET_LISTEN; + srs_error("listen socket error. port=%d, ret=%d", port, ret); + return ret; + } + srs_verbose("listen socket success. port=%d, fd=%d", port, _fd); + + if ((stfd = st_netfd_open_socket(_fd)) == NULL){ + ret = ERROR_ST_OPEN_SOCKET; + srs_error("st_netfd_open_socket open socket failed. port=%d, ret=%d", port, ret); + return ret; + } + srs_verbose("st open socket success. port=%d, fd=%d", port, _fd); + + if ((ret = pthread->start()) != ERROR_SUCCESS) { + srs_error("st_thread_create listen thread error. port=%d, ret=%d", port, ret); + return ret; + } + srs_verbose("create st listen thread success, port=%d", port); + + return ret; +} + +int SrsTcpListener::cycle() +{ + int ret = ERROR_SUCCESS; + + st_netfd_t client_stfd = st_accept(stfd, NULL, NULL, ST_UTIME_NO_TIMEOUT); + + if(client_stfd == NULL){ + // ignore error. + srs_error("ignore accept thread stoppped for accept client error"); + return ret; + } + srs_verbose("get a client. fd=%d", st_netfd_fileno(client_stfd)); + + if ((ret = handler->on_tcp_client(client_stfd)) != ERROR_SUCCESS) { + srs_warn("accept client error. ret=%d", ret); + return ret; + } + + return ret; +} + diff --git a/trunk/src/app/srs_app_listener.hpp b/trunk/src/app/srs_app_listener.hpp new file mode 100644 index 0000000000..f55f286727 --- /dev/null +++ b/trunk/src/app/srs_app_listener.hpp @@ -0,0 +1,125 @@ +/* +The MIT License (MIT) + +Copyright (c) 2013-2015 winlin + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +#ifndef SRS_APP_LISTENER_HPP +#define SRS_APP_LISTENER_HPP + +/* +#include +*/ + +#include + +#include +#include + +class sockaddr_in; + +/** +* the udp packet handler. +*/ +class ISrsUdpHandler +{ +public: + ISrsUdpHandler(); + virtual ~ISrsUdpHandler(); +public: + /** + * when udp listener got a udp packet, notice server to process it. + * @param type, the client type, used to create concrete connection, + * for instance RTMP connection to serve client. + * @param from, the udp packet from address. + * @param buf, the udp packet bytes, user should copy if need to use. + * @param nb_buf, the size of udp packet bytes. + * @remark user should never use the buf, for it's a shared memory bytes. + */ + virtual int on_udp_packet(sockaddr_in* from, char* buf, int nb_buf) = 0; +}; + +/** +* the tcp connection handler. +*/ +class ISrsTcpHandler +{ +public: + ISrsTcpHandler(); + virtual ~ISrsTcpHandler(); +public: + /** + * when got tcp client. + */ + virtual int on_tcp_client(st_netfd_t stfd) = 0; +}; + +/** +* bind udp port, start thread to recv packet and handler it. +*/ +class SrsUdpListener : public ISrsThreadHandler +{ +private: + int _fd; + st_netfd_t stfd; + SrsThread* pthread; +private: + char* buf; + int nb_buf; +private: + ISrsUdpHandler* handler; + int port; +public: + SrsUdpListener(ISrsUdpHandler* h, int p); + virtual ~SrsUdpListener(); +public: + virtual int fd(); +public: + virtual int listen(); +// interface ISrsThreadHandler. +public: + virtual int cycle(); +}; + +/** +* bind and listen tcp port, use handler to process the client. +*/ +class SrsTcpListener : public ISrsThreadHandler +{ +private: + int _fd; + st_netfd_t stfd; + SrsThread* pthread; +private: + ISrsTcpHandler* handler; + int port; +public: + SrsTcpListener(ISrsTcpHandler* h, int p); + virtual ~SrsTcpListener(); +public: + virtual int fd(); +public: + virtual int listen(); +// interface ISrsThreadHandler. +public: + virtual int cycle(); +}; + +#endif diff --git a/trunk/src/app/srs_app_mpegts_udp.cpp b/trunk/src/app/srs_app_mpegts_udp.cpp index 1aa1c380b4..ff1cb6cbf7 100644 --- a/trunk/src/app/srs_app_mpegts_udp.cpp +++ b/trunk/src/app/srs_app_mpegts_udp.cpp @@ -49,14 +49,6 @@ using namespace std; #include #include -ISrsUdpHandler::ISrsUdpHandler() -{ -} - -ISrsUdpHandler::~ISrsUdpHandler() -{ -} - SrsMpegtsQueue::SrsMpegtsQueue() { nb_audios = nb_videos = 0; diff --git a/trunk/src/app/srs_app_mpegts_udp.hpp b/trunk/src/app/srs_app_mpegts_udp.hpp index 0455c1eb5b..1fa3cf6975 100644 --- a/trunk/src/app/srs_app_mpegts_udp.hpp +++ b/trunk/src/app/srs_app_mpegts_udp.hpp @@ -50,27 +50,7 @@ class SrsRawAacStreamCodec; #include #include - -/** -* the udp packet handler. -*/ -class ISrsUdpHandler -{ -public: - ISrsUdpHandler(); - virtual ~ISrsUdpHandler(); -public: - /** - * when udp listener got a udp packet, notice server to process it. - * @param type, the client type, used to create concrete connection, - * for instance RTMP connection to serve client. - * @param from, the udp packet from address. - * @param buf, the udp packet bytes, user should copy if need to use. - * @param nb_buf, the size of udp packet bytes. - * @remark user should never use the buf, for it's a shared memory bytes. - */ - virtual int on_udp_packet(sockaddr_in* from, char* buf, int nb_buf) = 0; -}; +#include /** * the queue for mpegts over udp to send packets. diff --git a/trunk/src/app/srs_app_rtsp.cpp b/trunk/src/app/srs_app_rtsp.cpp index 5402be91f2..28ee0dee44 100644 --- a/trunk/src/app/srs_app_rtsp.cpp +++ b/trunk/src/app/srs_app_rtsp.cpp @@ -44,9 +44,14 @@ ISrsRtspHandler::~ISrsRtspHandler() { } -SrsRtspConn::SrsRtspConn(SrsRtspCaster* c, st_netfd_t fd, std::string o) +SrsRtspConn::SrsRtspConn(SrsRtspCaster* c, st_netfd_t fd, std::string o, int lpmin, int lpmax) { output = o; + local_port_min = lpmin; + local_port_max = lpmax; + + session = "O9EaZ4bf"; // TODO: FIXME: generate session id. + caster = c; stfd = fd; skt = new SrsStSocket(fd); @@ -97,12 +102,35 @@ int SrsRtspConn::do_cycle() return ret; } } else if (req->is_announce()) { + srs_assert(req->sdp); + sps = req->sdp->video_sps; + pps = req->sdp->video_pps; + asc = req->sdp->audio_sh; + srs_trace("rtsp: video(#%s, %s), audio(#%s, %s, %sHZ %schannels)", + req->sdp->video_stream_id.c_str(), req->sdp->video_codec.c_str(), + req->sdp->audio_stream_id.c_str(), req->sdp->audio_codec.c_str(), + req->sdp->audio_sample_rate.c_str(), req->sdp->audio_channel.c_str() + ); if ((ret = rtsp->send_message(new SrsRtspResponse(req->seq))) != ERROR_SUCCESS) { if (!srs_is_client_gracefully_close(ret)) { srs_error("rtsp: send ANNOUNCE response failed. ret=%d", ret); } return ret; } + } else if (req->is_setup()) { + srs_assert(req->transport); + SrsRtspSetupResponse* res = new SrsRtspSetupResponse(req->seq); + res->client_port_min = req->transport->client_port_min; + res->client_port_max = req->transport->client_port_max; + res->local_port_min = local_port_min; + res->local_port_max = local_port_max; + res->session = session; + if ((ret = rtsp->send_message(res)) != ERROR_SUCCESS) { + if (!srs_is_client_gracefully_close(ret)) { + srs_error("rtsp: send SETUP response failed. ret=%d", ret); + } + return ret; + } } } @@ -144,6 +172,8 @@ SrsRtspCaster::SrsRtspCaster(SrsConfDirective* c) { // TODO: FIXME: support reload. output = _srs_config->get_stream_caster_output(c); + local_port_min = _srs_config->get_stream_caster_rtp_port_min(c); + local_port_max = _srs_config->get_stream_caster_rtp_port_max(c); } SrsRtspCaster::~SrsRtspCaster() @@ -160,7 +190,11 @@ int SrsRtspCaster::serve_client(st_netfd_t stfd) { int ret = ERROR_SUCCESS; - SrsRtspConn* conn = new SrsRtspConn(this, stfd, output); + SrsRtspConn* conn = new SrsRtspConn( + this, stfd, + output, local_port_min, local_port_max + ); + if ((ret = conn->serve()) != ERROR_SUCCESS) { srs_error("rtsp: serve client failed. ret=%d", ret); srs_freep(conn); diff --git a/trunk/src/app/srs_app_rtsp.hpp b/trunk/src/app/srs_app_rtsp.hpp index 0fd461e130..6cbbbfb297 100644 --- a/trunk/src/app/srs_app_rtsp.hpp +++ b/trunk/src/app/srs_app_rtsp.hpp @@ -65,13 +65,23 @@ class SrsRtspConn : public ISrsThreadHandler { private: std::string output; + int local_port_min; + int local_port_max; +private: + std::string session; + // video sequence header. + std::string sps; + std::string pps; + // audio sequence header. + std::string asc; +private: st_netfd_t stfd; SrsStSocket* skt; SrsRtspStack* rtsp; SrsRtspCaster* caster; SrsThread* trd; public: - SrsRtspConn(SrsRtspCaster* c, st_netfd_t fd, std::string o); + SrsRtspConn(SrsRtspCaster* c, st_netfd_t fd, std::string o, int lpmin, int lpmax); virtual ~SrsRtspConn(); public: virtual int serve(); @@ -90,6 +100,9 @@ class SrsRtspCaster : public ISrsRtspHandler { private: std::string output; + int local_port_min; + int local_port_max; +private: std::vector clients; public: SrsRtspCaster(SrsConfDirective* c); diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 100fc34421..dbc2cefdb4 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -24,8 +24,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include -#include -#include #include #include #include @@ -51,15 +49,6 @@ using namespace std; // signal defines. #define SIGNAL_RELOAD SIGHUP -// nginx also set to 512 -#define SERVER_LISTEN_BACKLOG 512 - -// sleep in ms for udp recv packet. -#define SRS_UDP_PACKET_RECV_CYCLE_INTERVAL_MS 0 - -// set the max packet size. -#define SRS_UDP_MAX_PACKET_SIZE 65535 - // system interval in ms, // all resolution times should be times togother, // for example, system-interval is x=1s(1000ms), @@ -122,26 +111,13 @@ std::string __srs_listener_type2string(SrsListenerType type) SrsListener::SrsListener(SrsServer* server, SrsListenerType type) { - fd = -1; - stfd = NULL; - _port = 0; _server = server; _type = type; - - pthread = new SrsThread("listen", this, 0, true); } SrsListener::~SrsListener() { - srs_close_stfd(stfd); - - pthread->stop(); - srs_freep(pthread); - - // st does not close it sometimes, - // close it manually. - close(fd); } SrsListenerType SrsListener::type() @@ -149,92 +125,55 @@ SrsListenerType SrsListener::type() return _type; } -int SrsListener::listen(int port) +SrsStreamListener::SrsStreamListener(SrsServer* server, SrsListenerType type) : SrsListener(server, type) +{ + listener = NULL; +} + +SrsStreamListener::~SrsStreamListener() +{ + srs_freep(listener); +} + +int SrsStreamListener::listen(int port) { int ret = ERROR_SUCCESS; _port = port; - - if ((fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { - ret = ERROR_SOCKET_CREATE; - srs_error("create linux socket error. port=%d, ret=%d", port, ret); - return ret; - } - srs_verbose("create linux socket success. port=%d, fd=%d", port, fd); - - int reuse_socket = 1; - if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse_socket, sizeof(int)) == -1) { - ret = ERROR_SOCKET_SETREUSE; - srs_error("setsockopt reuse-addr error. port=%d, ret=%d", port, ret); - return ret; - } - srs_verbose("setsockopt reuse-addr success. port=%d, fd=%d", port, fd); - - sockaddr_in addr; - addr.sin_family = AF_INET; - addr.sin_port = htons(_port); - addr.sin_addr.s_addr = INADDR_ANY; - if (bind(fd, (const sockaddr*)&addr, sizeof(sockaddr_in)) == -1) { - ret = ERROR_SOCKET_BIND; - srs_error("bind socket error. port=%d, ret=%d", port, ret); - return ret; - } - srs_verbose("bind socket success. port=%d, fd=%d", port, fd); - - if (::listen(fd, SERVER_LISTEN_BACKLOG) == -1) { - ret = ERROR_SOCKET_LISTEN; - srs_error("listen socket error. port=%d, ret=%d", port, ret); - return ret; - } - srs_verbose("listen socket success. port=%d, fd=%d", port, fd); - - if ((stfd = st_netfd_open_socket(fd)) == NULL){ - ret = ERROR_ST_OPEN_SOCKET; - srs_error("st_netfd_open_socket open socket failed. port=%d, ret=%d", port, ret); - return ret; - } - srs_verbose("st open socket success. port=%d, fd=%d", port, fd); - - if ((ret = pthread->start()) != ERROR_SUCCESS) { - srs_error("st_thread_create listen thread error. port=%d, ret=%d", port, ret); + + srs_freep(listener); + listener = new SrsTcpListener(this, port); + + if ((ret = listener->listen()) != ERROR_SUCCESS) { + srs_error("tcp listen failed. ret=%d", ret); return ret; } - srs_verbose("create st listen thread success, port=%d", port); srs_info("listen thread cid=%d, current_cid=%d, " "listen at port=%d, type=%d, fd=%d started success, port=%d", pthread->cid(), _srs_context->get_id(), _port, _type, fd, port); - srs_trace("%s listen at tcp://%d, fd=%d", __srs_listener_type2string(_type).c_str(), _port, fd); - + srs_trace("%s listen at tcp://%d, fd=%d", __srs_listener_type2string(_type).c_str(), _port, listener->fd()); + return ret; } -int SrsListener::cycle() +int SrsStreamListener::on_tcp_client(st_netfd_t stfd) { int ret = ERROR_SUCCESS; - st_netfd_t client_stfd = st_accept(stfd, NULL, NULL, ST_UTIME_NO_TIMEOUT); - - if(client_stfd == NULL){ - // ignore error. - srs_error("ignore accept thread stoppped for accept client error"); - return ret; - } - srs_verbose("get a client. fd=%d", st_netfd_fileno(client_stfd)); - - if ((ret = _server->accept_client(_type, client_stfd)) != ERROR_SUCCESS) { + if ((ret = _server->accept_client(_type, stfd)) != ERROR_SUCCESS) { srs_warn("accept client error. ret=%d", ret); return ret; } - + return ret; } #ifdef SRS_AUTO_STREAM_CASTER SrsRtspListener::SrsRtspListener(SrsServer* server, SrsListenerType type, SrsConfDirective* c) : SrsListener(server, type) { - _type = type; + listener = NULL; // the caller already ensure the type is ok, // we just assert here for unknown stream caster. @@ -247,34 +186,52 @@ SrsRtspListener::SrsRtspListener(SrsServer* server, SrsListenerType type, SrsCon SrsRtspListener::~SrsRtspListener() { srs_freep(caster); + srs_freep(listener); } -int SrsRtspListener::cycle() +int SrsRtspListener::listen(int port) { int ret = ERROR_SUCCESS; + + // the caller already ensure the type is ok, + // we just assert here for unknown stream caster. + srs_assert(_type == SrsListenerRtsp); - st_netfd_t client_stfd = st_accept(stfd, NULL, NULL, ST_UTIME_NO_TIMEOUT); - - if(client_stfd == NULL){ - // ignore error. - srs_error("ignore accept thread stoppped for accept client error"); + _port = port; + + srs_freep(listener); + listener = new SrsTcpListener(this, port); + + if ((ret = listener->listen()) != ERROR_SUCCESS) { + srs_error("udp caster listen failed. ret=%d", ret); return ret; } - srs_verbose("get a client. fd=%d", st_netfd_fileno(client_stfd)); - if ((ret = caster->serve_client(client_stfd)) != ERROR_SUCCESS) { + srs_info("listen thread cid=%d, current_cid=%d, " + "listen at port=%d, type=%d, fd=%d started success, port=%d", + pthread->cid(), _srs_context->get_id(), _port, _type, fd, port); + + srs_trace("%s listen at tcp://%d, fd=%d", __srs_listener_type2string(_type).c_str(), _port, listener->fd()); + + return ret; +} + +int SrsRtspListener::on_tcp_client(st_netfd_t stfd) +{ + int ret = ERROR_SUCCESS; + + if ((ret = caster->serve_client(stfd)) != ERROR_SUCCESS) { srs_warn("accept client error. ret=%d", ret); return ret; } - + return ret; } -SrsUdpListener::SrsUdpListener(SrsServer* server, SrsListenerType type, SrsConfDirective* c) : SrsListener(server, type) +SrsUdpCasterListener::SrsUdpCasterListener(SrsServer* server, SrsListenerType type, SrsConfDirective* c) : SrsListener(server, type) { _type = type; - nb_buf = SRS_UDP_MAX_PACKET_SIZE; - buf = new char[nb_buf]; + listener = NULL; // the caller already ensure the type is ok, // we just assert here for unknown stream caster. @@ -284,13 +241,13 @@ SrsUdpListener::SrsUdpListener(SrsServer* server, SrsListenerType type, SrsConfD } } -SrsUdpListener::~SrsUdpListener() +SrsUdpCasterListener::~SrsUdpCasterListener() { srs_freep(caster); - srs_freep(buf); + srs_freep(listener); } -int SrsUdpListener::listen(int port) +int SrsUdpCasterListener::listen(int port) { int ret = ERROR_SUCCESS; @@ -299,83 +256,20 @@ int SrsUdpListener::listen(int port) srs_assert(_type == SrsListenerMpegTsOverUdp); _port = port; - - if ((fd = socket(AF_INET, SOCK_DGRAM, 0)) == -1) { - ret = ERROR_SOCKET_CREATE; - srs_error("create linux socket error. port=%d, ret=%d", port, ret); - return ret; - } - srs_verbose("create linux socket success. port=%d, fd=%d", port, fd); - - int reuse_socket = 1; - if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse_socket, sizeof(int)) == -1) { - ret = ERROR_SOCKET_SETREUSE; - srs_error("setsockopt reuse-addr error. port=%d, ret=%d", port, ret); - return ret; - } - srs_verbose("setsockopt reuse-addr success. port=%d, fd=%d", port, fd); - - sockaddr_in addr; - addr.sin_family = AF_INET; - addr.sin_port = htons(_port); - addr.sin_addr.s_addr = INADDR_ANY; - if (bind(fd, (const sockaddr*)&addr, sizeof(sockaddr_in)) == -1) { - ret = ERROR_SOCKET_BIND; - srs_error("bind socket error. port=%d, ret=%d", port, ret); - return ret; - } - srs_verbose("bind socket success. port=%d, fd=%d", port, fd); - - if ((stfd = st_netfd_open_socket(fd)) == NULL){ - ret = ERROR_ST_OPEN_SOCKET; - srs_error("st_netfd_open_socket open socket failed. port=%d, ret=%d", port, ret); - return ret; - } - srs_verbose("st open socket success. port=%d, fd=%d", port, fd); - - if ((ret = pthread->start()) != ERROR_SUCCESS) { - srs_error("st_thread_create listen thread error. port=%d, ret=%d", port, ret); + + srs_freep(listener); + listener = new SrsUdpListener(caster, port); + + if ((ret = listener->listen()) != ERROR_SUCCESS) { + srs_error("udp caster listen failed. ret=%d", ret); return ret; } - srs_verbose("create st listen thread success, port=%d", port); srs_info("listen thread cid=%d, current_cid=%d, " "listen at port=%d, type=%d, fd=%d started success, port=%d", pthread->cid(), _srs_context->get_id(), _port, _type, fd, port); - srs_trace("%s listen at udp://%d, fd=%d", __srs_listener_type2string(_type).c_str(), _port, fd); - - return ret; -} - -int SrsUdpListener::cycle() -{ - int ret = ERROR_SUCCESS; - - // the caller already ensure the type is ok, - // we just assert here for unknown stream caster. - srs_assert(_type == SrsListenerMpegTsOverUdp); - - for (;;) { - // TODO: FIXME: support ipv6, @see man 7 ipv6 - sockaddr_in from; - int nb_from = sizeof(sockaddr_in); - int nread = 0; - - if ((nread = st_recvfrom(stfd, buf, nb_buf, (sockaddr*)&from, &nb_from, ST_UTIME_NO_TIMEOUT)) <= 0) { - srs_warn("ignore recv udp packet failed, nread=%d", nread); - continue; - } - - if ((ret = caster->on_udp_packet(&from, buf, nread)) != ERROR_SUCCESS) { - srs_warn("handle udp packet failed. ret=%d", ret); - continue; - } - - if (SRS_UDP_PACKET_RECV_CYCLE_INTERVAL_MS > 0) { - st_usleep(SRS_UDP_PACKET_RECV_CYCLE_INTERVAL_MS * 1000); - } - } + srs_trace("%s listen at udp://%d, fd=%d", __srs_listener_type2string(_type).c_str(), _port, listener->fd()); return ret; } @@ -992,7 +886,7 @@ int SrsServer::listen_rtmp() close_listeners(SrsListenerRtmpStream); for (int i = 0; i < (int)ports.size(); i++) { - SrsListener* listener = new SrsListener(this, SrsListenerRtmpStream); + SrsListener* listener = new SrsStreamListener(this, SrsListenerRtmpStream); listeners.push_back(listener); int port = ::atoi(ports[i].c_str()); @@ -1012,7 +906,7 @@ int SrsServer::listen_http_api() #ifdef SRS_AUTO_HTTP_API close_listeners(SrsListenerHttpApi); if (_srs_config->get_http_api_enabled()) { - SrsListener* listener = new SrsListener(this, SrsListenerHttpApi); + SrsListener* listener = new SrsStreamListener(this, SrsListenerHttpApi); listeners.push_back(listener); int port = _srs_config->get_http_api_listen(); @@ -1033,7 +927,7 @@ int SrsServer::listen_http_stream() #ifdef SRS_AUTO_HTTP_SERVER close_listeners(SrsListenerHttpStream); if (_srs_config->get_http_stream_enabled()) { - SrsListener* listener = new SrsListener(this, SrsListenerHttpStream); + SrsListener* listener = new SrsStreamListener(this, SrsListenerHttpStream); listeners.push_back(listener); int port = _srs_config->get_http_stream_listen(); @@ -1067,7 +961,7 @@ int SrsServer::listen_stream_caster() std::string caster = _srs_config->get_stream_caster_engine(stream_caster); if (caster == SRS_CONF_DEFAULT_STREAM_CASTER_MPEGTS_OVER_UDP) { - listener = new SrsUdpListener(this, SrsListenerMpegTsOverUdp, stream_caster); + listener = new SrsUdpCasterListener(this, SrsListenerMpegTsOverUdp, stream_caster); } else if (caster == SRS_CONF_DEFAULT_STREAM_CASTER_RTSP) { listener = new SrsRtspListener(this, SrsListenerRtsp, stream_caster); } else { diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index 496f6c509d..c58f4de68d 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -35,9 +35,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include -#include #include #include +#include class SrsServer; class SrsConnection; @@ -49,6 +49,8 @@ class SrsKbps; class SrsConfDirective; class ISrsUdpHandler; class ISrsRtspHandler; +class SrsUdpListener; +class SrsTcpListener; // listener type for server to identify the connection, // that is, use different type to process the connection. @@ -69,60 +71,71 @@ enum SrsListenerType /** * the common tcp listener, for RTMP/HTTP server. */ -class SrsListener : public ISrsThreadHandler +class SrsListener { -public: +protected: SrsListenerType _type; protected: - int fd; - st_netfd_t stfd; int _port; SrsServer* _server; - SrsThread* pthread; public: SrsListener(SrsServer* server, SrsListenerType type); virtual ~SrsListener(); public: virtual SrsListenerType type(); + virtual int listen(int port) = 0; +}; + +/** +* tcp listener. +*/ +class SrsStreamListener : virtual public SrsListener, virtual public ISrsTcpHandler +{ +private: + SrsTcpListener* listener; +public: + SrsStreamListener(SrsServer* server, SrsListenerType type); + virtual ~SrsStreamListener(); +public: virtual int listen(int port); -// interface ISrsThreadHandler. +// ISrsTcpHandler public: - virtual int cycle(); + virtual int on_tcp_client(st_netfd_t stfd); }; #ifdef SRS_AUTO_STREAM_CASTER /** * the tcp listener, for rtsp server. */ -class SrsRtspListener : public SrsListener +class SrsRtspListener : virtual public SrsListener, virtual public ISrsTcpHandler { +private: + SrsTcpListener* listener; private: ISrsRtspHandler* caster; public: SrsRtspListener(SrsServer* server, SrsListenerType type, SrsConfDirective* c); virtual ~SrsRtspListener(); -// interface ISrsThreadHandler. public: - virtual int cycle(); + virtual int listen(int port); +// ISrsTcpHandler +public: + virtual int on_tcp_client(st_netfd_t stfd); }; /** * the udp listener, for udp server. */ -class SrsUdpListener : public SrsListener +class SrsUdpCasterListener : public SrsListener { private: - char* buf; - int nb_buf; + SrsUdpListener* listener; ISrsUdpHandler* caster; public: - SrsUdpListener(SrsServer* server, SrsListenerType type, SrsConfDirective* c); - virtual ~SrsUdpListener(); + SrsUdpCasterListener(SrsServer* server, SrsListenerType type, SrsConfDirective* c); + virtual ~SrsUdpCasterListener(); public: virtual int listen(int port); -// interface ISrsThreadHandler. -public: - virtual int cycle(); }; #endif diff --git a/trunk/src/protocol/srs_rtsp_stack.cpp b/trunk/src/protocol/srs_rtsp_stack.cpp index 078e45159a..b097aa6516 100644 --- a/trunk/src/protocol/srs_rtsp_stack.cpp +++ b/trunk/src/protocol/srs_rtsp_stack.cpp @@ -231,9 +231,9 @@ int SrsRtspSdp::parse(string token) audio_sample_rate = audio_codec.substr(pos + 1); audio_codec = audio_codec.substr(0, pos); } - if ((pos = audio_codec.find("/")) != string::npos) { - audio_channel = audio_codec.substr(pos + 1); - audio_codec = audio_codec.substr(0, pos); + if ((pos = audio_sample_rate.find("/")) != string::npos) { + audio_channel = audio_sample_rate.substr(pos + 1); + audio_sample_rate = audio_sample_rate.substr(0, pos); } } } else if (desc_key == "fmtp") { @@ -283,19 +283,20 @@ int SrsRtspSdp::parse(string token) return ret; } -int SrsRtspSdp::parse_fmtp_attribute(string& attr) +int SrsRtspSdp::parse_fmtp_attribute(string attr) { int ret = ERROR_SUCCESS; size_t pos = string::npos; + std::string token = attr; - while (!attr.empty()) { - std::string item = attr; + while (!token.empty()) { + std::string item = token; if ((pos = item.find(";")) != string::npos) { - item = attr.substr(0, pos); - attr = attr.substr(pos + 1); + item = token.substr(0, pos); + token = token.substr(pos + 1); } else { - attr = ""; + token = ""; } std::string item_key = item, item_value; @@ -337,19 +338,20 @@ int SrsRtspSdp::parse_fmtp_attribute(string& attr) return ret; } -int SrsRtspSdp::parse_control_attribute(string& attr) +int SrsRtspSdp::parse_control_attribute(string attr) { int ret = ERROR_SUCCESS; size_t pos = string::npos; + std::string token = attr; - while (!attr.empty()) { - std::string item = attr; + while (!token.empty()) { + std::string item = token; if ((pos = item.find(";")) != string::npos) { - item = attr.substr(0, pos); - attr = attr.substr(pos + 1); + item = token.substr(0, pos); + token = token.substr(pos + 1); } else { - attr = ""; + token = ""; } std::string item_key = item, item_value; @@ -392,16 +394,81 @@ string SrsRtspSdp::base64_decode(string value) return plaintext; } +SrsRtspTransport::SrsRtspTransport() +{ + client_port_min = 0; + client_port_max = 0; +} + +SrsRtspTransport::~SrsRtspTransport() +{ +} + +int SrsRtspTransport::parse(string attr) +{ + int ret = ERROR_SUCCESS; + + size_t pos = string::npos; + std::string token = attr; + + while (!token.empty()) { + std::string item = token; + if ((pos = item.find(";")) != string::npos) { + item = token.substr(0, pos); + token = token.substr(pos + 1); + } else { + token = ""; + } + + std::string item_key = item, item_value; + if ((pos = item.find("=")) != string::npos) { + item_key = item.substr(0, pos); + item_value = item.substr(pos + 1); + } + + if (transport.empty()) { + transport = item_key; + if ((pos = transport.find("/")) != string::npos) { + profile = transport.substr(pos + 1); + transport = transport.substr(0, pos); + } + if ((pos = profile.find("/")) != string::npos) { + lower_transport = profile.substr(pos + 1); + profile = profile.substr(0, pos); + } + } + + if (item_key == "unicast" || item_key == "multicast") { + cast_type = item_key; + } else if (item_key == "mode") { + mode = item_value; + } else if (item_key == "client_port") { + std::string sport = item_value; + std::string eport = item_value; + if ((pos = eport.find("-")) != string::npos) { + sport = eport.substr(0, pos); + eport = eport.substr(pos + 1); + } + client_port_min = ::atoi(sport.c_str()); + client_port_max = ::atoi(eport.c_str()); + } + } + + return ret; +} + SrsRtspRequest::SrsRtspRequest() { seq = 0; content_length = 0; sdp = NULL; + transport = NULL; } SrsRtspRequest::~SrsRtspRequest() { srs_freep(sdp); + srs_freep(transport); } bool SrsRtspRequest::is_options() @@ -414,6 +481,11 @@ bool SrsRtspRequest::is_announce() return method == __SRS_METHOD_ANNOUNCE; } +bool SrsRtspRequest::is_setup() +{ + return method == __SRS_METHOD_SETUP; +} + SrsRtspResponse::SrsRtspResponse(int cseq) { seq = cseq; @@ -424,8 +496,10 @@ SrsRtspResponse::~SrsRtspResponse() { } -stringstream& SrsRtspResponse::encode(stringstream& ss) +int SrsRtspResponse::encode(stringstream& ss) { + int ret = ERROR_SUCCESS; + // status line ss << __SRS_VERSION << __SRS_RTSP_SP << status << __SRS_RTSP_SP @@ -439,7 +513,20 @@ stringstream& SrsRtspResponse::encode(stringstream& ss) << "Pragma: no-cache" << __SRS_RTSP_CRLF << "Server: " << RTMP_SIG_SRS_SERVER << __SRS_RTSP_CRLF; - return ss; + if ((ret = encode_header(ss)) != ERROR_SUCCESS) { + srs_error("rtsp: encode header failed. ret=%d", ret); + return ret; + }; + + // header EOF. + ss << __SRS_RTSP_CRLF; + + return ret; +} + +int SrsRtspResponse::encode_header(std::stringstream& ss) +{ + return ERROR_SUCCESS; } SrsRtspOptionsResponse::SrsRtspOptionsResponse(int cseq) : SrsRtspResponse(cseq) @@ -453,10 +540,8 @@ SrsRtspOptionsResponse::~SrsRtspOptionsResponse() { } -stringstream& SrsRtspOptionsResponse::encode(stringstream& ss) +int SrsRtspOptionsResponse::encode_header(stringstream& ss) { - SrsRtspResponse::encode(ss); - SrsRtspMethod __methods[] = { SrsRtspMethodDescribe, SrsRtspMethodAnnounce, @@ -489,10 +574,27 @@ stringstream& SrsRtspOptionsResponse::encode(stringstream& ss) } ss << __SRS_RTSP_CRLF; - // eof header. - ss << __SRS_RTSP_CRLF; + return ERROR_SUCCESS; +} + +SrsRtspSetupResponse::SrsRtspSetupResponse(int seq) : SrsRtspResponse(seq) +{ + local_port_min = 0; + local_port_max = 0; +} + +SrsRtspSetupResponse::~SrsRtspSetupResponse() +{ +} - return ss; +int SrsRtspSetupResponse::encode_header(stringstream& ss) +{ + ss << __SRS_TOKEN_SESSION << ":" << __SRS_RTSP_SP << session << __SRS_RTSP_CRLF; + ss << __SRS_TOKEN_TRANSPORT << ":" << __SRS_RTSP_SP + << "RTP/AVP;unicast;client_port=" << client_port_min << "-" << client_port_max << ";" + << "server_port=" << local_port_min << "-" << local_port_max + << __SRS_RTSP_CRLF; + return ERROR_SUCCESS; } SrsRtspStack::SrsRtspStack(ISrsProtocolReaderWriter* s) @@ -613,6 +715,21 @@ int SrsRtspStack::do_recv_message(SrsRtspRequest* req) return ret; } req->content_length = ::atol(cl.c_str()); + } else if (token == __SRS_TOKEN_TRANSPORT) { + std::string transport; + if ((ret = recv_token_eof(transport)) != ERROR_SUCCESS) { + if (!srs_is_client_gracefully_close(ret)) { + srs_error("rtsp: parse %s failed. ret=%d", __SRS_TOKEN_TRANSPORT, ret); + } + return ret; + } + if (!req->transport) { + req->transport = new SrsRtspTransport(); + } + if ((ret = req->transport->parse(transport)) != ERROR_SUCCESS) { + srs_error("rtsp: parse transport failed, transport=%s. ret=%d", transport.c_str(), ret); + return ret; + } } else { // unknown header name, parse util EOF. SrsRtspTokenState state = SrsRtspTokenStateNormal; diff --git a/trunk/src/protocol/srs_rtsp_stack.hpp b/trunk/src/protocol/srs_rtsp_stack.hpp index 5c79c0669d..f80bde5fff 100644 --- a/trunk/src/protocol/srs_rtsp_stack.hpp +++ b/trunk/src/protocol/srs_rtsp_stack.hpp @@ -60,6 +60,8 @@ class ISrsProtocolReaderWriter; #define __SRS_TOKEN_PUBLIC "Public" #define __SRS_TOKEN_CONTENT_TYPE "Content-Type" #define __SRS_TOKEN_CONTENT_LENGTH "Content-Length" +#define __SRS_TOKEN_TRANSPORT "Transport" +#define __SRS_TOKEN_SESSION "Session" // RTSP methods #define __SRS_METHOD_OPTIONS "OPTIONS" @@ -97,6 +99,53 @@ enum SrsRtspSdpState SrsRtspSdpStateVideo, }; +/** +* 10 Method Definitions +* The method token indicates the method to be performed on the resource +* identified by the Request-URI. The method is case-sensitive. New +* methods may be defined in the future. Method names may not start with +* a $ character (decimal 24) and must be a token. Methods are +* summarized in Table 2. +* Notes on Table 2: PAUSE is recommended, but not required in that a +* fully functional server can be built that does not support this +* method, for example, for live feeds. If a server does not support a +* particular method, it MUST return "501 Not Implemented" and a client +* SHOULD not try this method again for this server. +*/ +enum SrsRtspMethod +{ + SrsRtspMethodDescribe = 0x0001, + SrsRtspMethodAnnounce = 0x0002, + SrsRtspMethodGetParameter = 0x0004, + SrsRtspMethodOptions = 0x0008, + SrsRtspMethodPause = 0x0010, + SrsRtspMethodPlay = 0x0020, + SrsRtspMethodRecord = 0x0040, + SrsRtspMethodRedirect = 0x0080, + SrsRtspMethodSetup = 0x0100, + SrsRtspMethodSetParameter = 0x0200, + SrsRtspMethodTeardown = 0x0400, +}; + +/** +* the state of rtsp token. +*/ +enum SrsRtspTokenState +{ + /** + * parse token failed, default state. + */ + SrsRtspTokenStateError = 100, + /** + * when SP follow the token. + */ + SrsRtspTokenStateNormal = 101, + /** + * when CRLF follow the token. + */ + SrsRtspTokenStateEOF = 102, +}; + /** * the sdp in announce. * Appendix C: Use of SDP for RTSP Session Descriptions @@ -179,17 +228,64 @@ class SrsRtspSdp /** * generally, the fmtp is the sequence header for video or audio. */ - virtual int parse_fmtp_attribute(std::string& attr); + virtual int parse_fmtp_attribute(std::string attr); /** * generally, the control is the stream info for video or audio. */ - virtual int parse_control_attribute(std::string& attr); + virtual int parse_control_attribute(std::string attr); /** * decode the string by base64. */ virtual std::string base64_decode(std::string value); }; +/** +* the rtsp transport. +* 12.39 Transport +* This request header indicates which transport protocol is to be used +* and configures its parameters such as destination address, +* compression, multicast time-to-live and destination port for a single +* stream. It sets those values not already determined by a presentation +* description. +*/ +class SrsRtspTransport +{ +public: + // The syntax for the transport specifier is + // transport/profile/lower-transport + std::string transport; + std::string profile; + std::string lower_transport; + // unicast | multicast + // mutually exclusive indication of whether unicast or multicast + // delivery will be attempted. Default value is multicast. + // Clients that are capable of handling both unicast and + // multicast transmission MUST indicate such capability by + // including two full transport-specs with separate parameters + // for each. + std::string cast_type; + // The mode parameter indicates the methods to be supported for + // this session. Valid values are PLAY and RECORD. If not + // provided, the default is PLAY. + std::string mode; + // This parameter provides the unicast RTP/RTCP port pair on + // which the client has chosen to receive media data and control + // information. It is specified as a range, e.g., + // client_port=3456-3457. + // where client will use port in: + // [client_port_min, client_port_max) + int client_port_min; + int client_port_max; +public: + SrsRtspTransport(); + virtual ~SrsRtspTransport(); +public: + /** + * parse a line of token for transport. + */ + virtual int parse(std::string attr); +}; + /** * the rtsp request message. * 6 Request @@ -245,12 +341,17 @@ class SrsRtspRequest * the sdp in announce, NULL for no sdp. */ SrsRtspSdp* sdp; + /** + * the transport in setup, NULL for no transport. + */ + SrsRtspTransport* transport; public: SrsRtspRequest(); virtual ~SrsRtspRequest(); public: virtual bool is_options(); virtual bool is_announce(); + virtual bool is_setup(); }; /** @@ -302,35 +403,12 @@ class SrsRtspResponse /** * encode message to string. */ - virtual std::stringstream& encode(std::stringstream& ss); -}; - -/** -* 10 Method Definitions -* The method token indicates the method to be performed on the resource -* identified by the Request-URI. The method is case-sensitive. New -* methods may be defined in the future. Method names may not start with -* a $ character (decimal 24) and must be a token. Methods are -* summarized in Table 2. -* Notes on Table 2: PAUSE is recommended, but not required in that a -* fully functional server can be built that does not support this -* method, for example, for live feeds. If a server does not support a -* particular method, it MUST return "501 Not Implemented" and a client -* SHOULD not try this method again for this server. -*/ -enum SrsRtspMethod -{ - SrsRtspMethodDescribe = 0x0001, - SrsRtspMethodAnnounce = 0x0002, - SrsRtspMethodGetParameter = 0x0004, - SrsRtspMethodOptions = 0x0008, - SrsRtspMethodPause = 0x0010, - SrsRtspMethodPlay = 0x0020, - SrsRtspMethodRecord = 0x0040, - SrsRtspMethodRedirect = 0x0080, - SrsRtspMethodSetup = 0x0100, - SrsRtspMethodSetParameter = 0x0200, - SrsRtspMethodTeardown = 0x0400, + virtual int encode(std::stringstream& ss); +protected: + /** + * sub classes override this to encode the headers. + */ + virtual int encode_header(std::stringstream& ss); }; /** @@ -349,27 +427,38 @@ class SrsRtspOptionsResponse : public SrsRtspResponse public: SrsRtspOptionsResponse(int cseq); virtual ~SrsRtspOptionsResponse(); -public: - virtual std::stringstream& encode(std::stringstream& ss); +protected: + virtual int encode_header(std::stringstream& ss); }; /** -* the state of rtsp token. +* 10.4 SETUP +* The SETUP request for a URI specifies the transport mechanism to be +* used for the streamed media. A client can issue a SETUP request for a +* stream that is already playing to change transport parameters, which +* a server MAY allow. If it does not allow this, it MUST respond with +* error "455 Method Not Valid In This State". For the benefit of any +* intervening firewalls, a client must indicate the transport +* parameters even if it has no influence over these parameters, for +* example, where the server advertises a fixed multicast address. */ -enum SrsRtspTokenState +class SrsRtspSetupResponse : public SrsRtspResponse { - /** - * parse token failed, default state. - */ - SrsRtspTokenStateError = 100, - /** - * when SP follow the token. - */ - SrsRtspTokenStateNormal = 101, - /** - * when CRLF follow the token. - */ - SrsRtspTokenStateEOF = 102, +public: + // the client specified port. + int client_port_min; + int client_port_max; + // client will use the port in: + // [local_port_min, local_port_max) + int local_port_min; + int local_port_max; + // session. + std::string session; +public: + SrsRtspSetupResponse(int cseq); + virtual ~SrsRtspSetupResponse(); +protected: + virtual int encode_header(std::stringstream& ss); }; /**