Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding a separate thread for metrics #56

Merged
merged 4 commits into from
Aug 24, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 73 additions & 35 deletions xtransmit/receive.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ using shared_sock = std::shared_ptr<socket::isocket>;

#define LOG_SC_RECEIVE "RECEIVE "

void trace_message(const size_t bytes, const vector<char> &buffer, SOCKET conn_id)
void trace_message(const size_t bytes, const vector<char>& buffer, SOCKET conn_id)
{
::cout << "RECEIVED MESSAGE length " << bytes << " on conn ID " << conn_id;

Expand All @@ -49,33 +49,80 @@ void trace_message(const size_t bytes, const vector<char> &buffer, SOCKET conn_i
#endif
::cout << endl;

//CHandShake hs;
//if (hs.load_from(buffer.data(), buffer.size()) < 0)
// CHandShake hs;
// if (hs.load_from(buffer.data(), buffer.size()) < 0)
// return;
//
//::cout << "SRT HS: " << hs.show() << endl;
}

/// @brief
/// @param metrics_file
/// @param validator
/// @param mtx mutex to protect access to validator
/// @param freq
/// @param force_break
void metrics_writing_loop(ofstream& metrics_file,
metrics::validator& validator,
mutex& mtx,
const chrono::milliseconds& freq,
const atomic_bool& force_break)
{
auto stat_time = steady_clock::now();
while (!force_break)
{
const auto tnow = steady_clock::now();
if (tnow >= stat_time)
{
if (metrics_file.is_open())
{
lock_guard<mutex> lck(mtx);
metrics_file << validator.stats_csv(false);
}
else
{
lock_guard<mutex> lck(mtx);
const auto stats_str = validator.stats();
spdlog::info(LOG_SC_RECEIVE "{}", stats_str);
}
stat_time += freq;
}

std::this_thread::sleep_until(stat_time);
}
}

void run_pipe(shared_sock src, const config &cfg, const atomic_bool &force_break)
void run_pipe(shared_sock src, const config& cfg, const atomic_bool& force_break)
{
socket::isocket &sock = *src.get();
socket::isocket& sock = *src.get();

vector<char> buffer(cfg.message_size);
vector<char> buffer(cfg.message_size);
metrics::validator validator;

auto stat_time = steady_clock::now();
ofstream metrics_file;
if (cfg.enable_metrics && !cfg.metrics_file.empty() && cfg.metrics_freq_ms > 0)
atomic_bool metrics_stop(false);
mutex metrics_mtx;
future<void> metrics_th;
ofstream metrics_file;
if (cfg.enable_metrics && cfg.metrics_freq_ms > 0)
{
metrics_file.open(cfg.metrics_file, std::ofstream::out);
if (!metrics_file)
if (!cfg.metrics_file.empty())
{
spdlog::error(LOG_SC_RECEIVE "Failed to open metrics file {} for output", cfg.metrics_file);
return;
metrics_file.open(cfg.metrics_file, std::ofstream::out);
if (!metrics_file)
{
spdlog::error(LOG_SC_RECEIVE "Failed to open metrics file {} for output", cfg.metrics_file);
return;
}
metrics_file << validator.stats_csv(true);
}

metrics_file << validator.stats_csv(true);
metrics_th = async(::launch::async,
metrics_writing_loop,
ref(metrics_file),
ref(validator),
ref(metrics_mtx),
chrono::milliseconds(cfg.metrics_freq_ms),
ref(metrics_stop));
}

try
Expand All @@ -93,7 +140,10 @@ void run_pipe(shared_sock src, const config &cfg, const atomic_bool &force_break
if (cfg.print_notifications)
trace_message(bytes, buffer, sock.id());
if (cfg.enable_metrics)
{
lock_guard<mutex> lck(metrics_mtx);
validator.validate_packet(buffer);
}

if (cfg.send_reply)
{
Expand All @@ -106,35 +156,26 @@ void run_pipe(shared_sock src, const config &cfg, const atomic_bool &force_break

if (!cfg.enable_metrics)
continue;

const auto tnow = steady_clock::now();
if (tnow > (stat_time + chrono::milliseconds(cfg.metrics_freq_ms)))
{
if (metrics_file)
{
metrics_file << validator.stats_csv(false);
}
else
{
const auto stats_str = validator.stats();
spdlog::info(LOG_SC_RECEIVE "{}", stats_str);
}
stat_time = tnow;
}
}
}
catch (const socket::exception &e)
catch (const socket::exception& e)
{
spdlog::warn(LOG_SC_RECEIVE "{}", e.what());
}

metrics_stop = true;
if (metrics_th.valid())
metrics_th.get();

if (force_break)
{
spdlog::info(LOG_SC_RECEIVE "interrupted by request!");
}
}

void xtransmit::receive::run(const std::vector<std::string>& src_urls, const config &cfg, const atomic_bool &force_break)
void xtransmit::receive::run(const std::vector<std::string>& src_urls,
const config& cfg,
const atomic_bool& force_break)
{
using namespace std::placeholders;
processing_fn_t process_fn = std::bind(run_pipe, _1, cfg, _2);
Expand All @@ -143,7 +184,7 @@ void xtransmit::receive::run(const std::vector<std::string>& src_urls, const con

CLI::App* xtransmit::receive::add_subcommand(CLI::App& app, config& cfg, std::vector<std::string>& src_urls)
{
const map<string, int> to_ms{ {"s", 1000}, {"ms", 1} };
const map<string, int> to_ms{{"s", 1000}, {"ms", 1}};

CLI::App* sc_receive = app.add_subcommand("receive", "Receive data (SRT, UDP)")->fallthrough();
sc_receive->add_option("-i,--input,src", src_urls, "Source URI");
Expand All @@ -161,6 +202,3 @@ CLI::App* xtransmit::receive::add_subcommand(CLI::App& app, config& cfg, std::ve

return sc_receive;
}