Skip to content

Commit

Permalink
adding timer per continue-response, upon X elapsed time, the continue…
Browse files Browse the repository at this point in the history
…-response-function is called

Signed-off-by: Gal Salomon <[email protected]>
  • Loading branch information
galsalomon66 committed Mar 21, 2024
1 parent df40196 commit ec8dc79
Showing 1 changed file with 43 additions and 11 deletions.
54 changes: 43 additions & 11 deletions include/s3select.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <boost/bind.hpp>
#include <functional>
#include <unordered_set>
#include <chrono>

#define _DEBUG_TERM {string token(a,b);std::cout << __FUNCTION__ << token << std::endl;}

Expand Down Expand Up @@ -2168,6 +2169,37 @@ struct s3select_csv_definitions //TODO

};

class ProcessTimer
{
private:
std::chrono::steady_clock::time_point m_last_call_time;
std::chrono::seconds m_interval;

public:
ProcessTimer() {
m_interval = std::chrono::seconds(30);
m_last_call_time = std::chrono::steady_clock::now();
}

ProcessTimer(std::chrono::seconds interval) : m_interval(interval) {
m_last_call_time = std::chrono::steady_clock::now();
}

void set(std::chrono::seconds interval)
{
m_interval = interval;
}

bool hasElapsed(void) {
auto now = std::chrono::steady_clock::now();
auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(now - m_last_call_time);
if (elapsed >= m_interval) {
m_last_call_time = now;
return true;
}
return false;
}
};

/////// handling different object types
class base_s3object
Expand All @@ -2193,6 +2225,7 @@ class base_s3object
std::function<int(std::string&)> m_fp_s3select_continue=nullptr;
std::function<int(std::string&)> m_fp_s3select_result_format=nullptr;
std::function<int(std::string&)> m_fp_s3select_header_format=nullptr;
ProcessTimer m_timer;

public:
s3select_csv_definitions m_csv_defintion;//TODO add method for modify
Expand Down Expand Up @@ -2315,6 +2348,11 @@ class base_s3object
m_processed_rows = 0;
}

void set_continue_mesg_interval(std::chrono::seconds interval)
{
m_timer.set(interval);
}

base_s3object():m_sa(nullptr),m_is_to_aggregate(false),m_where_clause(nullptr),m_s3_select(nullptr),m_error_count(0),m_returned_bytes_size(0),m_sql_processing_status(Status::INITIAL_STAT){}

void set_external_system_functions(std::function<int(std::string&)>& continue_message_fp,
Expand Down Expand Up @@ -2448,7 +2486,7 @@ class base_s3object
{
return m_sql_processing_status = Status::LIMIT_REACHED;
}

if (m_aggr_flow == true)
{
do
Expand All @@ -2471,13 +2509,10 @@ class base_s3object
}

m_processed_rows++;
if(m_processed_rows % 10000 == 0)
{//TODO number of rows sould replace by time
if(m_timer.hasElapsed())
{
if(m_fp_s3select_continue)
m_fp_s3select_continue(result);
std::string debug_mesg = "s3select : send continue message "+std::to_string(m_processed_rows);
if(m_fp_ext_debug_mesg)
m_fp_ext_debug_mesg(debug_mesg.c_str());
}

if ((*m_projections.begin())->is_set_last_call())
Expand Down Expand Up @@ -2529,13 +2564,10 @@ class base_s3object
}

m_processed_rows++;
if(m_processed_rows % 10000 == 0)
{//TODO number of rows sould replace by time
if(m_timer.hasElapsed())
{
if(m_fp_s3select_continue)
m_fp_s3select_continue(result);
std::string debug_mesg = "s3select : send continue message "+std::to_string(m_processed_rows);
if(m_fp_ext_debug_mesg)
m_fp_ext_debug_mesg(debug_mesg.c_str());
}
row_update_data();
for (auto& a : *m_s3_select->get_aliases()->get())
Expand Down

0 comments on commit ec8dc79

Please sign in to comment.