diff --git a/trunk/src/srt/srt_handle.cpp b/trunk/src/srt/srt_handle.cpp index 20547d673c..59c622de34 100644 --- a/trunk/src/srt/srt_handle.cpp +++ b/trunk/src/srt/srt_handle.cpp @@ -206,6 +206,18 @@ void srt_handle::handle_push_data(SRT_SOCKSTATUS status, const std::string& subp srt_conn_ptr->update_timestamp(srt_now_ms); srt2rtmp::get_instance()->insert_data_message(data, ret, subpath); + { + std::unique_lock locker(srt2rtmp::_srt_error_mutex); + if (srt2rtmp::_srt_error_map.count(subpath) == 1) { + if (srt2rtmp::_srt_error_map[subpath] != 0) { + close_push_conn(conn_fd); + srt_log_error("handle_push_data srt to rtmp error, fd:%d", conn_fd); + //todo: reset to next use, maybe update by srt2rtmp::cycle again + srt2rtmp::_srt_error_map[subpath] = 0; + return; + } + } + } //send data to subscriber(players) //streamid, play map diff --git a/trunk/src/srt/srt_to_rtmp.cpp b/trunk/src/srt/srt_to_rtmp.cpp index 268a1cbf40..258a713c30 100644 --- a/trunk/src/srt/srt_to_rtmp.cpp +++ b/trunk/src/srt/srt_to_rtmp.cpp @@ -16,6 +16,8 @@ #include std::shared_ptr srt2rtmp::s_srt2rtmp_ptr; +std::mutex srt2rtmp::_srt_error_mutex; +std::map srt2rtmp::_srt_error_map; std::shared_ptr srt2rtmp::get_instance() { if (!s_srt2rtmp_ptr) { @@ -163,7 +165,10 @@ srs_error_t srt2rtmp::cycle() { switch (msg_ptr->msg_type()) { case SRT_MSG_DATA_TYPE: { - handle_ts_data(msg_ptr); + if (handle_ts_data(msg_ptr) != srs_success) { + std::unique_lock locker(_srt_error_mutex); + _srt_error_map[msg_ptr->get_path()] = -1; + } break; } case SRT_MSG_CLOSE_TYPE: @@ -192,7 +197,7 @@ srs_error_t srt2rtmp::cycle() { } } -void srt2rtmp::handle_ts_data(SRT_DATA_MSG_PTR data_ptr) { +int srt2rtmp::handle_ts_data(SRT_DATA_MSG_PTR data_ptr) { RTMP_CLIENT_PTR rtmp_ptr; auto iter = _rtmp_client_map.find(data_ptr->get_path()); if (iter == _rtmp_client_map.end()) { @@ -203,9 +208,7 @@ void srt2rtmp::handle_ts_data(SRT_DATA_MSG_PTR data_ptr) { rtmp_ptr = iter->second; } - rtmp_ptr->receive_ts_data(data_ptr); - - return; + return rtmp_ptr->receive_ts_data(data_ptr); } void srt2rtmp::handle_log_data(SRT_DATA_MSG_PTR data_ptr) { @@ -343,9 +346,8 @@ srs_error_t rtmp_client::connect() { return err; } -void rtmp_client::receive_ts_data(SRT_DATA_MSG_PTR data_ptr) { - _ts_demux_ptr->decode(data_ptr, shared_from_this());//on_data_callback is the decode callback - return; +int rtmp_client::receive_ts_data(SRT_DATA_MSG_PTR data_ptr) { + return _ts_demux_ptr->decode(data_ptr, shared_from_this());//on_data_callback is the decode callback } srs_error_t rtmp_client::write_h264_sps_pps(uint32_t dts, uint32_t pts) { @@ -668,13 +670,13 @@ srs_error_t rtmp_client::on_ts_audio(std::shared_ptr avs_ptr, uint64_ return err; } -void rtmp_client::on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type, +int rtmp_client::on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type, uint64_t dts, uint64_t pts) { srs_error_t err = srs_success; if (!data_ptr || (data_ptr->get_data() == nullptr) || (data_ptr->data_len() == 0)) { assert(0); - return; + return 0; } auto avs_ptr = std::make_shared((char*)data_ptr->get_data(), data_ptr->data_len()); @@ -685,13 +687,15 @@ void rtmp_client::on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media err = on_ts_audio(avs_ptr, dts, pts); } else { srs_error("mpegts demux unkown stream type:0x%02x, only support h264+aac", media_type); - return; + return 0; } if (err != srs_success) { - srs_error("send media data error:", srs_error_code(err)); + srs_error("send media data error:%s", srs_error_desc(err).c_str()); + srs_freep(err); + return -1; } - return; + return 0; } rtmp_packet_queue::rtmp_packet_queue():_queue_timeout(QUEUE_DEF_TIMEOUT) diff --git a/trunk/src/srt/srt_to_rtmp.hpp b/trunk/src/srt/srt_to_rtmp.hpp index 0cfa15c5a2..4c67355fb8 100644 --- a/trunk/src/srt/srt_to_rtmp.hpp +++ b/trunk/src/srt/srt_to_rtmp.hpp @@ -71,7 +71,7 @@ class rtmp_client : public ts_media_data_callback_I, public std::enable_shared_f rtmp_client(std::string key_path); ~rtmp_client(); - void receive_ts_data(SRT_DATA_MSG_PTR data_ptr); + int receive_ts_data(SRT_DATA_MSG_PTR data_ptr); int64_t get_last_live_ts(); std::string get_url(); @@ -79,7 +79,7 @@ class rtmp_client : public ts_media_data_callback_I, public std::enable_shared_f void close(); private: - virtual void on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type, uint64_t dts, uint64_t pts); + virtual int on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type, uint64_t dts, uint64_t pts); private: srs_error_t on_ts_video(std::shared_ptr avs_ptr, uint64_t dts, uint64_t pts); @@ -140,7 +140,7 @@ class srt2rtmp : public ISrsCoroutineHandler { private: SRT_DATA_MSG_PTR get_data_message(); virtual srs_error_t cycle(); - void handle_ts_data(SRT_DATA_MSG_PTR data_ptr); + int handle_ts_data(SRT_DATA_MSG_PTR data_ptr); void handle_close_rtmpsession(const std::string& key_path); void handle_log_data(SRT_DATA_MSG_PTR data_ptr); void check_rtmp_alive(); @@ -154,6 +154,9 @@ class srt2rtmp : public ISrsCoroutineHandler { std::unordered_map _rtmp_client_map; int64_t _lastcheck_ts; +public: + static std::mutex _srt_error_mutex; + static std::map _srt_error_map; }; #endif diff --git a/trunk/src/srt/ts_demux.cpp b/trunk/src/srt/ts_demux.cpp index 92d055e442..d43ea9b098 100644 --- a/trunk/src/srt/ts_demux.cpp +++ b/trunk/src/srt/ts_demux.cpp @@ -276,7 +276,8 @@ int ts_demux::decode_unit(unsigned char* data_p, std::string key_path, TS_DATA_C uint64_t pts = 0; //callback last media data in data buffer - on_callback(callback, _last_pid, key_path, _last_dts, _last_pts); + if (on_callback(callback, _last_pid, key_path, _last_dts, _last_pts) != 0) + return -1; int ret = pes_parse(data_p+npos, npos, &ret_data_p, ret_size, dts, pts); if (ret > 188) { @@ -335,15 +336,15 @@ void ts_demux::insert_into_databuf(unsigned char* data_p, size_t data_size, std: return; } -void ts_demux::on_callback(TS_DATA_CALLBACK_PTR callback, unsigned short pid, std::string key_path, +int ts_demux::on_callback(TS_DATA_CALLBACK_PTR callback, unsigned short pid, std::string key_path, uint64_t dts, uint64_t pts) { if ((_data_total <=0 ) || (_data_buffer_vec.empty())) { - return; + return 0; } auto iter = _pmt._pid2steamtype.find(pid); if (iter == _pmt._pid2steamtype.end()) { - return; + return 0; } unsigned char stream_type = iter->second; auto total_data_ptr = std::make_shared(_data_total, key_path); @@ -358,8 +359,7 @@ void ts_demux::on_callback(TS_DATA_CALLBACK_PTR callback, unsigned short pid, st _data_buffer_vec.clear(); _data_total = 0; - callback->on_data_callback(total_data_ptr, stream_type, dts, pts); - return; + return callback->on_data_callback(total_data_ptr, stream_type, dts, pts); } bool ts_demux::is_pmt(unsigned short pid) { diff --git a/trunk/src/srt/ts_demux.hpp b/trunk/src/srt/ts_demux.hpp index bdd7e9221b..5a824567f6 100644 --- a/trunk/src/srt/ts_demux.hpp +++ b/trunk/src/srt/ts_demux.hpp @@ -74,7 +74,7 @@ Value Description class ts_media_data_callback_I { public: - virtual void on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type, uint64_t dts, uint64_t pts) = 0; + virtual int on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type, uint64_t dts, uint64_t pts) = 0; }; typedef std::shared_ptr TS_DATA_CALLBACK_PTR; @@ -227,7 +227,7 @@ class ts_demux { int pes_parse(unsigned char* p, size_t npos, unsigned char** ret_pp, size_t& ret_size, uint64_t& dts, uint64_t& pts); void insert_into_databuf(unsigned char* data_p, size_t data_size, std::string key_path, unsigned short pid); - void on_callback(TS_DATA_CALLBACK_PTR callback, unsigned short pid, + int on_callback(TS_DATA_CALLBACK_PTR callback, unsigned short pid, std::string key_path, uint64_t dts, uint64_t pts); private: