Skip to content

Commit

Permalink
Move fileformat's config setting into HiveConnectorUtils
Browse files Browse the repository at this point in the history
  • Loading branch information
JkSelf committed Sep 3, 2024
1 parent 1fd5d60 commit a6d2f6e
Show file tree
Hide file tree
Showing 12 changed files with 270 additions and 192 deletions.
21 changes: 21 additions & 0 deletions velox/connectors/hive/HiveConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,27 @@ std::optional<uint8_t> HiveConfig::orcWriterCompressionLevel(
return std::nullopt;
}

uint32_t HiveConfig::orcWriterZLIBCompressionLevel (const config::ConfigBase* session) const {
auto compressionLevel = orcWriterCompressionLevel(session);
if (compressionLevel.has_value()) {
return compressionLevel.value();
}

// Default config values taken from Presto.
return 4;
}

uint32_t HiveConfig::orcWriterZSTDCompressionLevel (const config::ConfigBase* session) const {
auto compressionLevel = orcWriterCompressionLevel(session);
if (compressionLevel.has_value()) {
return compressionLevel.value();
}

// Default config values taken from Presto.
return 3;
}


std::string HiveConfig::writeFileCreateConfig() const {
return config_->get<std::string>(kWriteFileCreateConfig, "");
}
Expand Down
6 changes: 6 additions & 0 deletions velox/connectors/hive/HiveConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,12 @@ class HiveConfig {
std::optional<uint8_t> orcWriterCompressionLevel(
const config::ConfigBase* session) const;

uint32_t orcWriterZLIBCompressionLevel(
const config::ConfigBase* session) const;

uint32_t orcWriterZSTDCompressionLevel(
const config::ConfigBase* session) const;

std::string writeFileCreateConfig() const;

uint32_t sortWriterMaxOutputRows(const config::ConfigBase* session) const;
Expand Down
132 changes: 132 additions & 0 deletions velox/connectors/hive/HiveConnectorUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@
#include "velox/dwio/common/CachedBufferedInput.h"
#include "velox/dwio/common/DirectBufferedInput.h"
#include "velox/dwio/common/Reader.h"
#include "velox/dwio/dwrf/common/Config.h"
#include "velox/dwio/dwrf/writer/Writer.h"

#ifdef VELOX_ENABLE_PARQUET
#include "velox/dwio/parquet/writer/Writer.h"
#endif

#include "velox/expression/Expr.h"
#include "velox/expression/ExprToSubfieldFilter.h"
#include "velox/type/TimestampConversion.h"
Expand Down Expand Up @@ -847,4 +854,129 @@ core::TypedExprPtr extractFiltersFromRemainingFilter(
return expr;
}

#ifdef VELOX_ENABLE_PARQUET
namespace {

std::optional<TimestampUnit> getTimestampUnit(
const config::ConfigBase& config,
const char* configKey) {
if (const auto unit = config.get<uint8_t>(configKey)) {
VELOX_CHECK(
unit == 0 /*second*/ || unit == 3 /*milli*/ || unit == 6 /*micro*/ ||
unit == 9 /*nano*/,
"Invalid timestamp unit: {}",
unit.value());
return std::optional(static_cast<TimestampUnit>(unit.value()));
}
return std::nullopt;
}

std::optional<std::string> getTimestampTimeZone(
const config::ConfigBase& config,
const char* configKey) {
if (const auto timezone = config.get<std::string>(configKey)) {
return timezone.value();
}
return std::nullopt;
}

} // namespace
#endif

void updateWriterOptionsFromHiveConfig(
dwio::common::FileFormat fileFormat,
const std::shared_ptr<const HiveConfig>& hiveConfig,
const config::ConfigBase* sessionProperties,
std::shared_ptr<dwio::common::WriterOptions>& writerOptions) {
if (fileFormat == dwio::common::FileFormat::PARQUET) {
#ifdef VELOX_ENABLE_PARQUET
auto parquetWriterOptions =
std::dynamic_pointer_cast<parquet::WriterOptions>(writerOptions);
VELOX_CHECK_NOT_NULL(
parquetWriterOptions,
"Parquet writer expected a Parquet WriterOptions object.");

if (!parquetWriterOptions->parquetWriteTimestampUnit) {
parquetWriterOptions->parquetWriteTimestampUnit =
getTimestampUnit(
*sessionProperties,
parquet::WriterOptions::kParquetSessionWriteTimestampUnit)
.has_value()
? getTimestampUnit(
*sessionProperties,
parquet::WriterOptions::kParquetSessionWriteTimestampUnit)
: getTimestampUnit(
*hiveConfig->config(),
parquet::WriterOptions::kParquetSessionWriteTimestampUnit);
}

if (!parquetWriterOptions->parquetWriteTimestampTimeZone) {
parquetWriterOptions->parquetWriteTimestampTimeZone =
getTimestampTimeZone(
*sessionProperties, core::QueryConfig::kSessionTimezone)
.has_value()
? getTimestampTimeZone(
*sessionProperties, core::QueryConfig::kSessionTimezone)
: getTimestampTimeZone(
*hiveConfig->config(), core::QueryConfig::kSessionTimezone);
}

writerOptions = std::move(parquetWriterOptions);
#endif
} else {
auto dwrfWriterOptions =
std::dynamic_pointer_cast<dwrf::WriterOptions>(writerOptions);
VELOX_CHECK_NOT_NULL(
dwrfWriterOptions, "DWRF writer expected a DWRF WriterOptions object.");
std::map<std::string, std::string> configs;

if (writerOptions->compressionKind.has_value()) {
configs.emplace(
dwrf::Config::COMPRESSION.key,
std::to_string(writerOptions->compressionKind.value()));
}

configs.emplace(
dwrf::Config::STRIPE_SIZE.key,
std::to_string(hiveConfig->orcWriterMaxStripeSize(sessionProperties)));

configs.emplace(
dwrf::Config::MAX_DICTIONARY_SIZE.key,
std::to_string(
hiveConfig->orcWriterMaxDictionaryMemory(sessionProperties)));

configs.emplace(
dwrf::Config::INTEGER_DICTIONARY_ENCODING_ENABLED.key,
std::to_string(hiveConfig->isOrcWriterIntegerDictionaryEncodingEnabled(
sessionProperties)));
configs.emplace(
dwrf::Config::STRING_DICTIONARY_ENCODING_ENABLED.key,
std::to_string(hiveConfig->isOrcWriterStringDictionaryEncodingEnabled(
sessionProperties)));

configs.emplace(
dwrf::Config::COMPRESSION_BLOCK_SIZE_MIN.key,
std::to_string(
hiveConfig->orcWriterMinCompressionSize(sessionProperties)));

configs.emplace(
dwrf::Config::LINEAR_STRIPE_SIZE_HEURISTICS.key,
std::to_string(hiveConfig->orcWriterLinearStripeSizeHeuristics(
sessionProperties)));

configs.emplace(
dwrf::Config::ZLIB_COMPRESSION_LEVEL.key,
std::to_string(
hiveConfig->orcWriterZLIBCompressionLevel(sessionProperties)));

configs.emplace(
dwrf::Config::ZSTD_COMPRESSION_LEVEL.key,
std::to_string(
hiveConfig->orcWriterZSTDCompressionLevel(sessionProperties)));

dwrfWriterOptions->config = dwrf::Config::fromMap(configs);
writerOptions = std::move(dwrfWriterOptions);
}
}

} // namespace facebook::velox::connector::hive
7 changes: 7 additions & 0 deletions velox/connectors/hive/HiveConnectorUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,11 @@ core::TypedExprPtr extractFiltersFromRemainingFilter(
SubfieldFilters& filters,
double& sampleRate);

