From 2e8eb705eaa5b1ec6a6d28b17701abcd60fb0523 Mon Sep 17 00:00:00 2001 From: Gal Salomon Date: Mon, 22 Feb 2021 12:33:02 +0200 Subject: [PATCH] Awscli handler (#61) * adding object to handle awscli response,first phase. adding warning flags; Signed-off-by: gal salomon * fixing text to float conversion (ronen-fr #42) (#64) Signed-off-by: gal salomon * fix integer conversion; add compile-warning flag Signed-off-by: gal salomon * fix stdin mode; fix compile warnings Signed-off-by: gal salomon --- .github/workflows/cmake.yml | 56 ------ CMakeLists.txt | 2 +- example/s3select_example.cpp | 335 +++++++++++++++++++++++++++++++++-- include/s3select.h | 2 +- include/s3select_functions.h | 4 +- test/CMakeLists.txt | 2 +- 6 files changed, 327 insertions(+), 74 deletions(-) delete mode 100644 .github/workflows/cmake.yml diff --git a/.github/workflows/cmake.yml b/.github/workflows/cmake.yml deleted file mode 100644 index ac5e693f..00000000 --- a/.github/workflows/cmake.yml +++ /dev/null @@ -1,56 +0,0 @@ -name: CMake - -on: [push] - -env: - # Customize the CMake build type here (Release, Debug, RelWithDebInfo, etc.) - BUILD_TYPE: Release - -jobs: - build: - # The CMake configure and build commands are platform agnostic and should work equally - # well on Windows or Mac. You can convert this to a matrix build if you need - # cross-platform coverage. - # See: https://docs.github.com/en/free-pro-team@latest/actions/learn-github-actions/managing-complex-workflows#using-a-build-matrix - runs-on: ubuntu-latest - - steps: - - uses: actions/checkout@v2 - - - name: install-boost - run: sudo apt-get install -y libboost-all-dev - - - name: fetch-gtest - run: sudo apt-get install -y libgtest-dev - - - name: install-gtest - run: cd /usr/src/gtest && sudo cmake . && sudo cmake --build . --target install - - - name: Create Build Environment - # Some projects don't allow in-source building, so create a separate build directory - # We'll use this as our working directory for all subsequent commands - run: cmake -E make_directory ${{github.workspace}}/build - - - name: Configure CMake - # Use a bash shell so we can use the same syntax for environment variable - # access regardless of the host operating system - shell: bash - working-directory: ${{github.workspace}}/build - # Note the current convention is to use the -S and -B options here to specify source - # and build directories, but this is only available with CMake 3.13 and higher. - # The CMake binaries on the Github Actions machines are (as of this writing) 3.12 - run: cmake $GITHUB_WORKSPACE -DCMAKE_BUILD_TYPE=$BUILD_TYPE - - - name: Build - working-directory: ${{github.workspace}}/build - shell: bash - # Execute the build. You can specify a specific target with "--target " - run: cmake --build . --config $BUILD_TYPE - - - name: Test - working-directory: ${{github.workspace}}/build - shell: bash - # Execute tests defined by the CMake configuration. - # See https://cmake.org/cmake/help/latest/manual/ctest.1.html for more detail - run: ctest -C $BUILD_TYPE - diff --git a/CMakeLists.txt b/CMakeLists.txt index 47fcc94c..cd017b22 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ cmake_minimum_required(VERSION 3.0) project(s3select) -set(CMAKE_CXX_FLAGS "-std=gnu++17 -ggdb -Wnon-virtual-dtor -Wreorder -Wunused-variable -Wmaybe-uninitialized") +set(CMAKE_CXX_FLAGS "-std=gnu++17 -ggdb -Wnon-virtual-dtor -Wreorder -Wunused-variable -Wtype-limits -Wsign-compare -Wmaybe-uninitialized") set(CMAKE_EXPORT_COMPILE_COMMANDS ON) set(CMAKE_CXX_STANDARD_REQUIRED ON) diff --git a/example/s3select_example.cpp b/example/s3select_example.cpp index 682b9852..ff6139fd 100644 --- a/example/s3select_example.cpp +++ b/example/s3select_example.cpp @@ -3,35 +3,334 @@ #include #include #include +#include +#include using namespace s3selectEngine; using namespace BOOST_SPIRIT_CLASSIC_NS; -int main(int argc, char** argv) +class awsCli_handler { + + +//TODO get parameter +private: + std::unique_ptr s3select_syntax; + std::string m_s3select_query; + std::string m_result; + std::unique_ptr m_s3_csv_object; + std::string m_column_delimiter;//TODO remove + std::string m_quot;//TODO remove + std::string m_row_delimiter;//TODO remove + std::string m_compression_type;//TODO remove + std::string m_escape_char;//TODO remove + std::unique_ptr m_buff_header; + std::string m_header_info; + std::string m_sql_query; + uint64_t m_total_object_processing_size; + +public: + + awsCli_handler(): + s3select_syntax(std::make_unique()), + m_s3_csv_object(std::unique_ptr()), + m_buff_header(std::make_unique(1000)), + m_total_object_processing_size(0), + crc32(std::unique_ptr()) + { + } + + enum header_name_En + { + EVENT_TYPE, + CONTENT_TYPE, + MESSAGE_TYPE + }; + static const char* header_name_str[3]; + + enum header_value_En + { + RECORDS, + OCTET_STREAM, + EVENT, + CONT + }; + static const char* header_value_str[4]; + +private: + + void encode_short(char *buff, uint16_t s, int &i) + { + short x = htons(s); + memcpy(buff, &x, sizeof(s)); + i += sizeof(s); + } + + void encode_int(char *buff, u_int32_t s, int &i) + { + u_int32_t x = htonl(s); + memcpy(buff, &x, sizeof(s)); + i += sizeof(s); + } + + int create_header_records(char* buff) + { + int i = 0; + + //1 + buff[i++] = char(strlen(header_name_str[EVENT_TYPE])); + memcpy(&buff[i], header_name_str[EVENT_TYPE], strlen(header_name_str[EVENT_TYPE])); + i += strlen(header_name_str[EVENT_TYPE]); + buff[i++] = char(7); + encode_short(&buff[i], uint16_t(strlen(header_value_str[RECORDS])), i); + memcpy(&buff[i], header_value_str[RECORDS], strlen(header_value_str[RECORDS])); + i += strlen(header_value_str[RECORDS]); + + //2 + buff[i++] = char(strlen(header_name_str[CONTENT_TYPE])); + memcpy(&buff[i], header_name_str[CONTENT_TYPE], strlen(header_name_str[CONTENT_TYPE])); + i += strlen(header_name_str[CONTENT_TYPE]); + buff[i++] = char(7); + encode_short(&buff[i], uint16_t(strlen(header_value_str[OCTET_STREAM])), i); + memcpy(&buff[i], header_value_str[OCTET_STREAM], strlen(header_value_str[OCTET_STREAM])); + i += strlen(header_value_str[OCTET_STREAM]); + + //3 + buff[i++] = char(strlen(header_name_str[MESSAGE_TYPE])); + memcpy(&buff[i], header_name_str[MESSAGE_TYPE], strlen(header_name_str[MESSAGE_TYPE])); + i += strlen(header_name_str[MESSAGE_TYPE]); + buff[i++] = char(7); + encode_short(&buff[i], uint16_t(strlen(header_value_str[EVENT])), i); + memcpy(&buff[i], header_value_str[EVENT], strlen(header_value_str[EVENT])); + i += strlen(header_value_str[EVENT]); + + return i; +} + + std::unique_ptr crc32; + + int create_message(std::string &out_string, u_int32_t result_len, u_int32_t header_len) + { + u_int32_t total_byte_len = 0; + u_int32_t preload_crc = 0; + u_int32_t message_crc = 0; + int i = 0; + char *buff = out_string.data(); + + if (crc32 == 0) + { + // the parameters are according to CRC-32 algorithm and its aligned with AWS-cli checksum + crc32 = std::unique_ptr(new boost::crc_optimal<32, 0x04C11DB7, 0xFFFFFFFF, 0xFFFFFFFF, true, true>); + } + + total_byte_len = result_len + 16; + + encode_int(&buff[i], total_byte_len, i); + encode_int(&buff[i], header_len, i); + + crc32->reset(); + *crc32 = std::for_each(buff, buff + 8, *crc32); + preload_crc = (*crc32)(); + encode_int(&buff[i], preload_crc, i); + + i += result_len; + + crc32->reset(); + *crc32 = std::for_each(buff, buff + i, *crc32); + message_crc = (*crc32)(); + + int out_encode; + encode_int(reinterpret_cast(&out_encode), message_crc, i); + out_string.append(reinterpret_cast(&out_encode),sizeof(out_encode)); + + return i; + } + +#define PAYLOAD_LINE "\n\n\n\n" +#define END_PAYLOAD_LINE "\n" + +public: + + //std::string get_error_description(){} + + std::string get_result() + { + return m_result; + } + + int run_s3select(const char *query, const char *input, size_t input_length, size_t object_size) + { + int status = 0; + csv_object::csv_defintions csv; + + m_result = "012345678901"; //12 positions for header-crc + + int header_size = 0; + + if (m_s3_csv_object == 0) + { + s3select_syntax->parse_query(query); + + if (m_row_delimiter.size()) + { + csv.row_delimiter = *m_row_delimiter.c_str(); + } + + if (m_column_delimiter.size()) + { + csv.column_delimiter = *m_column_delimiter.c_str(); + } + + if (m_quot.size()) + { + csv.quot_char = *m_quot.c_str(); + } + + if (m_escape_char.size()) + { + csv.escape_char = *m_escape_char.c_str(); + } + + if (m_header_info.compare("IGNORE") == 0) + { + csv.ignore_header_info = true; + } + else if (m_header_info.compare("USE") == 0) + { + csv.use_header_info = true; + } + + m_s3_csv_object = std::unique_ptr(new s3selectEngine::csv_object(s3select_syntax.get(), csv)); + } + + if (s3select_syntax->get_error_description().empty() == false) + { + header_size = create_header_records(m_buff_header.get()); + m_result.append(m_buff_header.get(), header_size); + m_result.append(PAYLOAD_LINE); + m_result.append(s3select_syntax->get_error_description()); + //ldout(s->cct, 10) << "s3-select query: failed to prase query; {" << s3select_syntax->get_error_description() << "}" << dendl; + status = -1; + } + else + { + header_size = create_header_records(m_buff_header.get()); + m_result.append(m_buff_header.get(), header_size); + m_result.append(PAYLOAD_LINE); + //status = m_s3_csv_object->run_s3select_on_stream(m_result, input, input_length, s->obj_size); + status = m_s3_csv_object->run_s3select_on_stream(m_result, input, input_length, object_size); + if (status < 0) + { + m_result.append(m_s3_csv_object->get_error_description()); + } + } + + if (m_result.size() > strlen(PAYLOAD_LINE)) + { + m_result.append(END_PAYLOAD_LINE); + create_message(m_result, m_result.size() - 12, header_size); + //s->formatter->write_bin_data(m_result.data(), buff_len); + //if (op_ret < 0) + //{ + // return op_ret; + //} + } + //rgw_flush_formatter_and_reset(s, s->formatter); + + return status; + } + //int extract_by_tag(std::string tag_name, std::string& result); + + //void convert_escape_seq(std::string& esc); + + //int handle_aws_cli_parameters(std::string& sql_query); + +}; + +const char* awsCli_handler::header_name_str[3] = {":event-type", ":content-type", ":message-type"}; +const char* awsCli_handler::header_value_str[4] = {"Records", "application/octet-stream", "event","cont"}; +int run_on_localFile(char* input_query); + +int main(int argc,char **argv) { +awsCli_handler awscli; - //purpose: demostrate the s3select functionalities - s3select s3select_syntax; + char *query; + char *fname; - char* input_query = 0; + bool using_key_flag = false; for (int i = 0; i < argc; i++) { + if (!strcmp(argv[i], "-key")) + { + fname = argv[i + 1]; + using_key_flag = true; + continue; + } + if (!strcmp(argv[i], "-q")) { - input_query = argv[i + 1]; + query = argv[i + 1]; + continue; } } + if(using_key_flag == false) + { + return run_on_localFile(query); + } - if (!input_query) + FILE* fp = fopen(fname, "r"); + + if(!fp) { - std::cout << "type -q 'select ... from ... '" << std::endl; + std::cout << " input stream is not valid, abort;" << std::endl; return -1; } + struct stat statbuf; + lstat(fname, &statbuf); + int status; + s3selectEngine::csv_object::csv_defintions csv; + csv.use_header_info = false; + +#define BUFFER_SIZE 1024 * 4 //simulate 4mb parts in s3-objects + char *buff = (char *)malloc(BUFFER_SIZE); + while (1) + { + size_t input_sz = fread(buff, 1, BUFFER_SIZE, fp); + status = awscli.run_s3select(query, buff, input_sz, statbuf.st_size); + if(status<0) + { + std::cout << "failure on execution " << std::endl; + break; + } + std::cout << awscli.get_result() << std::endl; + + if(!input_sz || feof(fp)) + { + break; + } + } + + free(buff); + fclose(fp); + return status; +} + +int run_on_localFile(char* input_query) +{ + + //purpose: demostrate the s3select functionalities + s3select s3select_syntax; + + if (!input_query) + { + std::cout << "type -q 'select ... from ... '" << std::endl; + return -1; + } int status = s3select_syntax.parse_query(input_query); if (status != 0) @@ -67,14 +366,14 @@ int main(int argc, char** argv) std::string s3select_result; s3selectEngine::csv_object::csv_defintions csv; csv.use_header_info = false; + bool do_aggregate = false; //csv.column_delimiter='|'; //csv.row_delimiter='\t'; s3selectEngine::csv_object s3_csv_object(&s3select_syntax, csv); - //s3selectEngine::csv_object s3_csv_object(&s3select_syntax); -#define BUFF_SIZE 1024*1024*4 +#define BUFF_SIZE 1024*1024*4 //simulate 4mb parts in s3 object char* buff = (char*)malloc( BUFF_SIZE ); while(1) { @@ -86,11 +385,21 @@ int main(int argc, char** argv) //input_sz = strlen(buff); //size_t input_sz = in == 0 ? 0 : strlen(in); - //if (!input_sz) to_aggregate = true; + if (!input_sz || feof(fp)) + { + do_aggregate = true; + } + int status; + if(do_aggregate == true) + { + status = s3_csv_object.run_s3select_on_object(s3select_result, in, input_sz, false, false, do_aggregate); + } + else + { + status = s3_csv_object.run_s3select_on_stream(s3select_result, in, input_sz, statbuf.st_size); + } - //int status = s3_csv_object.run_s3select_on_object(s3select_result,in,input_sz,false,false,to_aggregate); - int status = s3_csv_object.run_s3select_on_stream(s3select_result, in, input_sz, statbuf.st_size); if(status<0) { std::cout << "failure on execution " << std::endl << s3_csv_object.get_error_description() << std::endl; @@ -113,5 +422,5 @@ int main(int argc, char** argv) free(buff); fclose(fp); - + return 0; } diff --git a/include/s3select.h b/include/s3select.h index 002e818a..b925c7da 100644 --- a/include/s3select.h +++ b/include/s3select.h @@ -1226,7 +1226,7 @@ void push_is_null_predicate::builder(s3select* self, const char* a, const char* std::string token(a, b); bool is_null = true; - for(int i=0;i info = bsc::parse(token.c_str()+i, (bsc::str_p("is") >> bsc::str_p("not") >> bsc::str_p("null")) , bsc::space_p); if (info.full) diff --git a/include/s3select_functions.h b/include/s3select_functions.h index 24c412be..fc49142e 100644 --- a/include/s3select_functions.h +++ b/include/s3select_functions.h @@ -797,7 +797,7 @@ struct _fn_to_timestamp : public base_function result->set_value(&new_ptime); #else - if (frac_sec >= 0 && frac_sec < 10) + if (frac_sec < 10) frac_sec = frac_sec * 100000; else if (frac_sec >= 10 && frac_sec < 100) frac_sec = frac_sec * 10000; @@ -805,7 +805,7 @@ struct _fn_to_timestamp : public base_function frac_sec = frac_sec * 1000; else if (frac_sec >= 1000 && frac_sec < 10000) frac_sec = frac_sec * 100; - else if (frac_sec >= 0 && frac_sec < 100000) + else if (frac_sec < 100000) frac_sec = frac_sec * 10; //TODO: Include timezone hours(tz_hr) and timezone minutes(tz_mn) in date time calculation. diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 56bab20c..e2898ae3 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -1,6 +1,6 @@ add_executable(s3select_test s3select_test.cpp) target_include_directories(s3select_test PUBLIC ../include) -target_link_libraries(s3select_test gtest gtest_main boost_date_time boost_thread boost_system -lpthread -lm) +target_link_libraries(s3select_test gtest gtest_main boost_date_time boost_thread boost_system) include(GoogleTest) gtest_discover_tests(s3select_test)