diff --git a/trunk/src/app/srs_app_caster_flv.cpp b/trunk/src/app/srs_app_caster_flv.cpp index 482def5aaa..af8062f61f 100644 --- a/trunk/src/app/srs_app_caster_flv.cpp +++ b/trunk/src/app/srs_app_caster_flv.cpp @@ -97,7 +97,7 @@ srs_error_t SrsAppCasterFlv::on_tcp_client(srs_netfd_t stfd) void SrsAppCasterFlv::remove(ISrsResource* c) { - SrsTcpConnection* conn = dynamic_cast(c); + SrsHttpConn* conn = dynamic_cast(c); std::vector::iterator it; if ((it = std::find(conns.begin(), conns.end(), conn)) != conns.end()) { diff --git a/trunk/src/app/srs_app_conn.cpp b/trunk/src/app/srs_app_conn.cpp index 1dd17b6d13..b32151f804 100644 --- a/trunk/src/app/srs_app_conn.cpp +++ b/trunk/src/app/srs_app_conn.cpp @@ -314,6 +314,14 @@ void SrsResourceManager::dispose(ISrsResource* c) srs_freep(c); } +ISrsExpire::ISrsExpire() +{ +} + +ISrsExpire::~ISrsExpire() +{ +} + ISrsStartableConneciton::ISrsStartableConneciton() { } @@ -329,7 +337,7 @@ SrsTcpConnection::SrsTcpConnection(ISrsResourceManager* cm, srs_netfd_t c, strin ip = cip; port = cport; create_time = srsu2ms(srs_get_system_time()); - + skt = new SrsStSocket(); clk = new SrsWallClock(); kbps = new SrsKbps(clk); @@ -507,4 +515,159 @@ void SrsTcpConnection::expire() trd->interrupt(); } +SrsTcpConnection2::SrsTcpConnection2(srs_netfd_t c) +{ + stfd = c; + skt = new SrsStSocket(); +} + +SrsTcpConnection2::~SrsTcpConnection2() +{ + srs_freep(skt); + srs_close_stfd(stfd); +} + +srs_error_t SrsTcpConnection2::initialize() +{ + srs_error_t err = srs_success; + + if ((err = skt->initialize(stfd)) != srs_success) { + return srs_error_wrap(err, "init socket"); + } + + return err; +} + +srs_error_t SrsTcpConnection2::set_tcp_nodelay(bool v) +{ + srs_error_t err = srs_success; + + int r0 = 0; + socklen_t nb_v = sizeof(int); + int fd = srs_netfd_fileno(stfd); + + int ov = 0; + if ((r0 = getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &ov, &nb_v)) != 0) { + return srs_error_new(ERROR_SOCKET_NO_NODELAY, "getsockopt fd=%d, r0=%d", fd, r0); + } + +#ifndef SRS_PERF_TCP_NODELAY + srs_warn("ignore TCP_NODELAY, fd=%d, ov=%d", fd, ov); + return err; +#endif + + int iv = (v? 1:0); + if ((r0 = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &iv, nb_v)) != 0) { + return srs_error_new(ERROR_SOCKET_NO_NODELAY, "setsockopt fd=%d, r0=%d", fd, r0); + } + if ((r0 = getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &iv, &nb_v)) != 0) { + return srs_error_new(ERROR_SOCKET_NO_NODELAY, "getsockopt fd=%d, r0=%d", fd, r0); + } + + srs_trace("set fd=%d TCP_NODELAY %d=>%d", fd, ov, iv); + + return err; +} + +srs_error_t SrsTcpConnection2::set_socket_buffer(srs_utime_t buffer_v) +{ + srs_error_t err = srs_success; + + int r0 = 0; + int fd = srs_netfd_fileno(stfd); + socklen_t nb_v = sizeof(int); + + int ov = 0; + if ((r0 = getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &ov, &nb_v)) != 0) { + return srs_error_new(ERROR_SOCKET_SNDBUF, "getsockopt fd=%d, r0=%d", fd, r0); + } + +#ifndef SRS_PERF_MW_SO_SNDBUF + srs_warn("ignore SO_SNDBUF, fd=%d, ov=%d", fd, ov); + return err; +#endif + + // the bytes: + // 4KB=4096, 8KB=8192, 16KB=16384, 32KB=32768, 64KB=65536, + // 128KB=131072, 256KB=262144, 512KB=524288 + // the buffer should set to sleep*kbps/8, + // for example, your system delivery stream in 1000kbps, + // sleep 800ms for small bytes, the buffer should set to: + // 800*1000/8=100000B(about 128KB). + // other examples: + // 2000*3000/8=750000B(about 732KB). + // 2000*5000/8=1250000B(about 1220KB). + int kbps = 4000; + int iv = srsu2ms(buffer_v) * kbps / 8; + + // socket send buffer, system will double it. + iv = iv / 2; + + // override the send buffer by macro. +#ifdef SRS_PERF_SO_SNDBUF_SIZE + iv = SRS_PERF_SO_SNDBUF_SIZE / 2; +#endif + + // set the socket send buffer when required larger buffer + if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &iv, nb_v) < 0) { + return srs_error_new(ERROR_SOCKET_SNDBUF, "setsockopt fd=%d, r0=%d", fd, r0); + } + if ((r0 = getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &iv, &nb_v)) != 0) { + return srs_error_new(ERROR_SOCKET_SNDBUF, "getsockopt fd=%d, r0=%d", fd, r0); + } + + srs_trace("set fd=%d, SO_SNDBUF=%d=>%d, buffer=%dms", fd, ov, iv, srsu2ms(buffer_v)); + + return err; +} + +void SrsTcpConnection2::set_recv_timeout(srs_utime_t tm) +{ + skt->set_recv_timeout(tm); +} + +srs_utime_t SrsTcpConnection2::get_recv_timeout() +{ + return skt->get_recv_timeout(); +} + +srs_error_t SrsTcpConnection2::read_fully(void* buf, size_t size, ssize_t* nread) +{ + return skt->read_fully(buf, size, nread); +} + +int64_t SrsTcpConnection2::get_recv_bytes() +{ + return skt->get_recv_bytes(); +} + +int64_t SrsTcpConnection2::get_send_bytes() +{ + return skt->get_send_bytes(); +} + +srs_error_t SrsTcpConnection2::read(void* buf, size_t size, ssize_t* nread) +{ + return skt->read(buf, size, nread); +} + +void SrsTcpConnection2::set_send_timeout(srs_utime_t tm) +{ + skt->set_send_timeout(tm); +} + +srs_utime_t SrsTcpConnection2::get_send_timeout() +{ + return skt->get_send_timeout(); +} + +srs_error_t SrsTcpConnection2::write(void* buf, size_t size, ssize_t* nwrite) +{ + return skt->write(buf, size, nwrite); +} + +srs_error_t SrsTcpConnection2::writev(const iovec *iov, int iov_size, ssize_t* nwrite) +{ + return skt->writev(iov, iov_size, nwrite); +} diff --git a/trunk/src/app/srs_app_conn.hpp b/trunk/src/app/srs_app_conn.hpp index fb39336669..ac0619fad3 100644 --- a/trunk/src/app/srs_app_conn.hpp +++ b/trunk/src/app/srs_app_conn.hpp @@ -106,6 +106,18 @@ class SrsResourceManager : virtual public ISrsCoroutineHandler, virtual public I void dispose(ISrsResource* c); }; +// If a connection is able to be expired, +// user can use HTTP-API to kick-off it. +class ISrsExpire +{ +public: + ISrsExpire(); + virtual ~ISrsExpire(); +public: + // Set connection to expired to kick-off it. + virtual void expire() = 0; +}; + // Interface for connection that is startable. class ISrsStartableConneciton : virtual public ISrsConnection , virtual public ISrsStartable, virtual public ISrsKbpsDelta @@ -119,7 +131,7 @@ class ISrsStartableConneciton : virtual public ISrsConnection // all connections accept from listener must extends from this base class, // server will add the connection to manager, and delete it when remove. class SrsTcpConnection : virtual public ISrsStartableConneciton - , virtual public ISrsReloadHandler, virtual public ISrsCoroutineHandler + , virtual public ISrsReloadHandler, virtual public ISrsCoroutineHandler, virtual public ISrsExpire { protected: // Each connection start a green thread, @@ -185,4 +197,38 @@ class SrsTcpConnection : virtual public ISrsStartableConneciton virtual srs_error_t do_cycle() = 0; }; +// The basic connection of SRS, for TCP based protocols, +// all connections accept from listener must extends from this base class, +// server will add the connection to manager, and delete it when remove. +class SrsTcpConnection2 : virtual public ISrsProtocolReadWriter +{ +private: + // The underlayer st fd handler. + srs_netfd_t stfd; + // The underlayer socket. + SrsStSocket* skt; +public: + SrsTcpConnection2(srs_netfd_t c); + virtual ~SrsTcpConnection2(); +public: + virtual srs_error_t initialize(); +public: + // Set socket option TCP_NODELAY. + virtual srs_error_t set_tcp_nodelay(bool v); + // Set socket option SO_SNDBUF in srs_utime_t. + virtual srs_error_t set_socket_buffer(srs_utime_t buffer_v); +// Interface ISrsProtocolReadWriter +public: + virtual void set_recv_timeout(srs_utime_t tm); + virtual srs_utime_t get_recv_timeout(); + virtual srs_error_t read_fully(void* buf, size_t size, ssize_t* nread); + virtual int64_t get_recv_bytes(); + virtual int64_t get_send_bytes(); + virtual srs_error_t read(void* buf, size_t size, ssize_t* nread); + virtual void set_send_timeout(srs_utime_t tm); + virtual srs_utime_t get_send_timeout(); + virtual srs_error_t write(void* buf, size_t size, ssize_t* nwrite); + virtual srs_error_t writev(const iovec *iov, int iov_size, ssize_t* nwrite); +}; + #endif diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp index fea77eca3c..7bea05f90a 100644 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -852,7 +852,7 @@ srs_error_t SrsGoApiClients::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessa if (!client) { return srs_api_response_code(w, r, ERROR_RTMP_CLIENT_NOT_FOUND); } - + client->conn->expire(); srs_warn("kickoff client id=%s ok", client_id.c_str()); } else { @@ -1674,22 +1674,38 @@ srs_error_t SrsGoApiTcmalloc::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess } #endif -SrsHttpApi::SrsHttpApi(ISrsResourceManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, string cip, int port) - : SrsTcpConnection(cm, fd, cip, port) +SrsHttpApi::SrsHttpApi(ISrsResourceManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, string cip, int cport) { mux = m; cors = new SrsHttpCorsMux(); parser = new SrsHttpParser(); + + skt = new SrsTcpConnection2(fd); + manager = cm; + ip = cip; + port = cport; + create_time = srsu2ms(srs_get_system_time()); + clk = new SrsWallClock(); + kbps = new SrsKbps(clk); + kbps->set_io(skt, skt); + trd = new SrsSTCoroutine("api", this); _srs_config->subscribe(this); } SrsHttpApi::~SrsHttpApi() { + _srs_config->unsubscribe(this); + + trd->interrupt(); + srs_freep(trd); + srs_freep(parser); srs_freep(cors); - - _srs_config->unsubscribe(this); + + srs_freep(kbps); + srs_freep(clk); + srs_freep(skt); } std::string SrsHttpApi::desc() @@ -1699,7 +1715,7 @@ std::string SrsHttpApi::desc() void SrsHttpApi::remark(int64_t* in, int64_t* out) { - // TODO: FIXME: implements it + kbps->remark(in, out); } srs_error_t SrsHttpApi::do_cycle() @@ -1804,3 +1820,62 @@ srs_error_t SrsHttpApi::on_reload_http_api_crossdomain() return err; } +srs_error_t SrsHttpApi::start() +{ + srs_error_t err = srs_success; + + if ((err = skt->initialize()) != srs_success) { + return srs_error_wrap(err, "init socket"); + } + + if ((err = trd->start()) != srs_success) { + return srs_error_wrap(err, "coroutine"); + } + + return err; +} + +srs_error_t SrsHttpApi::cycle() +{ + srs_error_t err = do_cycle(); + + // Notify manager to remove it. + manager->remove(this); + + // success. + if (err == srs_success) { + srs_trace("client finished."); + return err; + } + + // It maybe success with message. + if (srs_error_code(err) == ERROR_SUCCESS) { + srs_trace("client finished%s.", srs_error_summary(err).c_str()); + srs_freep(err); + return err; + } + + // client close peer. + // TODO: FIXME: Only reset the error when client closed it. + if (srs_is_client_gracefully_close(err)) { + srs_warn("client disconnect peer. ret=%d", srs_error_code(err)); + } else if (srs_is_server_gracefully_close(err)) { + srs_warn("server disconnect. ret=%d", srs_error_code(err)); + } else { + srs_error("serve error %s", srs_error_desc(err).c_str()); + } + + srs_freep(err); + return srs_success; +} + +string SrsHttpApi::remote_ip() +{ + return ip; +} + +const SrsContextId& SrsHttpApi::get_id() +{ + return trd->cid(); +} + diff --git a/trunk/src/app/srs_app_http_api.hpp b/trunk/src/app/srs_app_http_api.hpp index da3fbfd3ce..ae4cb63d29 100644 --- a/trunk/src/app/srs_app_http_api.hpp +++ b/trunk/src/app/srs_app_http_api.hpp @@ -254,13 +254,33 @@ class SrsGoApiTcmalloc : public ISrsHttpHandler }; #endif -// TODO: FIXME: Refine arch, change to use SrsTcpConnection -class SrsHttpApi : virtual public SrsTcpConnection, virtual public ISrsReloadHandler +// Handle the HTTP API request. +class SrsHttpApi : virtual public ISrsStartableConneciton, virtual public ISrsReloadHandler + , virtual public ISrsCoroutineHandler { private: SrsHttpParser* parser; SrsHttpCorsMux* cors; SrsHttpServeMux* mux; +private: + SrsTcpConnection2* skt; + // Each connection start a green thread, + // when thread stop, the connection will be delete by server. + SrsCoroutine* trd; + // The manager object to manage the connection. + ISrsResourceManager* manager; + // The ip and port of client. + std::string ip; + int port; + // The connection total kbps. + // not only the rtmp or http connection, all type of connection are + // need to statistic the kbps of io. + // The SrsStatistic will use it indirectly to statistic the bytes delta of current connection. + SrsKbps* kbps; + SrsWallClock* clk; + // The create time in milliseconds. + // for current connection to log self create time and calculate the living time. + int64_t create_time; public: SrsHttpApi(ISrsResourceManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, std::string cip, int port); virtual ~SrsHttpApi(); @@ -270,13 +290,34 @@ class SrsHttpApi : virtual public SrsTcpConnection, virtual public ISrsReloadHan // Interface ISrsKbpsDelta public: virtual void remark(int64_t* in, int64_t* out); -protected: +private: virtual srs_error_t do_cycle(); private: virtual srs_error_t process_request(ISrsHttpResponseWriter* w, ISrsHttpMessage* r); // Interface ISrsReloadHandler public: virtual srs_error_t on_reload_http_api_crossdomain(); +// Extract APIs from SrsTcpConnection. +// Interface ISrsStartable +public: + // Start the client green thread. + // when server get a client from listener, + // 1. server will create an concrete connection(for instance, RTMP connection), + // 2. then add connection to its connection manager, + // 3. start the client thread by invoke this start() + // when client cycle thread stop, invoke the on_thread_stop(), which will use server + // To remove the client by server->remove(this). + virtual srs_error_t start(); +// Interface ISrsOneCycleThreadHandler +public: + // The thread cycle function, + // when serve connection completed, terminate the loop which will terminate the thread, + // thread will invoke the on_thread_stop() when it terminated. + virtual srs_error_t cycle(); +// Interface ISrsConnection. +public: + virtual std::string remote_ip(); + virtual const SrsContextId& get_id(); }; #endif diff --git a/trunk/src/app/srs_app_http_conn.cpp b/trunk/src/app/srs_app_http_conn.cpp index a0ca691255..d2161ed8c3 100644 --- a/trunk/src/app/srs_app_http_conn.cpp +++ b/trunk/src/app/srs_app_http_conn.cpp @@ -59,18 +59,38 @@ using namespace std; #include #include -SrsHttpConn::SrsHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, string cip, int port) - : SrsTcpConnection(cm, fd, cip, port) +SrsHttpConn::SrsHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, string cip, int cport) { parser = new SrsHttpParser(); cors = new SrsHttpCorsMux(); http_mux = m; + + skt = new SrsTcpConnection2(fd); + manager = cm; + ip = cip; + port = cport; + create_time = srsu2ms(srs_get_system_time()); + clk = new SrsWallClock(); + kbps = new SrsKbps(clk); + kbps->set_io(skt, skt); + trd = new SrsSTCoroutine("http", this); + + _srs_config->subscribe(this); } SrsHttpConn::~SrsHttpConn() { + _srs_config->unsubscribe(this); + + trd->interrupt(); + srs_freep(trd); + srs_freep(parser); srs_freep(cors); + + srs_freep(kbps); + srs_freep(clk); + srs_freep(skt); } std::string SrsHttpConn::desc() @@ -80,7 +100,7 @@ std::string SrsHttpConn::desc() void SrsHttpConn::remark(int64_t* in, int64_t* out) { - // TODO: FIXME: implements it + kbps->remark(in, out); } srs_error_t SrsHttpConn::do_cycle() @@ -190,26 +210,94 @@ srs_error_t SrsHttpConn::on_reload_http_stream_crossdomain() return err; } -SrsResponseOnlyHttpConn::SrsResponseOnlyHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, string cip, int port) : SrsHttpConn(cm, fd, m, cip, port) +srs_error_t SrsHttpConn::set_tcp_nodelay(bool v) { + return skt->set_tcp_nodelay(v); } -SrsResponseOnlyHttpConn::~SrsResponseOnlyHttpConn() +srs_error_t SrsHttpConn::set_socket_buffer(srs_utime_t buffer_v) { + return skt->set_socket_buffer(buffer_v); } -srs_error_t SrsResponseOnlyHttpConn::pop_message(ISrsHttpMessage** preq) +srs_error_t SrsHttpConn::start() { srs_error_t err = srs_success; - - SrsStSocket skt; - if ((err = skt.initialize(stfd)) != srs_success) { + if ((err = skt->initialize()) != srs_success) { return srs_error_wrap(err, "init socket"); } + if ((err = trd->start()) != srs_success) { + return srs_error_wrap(err, "coroutine"); + } + + return err; +} + +srs_error_t SrsHttpConn::cycle() +{ + srs_error_t err = do_cycle(); + + // Notify manager to remove it. + manager->remove(this); + + // success. + if (err == srs_success) { + srs_trace("client finished."); + return err; + } + + // It maybe success with message. + if (srs_error_code(err) == ERROR_SUCCESS) { + srs_trace("client finished%s.", srs_error_summary(err).c_str()); + srs_freep(err); + return err; + } + + // client close peer. + // TODO: FIXME: Only reset the error when client closed it. + if (srs_is_client_gracefully_close(err)) { + srs_warn("client disconnect peer. ret=%d", srs_error_code(err)); + } else if (srs_is_server_gracefully_close(err)) { + srs_warn("server disconnect. ret=%d", srs_error_code(err)); + } else { + srs_error("serve error %s", srs_error_desc(err).c_str()); + } + + srs_freep(err); + return srs_success; +} + +string SrsHttpConn::remote_ip() +{ + return ip; +} + +const SrsContextId& SrsHttpConn::get_id() +{ + return trd->cid(); +} + +void SrsHttpConn::expire() +{ + trd->interrupt(); +} + +SrsResponseOnlyHttpConn::SrsResponseOnlyHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, string cip, int port) : SrsHttpConn(cm, fd, m, cip, port) +{ +} + +SrsResponseOnlyHttpConn::~SrsResponseOnlyHttpConn() +{ +} + +srs_error_t SrsResponseOnlyHttpConn::pop_message(ISrsHttpMessage** preq) +{ + srs_error_t err = srs_success; + // Check user interrupt by interval. - skt.set_recv_timeout(3 * SRS_UTIME_SECONDS); + skt->set_recv_timeout(3 * SRS_UTIME_SECONDS); // drop all request body. char body[4096]; @@ -218,7 +306,7 @@ srs_error_t SrsResponseOnlyHttpConn::pop_message(ISrsHttpMessage** preq) return srs_error_wrap(err, "timeout"); } - if ((err = skt.read(body, 4096, NULL)) != srs_success) { + if ((err = skt->read(body, 4096, NULL)) != srs_success) { // Because we use timeout to check trd state, so we should ignore any timeout. if (srs_error_code(err) == ERROR_SOCKET_TIMEOUT) { srs_freep(err); @@ -243,7 +331,8 @@ srs_error_t SrsResponseOnlyHttpConn::on_got_http_message(ISrsHttpMessage* msg) return err; } - // drop all request body. + // Drop all request body. + // TODO: Should we set timeout for max reading? char body[4096]; while (!br->eof()) { if ((err = br->read(body, 4096, NULL)) != srs_success) { diff --git a/trunk/src/app/srs_app_http_conn.hpp b/trunk/src/app/srs_app_http_conn.hpp index 703fda65f5..234db80279 100644 --- a/trunk/src/app/srs_app_http_conn.hpp +++ b/trunk/src/app/srs_app_http_conn.hpp @@ -55,13 +55,33 @@ class SrsHttpStreamServer; class SrsHttpStaticServer; // The http connection which request the static or stream content. -// TODO: FIXME: Refine arch, change to use SrsTcpConnection -class SrsHttpConn : public SrsTcpConnection +class SrsHttpConn : virtual public ISrsStartableConneciton, virtual public ISrsReloadHandler + , virtual public ISrsCoroutineHandler, virtual public ISrsExpire { protected: SrsHttpParser* parser; ISrsHttpServeMux* http_mux; SrsHttpCorsMux* cors; +protected: + SrsTcpConnection2* skt; + // Each connection start a green thread, + // when thread stop, the connection will be delete by server. + SrsCoroutine* trd; + // The ip and port of client. + std::string ip; + int port; +private: + // The manager object to manage the connection. + ISrsResourceManager* manager; + // The connection total kbps. + // not only the rtmp or http connection, all type of connection are + // need to statistic the kbps of io. + // The SrsStatistic will use it indirectly to statistic the bytes delta of current connection. + SrsKbps* kbps; + SrsWallClock* clk; + // The create time in milliseconds. + // for current connection to log self create time and calculate the living time. + int64_t create_time; public: SrsHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, std::string cip, int port); virtual ~SrsHttpConn(); @@ -87,6 +107,35 @@ class SrsHttpConn : public SrsTcpConnection // Interface ISrsReloadHandler public: virtual srs_error_t on_reload_http_stream_crossdomain(); +// Extract APIs from SrsTcpConnection. +public: + // Set socket option TCP_NODELAY. + virtual srs_error_t set_tcp_nodelay(bool v); + // Set socket option SO_SNDBUF in srs_utime_t. + virtual srs_error_t set_socket_buffer(srs_utime_t buffer_v); +// Interface ISrsStartable +public: + // Start the client green thread. + // when server get a client from listener, + // 1. server will create an concrete connection(for instance, RTMP connection), + // 2. then add connection to its connection manager, + // 3. start the client thread by invoke this start() + // when client cycle thread stop, invoke the on_thread_stop(), which will use server + // To remove the client by server->remove(this). + virtual srs_error_t start(); +// Interface ISrsOneCycleThreadHandler +public: + // The thread cycle function, + // when serve connection completed, terminate the loop which will terminate the thread, + // thread will invoke the on_thread_stop() when it terminated. + virtual srs_error_t cycle(); +// Interface ISrsConnection. +public: + virtual std::string remote_ip(); + virtual const SrsContextId& get_id(); +// Interface ISrsExpire. +public: + virtual void expire(); }; // Drop body of request, only process the response. diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 209f4022e1..b487b2d593 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -104,10 +104,20 @@ SrsClientInfo::~SrsClientInfo() srs_freep(res); } -SrsRtmpConn::SrsRtmpConn(SrsServer* svr, srs_netfd_t c, string cip, int port) - : SrsTcpConnection(svr, c, cip, port) +SrsRtmpConn::SrsRtmpConn(SrsServer* svr, srs_netfd_t c, string cip, int cport) { server = svr; + + stfd = c; + skt = new SrsTcpConnection2(c); + manager = svr; + ip = cip; + port = cport; + create_time = srsu2ms(srs_get_system_time()); + clk = new SrsWallClock(); + kbps = new SrsKbps(clk); + kbps->set_io(skt, skt); + trd = new SrsSTCoroutine("rtmp", this); rtmp = new SrsRtmpServer(skt); refer = new SrsRefer(); @@ -132,6 +142,17 @@ SrsRtmpConn::SrsRtmpConn(SrsServer* svr, srs_netfd_t c, string cip, int port) SrsRtmpConn::~SrsRtmpConn() { _srs_config->unsubscribe(this); + + trd->interrupt(); + // wakeup the handler which need to notice. + if (wakable) { + wakable->wakeup(); + } + srs_freep(trd); + + srs_freep(kbps); + srs_freep(clk); + srs_freep(skt); srs_freep(info); srs_freep(rtmp); @@ -145,16 +166,6 @@ std::string SrsRtmpConn::desc() return "RtmpConn"; } -void SrsRtmpConn::dispose() -{ - SrsTcpConnection::dispose(); - - // wakeup the handler which need to notice. - if (wakable) { - wakable->wakeup(); - } -} - // TODO: return detail message when error for client. srs_error_t SrsRtmpConn::do_cycle() { @@ -276,7 +287,7 @@ srs_error_t SrsRtmpConn::on_reload_vhost_play(string vhost) mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime); mw_sleep = _srs_config->get_mw_sleep(req->vhost); - set_socket_buffer(mw_sleep); + skt->set_socket_buffer(mw_sleep); return err; } @@ -314,7 +325,7 @@ srs_error_t SrsRtmpConn::on_reload_vhost_realtime(string vhost) mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime); mw_sleep = _srs_config->get_mw_sleep(req->vhost); - set_socket_buffer(mw_sleep); + skt->set_socket_buffer(mw_sleep); return err; } @@ -716,7 +727,7 @@ srs_error_t SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, Sr // when mw_sleep changed, resize the socket send buffer. mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime); mw_sleep = _srs_config->get_mw_sleep(req->vhost); - set_socket_buffer(mw_sleep); + skt->set_socket_buffer(mw_sleep); // initialize the send_min_interval send_min_interval = _srs_config->get_send_min_interval(req->vhost); @@ -1137,7 +1148,7 @@ void SrsRtmpConn::set_sock_options() if (nvalue != tcp_nodelay) { tcp_nodelay = nvalue; - srs_error_t err = set_tcp_nodelay(tcp_nodelay); + srs_error_t err = skt->set_tcp_nodelay(tcp_nodelay); if (err != srs_success) { srs_warn("ignore err %s", srs_error_desc(err).c_str()); srs_freep(err); @@ -1411,3 +1422,67 @@ void SrsRtmpConn::http_hooks_on_stop() return; } +srs_error_t SrsRtmpConn::start() +{ + srs_error_t err = srs_success; + + if ((err = skt->initialize()) != srs_success) { + return srs_error_wrap(err, "init socket"); + } + + if ((err = trd->start()) != srs_success) { + return srs_error_wrap(err, "coroutine"); + } + + return err; +} + +srs_error_t SrsRtmpConn::cycle() +{ + srs_error_t err = do_cycle(); + + // Notify manager to remove it. + manager->remove(this); + + // success. + if (err == srs_success) { + srs_trace("client finished."); + return err; + } + + // It maybe success with message. + if (srs_error_code(err) == ERROR_SUCCESS) { + srs_trace("client finished%s.", srs_error_summary(err).c_str()); + srs_freep(err); + return err; + } + + // client close peer. + // TODO: FIXME: Only reset the error when client closed it. + if (srs_is_client_gracefully_close(err)) { + srs_warn("client disconnect peer. ret=%d", srs_error_code(err)); + } else if (srs_is_server_gracefully_close(err)) { + srs_warn("server disconnect. ret=%d", srs_error_code(err)); + } else { + srs_error("serve error %s", srs_error_desc(err).c_str()); + } + + srs_freep(err); + return srs_success; +} + +string SrsRtmpConn::remote_ip() +{ + return ip; +} + +const SrsContextId& SrsRtmpConn::get_id() +{ + return trd->cid(); +} + +void SrsRtmpConn::expire() +{ + trd->interrupt(); +} + diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp index 8661c89382..138dd1ab49 100644 --- a/trunk/src/app/srs_app_rtmp_conn.hpp +++ b/trunk/src/app/srs_app_rtmp_conn.hpp @@ -83,8 +83,8 @@ class SrsClientInfo }; // The client provides the main logic control for RTMP clients. -// TODO: FIXME: Refine arch, change to use SrsTcpConnection -class SrsRtmpConn : virtual public SrsTcpConnection, virtual public ISrsReloadHandler +class SrsRtmpConn : virtual public ISrsStartableConneciton, virtual public ISrsReloadHandler + , virtual public ISrsCoroutineHandler, virtual public ISrsExpire { // For the thread to directly access any field of connection. friend class SrsPublishRecvThread; @@ -117,14 +117,32 @@ class SrsRtmpConn : virtual public SrsTcpConnection, virtual public ISrsReloadHa bool tcp_nodelay; // About the rtmp client. SrsClientInfo* info; +private: + srs_netfd_t stfd; + SrsTcpConnection2* skt; + // Each connection start a green thread, + // when thread stop, the connection will be delete by server. + SrsCoroutine* trd; + // The manager object to manage the connection. + ISrsResourceManager* manager; + // The ip and port of client. + std::string ip; + int port; + // The connection total kbps. + // not only the rtmp or http connection, all type of connection are + // need to statistic the kbps of io. + // The SrsStatistic will use it indirectly to statistic the bytes delta of current connection. + SrsKbps* kbps; + SrsWallClock* clk; + // The create time in milliseconds. + // for current connection to log self create time and calculate the living time. + int64_t create_time; public: SrsRtmpConn(SrsServer* svr, srs_netfd_t c, std::string cip, int port); virtual ~SrsRtmpConn(); // Interface ISrsResource. public: virtual std::string desc(); -public: - virtual void dispose(); protected: virtual srs_error_t do_cycle(); // Interface ISrsReloadHandler @@ -167,6 +185,30 @@ class SrsRtmpConn : virtual public SrsTcpConnection, virtual public ISrsReloadHa virtual void http_hooks_on_unpublish(); virtual srs_error_t http_hooks_on_play(); virtual void http_hooks_on_stop(); +// Extract APIs from SrsTcpConnection. +// Interface ISrsStartable +public: + // Start the client green thread. + // when server get a client from listener, + // 1. server will create an concrete connection(for instance, RTMP connection), + // 2. then add connection to its connection manager, + // 3. start the client thread by invoke this start() + // when client cycle thread stop, invoke the on_thread_stop(), which will use server + // To remove the client by server->remove(this). + virtual srs_error_t start(); +// Interface ISrsOneCycleThreadHandler +public: + // The thread cycle function, + // when serve connection completed, terminate the loop which will terminate the thread, + // thread will invoke the on_thread_stop() when it terminated. + virtual srs_error_t cycle(); +// Interface ISrsConnection. +public: + virtual std::string remote_ip(); + virtual const SrsContextId& get_id(); +// Interface ISrsExpire. +public: + virtual void expire(); }; #endif diff --git a/trunk/src/app/srs_app_statistic.cpp b/trunk/src/app/srs_app_statistic.cpp index b24583d06b..fcf0314472 100644 --- a/trunk/src/app/srs_app_statistic.cpp +++ b/trunk/src/app/srs_app_statistic.cpp @@ -422,7 +422,7 @@ void SrsStatistic::on_stream_close(SrsRequest* req) } } -srs_error_t SrsStatistic::on_client(SrsContextId cid, SrsRequest* req, SrsTcpConnection* conn, SrsRtmpConnType type) +srs_error_t SrsStatistic::on_client(SrsContextId cid, SrsRequest* req, ISrsExpire* conn, SrsRtmpConnType type) { srs_error_t err = srs_success; diff --git a/trunk/src/app/srs_app_statistic.hpp b/trunk/src/app/srs_app_statistic.hpp index 3320fac872..edd6ac8211 100644 --- a/trunk/src/app/srs_app_statistic.hpp +++ b/trunk/src/app/srs_app_statistic.hpp @@ -36,7 +36,7 @@ class SrsKbps; class SrsWallClock; class SrsRequest; -class SrsTcpConnection; +class ISrsExpire; class SrsJsonObject; class SrsJsonArray; class ISrsKbpsDelta; @@ -110,8 +110,8 @@ struct SrsStatisticStream struct SrsStatisticClient { public: + ISrsExpire* conn; SrsStatisticStream* stream; - SrsTcpConnection* conn; SrsRequest* req; SrsRtmpConnType type; std::string id; @@ -206,7 +206,7 @@ class SrsStatistic : public ISrsProtocolPerf // @param conn, the physical absract connection object. // @param type, the type of connection. // TODO: FIXME: We should not use context id as client id. - virtual srs_error_t on_client(SrsContextId id, SrsRequest* req, SrsTcpConnection* conn, SrsRtmpConnType type); + virtual srs_error_t on_client(SrsContextId id, SrsRequest* req, ISrsExpire* conn, SrsRtmpConnType type); // Client disconnect // @remark the on_disconnect always call, while the on_client is call when // only got the request object, so the client specified by id maybe not