/// Updated the file format's WriteOptions based on the HiveConfig.
void updateWriterOptionsFromHiveConfig(
dwio::common::FileFormat fileFormat,
const std::shared_ptr<const HiveConfig>& hiveConfig,
const config::ConfigBase* sessionProperties,
std::shared_ptr<dwio::common::WriterOptions>& writerOptions);

} // namespace facebook::velox::connector::hive
68 changes: 9 additions & 59 deletions velox/connectors/hive/HiveDataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "velox/common/base/StatsReporter.h"
#include "velox/common/testutil/TestValue.h"
#include "velox/connectors/hive/HiveConfig.h"
#include "velox/connectors/hive/HiveConnectorUtil.h"
#include "velox/connectors/hive/HivePartitionFunction.h"
#include "velox/connectors/hive/TableHandle.h"
#include "velox/core/ITypedExpr.h"
Expand All @@ -37,10 +38,6 @@ using facebook::velox::common::testutil::TestValue;
namespace facebook::velox::connector::hive {

namespace {
// Default config values taken from Presto.
constexpr uint8_t kDefaultZlibCompressionLevel = 4;
constexpr uint8_t kDefaultZstdCompressionLevel = 3;

// Returns the type of non-partition data columns.
RowTypePtr getNonPartitionTypes(
const std::vector<column_index_t>& dataCols,
Expand Down Expand Up @@ -690,14 +687,6 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
const auto* connectorSessionProperties =
connectorQueryCtx_->sessionProperties();

// Acquire file format specifc configs. The precedence order is:
//
// 1. First respect any options specified as part of the query plan (accessed
// through insertTableHandle)
// 2. Otherwise, acquire user defined session properties.
// 3. Lastly, acquire general hive connector configs.
options->processConfigs(*hiveConfig_->config(), *connectorSessionProperties);

// Only overwrite options in case they were not already provided.
if (options->schema == nullptr) {
options->schema = getNonPartitionTypes(dataChannels_, inputType_);
Expand All @@ -720,63 +709,24 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
writerInfo_.back()->nonReclaimableSectionHolder.get();
}

if (options->defaultMemoryReclaimerFactory == nullptr ||
options->defaultMemoryReclaimerFactory() == nullptr) {
options->defaultMemoryReclaimerFactory = []() {
if (options->memoryReclaimerFactory == nullptr ||
options->memoryReclaimerFactory() == nullptr) {
options->memoryReclaimerFactory = []() {
return exec::MemoryReclaimer::create();
};
}

if (!options->maxStripeSize) {
options->maxStripeSize = std::optional(
hiveConfig_->orcWriterMaxStripeSize(connectorSessionProperties));
}

if (!options->maxDictionaryMemory) {
options->maxDictionaryMemory = std::optional(
hiveConfig_->orcWriterMaxDictionaryMemory(connectorSessionProperties));
}

if (!options->orcWriterIntegerDictionaryEncodingEnabled) {
options->orcWriterIntegerDictionaryEncodingEnabled =
hiveConfig_->isOrcWriterIntegerDictionaryEncodingEnabled(
connectorSessionProperties);
}

if (!options->orcWriterStringDictionaryEncodingEnabled) {
options->orcWriterStringDictionaryEncodingEnabled =
hiveConfig_->isOrcWriterStringDictionaryEncodingEnabled(
connectorSessionProperties);
}

if (!options->orcMinCompressionSize) {
options->orcMinCompressionSize = std::optional(
hiveConfig_->orcWriterMinCompressionSize(connectorSessionProperties));
}

if (!options->orcLinearStripeSizeHeuristics) {
options->orcLinearStripeSizeHeuristics =
std::optional(hiveConfig_->orcWriterLinearStripeSizeHeuristics(
connectorSessionProperties));
}

if (options->serdeParameters.empty()) {
options->serdeParameters = std::map<std::string, std::string>(
insertTableHandle_->serdeParameters().begin(),
insertTableHandle_->serdeParameters().end());
}

auto compressionLevel =
hiveConfig_->orcWriterCompressionLevel(connectorSessionProperties);

if (!options->zlibCompressionLevel) {
options->zlibCompressionLevel =
compressionLevel.value_or(kDefaultZlibCompressionLevel);
}
if (!options->zstdCompressionLevel) {
options->zstdCompressionLevel =
compressionLevel.value_or(kDefaultZstdCompressionLevel);
}
updateWriterOptionsFromHiveConfig(
insertTableHandle_->tableStorageFormat(),
hiveConfig_,
connectorSessionProperties,
options);

// Prevents the memory allocation during the writer creation.
WRITER_NON_RECLAIMABLE_SECTION_GUARD(writerInfo_.size() - 1);
Expand Down
1 change: 1 addition & 0 deletions velox/connectors/hive/HiveDataSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class Writer;
}

namespace facebook::velox::connector::hive {

class HiveColumnHandle;

class LocationHandle;
Expand Down
Loading

0 comments on commit a6d2f6e

Please sign in to comment.