Query parallel execution #89

Closed · wants to merge 3 commits
2 changes: 1 addition & 1 deletion CMakeLists.txt
@@ -2,7 +2,7 @@ cmake_minimum_required(VERSION 3.0)

project(s3select)

-set(CMAKE_CXX_FLAGS "-std=gnu++17 -ggdb -Wnon-virtual-dtor -Wreorder -Wunused-variable -Wtype-limits -Wsign-compare -Wmaybe-uninitialized")
+set(CMAKE_CXX_FLAGS "-std=gnu++17 -ggdb -Wnon-virtual-dtor -Wreorder -Wunused-variable -Wtype-limits -Wsign-compare -Wmaybe-uninitialized -lpthread")
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
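Note: appending -lpthread to CMAKE_CXX_FLAGS works with GNU toolchains, but a sketch of the more idiomatic CMake form would be as follows (the target name s3select_example is an assumption, not taken from this diff):

  find_package(Threads REQUIRED)
  target_link_libraries(s3select_example PRIVATE Threads::Threads)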

51 changes: 51 additions & 0 deletions example/s3select_example.cpp
@@ -473,10 +473,61 @@ int run_on_single_query(const char* fname, const char* query)
return status;
}

int parallel_query_processing(int argc,char **argv)
{
//purpose: run a parallel execution of a single query.


char *query=0;
char *object_name=0;
uint32_t execution_scale=1;

for (int i = 1; i < argc - 1; i++)//start past the program name; each flag consumes the following argument
{

if (!strcmp(argv[i], "-key"))
{
object_name = argv[i + 1];
continue;
}

if (!strcmp(argv[i], "-q"))
{
query = argv[i + 1];
continue;
}

if (!strcmp(argv[i], "-p"))
{
execution_scale = atoi(argv[i + 1]);
continue;
}
}

if(!object_name)
{
std::cout << " object name is missing " << std::endl;
return -1;
}

if(!query)
{
std::cout << " query is missing " << std::endl;
return -1;
}

csv_parallel_execution csv_scaleup(object_name,query,execution_scale);

csv_scaleup.prepare();
csv_scaleup.run();

return 0;
}
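
//Example invocation (a sketch; the binary name and the query text are
//illustrative, not taken from this diff):
//  ./s3select_example -key objects.csv -q "select count(0) from stdin;" -p 4
//splits objects.csv into 4 byte ranges and runs the same query on each range
//in its own thread.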

int main(int argc,char **argv)
{
//purpose: run many queries (residing in a file) on a single file.

return parallel_query_processing(argc,argv);//NOTE: short-circuits the original single/multi-query flow below

char *query=0;
char *fname=0;
char *query_file=0;//file contains many queries
246 changes: 238 additions & 8 deletions include/s3select.h
@@ -13,7 +13,8 @@
#include <boost/function.hpp>
#include <boost/bind.hpp>
#include <functional>

#include <fstream>
#include <thread>

#define _DEBUG_TERM {string token(a,b);std::cout << __FUNCTION__ << token << std::endl;}

@@ -1806,10 +1807,22 @@ class csv_object : public base_s3object

public:

void result_values_to_string(multi_values& projections_resuls, std::string& result)
{
for(auto res : projections_resuls.values)
{
result.append( res->to_string() );

//if(++i<projections_resuls.values.size())//TODO remove redundant column-delimiter at the end of row (this fix should be aligned with tests)
result.append(",");//TODO use csv-definition
}
}
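
//For example (illustrative): with projection values {"7","foo"} this produces
//"7,foo," -- including the trailing column-delimiter the TODO above refers to.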

int getMatchRow( std::string& result) //TODO virtual ? getResult
{
int number_of_tokens = 0;
multi_values projections_resuls;



if (m_aggr_flow == true)
@@ -1820,15 +1833,17 @@
number_of_tokens = getNextRow();
if (number_of_tokens < 0) //end of stream
{
+projections_resuls.clear();
if (m_is_to_aggregate)
for (auto& i : m_projections)
{
i->set_last_call();
i->set_skip_non_aggregate(false);//projection column is set to be runnable
-result.append( i->eval().to_string() );
-result.append(",");

+projections_resuls.push_value( &(i->eval()) );
}

+result_values_to_string(projections_resuls,result);
return number_of_tokens;
}

@@ -1874,12 +1889,13 @@
}
while (m_where_clause && !m_where_clause->eval().is_true());

+projections_resuls.clear();
for (auto& i : m_projections)
{
-result.append( i->eval().to_string() );
-result.append(",");
+projections_resuls.push_value( &(i->eval()) );
}
result.append("\n");
result_values_to_string(projections_resuls,result);
result.append("\n");//TODO use csv-defintion
}

return number_of_tokens; //TODO wrong
@@ -1985,7 +2001,7 @@ class csv_object : public base_s3object

}

-public:
+public: //change std::string& result --> multi_values
int run_s3select_on_object(std::string& result, const char* csv_stream, size_t stream_length, bool skip_first_line, bool skip_last_line, bool do_aggregate)
{

@@ -2015,7 +2031,7 @@
int num = 0;
try
{
-num = getMatchRow(result);
+num = getMatchRow(result);//TODO multi_values
}
catch (base_s3select_exception& e)
{
@@ -2038,6 +2054,220 @@

return 0;
}

////// parallel execution should inherit from csv_object

typedef struct {

const char* object_name;
std::pair<uint64_t,uint64_t> r; //byte range [first,second) within the object
bool skip_first_row;
bool skip_last_row;
csv_object* csv_obj;
s3select* s3select_obj;

} split_desc_t;

void process_range(split_desc_t split_arg)
{
//the first range should skip its last (possibly broken) line and save it.
//the middle ranges (including the last) should merge in the broken line saved by the previous range.
//each range should loop on chunks (4MB).

//NOTE: this part should execute in RGW (the std::ifstream is replaced with the S3 APIs); TODO create an interface with RGW::callback
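
//Worked example (illustrative): a 2-way split of "a,1\nb,2\nc,3\n" (12 bytes)
//yields ranges [0:6) and [6:12). Range 0 reads "a,1\nb,", processes "a,1\n"
//and saves the broken tail "b,". Range 1 reads "2\nc,3\n", saves the head
//"2\n" (it belongs to the broken row) and processes "c,3\n". The saved pieces
//"b," + "2\n" form row "b,2\n", which still needs to be merged and processed
//(one of the TODOs below).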

std::vector<char> buff(4 * 1024 * 1024);//RGW chunk; heap-allocated, since a 4MB stack array risks overflow
std::ifstream is(split_arg.object_name, std::ifstream::binary);
size_t length_to_read = split_arg.r.second - split_arg.r.first;
is.seekg(split_arg.r.first); //range starting point

while (length_to_read)
{//process chunk after chunk

is.read(buff.data(), (length_to_read > buff.size()) ? buff.size() : length_to_read);
length_to_read -= is.gcount();
std::string_view p(&buff[0],is.gcount());
size_t broken_line_sz = 0; //TODO no need for that
std::string broken_row;

const char *process_loc = p.data();
size_t length_of_process = is.gcount();

if (split_arg.skip_first_row)
{

auto row_delim_pos = p.find_first_of(m_csv_defintion.row_delimiter)+1;
//TODO verify a row_delimiter was found (find_first_of returns npos otherwise)

split_arg.skip_first_row = false; //skip only once
broken_line_sz = row_delim_pos;
broken_row.assign(p.data(),broken_line_sz); //TODO push it for later processing

process_loc = p.data() + broken_line_sz;
length_of_process = is.gcount() - broken_line_sz;
}
else if (split_arg.skip_last_row && (size_t)is.gcount() < buff.size()) //remove the last row and save it
{
auto row_delim_pos = p.find_last_of(m_csv_defintion.row_delimiter)+1;
//TODO verify a row_delimiter was found (find_last_of returns npos otherwise)

broken_line_sz = is.gcount() - row_delim_pos;
broken_row.assign(&buff[row_delim_pos], broken_line_sz); //TODO push it for later processing

process_loc = &buff[0];
length_of_process = is.gcount() - broken_line_sz;
}

std::string str_result;
//the function "knows" how to merge between chunks
run_s3select_on_stream(str_result,
process_loc,
length_of_process,
split_arg.r.second - split_arg.r.first); //TODO last row in range could be broken

//TODO debug print
std::cout << "result for range " << split_arg.r.first << " " << split_arg.r.second << std::endl;
std::cout << str_result << std::endl;//TODO at this point a thread-safe queue for the results is required (each context has its own queue, shared with a single reader)

} //while scanning the range
}

};//csv_object

//////////////////////////////////// object for handling parallel execution of a single query
class csv_parallel_execution {


// TODO shared queue, i.e. a dedicated queue per context; one context pushes results while another context reads them
//purpose: simulate the RGW IO streaming into s3select.
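//A minimal sketch of such a per-context queue (assumptions: one producer and
//one consumer; the name result_queue is hypothetical; needs <mutex>,
//<condition_variable> and <queue>):
//
//  class result_queue {
//    std::mutex m;
//    std::condition_variable cv;
//    std::queue<std::string> q;
//  public:
//    void push(std::string s)
//    {
//      { std::lock_guard<std::mutex> lk(m); q.push(std::move(s)); }
//      cv.notify_one();
//    }
//    std::string pop() //blocks until a result is available
//    {
//      std::unique_lock<std::mutex> lk(m);
//      cv.wait(lk, [this] { return !q.empty(); });
//      std::string s = std::move(q.front()); q.pop();
//      return s;
//    }
//  };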
public:

const char* m_object_name;
std::string m_query;
uint16_t m_scale_size;
csv_object::split_desc_t m_split_args[16];

csv_parallel_execution(const char* object_name,const char * query,uint16_t num_of_split)//TODO init scale-number, should produce several split_desc_t
{
m_query.assign(query);
m_scale_size = num_of_split;
if (m_scale_size == 0) m_scale_size = 1;
if (m_scale_size > 16) m_scale_size = 16; //m_split_args has a fixed capacity of 16
m_object_name = object_name;
}

void split(uint16_t num_of_split,const char* object_name)
{
//purpose: simulate parallel execution (different contexts); the threading model only serves the simulator implementation.
//the object is split into separate ranges, and each range is processed with the same query.
//the results should be merged later.
//ranges are set arbitrarily (according to object size), meaning they may "break" lines.
//these broken lines should be merged and processed later; see the worked example below.
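//Range computation, worked example (illustrative): length=100, num_of_split=4
//gives split_sz=25 and ranges [0:25) [25:50) [50:75) [75:100); the last range
//absorbs the remainder when length is not evenly divisible.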

std::ifstream is(object_name, std::ifstream::binary);

is.seekg (0, is.end);
uint64_t length = is.tellg();
is.seekg (0, is.beg);

uint64_t split_sz=length/num_of_split;

std::pair<uint64_t,uint64_t> r;
r.first=0;

for(int i=1;i<num_of_split+1;i++)
{
if(i == num_of_split)
{
r.second = length;
}
else
{
r.second = r.first + split_sz;
}

m_split_args[i-1].r = r;
r.first = r.second; //ranges are half-open [first,second); +1 here would skip one byte per boundary

//TODO debug
std::cout <<" [ " << m_split_args[i-1].r.first << " : " << m_split_args[i-1].r.second << " ]" << std::endl;
}


std::vector<std::thread> thread_vec;

for (int i = 0; i < num_of_split; i++)
{
csv_object::csv_defintions csv;
m_split_args[i].object_name = object_name;
//NOTE: the query was already parsed in prepare(), which must be called before run()
m_split_args[i].csv_obj = new csv_object(m_split_args[i].s3select_obj,csv);//TODO use unique_ptr; released in run()


if (i == 0)
{
//skip last row and save it for later
m_split_args[i].skip_first_row = false;
m_split_args[i].skip_last_row = true;


thread_vec.push_back(std::thread(&csv_object::process_range,m_split_args[i].csv_obj,m_split_args[i]));
//m_split_args[i].csv_obj->process_range(m_split_args[i]);

continue;
}

if (i == num_of_split - 1) //last range
{
//skip first row
m_split_args[i].skip_first_row = true;
m_split_args[i].skip_last_row = false;

thread_vec.push_back(std::thread(&csv_object::process_range,m_split_args[i].csv_obj,m_split_args[i]));
//m_split_args[i].csv_obj->process_range(m_split_args[i]);

continue;
}

//"middle ranges" , should skip first row and last row.
m_split_args[i].skip_first_row = true;
m_split_args[i].skip_last_row = false;

thread_vec.push_back(std::thread(&csv_object::process_range,m_split_args[i].csv_obj,m_split_args[i]));
//m_split_args[i].csv_obj->process_range(m_split_args[i]);



}//for


for(auto &th : thread_vec)
{//wait for all threads
th.join();
}


}//split

void run()
{
split(m_scale_size,m_object_name);

for(int i=0;i<m_scale_size;i++)
{//TODO unique_ptr
delete m_split_args[i].csv_obj;
delete m_split_args[i].s3select_obj;
}
}

void prepare()
{//must be called before run(); allocates and parses the per-range s3select objects
for(int i=0;i<m_scale_size;i++)
{
m_split_args[i].s3select_obj = new s3select();

m_split_args[i].s3select_obj->parse_query(m_query.c_str());
}
}
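
//Typical call sequence (mirrors parallel_query_processing() in
//example/s3select_example.cpp):
//  csv_parallel_execution csv_scaleup(object_name, query, execution_scale);
//  csv_scaleup.prepare(); //allocate per-range objects and parse the query
//  csv_scaleup.run();     //split into ranges, process in threads, release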

};

};//namespace