Skip to content

Commit

Permalink
ARROW-8950: [C++] Avoid HEAD when possible in S3 filesystem
Browse files Browse the repository at this point in the history
Add FileSystem::OpenInput{Stream,File} overrides that accept a FileInfo parameter.
This can be used to optimize file opening when it the file size and existence is already known.  Concretely, avoids a HEAD request in S3.

Closes #7547 from pitrou/ARROW-8950-s3-open-file-info

Authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Wes McKinney <[email protected]>
  • Loading branch information
pitrou authored and wesm committed Jun 29, 2020
1 parent cca2db1 commit 4f8504f
Show file tree
Hide file tree
Showing 20 changed files with 361 additions and 114 deletions.
14 changes: 10 additions & 4 deletions cpp/cmake_modules/BuildUtils.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,7 @@ function(ADD_TEST_CASE REL_TEST_NAME)
EXTRA_INCLUDES
EXTRA_DEPENDENCIES
LABELS
EXTRA_LABELS
PREFIX)
cmake_parse_arguments(ARG
"${options}"
Expand Down Expand Up @@ -652,10 +653,15 @@ function(ADD_TEST_CASE REL_TEST_NAME)
add_dependencies(${TARGET} ${TEST_NAME})
endforeach()

set(LABELS)
list(APPEND LABELS "unittest")
if(ARG_LABELS)
set(ARG_LABELS "unittest;${ARG_LABELS}")
else()
set(ARG_LABELS unittest)
list(APPEND LABELS ${ARG_LABELS})
endif()
# EXTRA_LABELS don't create their own dependencies, they are only used
# to ease running certain test categories.
if(ARG_EXTRA_LABELS)
list(APPEND LABELS ${ARG_EXTRA_LABELS})
endif()

foreach(LABEL ${ARG_LABELS})
Expand All @@ -673,7 +679,7 @@ function(ADD_TEST_CASE REL_TEST_NAME)
add_dependencies(${LABEL_TEST_NAME} ${TEST_NAME})
endforeach()

set_property(TEST ${TEST_NAME} APPEND PROPERTY LABELS ${ARG_LABELS})
set_property(TEST ${TEST_NAME} APPEND PROPERTY LABELS ${LABELS})
endfunction()

#
Expand Down
73 changes: 44 additions & 29 deletions cpp/src/arrow/dataset/discovery.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,17 +103,17 @@ Result<std::shared_ptr<Dataset>> UnionDatasetFactory::Finish(FinishOptions optio
}

FileSystemDatasetFactory::FileSystemDatasetFactory(
std::vector<std::string> paths, std::shared_ptr<fs::FileSystem> filesystem,
std::vector<fs::FileInfo> files, std::shared_ptr<fs::FileSystem> filesystem,
std::shared_ptr<FileFormat> format, FileSystemFactoryOptions options)
: paths_(std::move(paths)),
: files_(std::move(files)),
fs_(std::move(filesystem)),
format_(std::move(format)),
options_(std::move(options)) {}

Result<std::shared_ptr<DatasetFactory>> FileSystemDatasetFactory::Make(
std::shared_ptr<fs::FileSystem> filesystem, const std::vector<std::string>& paths,
std::shared_ptr<FileFormat> format, FileSystemFactoryOptions options) {
std::vector<std::string> filtered_paths;
std::vector<fs::FileInfo> filtered_files;
for (const auto& path : paths) {
if (options.exclude_invalid_files) {
ARROW_ASSIGN_OR_RAISE(auto supported,
Expand All @@ -123,11 +123,32 @@ Result<std::shared_ptr<DatasetFactory>> FileSystemDatasetFactory::Make(
}
}

filtered_paths.push_back(path);
filtered_files.emplace_back(path);
}

return std::shared_ptr<DatasetFactory>(
new FileSystemDatasetFactory(std::move(filtered_paths), std::move(filesystem),
new FileSystemDatasetFactory(std::move(filtered_files), std::move(filesystem),
std::move(format), std::move(options)));
}

Result<std::shared_ptr<DatasetFactory>> FileSystemDatasetFactory::Make(
std::shared_ptr<fs::FileSystem> filesystem, const std::vector<fs::FileInfo>& files,
std::shared_ptr<FileFormat> format, FileSystemFactoryOptions options) {
std::vector<fs::FileInfo> filtered_files;
for (const auto& info : files) {
if (options.exclude_invalid_files) {
ARROW_ASSIGN_OR_RAISE(auto supported,
format->IsSupported(FileSource(info, filesystem)));
if (!supported) {
continue;
}
}

filtered_files.emplace_back(info);
}

return std::shared_ptr<DatasetFactory>(
new FileSystemDatasetFactory(std::move(filtered_files), std::move(filesystem),
std::move(format), std::move(options)));
}

Expand Down Expand Up @@ -156,27 +177,21 @@ Result<std::shared_ptr<DatasetFactory>> FileSystemDatasetFactory::Make(

ARROW_ASSIGN_OR_RAISE(auto files, filesystem->GetFileInfo(selector));

std::vector<std::string> paths;
for (const auto& info : files) {
const auto& path = info.path();

if (!info.IsFile()) {
// TODO(fsaintjacques): push this filtering into Selector logic so we
// don't copy big vector around.
continue;
}

if (StartsWithAnyOf(path, options.selector_ignore_prefixes)) {
continue;
}

paths.push_back(path);
}
// Filter out anything that's not a file or that's explicitly ignored
auto files_end =
std::remove_if(files.begin(), files.end(), [&](const fs::FileInfo& info) {
if (!info.IsFile() ||
StartsWithAnyOf(info.path(), options.selector_ignore_prefixes)) {
return true;
}
return false;
});
files.erase(files_end, files.end());

// Sorting by path guarantees a stability sometimes needed by unit tests.
std::sort(paths.begin(), paths.end());
std::sort(files.begin(), files.end(), fs::FileInfo::ByPath());

return Make(std::move(filesystem), std::move(paths), std::move(format),
return Make(std::move(filesystem), std::move(files), std::move(format),
std::move(options));
}

Expand All @@ -186,15 +201,15 @@ Result<std::vector<std::shared_ptr<Schema>>> FileSystemDatasetFactory::InspectSc

const bool has_fragments_limit = options.fragments >= 0;
int fragments = options.fragments;
for (const auto& path : paths_) {
for (const auto& info : files_) {
if (has_fragments_limit && fragments-- == 0) break;
ARROW_ASSIGN_OR_RAISE(auto schema, format_->Inspect({path, fs_}));
ARROW_ASSIGN_OR_RAISE(auto schema, format_->Inspect({info, fs_}));
schemas.push_back(schema);
}

ARROW_ASSIGN_OR_RAISE(auto partition_schema,
options_.partitioning.GetOrInferSchema(
StripPrefixAndFilename(paths_, options_.partition_base_dir)));
StripPrefixAndFilename(files_, options_.partition_base_dir)));
schemas.push_back(partition_schema);

return schemas;
Expand Down Expand Up @@ -223,10 +238,10 @@ Result<std::shared_ptr<Dataset>> FileSystemDatasetFactory::Finish(FinishOptions
}

std::vector<std::shared_ptr<FileFragment>> fragments;
for (const auto& path : paths_) {
auto fixed_path = StripPrefixAndFilename(path, options_.partition_base_dir);
for (const auto& info : files_) {
auto fixed_path = StripPrefixAndFilename(info.path(), options_.partition_base_dir);
ARROW_ASSIGN_OR_RAISE(auto partition, partitioning->Parse(fixed_path));
ARROW_ASSIGN_OR_RAISE(auto fragment, format_->MakeFragment({path, fs_}, partition));
ARROW_ASSIGN_OR_RAISE(auto fragment, format_->MakeFragment({info, fs_}, partition));
fragments.push_back(fragment);
}

Expand Down
8 changes: 6 additions & 2 deletions cpp/src/arrow/dataset/discovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,14 +222,18 @@ class ARROW_DS_EXPORT FileSystemDatasetFactory : public DatasetFactory {
Result<std::shared_ptr<Dataset>> Finish(FinishOptions options) override;

protected:
FileSystemDatasetFactory(std::vector<std::string> paths,
static Result<std::shared_ptr<DatasetFactory>> Make(
std::shared_ptr<fs::FileSystem> filesystem, const std::vector<fs::FileInfo>& files,
std::shared_ptr<FileFormat> format, FileSystemFactoryOptions options);

FileSystemDatasetFactory(std::vector<fs::FileInfo> files,
std::shared_ptr<fs::FileSystem> filesystem,
std::shared_ptr<FileFormat> format,
FileSystemFactoryOptions options);

Result<std::shared_ptr<Schema>> PartitionSchema();

std::vector<std::string> paths_;
std::vector<fs::FileInfo> files_;
std::shared_ptr<fs::FileSystem> fs_;
std::shared_ptr<FileFormat> format_;
FileSystemFactoryOptions options_;
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/arrow/dataset/discovery_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,10 @@ TEST_F(FileSystemDatasetFactoryTest, MissingDirectories) {
factory_options_.partitioning = std::make_shared<HivePartitioning>(
schema({field("a", int32()), field("b", int32())}));

auto paths = std::vector<std::string>{partition_path, unpartition_path};

ASSERT_OK_AND_ASSIGN(
factory_, FileSystemDatasetFactory::Make(fs_, {partition_path, unpartition_path},
format_, factory_options_));
factory_, FileSystemDatasetFactory::Make(fs_, paths, format_, factory_options_));

InspectOptions options;
AssertInspect(schema({field("a", int32()), field("b", int32())}), options);
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/file_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ namespace dataset {

Result<std::shared_ptr<arrow::io::RandomAccessFile>> FileSource::Open() const {
if (filesystem_) {
return filesystem_->OpenInputFile(path_);
return filesystem_->OpenInputFile(file_info_);
}

if (buffer_) {
Expand Down
12 changes: 9 additions & 3 deletions cpp/src/arrow/dataset/file_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,13 @@ class ARROW_DS_EXPORT FileSource {
public:
FileSource(std::string path, std::shared_ptr<fs::FileSystem> filesystem,
Compression::type compression = Compression::UNCOMPRESSED)
: path_(std::move(path)),
: file_info_(std::move(path)),
filesystem_(std::move(filesystem)),
compression_(compression) {}

FileSource(fs::FileInfo info, std::shared_ptr<fs::FileSystem> filesystem,
Compression::type compression = Compression::UNCOMPRESSED)
: file_info_(std::move(info)),
filesystem_(std::move(filesystem)),
compression_(compression) {}

Expand Down Expand Up @@ -87,7 +93,7 @@ class ARROW_DS_EXPORT FileSource {
const std::string& path() const {
static std::string buffer_path = "<Buffer>";
static std::string custom_open_path = "<Buffer>";
return filesystem_ ? path_ : buffer_ ? buffer_path : custom_open_path;
return filesystem_ ? file_info_.path() : buffer_ ? buffer_path : custom_open_path;
}

/// \brief Return the filesystem, if any. Otherwise returns nullptr
Expand All @@ -104,7 +110,7 @@ class ARROW_DS_EXPORT FileSource {
return Status::Invalid("Called Open() on an uninitialized FileSource");
}

std::string path_;
fs::FileInfo file_info_;
std::shared_ptr<fs::FileSystem> filesystem_;
std::shared_ptr<Buffer> buffer_;
CustomOpen custom_open_;
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,8 @@ static inline Result<std::string> FileFromRowGroup(
}
}

// TODO Is it possible to infer the file size and return a populated FileInfo?
// This could avoid some spurious HEAD requests on S3 (ARROW-8950)
path = fs::internal::JoinAbstractPath(std::vector<std::string>{base_path, path});
// Normalizing path is required for Windows.
return filesystem->NormalizePath(std::move(path));
Expand Down
11 changes: 11 additions & 0 deletions cpp/src/arrow/dataset/partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -608,12 +608,23 @@ std::string StripPrefixAndFilename(const std::string& path, const std::string& p
std::vector<std::string> StripPrefixAndFilename(const std::vector<std::string>& paths,
const std::string& prefix) {
std::vector<std::string> result;
result.reserve(paths.size());
for (const auto& path : paths) {
result.emplace_back(StripPrefixAndFilename(path, prefix));
}
return result;
}

std::vector<std::string> StripPrefixAndFilename(const std::vector<fs::FileInfo>& files,
const std::string& prefix) {
std::vector<std::string> result;
result.reserve(files.size());
for (const auto& info : files) {
result.emplace_back(StripPrefixAndFilename(info.path(), prefix));
}
return result;
}

Result<std::shared_ptr<Schema>> PartitioningOrFactory::GetOrInferSchema(
const std::vector<std::string>& paths) {
if (auto part = partitioning()) {
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/arrow/dataset/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,10 @@ ARROW_DS_EXPORT std::string StripPrefixAndFilename(const std::string& path,
ARROW_DS_EXPORT std::vector<std::string> StripPrefixAndFilename(
const std::vector<std::string>& paths, const std::string& prefix);

/// \brief Vector version of StripPrefixAndFilename.
ARROW_DS_EXPORT std::vector<std::string> StripPrefixAndFilename(
const std::vector<fs::FileInfo>& files, const std::string& prefix);

/// \brief Either a Partitioning or a PartitioningFactory
class ARROW_DS_EXPORT PartitioningOrFactory {
public:
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ struct MakeFileSystemDatasetMixin {
}

ASSERT_OK_AND_ASSIGN(auto fragment,
format->MakeFragment({info.path(), fs_}, partitions[i]));
format->MakeFragment({info, fs_}, partitions[i]));
fragments.push_back(std::move(fragment));
}

Expand Down
8 changes: 5 additions & 3 deletions cpp/src/arrow/filesystem/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ add_arrow_test(filesystem-test
SOURCES
filesystem_test.cc
localfs_test.cc
path_forest_test.cc)
path_forest_test.cc
EXTRA_LABELS
filesystem)

if(ARROW_S3)
add_arrow_test(s3fs_test)
add_arrow_test(s3fs_test EXTRA_LABELS filesystem)

if(ARROW_BUILD_TESTS)
add_executable(arrow-s3fs-narrative-test s3fs_narrative_test.cc)
Expand All @@ -48,5 +50,5 @@ if(ARROW_S3)
endif()

if(ARROW_HDFS)
add_arrow_test(hdfs_test)
add_arrow_test(hdfs_test EXTRA_LABELS filesystem)
endif()
Loading

0 comments on commit 4f8504f

Please sign in to comment.