Skip to content

Commit

Permalink
For #906, #902, remove the thread start and stop event
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed May 29, 2017
1 parent 3ffb098 commit b21f92f
Show file tree
Hide file tree
Showing 12 changed files with 62 additions and 67 deletions.
9 changes: 3 additions & 6 deletions trunk/src/app/srs_app_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,10 @@ int SrsConnection::cycle()
srs_warn("client disconnect peer. oret=%d, ret=%d", oret, ret);
}

return ERROR_SUCCESS;
}

void SrsConnection::on_thread_stop()
{
// TODO: FIXME: never remove itself, use isolate thread to do cleanup.
// Notify manager to remove it.
manager->remove(this);

return ERROR_SUCCESS;
}

int SrsConnection::srs_id()
Expand Down
6 changes: 0 additions & 6 deletions trunk/src/app/srs_app_conn.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,6 @@ class SrsConnection : virtual public ISrsConnection, virtual public ISrsOneCycle
* thread will invoke the on_thread_stop() when it terminated.
*/
virtual int cycle();
/**
* when the thread cycle finished, thread will invoke the on_thread_stop(),
* which will remove self from server, server will remove the connection from manager
* then delete the connection.
*/
virtual void on_thread_stop();
public:
/**
* get the srs id which identify the client.
Expand Down
26 changes: 15 additions & 11 deletions trunk/src/app/srs_app_encoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,21 @@ void SrsEncoder::on_unpublish()
}

int SrsEncoder::cycle()
{
int ret = do_cycle();

// kill ffmpeg when finished and it alive
std::vector<SrsFFMPEG*>::iterator it;

for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {
SrsFFMPEG* ffmpeg = *it;
ffmpeg->stop();
}

return ret;
}

int SrsEncoder::do_cycle()
{
int ret = ERROR_SUCCESS;

Expand All @@ -118,17 +133,6 @@ int SrsEncoder::cycle()
return ret;
}

void SrsEncoder::on_thread_stop()
{
// kill ffmpeg when finished and it alive
std::vector<SrsFFMPEG*>::iterator it;

for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {
SrsFFMPEG* ffmpeg = *it;
ffmpeg->stop();
}
}

void SrsEncoder::clear_engines()
{
std::vector<SrsFFMPEG*>::iterator it;
Expand Down
3 changes: 2 additions & 1 deletion trunk/src/app/srs_app_encoder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ class SrsEncoder : public ISrsReusableThreadHandler
// interface ISrsReusableThreadHandler.
public:
virtual int cycle();
virtual void on_thread_stop();
private:
virtual int do_cycle();
private:
virtual void clear_engines();
virtual SrsFFMPEG* at(int index);
Expand Down
4 changes: 0 additions & 4 deletions trunk/src/app/srs_app_ingest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,6 @@ int SrsIngester::cycle()
return ret;
}

void SrsIngester::on_thread_stop()
{
}

void SrsIngester::clear_engines()
{
std::vector<SrsIngesterFFMPEG*>::iterator it;
Expand Down
1 change: 0 additions & 1 deletion trunk/src/app/srs_app_ingest.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ class SrsIngester : public ISrsReusableThreadHandler, public ISrsReloadHandler
// interface ISrsReusableThreadHandler.
public:
virtual int cycle();
virtual void on_thread_stop();
private:
virtual void clear_engines();
virtual int parse();
Expand Down
22 changes: 13 additions & 9 deletions trunk/src/app/srs_app_ng_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,19 @@ void SrsNgExec::on_unpublish()
}

int SrsNgExec::cycle()
{
int ret = do_cycle();

std::vector<SrsProcess*>::iterator it;
for (it = exec_publishs.begin(); it != exec_publishs.end(); ++it) {
SrsProcess* ep = *it;
ep->stop();
}

return ret;
}

int SrsNgExec::do_cycle()
{
int ret = ERROR_SUCCESS;

Expand Down Expand Up @@ -110,15 +123,6 @@ int SrsNgExec::cycle()
return ret;
}

void SrsNgExec::on_thread_stop()
{
std::vector<SrsProcess*>::iterator it;
for (it = exec_publishs.begin(); it != exec_publishs.end(); ++it) {
SrsProcess* ep = *it;
ep->stop();
}
}

int SrsNgExec::parse_exec_publish(SrsRequest* req)
{
int ret = ERROR_SUCCESS;
Expand Down
3 changes: 2 additions & 1 deletion trunk/src/app/srs_app_ng_exec.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ class SrsNgExec : public ISrsReusableThreadHandler
// interface ISrsReusableThreadHandler.
public:
virtual int cycle();
virtual void on_thread_stop();
private:
virtual int do_cycle();
private:
virtual int parse_exec_publish(SrsRequest* req);
virtual void clear_exec_publish();
Expand Down
43 changes: 23 additions & 20 deletions trunk/src/app/srs_app_recv_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,29 @@ int SrsRecvThread::cycle()
{
int ret = ERROR_SUCCESS;

// the multiple messages writev improve performance large,
// but the timeout recv will cause 33% sys call performance,
// to use isolate thread to recv, can improve about 33% performance.
// @see https://github.com/ossrs/srs/issues/194
// @see: https://github.com/ossrs/srs/issues/217
rtmp->set_recv_timeout(SRS_CONSTS_NO_TMMS);

pumper->on_start();

ret = do_cycle();

// reset the timeout to pulse mode.
rtmp->set_recv_timeout(timeout * 1000);

pumper->on_stop();

return ret;
}

int SrsRecvThread::do_cycle()
{
int ret = ERROR_SUCCESS;

while (!trd->interrupted()) {
// When the pumper is interrupted, wait then retry.
if (pumper->interrupted()) {
Expand Down Expand Up @@ -129,26 +152,6 @@ int SrsRecvThread::cycle()
return ret;
}

void SrsRecvThread::on_thread_start()
{
// the multiple messages writev improve performance large,
// but the timeout recv will cause 33% sys call performance,
// to use isolate thread to recv, can improve about 33% performance.
// @see https://github.com/ossrs/srs/issues/194
// @see: https://github.com/ossrs/srs/issues/217
rtmp->set_recv_timeout(SRS_CONSTS_NO_TMMS);

pumper->on_start();
}

void SrsRecvThread::on_thread_stop()
{
// reset the timeout to pulse mode.
rtmp->set_recv_timeout(timeout * 1000);

pumper->on_stop();
}

SrsQueueRecvThread::SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, int timeout_ms)
: trd(this, rtmp_sdk, timeout_ms)
{
Expand Down
4 changes: 2 additions & 2 deletions trunk/src/app/srs_app_recv_thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ class SrsRecvThread : public ISrsReusableThread2Handler
// interface ISrsReusableThread2Handler
public:
virtual int cycle();
virtual void on_thread_start();
virtual void on_thread_stop();
private:
virtual int do_cycle();
};

/**
Expand Down
7 changes: 2 additions & 5 deletions trunk/src/app/srs_app_rtsp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -417,11 +417,6 @@ int SrsRtspConn::cycle()
srs_warn("client disconnect peer. ret=%d", ret);
}

return ERROR_SUCCESS;
}

void SrsRtspConn::on_thread_stop()
{
if (video_rtp) {
caster->free_port(video_rtp->port(), video_rtp->port() + 1);
}
Expand All @@ -431,6 +426,8 @@ void SrsRtspConn::on_thread_stop()
}

caster->remove(this);

return ERROR_SUCCESS;
}

int SrsRtspConn::on_rtp_video(SrsRtpPacket* pkt, int64_t dts, int64_t pts)
Expand Down
1 change: 0 additions & 1 deletion trunk/src/app/srs_app_rtsp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ class SrsRtspConn : public ISrsOneCycleThreadHandler
// interface ISrsOneCycleThreadHandler
public:
virtual int cycle();
virtual void on_thread_stop();
private:
virtual int on_rtp_video(SrsRtpPacket* pkt, int64_t dts, int64_t pts);
virtual int on_rtp_audio(SrsRtpPacket* pkt, int64_t dts);
Expand Down

0 comments on commit b21f92f

Please sign in to comment.