From c232d982bbcf03be7a9f392787cbf85915f07299 Mon Sep 17 00:00:00 2001 From: gal salomon Date: Mon, 2 Aug 2021 07:47:15 +0300 Subject: [PATCH 1/3] abstraction for projection results, a phase before output-serialization, result returns as set of values. missing memory management for variable and value Signed-off-by: gal salomon --- include/s3select.h | 25 ++++++++++--- include/s3select_oper.h | 79 +++++++++++++++++++++++++++++++---------- 2 files changed, 81 insertions(+), 23 deletions(-) diff --git a/include/s3select.h b/include/s3select.h index 926a2e98..ac0394da 100644 --- a/include/s3select.h +++ b/include/s3select.h @@ -1806,10 +1806,22 @@ class csv_object : public base_s3object public: + void result_values_to_string(multi_values& projections_resuls, std::string& result) + { + for(auto res : projections_resuls.values) + { + result.append( res->to_string() ); + + //if(++iset_last_call(); i->set_skip_non_aggregate(false);//projection column is set to be runnable - result.append( i->eval().to_string() ); - result.append(","); + + projections_resuls.push_value( &(i->eval()) ); } + result_values_to_string(projections_resuls,result); return number_of_tokens; } @@ -1874,12 +1888,13 @@ class csv_object : public base_s3object } while (m_where_clause && !m_where_clause->eval().is_true()); + projections_resuls.clear(); for (auto& i : m_projections) { - result.append( i->eval().to_string() ); - result.append(","); + projections_resuls.push_value( &(i->eval()) ); } - result.append("\n"); + result_values_to_string(projections_resuls,result); + result.append("\n");//TODO use csv-defintion } return number_of_tokens; //TODO wrong diff --git a/include/s3select_oper.h b/include/s3select_oper.h index 56301530..7b0f447f 100644 --- a/include/s3select_oper.h +++ b/include/s3select_oper.h @@ -155,7 +155,7 @@ class s3select_allocator //s3select is the "owner" std::vector list_of_buff; u_int32_t m_idx; -#define __S3_ALLOCATION_BUFF__ (8*1024) +#define __S3_ALLOCATION_BUFF__ (24*1024) 
void check_capacity(size_t sz) { if (sz>__S3_ALLOCATION_BUFF__) @@ -415,6 +415,22 @@ struct binop_modulo typedef std::tuple timestamp_t; +class value; +class multi_values +{ + public: + std::vector values; + + public: + void push_value(value* v); + + void clear() + { + values.clear(); + } + +}; + class value { @@ -422,11 +438,13 @@ class value typedef union { int64_t num; - char* str;//TODO consider string_view + char* str;//TODO consider string_view(save copy) double dbl; timestamp_t* timestamp; } value_t; + multi_values multiple_values; + private: value_t __val; //std::string m_to_string; @@ -444,6 +462,7 @@ class value S3NULL, S3NAN, BOOL, + MULTIPLE_VALUES, NA } ; value_En_t type; @@ -475,6 +494,11 @@ class value __val.str = m_str_value.data(); } + ~value() + {//TODO should be a part of the cleanup routine(__function::push_for_cleanup) + multiple_values.values.clear(); + } + value():type(value_En_t::NA) { __val.num=0; @@ -977,6 +1001,22 @@ class value } }; +void multi_values::push_value(value *v) +{ + //v could be single or multiple values + if (v->type == value::value_En_t::MULTIPLE_VALUES) + { + for (auto sv : v->multiple_values.values) + { + values.push_back(sv); + } + } + else + { + values.push_back(v); + } +} + class base_statement { @@ -1168,7 +1208,8 @@ class variable : public base_statement int column_pos; value var_value; std::string m_star_op_result; - char m_star_op_result_charc[4096]; //TODO should be dynamic + char m_star_op_result_charc[4096]; //TODO cause larger allocations for other objects containing variable (dynamic is one solution) + value star_operation_values[16];//TODO cause larger allocations for other objects containing variable (dynamic is one solution) const int undefined_column_pos = -1; const int column_alias = -2; @@ -1298,15 +1339,20 @@ class variable : public base_statement return var_value.type; } - value& star_operation() //purpose return content of all columns in a input stream { - - int i; + size_t pos=0; - int num_of_columns 
= m_scratch->get_num_of_columns(); - for(i=0; iget_num_of_columns(); + var_value.multiple_values.clear(); //TODO var_value.clear()?? + + if(sizeof(star_operation_values)/sizeof(value) < num_of_columns) + { + throw base_s3select_exception(std::string("not enough memory for star-operation"), base_s3select_exception::s3select_exp_en_t::FATAL); + } + + for(size_t i=0; iget_column_value(i).size(); if((pos+len)>sizeof(m_star_op_result_charc)) @@ -1314,22 +1360,19 @@ class variable : public base_statement throw base_s3select_exception("result line too long", base_s3select_exception::s3select_exp_en_t::FATAL); } - memcpy(&m_star_op_result_charc[pos], m_scratch->get_column_value(i).data(), len); + memcpy(&m_star_op_result_charc[pos], m_scratch->get_column_value(i).data(), len);//TODO using string_view will avoid copy + m_star_op_result_charc[ pos + len ] = 0; + + star_operation_values[i] = &m_star_op_result_charc[pos];//set string value + var_value.multiple_values.push_value( &star_operation_values[i] ); + pos += len; - m_star_op_result_charc[ pos ] = ',';//TODO need for another abstraction (per file type) pos ++; } - size_t len = m_scratch->get_column_value(i).size(); - if((pos+len)>sizeof(m_star_op_result_charc)) - { - throw base_s3select_exception("result line too long", base_s3select_exception::s3select_exp_en_t::FATAL); - } + var_value.type = value::value_En_t::MULTIPLE_VALUES; - memcpy(&m_star_op_result_charc[pos], m_scratch->get_column_value(i).data(), len); - m_star_op_result_charc[ pos + len ] = 0; - var_value = (char*)&m_star_op_result_charc[0]; return var_value; } From f0fface34df1738ad15b0139330908669e588a46 Mon Sep 17 00:00:00 2001 From: gal salomon Date: Thu, 2 Sep 2021 16:30:35 +0300 Subject: [PATCH 2/3] enable parallel execution of a query. 2 main flows (1) non-aggregation flow: split the input object into N ranges, each range gets its own context. 
(2) aggregation flow:(for aggregation queries) requires 2 phases, Upon the completion of the first phase (running the query on a specific range) The AST will be traversed for aggregation nodes, the result of each node is pushed into a dedicated scratch area current commit is handling the CSV use-case/non-aggregation flow. --- example/s3select_example.cpp | 51 ++++++++ include/s3select.h | 221 ++++++++++++++++++++++++++++++++++- 2 files changed, 269 insertions(+), 3 deletions(-) diff --git a/example/s3select_example.cpp b/example/s3select_example.cpp index 012de26a..6e3eee67 100644 --- a/example/s3select_example.cpp +++ b/example/s3select_example.cpp @@ -473,10 +473,61 @@ int run_on_single_query(const char* fname, const char* query) return status; } +int parallel_query_processing(int argc,char **argv) +{ +//purpose: run parallel execution of single query. + + + char *query=0; + char *object_name=0; + uint32_t execution_scale=1; + + for (int i = 0; i < argc; i++) + { + + if (!strcmp(argv[i], "-key")) + { + object_name = argv[i + 1]; + continue; + } + + if (!strcmp(argv[i], "-q")) + { + query = argv[i + 1]; + continue; + } + + if (!strcmp(argv[i], "-p")) + { + execution_scale = atoi(argv[i + 1]); + continue; + } + } + + if(!object_name) + { + std::cout << " object name is missing " << std::endl; + } + + if(!query) + { + std::cout << " query is missing " << std::endl; + } + + csv_parallel_execution csv_scaleup(object_name,query,execution_scale); + + csv_scaleup.prepare(); + csv_scaleup.run(); + + return 0; +} + int main(int argc,char **argv) { //purpose: run many queries (reside in file) on single file. 
+ return parallel_query_processing(argc,argv); + char *query=0; char *fname=0; char *query_file=0;//file contains many queries diff --git a/include/s3select.h b/include/s3select.h index ac0394da..48d41732 100644 --- a/include/s3select.h +++ b/include/s3select.h @@ -13,7 +13,8 @@ #include #include #include - +#include +#include #define _DEBUG_TERM {string token(a,b);std::cout << __FUNCTION__ << token << std::endl;} @@ -2000,7 +2001,7 @@ class csv_object : public base_s3object } -public: +public: //change std::string& result --> multi_values int run_s3select_on_object(std::string& result, const char* csv_stream, size_t stream_length, bool skip_first_line, bool skip_last_line, bool do_aggregate) { @@ -2030,7 +2031,7 @@ class csv_object : public base_s3object int num = 0; try { - num = getMatchRow(result); + num = getMatchRow(result);//TODO multi_values } catch (base_s3select_exception& e) { @@ -2053,6 +2054,220 @@ class csv_object : public base_s3object return 0; } + + ////// parallel execution should inherit from csv_object + + typedef struct { + + const char* object_name; + std::pair r; + bool skip_first_row; + bool skip_last_row; + csv_object* csv_obj; + s3select* s3select_obj; + + } split_desc_t; + + void process_range(split_desc_t split_arg) + { + //the first range should skip the last line it + //the middle ranges(including the last) should use the previous broken line and merge it. + //each range should loop on chunks(4m) + + //NOTE: this part should execute in RGW (the std::ifstream replaced with S3 API's); TODO create an interface with RGW::callback + + char buff[4 * 1024 * 1024];//RGW chunk + std::ifstream is(split_arg.object_name, std::ifstream::binary); + size_t length_to_read = split_arg.r.second - split_arg.r.first; + is.seekg(split_arg.r.first); //range starting point + + while (length_to_read) + {//process chunk after chunk + + is.read(buff, (length_to_read > sizeof(buff)) ? 
sizeof(buff) : length_to_read); + length_to_read -= is.gcount(); + std::string_view p(&buff[0],is.gcount()); + size_t broken_line_sz = 0; //TODO no need for that + std::string broken_row; + + const char *process_loc = p.data(); + size_t length_of_process = is.gcount(); + + if (split_arg.skip_first_row) + { + + auto row_delim_pos = p.find_first_of(m_csv_defintion.row_delimiter)+1; + //verfiy it found row_delimiter + + split_arg.skip_first_row = false; //skip only once + broken_line_sz = row_delim_pos; + broken_row.assign(p.data(),broken_line_sz); //TODO push it for later processing + + process_loc = p.data() + broken_line_sz; + length_of_process = is.gcount() - broken_line_sz; + } + else if (split_arg.skip_last_row && (size_t)is.gcount() < sizeof(buff)) //remove last row and save it; + { + auto row_delim_pos = p.find_last_of(m_csv_defintion.row_delimiter)+1; + //verfiy it found row_delimiter + + broken_line_sz = is.gcount() - row_delim_pos; + broken_row.assign(&buff[row_delim_pos], broken_line_sz); //TODO push it for later processing + + process_loc = &buff[0]; + length_of_process = is.gcount() - broken_line_sz; + } + + std::string str_result; + //the function "knows" how to merge between chunks + run_s3select_on_stream(str_result, + process_loc, + length_of_process, + split_arg.r.second - split_arg.r.first); //TODO last row in range could be broken + + //TODO debug print + std::cout << "result for range " << split_arg.r.first << " " << split_arg.r.second << std::endl; + std::cout << str_result << std::endl;//TODO at this point a thread-safe queue for the result is required(each context has its own queue, it shared with single reader) + + }; //while scanning the range + } + +};//csv_object + +//////////////////////////////////// object for handling parallel execution of a single query +class csv_parallel_execution { + + + // TODO shared queue, i.e. 
dedicated queue per context , the queue enables read/write access one context is pushing other context is reading + //purpose: simulate the RGW IO streaming into s3select. + public: + + const char* m_object_name; + std::string m_query; + uint16_t m_scale_size; + csv_object::split_desc_t m_split_args[16]; + + csv_parallel_execution(const char* object_name,const char * query,uint16_t num_of_split)//TODO init scale-number, should produce several split_desc_t + { + m_query.assign(query); + m_scale_size = num_of_split; + m_object_name = object_name; + } + + void split(uint16_t num_of_split,const char* object_name) + { + //purpose: simulate parallel execution (different context), the threading-model is only for implementation of the simulator + //the object is split into separate ranges, each range is processed with the same query + //result should merged later. + //ranges are set arbitrary(according to object size), meaning it could "break" lines. + //these lines should merged and processed later. 
+ + std::ifstream is(object_name, std::ifstream::binary); + + is.seekg (0, is.end); + uint64_t length = is.tellg(); + is.seekg (0, is.beg); + + uint64_t split_sz=length/num_of_split; + + std::pair r; + r.first=0; + + for(int i=1;i thread_vec; + + for (int i = 0; i < num_of_split; i++) + { + csv_object::csv_defintions csv; + m_split_args[i].object_name = object_name; + m_split_args[i].s3select_obj->parse_query(m_query.c_str()); + m_split_args[i].csv_obj = new csv_object(m_split_args[i].s3select_obj,csv);//TODO release + + + if (i == 0) + { + //skip last row and save it for later + m_split_args[i].skip_first_row = false; + m_split_args[i].skip_last_row = true; + + + thread_vec.push_back(std::thread(&csv_object::process_range,m_split_args[i].csv_obj,m_split_args[i])); + //m_split_args[i].csv_obj->process_range(m_split_args[i]); + + continue; + } + + if (i == num_of_split - 1) //last range + { + //skip first row + m_split_args[i].skip_first_row = true; + m_split_args[i].skip_last_row = false; + + thread_vec.push_back(std::thread(&csv_object::process_range,m_split_args[i].csv_obj,m_split_args[i])); + //m_split_args[i].csv_obj->process_range(m_split_args[i]); + + continue; + } + + //"middle ranges" , should skip first row and last row. 
+ m_split_args[i].skip_first_row = true; + m_split_args[i].skip_last_row = false; + + thread_vec.push_back(std::thread(&csv_object::process_range,m_split_args[i].csv_obj,m_split_args[i])); + //m_split_args[i].csv_obj->process_range(m_split_args[i]); + + + + }//for + + + for(auto &th : thread_vec) + {//wait for all threads + th.join(); + } + + + }//split + + void run() + { + split(m_scale_size,m_object_name); + + for(int i=0;iparse_query(m_query.c_str()); + } + } + }; };//namespace From 7e9c8c343c48ae49cec71f007084bd5010bc6ed2 Mon Sep 17 00:00:00 2001 From: gal salomon Date: Thu, 2 Sep 2021 16:59:42 +0300 Subject: [PATCH 3/3] add pthread to linkage --- CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index ffb69910..9712ecc1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ cmake_minimum_required(VERSION 3.0) project(s3select) -set(CMAKE_CXX_FLAGS "-std=gnu++17 -ggdb -Wnon-virtual-dtor -Wreorder -Wunused-variable -Wtype-limits -Wsign-compare -Wmaybe-uninitialized") +set(CMAKE_CXX_FLAGS "-std=gnu++17 -ggdb -Wnon-virtual-dtor -Wreorder -Wunused-variable -Wtype-limits -Wsign-compare -Wmaybe-uninitialized -lpthread") set(CMAKE_EXPORT_COMPILE_COMMANDS ON) set(CMAKE_CXX_STANDARD_REQUIRED ON)