Introducing IcebergSplitReader #7362

Closed · wants to merge 6 commits
10 changes: 2 additions & 8 deletions velox/common/testutil/tests/CMakeLists.txt
@@ -16,11 +16,5 @@ add_executable(velox_test_util_test TestValueTest.cpp SpillConfigTest.cpp)
gtest_add_tests(velox_test_util_test "" AUTO)

target_link_libraries(
velox_test_util_test
PRIVATE
velox_test_util
velox_exception
velox_spill_config
velox_exec
gtest
gtest_main)
velox_test_util_test PRIVATE velox_test_util velox_exception
velox_spill_config velox_exec gtest gtest_main)
28 changes: 15 additions & 13 deletions velox/connectors/hive/CMakeLists.txt
@@ -13,9 +13,10 @@
# limitations under the License.

add_library(velox_hive_config OBJECT HiveConfig.cpp)

target_link_libraries(velox_hive_config velox_exception)

add_subdirectory(iceberg)

add_library(
velox_hive_connector OBJECT
FileHandle.cpp
@@ -30,18 +31,19 @@ add_library(

target_link_libraries(
velox_hive_connector
velox_common_io
velox_connector
velox_dwio_catalog_fbhive
velox_dwio_dwrf_reader
velox_dwio_dwrf_writer
velox_dwio_parquet_reader
velox_dwio_parquet_writer
velox_file
velox_hive_partition_function
velox_s3fs
velox_hdfs
velox_gcs)
PUBLIC velox_hive_iceberg_splitreader
PRIVATE velox_common_io
velox_connector
velox_dwio_catalog_fbhive
velox_dwio_dwrf_reader
velox_dwio_dwrf_writer
velox_dwio_parquet_reader
velox_dwio_parquet_writer
velox_file
velox_hive_partition_function
velox_s3fs
velox_hdfs
velox_gcs)

add_library(velox_hive_partition_function HivePartitionFunction.cpp)

8 changes: 4 additions & 4 deletions velox/connectors/hive/FileHandle.h
@@ -25,15 +25,15 @@

#pragma once

#include <cstdint>
#include <memory>
#include <string>

#include "velox/common/caching/CachedFactory.h"
#include "velox/common/caching/FileIds.h"
#include "velox/common/file/File.h"
#include "velox/dwio/common/InputStream.h"

//#include <cstdint>
//#include <memory>
//#include <string>

namespace facebook::velox {

class Config;
16 changes: 1 addition & 15 deletions velox/connectors/hive/HiveConnector.cpp
@@ -75,27 +75,13 @@ std::unique_ptr<DataSource> HiveConnector::createDataSource(
std::string,
std::shared_ptr<connector::ColumnHandle>>& columnHandles,
ConnectorQueryCtx* connectorQueryCtx) {
dwio::common::ReaderOptions options(connectorQueryCtx->memoryPool());
options.setMaxCoalesceBytes(
HiveConfig::maxCoalescedBytes(connectorQueryCtx->config()));
options.setMaxCoalesceDistance(
HiveConfig::maxCoalescedDistanceBytes(connectorQueryCtx->config()));
options.setFileColumnNamesReadAsLowerCase(
HiveConfig::isFileColumnNamesReadAsLowerCase(
connectorQueryCtx->config()));
options.setUseColumnNamesForColumnMapping(
HiveConfig::isOrcUseColumnNames(connectorQueryCtx->config()));

return std::make_unique<HiveDataSource>(
outputType,
tableHandle,
columnHandles,
&fileHandleFactory_,
connectorQueryCtx->expressionEvaluator(),
connectorQueryCtx->cache(),
connectorQueryCtx->scanId(),
executor_,
options);
connectorQueryCtx);
}
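
Net effect of this hunk: `createDataSource` no longer assembles `dwio::common::ReaderOptions` and instead forwards the whole `ConnectorQueryCtx`. A minimal sketch of the resulting method, reconstructed from the interleaved old/new lines above (the surviving parameters and argument order are inferred from the diff, not authoritative):

```cpp
// Sketch of the post-change factory. Coalesce sizes, lower-case column
// names and ORC name-based mapping are no longer baked into ReaderOptions
// here; the split reader is assumed to derive the same settings from
// connectorQueryCtx->config() when it opens the file.
std::unique_ptr<DataSource> HiveConnector::createDataSource(
    const RowTypePtr& outputType,
    const std::shared_ptr<ConnectorTableHandle>& tableHandle,
    const std::unordered_map<
        std::string,
        std::shared_ptr<connector::ColumnHandle>>& columnHandles,
    ConnectorQueryCtx* connectorQueryCtx) {
  return std::make_unique<HiveDataSource>(
      outputType,
      tableHandle,
      columnHandles,
      &fileHandleFactory_,
      executor_,
      connectorQueryCtx);
}
```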

std::unique_ptr<DataSink> HiveConnector::createDataSink(
135 changes: 21 additions & 114 deletions velox/connectors/hive/HiveDataSource.cpp
@@ -16,15 +16,14 @@

#include "velox/connectors/hive/HiveDataSource.h"

#include <string>
#include <unordered_map>

#include "velox/dwio/common/CachedBufferedInput.h"
#include "velox/dwio/common/DirectBufferedInput.h"
#include "velox/connectors/hive/HiveConfig.h"
#include "velox/dwio/common/ReaderFactory.h"
#include "velox/expression/ExprToSubfieldFilter.h"
#include "velox/expression/FieldReference.h"

#include <string>
#include <unordered_map>

namespace facebook::velox::connector::hive {

class HiveTableHandle;
@@ -358,18 +357,13 @@ HiveDataSource::HiveDataSource(
std::string,
std::shared_ptr<connector::ColumnHandle>>& columnHandles,
FileHandleFactory* fileHandleFactory,
core::ExpressionEvaluator* expressionEvaluator,
cache::AsyncDataCache* cache,
const std::string& scanId,
folly::Executor* executor,
const dwio::common::ReaderOptions& options)
: fileHandleFactory_(fileHandleFactory),
readerOpts_(options),
pool_(&options.getMemoryPool()),
ConnectorQueryCtx* connectorQueryCtx)
: pool_(connectorQueryCtx->memoryPool()),
outputType_(outputType),
expressionEvaluator_(expressionEvaluator),
cache_(cache),
scanId_(scanId),
expressionEvaluator_(connectorQueryCtx->expressionEvaluator()),
fileHandleFactory_(fileHandleFactory),
connectorQueryCtx_(connectorQueryCtx),
executor_(executor) {
  // Column handles keyed on the column alias, the name used in the query.
for (const auto& [canonicalizedName, columnHandle] : columnHandles) {
@@ -410,7 +404,8 @@ HiveDataSource::HiveDataSource(
VELOX_CHECK(
hiveTableHandle_ != nullptr,
"TableHandle must be an instance of HiveTableHandle");
if (readerOpts_.isFileColumnNamesReadAsLowerCase()) {
if (HiveConfig::isFileColumnNamesReadAsLowerCase(
connectorQueryCtx->config())) {
checkColumnNameLowerCase(outputType_);
checkColumnNameLowerCase(hiveTableHandle_->subfieldFilters());
checkColumnNameLowerCase(hiveTableHandle_->remainingFilter());
@@ -474,62 +469,20 @@ HiveDataSource::HiveDataSource(
*scanSpec_, *remainingFilter, expressionEvaluator_);
}

readerOpts_.setFileSchema(hiveTableHandle_->dataColumns());
ioStats_ = std::make_shared<io::IoStatistics>();
}

inline uint8_t parseDelimiter(const std::string& delim) {
for (char const& ch : delim) {
if (!std::isdigit(ch)) {
return delim[0];
}
}
return stoi(delim);
}

void HiveDataSource::parseSerdeParameters(
const std::unordered_map<std::string, std::string>& serdeParameters) {
auto fieldIt = serdeParameters.find(dwio::common::SerDeOptions::kFieldDelim);
if (fieldIt == serdeParameters.end()) {
fieldIt = serdeParameters.find("serialization.format");
}
auto collectionIt =
serdeParameters.find(dwio::common::SerDeOptions::kCollectionDelim);
if (collectionIt == serdeParameters.end()) {
// For collection delimiter, Hive 1.x, 2.x uses "colelction.delim", but
// Hive 3.x uses "collection.delim".
// See: https://issues.apache.org/jira/browse/HIVE-16922
collectionIt = serdeParameters.find("colelction.delim");
}
auto mapKeyIt =
serdeParameters.find(dwio::common::SerDeOptions::kMapKeyDelim);

if (fieldIt == serdeParameters.end() &&
collectionIt == serdeParameters.end() &&
mapKeyIt == serdeParameters.end()) {
return;
}

uint8_t fieldDelim = '\1';
uint8_t collectionDelim = '\2';
uint8_t mapKeyDelim = '\3';
if (fieldIt != serdeParameters.end()) {
fieldDelim = parseDelimiter(fieldIt->second);
}
if (collectionIt != serdeParameters.end()) {
collectionDelim = parseDelimiter(collectionIt->second);
}
if (mapKeyIt != serdeParameters.end()) {
mapKeyDelim = parseDelimiter(mapKeyIt->second);
}
dwio::common::SerDeOptions serDeOptions(
fieldDelim, collectionDelim, mapKeyDelim);
readerOpts_.setSerDeOptions(serDeOptions);
}
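
The two helpers above leave `HiveDataSource` entirely; the serde handling presumably moves into the split reader along with the rest of the per-file setup. For reference, the delimiter parsing works like this: an all-digit string is read as a decimal character code, anything else falls back to its first character. A standalone illustration of the same logic (the `main` is hypothetical, for demonstration only):

```cpp
#include <cctype>
#include <cstdint>
#include <iostream>
#include <string>

// Same logic as the deleted parseDelimiter(): all digits => decimal
// character code, otherwise the first character of the string.
uint8_t parseDelimiter(const std::string& delim) {
  for (char const& ch : delim) {
    if (!std::isdigit(static_cast<unsigned char>(ch))) {
      return delim[0];
    }
  }
  return static_cast<uint8_t>(std::stoi(delim));
}

int main() {
  std::cout << int(parseDelimiter("44")) << "\n";  // 44, i.e. ','
  std::cout << int(parseDelimiter("|")) << "\n";   // 124, i.e. '|'
  std::cout << int(parseDelimiter("9a")) << "\n";  // 57, i.e. '9' (not all digits)
}
```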

std::unique_ptr<SplitReader> HiveDataSource::createSplitReader() {
return SplitReader::create(
split_, readerOutputType_, partitionKeys_, scanSpec_, pool_);
split_,
hiveTableHandle_,
scanSpec_,
readerOutputType_,
&partitionKeys_,
fileHandleFactory_,
connectorQueryCtx_,
executor_,
ioStats_);
}
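
`SplitReader::create` grows from five arguments to nine: the split reader now receives everything it needs to open the file itself (handle factory, query context, executor, IO stats) instead of having `HiveDataSource` do that work. A declaration consistent with this call site might look as follows; parameter types are inferred from the members being passed, and the real header may differ:

```cpp
class SplitReader {
 public:
  // Factory matching the call site above. An Iceberg-aware
  // implementation (the IcebergSplitReader this PR introduces) can be
  // returned here depending on the split's table format.
  static std::unique_ptr<SplitReader> create(
      const std::shared_ptr<HiveConnectorSplit>& hiveSplit,
      const std::shared_ptr<HiveTableHandle>& hiveTableHandle,
      const std::shared_ptr<common::ScanSpec>& scanSpec,
      const RowTypePtr& readerOutputType,
      std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>*
          partitionKeys,
      FileHandleFactory* fileHandleFactory,
      ConnectorQueryCtx* connectorQueryCtx,
      folly::Executor* executor,
      const std::shared_ptr<io::IoStatistics>& ioStats);

  virtual ~SplitReader() = default;
};
```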

void HiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
@@ -541,30 +494,11 @@

VLOG(1) << "Adding split " << split_->toString();

if (readerOpts_.getFileFormat() != dwio::common::FileFormat::UNKNOWN) {
VELOX_CHECK(
readerOpts_.getFileFormat() == split_->fileFormat,
"HiveDataSource received splits of different formats: {} and {}",
toString(readerOpts_.getFileFormat()),
toString(split_->fileFormat));
} else {
parseSerdeParameters(split_->serdeParameters);
readerOpts_.setFileFormat(split_->fileFormat);
}

auto fileHandle = fileHandleFactory_->generate(split_->filePath).second;
auto input = createBufferedInput(*fileHandle, readerOpts_);

if (splitReader_) {
splitReader_.reset();
}
splitReader_ = createSplitReader();
splitReader_->prepareSplit(
hiveTableHandle_,
readerOpts_,
std::move(input),
metadataFilter_,
runtimeStats_);
splitReader_->prepareSplit(metadataFilter_, runtimeStats_);
}
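
`addSplit` shrinks accordingly: the file-format consistency check, serde parsing, file-handle generation and `BufferedInput` creation all leave this method, and `prepareSplit` keeps only the two arguments that remain data-source state. A sketch of the surviving flow under that assumption:

```cpp
// Sketch of the slimmed-down per-split path. Everything file-related is
// assumed to happen inside prepareSplit() of the (possibly
// Iceberg-specific) split reader.
void HiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
  // ...same split validation and logging as before...
  splitReader_.reset();                // release the previous reader first
  splitReader_ = createSplitReader();  // virtual: subclasses may build an
                                       // IcebergSplitReader
  splitReader_->prepareSplit(metadataFilter_, runtimeStats_);
}
```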

std::optional<RowVectorPtr> HiveDataSource::next(
@@ -788,33 +722,6 @@ std::shared_ptr<common::ScanSpec> HiveDataSource::makeScanSpec(
return spec;
}

std::unique_ptr<dwio::common::BufferedInput>
HiveDataSource::createBufferedInput(
const FileHandle& fileHandle,
const dwio::common::ReaderOptions& readerOpts) {
if (cache_) {
return std::make_unique<dwio::common::CachedBufferedInput>(
fileHandle.file,
dwio::common::MetricsLog::voidLog(),
fileHandle.uuid.id(),
cache_,
Connector::getTracker(scanId_, readerOpts.loadQuantum()),
fileHandle.groupId.id(),
ioStats_,
executor_,
readerOpts);
}
return std::make_unique<dwio::common::DirectBufferedInput>(
fileHandle.file,
dwio::common::MetricsLog::voidLog(),
fileHandle.uuid.id(),
Connector::getTracker(scanId_, readerOpts.loadQuantum()),
fileHandle.groupId.id(),
ioStats_,
executor_,
readerOpts);
}
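
The cached-vs-direct choice deleted here presumably moves into `SplitReader` with the rest of file opening. The decision itself is unchanged in spirit: when an `AsyncDataCache` is available, reads go through `CachedBufferedInput` (cache-backed, access tracked per scan); otherwise through `DirectBufferedInput`. A free-function restatement of the deleted code, with the helper name and explicit parameters being illustrative stand-ins for the members the old method captured from `this`:

```cpp
// Same selection logic as the removed HiveDataSource::createBufferedInput.
std::unique_ptr<dwio::common::BufferedInput> makeBufferedInput(
    const FileHandle& fileHandle,
    cache::AsyncDataCache* cache,
    const std::string& scanId,
    const std::shared_ptr<io::IoStatistics>& ioStats,
    folly::Executor* executor,
    const dwio::common::ReaderOptions& readerOpts) {
  if (cache != nullptr) {
    // Cache-backed path: load quanta of the file into the AsyncDataCache
    // and track access per scan id.
    return std::make_unique<dwio::common::CachedBufferedInput>(
        fileHandle.file,
        dwio::common::MetricsLog::voidLog(),
        fileHandle.uuid.id(),
        cache,
        Connector::getTracker(scanId, readerOpts.loadQuantum()),
        fileHandle.groupId.id(),
        ioStats,
        executor,
        readerOpts);
  }
  // No cache configured: issue coalesced direct reads against the file.
  return std::make_unique<dwio::common::DirectBufferedInput>(
      fileHandle.file,
      dwio::common::MetricsLog::voidLog(),
      fileHandle.uuid.id(),
      Connector::getTracker(scanId, readerOpts.loadQuantum()),
      fileHandle.groupId.id(),
      ioStats,
      executor,
      readerOpts);
}
```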

vector_size_t HiveDataSource::evaluateRemainingFilter(RowVectorPtr& rowVector) {
filterRows_.resize(output_->size());

24 changes: 6 additions & 18 deletions velox/connectors/hive/HiveDataSource.h
@@ -38,11 +38,8 @@ class HiveDataSource : public DataSource {
std::string,
std::shared_ptr<connector::ColumnHandle>>& columnHandles,
FileHandleFactory* fileHandleFactory,
core::ExpressionEvaluator* expressionEvaluator,
cache::AsyncDataCache* cache,
const std::string& scanId,
folly::Executor* executor,
const dwio::common::ReaderOptions& options);
ConnectorQueryCtx* connectorQueryCtx);

void addSplit(std::shared_ptr<ConnectorSplit> split) override;

@@ -95,15 +92,9 @@
protected:
virtual std::unique_ptr<SplitReader> createSplitReader();

std::unique_ptr<dwio::common::BufferedInput> createBufferedInput(
const FileHandle&,
const dwio::common::ReaderOptions&);

std::shared_ptr<HiveConnectorSplit> split_;
FileHandleFactory* fileHandleFactory_;
dwio::common::ReaderOptions readerOpts_;
std::shared_ptr<common::ScanSpec> scanSpec_;
memory::MemoryPool* pool_;
std::shared_ptr<common::ScanSpec> scanSpec_;
VectorPtr output_;
std::unique_ptr<SplitReader> splitReader_;

@@ -128,9 +119,6 @@
// hold adaptation.
void resetSplit();

void parseSerdeParameters(
const std::unordered_map<std::string, std::string>& serdeParameters);

const RowVectorPtr& getEmptyOutput() {
if (!emptyOutput_) {
emptyOutput_ = RowVector::createEmpty(outputType_, pool_);
@@ -140,7 +128,7 @@

std::shared_ptr<HiveTableHandle> hiveTableHandle_;

// The row type for the data source output, not including filter only columns
// The row type for the data source output, not including filter-only columns
const RowTypePtr outputType_;
std::shared_ptr<io::IoStatistics> ioStats_;
std::shared_ptr<common::MetadataFilter> metadataFilter_;
@@ -155,9 +143,9 @@
SelectivityVector filterRows_;
exec::FilterEvalCtx filterEvalCtx_;

cache::AsyncDataCache* const cache_{nullptr};
const std::string& scanId_;
folly::Executor* executor_;
FileHandleFactory* const fileHandleFactory_;
ConnectorQueryCtx* const connectorQueryCtx_;
folly::Executor* const executor_;
};

} // namespace facebook::velox::connector::hive