diff --git a/example/s3select_example.cpp b/example/s3select_example.cpp
index 4147dadd..6e17e4f3 100644
--- a/example/s3select_example.cpp
+++ b/example/s3select_example.cpp
@@ -320,8 +320,10 @@ int run_query_on_parquet_file(const char* input_query, const char* input_file)
     std::cout << "DEBUG: {" << msg << "}" << std::endl;
   };
 
+  std::function<void(const char*)> fp_nop = [](const char* msg){};
+  std::function<int(std::string&)> fp_continue_nop = [](std::string& result){return 0;};
   parquet_object parquet_processor(input_file,&s3select_syntax,&rgw);
-  //parquet_processor.set_external_debug_system(fp_debug);
+  parquet_processor.set_external_system_functions(fp_continue_nop, fp_s3select_result_format, fp_s3select_header_format,fp_nop);
 
 
   std::string result;
diff --git a/include/s3select.h b/include/s3select.h
index 7f8a09e6..fa7f640d 100644
--- a/include/s3select.h
+++ b/include/s3select.h
@@ -2458,6 +2458,7 @@ class base_s3object
       }
       result.append(res->to_string());
+      //TODO strlen should be replaced with the string size (performance perspective)
       m_returned_bytes_size += strlen(res->to_string());
       ++j;
     }
 
@@ -2474,6 +2475,8 @@
     if(m_csv_defintion.output_json_format && projections_resuls.values.size()) {
       json_result_format(projections_resuls, result, output_delimiter);
       result.append(output_row_delimiter);
+      //TODO add as needed
+      //TODO "flush" the result string
       return;
     }
 
@@ -2493,10 +2496,6 @@
 
        set_processing_time_error();
       }
-
-      if(m_fp_ext_debug_mesg)
-	m_fp_ext_debug_mesg(column_result.data());
-
      if (m_csv_defintion.quote_fields_always) {
        std::ostringstream quoted_result;
        quoted_result << std::quoted(column_result,m_csv_defintion.output_quot_char, m_csv_defintion.escape_char);
@@ -2524,9 +2523,30 @@
     if(!m_aggr_flow) {
       result.append(output_row_delimiter);
       m_returned_bytes_size += output_delimiter.size();
-    }
+    }
+
+
+//TODO config / default-value
+#define CSV_INPUT_TYPE_RESPONSE_SIZE_LIMIT (64 * 1024)
+    if(m_fp_s3select_result_format && m_fp_s3select_header_format)
+    {
+      if (result.size() > CSV_INPUT_TYPE_RESPONSE_SIZE_LIMIT)
+      {//there are systems that might reject the response due to its size.
+	m_fp_s3select_result_format(result);
+	m_fp_s3select_header_format(result);
+      }
+    }
   }
 
+  void flush_sql_result(std::string& result)
+  {//purpose: flush remaining data residing in the buffer
+    if(m_fp_s3select_result_format && m_fp_s3select_header_format)
+    {
+      m_fp_s3select_result_format(result);
+      m_fp_s3select_header_format(result);
+    }
+  }
+
   Status getMatchRow( std::string& result)
   {
     multi_values projections_resuls;
@@ -2670,8 +2690,6 @@
 
 }; //base_s3object
 
-//TODO config / default-value
-#define CSV_INPUT_TYPE_RESPONSE_SIZE_LIMIT (64 * 1024)
 
 class csv_object : public base_s3object
 {
@@ -2984,16 +3002,6 @@
         return -1;
       }
     }
-
-    if(m_fp_s3select_result_format && m_fp_s3select_header_format)
-    {
-      if (result.size() > CSV_INPUT_TYPE_RESPONSE_SIZE_LIMIT)
-      {//there are systems that might resject the response due to its size.
-	m_fp_s3select_result_format(result);
-	m_fp_s3select_header_format(result);
-      }
-    }
-
     if (m_sql_processing_status == Status::END_OF_STREAM)
     {
       break;
@@ -3009,12 +3017,7 @@
 
   } while (true);
 
-  if(m_fp_s3select_result_format && m_fp_s3select_header_format)
-  { //note: it may produce empty response(more the once)
-    //upon empty result, it should return *only* upon last call.
-    m_fp_s3select_result_format(result);
-    m_fp_s3select_header_format(result);
-  }
+  flush_sql_result(result);
 
   return 0;
 }
@@ -3050,7 +3053,7 @@ class parquet_object : public base_s3object
     parquet_query_setting(nullptr);
   }
 
-  parquet_object() : base_s3object(nullptr)
+  parquet_object() : base_s3object(nullptr), not_to_increase_first_time(true)
   {}
 
   void parquet_query_setting(s3select *s3_query)
@@ -3127,26 +3130,6 @@
       }
     }
 
-#define S3SELECT_RESPONSE_SIZE_LIMIT (4 * 1024 * 1024)
-    if (result.size() > S3SELECT_RESPONSE_SIZE_LIMIT)
-    {//AWS-cli limits response size the following callbacks send response upon some threshold
-      if(m_fp_s3select_result_format)
-	m_fp_s3select_result_format(result);
-
-      if (!is_end_of_stream() && (get_sql_processing_status() != Status::LIMIT_REACHED))
-      {
-	if(m_fp_s3select_header_format)
-	  m_fp_s3select_header_format(result);
-      }
-    }
-    else
-    {
-      if (is_end_of_stream() || (get_sql_processing_status() == Status::LIMIT_REACHED))
-      {
-	if(m_fp_s3select_result_format)
-	  m_fp_s3select_result_format(result);
-      }
-    }
 
     //TODO is_end_of_stream() required?
     if (get_sql_processing_status() == Status::END_OF_STREAM || is_end_of_stream() || get_sql_processing_status() == Status::LIMIT_REACHED)
@@ -3156,6 +3139,7 @@
 
   } while (1);
 
+  flush_sql_result(result);
   return 0;
 }
diff --git a/include/s3select_parquet_intrf.h b/include/s3select_parquet_intrf.h
index cac2b892..8dad80ef 100644
--- a/include/s3select_parquet_intrf.h
+++ b/include/s3select_parquet_intrf.h
@@ -747,6 +747,30 @@ class PARQUET_EXPORT RowGroupReader {
   std::unique_ptr<Contents> contents_;
 };
 
+#define RGW_default_buffer_size 1024*1024*1024
+class S3select_Config {
+public:
+  static S3select_Config& getInstance() {
+    static S3select_Config instance;
+    return instance;
+  }
+
+  void set_s3select_reader_properties(uint64_t value) { this->m_reader_properties = value; }
+  uint64_t get_s3select_reader_properties() const { return m_reader_properties; }
+
+private:
+  uint64_t m_reader_properties;
+  S3select_Config() : m_reader_properties(RGW_default_buffer_size) {} // Private constructor
+};
+
+ReaderProperties s3select_reader_properties() {
+
+  static ReaderProperties default_reader_properties;
+  default_reader_properties.enable_buffered_stream();
+  default_reader_properties.set_buffer_size(S3select_Config::getInstance().get_s3select_reader_properties());
+  return default_reader_properties;
+}
+
 class PARQUET_EXPORT ParquetFileReader {
  public:
   // Declare a virtual class 'Contents' to aid dependency injection and more
@@ -754,7 +778,7 @@ class PARQUET_EXPORT ParquetFileReader {
   struct PARQUET_EXPORT Contents {
     static std::unique_ptr<Contents> Open(
         std::shared_ptr<::arrow::io::RandomAccessFile> source,
-        const ReaderProperties& props = default_reader_properties(),
+        const ReaderProperties& props = s3select_reader_properties(),
         std::shared_ptr<FileMetaData> metadata = NULLPTR);
 
     virtual ~Contents() = default;
@@ -776,21 +800,21 @@
   ARROW_DEPRECATED("Use arrow::io::RandomAccessFile version")
   static std::unique_ptr<ParquetFileReader> Open(
       std::unique_ptr<RandomAccessSource> source,
-      const ReaderProperties& props = default_reader_properties(),
+      const ReaderProperties& props = s3select_reader_properties(),
       std::shared_ptr<FileMetaData> metadata = NULLPTR);
 
   // Create a file reader instance from an Arrow file object. Thread-safety is
   // the responsibility of the file implementation
   static std::unique_ptr<ParquetFileReader> Open(
       std::shared_ptr<::arrow::io::RandomAccessFile> source,
-      const ReaderProperties& props = default_reader_properties(),
+      const ReaderProperties& props = s3select_reader_properties(),
       std::shared_ptr<FileMetaData> metadata = NULLPTR);
 
   // API Convenience to open a serialized Parquet file on disk, using Arrow IO
   // interfaces.
   static std::unique_ptr<ParquetFileReader> OpenFile(
       const std::string& path,s3selectEngine::rgw_s3select_api* rgw, bool memory_map = true,
-      const ReaderProperties& props = default_reader_properties(),
+      const ReaderProperties& props = s3select_reader_properties(),
       std::shared_ptr<FileMetaData> metadata = NULLPTR
       );
@@ -1034,7 +1058,7 @@ class SerializedRowGroup : public RowGroupReader::Contents {
 
 class SerializedFile : public ParquetFileReader::Contents {
  public:
  SerializedFile(std::shared_ptr<ArrowInputFile> source,
-                const ReaderProperties& props = default_reader_properties())
+                const ReaderProperties& props = s3select_reader_properties())
      : source_(std::move(source)), properties_(props) {
    PARQUET_ASSIGN_OR_THROW(source_size_, source_->GetSize());
  }
@@ -1241,7 +1265,7 @@ void SerializedFile::ParseMetaDataOfEncryptedFileWithEncryptedFooter(
 #if ARROW_VERSION_MAJOR > 9
   file_metadata_ =
-      FileMetaData::Make(metadata_buffer->data(), &metadata_len, default_reader_properties(), file_decryptor_);
+      FileMetaData::Make(metadata_buffer->data(), &metadata_len, s3select_reader_properties(), file_decryptor_);
 #else
   file_metadata_ =
       FileMetaData::Make(metadata_buffer->data(), &metadata_len, file_decryptor_);
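
Usage note (not part of the patch): a minimal sketch of how a consumer wires the new streaming interface, mirroring the example above. The sink bodies are hypothetical; a real integration such as RGW would encode and transmit each chunk instead of printing it. set_external_system_functions, the four-callback order, and S3select_Config come from the diff; the header callback's exact signature and the final run_s3select_on_object(result) call are assumptions based on the existing example.

  #include <functional>
  #include <iostream>
  #include <string>
  #include "s3select.h"

  using namespace s3selectEngine;

  int run_parquet_query(const char* query, const char* file, rgw_s3select_api& rgw)
  {
    s3select s3select_syntax;
    if (s3select_syntax.parse_query(query) < 0) return -1;

    // continue-message hook: nothing to send in a standalone run
    std::function<int(std::string&)> fp_continue = [](std::string& result){return 0;};

    // result hook: called once the buffer crosses the size threshold and once
    // more by flush_sql_result(); ship the chunk, then clear it so the engine
    // appends the next rows to an empty buffer
    std::function<int(std::string&)> fp_result = [](std::string& result){
      std::cout << result;
      result.clear();
      return 0;
    };

    // header hook: a real transport would emit its per-chunk framing here
    std::function<int(std::string&)> fp_header = [](std::string& result){return 0;};

    // debug hook (same shape as fp_nop in the example)
    std::function<void(const char*)> fp_debug = [](const char* msg){
      std::cerr << "DEBUG: " << msg << std::endl;
    };

    // optional: shrink the Parquet buffered-stream size from the 1 GiB
    // RGW_default_buffer_size default (qualify S3select_Config with its
    // enclosing namespace from s3select_parquet_intrf.h if needed)
    S3select_Config::getInstance().set_s3select_reader_properties(4 * 1024 * 1024);

    parquet_object processor(file, &s3select_syntax, &rgw);
    processor.set_external_system_functions(fp_continue, fp_result, fp_header, fp_debug);

    std::string result;
    return processor.run_s3select_on_object(result); // assumed entry point
  }

The design point worth noting: the chunking policy now lives in base_s3object, so the CSV and Parquet paths share it. Rows accumulate in result until CSV_INPUT_TYPE_RESPONSE_SIZE_LIMIT (64 KiB) triggers the callbacks, and flush_sql_result() drains whatever remains after the processing loop. Since the engine keeps appending to the same buffer, the result callback is presumably expected to consume and clear it; otherwise each flush would resend earlier chunks.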