Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

arrow/parquet : limit the parquet-reader memory usage #163

Merged
merged 1 commit into from
Oct 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion example/s3select_example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,10 @@ int run_query_on_parquet_file(const char* input_query, const char* input_file)
std::cout << "DEBUG: {" << msg << "}" << std::endl;
};

std::function<void(const char*)> fp_nop = [](const char* msg){};
std::function<int(std::string&)> fp_continue_nop = [](std::string& result){return 0;};
parquet_object parquet_processor(input_file,&s3select_syntax,&rgw);
//parquet_processor.set_external_debug_system(fp_debug);
parquet_processor.set_external_system_functions(fp_continue_nop, fp_s3select_result_format, fp_s3select_header_format,fp_nop);

std::string result;

Expand Down
72 changes: 28 additions & 44 deletions include/s3select.h
Original file line number Diff line number Diff line change
Expand Up @@ -2458,6 +2458,7 @@ class base_s3object
}

result.append(res->to_string());
//TODO: strlen should be replaced with the string's size (performance perspective)
m_returned_bytes_size += strlen(res->to_string());
++j;
}
Expand All @@ -2474,6 +2475,8 @@ class base_s3object
if(m_csv_defintion.output_json_format && projections_resuls.values.size()) {
json_result_format(projections_resuls, result, output_delimiter);
result.append(output_row_delimiter);
//TODO to add as needed
//TODO "flush" the result string
return;
}

Expand All @@ -2493,10 +2496,6 @@ class base_s3object
set_processing_time_error();
}


if(m_fp_ext_debug_mesg)
m_fp_ext_debug_mesg(column_result.data());

if (m_csv_defintion.quote_fields_always) {
std::ostringstream quoted_result;
quoted_result << std::quoted(column_result,m_csv_defintion.output_quot_char, m_csv_defintion.escape_char);
Expand Down Expand Up @@ -2524,9 +2523,30 @@ class base_s3object
if(!m_aggr_flow) {
result.append(output_row_delimiter);
m_returned_bytes_size += output_delimiter.size();
}
}


//TODO config / default-value
#define CSV_INPUT_TYPE_RESPONSE_SIZE_LIMIT (64 * 1024)
if(m_fp_s3select_result_format && m_fp_s3select_header_format)
{
if (result.size() > CSV_INPUT_TYPE_RESPONSE_SIZE_LIMIT)
{//there are systems that might reject the response due to its size.
m_fp_s3select_result_format(result);
m_fp_s3select_header_format(result);
}
}
}

void flush_sql_result(std::string& result)
{
  // Purpose: push any data still buffered in `result` out through the
  // externally registered callbacks. A no-op unless both the result-format
  // and header-format callbacks are installed.
  if (!m_fp_s3select_result_format || !m_fp_s3select_header_format)
  {
    return;
  }
  m_fp_s3select_result_format(result);
  m_fp_s3select_header_format(result);
}

Status getMatchRow( std::string& result)
{
multi_values projections_resuls;
Expand Down Expand Up @@ -2670,8 +2690,6 @@ class base_s3object

}; //base_s3object

//TODO config / default-value
#define CSV_INPUT_TYPE_RESPONSE_SIZE_LIMIT (64 * 1024)
class csv_object : public base_s3object
{

Expand Down Expand Up @@ -2984,16 +3002,6 @@ class csv_object : public base_s3object
return -1;
}
}

if(m_fp_s3select_result_format && m_fp_s3select_header_format)
{
if (result.size() > CSV_INPUT_TYPE_RESPONSE_SIZE_LIMIT)
{//there are systems that might resject the response due to its size.
m_fp_s3select_result_format(result);
m_fp_s3select_header_format(result);
}
}

if (m_sql_processing_status == Status::END_OF_STREAM)
{
break;
Expand All @@ -3009,12 +3017,7 @@ class csv_object : public base_s3object

} while (true);

if(m_fp_s3select_result_format && m_fp_s3select_header_format)
{ //note: it may produce empty response(more the once)
//upon empty result, it should return *only* upon last call.
m_fp_s3select_result_format(result);
m_fp_s3select_header_format(result);
}
flush_sql_result(result);

return 0;
}
Expand Down Expand Up @@ -3050,7 +3053,7 @@ class parquet_object : public base_s3object
parquet_query_setting(nullptr);
}

parquet_object() : base_s3object(nullptr)
parquet_object() : base_s3object(nullptr), not_to_increase_first_time(true)
{}

void parquet_query_setting(s3select *s3_query)
Expand Down Expand Up @@ -3127,26 +3130,6 @@ class parquet_object : public base_s3object
}
}

#define S3SELECT_RESPONSE_SIZE_LIMIT (4 * 1024 * 1024)
if (result.size() > S3SELECT_RESPONSE_SIZE_LIMIT)
{//AWS-cli limits response size the following callbacks send response upon some threshold
if(m_fp_s3select_result_format)
m_fp_s3select_result_format(result);

if (!is_end_of_stream() && (get_sql_processing_status() != Status::LIMIT_REACHED))
{
if(m_fp_s3select_header_format)
m_fp_s3select_header_format(result);
}
}
else
{
if (is_end_of_stream() || (get_sql_processing_status() == Status::LIMIT_REACHED))
{
if(m_fp_s3select_result_format)
m_fp_s3select_result_format(result);
}
}

