Skip to content

Commit

Permalink
For #2188: Run hybrid server in thread.
Browse files Browse the repository at this point in the history
1. Create thread when execute by thread pool.
2. The primordial thread check all threads status.
3. Have not complete the cleanup and stop.
  • Loading branch information
winlinvip committed Mar 12, 2021
1 parent 59b6887 commit c1c9a06
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 10 deletions.
5 changes: 0 additions & 5 deletions trunk/src/app/srs_app_hybrid.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,6 @@ srs_error_t SrsHybridServer::initialize()
{
srs_error_t err = srs_success;

// init st
if ((err = srs_st_init()) != srs_success) {
return srs_error_wrap(err, "initialize st failed");
}

if ((err = setup_ticks()) != srs_success) {
return srs_error_wrap(err, "tick");
}
Expand Down
118 changes: 116 additions & 2 deletions trunk/src/app/srs_app_threads.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,35 +24,149 @@
#include <srs_app_threads.hpp>

#include <srs_kernel_error.hpp>
#include <srs_app_config.hpp>
#include <srs_app_log.hpp>
#include <srs_core_autofree.hpp>

#include <unistd.h>

using namespace std;

SrsThreadMutex::SrsThreadMutex()
{
// https://michaelkerrisk.com/linux/man-pages/man3/pthread_mutex_init.3p.html
int r0 = pthread_mutex_init(&lock_, NULL);
srs_assert(!r0);
}

SrsThreadMutex::~SrsThreadMutex()
{
int r0 = pthread_mutex_destroy(&lock_);
srs_assert(!r0);
}

void SrsThreadMutex::lock()
{
// https://man7.org/linux/man-pages/man3/pthread_mutex_lock.3p.html
int r0 = pthread_mutex_lock(&lock_);
srs_assert(!r0);
}

void SrsThreadMutex::unlock()
{
int r0 = pthread_mutex_unlock(&lock_);
srs_assert(!r0);
}

SrsThreadEntry::SrsThreadEntry()
{
pool = NULL;
start = NULL;
arg = NULL;
num = 0;

err = srs_success;
}

SrsThreadPool::SrsThreadPool()
{
entry_ = NULL;
lock_ = new SrsThreadMutex();
}

SrsThreadPool::~SrsThreadPool()
{
srs_freep(lock_);
}

srs_error_t SrsThreadPool::initialize()
{
srs_error_t err = srs_success;

// TODO: FIXME: Should init ST for each thread.
if ((err = srs_st_init()) != srs_success) {
return srs_error_wrap(err, "initialize st failed");
}

// Add primordial thread, current thread itself.
SrsThreadEntry* entry = new SrsThreadEntry();
threads_.push_back(entry);
entry_ = entry;

entry->pool = this;
entry->label = "primordial";
entry->start = NULL;
entry->arg = NULL;
entry->num = 1;

srs_trace("Thread #%d: %s init", entry_->num, entry_->label.c_str());

return err;
}

srs_error_t SrsThreadPool::execute(srs_error_t (*start)(void* arg), void* arg)
srs_error_t SrsThreadPool::execute(string label, srs_error_t (*start)(void* arg), void* arg)
{
srs_error_t err = start(arg);
srs_error_t err = srs_success;

static int num = entry_->num + 1;

SrsThreadEntry* entry = new SrsThreadEntry();

if (true) {
SrsThreadLocker(lock_);
threads_.push_back(entry);
}

entry->pool = this;
entry->label = label;
entry->start = start;
entry->arg = arg;
entry->num = num++;

// https://man7.org/linux/man-pages/man3/pthread_create.3.html
pthread_t trd;
int r0 = pthread_create(&trd, NULL, SrsThreadPool::start, entry);
if (r0 != 0) {
entry->err = srs_error_new(ERROR_THREAD_CREATE, "create thread %s", label.c_str());
return srs_error_copy(entry->err);
}

entry->trd = trd;

return err;
}

srs_error_t SrsThreadPool::run()
{
srs_error_t err = srs_success;

while (true) {
srs_trace("Thread #%d: %s run, threads=%d", entry_->num, entry_->label.c_str(),
(int)threads_.size());
sleep(60);
}

return err;
}

void SrsThreadPool::stop()
{
// TODO: FIXME: Implements it.
}

void* SrsThreadPool::start(void* arg)
{
srs_error_t err = srs_success;

SrsThreadEntry* entry = (SrsThreadEntry*)arg;
srs_trace("Thread #%d: %s run", entry->num, entry->label.c_str());

if ((err = entry->start(entry->arg)) != srs_success) {
entry->err = err;
}

// We do not use the return value, the err has been set to entry->err.
return NULL;
}

SrsThreadPool* _srs_thread_pool = new SrsThreadPool();
67 changes: 65 additions & 2 deletions trunk/src/app/srs_app_threads.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,85 @@

#include <srs_core.hpp>

#include <pthread.h>

#include <vector>
#include <string>

class SrsThreadPool;

// The thread mutex wrapper, without error.
class SrsThreadMutex
{
private:
pthread_mutex_t lock_;
public:
SrsThreadMutex();
virtual ~SrsThreadMutex();
public:
void lock();
void unlock();
};

// The thread mutex locker.
#define SrsThreadLocker(instance) \
impl__SrsThreadLocker _SRS_free_##instance(instance)

class impl__SrsThreadLocker
{
private:
SrsThreadMutex* lock;
public:
impl__SrsThreadLocker(SrsThreadMutex* l) {
lock = l;
lock->lock();
}
virtual ~impl__SrsThreadLocker() {
lock->unlock();
}
};

// The information for a thread.
class SrsThreadEntry
{
public:
SrsThreadPool* pool;
std::string label;
srs_error_t (*start)(void* arg);
void* arg;
int num;
public:
// The thread object.
pthread_t trd;
// The exit error of thread.
srs_error_t err;

SrsThreadEntry();
};

// Allocate a(or almost) fixed thread poll to execute tasks,
// so that we can take the advantage of multiple CPUs.
class SrsThreadPool
{
private:
SrsThreadEntry* entry_;
private:
SrsThreadMutex* lock_;
std::vector<SrsThreadEntry*> threads_;
public:
SrsThreadPool();
virtual ~SrsThreadPool();
public:
// Initialize the thread pool.
srs_error_t initialize();
// Execute start function in thread.
srs_error_t execute(srs_error_t (*start)(void* arg), void* arg);
// Execute start function with label in thread.
srs_error_t execute(std::string label, srs_error_t (*start)(void* arg), void* arg);
// Run in the primordial thread, util stop or quit.
srs_error_t run();
// Stop the thread pool and quit the primordial thread.
void stop();
private:
static void* start(void* arg);
};

// The global thread pool.
Expand Down
1 change: 1 addition & 0 deletions trunk/src/kernel/srs_kernel_error.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
#define ERROR_SOCKET_SETREUSEADDR 1079
#define ERROR_SOCKET_SETCLOSEEXEC 1080
#define ERROR_SOCKET_ACCEPT 1081
#define ERROR_THREAD_CREATE 1082

///////////////////////////////////////////////////////
// RTMP protocol error.
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/main/srs_main_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ srs_error_t run_in_thread_pool()
return srs_error_wrap(err, "init thread pool");
}

if ((err = _srs_thread_pool->execute(run_hybrid_server, NULL)) != srs_success) {
if ((err = _srs_thread_pool->execute("hybrid", run_hybrid_server, NULL)) != srs_success) {
return srs_error_wrap(err, "run hybrid server");
}

Expand Down

0 comments on commit c1c9a06

Please sign in to comment.