Skip to content

Commit

Permalink
correctly handle "anonymous" array/object (defect reported by QE) (#137)
Browse files Browse the repository at this point in the history
* a fix for a defect reported by QE for JSON input.
the fix handles the use cases where the JSON input starts with an anonymous array/object;
this could cause wrong search results for the user request (SQL statement)

* editorial

* adding a test that verifies correct key-value extraction from an anonymous array

* adding relevant tests per the changes.
handle the use-case where the user requests a json-key-path that may point to a non-discrete value, i.e. an array or an object.
editorial changes.

* a fix for the CSV flow.
upon a broken row, the use of csv_parser->next_line() is wrong, since the row may break within a quote
and that could result in an exception.
the csv_parser should be initialized with the csv-stream after skipping the bytes of the broken row

* editorial

* replace ifstream read API

* when the where-clause yields an empty group, the aggregation functions returned the wrong result.
they should return null

* add an operator to display the version description of the current engine functionalities (recent PR)

* add tests for aggregation functions with an empty group

Signed-off-by: galsalomon66 <[email protected]>
  • Loading branch information
galsalomon66 authored Oct 12, 2023
1 parent abc243b commit 9ade26c
Show file tree
Hide file tree
Showing 5 changed files with 237 additions and 57 deletions.
17 changes: 11 additions & 6 deletions example/s3select_example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -385,11 +385,14 @@ int process_json_query(const char* input_query,const char* fname)
std::string buff(BUFFER_SIZE,0);
std::string result;

size_t read_sz = input_file_stream.readsome(buff.data(),BUFFER_SIZE);

size_t read_sz = input_file_stream.read(buff.data(),BUFFER_SIZE).gcount();
int chunk_count=0;
size_t bytes_read=0;
while(read_sz)
{
std::cout << "read next chunk " << read_sz << std::endl;
bytes_read += read_sz;
std::cout << "read next chunk " << chunk_count++ << ":" << read_sz << ":" << bytes_read << "\r";
result.clear();

try{
Expand All @@ -403,7 +406,10 @@ int process_json_query(const char* input_query,const char* fname)
}
}

std::cout << result << std::endl;
if(result.size())
{
std::cout << result << std::endl;
}

if(status<0)
{
Expand All @@ -415,7 +421,7 @@ int process_json_query(const char* input_query,const char* fname)
std::cout << "json processing reached limit " << std::endl;
break;
}
read_sz = input_file_stream.readsome(buff.data(),BUFFER_SIZE);
read_sz = input_file_stream.read(buff.data(),BUFFER_SIZE).gcount();
}
try{
result.clear();
Expand All @@ -430,7 +436,6 @@ int process_json_query(const char* input_query,const char* fname)
}

std::cout << result << std::endl;

return 0;
}

