From ec8dc79bf43b9c0c1aa92c1f6cde7e5d1945ffff Mon Sep 17 00:00:00 2001 From: Gal Salomon Date: Thu, 21 Mar 2024 03:39:46 +0200 Subject: [PATCH] adding timer per continue-response, upon X elapsed time, the continue-response-function is called Signed-off-by: Gal Salomon --- include/s3select.h | 54 ++++++++++++++++++++++++++++++++++++---------- 1 file changed, 43 insertions(+), 11 deletions(-) diff --git a/include/s3select.h b/include/s3select.h index 252ee1a4..df5673d8 100644 --- a/include/s3select.h +++ b/include/s3select.h @@ -19,6 +19,7 @@ #include #include #include +#include #define _DEBUG_TERM {string token(a,b);std::cout << __FUNCTION__ << token << std::endl;} @@ -2168,6 +2169,37 @@ struct s3select_csv_definitions //TODO }; +class ProcessTimer +{ +private: + std::chrono::steady_clock::time_point m_last_call_time; + std::chrono::seconds m_interval; + +public: + ProcessTimer() { + m_interval = std::chrono::seconds(30); + m_last_call_time = std::chrono::steady_clock::now(); + } + + ProcessTimer(std::chrono::seconds interval) : m_interval(interval) { + m_last_call_time = std::chrono::steady_clock::now(); + } + + void set(std::chrono::seconds interval) + { + m_interval = interval; + } + + bool hasElapsed(void) { + auto now = std::chrono::steady_clock::now(); + auto elapsed = std::chrono::duration_cast(now - m_last_call_time); + if (elapsed >= m_interval) { + m_last_call_time = now; + return true; + } + return false; + } +}; /////// handling different object types class base_s3object @@ -2193,6 +2225,7 @@ class base_s3object std::function m_fp_s3select_continue=nullptr; std::function m_fp_s3select_result_format=nullptr; std::function m_fp_s3select_header_format=nullptr; + ProcessTimer m_timer; public: s3select_csv_definitions m_csv_defintion;//TODO add method for modify @@ -2315,6 +2348,11 @@ class base_s3object m_processed_rows = 0; } + void set_continue_mesg_interval(std::chrono::seconds interval) + { + m_timer.set(interval); + } + base_s3object():m_sa(nullptr),m_is_to_aggregate(false),m_where_clause(nullptr),m_s3_select(nullptr),m_error_count(0),m_returned_bytes_size(0),m_sql_processing_status(Status::INITIAL_STAT){} void set_external_system_functions(std::function& continue_message_fp, @@ -2448,7 +2486,7 @@ class base_s3object { return m_sql_processing_status = Status::LIMIT_REACHED; } - + if (m_aggr_flow == true) { do @@ -2471,13 +2509,10 @@ class base_s3object } m_processed_rows++; - if(m_processed_rows % 10000 == 0) - {//TODO number of rows sould replace by time + if(m_timer.hasElapsed()) + { if(m_fp_s3select_continue) m_fp_s3select_continue(result); - std::string debug_mesg = "s3select : send continue message "+std::to_string(m_processed_rows); - if(m_fp_ext_debug_mesg) - m_fp_ext_debug_mesg(debug_mesg.c_str()); } if ((*m_projections.begin())->is_set_last_call()) @@ -2529,13 +2564,10 @@ class base_s3object } m_processed_rows++; - if(m_processed_rows % 10000 == 0) - {//TODO number of rows sould replace by time + if(m_timer.hasElapsed()) + { if(m_fp_s3select_continue) m_fp_s3select_continue(result); - std::string debug_mesg = "s3select : send continue message "+std::to_string(m_processed_rows); - if(m_fp_ext_debug_mesg) - m_fp_ext_debug_mesg(debug_mesg.c_str()); } row_update_data(); for (auto& a : *m_s3_select->get_aliases()->get())