Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scale-up #103

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions example/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
add_executable(s3select_example s3select_example.cpp)
target_include_directories(s3select_example PUBLIC ../include ../rapidjson/include)
add_executable(s3select_scaleup s3select_scaleup.cpp)
target_include_directories(s3select_scaleup PUBLIC ../include ../rapidjson/include)

find_package(Arrow QUIET)

Expand All @@ -8,9 +10,11 @@ if(Arrow_FOUND)
add_executable(csv_to_parquet csv_to_parquet.cpp)
target_include_directories(csv_to_parquet PUBLIC ../include)
target_link_libraries(s3select_example boost_date_time boost_system boost_thread parquet arrow boost_filesystem)
target_link_libraries(s3select_scaleup boost_date_time boost_system boost_thread parquet arrow boost_filesystem pthread)
target_link_libraries(csv_to_parquet boost_date_time boost_system boost_thread parquet arrow)
else()
target_link_libraries(s3select_example boost_date_time boost_system boost_thread boost_filesystem)
target_link_libraries(s3select_scaleup boost_date_time boost_system boost_thread boost_filesystem)
endif()

add_executable(generate_rand_csv generate_rand_csv.c)
Expand Down
38 changes: 26 additions & 12 deletions example/s3select_example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class awsCli_handler {
private:
std::unique_ptr<s3selectEngine::s3select> s3select_syntax;
std::string m_s3select_query;
std::string m_result;
s3select_result m_result;
std::unique_ptr<s3selectEngine::csv_object> m_s3_csv_object;
std::string m_column_delimiter;//TODO remove
std::string m_quot;//TODO remove
Expand Down Expand Up @@ -156,7 +156,7 @@ class awsCli_handler {

std::string get_result()
{
return m_result;
return m_result.str();
}

int run_s3select(const char *query, const char *input, size_t input_length, size_t object_size)
Expand Down Expand Up @@ -229,7 +229,7 @@ class awsCli_handler {
if (m_result.size() > strlen(PAYLOAD_LINE))
{
m_result.append(END_PAYLOAD_LINE);
create_message(m_result, m_result.size() - 12, header_size);
create_message(m_result.str(), m_result.size() - 12, header_size);
//s->formatter->write_bin_data(m_result.data(), buff_len);
//if (op_ret < 0)
//{
Expand Down Expand Up @@ -304,12 +304,12 @@ int run_query_on_parquet_file(const char* input_query, const char* input_file)
rgw.set_get_size_api(fp_get_size);
rgw.set_range_req_api(fp_range_req);

std::function<int(std::string&)> fp_s3select_result_format = [](std::string& result){std::cout << result;result.clear();return 0;};
std::function<int(std::string&)> fp_s3select_header_format = [](std::string& result){result="";return 0;};
std::function<int(s3select_result&)> fp_s3select_result_format = [](s3select_result& result){std::cout << result;result.clear();return 0;};
std::function<int(s3select_result&)> fp_s3select_header_format = [](s3select_result& result){result="";return 0;};

parquet_object parquet_processor(input_file,&s3select_syntax,&rgw);

std::string result;
s3select_result result;

do
{
Expand Down Expand Up @@ -455,9 +455,10 @@ int run_on_localFile(char* input_query)

lstat(object_name.c_str(), &statbuf);

std::string s3select_result;
s3select_result result;
s3selectEngine::csv_object::csv_defintions csv;
csv.use_header_info = false;
bool do_aggregate = false;
//csv.column_delimiter='|';
//csv.row_delimiter='\t';

Expand All @@ -477,20 +478,34 @@ int run_on_localFile(char* input_query)
{
size_t input_sz = fread(buff, 1, BUFF_SIZE, fp);
char* in=buff;
status = s3_csv_object.run_s3select_on_stream(s3select_result, in, input_sz, statbuf.st_size);

if (!input_sz || feof(fp))
{
do_aggregate = true;
}

int status;
if(do_aggregate == true)
{
status = s3_csv_object.run_s3select_on_object(result, in, input_sz, false, false, do_aggregate);
}
else
{
status = s3_csv_object.run_s3select_on_stream(result, in, input_sz, __INT64_MAX__);
}

if(status<0)
{
std::cout << "failure on execution " << std::endl << s3_csv_object.get_error_description() << std::endl;
break;
}

if(s3select_result.size()>0)
if(result.size()>0)
{
std::cout << s3select_result;
std::cout << result;
}

s3select_result = "";
result = "";
if(!input_sz || feof(fp))
{
break;
Expand Down Expand Up @@ -521,7 +536,6 @@ int run_on_single_query(const char* fname, const char* query)

if (is_parquet_file(fname))
{
std::string result;
int status = run_query_on_parquet_file(query, fname);
return status;
}
Expand Down
Loading