Skip to content

Commit

Permalink
[native] Translate 'sink.max-buffer-size' and 'driver.max-page-partit…
Browse files Browse the repository at this point in the history
…ioning-buffer-size'.
  • Loading branch information
spershin committed Dec 16, 2023
1 parent e7a29fc commit a852d7b
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 24 deletions.
52 changes: 43 additions & 9 deletions presto-native-execution/presto_cpp/main/common/Configs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ void ConfigBase::initialize(const std::string& filePath) {
auto values = util::readConfig(fs::path(filePath));
filePath_ = filePath;
checkRegisteredProperties(values);
updateLoadedValues(values);

bool mutableConfig{false};
auto it = values.find(std::string(kMutableConfig));
Expand All @@ -68,6 +69,12 @@ void ConfigBase::initialize(const std::string& filePath) {
};
}

std::string ConfigBase::capacityPropertyAsBytesString(
std::string_view propertyName) const {
return folly::to<std::string>(toCapacity(
optionalProperty(propertyName).value(), velox::core::CapacityUnit::BYTE));
}

bool ConfigBase::registerProperty(
const std::string& propertyName,
const folly::Optional<std::string>& defaultValue) {
Expand Down Expand Up @@ -204,6 +211,8 @@ SystemConfig::SystemConfig() {
STR_PROP(kInternalCommunicationSharedSecret, ""),
NUM_PROP(kInternalCommunicationJwtExpirationSeconds, 300),
BOOL_PROP(kUseLegacyArrayAgg, false),
STR_PROP(kSinkMaxBufferSize, "32MB"),
STR_PROP(kDriverMaxPagePartitioningBufferSize, "32MB"),
};
}

Expand Down Expand Up @@ -708,8 +717,13 @@ BaseVeloxQueryConfig::BaseVeloxQueryConfig() {
QueryConfig::kSpillableReservationGrowthPct,
c.spillableReservationGrowthPct()),
BOOL_PROP(
QueryConfig::kSparkLegacySizeOfNull, c.sparkLegacySizeOfNull())};
update(*SystemConfig::instance());
QueryConfig::kSparkLegacySizeOfNull, c.sparkLegacySizeOfNull()),
BOOL_PROP(
QueryConfig::kPrestoArrayAggIgnoreNulls,
c.prestoArrayAggIgnoreNulls()),
NUM_PROP(
QueryConfig::kMaxArbitraryBufferSize, c.maxArbitraryBufferSize()),
};
}

