Skip to content

Commit

Permalink
refactor of the send-back-result-response.
Browse files Browse the repository at this point in the history
Signed-off-by: Gal Salomon <[email protected]>
  • Loading branch information
galsalomon66 committed Aug 30, 2024
1 parent 9b9f357 commit ac758c1
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 40 deletions.
4 changes: 3 additions & 1 deletion example/s3select_example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,10 @@ int run_query_on_parquet_file(const char* input_query, const char* input_file)
std::cout << "DEBUG: {" << msg << "}" << std::endl;
};

std::function<void(const char*)> fp_nop = [](const char* msg){};
std::function<int(std::string&)> fp_continue_nop = [](std::string& result){return 0;};
parquet_object parquet_processor(input_file,&s3select_syntax,&rgw);
//parquet_processor.set_external_debug_system(fp_debug);
parquet_processor.set_external_system_functions(fp_continue_nop, fp_s3select_result_format, fp_s3select_header_format,fp_nop);

std::string result;

Expand Down
64 changes: 25 additions & 39 deletions include/s3select.h
Original file line number Diff line number Diff line change
Expand Up @@ -2474,6 +2474,8 @@ class base_s3object
if(m_csv_defintion.output_json_format && projections_resuls.values.size()) {
json_result_format(projections_resuls, result, output_delimiter);
result.append(output_row_delimiter);
//TODO to add asneeded
//TODO "flush" the result string
return;
}

Expand Down Expand Up @@ -2524,9 +2526,30 @@ class base_s3object
if(!m_aggr_flow) {
result.append(output_row_delimiter);
m_returned_bytes_size += output_delimiter.size();
}
}


//TODO config / default-value
#define CSV_INPUT_TYPE_RESPONSE_SIZE_LIMIT (64 * 1024)
if(m_fp_s3select_result_format && m_fp_s3select_header_format)
{
if (result.size() > CSV_INPUT_TYPE_RESPONSE_SIZE_LIMIT)
{//there are systems that might resject the response due to its size.
m_fp_s3select_result_format(result);
m_fp_s3select_header_format(result);
}
}
}

void flush_sql_result(std::string& result)
{//purpose: flush remaining data reside in the buffer
if(m_fp_s3select_result_format && m_fp_s3select_header_format)
{
m_fp_s3select_result_format(result);
m_fp_s3select_header_format(result);
}
}

Status getMatchRow( std::string& result)
{
multi_values projections_resuls;
Expand Down Expand Up @@ -2670,8 +2693,6 @@ class base_s3object

}; //base_s3object

//TODO config / default-value
#define CSV_INPUT_TYPE_RESPONSE_SIZE_LIMIT (64 * 1024)
class csv_object : public base_s3object
{

Expand Down Expand Up @@ -2984,16 +3005,6 @@ class csv_object : public base_s3object
return -1;
}
}

if(m_fp_s3select_result_format && m_fp_s3select_header_format)
{
if (result.size() > CSV_INPUT_TYPE_RESPONSE_SIZE_LIMIT)
{//there are systems that might resject the response due to its size.
m_fp_s3select_result_format(result);
m_fp_s3select_header_format(result);
}
}

if (m_sql_processing_status == Status::END_OF_STREAM)
{
break;
Expand All @@ -3009,12 +3020,7 @@ class csv_object : public base_s3object

} while (true);

if(m_fp_s3select_result_format && m_fp_s3select_header_format)
{ //note: it may produce empty response(more the once)
//upon empty result, it should return *only* upon last call.
m_fp_s3select_result_format(result);
m_fp_s3select_header_format(result);
}
flush_sql_result(result);

return 0;
}
Expand Down Expand Up @@ -3127,26 +3133,6 @@ class parquet_object : public base_s3object
}
}

#define S3SELECT_RESPONSE_SIZE_LIMIT (4 * 1024 * 1024)
if (result.size() > S3SELECT_RESPONSE_SIZE_LIMIT)
{//AWS-cli limits response size the following callbacks send response upon some threshold
if(m_fp_s3select_result_format)
m_fp_s3select_result_format(result);

if (!is_end_of_stream() && (get_sql_processing_status() != Status::LIMIT_REACHED))
{
if(m_fp_s3select_header_format)
m_fp_s3select_header_format(result);
}
}
else
{
if (is_end_of_stream() || (get_sql_processing_status() == Status::LIMIT_REACHED))
{
if(m_fp_s3select_result_format)
m_fp_s3select_result_format(result);
}
}

//TODO is_end_of_stream() required?
if (get_sql_processing_status() == Status::END_OF_STREAM || is_end_of_stream() || get_sql_processing_status() == Status::LIMIT_REACHED)
Expand Down

0 comments on commit ac758c1

Please sign in to comment.