From 1351ac61119f177b4543d49c931f9f339c96a93b Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 12 Mar 2021 08:45:10 +0800 Subject: [PATCH] For #2188: Run hybrid server in thread. 1. Create thread when execute by thread pool. 2. The primordial thread check all threads status. 3. Have not complete the cleanup and stop. --- trunk/src/app/srs_app_hybrid.cpp | 5 -- trunk/src/app/srs_app_threads.cpp | 122 +++++++++++++++++++++++++- trunk/src/app/srs_app_threads.hpp | 46 +++++++++- trunk/src/kernel/srs_kernel_error.hpp | 1 + trunk/src/main/srs_main_server.cpp | 2 +- 5 files changed, 166 insertions(+), 10 deletions(-) diff --git a/trunk/src/app/srs_app_hybrid.cpp b/trunk/src/app/srs_app_hybrid.cpp index ecfabddd9a..f04536f412 100644 --- a/trunk/src/app/srs_app_hybrid.cpp +++ b/trunk/src/app/srs_app_hybrid.cpp @@ -161,11 +161,6 @@ srs_error_t SrsHybridServer::initialize() { srs_error_t err = srs_success; - // init st - if ((err = srs_st_init()) != srs_success) { - return srs_error_wrap(err, "initialize st failed"); - } - if ((err = setup_ticks()) != srs_success) { return srs_error_wrap(err, "tick"); } diff --git a/trunk/src/app/srs_app_threads.cpp b/trunk/src/app/srs_app_threads.cpp index 22a3dd088e..83803d4688 100644 --- a/trunk/src/app/srs_app_threads.cpp +++ b/trunk/src/app/srs_app_threads.cpp @@ -24,35 +24,153 @@ #include #include +#include +#include +#include + +#include + +using namespace std; + +SrsThreadLock::SrsThreadLock() +{ + // https://michaelkerrisk.com/linux/man-pages/man3/pthread_mutex_init.3p.html + int r0 = pthread_mutex_init(&lock_, NULL); + srs_assert(!r0); + + // https://man7.org/linux/man-pages/man3/pthread_mutex_lock.3p.html + r0 = pthread_mutex_lock(&lock_); + srs_assert(!r0); +} + +SrsThreadLock::~SrsThreadLock() +{ + int r0 = pthread_mutex_unlock(&lock_); + srs_assert(!r0); + + r0 = pthread_mutex_destroy(&lock_); + srs_assert(!r0); +} + +SrsThreadEntry::SrsThreadEntry() +{ + pool = NULL; + start = NULL; + arg = NULL; + num = 0; + + trd = NULL; + tid = 0; + err = srs_success; +} SrsThreadPool::SrsThreadPool() { + entry_ = NULL; } SrsThreadPool::~SrsThreadPool() { + // TODO: FIXME: Implements it. } srs_error_t SrsThreadPool::initialize() { srs_error_t err = srs_success; + + // TODO: FIXME: Should init ST for each thread. + if ((err = srs_st_init()) != srs_success) { + return srs_error_wrap(err, "initialize st failed"); + } + + // Add primordial thread, current thread itself. + SrsThreadEntry* entry = new SrsThreadEntry(); + threads_.push_back(entry); + entry_ = entry; + + entry->pool = this; + entry->label = "primordial"; + entry->start = NULL; + entry->arg = NULL; + entry->num = 1; + entry->trd = NULL; + + entry->tid = entry->num; +#ifndef SRS_OSX + entry->tid = gettid(); +#endif + + srs_trace("Thread #%d: %s init, tid=%d", entry_->num, entry_->label.c_str(), (int)entry_->tid); + return err; } -srs_error_t SrsThreadPool::execute(srs_error_t (*start)(void* arg), void* arg) +srs_error_t SrsThreadPool::execute(string label, srs_error_t (*start)(void* arg), void* arg) { - srs_error_t err = start(arg); + srs_error_t err = srs_success; + + SrsThreadLock* lock = new SrsThreadLock(); + SrsAutoFree(SrsThreadLock, lock); + + static int num = entry_->num + 1; + + SrsThreadEntry* entry = new SrsThreadEntry(); + threads_.push_back(entry); + + entry->pool = this; + entry->label = label; + entry->start = start; + entry->arg = arg; + entry->num = num++; + + entry->tid = entry->num; +#ifndef SRS_OSX + entry->tid = gettid(); +#endif + + // https://man7.org/linux/man-pages/man3/pthread_create.3.html + pthread_t trd; + int r0 = pthread_create(&trd, NULL, SrsThreadPool::start, entry); + if (r0 != 0) { + return srs_error_new(ERROR_THREAD_CREATE, "create thread %s", label.c_str()); + } + + entry->trd = trd; + return err; } srs_error_t SrsThreadPool::run() { srs_error_t err = srs_success; + + while (true) { + srs_trace("Thread #%d: %s run, tid=%d, threads=%d", entry_->num, entry_->label.c_str(), + (int)entry_->tid, (int)threads_.size()); + sleep(60); + } + return err; } void SrsThreadPool::stop() { + // TODO: FIXME: Implements it. +} + +void* SrsThreadPool::start(void* arg) +{ + srs_error_t err = srs_success; + + SrsThreadEntry* entry = (SrsThreadEntry*)arg; + srs_trace("Thread #%d: %s run, tid=%d", entry->num, entry->label.c_str(), (int)entry->tid); + + if ((err = entry->start(entry->arg)) != srs_success) { + entry->err = err; + } + + // We do not use the return value, the err has been set to entry->err. + return NULL; } SrsThreadPool* _srs_thread_pool = new SrsThreadPool(); diff --git a/trunk/src/app/srs_app_threads.hpp b/trunk/src/app/srs_app_threads.hpp index 5aa12582d3..509b3bed25 100644 --- a/trunk/src/app/srs_app_threads.hpp +++ b/trunk/src/app/srs_app_threads.hpp @@ -26,22 +26,64 @@ #include +#include + +#include +#include + +class SrsThreadPool; + +// The thread lock. +class SrsThreadLock +{ +private: + pthread_mutex_t lock_; +public: + SrsThreadLock(); + virtual ~SrsThreadLock(); +}; + +// The information for a thread. +class SrsThreadEntry +{ +public: + SrsThreadPool* pool; + std::string label; + srs_error_t (*start)(void* arg); + void* arg; + int num; +public: + // The thread object. + pthread_t trd; + // The thread id of Linux. + pid_t tid; + // The exit error of thread. + srs_error_t err; + + SrsThreadEntry(); +}; + // Allocate a(or almost) fixed thread poll to execute tasks, // so that we can take the advantage of multiple CPUs. class SrsThreadPool { +private: + SrsThreadEntry* entry_; + std::vector threads_; public: SrsThreadPool(); virtual ~SrsThreadPool(); public: // Initialize the thread pool. srs_error_t initialize(); - // Execute start function in thread. - srs_error_t execute(srs_error_t (*start)(void* arg), void* arg); + // Execute start function with label in thread. + srs_error_t execute(std::string label, srs_error_t (*start)(void* arg), void* arg); // Run in the primordial thread, util stop or quit. srs_error_t run(); // Stop the thread pool and quit the primordial thread. void stop(); +private: + static void* start(void* arg); }; // The global thread pool. diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index de30407935..225d414097 100644 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -118,6 +118,7 @@ #define ERROR_SOCKET_SETREUSEADDR 1079 #define ERROR_SOCKET_SETCLOSEEXEC 1080 #define ERROR_SOCKET_ACCEPT 1081 +#define ERROR_THREAD_CREATE 1082 /////////////////////////////////////////////////////// // RTMP protocol error. diff --git a/trunk/src/main/srs_main_server.cpp b/trunk/src/main/srs_main_server.cpp index e3c2ceb7c7..b82e91c55f 100644 --- a/trunk/src/main/srs_main_server.cpp +++ b/trunk/src/main/srs_main_server.cpp @@ -469,7 +469,7 @@ srs_error_t run_in_thread_pool() return srs_error_wrap(err, "init thread pool"); } - if ((err = _srs_thread_pool->execute(run_hybrid_server, NULL)) != srs_success) { + if ((err = _srs_thread_pool->execute("hybrid", run_hybrid_server, NULL)) != srs_success) { return srs_error_wrap(err, "run hybrid server"); }