Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SRT: Close connection if RTMP failed. #2917

Merged
merged 2 commits into from
Feb 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions trunk/src/srt/srt_handle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,19 @@ void srt_handle::handle_push_data(SRT_SOCKSTATUS status, const std::string& subp
srt_conn_ptr->update_timestamp(srt_now_ms);

srt2rtmp::get_instance()->insert_data_message(data, ret, subpath);
{
std::unique_lock<std::mutex> locker(srt2rtmp::_srt_error_mutex);
if (srt2rtmp::_srt_error_map.count(subpath) == 1) {
int err_code = srt2rtmp::_srt_error_map[subpath];
if (err_code != ERROR_SUCCESS) {
close_push_conn(conn_fd);
srt_log_error("handle_push_data srt to rtmp error:%d, fd:%d", err_code,conn_fd);
//todo: reset to next use, maybe update by srt2rtmp::cycle again
srt2rtmp::_srt_error_map[subpath] = ERROR_SUCCESS;
return;
}
}
}

//send data to subscriber(players)
//streamid, play map<SRTSOCKET, SRT_CONN_PTR>
Expand Down
33 changes: 20 additions & 13 deletions trunk/src/srt/srt_to_rtmp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
#include <srs_kernel_stream.hpp>

std::shared_ptr<srt2rtmp> srt2rtmp::s_srt2rtmp_ptr;
std::mutex srt2rtmp::_srt_error_mutex;
std::map<std::string, int> srt2rtmp::_srt_error_map;

std::shared_ptr<srt2rtmp> srt2rtmp::get_instance() {
if (!s_srt2rtmp_ptr) {
Expand Down Expand Up @@ -153,6 +155,7 @@ void srt2rtmp::handle_close_rtmpsession(const std::string& key_path) {
srs_error_t srt2rtmp::cycle() {
srs_error_t err = srs_success;
_lastcheck_ts = 0;
int err_code = -1;

while(true) {
SRT_DATA_MSG_PTR msg_ptr = get_data_message();
Expand All @@ -163,7 +166,11 @@ srs_error_t srt2rtmp::cycle() {
switch (msg_ptr->msg_type()) {
case SRT_MSG_DATA_TYPE:
{
handle_ts_data(msg_ptr);
err_code = handle_ts_data(msg_ptr);
if (err_code != ERROR_SUCCESS) {
std::unique_lock<std::mutex> locker(_srt_error_mutex);
_srt_error_map[msg_ptr->get_path()] = err_code;
}
break;
}
case SRT_MSG_CLOSE_TYPE:
Expand Down Expand Up @@ -192,7 +199,7 @@ srs_error_t srt2rtmp::cycle() {
}
}

void srt2rtmp::handle_ts_data(SRT_DATA_MSG_PTR data_ptr) {
int srt2rtmp::handle_ts_data(SRT_DATA_MSG_PTR data_ptr) {
RTMP_CLIENT_PTR rtmp_ptr;
auto iter = _rtmp_client_map.find(data_ptr->get_path());
if (iter == _rtmp_client_map.end()) {
Expand All @@ -203,9 +210,7 @@ void srt2rtmp::handle_ts_data(SRT_DATA_MSG_PTR data_ptr) {
rtmp_ptr = iter->second;
}

rtmp_ptr->receive_ts_data(data_ptr);

return;
return rtmp_ptr->receive_ts_data(data_ptr);
}

void srt2rtmp::handle_log_data(SRT_DATA_MSG_PTR data_ptr) {
Expand Down Expand Up @@ -343,9 +348,8 @@ srs_error_t rtmp_client::connect() {
return err;
}

void rtmp_client::receive_ts_data(SRT_DATA_MSG_PTR data_ptr) {
_ts_demux_ptr->decode(data_ptr, shared_from_this());//on_data_callback is the decode callback
return;
int rtmp_client::receive_ts_data(SRT_DATA_MSG_PTR data_ptr) {
return _ts_demux_ptr->decode(data_ptr, shared_from_this());//on_data_callback is the decode callback
}

srs_error_t rtmp_client::write_h264_sps_pps(uint32_t dts, uint32_t pts) {
Expand Down Expand Up @@ -668,13 +672,13 @@ srs_error_t rtmp_client::on_ts_audio(std::shared_ptr<SrsBuffer> avs_ptr, uint64_
return err;
}

void rtmp_client::on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type,
int rtmp_client::on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type,
uint64_t dts, uint64_t pts)
{
srs_error_t err = srs_success;
if (!data_ptr || (data_ptr->get_data() == nullptr) || (data_ptr->data_len() == 0)) {
assert(0);
return;
return 0;
}

auto avs_ptr = std::make_shared<SrsBuffer>((char*)data_ptr->get_data(), data_ptr->data_len());
Expand All @@ -685,13 +689,16 @@ void rtmp_client::on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media
err = on_ts_audio(avs_ptr, dts, pts);
} else {
srs_error("mpegts demux unkown stream type:0x%02x, only support h264+aac", media_type);
return;
return 0;
}

if (err != srs_success) {
srs_error("send media data error:", srs_error_code(err));
srs_error("send media data error:%s", srs_error_desc(err).c_str());
int err_code = srs_error_code(err);
srs_freep(err);
return err_code;
}
return;
return 0;
}

rtmp_packet_queue::rtmp_packet_queue():_queue_timeout(QUEUE_DEF_TIMEOUT)
Expand Down
9 changes: 6 additions & 3 deletions trunk/src/srt/srt_to_rtmp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,15 @@ class rtmp_client : public ts_media_data_callback_I, public std::enable_shared_f
rtmp_client(std::string key_path);
~rtmp_client();

void receive_ts_data(SRT_DATA_MSG_PTR data_ptr);
int receive_ts_data(SRT_DATA_MSG_PTR data_ptr);
int64_t get_last_live_ts();
std::string get_url();

srs_error_t connect();
void close();

private:
virtual void on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type, uint64_t dts, uint64_t pts);
virtual int on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type, uint64_t dts, uint64_t pts);

private:
srs_error_t on_ts_video(std::shared_ptr<SrsBuffer> avs_ptr, uint64_t dts, uint64_t pts);
Expand Down Expand Up @@ -140,7 +140,7 @@ class srt2rtmp : public ISrsCoroutineHandler {
private:
SRT_DATA_MSG_PTR get_data_message();
virtual srs_error_t cycle();
void handle_ts_data(SRT_DATA_MSG_PTR data_ptr);
int handle_ts_data(SRT_DATA_MSG_PTR data_ptr);
void handle_close_rtmpsession(const std::string& key_path);
void handle_log_data(SRT_DATA_MSG_PTR data_ptr);
void check_rtmp_alive();
Expand All @@ -154,6 +154,9 @@ class srt2rtmp : public ISrsCoroutineHandler {

std::unordered_map<std::string, RTMP_CLIENT_PTR> _rtmp_client_map;
int64_t _lastcheck_ts;
public:
static std::mutex _srt_error_mutex;
static std::map<std::string, int> _srt_error_map;
};

#endif
15 changes: 8 additions & 7 deletions trunk/src/srt/ts_demux.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,9 @@ int ts_demux::decode_unit(unsigned char* data_p, std::string key_path, TS_DATA_C
uint64_t pts = 0;

//callback last media data in data buffer
on_callback(callback, _last_pid, key_path, _last_dts, _last_pts);
int err_code = on_callback(callback, _last_pid, key_path, _last_dts, _last_pts);
if (err_code != 0)
return err_code;

int ret = pes_parse(data_p+npos, npos, &ret_data_p, ret_size, dts, pts);
if (ret > 188) {
Expand Down Expand Up @@ -320,7 +322,7 @@ int ts_demux::decode(SRT_DATA_MSG_PTR data_ptr, TS_DATA_CALLBACK_PTR callback)
continue;
}
ret = decode_unit(data, path, callback);
if (ret < 0)
if (ret != 0) // srs_error_code is positive
{
break;
}
Expand All @@ -335,15 +337,15 @@ void ts_demux::insert_into_databuf(unsigned char* data_p, size_t data_size, std:
return;
}

void ts_demux::on_callback(TS_DATA_CALLBACK_PTR callback, unsigned short pid, std::string key_path,
int ts_demux::on_callback(TS_DATA_CALLBACK_PTR callback, unsigned short pid, std::string key_path,
uint64_t dts, uint64_t pts) {
if ((_data_total <=0 ) || (_data_buffer_vec.empty())) {
return;
return 0;
}

auto iter = _pmt._pid2steamtype.find(pid);
if (iter == _pmt._pid2steamtype.end()) {
return;
return 0;
}
unsigned char stream_type = iter->second;
auto total_data_ptr = std::make_shared<SRT_DATA_MSG>(_data_total, key_path);
Expand All @@ -358,8 +360,7 @@ void ts_demux::on_callback(TS_DATA_CALLBACK_PTR callback, unsigned short pid, st
_data_buffer_vec.clear();
_data_total = 0;

callback->on_data_callback(total_data_ptr, stream_type, dts, pts);
return;
return callback->on_data_callback(total_data_ptr, stream_type, dts, pts);
}

bool ts_demux::is_pmt(unsigned short pid) {
Expand Down
4 changes: 2 additions & 2 deletions trunk/src/srt/ts_demux.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ Value Description

class ts_media_data_callback_I {
public:
virtual void on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type, uint64_t dts, uint64_t pts) = 0;
virtual int on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type, uint64_t dts, uint64_t pts) = 0;
};

typedef std::shared_ptr<ts_media_data_callback_I> TS_DATA_CALLBACK_PTR;
Expand Down Expand Up @@ -227,7 +227,7 @@ class ts_demux {
int pes_parse(unsigned char* p, size_t npos, unsigned char** ret_pp, size_t& ret_size,
uint64_t& dts, uint64_t& pts);
void insert_into_databuf(unsigned char* data_p, size_t data_size, std::string key_path, unsigned short pid);
void on_callback(TS_DATA_CALLBACK_PTR callback, unsigned short pid,
int on_callback(TS_DATA_CALLBACK_PTR callback, unsigned short pid,
std::string key_path, uint64_t dts, uint64_t pts);

private:
Expand Down