From 9ade26c63ef1016dac868f53270a61e6232be9ba Mon Sep 17 00:00:00 2001 From: Gal Salomon Date: Thu, 12 Oct 2023 22:51:36 +0300 Subject: [PATCH] handle correctly "anonymous" array/object (defect reported by QE) (#137) * a fix for defect reported by QE for JSON input. the fix handle the use cases where the JSON input starts with an anonymous array/object this may cause wrong search result per the user request(SQL statement) * editorial * adding test that verifies correct key-value extraction upon anonymous array * adding relevant tests per changes. handle the use-case where the user requests a json-key-path that may point to non-discrete value. i.e. array or an object. editorial changes. * a fix for CSV flow. upon a broken row, the use of csv_parser->next_row() is wrong, since the row may break within a quote and that could result with an exception. the csv_parser should init with the csv-stream after skipping the bytes of the broken row * editorial * replace ifstream read API * upon where-clause return empty group aggregation function returned the wrong result. it should return a null * add an operator to display the version description of current engine functionalities(recent PR) * add tests per aggregation function with an empty group Signed-off-by: galsalomon66 --- example/s3select_example.cpp | 17 ++++-- include/s3select.h | 17 ++++-- include/s3select_functions.h | 59 ++++++++++++++++--- include/s3select_json_parser.h | 98 +++++++++++++++++++++---------- test/s3select_test.cpp | 103 +++++++++++++++++++++++++++++++-- 5 files changed, 237 insertions(+), 57 deletions(-) diff --git a/example/s3select_example.cpp b/example/s3select_example.cpp index 3a4a5593..71aff3d0 100644 --- a/example/s3select_example.cpp +++ b/example/s3select_example.cpp @@ -385,11 +385,14 @@ int process_json_query(const char* input_query,const char* fname) std::string buff(BUFFER_SIZE,0); std::string result; - size_t read_sz = input_file_stream.readsome(buff.data(),BUFFER_SIZE); + size_t read_sz = input_file_stream.read(buff.data(),BUFFER_SIZE).gcount(); + int chunk_count=0; + size_t bytes_read=0; while(read_sz) { - std::cout << "read next chunk " << read_sz << std::endl; + bytes_read += read_sz; + std::cout << "read next chunk " << chunk_count++ << ":" << read_sz << ":" << bytes_read << "\r"; result.clear(); try{ @@ -403,7 +406,10 @@ int process_json_query(const char* input_query,const char* fname) } } - std::cout << result << std::endl; + if(result.size()) + { + std::cout << result << std::endl; + } if(status<0) { @@ -415,7 +421,7 @@ int process_json_query(const char* input_query,const char* fname) std::cout << "json processing reached limit " << std::endl; break; } - read_sz = input_file_stream.readsome(buff.data(),BUFFER_SIZE); + read_sz = input_file_stream.read(buff.data(),BUFFER_SIZE).gcount(); } try{ result.clear(); @@ -430,7 +436,6 @@ int process_json_query(const char* input_query,const char* fname) } std::cout << result << std::endl; - return 0; } @@ -621,7 +626,7 @@ int run_on_single_query(const char* fname, const char* query) std::string buff(BUFFER_SIZE,0); while (1) { - size_t read_sz = input_file_stream.readsome(buff.data(),BUFFER_SIZE); + size_t read_sz = input_file_stream.read(buff.data(),BUFFER_SIZE).gcount(); status = awscli->run_s3select(query, buff.data(), read_sz, file_sz); if(status<0) diff --git a/include/s3select.h b/include/s3select.h index 03b8463a..3ac11135 100644 --- a/include/s3select.h +++ b/include/s3select.h @@ -2499,6 +2499,7 @@ class csv_object : public base_s3object std::string m_last_line; size_t m_processed_bytes; int64_t m_number_of_tokens; + size_t m_skip_x_first_bytes=0; std::function fp_s3select_result_format=nullptr; std::function fp_s3select_header_format=nullptr; @@ -2651,6 +2652,7 @@ class csv_object : public base_s3object merge_line = m_last_line + tmp_buff + m_csv_defintion.row_delimiter; m_previous_line = false; m_skip_first_line = true; + m_skip_x_first_bytes = tmp_buff.size()+1; //processing the merged row (previous broken row) run_s3select_on_object(result, merge_line.c_str(), merge_line.length(), false, false, false); @@ -2685,6 +2687,15 @@ class csv_object : public base_s3object m_is_to_aggregate = do_aggregate; m_skip_last_line = skip_last_line; + if(skip_first_line) + { + //the stream may start in the middle of a row (maybe in the middle of a quote). + //at this point the stream should skip the first row(broken row). + //the csv_parser should be init with the fixed stream position. + m_stream += m_skip_x_first_bytes; + m_skip_x_first_bytes=0; + } + CSVParser _csv_parser("csv", m_stream, m_end_stream); csv_parser = &_csv_parser; csv_parser->set_csv_def( m_csv_defintion.row_delimiter, @@ -2700,12 +2711,6 @@ class csv_object : public base_s3object { extract_csv_header_info(); } - - if(skip_first_line) - { - csv_parser->next_line(); - } - do { m_sql_processing_status = Status::INITIAL_STAT; diff --git a/include/s3select_functions.h b/include/s3select_functions.h index 1572945d..8c507fca 100644 --- a/include/s3select_functions.h +++ b/include/s3select_functions.h @@ -235,7 +235,8 @@ enum class s3select_func_En_t {ADD, LEADING, TRAILING, DECIMAL_OPERATOR, - CAST_TO_DECIMAL + CAST_TO_DECIMAL, + ENGINE_VERSION }; @@ -306,7 +307,8 @@ class s3select_functions {"#leading#", s3select_func_En_t::LEADING}, {"#trailing#", s3select_func_En_t::TRAILING}, {"#decimal_operator#", s3select_func_En_t::DECIMAL_OPERATOR}, - {"#cast_as_decimal#", s3select_func_En_t::CAST_TO_DECIMAL} + {"#cast_as_decimal#", s3select_func_En_t::CAST_TO_DECIMAL}, + {"engine_version", s3select_func_En_t::ENGINE_VERSION} }; @@ -517,9 +519,10 @@ struct _fn_sum : public base_function value sum; - _fn_sum() : sum(0) + _fn_sum() { aggregate = true; + sum.setnull(); } bool operator()(bs_stmt_vec_t* args, variable* result) override @@ -531,6 +534,10 @@ struct _fn_sum : public base_function try { + if(sum.is_null()) + { + sum = 0; + } sum = sum + x->eval(); } catch (base_s3select_exception& e) @@ -618,7 +625,9 @@ struct _fn_avg : public base_function void get_aggregate_result(variable *result) override { if(count == static_cast(0)) { - throw base_s3select_exception("count cannot be zero!"); + value v_null; + v_null.setnull(); + *result=v_null; } else { *result = sum/count ; } @@ -630,9 +639,10 @@ struct _fn_min : public base_function value min; - _fn_min():min(__INT64_MAX__) + _fn_min() { aggregate=true; + min.setnull(); } bool operator()(bs_stmt_vec_t* args, variable* result) override @@ -642,7 +652,7 @@ struct _fn_min : public base_function auto iter = args->begin(); base_statement* x = *iter; - if(min > x->eval()) + if(min.is_null() || min > x->eval()) { min=x->eval(); } @@ -662,9 +672,10 @@ struct _fn_max : public base_function value max; - _fn_max():max(-__INT64_MAX__) + _fn_max() { aggregate=true; + max.setnull(); } bool operator()(bs_stmt_vec_t* args, variable* result) override @@ -674,7 +685,7 @@ struct _fn_max : public base_function auto iter = args->begin(); base_statement* x = *iter; - if(max < x->eval()) + if(max.is_null() || max < x->eval()) { max=x->eval(); } @@ -694,7 +705,7 @@ struct _fn_to_int : public base_function value var_result; bool operator()(bs_stmt_vec_t* args, variable* result) override - { + { check_args_size(args,1); value v = (*args->begin())->eval(); @@ -2182,6 +2193,32 @@ struct _fn_decimal_operator : public base_function { } }; +struct _fn_engine_version : public base_function { + + const char* version_description =R"(PR #137 : +the change handle the use cases where the JSON input starts with an anonymous array/object this may cause wrong search result per the user request(SQL statement) + +handle the use-case where the user requests a json-key-path that may point to a non-discrete value. i.e. array or an object. +editorial changes. + +fix for CSV flow, in the case of a "broken row" (upon processing stream of data) + +null results upon aggregation functions on an empty group (no match for where clause). +)"; + + + _fn_engine_version() + { + aggregate = true; + } + + bool operator()(bs_stmt_vec_t* args, variable* result) override + { + result->set_value(version_description); + return true; + } +}; + base_function* s3select_functions::create(std::string_view fn_name,const bs_stmt_vec_t &arguments) { const FunctionLibrary::const_iterator iter = m_functions_library.find(fn_name.data()); @@ -2419,6 +2456,10 @@ base_function* s3select_functions::create(std::string_view fn_name,const bs_stmt return S3SELECT_NEW(this,_fn_cast_to_decimal); break; + case s3select_func_En_t::ENGINE_VERSION: + return S3SELECT_NEW(this,_fn_engine_version); + break; + default: throw base_s3select_exception("internal error while resolving function-name"); break; diff --git a/include/s3select_json_parser.h b/include/s3select_json_parser.h index 60cd2b32..aa06163f 100644 --- a/include/s3select_json_parser.h +++ b/include/s3select_json_parser.h @@ -165,11 +165,20 @@ class json_variable_access { // to set the following. std::vector* from_clause; std::vector* key_path; +//m_current_depth : trace the depth of the reader, including "anonymous"(meaning JSON may begin with array that has no name attached to it) int* m_current_depth; +//m_current_depth_non_anonymous : trace the depth of the reader, NOT including "anonymous" array/object. +//upon user request the following _1.a[12].b, the key-name{a} may reside on some array with no-name, +//the state machine that search for a[12].b, does NOT contain states for that "anonymous" array, +//thus, the state-machine will fail to trace the user request for that specific key.path +int* m_current_depth_non_anonymous; std::function * m_exact_match_cb; // a state number : (_1).a.b.c[ 17 ].d.e (a.b)=1 (c[)=2 (17)=3 (.d.e)=4 size_t current_state;//contain the current state of the state machine for searching-expression (each JSON variable in SQL statement has a searching expression) int nested_array_level;//in the case of array within array it contain the nesting level +int m_json_index; +s3selectEngine::value v_null; +size_t m_from_clause_size; struct variable_state_md { std::vector required_path;//set by the syntax-parser. in the case of array its empty @@ -184,20 +193,26 @@ std::vector variable_states;//vector is populated upon public: -json_variable_access():from_clause(nullptr),key_path(nullptr),m_current_depth(nullptr),m_exact_match_cb(nullptr),current_state(-1),nested_array_level(0) +json_variable_access():from_clause(nullptr),key_path(nullptr),m_current_depth(nullptr),m_current_depth_non_anonymous(nullptr),m_exact_match_cb(nullptr),current_state(-1),nested_array_level(0),m_json_index(-1),v_null(nullptr),m_from_clause_size(0) {} void init( std::vector* reader_from_clause, std::vector* reader_key_path, int* reader_current_depth, - std::function * excat_match_cb) + int* reader_m_current_depth_non_anonymous, + std::function * excat_match_cb, + int json_index) {//this routine should be called before scanning the JSON input from_clause = reader_from_clause; key_path = reader_key_path; m_exact_match_cb = excat_match_cb; + //m_current_depth and m_current_depth_non_anonymous points to the JSON reader variables. m_current_depth = reader_current_depth; + m_current_depth_non_anonymous = reader_m_current_depth_non_anonymous; current_state = 0; + m_json_index = json_index; + m_from_clause_size = from_clause->size(); //loop on variable_states compute required_depth_size } @@ -249,28 +264,31 @@ void push_variable_state(std::vector& required_path,int required_ar struct variable_state_md& reader_position_state() { - if (current_state>=variable_states.size()) - { - const char* out_of_range = "\nJSON reader failed due to array-out-of-range\n"; - throw s3selectEngine::base_s3select_exception(out_of_range,s3selectEngine::base_s3select_exception::s3select_exp_en_t::FATAL); - } + if (current_state>=variable_states.size()) + {//in case the state-machine reached a "dead-end", should push a null for that JSON variable + //going back one state. + (*m_exact_match_cb)(v_null,m_json_index); + decrease_current_state(); + } return variable_states[ current_state ]; } bool is_array_state() { - return (reader_position_state().required_array_entry_no>=0); + return (reader_position_state().required_array_entry_no >= 0); } bool is_reader_located_on_required_depth() { - return (*m_current_depth == reader_position_state().required_depth_size); + //upon user request `select _1.a.b from s3object[*].c.d;` the c.d sould "cut off" from m_current_depth_non_anonymous + //to get the correct depth of the state-machine + return ((*m_current_depth_non_anonymous - static_cast(m_from_clause_size)) == reader_position_state().required_depth_size); } bool is_on_final_state() { - return ((size_t)current_state == (variable_states.size())); + return (current_state == (variable_states.size())); //&& *m_current_depth == variable_states[ current_state -1 ].required_depth_size); // NOTE: by ignoring the current-depth, the matcher gives precedence to key-path match, while not ignoring accessing using array @@ -278,11 +296,6 @@ bool is_on_final_state() // user may request 'select _1.phonearray.num'; the reader will traverse `num` exist in `phonearray` } -bool is_reader_reached_required_array_entry() -{ - return (reader_position_state().actual_array_entry_no == reader_position_state().required_array_entry_no); -} - bool is_reader_passed_required_array_entry() { return (reader_position_state().actual_array_entry_no > reader_position_state().required_array_entry_no); @@ -295,7 +308,9 @@ bool is_reader_located_on_array_according_to_current_state() bool is_reader_position_depth_lower_than_required() { - return (*m_current_depth < reader_position_state().required_depth_size); + //upon user request `select _1.a.b from s3object[*].c.d;` the c.d sould "cut off" from m_current_depth_non_anonymous + //to have the correct depth of the state-machine + return ((*m_current_depth_non_anonymous - static_cast(m_from_clause_size)) < reader_position_state().required_depth_size); } bool is_reader_located_on_array_entry_according_to_current_state() @@ -307,7 +322,7 @@ void increase_current_state() { DBG - if((size_t)current_state >= (variable_states.size())) return; + if(current_state >= variable_states.size()) return; current_state ++; } @@ -323,8 +338,8 @@ void key() { DBG - if(reader_position_state().required_path.size())//state has a key - {// key should match + if(reader_position_state().required_path.size())//current state is a key + { std::vector* filter = &reader_position_state().required_path; auto required_key_depth_size = reader_position_state().required_key_depth_size; if(std::equal((*key_path).begin()+(*from_clause).size() + required_key_depth_size, //key-path-start-point + from-clause-depth-size + key-depth @@ -332,7 +347,7 @@ void key() (*filter).begin(), (*filter).end(), iequal_predicate)) { - increase_current_state();//key match, advancing to next + increase_current_state();//key match according to user request, advancing to the next state } } } @@ -358,7 +373,7 @@ void dec_key() if(is_reader_located_on_required_depth() && is_array_state())//TODO && is_array_state(). is it necessary?; json_element_state.back() != ARRAY_STATE) {//key-path-depth matches, and it an array - if(is_reader_reached_required_array_entry()) + if(is_reader_located_on_array_entry_according_to_current_state()) {//we reached the required array entry increase_current_state(); } @@ -369,14 +384,14 @@ void dec_key() } } -void new_value(s3selectEngine::value& v,size_t json_index) +void new_value(s3selectEngine::value& v) { DBG if(is_on_final_state()) { - (*m_exact_match_cb)(v, json_index); - decrease_current_state();//TODO why decrease? the state-machine reached its final destination, and it should be only one result + (*m_exact_match_cb)(v, m_json_index); + decrease_current_state();//the state-machine reached its final destination, "going back" one state, upon another match condition the matched value will override the last one } increase_array_index();//next-value in array } @@ -432,16 +447,18 @@ class json_variables_operations { std::vector * from_clause, std::vector* key_path, int* current_depth, + int* current_depth_non_anonymous, std::function * exact_match_cb) { json_statement_variables = jsv; - + int i=0;//the index per JSON variable for(auto& var : json_statement_variables) { var.first->init(from_clause, key_path, current_depth, - exact_match_cb); + current_depth_non_anonymous, + exact_match_cb,i++); } } @@ -484,7 +501,7 @@ class json_variables_operations { { for(auto& j : json_statement_variables) { - j.first->new_value(v,j.second); + j.first->new_value(v); } } };//json_variables_operations @@ -513,12 +530,13 @@ class JsonParserHandler : public rapidjson::BaseReaderHandler, std::function m_s3select_processing; int m_start_row_depth; int m_current_depth; + int m_current_depth_non_anonymous; bool m_star_operation; int m_sql_processing_status; bool m_fatal_initialization_ind = false; std::string m_fatal_initialization_description; - JsonParserHandler() : prefix_match(false),init_buffer_stream(false),m_start_row_depth(-1),m_current_depth(0),m_star_operation(false),m_sql_processing_status(0) + JsonParserHandler() : prefix_match(false),init_buffer_stream(false),m_start_row_depth(-1),m_current_depth(0),m_current_depth_non_anonymous(0),m_star_operation(false),m_sql_processing_status(0) { } @@ -615,7 +633,13 @@ class JsonParserHandler : public rapidjson::BaseReaderHandler, bool Key(const char* str, rapidjson::SizeType length, bool copy) { key_path.push_back(std::string(str)); - + + if(!m_current_depth_non_anonymous){ + //important: upon a key and m_current_depth_non_anonymous is ZERO + //it should advance by 1. to get the correct current depth(for non anonymous counter). + m_current_depth_non_anonymous++; + } + if(from_clause.size() == 0 || std::equal(key_path.begin(), key_path.end(), from_clause.begin(), from_clause.end(), iequal_predicate)) { prefix_match = true; } @@ -633,9 +657,14 @@ class JsonParserHandler : public rapidjson::BaseReaderHandler, return false; } - bool StartObject() { + bool StartObject() { json_element_state.push_back(OBJECT_STATE); m_current_depth++; + if(key_path.size()){ + //advancing the counter only upon there is a key. + m_current_depth_non_anonymous++; + } + if (prefix_match && !is_already_row_started()) { state = row_state::OBJECT_START_ROW; m_start_row_depth = m_current_depth; @@ -648,6 +677,7 @@ class JsonParserHandler : public rapidjson::BaseReaderHandler, bool EndObject(rapidjson::SizeType memberCount) { json_element_state.pop_back(); m_current_depth --; + m_current_depth_non_anonymous --; variable_match_operations.end_object(); @@ -662,6 +692,11 @@ class JsonParserHandler : public rapidjson::BaseReaderHandler, bool StartArray() { json_element_state.push_back(ARRAY_STATE); m_current_depth++; + if(key_path.size()){ + //advancing the counter only upon there is a key. + m_current_depth_non_anonymous++; + } + if (prefix_match && !is_already_row_started()) { state = row_state::ARRAY_START_ROW; m_start_row_depth = m_current_depth; @@ -675,6 +710,8 @@ class JsonParserHandler : public rapidjson::BaseReaderHandler, bool EndArray(rapidjson::SizeType elementCount) { json_element_state.pop_back(); m_current_depth--; + m_current_depth_non_anonymous--; + dec_key_path(); if (state == row_state::ARRAY_START_ROW && (m_start_row_depth > m_current_depth)) { @@ -703,6 +740,7 @@ class JsonParserHandler : public rapidjson::BaseReaderHandler, &from_clause, &key_path, &m_current_depth, + &m_current_depth_non_anonymous, &m_exact_match_cb); } diff --git a/test/s3select_test.cpp b/test/s3select_test.cpp index 6d0860ea..7c372551 100644 --- a/test/s3select_test.cpp +++ b/test/s3select_test.cpp @@ -734,6 +734,69 @@ TEST(TestS3selectFunctions, count) ASSERT_EQ(s3select_result_1,"128"); } +TEST(TestS3selectFunctions, no_args) +{//note: engine throw an exception(and description), currently it is not catch in this test-app +#if 0 + std::string input; + size_t size = 128; + generate_columns_csv(input, size); + std::string input_query_1 = "select min() from stdin;"; + + std::string s3select_result_1 = run_s3select(input_query_1,input); + + ASSERT_EQ(s3select_result_1,""); + + input_query_1 = "select max() from stdin;"; + + s3select_result_1 = run_s3select(input_query_1,input); + + ASSERT_EQ(s3select_result_1,""); + + input_query_1 = "select avg() from stdin;"; + + s3select_result_1 = run_s3select(input_query_1,input); + + ASSERT_EQ(s3select_result_1,""); + + input_query_1 = "select sum() from stdin;"; + + s3select_result_1 = run_s3select(input_query_1,input); + + ASSERT_EQ(s3select_result_1,""); +#endif +} + +TEST(TestS3selectFunctions, empty_group_upon_aggtegation) +{ + + std::string input; + size_t size = 128; + generate_columns_csv(input, size); + std::string input_query_1 = "select min(cast(_1 as int)) from stdin where 1 = 0;"; + + std::string s3select_result_1 = run_s3select(input_query_1,input); + + ASSERT_EQ(s3select_result_1,"null"); + + input_query_1 = "select max(cast(_1 as int)) from stdin where 1 = 0;"; + + s3select_result_1 = run_s3select(input_query_1,input); + + ASSERT_EQ(s3select_result_1,"null"); + + input_query_1 = "select sum(cast(_1 as int)) from stdin where 1 = 0;"; + + s3select_result_1 = run_s3select(input_query_1,input); + + ASSERT_EQ(s3select_result_1,"null"); + + input_query_1 = "select avg(cast(_1 as int)) from stdin where 1 = 0;"; + + s3select_result_1 = run_s3select(input_query_1,input); + + ASSERT_EQ(s3select_result_1,"null"); +} + TEST(TestS3selectFunctions, min) { std::string input; @@ -988,8 +1051,8 @@ TEST(TestS3selectFunctions, avgzero) false, // dont skip last line true // aggregate call ); - ASSERT_EQ(status, -1); - ASSERT_EQ(s3select_result, std::string("")); + ASSERT_EQ(status, 0); + ASSERT_EQ(s3select_result, std::string("null")); } TEST(TestS3selectFunctions, floatavg) @@ -3318,14 +3381,12 @@ std::string input_json_data = R"( } )"; -#if 0 - //TODO error phoneNumbers[12][2][2] = null, to check what happen upon reaching the final state - expected_result=R"(post 3D + expected_result=R"(null )"; + //phoneNumbers[12][2][2] is not a discrete value, should return null input_query = "select _1.phoneNumbers[12][2][2] from s3object[*];"; run_json_query(input_query.c_str(), input_json_data, result); ASSERT_EQ(result,expected_result); -#endif //the following tests ia about accessing multi-dimension array expected_result=R"(55 @@ -3352,4 +3413,34 @@ std::string input_json_data = R"( input_query = "select _1.phoneNumbers[11] from s3object[*];"; run_json_query(input_query.c_str(), input_json_data, result); ASSERT_EQ(result,expected_result); + +input_json_data = R"( +[ + { + "authors": [ + { + "id": 2312688602 + }, + { + "id": 123 + } + ], + "wrong" : {"id" : "it-is-wrong"} + } +] +)"; + + expected_result=R"(2312688602 +)"; + input_query = "select _1.authors[0].id from s3object[*];"; + run_json_query(input_query.c_str(), input_json_data, result); + ASSERT_EQ(result,expected_result); + + expected_result=R"(123 +)"; + input_query = "select _1.authors[1].id from s3object[*];"; + run_json_query(input_query.c_str(), input_json_data, result); + ASSERT_EQ(result,expected_result); + + }