Expand Down Expand Up @@ -621,7 +626,7 @@ int run_on_single_query(const char* fname, const char* query)
std::string buff(BUFFER_SIZE,0);
while (1)
{
size_t read_sz = input_file_stream.readsome(buff.data(),BUFFER_SIZE);
size_t read_sz = input_file_stream.read(buff.data(),BUFFER_SIZE).gcount();

status = awscli->run_s3select(query, buff.data(), read_sz, file_sz);
if(status<0)
Expand Down
17 changes: 11 additions & 6 deletions include/s3select.h
Original file line number Diff line number Diff line change
Expand Up @@ -2499,6 +2499,7 @@ class csv_object : public base_s3object
std::string m_last_line;
size_t m_processed_bytes;
int64_t m_number_of_tokens;
size_t m_skip_x_first_bytes=0;

std::function<int(std::string&)> fp_s3select_result_format=nullptr;
std::function<int(std::string&)> fp_s3select_header_format=nullptr;
Expand Down Expand Up @@ -2651,6 +2652,7 @@ class csv_object : public base_s3object
merge_line = m_last_line + tmp_buff + m_csv_defintion.row_delimiter;
m_previous_line = false;
m_skip_first_line = true;
m_skip_x_first_bytes = tmp_buff.size()+1;

//processing the merged row (previous broken row)
run_s3select_on_object(result, merge_line.c_str(), merge_line.length(), false, false, false);
Expand Down Expand Up @@ -2685,6 +2687,15 @@ class csv_object : public base_s3object
m_is_to_aggregate = do_aggregate;
m_skip_last_line = skip_last_line;

if(skip_first_line)
{
//the stream may start in the middle of a row (maybe in the middle of a quote).
//at this point the stream should skip the first row(broken row).
//the csv_parser should be init with the fixed stream position.
m_stream += m_skip_x_first_bytes;
m_skip_x_first_bytes=0;
}

CSVParser _csv_parser("csv", m_stream, m_end_stream);
csv_parser = &_csv_parser;
csv_parser->set_csv_def( m_csv_defintion.row_delimiter,
Expand All @@ -2700,12 +2711,6 @@ class csv_object : public base_s3object
{
extract_csv_header_info();
}

if(skip_first_line)
{
csv_parser->next_line();
}

do
{
m_sql_processing_status = Status::INITIAL_STAT;
Expand Down
59 changes: 50 additions & 9 deletions include/s3select_functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@ enum class s3select_func_En_t {ADD,
LEADING,
TRAILING,
DECIMAL_OPERATOR,
CAST_TO_DECIMAL
CAST_TO_DECIMAL,
ENGINE_VERSION
};


Expand Down Expand Up @@ -306,7 +307,8 @@ class s3select_functions
{"#leading#", s3select_func_En_t::LEADING},
{"#trailing#", s3select_func_En_t::TRAILING},
{"#decimal_operator#", s3select_func_En_t::DECIMAL_OPERATOR},
{"#cast_as_decimal#", s3select_func_En_t::CAST_TO_DECIMAL}
{"#cast_as_decimal#", s3select_func_En_t::CAST_TO_DECIMAL},
{"engine_version", s3select_func_En_t::ENGINE_VERSION}

};

Expand Down Expand Up @@ -517,9 +519,10 @@ struct _fn_sum : public base_function

value sum;

_fn_sum() : sum(0)
_fn_sum()
{
aggregate = true;
sum.setnull();
}

bool operator()(bs_stmt_vec_t* args, variable* result) override
Expand All @@ -531,6 +534,10 @@ struct _fn_sum : public base_function

try
{
if(sum.is_null())
{
sum = 0;
}
sum = sum + x->eval();
}
catch (base_s3select_exception& e)
Expand Down Expand Up @@ -618,7 +625,9 @@ struct _fn_avg : public base_function
void get_aggregate_result(variable *result) override
{
if(count == static_cast<value>(0)) {
throw base_s3select_exception("count cannot be zero!");
value v_null;
v_null.setnull();
*result=v_null;
} else {
*result = sum/count ;
}
Expand All @@ -630,9 +639,10 @@ struct _fn_min : public base_function

value min;

_fn_min():min(__INT64_MAX__)
_fn_min()
{
aggregate=true;
min.setnull();
}

bool operator()(bs_stmt_vec_t* args, variable* result) override
Expand All @@ -642,7 +652,7 @@ struct _fn_min : public base_function
auto iter = args->begin();
base_statement* x = *iter;

if(min > x->eval())
if(min.is_null() || min > x->eval())
{
min=x->eval();
}
Expand All @@ -662,9 +672,10 @@ struct _fn_max : public base_function

value max;

_fn_max():max(-__INT64_MAX__)
_fn_max()
{
aggregate=true;
max.setnull();
}

bool operator()(bs_stmt_vec_t* args, variable* result) override
Expand All @@ -674,7 +685,7 @@ struct _fn_max : public base_function
auto iter = args->begin();
base_statement* x = *iter;

if(max < x->eval())
if(max.is_null() || max < x->eval())
{
max=x->eval();
}
Expand All @@ -694,7 +705,7 @@ struct _fn_to_int : public base_function
value var_result;

bool operator()(bs_stmt_vec_t* args, variable* result) override
{
{
check_args_size(args,1);

value v = (*args->begin())->eval();
Expand Down Expand Up @@ -2182,6 +2193,32 @@ struct _fn_decimal_operator : public base_function {
}
};

/// Function object that reports a fixed, human-readable description of the
/// changes bundled into this engine build (see `version_description`).
struct _fn_engine_version : public base_function {

  _fn_engine_version()
  {
    // NOTE(review): the function is flagged as an aggregate; presumably so the
    // text is produced once per query rather than once per row -- confirm
    // against how the engine consumes `aggregate`.
    aggregate = true;
  }

  /// Writes the version text into `result`.
  /// `args` is accepted to satisfy the base interface but is never read.
  /// @return always true.
  bool operator()(bs_stmt_vec_t* args, variable* result) override
  {
    result->set_value(version_description);
    return true;
  }

  // The text handed back to the caller. It is a raw string literal, so every
  // character (including the line breaks) is emitted exactly as written.
  const char* version_description = R"(PR #137 :
the change handle the use cases where the JSON input starts with an anonymous array/object this may cause wrong search result per the user request(SQL statement)
handle the use-case where the user requests a json-key-path that may point to a non-discrete value. i.e. array or an object.
editorial changes.
fix for CSV flow, in the case of a "broken row" (upon processing stream of data)
null results upon aggregation functions on an empty group (no match for where clause).
)";
};

base_function* s3select_functions::create(std::string_view fn_name,const bs_stmt_vec_t &arguments)
{
const FunctionLibrary::const_iterator iter = m_functions_library.find(fn_name.data());
Expand Down Expand Up @@ -2419,6 +2456,10 @@ base_function* s3select_functions::create(std::string_view fn_name,const bs_stmt
return S3SELECT_NEW(this,_fn_cast_to_decimal);
break;

case s3select_func_En_t::ENGINE_VERSION:
return S3SELECT_NEW(this,_fn_engine_version);
break;

default:
throw base_s3select_exception("internal error while resolving function-name");
break;
Expand Down
Loading

0 comments on commit 9ade26c

Please sign in to comment.