This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-588] Configure the pre-allocated memory for the shuffle splitter #594

Merged: 5 commits, Dec 3, 2021
ShuffleSplitterJniWrapper.java
@@ -40,6 +40,7 @@ public ShuffleSplitterJniWrapper() throws IOException {
*/
public long make(
NativePartitioning part,
long offheapPerTask,
int bufferSize,
String codec,
String dataFile,
@@ -52,6 +53,7 @@ public long make(
part.getNumPartitions(),
part.getSchema(),
part.getExprList(),
offheapPerTask,
bufferSize,
codec,
dataFile,
@@ -66,6 +68,7 @@ public native long nativeMake(
int numPartitions,
byte[] schema,
byte[] exprList,
long offheapPerTask,
int bufferSize,
String codec,
String dataFile,
GazellePluginConfig.scala
@@ -166,7 +166,7 @@ class GazellePluginConfig(conf: SQLConf) extends Logging {
conf
.getConfString("spark.oap.sql.columnar.wholestagecodegen.breakdownTime", "false")
.toBoolean

// a folder to store the codegen files
val tmpFile: String =
conf.getConfString("spark.oap.sql.columnar.tmp_dir", null)
@@ -184,6 +184,10 @@ class GazellePluginConfig(conf: SQLConf) extends Logging {
val columnarShuffleUseCustomizedCompressionCodec: String =
conf.getConfString("spark.oap.sql.columnar.shuffle.customizedCompression.codec", "lz4")

val shuffleSplitDefaultSize: Int =
conf
.getConfString("spark.oap.sql.columnar.shuffleSplitDefaultSize", "8192").toInt

val numaBindingInfo: GazelleNumaBindingInfo = {
val enableNumaBinding: Boolean =
conf.getConfString("spark.oap.sql.columnar.numaBinding", "false").toBoolean
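The new `spark.oap.sql.columnar.shuffleSplitDefaultSize` key replaces the hard-coded default previously taken from `spark.sql.execution.arrow.maxRecordsPerBatch` (see ColumnarShuffleWriter below). A minimal sketch of overriding it when building a session; only the config key and its "8192" default come from this PR, the app name, master, and the 16384 value are illustrative:

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch, assuming a local session: raise the splitter's default
// split size from 8192 to 16384 rows per partition buffer.
val spark = SparkSession
  .builder()
  .appName("shuffle-split-size-demo")
  .master("local[*]")
  .config("spark.oap.sql.columnar.shuffleSplitDefaultSize", "16384")
  .getOrCreate()
```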
ColumnarShuffleWriter.scala
@@ -57,8 +57,12 @@ class ColumnarShuffleWriter[K, V](
private var mapStatus: MapStatus = _

private val localDirs = blockManager.diskBlockManager.localDirs.mkString(",")
private val nativeBufferSize =
conf.getInt("spark.sql.execution.arrow.maxRecordsPerBatch", 4096)

private val offheapSize = conf.getSizeAsBytes("spark.memory.offHeap.size", 0)
private val executorNum = conf.getInt("spark.executor.cores", 1)
private val offheapPerTask = offheapSize / executorNum

private val nativeBufferSize = GazellePluginConfig.getConf.shuffleSplitDefaultSize

private val customizedCompressCodec =
GazellePluginConfig.getConf.columnarShuffleUseCustomizedCompressionCodec
@@ -94,6 +98,7 @@
if (nativeSplitter == 0) {
nativeSplitter = jniWrapper.make(
dep.nativePartitioning,
offheapPerTask,
nativeBufferSize,
defaultCompressionCodec,
dataTmp.getAbsolutePath,
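The writer splits the executor's off-heap pool evenly across `spark.executor.cores`, treating one running task per core as the worst case. A self-contained sketch of the same arithmetic; the 8 GiB pool and 8 cores are assumed numbers, not from this PR:

```scala
// Mirrors the offheapPerTask computation above, for illustration only.
object OffheapPerTaskDemo extends App {
  val offheapSize: Long = 8L * 1024 * 1024 * 1024 // spark.memory.offHeap.size
  val executorCores: Int = 8                      // spark.executor.cores
  val offheapPerTask: Long = offheapSize / executorCores
  println(s"per-task off-heap budget: $offheapPerTask bytes") // 1 GiB
}
```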
4 changes: 3 additions & 1 deletion native-sql-engine/cpp/src/jni/jni_wrapper.cc
@@ -1029,7 +1029,7 @@ Java_com_intel_oap_vectorized_ExpressionEvaluatorJniWrapper_nativeEvaluate2(
JNIEXPORT jlong JNICALL
Java_com_intel_oap_vectorized_ShuffleSplitterJniWrapper_nativeMake(
JNIEnv* env, jobject, jstring partitioning_name_jstr, jint num_partitions,
jbyteArray schema_arr, jbyteArray expr_arr, jint buffer_size,
jbyteArray schema_arr, jbyteArray expr_arr, jlong offheap_per_task, jint buffer_size,
jstring compression_type_jstr, jstring data_file_jstr, jint num_sub_dirs,
jstring local_dirs_jstr, jboolean prefer_spill, jlong memory_pool_id) {
JNI_METHOD_START
@@ -1056,6 +1056,8 @@ Java_com_intel_oap_vectorized_ShuffleSplitterJniWrapper_nativeMake(
if (buffer_size > 0) {
splitOptions.buffer_size = buffer_size;
}
splitOptions.offheap_per_task = offheap_per_task;

if (num_sub_dirs > 0) {
splitOptions.num_sub_dirs = num_sub_dirs;
}
15 changes: 12 additions & 3 deletions native-sql-engine/cpp/src/shuffle/splitter.cc
@@ -785,38 +785,47 @@ arrow::Result<int32_t> Splitter::SpillLargestPartition(int64_t* size) {
arrow::Status Splitter::DoSplit(const arrow::RecordBatch& rb) {
// for the first input record batch, scan binary arrays and large binary
// arrays to get their empirical sizes
uint32_t size_per_row = 0;
if (!empirical_size_calculated_) {
auto num_rows = rb.num_rows();
for (int i = 0; i < binary_array_idx_.size(); ++i) {
auto arr =
std::static_pointer_cast<arrow::BinaryArray>(rb.column(binary_array_idx_[i]));
auto length = arr->value_offset(num_rows) - arr->value_offset(0);
binary_array_empirical_size_[i] = length / num_rows;
size_per_row += binary_array_empirical_size_[i];
}
for (int i = 0; i < large_binary_array_idx_.size(); ++i) {
auto arr = std::static_pointer_cast<arrow::LargeBinaryArray>(
rb.column(large_binary_array_idx_[i]));
auto length = arr->value_offset(num_rows) - arr->value_offset(0);
large_binary_array_empirical_size_[i] = length / num_rows;
size_per_row += large_binary_array_empirical_size_[i];
}
empirical_size_calculated_ = true;
}

for (auto col = 0; col < fixed_width_array_idx_.size(); ++col) {
auto col_idx = fixed_width_array_idx_[col];
size_per_row += arrow::bit_width(column_type_id_[col]->id()) / 8;
if (rb.column_data(col_idx)->GetNullCount() != 0) {
input_fixed_width_has_null_[col] = true;
}
}

int64_t prealloc_row_cnt =
options_.offheap_per_task > 0 && size_per_row > 0
? options_.offheap_per_task / 4 / size_per_row / num_partitions_
: options_.buffer_size;

// prepare partition buffers and spill if necessary
for (auto pid = 0; pid < num_partitions_; ++pid) {
if (partition_id_cnt_[pid] > 0 &&
partition_buffer_idx_base_[pid] + partition_id_cnt_[pid] >
partition_buffer_size_[pid]) {
auto new_size = partition_id_cnt_[pid] > options_.buffer_size
? partition_id_cnt_[pid]
: options_.buffer_size;
auto new_size = std::min((int32_t)prealloc_row_cnt, options_.buffer_size);
// make sure the split record batch can be filled
if (partition_id_cnt_[pid] > new_size) new_size = partition_id_cnt_[pid];
if (options_.prefer_spill) {
if (partition_buffer_size_[pid] == 0) { // first allocate?
RETURN_NOT_OK(AllocatePartitionBuffers(pid, new_size));
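The new sizing rule in DoSplit reserves roughly a quarter of the per-task off-heap budget for partition buffers: `offheap_per_task / 4 / size_per_row / num_partitions_` rows per partition, falling back to `buffer_size` when the budget or empirical row width is unknown, then capped at `buffer_size` and grown to fit the current batch. A Scala transcription of that logic with assumed numbers (1 GiB budget, 64-byte rows, 200 partitions), for illustration only:

```scala
// Scala mirror of the C++ prealloc_row_cnt / new_size logic in DoSplit;
// the budget, row width, and partition count below are assumptions.
def preallocRowCnt(offheapPerTask: Long, sizePerRow: Long,
                   numPartitions: Int, bufferSize: Int): Long =
  if (offheapPerTask > 0 && sizePerRow > 0)
    offheapPerTask / 4 / sizePerRow / numPartitions
  else bufferSize.toLong

val cnt = preallocRowCnt(1L << 30, 64, 200, 8192) // = 20971 rows
// new_size is capped at buffer_size, then grown if one batch needs more:
val partitionIdCnt = 3000L // rows destined for this partition in the batch
val newSize = math.max(math.min(cnt, 8192L), partitionIdCnt) // = 8192
```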
2 changes: 2 additions & 0 deletions native-sql-engine/cpp/src/shuffle/splitter.h
@@ -215,6 +215,8 @@ class Splitter {
int64_t total_spill_time_ = 0;
int64_t total_compress_time_ = 0;
int64_t total_compute_pid_time_ = 0;
int64_t peak_memory_allocated_ = 0;

std::vector<int64_t> partition_lengths_;
std::vector<int64_t> raw_partition_lengths_;

1 change: 1 addition & 0 deletions native-sql-engine/cpp/src/shuffle/type.h
@@ -36,6 +36,7 @@ static constexpr int32_t kIpcContinuationToken = -1;
const unsigned ONES[] = {1, 1, 1, 1, 1, 1, 1, 1};

struct SplitOptions {
int64_t offheap_per_task = 0;
int32_t buffer_size = kDefaultSplitterBufferSize;
int32_t num_sub_dirs = kDefaultNumSubDirs;
arrow::Compression::type compression_type = arrow::Compression::UNCOMPRESSED;