BaseVeloxQueryConfig* BaseVeloxQueryConfig::instance() {
Expand All @@ -718,14 +732,34 @@ BaseVeloxQueryConfig* BaseVeloxQueryConfig::instance() {
return instance.get();
}

void BaseVeloxQueryConfig::initialize(const std::string& filePath) {
ConfigBase::initialize(filePath);
update(*SystemConfig::instance());
}
void BaseVeloxQueryConfig::updateLoadedValues(
std::unordered_map<std::string, std::string>& values) const {
// Update velox config with values from presto system config.
auto systemConfig = SystemConfig::instance();

using namespace velox::core;
const std::unordered_map<std::string, std::string> updatedValues{
{QueryConfig::kPrestoArrayAggIgnoreNulls,
bool2String(systemConfig->useLegacyArrayAgg())},
{QueryConfig::kMaxArbitraryBufferSize,
systemConfig->capacityPropertyAsBytesString(
SystemConfig::kSinkMaxBufferSize)},
{QueryConfig::kMaxPartitionedOutputBufferSize,
systemConfig->capacityPropertyAsBytesString(
SystemConfig::kDriverMaxPagePartitioningBufferSize)},
};

void BaseVeloxQueryConfig::update(const SystemConfig& systemConfig) {
registeredProps_[velox::core::QueryConfig::kPrestoArrayAggIgnoreNulls] =
bool2String(systemConfig.useLegacyArrayAgg());
std::stringstream updated;
for (const auto& pair : updatedValues) {
updated << " " << pair.first << "=" << pair.second << "\n";
values[pair.first] = pair.second;
}
auto str = updated.str();
if (!str.empty()) {
PRESTO_STARTUP_LOG(INFO)
<< "Updated in '" << filePath_ << "' from SystemProperties:\n"
<< str;
}
}

} // namespace facebook::presto
25 changes: 19 additions & 6 deletions presto-native-execution/presto_cpp/main/common/Configs.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@ class ConfigBase {
/// Reads configuration properties from the specified file. Must be called
/// before calling any of the getters below.
/// @param filePath Path to configuration file.
virtual void initialize(const std::string& filePath);
void initialize(const std::string& filePath);

/// Allows individual config to manipulate just-loaded-from-file key-value map
/// before it is used to initialize the config.
virtual void updateLoadedValues(
std::unordered_map<std::string, std::string>& values) const {}

/// Uses a config object already materialized.
void initialize(std::unique_ptr<velox::Config>&& config) {
Expand Down Expand Up @@ -125,6 +130,10 @@ class ConfigBase {
return optionalProperty(std::string{propertyName});
}

/// Returns "N<capacity_unit>" as string containing capacity in bytes.
std::string capacityPropertyAsBytesString(
std::string_view propertyName) const;

/// Returns copy of the config values map.
std::unordered_map<std::string, std::string> values() const {
return config_->valuesCopy();
Expand Down Expand Up @@ -423,9 +432,16 @@ class SystemConfig : public ConfigBase {
static constexpr std::string_view kInternalCommunicationJwtExpirationSeconds{
"internal-communication.jwt.expiration-seconds"};

/// Below are the Presto properties from config.properties that get converted
/// to their velox counterparts in BaseVeloxQueryConfig and used solely from
/// BaseVeloxQueryConfig.

/// Uses legacy version of array_agg which ignores nulls.
static constexpr std::string_view kUseLegacyArrayAgg{
"deprecated.legacy-array-agg"};
static constexpr std::string_view kSinkMaxBufferSize{"sink.max-buffer-size"};
static constexpr std::string_view kDriverMaxPagePartitioningBufferSize{
"driver.max-page-partitioning-buffer-size"};

SystemConfig();

Expand Down Expand Up @@ -642,13 +658,10 @@ class BaseVeloxQueryConfig : public ConfigBase {

virtual ~BaseVeloxQueryConfig() = default;

void initialize(const std::string& filePath) override;
void updateLoadedValues(
std::unordered_map<std::string, std::string>& values) const override;

static BaseVeloxQueryConfig* instance();

private:
/// Update velox config with values from presto system config.
void update(const SystemConfig& config);
};

} // namespace facebook::presto
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ class BaseVeloxQueryConfigTest : public testing::Test {
fileSystem->openFileForWrite(systemConfigFilePath);
systemConfigFile->append(
fmt::format("{}=true\n", SystemConfig::kUseLegacyArrayAgg));
systemConfigFile->append(
fmt::format("{}=17MB\n", SystemConfig::kSinkMaxBufferSize));
systemConfigFile->append(fmt::format(
"{}=6MB\n", SystemConfig::kDriverMaxPagePartitioningBufferSize));
systemConfigFile->close();
SystemConfig::instance()->initialize(systemConfigFilePath);
}
Expand Down Expand Up @@ -108,15 +112,19 @@ TEST_F(BaseVeloxQueryConfigTest, mutableConfig) {
}

TEST_F(BaseVeloxQueryConfigTest, fromSystemConfig) {
#define GET_VAL(_name_) cfg->optionalProperty(std::string(_name_))

auto cfg = BaseVeloxQueryConfig::instance();
ASSERT_FALSE(cfg->optionalProperty<bool>(
std::string(QueryConfig::kPrestoArrayAggIgnoreNulls))
.value());
ASSERT_EQ("false", GET_VAL(QueryConfig::kPrestoArrayAggIgnoreNulls));

setUpConfigFile(true, true);
cfg->initialize(configFilePath);
ASSERT_TRUE(cfg->optionalProperty<bool>(
std::string(QueryConfig::kPrestoArrayAggIgnoreNulls))
.value());

ASSERT_EQ("true", GET_VAL(QueryConfig::kPrestoArrayAggIgnoreNulls));
ASSERT_EQ("17825792", GET_VAL(QueryConfig::kMaxArbitraryBufferSize));
ASSERT_EQ("6291456", GET_VAL(QueryConfig::kMaxPartitionedOutputBufferSize));

#undef GET_VAL
}

} // namespace facebook::presto::test
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class ConfigTest : public testing::Test {
sysConfigFile->append(
fmt::format("{}={}\n", SystemConfig::kPrestoVersion, prestoVersion));
sysConfigFile->append(
fmt::format("{}=11KB\n", SystemConfig::kQueryMaxMemoryPerNode));
fmt::format("{}=11kB\n", SystemConfig::kQueryMaxMemoryPerNode));
if (isMutable) {
sysConfigFile->append(
fmt::format("{}={}\n", ConfigBase::kMutableConfig, "true"));
Expand Down Expand Up @@ -86,7 +86,7 @@ TEST_F(ConfigTest, mutableSystemConfig) {
.value());
ASSERT_EQ(prestoVersion2, systemConfig->prestoVersion());
ASSERT_EQ(
"11KB",
"11kB",
systemConfig
->setValue(std::string(SystemConfig::kQueryMaxMemoryPerNode), "5GB")
.value());
Expand Down
2 changes: 1 addition & 1 deletion presto-native-execution/velox
Submodule velox updated 230 files

0 comments on commit a852d7b

Please sign in to comment.