Skip to content

Commit

Permalink
[NSE-647] Leverage buffered write in shuffle (oap-project#648)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhztheplayer authored and zhouyuan committed Jan 8, 2022
1 parent d5a7f81 commit 2f0ee49
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 2 deletions.
1 change: 1 addition & 0 deletions native-sql-engine/cpp/src/jni/jni_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1051,6 +1051,7 @@ Java_com_intel_oap_vectorized_ShuffleSplitterJniWrapper_nativeMake(
auto splitOptions = SplitOptions::Defaults();
splitOptions.write_schema = write_schema;
splitOptions.prefer_spill = prefer_spill;
splitOptions.buffered_write = true;
if (buffer_size > 0) {
splitOptions.buffer_size = buffer_size;
}
Expand Down
9 changes: 8 additions & 1 deletion native-sql-engine/cpp/src/shuffle/splitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -433,8 +433,15 @@ arrow::Status Splitter::Split(const arrow::RecordBatch& rb) {
arrow::Status Splitter::Stop() {
EVAL_START("write", options_.thread_id)
// open data file output stream
ARROW_ASSIGN_OR_RAISE(data_file_os_,
std::shared_ptr<arrow::io::FileOutputStream> fout;
ARROW_ASSIGN_OR_RAISE(fout,
arrow::io::FileOutputStream::Open(options_.data_file, true));
if (options_.buffered_write) {
ARROW_ASSIGN_OR_RAISE(data_file_os_, arrow::io::BufferedOutputStream::Create(
16384, options_.memory_pool, fout));
} else {
data_file_os_ = fout;
}

// stop PartitionWriter and collect metrics
for (auto pid = 0; pid < num_partitions_; ++pid) {
Expand Down
2 changes: 1 addition & 1 deletion native-sql-engine/cpp/src/shuffle/splitter.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ class Splitter {
std::vector<int32_t> sub_dir_selection_;
std::vector<std::string> configured_dirs_;

std::shared_ptr<arrow::io::FileOutputStream> data_file_os_;
std::shared_ptr<arrow::io::OutputStream> data_file_os_;

// shared by all partition writers
std::shared_ptr<arrow::ipc::IpcPayload> schema_payload_;
Expand Down
1 change: 1 addition & 0 deletions native-sql-engine/cpp/src/shuffle/type.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ struct SplitOptions {
arrow::Compression::type compression_type = arrow::Compression::UNCOMPRESSED;
bool prefer_spill = true;
bool write_schema = true;
bool buffered_write = false;

std::string data_file;

Expand Down

0 comments on commit 2f0ee49

Please sign in to comment.