From 6718c3843e10bc747f1b67957b17ddfa1bf78189 Mon Sep 17 00:00:00 2001 From: winlin Date: Sun, 14 Mar 2021 10:42:09 +0800 Subject: [PATCH] Threads-Log: Refine dual queue for log thread. 1. App/User controls the interval to flush coroutine-queue. 2. Use srs_update_system_time to get time for log. 3. Stat the thread sync in us, in SrsThreadPool. 4. Change default interval for thread to 5s. --- trunk/conf/full.conf | 4 +- trunk/src/app/srs_app_config.cpp | 2 +- trunk/src/app/srs_app_log.cpp | 16 ++++++- trunk/src/app/srs_app_log.hpp | 5 +++ trunk/src/app/srs_app_threads.cpp | 60 +++++++++++++++++++------- trunk/src/app/srs_app_threads.hpp | 12 ++---- trunk/src/protocol/srs_service_log.cpp | 6 +-- 7 files changed, 73 insertions(+), 32 deletions(-) diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 419c5bb802..d8f942757a 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -119,8 +119,8 @@ tcmalloc_release_rate 0.8; # For thread pool. threads { # The thread pool manager cycle interval, in seconds. - # Default: 60 - interval 60; + # Default: 5 + interval 5; } ############################################################################################# diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index c8d9bc5cf6..8f6e36daa1 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -4107,7 +4107,7 @@ double SrsConfig::tcmalloc_release_rate() srs_utime_t SrsConfig::get_threads_interval() { - static srs_utime_t DEFAULT = 60 * SRS_UTIME_SECONDS; + static srs_utime_t DEFAULT = 5 * SRS_UTIME_SECONDS; SrsConfDirective* conf = root->get("threads"); if (!conf) { diff --git a/trunk/src/app/srs_app_log.cpp b/trunk/src/app/srs_app_log.cpp index 620833a270..fe234d7635 100644 --- a/trunk/src/app/srs_app_log.cpp +++ b/trunk/src/app/srs_app_log.cpp @@ -54,6 +54,9 @@ SrsFileLog::SrsFileLog() log_data = new char[LOG_MAX_SIZE]; writer_ = NULL; + + last_flush_time_ = srs_get_system_time(); + interval_ = 0; } SrsFileLog::~SrsFileLog() @@ -71,6 +74,7 @@ srs_error_t SrsFileLog::initialize() filename_ = _srs_config->get_log_file(); level = srs_get_log_level(_srs_config->get_log_level()); utc = _srs_config->get_utc_time(); + interval_ = _srs_config->srs_log_flush_interval(); } if (!log_to_file_tank) { @@ -202,11 +206,12 @@ void SrsFileLog::write_log(char *str_log, int size, int level) // ensure the tail and EOF of string // LOG_TAIL_SIZE for the TAIL char. // 1 for the last char(0). - size = srs_min(LOG_MAX_SIZE - 1 - LOG_TAIL_SIZE, size); + size = srs_min(LOG_MAX_SIZE - 2 - LOG_TAIL_SIZE, size); // add some to the end of char. str_log[size++] = LOG_TAIL; - + str_log[size] = 0; + // if not to file, to console and return. if (!log_to_file_tank) { // if is error msg, then print color msg. @@ -230,5 +235,12 @@ void SrsFileLog::write_log(char *str_log, int size, int level) if ((err = writer_->write(str_log, size, NULL)) != srs_success) { srs_error_reset(err); // Ignore any error for log writing. } + + // Whether flush to thread-queue. + srs_utime_t diff = srs_get_system_time() - last_flush_time_; + if (diff >= interval_) { + last_flush_time_ = srs_get_system_time(); + writer_->flush_co_queue(); + } } diff --git a/trunk/src/app/srs_app_log.hpp b/trunk/src/app/srs_app_log.hpp index 14c8ea73b1..56406d6d59 100644 --- a/trunk/src/app/srs_app_log.hpp +++ b/trunk/src/app/srs_app_log.hpp @@ -52,6 +52,11 @@ class SrsFileLog : public ISrsLog, public ISrsReloadHandler char* log_data; // Async file writer. SrsAsyncFileWriter* writer_; +private: + // The interval to flush from coroutine-queue to thread-queue. + srs_utime_t interval_; + // Last flush coroutine-queue time, to calculate the timeout. + srs_utime_t last_flush_time_; private: // Defined in SrsLogLevel. SrsLogLevel level; diff --git a/trunk/src/app/srs_app_threads.cpp b/trunk/src/app/srs_app_threads.cpp index fa278b24e1..b809057603 100644 --- a/trunk/src/app/srs_app_threads.cpp +++ b/trunk/src/app/srs_app_threads.cpp @@ -33,6 +33,13 @@ using namespace std; +#include + +SrsPps* _srs_thread_sync_10us = new SrsPps(); +SrsPps* _srs_thread_sync_100us = new SrsPps(); +SrsPps* _srs_thread_sync_1000us = new SrsPps(); +SrsPps* _srs_thread_sync_plus = new SrsPps(); + SrsThreadMutex::SrsThreadMutex() { // https://man7.org/linux/man-pages/man3/pthread_mutexattr_init.3.html @@ -171,9 +178,19 @@ srs_error_t SrsThreadPool::run() while (true) { sleep(interval_ / SRS_UTIME_SECONDS); + static char buf[128]; string async_logs = _srs_async_log->description(); - srs_trace("Thread #%d(%s): cycle threads=%d%s", entry_->num, entry_->label.c_str(), (int)threads_.size(), - async_logs.c_str()); + + string sync_desc; + _srs_thread_sync_10us->update(); _srs_thread_sync_100us->update(); + _srs_thread_sync_1000us->update(); _srs_thread_sync_plus->update(); + if (_srs_thread_sync_10us->r10s() || _srs_thread_sync_100us->r10s() || _srs_thread_sync_1000us->r10s() || _srs_thread_sync_plus->r10s()) { + snprintf(buf, sizeof(buf), ", sync=%d,%d,%d,%d", _srs_thread_sync_10us->r10s(), _srs_thread_sync_100us->r10s(), _srs_thread_sync_1000us->r10s(), _srs_thread_sync_plus->r10s()); + sync_desc = buf; + } + + srs_trace("Thread: cycle threads=%d%s%s", (int)threads_.size(), + async_logs.c_str(), sync_desc.c_str()); } return err; @@ -201,14 +218,12 @@ void* SrsThreadPool::start(void* arg) SrsThreadPool* _srs_thread_pool = new SrsThreadPool(); -SrsAsyncFileWriter::SrsAsyncFileWriter(std::string p, srs_utime_t interval) +SrsAsyncFileWriter::SrsAsyncFileWriter(std::string p) { filename_ = p; writer_ = new SrsFileWriter(); queue_ = new SrsThreadQueue(); co_queue_ = new SrsCoroutineQueue(); - interval_ = interval; - last_flush_time_ = srs_get_system_time(); } // TODO: FIXME: Before free the writer, we must remove it from the manager. @@ -251,15 +266,6 @@ srs_error_t SrsAsyncFileWriter::write(void* buf, size_t count, ssize_t* pnwrite) co_queue_->push_back(msg); - // Whether flush to thread-queue. - if (srs_get_system_time() - last_flush_time_ >= interval_) { - last_flush_time_ = srs_get_system_time(); - - vector flying; - co_queue_->swap(flying); - queue_->push_back(flying); - } - if (pnwrite) { *pnwrite = count; } @@ -287,6 +293,28 @@ srs_error_t SrsAsyncFileWriter::writev(const iovec* iov, int iovcnt, ssize_t* pn return err; } +void SrsAsyncFileWriter::flush_co_queue() +{ + srs_utime_t now = srs_update_system_time(); + + if (true) { + vector flying; + co_queue_->swap(flying); + queue_->push_back(flying); + } + + srs_utime_t elapsed = srs_update_system_time() - now; + if (elapsed <= 10) { + ++_srs_thread_sync_10us->sugar; + } else if (elapsed <= 100) { + ++_srs_thread_sync_100us->sugar; + } else if (elapsed <= 1000) { + ++_srs_thread_sync_1000us->sugar; + } else { + ++_srs_thread_sync_plus->sugar; + } +} + srs_error_t SrsAsyncFileWriter::flush() { srs_error_t err = srs_success; @@ -362,7 +390,7 @@ srs_error_t SrsAsyncLogManager::create_writer(std::string filename, SrsAsyncFile { srs_error_t err = srs_success; - SrsAsyncFileWriter* writer = new SrsAsyncFileWriter(filename, interval_); + SrsAsyncFileWriter* writer = new SrsAsyncFileWriter(filename); writers_.push_back(writer); if ((err = writer->open()) != srs_success) { @@ -404,7 +432,7 @@ std::string SrsAsyncLogManager::description() } static char buf[128]; - snprintf(buf, sizeof(buf), ", files=%d, queue=%d/%d, coq=%d/%d", + snprintf(buf, sizeof(buf), ", logs=%d/%d/%d/%d/%d", (int)writers_.size(), nn_logs, max_logs, nn_co_logs, max_co_logs); return buf; diff --git a/trunk/src/app/srs_app_threads.hpp b/trunk/src/app/srs_app_threads.hpp index 624d94e95e..497cf05ccc 100644 --- a/trunk/src/app/srs_app_threads.hpp +++ b/trunk/src/app/srs_app_threads.hpp @@ -201,14 +201,10 @@ class SrsAsyncFileWriter : public ISrsWriter // The thread-queue, to flush to disk by dedicated thread. SrsThreadQueue* queue_; private: - // The interval to flush from coroutine-queue to thread-queue. - srs_utime_t interval_; - // Last flush coroutine-queue time, to calculate the timeout. - srs_utime_t last_flush_time_; // The coroutine-queue, to avoid requires lock for each log. SrsCoroutineQueue* co_queue_; private: - SrsAsyncFileWriter(std::string p, srs_utime_t interval); + SrsAsyncFileWriter(std::string p); virtual ~SrsAsyncFileWriter(); public: // Open file writer, in truncate mode. @@ -222,7 +218,9 @@ class SrsAsyncFileWriter : public ISrsWriter virtual srs_error_t write(void* buf, size_t count, ssize_t* pnwrite); virtual srs_error_t writev(const iovec* iov, int iovcnt, ssize_t* pnwrite); public: - // Flush by other thread. + // Flush coroutine-queue to thread-queue, avoid requiring lock for each message. + void flush_co_queue(); + // Flush thread-queue to disk, generally by dedicated thread. srs_error_t flush(); }; @@ -233,8 +231,6 @@ class SrsAsyncLogManager private: // The async flush interval. srs_utime_t interval_; - // The number of logs to flush from coroutine-queue to thread-queue. - int flush_co_queue_; private: // The async reopen event. bool reopen_; diff --git a/trunk/src/protocol/srs_service_log.cpp b/trunk/src/protocol/srs_service_log.cpp index d97c7c06d4..5420712c6d 100644 --- a/trunk/src/protocol/srs_service_log.cpp +++ b/trunk/src/protocol/srs_service_log.cpp @@ -244,9 +244,9 @@ bool srs_log_header(char* buffer, int size, bool utc, bool dangerous, const char { // clock time timeval tv; - if (gettimeofday(&tv, NULL) == -1) { - return false; - } + srs_utime_t now = srs_update_system_time(); + tv.tv_sec = now / SRS_UTIME_SECONDS; + tv.tv_usec = now % SRS_UTIME_SECONDS; // to calendar time struct tm* tm;