Skip to content

Commit

Permalink
SRT: Close connection if RTMP failed. (#2917)
Browse files Browse the repository at this point in the history
* SRT: using global variables  to pass errors (#2897)

* SRT: using global variables  to pass errors (#2897)
  • Loading branch information
zhouxiaojun2008 authored Feb 15, 2022
1 parent 0a84843 commit e8fca60
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 25 deletions.
13 changes: 13 additions & 0 deletions trunk/src/srt/srt_handle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,19 @@ void srt_handle::handle_push_data(SRT_SOCKSTATUS status, const std::string& subp
srt_conn_ptr->update_timestamp(srt_now_ms);

srt2rtmp::get_instance()->insert_data_message(data, ret, subpath);
{
std::unique_lock<std::mutex> locker(srt2rtmp::_srt_error_mutex);
if (srt2rtmp::_srt_error_map.count(subpath) == 1) {
int err_code = srt2rtmp::_srt_error_map[subpath];
if (err_code != ERROR_SUCCESS) {
close_push_conn(conn_fd);
srt_log_error("handle_push_data srt to rtmp error:%d, fd:%d", err_code,conn_fd);
//todo: reset to next use, maybe update by srt2rtmp::cycle again
srt2rtmp::_srt_error_map[subpath] = ERROR_SUCCESS;
return;
}
}
}

//send data to subscriber(players)
//streamid, play map<SRTSOCKET, SRT_CONN_PTR>
Expand Down
33 changes: 20 additions & 13 deletions trunk/src/srt/srt_to_rtmp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
#include <srs_kernel_stream.hpp>

std::shared_ptr<srt2rtmp> srt2rtmp::s_srt2rtmp_ptr;
std::mutex srt2rtmp::_srt_error_mutex;
std::map<std::string, int> srt2rtmp::_srt_error_map;

std::shared_ptr<srt2rtmp> srt2rtmp::get_instance() {
if (!s_srt2rtmp_ptr) {
Expand Down Expand Up @@ -153,6 +155,7 @@ void srt2rtmp::handle_close_rtmpsession(const std::string& key_path) {
srs_error_t srt2rtmp::cycle() {
srs_error_t err = srs_success;
_lastcheck_ts = 0;
int err_code = -1;

while(true) {
SRT_DATA_MSG_PTR msg_ptr = get_data_message();
Expand All @@ -163,7 +166,11 @@ srs_error_t srt2rtmp::cycle() {
switch (msg_ptr->msg_type()) {
case SRT_MSG_DATA_TYPE:
{
handle_ts_data(msg_ptr);
err_code = handle_ts_data(msg_ptr);
if (err_code != ERROR_SUCCESS) {
std::unique_lock<std::mutex> locker(_srt_error_mutex);
_srt_error_map[msg_ptr->get_path()] = err_code;
}
break;
}
case SRT_MSG_CLOSE_TYPE:
Expand Down Expand Up @@ -192,7 +199,7 @@ srs_error_t srt2rtmp::cycle() {
}
}

void srt2rtmp::handle_ts_data(SRT_DATA_MSG_PTR data_ptr) {
int srt2rtmp::handle_ts_data(SRT_DATA_MSG_PTR data_ptr) {
RTMP_CLIENT_PTR rtmp_ptr;
auto iter = _rtmp_client_map.find(data_ptr->get_path());
if (iter == _rtmp_client_map.end()) {
Expand All @@ -203,9 +210,7 @@ void srt2rtmp::handle_ts_data(SRT_DATA_MSG_PTR data_ptr) {
rtmp_ptr = iter->second;
}

rtmp_ptr->receive_ts_data(data_ptr);

return;
return rtmp_ptr->receive_ts_data(data_ptr);
}

void srt2rtmp::handle_log_data(SRT_DATA_MSG_PTR data_ptr) {
Expand Down Expand Up @@ -343,9 +348,8 @@ srs_error_t rtmp_client::connect() {
return err;
}

void rtmp_client::receive_ts_data(SRT_DATA_MSG_PTR data_ptr) {
_ts_demux_ptr->decode(data_ptr, shared_from_this());//on_data_callback is the decode callback
return;
int rtmp_client::receive_ts_data(SRT_DATA_MSG_PTR data_ptr) {
return _ts_demux_ptr->decode(data_ptr, shared_from_this());//on_data_callback is the decode callback
}

srs_error_t rtmp_client::write_h264_sps_pps(uint32_t dts, uint32_t pts) {
Expand Down Expand Up @@ -668,13 +672,13 @@ srs_error_t rtmp_client::on_ts_audio(std::shared_ptr<SrsBuffer> avs_ptr, uint64_
return err;
}

void rtmp_client::on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type,
int rtmp_client::on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type,
uint64_t dts, uint64_t pts)
{
srs_error_t err = srs_success;
if (!data_ptr || (data_ptr->get_data() == nullptr) || (data_ptr->data_len() == 0)) {
assert(0);
return;
return 0;
}

auto avs_ptr = std::make_shared<SrsBuffer>((char*)data_ptr->get_data(), data_ptr->data_len());
Expand All @@ -685,13 +689,16 @@ void rtmp_client::on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media
err = on_ts_audio(avs_ptr, dts, pts);
} else {
srs_error("mpegts demux unkown stream type:0x%02x, only support h264+aac", media_type);
return;
return 0;
}

if (err != srs_success) {
srs_error("send media data error:", srs_error_code(err));
srs_error("send media data error:%s", srs_error_desc(err).c_str());
int err_code = srs_error_code(err);
srs_freep(err);
return err_code;
}
return;
return 0;
}

rtmp_packet_queue::rtmp_packet_queue():_queue_timeout(QUEUE_DEF_TIMEOUT)
Expand Down
9 changes: 6 additions & 3 deletions trunk/src/srt/srt_to_rtmp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,15 @@ class rtmp_client : public ts_media_data_callback_I, public std::enable_shared_f
rtmp_client(std::string key_path);
virtual ~rtmp_client();

void receive_ts_data(SRT_DATA_MSG_PTR data_ptr);
int receive_ts_data(SRT_DATA_MSG_PTR data_ptr);
int64_t get_last_live_ts();
std::string get_url();

srs_error_t connect();
void close();

private:
virtual void on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type, uint64_t dts, uint64_t pts);
virtual int on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type, uint64_t dts, uint64_t pts);

private:
srs_error_t on_ts_video(std::shared_ptr<SrsBuffer> avs_ptr, uint64_t dts, uint64_t pts);
Expand Down Expand Up @@ -140,7 +140,7 @@ class srt2rtmp : public ISrsCoroutineHandler {
private:
SRT_DATA_MSG_PTR get_data_message();
virtual srs_error_t cycle();
void handle_ts_data(SRT_DATA_MSG_PTR data_ptr);
int handle_ts_data(SRT_DATA_MSG_PTR data_ptr);
void handle_close_rtmpsession(const std::string& key_path);
void handle_log_data(SRT_DATA_MSG_PTR data_ptr);
void check_rtmp_alive();
Expand All @@ -154,6 +154,9 @@ class srt2rtmp : public ISrsCoroutineHandler {

std::unordered_map<std::string, RTMP_CLIENT_PTR> _rtmp_client_map;
int64_t _lastcheck_ts;
public:
static std::mutex _srt_error_mutex;
static std::map<std::string, int> _srt_error_map;
};

#endif
15 changes: 8 additions & 7 deletions trunk/src/srt/ts_demux.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,9 @@ int ts_demux::decode_unit(unsigned char* data_p, std::string key_path, TS_DATA_C
uint64_t pts = 0;

//callback last media data in data buffer
on_callback(callback, _last_pid, key_path, _last_dts, _last_pts);
int err_code = on_callback(callback, _last_pid, key_path, _last_dts, _last_pts);
if (err_code != 0)
return err_code;

int ret = pes_parse(data_p+npos, npos, &ret_data_p, ret_size, dts, pts);
if (ret > 188) {
Expand Down Expand Up @@ -320,7 +322,7 @@ int ts_demux::decode(SRT_DATA_MSG_PTR data_ptr, TS_DATA_CALLBACK_PTR callback)
continue;
}
ret = decode_unit(data, path, callback);
if (ret < 0)
if (ret != 0) // srs_error_code is positive
{
break;
}
Expand All @@ -335,15 +337,15 @@ void ts_demux::insert_into_databuf(unsigned char* data_p, size_t data_size, std:
return;
}

void ts_demux::on_callback(TS_DATA_CALLBACK_PTR callback, unsigned short pid, std::string key_path,
int ts_demux::on_callback(TS_DATA_CALLBACK_PTR callback, unsigned short pid, std::string key_path,
uint64_t dts, uint64_t pts) {
if ((_data_total <=0 ) || (_data_buffer_vec.empty())) {
return;
return 0;
}

auto iter = _pmt._pid2steamtype.find(pid);
if (iter == _pmt._pid2steamtype.end()) {
return;
return 0;
}
unsigned char stream_type = iter->second;
auto total_data_ptr = std::make_shared<SRT_DATA_MSG>(_data_total, key_path);
Expand All @@ -358,8 +360,7 @@ void ts_demux::on_callback(TS_DATA_CALLBACK_PTR callback, unsigned short pid, st
_data_buffer_vec.clear();
_data_total = 0;

callback->on_data_callback(total_data_ptr, stream_type, dts, pts);
return;
return callback->on_data_callback(total_data_ptr, stream_type, dts, pts);
}

bool ts_demux::is_pmt(unsigned short pid) {
Expand Down
4 changes: 2 additions & 2 deletions trunk/src/srt/ts_demux.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ Value Description

class ts_media_data_callback_I {
public:
virtual void on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type, uint64_t dts, uint64_t pts) = 0;
virtual int on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type, uint64_t dts, uint64_t pts) = 0;
};

typedef std::shared_ptr<ts_media_data_callback_I> TS_DATA_CALLBACK_PTR;
Expand Down Expand Up @@ -227,7 +227,7 @@ class ts_demux {
int pes_parse(unsigned char* p, size_t npos, unsigned char** ret_pp, size_t& ret_size,
uint64_t& dts, uint64_t& pts);
void insert_into_databuf(unsigned char* data_p, size_t data_size, std::string key_path, unsigned short pid);
void on_callback(TS_DATA_CALLBACK_PTR callback, unsigned short pid,
int on_callback(TS_DATA_CALLBACK_PTR callback, unsigned short pid,
std::string key_path, uint64_t dts, uint64_t pts);

private:
Expand Down

0 comments on commit e8fca60

Please sign in to comment.