From 57b771a4ae15e541dc235e0adaf56f5102081176 Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 19 Mar 2021 11:30:46 +0800 Subject: [PATCH] Threads: Refine variables and do dispose 1. Rename packets to srtp or received packets. 2. Add task dispose API, cleanup in future. 3. If got packets before init AsyncSRTP, return error. 4. Never free the SRTPTask, dispose it instead. --- trunk/src/app/srs_app_threads.cpp | 140 ++++++++++++++++++++---------- trunk/src/app/srs_app_threads.hpp | 17 ++-- 2 files changed, 105 insertions(+), 52 deletions(-) diff --git a/trunk/src/app/srs_app_threads.cpp b/trunk/src/app/srs_app_threads.cpp index 2d3ee0a860..fd6a02ebc7 100644 --- a/trunk/src/app/srs_app_threads.cpp +++ b/trunk/src/app/srs_app_threads.cpp @@ -453,7 +453,7 @@ SrsAsyncFileWriter::SrsAsyncFileWriter(std::string p) { filename_ = p; writer_ = new SrsFileWriter(); - queue_ = new SrsThreadQueue(); + chunks_ = new SrsThreadQueue(); } // TODO: FIXME: Before free the writer, we must remove it from the manager. @@ -461,7 +461,7 @@ SrsAsyncFileWriter::~SrsAsyncFileWriter() { // TODO: FIXME: Should we flush dirty logs? srs_freep(writer_); - srs_freep(queue_); + srs_freep(chunks_); } srs_error_t SrsAsyncFileWriter::open() @@ -493,7 +493,7 @@ srs_error_t SrsAsyncFileWriter::write(void* buf, size_t count, ssize_t* pnwrite) SrsSharedPtrMessage* msg = new SrsSharedPtrMessage(); msg->wrap(cp, count); - queue_->push_back(msg); + chunks_->push_back(msg); if (pnwrite) { *pnwrite = count; @@ -530,9 +530,9 @@ srs_error_t SrsAsyncFileWriter::flush() // at queue to push_back or swap all messages. srs_utime_t now = srs_update_system_time(); - vector flying; + vector flying_chunks; if (true) { - queue_->swap(flying); + chunks_->swap(flying_chunks); } // Stat the sync wait of locks. @@ -547,9 +547,9 @@ srs_error_t SrsAsyncFileWriter::flush() ++_srs_thread_sync_plus->sugar; } - // Flush the flying messages to disk. - for (int i = 0; i < (int)flying.size(); i++) { - SrsSharedPtrMessage* msg = flying.at(i); + // Flush the chunks to disk. + for (int i = 0; i < (int)flying_chunks.size(); i++) { + SrsSharedPtrMessage* msg = flying_chunks.at(i); srs_error_t r0 = writer_->write(msg->payload, msg->size, NULL); @@ -639,7 +639,7 @@ std::string SrsAsyncLogManager::description() for (int i = 0; i < (int)writers_.size(); i++) { SrsAsyncFileWriter* writer = writers_.at(i); - int nn = (int)writer->queue_->size(); + int nn = (int)writer->chunks_->size(); nn_logs += nn; max_logs = srs_max(max_logs, nn); } @@ -706,8 +706,7 @@ SrsAsyncSRTP::SrsAsyncSRTP(SrsSecurityTransport* transport) SrsAsyncSRTP::~SrsAsyncSRTP() { - // TODO: FIXME: Check it carefully. - _srs_async_srtp->remove_task(task_); + _srs_async_srtp->on_srtp_codec_destroy(task_); } srs_error_t SrsAsyncSRTP::initialize(std::string recv_key, std::string send_key) @@ -728,18 +727,30 @@ srs_error_t SrsAsyncSRTP::initialize(std::string recv_key, std::string send_key) srs_error_t SrsAsyncSRTP::protect_rtp(void* packet, int* nb_cipher) { + if (!task_) { + return srs_error_new(ERROR_RTC_SRTP_UNPROTECT, "not ready"); + } + // TODO: FIMXE: Remove it. return SrsSRTP::protect_rtp(packet, nb_cipher); } srs_error_t SrsAsyncSRTP::protect_rtcp(void* packet, int* nb_cipher) { + if (!task_) { + return srs_error_new(ERROR_RTC_SRTP_UNPROTECT, "not ready"); + } + // TODO: FIMXE: Remove it. return SrsSRTP::protect_rtcp(packet, nb_cipher); } srs_error_t SrsAsyncSRTP::unprotect_rtp(void* packet, int* nb_plaintext) { + if (!task_) { + return srs_error_new(ERROR_RTC_SRTP_UNPROTECT, "not ready"); + } + int nb_cipher = *nb_plaintext; char* buf = new char[nb_cipher]; memcpy(buf, packet, nb_cipher); @@ -758,6 +769,10 @@ srs_error_t SrsAsyncSRTP::unprotect_rtp(void* packet, int* nb_plaintext) srs_error_t SrsAsyncSRTP::unprotect_rtcp(void* packet, int* nb_plaintext) { + if (!task_) { + return srs_error_new(ERROR_RTC_SRTP_UNPROTECT, "not ready"); + } + // TODO: FIMXE: Remove it. return SrsSRTP::unprotect_rtcp(packet, nb_plaintext); } @@ -766,6 +781,7 @@ SrsAsyncSRTPTask::SrsAsyncSRTPTask(SrsAsyncSRTP* codec) { codec_ = codec; impl_ = new SrsSRTP(); + disposing_ = false; } SrsAsyncSRTPTask::~SrsAsyncSRTPTask() @@ -784,10 +800,25 @@ srs_error_t SrsAsyncSRTPTask::initialize(std::string recv_key, std::string send_ return err; } +void SrsAsyncSRTPTask::dispose() +{ + // TODO: FIXME: Do cleanup in future. + // TODO: FIXME: Memory leak here, use lazy free to avoid lock for each packet. + disposing_ = true; + + // It's safe to set the codec to NULl, because it has been freed. + codec_ = NULL; +} + srs_error_t SrsAsyncSRTPTask::cook(SrsAsyncSRTPPacket* pkt) { srs_error_t err = srs_success; + // It's safe, because here we do not use the codec. + if (disposing_) { + return err; + } + if (pkt->do_decrypt_) { if (pkt->is_rtp_) { pkt->nb_consumed_ = pkt->msg_->size; @@ -801,8 +832,30 @@ srs_error_t SrsAsyncSRTPTask::cook(SrsAsyncSRTPPacket* pkt) return err; } +srs_error_t SrsAsyncSRTPTask::consume(SrsAsyncSRTPPacket* pkt) +{ + srs_error_t err = srs_success; + + // It's safe, because the dispose and consume are in the same thread hybrid. + if (disposing_) { + return err; + } + + char* payload = pkt->msg_->payload; + + if (pkt->do_decrypt_) { + if (pkt->is_rtp_) { + err = codec_->transport_->on_rtp_plaintext(payload, pkt->nb_consumed_); + } + } + + return err; +} + SrsAsyncSRTPPacket::SrsAsyncSRTPPacket(SrsAsyncSRTPTask* task) { + srs_assert(task); + task_ = task; msg_ = new SrsSharedPtrMessage(); is_rtp_ = false; @@ -818,7 +871,7 @@ SrsAsyncSRTPPacket::~SrsAsyncSRTPPacket() SrsAsyncSRTPManager::SrsAsyncSRTPManager() { lock_ = new SrsThreadMutex(); - packets_ = new SrsThreadQueue(); + srtp_packets_ = new SrsThreadQueue(); cooked_packets_ = new SrsThreadQueue(); trd_ = new SrsFastCoroutine("srtp", this); } @@ -829,7 +882,7 @@ SrsAsyncSRTPManager::~SrsAsyncSRTPManager() srs_freep(trd_); srs_freep(lock_); - srs_freep(packets_); + srs_freep(srtp_packets_); srs_freep(cooked_packets_); vector::iterator it; @@ -849,7 +902,7 @@ void SrsAsyncSRTPManager::register_task(SrsAsyncSRTPTask* task) tasks_.push_back(task); } -void SrsAsyncSRTPManager::remove_task(SrsAsyncSRTPTask* task) +void SrsAsyncSRTPManager::on_srtp_codec_destroy(SrsAsyncSRTPTask* task) { if (!task) { return; @@ -859,19 +912,21 @@ void SrsAsyncSRTPManager::remove_task(SrsAsyncSRTPTask* task) vector::iterator it; if ((it = std::find(tasks_.begin(), tasks_.end(), task)) != tasks_.end()) { tasks_.erase(it); - srs_freep(task); + + // TODO: FIXME: Do cleanup in future. + task->dispose(); } } // TODO: FIXME: We could use a coroutine queue, then cook all packet in RTC server timer. void SrsAsyncSRTPManager::add_packet(SrsAsyncSRTPPacket* pkt) { - packets_->push_back(pkt); + srtp_packets_->push_back(pkt); } int SrsAsyncSRTPManager::size() { - return packets_->size(); + return srtp_packets_->size(); } int SrsAsyncSRTPManager::cooked_size() { @@ -892,11 +947,11 @@ srs_error_t SrsAsyncSRTPManager::do_start() srs_utime_t interval = 10 * SRS_UTIME_MILLISECONDS; while (true) { - vector flying; - packets_->swap(flying); + vector flying_srtp_packets; + srtp_packets_->swap(flying_srtp_packets); - for (int i = 0; i < (int)flying.size(); i++) { - SrsAsyncSRTPPacket* pkt = flying.at(i); + for (int i = 0; i < (int)flying_srtp_packets.size(); i++) { + SrsAsyncSRTPPacket* pkt = flying_srtp_packets.at(i); if ((err = pkt->task_->cook(pkt)) != srs_success) { srs_error_reset(err); // Ignore any error. @@ -906,7 +961,7 @@ srs_error_t SrsAsyncSRTPManager::do_start() } // If got packets, maybe more packets in queue. - if (!flying.empty()) { + if (!flying_srtp_packets.empty()) { continue; } @@ -943,26 +998,19 @@ srs_error_t SrsAsyncSRTPManager::cycle() return srs_error_wrap(err, "pull"); } - vector flying; - cooked_packets_->swap(flying); + vector flying_cooked_packets; + cooked_packets_->swap(flying_cooked_packets); - if (flying.empty()) { + if (flying_cooked_packets.empty()) { srs_usleep(20 * SRS_UTIME_MILLISECONDS); continue; } - for (int i = 0; i < (int)flying.size(); i++) { - SrsAsyncSRTPPacket* pkt = flying.at(i); - SrsSecurityTransport* transport = pkt->task_->codec_->transport_; - char* payload = pkt->msg_->payload; + for (int i = 0; i < (int)flying_cooked_packets.size(); i++) { + SrsAsyncSRTPPacket* pkt = flying_cooked_packets.at(i); - if (pkt->do_decrypt_) { - if (pkt->is_rtp_) { - err = transport->on_rtp_plaintext(payload, pkt->nb_consumed_); - } - } - if (err != srs_success) { - srs_error_reset(err); // Ignore any error. + if ((err = pkt->task_->consume(pkt)) != srs_success) { + srs_error_reset(err); } srs_freep(pkt); @@ -993,7 +1041,7 @@ SrsThreadUdpListener::~SrsThreadUdpListener() SrsAsyncRecvManager::SrsAsyncRecvManager() { lock_ = new SrsThreadMutex(); - packets_ = new SrsThreadQueue(); + received_packets_ = new SrsThreadQueue(); handler_ = NULL; max_recv_queue_ = 0; trd_ = new SrsFastCoroutine("recv", this); @@ -1005,7 +1053,7 @@ SrsAsyncRecvManager::~SrsAsyncRecvManager() srs_freep(trd_); srs_freep(lock_); - srs_freep(packets_); + srs_freep(received_packets_); vector::iterator it; for (it = listeners_.begin(); it != listeners_.end(); ++it) { @@ -1027,7 +1075,7 @@ void SrsAsyncRecvManager::add_listener(SrsThreadUdpListener* listener) int SrsAsyncRecvManager::size() { - return packets_->size(); + return received_packets_->size(); } srs_error_t SrsAsyncRecvManager::start(void* arg) @@ -1062,7 +1110,7 @@ srs_error_t SrsAsyncRecvManager::do_start() } // Drop packet if queue is critical full. - int nb_packets = (int)packets_->size(); + int nb_packets = (int)received_packets_->size(); if (nb_packets >= max_recv_queue_) { ++_srs_pps_aloss->sugar; continue; @@ -1070,7 +1118,7 @@ srs_error_t SrsAsyncRecvManager::do_start() // If got packet, copy to the queue. got_packets = true; - packets_->push_back(listener->skt_->copy()); + received_packets_->push_back(listener->skt_->copy()); } } @@ -1115,16 +1163,16 @@ srs_error_t SrsAsyncRecvManager::cycle() return srs_error_wrap(err, "pull"); } - vector flying; - packets_->swap(flying); + vector flying_received_packets; + received_packets_->swap(flying_received_packets); - if (flying.empty()) { + if (flying_received_packets.empty()) { srs_usleep(20 * SRS_UTIME_MILLISECONDS); continue; } - for (int i = 0; i < (int)flying.size(); i++) { - SrsUdpMuxSocket* pkt = flying.at(i); + for (int i = 0; i < (int)flying_received_packets.size(); i++) { + SrsUdpMuxSocket* pkt = flying_received_packets.at(i); if (handler_ && (err = handler_->on_udp_packet(pkt)) != srs_success) { srs_error_reset(err); // Ignore any error. diff --git a/trunk/src/app/srs_app_threads.hpp b/trunk/src/app/srs_app_threads.hpp index 00538919cc..1981a8c931 100644 --- a/trunk/src/app/srs_app_threads.hpp +++ b/trunk/src/app/srs_app_threads.hpp @@ -240,7 +240,7 @@ class SrsAsyncFileWriter : public ISrsWriter SrsFileWriter* writer_; private: // The thread-queue, to flush to disk by dedicated thread. - SrsThreadQueue* queue_; + SrsThreadQueue* chunks_; private: SrsAsyncFileWriter(std::string p); virtual ~SrsAsyncFileWriter(); @@ -298,7 +298,8 @@ extern SrsAsyncLogManager* _srs_async_log; // The async SRTP codec. class SrsAsyncSRTP : public SrsSRTP { -public: + friend class SrsAsyncSRTPTask; +private: SrsAsyncSRTPTask* task_; SrsSecurityTransport* transport_; public: @@ -316,16 +317,20 @@ class SrsAsyncSRTP : public SrsSRTP // which alive longer than either SrsAsyncSRTPTask or SrsAsyncSRTP. class SrsAsyncSRTPTask { -public: +private: SrsAsyncSRTP* codec_; SrsSRTP* impl_; + // For disposing, only set a flag, free it in future. + int disposing_; public: SrsAsyncSRTPTask(SrsAsyncSRTP* codec); virtual ~SrsAsyncSRTPTask(); public: srs_error_t initialize(std::string recv_key, std::string send_key); + void dispose(); public: srs_error_t cook(SrsAsyncSRTPPacket* pkt); + srs_error_t consume(SrsAsyncSRTPPacket* pkt); }; // The async SRTP packet, handle by task. @@ -350,7 +355,7 @@ class SrsAsyncSRTPManager : public ISrsCoroutineHandler std::vector tasks_; SrsThreadMutex* lock_; private: - SrsThreadQueue* packets_; + SrsThreadQueue* srtp_packets_; private: // A coroutine to consume cooked packets. SrsFastCoroutine* trd_; @@ -361,7 +366,7 @@ class SrsAsyncSRTPManager : public ISrsCoroutineHandler virtual ~SrsAsyncSRTPManager(); public: void register_task(SrsAsyncSRTPTask* task); - void remove_task(SrsAsyncSRTPTask* task); + void on_srtp_codec_destroy(SrsAsyncSRTPTask* task); void add_packet(SrsAsyncSRTPPacket* pkt); int size(); int cooked_size(); @@ -398,7 +403,7 @@ class SrsAsyncRecvManager : public ISrsCoroutineHandler // A coroutine to consume received packets. SrsFastCoroutine* trd_; // The received UDP packets. - SrsThreadQueue* packets_; + SrsThreadQueue* received_packets_; private: // If exceed max queue, drop packet. int max_recv_queue_;