Skip to content

Commit

Permalink
For #906, #902, use coroutine for reusable thread
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed May 29, 2017
1 parent 2ed2513 commit ea9a5f2
Show file tree
Hide file tree
Showing 18 changed files with 174 additions and 274 deletions.
19 changes: 9 additions & 10 deletions trunk/src/app/srs_app_async_call.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@ using namespace std;
#include <srs_kernel_error.hpp>
#include <srs_kernel_log.hpp>

// the sleep interval in ms for http async callback.
#define SRS_AUTO_ASYNC_CALLBACL_CIMS 30

ISrsAsyncCallTask::ISrsAsyncCallTask()
{
}
Expand All @@ -41,13 +38,13 @@ ISrsAsyncCallTask::~ISrsAsyncCallTask()

SrsAsyncCallWorker::SrsAsyncCallWorker()
{
pthread = new SrsReusableThread("async", this, SRS_AUTO_ASYNC_CALLBACL_CIMS);
trd = NULL;
wait = st_cond_new();
}

SrsAsyncCallWorker::~SrsAsyncCallWorker()
{
srs_freep(pthread);
srs_freep(trd);

std::vector<ISrsAsyncCallTask*>::iterator it;
for (it = tasks.begin(); it != tasks.end(); ++it) {
Expand Down Expand Up @@ -76,29 +73,31 @@ int SrsAsyncCallWorker::count()

int SrsAsyncCallWorker::start()
{
return pthread->start();
srs_freep(trd);
trd = new SrsCoroutine("async", this, _srs_context->get_id());
return trd->start();
}

void SrsAsyncCallWorker::stop()
{
st_cond_signal(wait);
pthread->stop();
trd->stop();
}

int SrsAsyncCallWorker::cycle()
{
int ret = ERROR_SUCCESS;

while (pthread->can_loop()) {
while (!trd->pull()) {
if (tasks.empty()) {
st_cond_wait(wait);
}

std::vector<ISrsAsyncCallTask*> copies = tasks;
std::vector<ISrsAsyncCallTask*> copy = tasks;
tasks.clear();

std::vector<ISrsAsyncCallTask*>::iterator it;
for (it = copies.begin(); it != copies.end(); ++it) {
for (it = copy.begin(); it != copy.end(); ++it) {
ISrsAsyncCallTask* task = *it;
if ((ret = task->call()) != ERROR_SUCCESS) {
srs_warn("ignore async callback %s, ret=%d", task->to_string().c_str(), ret);
Expand Down
4 changes: 2 additions & 2 deletions trunk/src/app/srs_app_async_call.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@ class ISrsAsyncCallTask
* when worker call with the task, the worker will do it in isolate thread.
* that is, the task is execute/call in async mode.
*/
class SrsAsyncCallWorker : public ISrsReusableThreadHandler
class SrsAsyncCallWorker : public ISrsCoroutineHandler
{
private:
SrsReusableThread* pthread;
SrsCoroutine* trd;
protected:
std::vector<ISrsAsyncCallTask*> tasks;
st_cond_t wait;
Expand Down
26 changes: 17 additions & 9 deletions trunk/src/app/srs_app_encoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,23 +36,20 @@ using namespace std;

#ifdef SRS_AUTO_TRANSCODE

// when error, encoder sleep for a while and retry.
#define SRS_RTMP_ENCODER_CIMS (3000)

// for encoder to detect the dead loop
static std::vector<std::string> _transcoded_url;

SrsEncoder::SrsEncoder()
{
pthread = new SrsReusableThread("encoder", this, SRS_RTMP_ENCODER_CIMS);
trd = NULL;
pprint = SrsPithyPrint::create_encoder();
}

SrsEncoder::~SrsEncoder()
{
on_unpublish();

srs_freep(pthread);
srs_freep(trd);
srs_freep(pprint);
}

Expand All @@ -76,24 +73,35 @@ int SrsEncoder::on_publish(SrsRequest* req)
}

// start thread to run all encoding engines.
if ((ret = pthread->start()) != ERROR_SUCCESS) {
srs_freep(trd);
trd = new SrsCoroutine("encoder", this, _srs_context->get_id());
if ((ret = trd->start()) != ERROR_SUCCESS) {
srs_error("st_thread_create failed. ret=%d", ret);
return ret;
}
srs_trace("encoder thread cid=%d, current_cid=%d", pthread->cid(), _srs_context->get_id());

return ret;
}

void SrsEncoder::on_unpublish()
{
pthread->stop();
trd->stop();
clear_engines();
}

// when error, encoder sleep for a while and retry.
#define SRS_RTMP_ENCODER_CIMS (3000)

int SrsEncoder::cycle()
{
int ret = do_cycle();
int ret = ERROR_SUCCESS;

while (!trd->pull()) {
if ((ret = do_cycle()) != ERROR_SUCCESS) {
srs_warn("Encoder: Ignore error, ret=%d", ret);
}
st_usleep(SRS_RTMP_ENCODER_CIMS * 1000);
}

// kill ffmpeg when finished and it alive
std::vector<SrsFFMPEG*>::iterator it;
Expand Down
4 changes: 2 additions & 2 deletions trunk/src/app/srs_app_encoder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ class SrsFFMPEG;
* the encoder for a stream,
* may use multiple ffmpegs to transcode the specified stream.
*/
class SrsEncoder : public ISrsReusableThreadHandler
class SrsEncoder : public ISrsCoroutineHandler
{
private:
std::string input_stream_name;
std::vector<SrsFFMPEG*> ffmpegs;
private:
SrsReusableThread* pthread;
SrsCoroutine* trd;
SrsPithyPrint* pprint;
public:
SrsEncoder();
Expand Down
33 changes: 24 additions & 9 deletions trunk/src/app/srs_app_ingest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,6 @@ using namespace std;
#include <srs_app_utility.hpp>
#include <srs_protocol_utility.hpp>

// when error, ingester sleep for a while and retry.
// ingest never sleep a long time, for we must start the stream ASAP.
#define SRS_AUTO_INGESTER_CIMS (3000)

SrsIngesterFFMPEG::SrsIngesterFFMPEG()
{
ffmpeg = NULL;
Expand Down Expand Up @@ -109,15 +105,15 @@ SrsIngester::SrsIngester()

expired = false;

pthread = new SrsReusableThread("ingest", this, SRS_AUTO_INGESTER_CIMS);
trd = NULL;
pprint = SrsPithyPrint::create_ingester();
}

SrsIngester::~SrsIngester()
{
_srs_config->unsubscribe(this);

srs_freep(pthread);
srs_freep(trd);
clear_engines();
}

Expand All @@ -144,18 +140,19 @@ int SrsIngester::start()
// for the reload may add more ingesters.

// start thread to run all encoding engines.
if ((ret = pthread->start()) != ERROR_SUCCESS) {
srs_freep(trd);
trd = new SrsCoroutine("ingest", this, _srs_context->get_id());
if ((ret = trd->start()) != ERROR_SUCCESS) {
srs_error("st_thread_create failed. ret=%d", ret);
return ret;
}
srs_trace("ingest thread cid=%d, current_cid=%d", pthread->cid(), _srs_context->get_id());

return ret;
}

void SrsIngester::stop()
{
pthread->stop();
trd->stop();
clear_engines();
}

Expand All @@ -172,10 +169,28 @@ void SrsIngester::fast_stop()
}
}

// when error, ingester sleep for a while and retry.
// ingest never sleep a long time, for we must start the stream ASAP.
#define SRS_AUTO_INGESTER_CIMS (3000)

int SrsIngester::cycle()
{
int ret = ERROR_SUCCESS;

while (!trd->pull()) {
if ((ret = do_cycle()) != ERROR_SUCCESS) {
srs_warn("Ingester: Ignore error, ret=%d", ret);
}
st_usleep(SRS_AUTO_INGESTER_CIMS * 1000);
}

return ret;
}

int SrsIngester::do_cycle()
{
int ret = ERROR_SUCCESS;

// when expired, restart all ingesters.
if (expired) {
expired = false;
Expand Down
6 changes: 4 additions & 2 deletions trunk/src/app/srs_app_ingest.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,12 @@ class SrsIngesterFFMPEG
* encode with FFMPEG(optional),
* push to SRS(or any RTMP server) over RTMP.
*/
class SrsIngester : public ISrsReusableThreadHandler, public ISrsReloadHandler
class SrsIngester : public ISrsCoroutineHandler, public ISrsReloadHandler
{
private:
std::vector<SrsIngesterFFMPEG*> ingesters;
private:
SrsReusableThread* pthread;
SrsCoroutine* trd;
SrsPithyPrint* pprint;
// whether the ingesters are expired,
// for example, the listen port changed,
Expand All @@ -95,6 +95,8 @@ class SrsIngester : public ISrsReusableThreadHandler, public ISrsReloadHandler
// interface ISrsReusableThreadHandler.
public:
virtual int cycle();
private:
virtual int do_cycle();
private:
virtual void clear_engines();
virtual int parse();
Expand Down
19 changes: 12 additions & 7 deletions trunk/src/app/srs_app_kafka.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ using namespace std;

#ifdef SRS_AUTO_KAFKA

#define SRS_KAKFA_CIMS 3000
#define SRS_KAFKA_PRODUCER_TIMEOUT 30000
#define SRS_KAFKA_PRODUCER_AGGREGATE_SIZE 1

Expand Down Expand Up @@ -366,7 +365,7 @@ SrsKafkaProducer::SrsKafkaProducer()
metadata_expired = st_cond_new();

lock = st_mutex_new();
pthread = new SrsReusableThread("kafka", this, SRS_KAKFA_CIMS);
trd = NULL;
worker = new SrsAsyncCallWorker();
cache = new SrsKafkaCache();

Expand All @@ -380,7 +379,7 @@ SrsKafkaProducer::~SrsKafkaProducer()
srs_freep(lb);

srs_freep(worker);
srs_freep(pthread);
srs_freep(trd);
srs_freep(cache);

st_mutex_destroy(lock);
Expand Down Expand Up @@ -410,7 +409,9 @@ int SrsKafkaProducer::start()
return ret;
}

if ((ret = pthread->start()) != ERROR_SUCCESS) {
srs_freep(trd);
trd = new SrsCoroutine("kafka", this, _srs_context->get_id());
if ((ret = trd->start()) != ERROR_SUCCESS) {
srs_error("start kafka thread failed. ret=%d", ret);
}

Expand All @@ -425,7 +426,7 @@ void SrsKafkaProducer::stop()
return;
}

pthread->stop();
trd->stop();
worker->stop();
}

Expand Down Expand Up @@ -491,12 +492,16 @@ int SrsKafkaProducer::on_close(int key)
return worker->execute(new SrsKafkaMessage(this, key, obj));
}

#define SRS_KAKFA_CIMS 3000
int SrsKafkaProducer::cycle()
{
int ret = ERROR_SUCCESS;

if ((ret = do_cycle()) != ERROR_SUCCESS) {
srs_warn("ignore kafka error. ret=%d", ret);
while (!trd->pull()) {
if ((ret = do_cycle()) != ERROR_SUCCESS) {
srs_warn("ignore kafka error. ret=%d", ret);
}
st_usleep(SRS_KAKFA_CIMS * 1000);
}

return ret;
Expand Down
4 changes: 2 additions & 2 deletions trunk/src/app/srs_app_kafka.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,13 @@ extern void srs_dispose_kafka();
/**
* the kafka producer used to save log to kafka cluster.
*/
class SrsKafkaProducer : virtual public ISrsReusableThreadHandler, virtual public ISrsKafkaCluster
class SrsKafkaProducer : virtual public ISrsCoroutineHandler, virtual public ISrsKafkaCluster
{
private:
// TODO: FIXME: support reload.
bool enabled;
st_mutex_t lock;
SrsReusableThread* pthread;
SrsCoroutine* trd;
private:
bool metadata_ok;
st_cond_t metadata_expired;
Expand Down
Loading

0 comments on commit ea9a5f2

Please sign in to comment.