diff --git a/native-sql-engine/cpp/src/jni/jni_wrapper.cc b/native-sql-engine/cpp/src/jni/jni_wrapper.cc index f82068c37..b96cfa6bd 100644 --- a/native-sql-engine/cpp/src/jni/jni_wrapper.cc +++ b/native-sql-engine/cpp/src/jni/jni_wrapper.cc @@ -1051,6 +1051,7 @@ Java_com_intel_oap_vectorized_ShuffleSplitterJniWrapper_nativeMake( auto splitOptions = SplitOptions::Defaults(); splitOptions.write_schema = write_schema; splitOptions.prefer_spill = prefer_spill; + splitOptions.buffered_write = true; if (buffer_size > 0) { splitOptions.buffer_size = buffer_size; } diff --git a/native-sql-engine/cpp/src/shuffle/splitter.cc b/native-sql-engine/cpp/src/shuffle/splitter.cc index d58f0605d..fa99532ac 100644 --- a/native-sql-engine/cpp/src/shuffle/splitter.cc +++ b/native-sql-engine/cpp/src/shuffle/splitter.cc @@ -433,8 +433,15 @@ arrow::Status Splitter::Split(const arrow::RecordBatch& rb) { arrow::Status Splitter::Stop() { EVAL_START("write", options_.thread_id) // open data file output stream - ARROW_ASSIGN_OR_RAISE(data_file_os_, + std::shared_ptr fout; + ARROW_ASSIGN_OR_RAISE(fout, arrow::io::FileOutputStream::Open(options_.data_file, true)); + if (options_.buffered_write) { + ARROW_ASSIGN_OR_RAISE(data_file_os_, arrow::io::BufferedOutputStream::Create( + 16384, options_.memory_pool, fout)); + } else { + data_file_os_ = fout; + } // stop PartitionWriter and collect metrics for (auto pid = 0; pid < num_partitions_; ++pid) { diff --git a/native-sql-engine/cpp/src/shuffle/splitter.h b/native-sql-engine/cpp/src/shuffle/splitter.h index dbf07aa87..501aa6f10 100644 --- a/native-sql-engine/cpp/src/shuffle/splitter.h +++ b/native-sql-engine/cpp/src/shuffle/splitter.h @@ -230,7 +230,7 @@ class Splitter { std::vector sub_dir_selection_; std::vector configured_dirs_; - std::shared_ptr data_file_os_; + std::shared_ptr data_file_os_; // shared by all partition writers std::shared_ptr schema_payload_; diff --git a/native-sql-engine/cpp/src/shuffle/type.h b/native-sql-engine/cpp/src/shuffle/type.h index e73974243..77d254670 100644 --- a/native-sql-engine/cpp/src/shuffle/type.h +++ b/native-sql-engine/cpp/src/shuffle/type.h @@ -44,6 +44,7 @@ struct SplitOptions { arrow::Compression::type compression_type = arrow::Compression::UNCOMPRESSED; bool prefer_spill = true; bool write_schema = true; + bool buffered_write = false; std::string data_file;