Skip to content

Commit

Permalink
[CH-353] ShuffleSplitter improvement
Browse files Browse the repository at this point in the history
  • Loading branch information
exmy committed Mar 20, 2023
1 parent 47f5b40 commit 6a307d1
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 5 deletions.
13 changes: 11 additions & 2 deletions utils/local-engine/Shuffle/ShuffleSplitter.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#include "ShuffleSplitter.h"
#include <filesystem>
#include <format>
#include <memory>
#include <string>
#include <system_error>
#include <fcntl.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Compression/CompressionFactory.h>
Expand Down Expand Up @@ -178,11 +180,18 @@ ShuffleSplitter::Ptr ShuffleSplitter::create(const std::string & short_name, Spl

/// Returns the path of the temporary file for `partition_id`, creating its parent directory if needed.
///
/// The file name is "<shuffle_id>_<map_id>_<partition_id>". A hash of that name picks one entry of
/// `options.local_dirs_list` and one of `options.num_sub_dirs` sub-directories below it; the
/// sub-directory is named by its index formatted as at least two lowercase hex digits.
///
/// Throws std::filesystem::filesystem_error if `options.local_dirs_list` is empty, if
/// `options.num_sub_dirs` is not positive, or if the directory cannot be created.
std::string ShuffleSplitter::getPartitionTempFile(size_t partition_id)
{
    const size_t num_dirs = options.local_dirs_list.size();
    // Both values are used as divisors below; reject them up front instead of dividing by zero
    // (or by a negative int silently converted to a huge unsigned value).
    if (num_dirs == 0 || options.num_sub_dirs <= 0)
        throw std::filesystem::filesystem_error(
            "ShuffleSplitter: local_dirs_list must not be empty and num_sub_dirs must be positive",
            std::make_error_code(std::errc::invalid_argument));
    const auto num_sub_dirs = static_cast<size_t>(options.num_sub_dirs);

    const auto file_name = std::to_string(options.shuffle_id) + "_" + std::to_string(options.map_id) + "_" + std::to_string(partition_id);
    const size_t hash = std::hash<std::string>{}(file_name);
    const size_t dir_id = hash % num_dirs;
    // Divide by num_dirs first so the sub-directory index uses different hash bits than dir_id.
    const size_t sub_dir_id = (hash / num_dirs) % num_sub_dirs;

    const std::filesystem::path dir = std::filesystem::path(options.local_dirs_list[dir_id]) / std::format("{:02x}", sub_dir_id);
    // create_directories() does nothing when the directory already exists, so no exists() check is needed.
    std::filesystem::create_directories(dir);
    return (dir / file_name).string();
}

std::unique_ptr<DB::WriteBuffer> ShuffleSplitter::getPartitionWriteBuffer(size_t partition_id)
{
auto file = getPartitionTempFile(partition_id);
Expand Down
4 changes: 3 additions & 1 deletion utils/local-engine/Shuffle/ShuffleSplitter.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ struct SplitOptions
size_t split_size = DEFAULT_BLOCK_SIZE;
size_t io_buffer_size = DBMS_DEFAULT_BUFFER_SIZE;
std::string data_file;
std::string local_tmp_dir;
std::vector<std::string> local_dirs_list;
int num_sub_dirs;
int shuffle_id;
int map_id;
size_t partition_nums;
std::string exprs;
Expand Down
12 changes: 10 additions & 2 deletions utils/local-engine/local_engine_jni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -596,11 +596,13 @@ jlong Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_nativeMake(
jint num_partitions,
jbyteArray expr_list,
jbyteArray expr_index_list,
jint shuffle_id,
jlong map_id,
jint split_size,
jstring codec,
jstring data_file,
jstring local_dirs)
jstring local_dirs,
jint num_sub_dirs)
{
LOCAL_ENGINE_JNI_METHOD_START
std::vector<std::string> expr_vec;
Expand All @@ -626,11 +628,17 @@ jlong Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_nativeMake(
delete[] str;
}

Poco::StringTokenizer local_dirs_tokenizer(jstring2string(env, local_dirs), ",");
std::vector<std::string> local_dirs_list;
local_dirs_list.insert(local_dirs_list.end(), local_dirs_tokenizer.begin(), local_dirs_tokenizer.end());

local_engine::SplitOptions options{
.split_size = static_cast<size_t>(split_size),
.io_buffer_size = DBMS_DEFAULT_BUFFER_SIZE,
.data_file = jstring2string(env, data_file),
.local_tmp_dir = jstring2string(env, local_dirs),
.local_dirs_list = std::move(local_dirs_list),
.num_sub_dirs = num_sub_dirs,
.shuffle_id = shuffle_id,
.map_id = static_cast<int>(map_id),
.partition_nums = static_cast<size_t>(num_partitions),
.exprs = exprs,
Expand Down

0 comments on commit 6a307d1

Please sign in to comment.