//TODO is_end_of_stream() required?
if (get_sql_processing_status() == Status::END_OF_STREAM || is_end_of_stream() || get_sql_processing_status() == Status::LIMIT_REACHED)
Expand All @@ -3156,6 +3139,7 @@ class parquet_object : public base_s3object

} while (1);

flush_sql_result(result);
return 0;
}

Expand Down
36 changes: 30 additions & 6 deletions include/s3select_parquet_intrf.h
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,30 @@ class PARQUET_EXPORT RowGroupReader {
std::unique_ptr<Contents> contents_;
};

// Default read-buffer size for the parquet reader: 1 GiB.
// Parenthesized so the macro expands safely inside larger expressions.
#define RGW_default_buffer_size (1024 * 1024 * 1024)

// Process-wide singleton holding the s3select parquet-reader configuration
// (currently just the buffered-stream buffer size, in bytes).
class S3select_Config {
public:
  // Meyers singleton: function-local static gives thread-safe,
  // on-first-use initialization (C++11 and later).
  static S3select_Config& getInstance() {
    static S3select_Config instance;
    return instance;
  }

  // Set the buffer size (bytes) handed to parquet's ReaderProperties.
  void set_s3select_reader_properties(uint64_t value) { this->m_reader_properties = value; }
  // Get the configured buffer size (bytes).
  uint64_t get_s3select_reader_properties() const { return m_reader_properties; }

private:
  uint64_t m_reader_properties; // buffer size in bytes; defaults to RGW_default_buffer_size
  S3select_Config() : m_reader_properties(RGW_default_buffer_size) {} // Private constructor (singleton)
};

// Build the ReaderProperties used by the s3select parquet readers:
// buffered-stream mode with a buffer size taken from S3select_Config.
// Marked `inline` because this is a definition in a header included by
// multiple translation units (a non-inline definition would violate the
// ODR and fail at link time). A plain local replaces the old function-local
// static: the static was mutated on every call and returned by copy anyway,
// so it bought nothing and made first-call mutation order-sensitive.
inline ReaderProperties s3select_reader_properties() {

  ReaderProperties reader_props;
  reader_props.enable_buffered_stream();
  reader_props.set_buffer_size(S3select_Config::getInstance().get_s3select_reader_properties());
  return reader_props;
}

class PARQUET_EXPORT ParquetFileReader {
public:
// Declare a virtual class 'Contents' to aid dependency injection and more
Expand All @@ -755,7 +779,7 @@ class PARQUET_EXPORT ParquetFileReader {
struct PARQUET_EXPORT Contents {
static std::unique_ptr<Contents> Open(
std::shared_ptr<::arrow::io::RandomAccessFile> source,
const ReaderProperties& props = default_reader_properties(),
const ReaderProperties& props = s3select_reader_properties(),
std::shared_ptr<FileMetaData> metadata = NULLPTR);

virtual ~Contents() = default;
Expand All @@ -776,21 +800,21 @@ class PARQUET_EXPORT ParquetFileReader {
ARROW_DEPRECATED("Use arrow::io::RandomAccessFile version")
static std::unique_ptr<ParquetFileReader> Open(
std::unique_ptr<RandomAccessSource> source,
const ReaderProperties& props = default_reader_properties(),
const ReaderProperties& props = s3select_reader_properties(),
std::shared_ptr<FileMetaData> metadata = NULLPTR);

// Create a file reader instance from an Arrow file object. Thread-safety is
// the responsibility of the file implementation
static std::unique_ptr<ParquetFileReader> Open(
std::shared_ptr<::arrow::io::RandomAccessFile> source,
const ReaderProperties& props = default_reader_properties(),
const ReaderProperties& props = s3select_reader_properties(),
std::shared_ptr<FileMetaData> metadata = NULLPTR);

// API Convenience to open a serialized Parquet file on disk, using Arrow IO
// interfaces.
static std::unique_ptr<ParquetFileReader> OpenFile(
const std::string& path,s3selectEngine::rgw_s3select_api* rgw, bool memory_map = true,
const ReaderProperties& props = default_reader_properties(),
const ReaderProperties& props = s3select_reader_properties(),
std::shared_ptr<FileMetaData> metadata = NULLPTR
);

Expand Down Expand Up @@ -1034,7 +1058,7 @@ class SerializedRowGroup : public RowGroupReader::Contents {
class SerializedFile : public ParquetFileReader::Contents {
public:
SerializedFile(std::shared_ptr<ArrowInputFile> source,
const ReaderProperties& props = default_reader_properties())
const ReaderProperties& props = s3select_reader_properties())
: source_(std::move(source)), properties_(props) {
PARQUET_ASSIGN_OR_THROW(source_size_, source_->GetSize());
}
Expand Down Expand Up @@ -1241,7 +1265,7 @@ void SerializedFile::ParseMetaDataOfEncryptedFileWithEncryptedFooter(

#if ARROW_VERSION_MAJOR > 9
file_metadata_ =
FileMetaData::Make(metadata_buffer->data(), &metadata_len, default_reader_properties(), file_decryptor_);
FileMetaData::Make(metadata_buffer->data(), &metadata_len, s3select_reader_properties(), file_decryptor_);
#else
file_metadata_ =
FileMetaData::Make(metadata_buffer->data(), &metadata_len, file_decryptor_);
Expand Down
Loading