From a852d7b7c7e94f19e929ba7da9b5233acf077edc Mon Sep 17 00:00:00 2001 From: Sergey Pershin Date: Fri, 15 Dec 2023 16:10:11 -0800 Subject: [PATCH] [native] Translate 'sink.max-buffer-size' and 'driver.max-page-partitioning-buffer-size'. --- .../presto_cpp/main/common/Configs.cpp | 52 +++++++++++++++---- .../presto_cpp/main/common/Configs.h | 25 ++++++--- .../common/tests/BaseVeloxQueryConfigTest.cpp | 20 ++++--- .../main/common/tests/ConfigTest.cpp | 4 +- presto-native-execution/velox | 2 +- 5 files changed, 79 insertions(+), 24 deletions(-) diff --git a/presto-native-execution/presto_cpp/main/common/Configs.cpp b/presto-native-execution/presto_cpp/main/common/Configs.cpp index 44a3eb3dacec..b4030868307f 100644 --- a/presto-native-execution/presto_cpp/main/common/Configs.cpp +++ b/presto-native-execution/presto_cpp/main/common/Configs.cpp @@ -54,6 +54,7 @@ void ConfigBase::initialize(const std::string& filePath) { auto values = util::readConfig(fs::path(filePath)); filePath_ = filePath; checkRegisteredProperties(values); + updateLoadedValues(values); bool mutableConfig{false}; auto it = values.find(std::string(kMutableConfig)); @@ -68,6 +69,12 @@ void ConfigBase::initialize(const std::string& filePath) { }; } +std::string ConfigBase::capacityPropertyAsBytesString( + std::string_view propertyName) const { + return folly::to(toCapacity( + optionalProperty(propertyName).value(), velox::core::CapacityUnit::BYTE)); +} + bool ConfigBase::registerProperty( const std::string& propertyName, const folly::Optional& defaultValue) { @@ -204,6 +211,8 @@ SystemConfig::SystemConfig() { STR_PROP(kInternalCommunicationSharedSecret, ""), NUM_PROP(kInternalCommunicationJwtExpirationSeconds, 300), BOOL_PROP(kUseLegacyArrayAgg, false), + STR_PROP(kSinkMaxBufferSize, "32MB"), + STR_PROP(kDriverMaxPagePartitioningBufferSize, "32MB"), }; } @@ -708,8 +717,13 @@ BaseVeloxQueryConfig::BaseVeloxQueryConfig() { QueryConfig::kSpillableReservationGrowthPct, c.spillableReservationGrowthPct()), BOOL_PROP( - QueryConfig::kSparkLegacySizeOfNull, c.sparkLegacySizeOfNull())}; - update(*SystemConfig::instance()); + QueryConfig::kSparkLegacySizeOfNull, c.sparkLegacySizeOfNull()), + BOOL_PROP( + QueryConfig::kPrestoArrayAggIgnoreNulls, + c.prestoArrayAggIgnoreNulls()), + NUM_PROP( + QueryConfig::kMaxArbitraryBufferSize, c.maxArbitraryBufferSize()), + }; } BaseVeloxQueryConfig* BaseVeloxQueryConfig::instance() { @@ -718,14 +732,34 @@ BaseVeloxQueryConfig* BaseVeloxQueryConfig::instance() { return instance.get(); } -void BaseVeloxQueryConfig::initialize(const std::string& filePath) { - ConfigBase::initialize(filePath); - update(*SystemConfig::instance()); -} +void BaseVeloxQueryConfig::updateLoadedValues( + std::unordered_map& values) const { + // Update velox config with values from presto system config. + auto systemConfig = SystemConfig::instance(); + + using namespace velox::core; + const std::unordered_map updatedValues{ + {QueryConfig::kPrestoArrayAggIgnoreNulls, + bool2String(systemConfig->useLegacyArrayAgg())}, + {QueryConfig::kMaxArbitraryBufferSize, + systemConfig->capacityPropertyAsBytesString( + SystemConfig::kSinkMaxBufferSize)}, + {QueryConfig::kMaxPartitionedOutputBufferSize, + systemConfig->capacityPropertyAsBytesString( + SystemConfig::kDriverMaxPagePartitioningBufferSize)}, + }; -void BaseVeloxQueryConfig::update(const SystemConfig& systemConfig) { - registeredProps_[velox::core::QueryConfig::kPrestoArrayAggIgnoreNulls] = - bool2String(systemConfig.useLegacyArrayAgg()); + std::stringstream updated; + for (const auto& pair : updatedValues) { + updated << " " << pair.first << "=" << pair.second << "\n"; + values[pair.first] = pair.second; + } + auto str = updated.str(); + if (!str.empty()) { + PRESTO_STARTUP_LOG(INFO) + << "Updated in '" << filePath_ << "' from SystemProperties:\n" + << str; + } } } // namespace facebook::presto diff --git a/presto-native-execution/presto_cpp/main/common/Configs.h b/presto-native-execution/presto_cpp/main/common/Configs.h index 8d1d6c660c42..989531fc144d 100644 --- a/presto-native-execution/presto_cpp/main/common/Configs.h +++ b/presto-native-execution/presto_cpp/main/common/Configs.h @@ -29,7 +29,12 @@ class ConfigBase { /// Reads configuration properties from the specified file. Must be called /// before calling any of the getters below. /// @param filePath Path to configuration file. - virtual void initialize(const std::string& filePath); + void initialize(const std::string& filePath); + + /// Allows individual config to manipulate just-loaded-from-file key-value map + /// before it is used to initialize the config. + virtual void updateLoadedValues( + std::unordered_map& values) const {} /// Uses a config object already materialized. void initialize(std::unique_ptr&& config) { @@ -125,6 +130,10 @@ class ConfigBase { return optionalProperty(std::string{propertyName}); } + /// Returns "N" as string containing capacity in bytes. + std::string capacityPropertyAsBytesString( + std::string_view propertyName) const; + /// Returns copy of the config values map. std::unordered_map values() const { return config_->valuesCopy(); @@ -423,9 +432,16 @@ class SystemConfig : public ConfigBase { static constexpr std::string_view kInternalCommunicationJwtExpirationSeconds{ "internal-communication.jwt.expiration-seconds"}; + /// Below are the Presto properties from config.properties that get converted + /// to their velox counterparts in BaseVeloxQueryConfig and used solely from + /// BaseVeloxQueryConfig. + /// Uses legacy version of array_agg which ignores nulls. static constexpr std::string_view kUseLegacyArrayAgg{ "deprecated.legacy-array-agg"}; + static constexpr std::string_view kSinkMaxBufferSize{"sink.max-buffer-size"}; + static constexpr std::string_view kDriverMaxPagePartitioningBufferSize{ + "driver.max-page-partitioning-buffer-size"}; SystemConfig(); @@ -642,13 +658,10 @@ class BaseVeloxQueryConfig : public ConfigBase { virtual ~BaseVeloxQueryConfig() = default; - void initialize(const std::string& filePath) override; + void updateLoadedValues( + std::unordered_map& values) const override; static BaseVeloxQueryConfig* instance(); - - private: - /// Update velox config with values from presto system config. - void update(const SystemConfig& config); }; } // namespace facebook::presto diff --git a/presto-native-execution/presto_cpp/main/common/tests/BaseVeloxQueryConfigTest.cpp b/presto-native-execution/presto_cpp/main/common/tests/BaseVeloxQueryConfigTest.cpp index b51df196cad1..13317594cce1 100644 --- a/presto-native-execution/presto_cpp/main/common/tests/BaseVeloxQueryConfigTest.cpp +++ b/presto-native-execution/presto_cpp/main/common/tests/BaseVeloxQueryConfigTest.cpp @@ -45,6 +45,10 @@ class BaseVeloxQueryConfigTest : public testing::Test { fileSystem->openFileForWrite(systemConfigFilePath); systemConfigFile->append( fmt::format("{}=true\n", SystemConfig::kUseLegacyArrayAgg)); + systemConfigFile->append( + fmt::format("{}=17MB\n", SystemConfig::kSinkMaxBufferSize)); + systemConfigFile->append(fmt::format( + "{}=6MB\n", SystemConfig::kDriverMaxPagePartitioningBufferSize)); systemConfigFile->close(); SystemConfig::instance()->initialize(systemConfigFilePath); } @@ -108,15 +112,19 @@ TEST_F(BaseVeloxQueryConfigTest, mutableConfig) { } TEST_F(BaseVeloxQueryConfigTest, fromSystemConfig) { +#define GET_VAL(_name_) cfg->optionalProperty(std::string(_name_)) + auto cfg = BaseVeloxQueryConfig::instance(); - ASSERT_FALSE(cfg->optionalProperty( - std::string(QueryConfig::kPrestoArrayAggIgnoreNulls)) - .value()); + ASSERT_EQ("false", GET_VAL(QueryConfig::kPrestoArrayAggIgnoreNulls)); + setUpConfigFile(true, true); cfg->initialize(configFilePath); - ASSERT_TRUE(cfg->optionalProperty( - std::string(QueryConfig::kPrestoArrayAggIgnoreNulls)) - .value()); + + ASSERT_EQ("true", GET_VAL(QueryConfig::kPrestoArrayAggIgnoreNulls)); + ASSERT_EQ("17825792", GET_VAL(QueryConfig::kMaxArbitraryBufferSize)); + ASSERT_EQ("6291456", GET_VAL(QueryConfig::kMaxPartitionedOutputBufferSize)); + +#undef GET_VAL } } // namespace facebook::presto::test diff --git a/presto-native-execution/presto_cpp/main/common/tests/ConfigTest.cpp b/presto-native-execution/presto_cpp/main/common/tests/ConfigTest.cpp index 837a85a38669..eb85a529f658 100644 --- a/presto-native-execution/presto_cpp/main/common/tests/ConfigTest.cpp +++ b/presto-native-execution/presto_cpp/main/common/tests/ConfigTest.cpp @@ -39,7 +39,7 @@ class ConfigTest : public testing::Test { sysConfigFile->append( fmt::format("{}={}\n", SystemConfig::kPrestoVersion, prestoVersion)); sysConfigFile->append( - fmt::format("{}=11KB\n", SystemConfig::kQueryMaxMemoryPerNode)); + fmt::format("{}=11kB\n", SystemConfig::kQueryMaxMemoryPerNode)); if (isMutable) { sysConfigFile->append( fmt::format("{}={}\n", ConfigBase::kMutableConfig, "true")); @@ -86,7 +86,7 @@ TEST_F(ConfigTest, mutableSystemConfig) { .value()); ASSERT_EQ(prestoVersion2, systemConfig->prestoVersion()); ASSERT_EQ( - "11KB", + "11kB", systemConfig ->setValue(std::string(SystemConfig::kQueryMaxMemoryPerNode), "5GB") .value()); diff --git a/presto-native-execution/velox b/presto-native-execution/velox index ddde53b954c5..a0ec8e4eb33d 160000 --- a/presto-native-execution/velox +++ b/presto-native-execution/velox @@ -1 +1 @@ -Subproject commit ddde53b954c5c84d0598f109dd7f34a10002baff +Subproject commit a0ec8e4eb33d1bd820856e72063cdbd6578a2e6f