diff --git a/cpp/cmake_modules/BuildUtils.cmake b/cpp/cmake_modules/BuildUtils.cmake index 377409a46e52b..e3b72f0380bbe 100644 --- a/cpp/cmake_modules/BuildUtils.cmake +++ b/cpp/cmake_modules/BuildUtils.cmake @@ -556,6 +556,7 @@ function(ADD_TEST_CASE REL_TEST_NAME) EXTRA_INCLUDES EXTRA_DEPENDENCIES LABELS + EXTRA_LABELS PREFIX) cmake_parse_arguments(ARG "${options}" @@ -652,10 +653,15 @@ function(ADD_TEST_CASE REL_TEST_NAME) add_dependencies(${TARGET} ${TEST_NAME}) endforeach() + set(LABELS) + list(APPEND LABELS "unittest") if(ARG_LABELS) - set(ARG_LABELS "unittest;${ARG_LABELS}") - else() - set(ARG_LABELS unittest) + list(APPEND LABELS ${ARG_LABELS}) + endif() + # EXTRA_LABELS don't create their own dependencies, they are only used + # to ease running certain test categories. + if(ARG_EXTRA_LABELS) + list(APPEND LABELS ${ARG_EXTRA_LABELS}) endif() foreach(LABEL ${ARG_LABELS}) @@ -673,7 +679,7 @@ function(ADD_TEST_CASE REL_TEST_NAME) add_dependencies(${LABEL_TEST_NAME} ${TEST_NAME}) endforeach() - set_property(TEST ${TEST_NAME} APPEND PROPERTY LABELS ${ARG_LABELS}) + set_property(TEST ${TEST_NAME} APPEND PROPERTY LABELS ${LABELS}) endfunction() # diff --git a/cpp/src/arrow/dataset/discovery.cc b/cpp/src/arrow/dataset/discovery.cc index 84e25021ab58f..c082eaa1e0091 100644 --- a/cpp/src/arrow/dataset/discovery.cc +++ b/cpp/src/arrow/dataset/discovery.cc @@ -103,9 +103,9 @@ Result> UnionDatasetFactory::Finish(FinishOptions optio } FileSystemDatasetFactory::FileSystemDatasetFactory( - std::vector paths, std::shared_ptr filesystem, + std::vector files, std::shared_ptr filesystem, std::shared_ptr format, FileSystemFactoryOptions options) - : paths_(std::move(paths)), + : files_(std::move(files)), fs_(std::move(filesystem)), format_(std::move(format)), options_(std::move(options)) {} @@ -113,7 +113,7 @@ FileSystemDatasetFactory::FileSystemDatasetFactory( Result> FileSystemDatasetFactory::Make( std::shared_ptr filesystem, const std::vector& paths, std::shared_ptr format, FileSystemFactoryOptions options) { - std::vector filtered_paths; + std::vector filtered_files; for (const auto& path : paths) { if (options.exclude_invalid_files) { ARROW_ASSIGN_OR_RAISE(auto supported, @@ -123,11 +123,32 @@ Result> FileSystemDatasetFactory::Make( } } - filtered_paths.push_back(path); + filtered_files.emplace_back(path); } return std::shared_ptr( - new FileSystemDatasetFactory(std::move(filtered_paths), std::move(filesystem), + new FileSystemDatasetFactory(std::move(filtered_files), std::move(filesystem), + std::move(format), std::move(options))); +} + +Result> FileSystemDatasetFactory::Make( + std::shared_ptr filesystem, const std::vector& files, + std::shared_ptr format, FileSystemFactoryOptions options) { + std::vector filtered_files; + for (const auto& info : files) { + if (options.exclude_invalid_files) { + ARROW_ASSIGN_OR_RAISE(auto supported, + format->IsSupported(FileSource(info, filesystem))); + if (!supported) { + continue; + } + } + + filtered_files.emplace_back(info); + } + + return std::shared_ptr( + new FileSystemDatasetFactory(std::move(filtered_files), std::move(filesystem), std::move(format), std::move(options))); } @@ -156,27 +177,21 @@ Result> FileSystemDatasetFactory::Make( ARROW_ASSIGN_OR_RAISE(auto files, filesystem->GetFileInfo(selector)); - std::vector paths; - for (const auto& info : files) { - const auto& path = info.path(); - - if (!info.IsFile()) { - // TODO(fsaintjacques): push this filtering into Selector logic so we - // don't copy big vector around. - continue; - } - - if (StartsWithAnyOf(path, options.selector_ignore_prefixes)) { - continue; - } - - paths.push_back(path); - } + // Filter out anything that's not a file or that's explicitly ignored + auto files_end = + std::remove_if(files.begin(), files.end(), [&](const fs::FileInfo& info) { + if (!info.IsFile() || + StartsWithAnyOf(info.path(), options.selector_ignore_prefixes)) { + return true; + } + return false; + }); + files.erase(files_end, files.end()); // Sorting by path guarantees a stability sometimes needed by unit tests. - std::sort(paths.begin(), paths.end()); + std::sort(files.begin(), files.end(), fs::FileInfo::ByPath()); - return Make(std::move(filesystem), std::move(paths), std::move(format), + return Make(std::move(filesystem), std::move(files), std::move(format), std::move(options)); } @@ -186,15 +201,15 @@ Result>> FileSystemDatasetFactory::InspectSc const bool has_fragments_limit = options.fragments >= 0; int fragments = options.fragments; - for (const auto& path : paths_) { + for (const auto& info : files_) { if (has_fragments_limit && fragments-- == 0) break; - ARROW_ASSIGN_OR_RAISE(auto schema, format_->Inspect({path, fs_})); + ARROW_ASSIGN_OR_RAISE(auto schema, format_->Inspect({info, fs_})); schemas.push_back(schema); } ARROW_ASSIGN_OR_RAISE(auto partition_schema, options_.partitioning.GetOrInferSchema( - StripPrefixAndFilename(paths_, options_.partition_base_dir))); + StripPrefixAndFilename(files_, options_.partition_base_dir))); schemas.push_back(partition_schema); return schemas; @@ -223,10 +238,10 @@ Result> FileSystemDatasetFactory::Finish(FinishOptions } std::vector> fragments; - for (const auto& path : paths_) { - auto fixed_path = StripPrefixAndFilename(path, options_.partition_base_dir); + for (const auto& info : files_) { + auto fixed_path = StripPrefixAndFilename(info.path(), options_.partition_base_dir); ARROW_ASSIGN_OR_RAISE(auto partition, partitioning->Parse(fixed_path)); - ARROW_ASSIGN_OR_RAISE(auto fragment, format_->MakeFragment({path, fs_}, partition)); + ARROW_ASSIGN_OR_RAISE(auto fragment, format_->MakeFragment({info, fs_}, partition)); fragments.push_back(fragment); } diff --git a/cpp/src/arrow/dataset/discovery.h b/cpp/src/arrow/dataset/discovery.h index 634073f8cef34..5176f4f53a7a2 100644 --- a/cpp/src/arrow/dataset/discovery.h +++ b/cpp/src/arrow/dataset/discovery.h @@ -222,14 +222,18 @@ class ARROW_DS_EXPORT FileSystemDatasetFactory : public DatasetFactory { Result> Finish(FinishOptions options) override; protected: - FileSystemDatasetFactory(std::vector paths, + static Result> Make( + std::shared_ptr filesystem, const std::vector& files, + std::shared_ptr format, FileSystemFactoryOptions options); + + FileSystemDatasetFactory(std::vector files, std::shared_ptr filesystem, std::shared_ptr format, FileSystemFactoryOptions options); Result> PartitionSchema(); - std::vector paths_; + std::vector files_; std::shared_ptr fs_; std::shared_ptr format_; FileSystemFactoryOptions options_; diff --git a/cpp/src/arrow/dataset/discovery_test.cc b/cpp/src/arrow/dataset/discovery_test.cc index 4f5aa5a0953bc..2fc21de7ddf14 100644 --- a/cpp/src/arrow/dataset/discovery_test.cc +++ b/cpp/src/arrow/dataset/discovery_test.cc @@ -222,9 +222,10 @@ TEST_F(FileSystemDatasetFactoryTest, MissingDirectories) { factory_options_.partitioning = std::make_shared( schema({field("a", int32()), field("b", int32())})); + auto paths = std::vector{partition_path, unpartition_path}; + ASSERT_OK_AND_ASSIGN( - factory_, FileSystemDatasetFactory::Make(fs_, {partition_path, unpartition_path}, - format_, factory_options_)); + factory_, FileSystemDatasetFactory::Make(fs_, paths, format_, factory_options_)); InspectOptions options; AssertInspect(schema({field("a", int32()), field("b", int32())}), options); diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index 7f2d8f4e0befd..806c625d52b1a 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -36,7 +36,7 @@ namespace dataset { Result> FileSource::Open() const { if (filesystem_) { - return filesystem_->OpenInputFile(path_); + return filesystem_->OpenInputFile(file_info_); } if (buffer_) { diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index d822a8aca44e6..246e71dd6d548 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -47,7 +47,13 @@ class ARROW_DS_EXPORT FileSource { public: FileSource(std::string path, std::shared_ptr filesystem, Compression::type compression = Compression::UNCOMPRESSED) - : path_(std::move(path)), + : file_info_(std::move(path)), + filesystem_(std::move(filesystem)), + compression_(compression) {} + + FileSource(fs::FileInfo info, std::shared_ptr filesystem, + Compression::type compression = Compression::UNCOMPRESSED) + : file_info_(std::move(info)), filesystem_(std::move(filesystem)), compression_(compression) {} @@ -87,7 +93,7 @@ class ARROW_DS_EXPORT FileSource { const std::string& path() const { static std::string buffer_path = ""; static std::string custom_open_path = ""; - return filesystem_ ? path_ : buffer_ ? buffer_path : custom_open_path; + return filesystem_ ? file_info_.path() : buffer_ ? buffer_path : custom_open_path; } /// \brief Return the filesystem, if any. Otherwise returns nullptr @@ -104,7 +110,7 @@ class ARROW_DS_EXPORT FileSource { return Status::Invalid("Called Open() on an uninitialized FileSource"); } - std::string path_; + fs::FileInfo file_info_; std::shared_ptr filesystem_; std::shared_ptr buffer_; CustomOpen custom_open_; diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index ffb69790fd09b..84f8ef6e37340 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -613,6 +613,8 @@ static inline Result FileFromRowGroup( } } + // TODO Is it possible to infer the file size and return a populated FileInfo? + // This could avoid some spurious HEAD requests on S3 (ARROW-8950) path = fs::internal::JoinAbstractPath(std::vector{base_path, path}); // Normalizing path is required for Windows. return filesystem->NormalizePath(std::move(path)); diff --git a/cpp/src/arrow/dataset/partition.cc b/cpp/src/arrow/dataset/partition.cc index f2d8488276ba7..f882645eb4230 100644 --- a/cpp/src/arrow/dataset/partition.cc +++ b/cpp/src/arrow/dataset/partition.cc @@ -608,12 +608,23 @@ std::string StripPrefixAndFilename(const std::string& path, const std::string& p std::vector StripPrefixAndFilename(const std::vector& paths, const std::string& prefix) { std::vector result; + result.reserve(paths.size()); for (const auto& path : paths) { result.emplace_back(StripPrefixAndFilename(path, prefix)); } return result; } +std::vector StripPrefixAndFilename(const std::vector& files, + const std::string& prefix) { + std::vector result; + result.reserve(files.size()); + for (const auto& info : files) { + result.emplace_back(StripPrefixAndFilename(info.path(), prefix)); + } + return result; +} + Result> PartitioningOrFactory::GetOrInferSchema( const std::vector& paths) { if (auto part = partitioning()) { diff --git a/cpp/src/arrow/dataset/partition.h b/cpp/src/arrow/dataset/partition.h index 56a8db965e15f..f0d599a464a16 100644 --- a/cpp/src/arrow/dataset/partition.h +++ b/cpp/src/arrow/dataset/partition.h @@ -264,6 +264,10 @@ ARROW_DS_EXPORT std::string StripPrefixAndFilename(const std::string& path, ARROW_DS_EXPORT std::vector StripPrefixAndFilename( const std::vector& paths, const std::string& prefix); +/// \brief Vector version of StripPrefixAndFilename. +ARROW_DS_EXPORT std::vector StripPrefixAndFilename( + const std::vector& files, const std::string& prefix); + /// \brief Either a Partitioning or a PartitioningFactory class ARROW_DS_EXPORT PartitioningOrFactory { public: diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h index 76e3c1f952619..8f22b391d1c4a 100644 --- a/cpp/src/arrow/dataset/test_util.h +++ b/cpp/src/arrow/dataset/test_util.h @@ -311,7 +311,7 @@ struct MakeFileSystemDatasetMixin { } ASSERT_OK_AND_ASSIGN(auto fragment, - format->MakeFragment({info.path(), fs_}, partitions[i])); + format->MakeFragment({info, fs_}, partitions[i])); fragments.push_back(std::move(fragment)); } diff --git a/cpp/src/arrow/filesystem/CMakeLists.txt b/cpp/src/arrow/filesystem/CMakeLists.txt index d8a03c2f6aeaf..b2eb480859e81 100644 --- a/cpp/src/arrow/filesystem/CMakeLists.txt +++ b/cpp/src/arrow/filesystem/CMakeLists.txt @@ -25,10 +25,12 @@ add_arrow_test(filesystem-test SOURCES filesystem_test.cc localfs_test.cc - path_forest_test.cc) + path_forest_test.cc + EXTRA_LABELS + filesystem) if(ARROW_S3) - add_arrow_test(s3fs_test) + add_arrow_test(s3fs_test EXTRA_LABELS filesystem) if(ARROW_BUILD_TESTS) add_executable(arrow-s3fs-narrative-test s3fs_narrative_test.cc) @@ -48,5 +50,5 @@ if(ARROW_S3) endif() if(ARROW_HDFS) - add_arrow_test(hdfs_test) + add_arrow_test(hdfs_test EXTRA_LABELS filesystem) endif() diff --git a/cpp/src/arrow/filesystem/filesystem.cc b/cpp/src/arrow/filesystem/filesystem.cc index df3b87ac0334e..b361440d3c3c5 100644 --- a/cpp/src/arrow/filesystem/filesystem.cc +++ b/cpp/src/arrow/filesystem/filesystem.cc @@ -137,6 +137,28 @@ Status FileSystem::DeleteFiles(const std::vector& paths) { return st; } +Result> FileSystem::OpenInputStream( + const FileInfo& info) { + if (info.type() == FileType::NotFound) { + return internal::PathNotFound(info.path()); + } + if (info.type() != FileType::File && info.type() != FileType::Unknown) { + return internal::NotAFile(info.path()); + } + return OpenInputStream(info.path()); +} + +Result> FileSystem::OpenInputFile( + const FileInfo& info) { + if (info.type() == FileType::NotFound) { + return internal::PathNotFound(info.path()); + } + if (info.type() != FileType::File && info.type() != FileType::Unknown) { + return internal::NotAFile(info.path()); + } + return OpenInputFile(info.path()); +} + ////////////////////////////////////////////////////////////////////////// // SubTreeFileSystem implementation @@ -264,6 +286,15 @@ Result> SubTreeFileSystem::OpenInputStream( return base_fs_->OpenInputStream(s); } +Result> SubTreeFileSystem::OpenInputStream( + const FileInfo& info) { + auto s = info.path(); + RETURN_NOT_OK(PrependBaseNonEmpty(&s)); + FileInfo new_info(info); + new_info.set_path(std::move(s)); + return base_fs_->OpenInputStream(new_info); +} + Result> SubTreeFileSystem::OpenInputFile( const std::string& path) { auto s = path; @@ -271,6 +302,15 @@ Result> SubTreeFileSystem::OpenInputFile( return base_fs_->OpenInputFile(s); } +Result> SubTreeFileSystem::OpenInputFile( + const FileInfo& info) { + auto s = info.path(); + RETURN_NOT_OK(PrependBaseNonEmpty(&s)); + FileInfo new_info(info); + new_info.set_path(std::move(s)); + return base_fs_->OpenInputFile(new_info); +} + Result> SubTreeFileSystem::OpenOutputStream( const std::string& path) { auto s = path; @@ -349,6 +389,13 @@ Result> SlowFileSystem::OpenInputStream( return std::make_shared(stream, latencies_); } +Result> SlowFileSystem::OpenInputStream( + const FileInfo& info) { + latencies_->Sleep(); + ARROW_ASSIGN_OR_RAISE(auto stream, base_fs_->OpenInputStream(info)); + return std::make_shared(stream, latencies_); +} + Result> SlowFileSystem::OpenInputFile( const std::string& path) { latencies_->Sleep(); @@ -356,6 +403,13 @@ Result> SlowFileSystem::OpenInputFile( return std::make_shared(file, latencies_); } +Result> SlowFileSystem::OpenInputFile( + const FileInfo& info) { + latencies_->Sleep(); + ARROW_ASSIGN_OR_RAISE(auto file, base_fs_->OpenInputFile(info)); + return std::make_shared(file, latencies_); +} + Result> SlowFileSystem::OpenOutputStream( const std::string& path) { latencies_->Sleep(); diff --git a/cpp/src/arrow/filesystem/filesystem.h b/cpp/src/arrow/filesystem/filesystem.h index 178a2cb2172d0..ecaa36b688f99 100644 --- a/cpp/src/arrow/filesystem/filesystem.h +++ b/cpp/src/arrow/filesystem/filesystem.h @@ -56,6 +56,9 @@ struct ARROW_EXPORT FileInfo : public util::EqualityComparable { FileInfo(const FileInfo&) = default; FileInfo& operator=(const FileInfo&) = default; + explicit FileInfo(std::string path, FileType type = FileType::Unknown) + : path_(std::move(path)), type_(type) {} + /// The file type FileType type() const { return type_; } void set_type(FileType type) { type_ = type; } @@ -107,8 +110,8 @@ struct ARROW_EXPORT FileInfo : public util::EqualityComparable { }; protected: - FileType type_ = FileType::Unknown; std::string path_; + FileType type_ = FileType::Unknown; int64_t size_ = kNoSize; TimePoint mtime_ = kNoTime; }; @@ -205,10 +208,23 @@ class ARROW_EXPORT FileSystem : public std::enable_shared_from_this /// Open an input stream for sequential reading. virtual Result> OpenInputStream( const std::string& path) = 0; + /// Open an input stream for sequential reading. + /// + /// This override assumes the given FileInfo validly represents the file's + /// characteristics, and may optimize access depending on them (for example + /// avoid querying the file size or its existence). + virtual Result> OpenInputStream(const FileInfo& info); /// Open an input file for random access reading. virtual Result> OpenInputFile( const std::string& path) = 0; + /// Open an input file for random access reading. + /// + /// This override assumes the given FileInfo validly represents the file's + /// characteristics, and may optimize access depending on them (for example + /// avoid querying the file size or its existence). + virtual Result> OpenInputFile( + const FileInfo& info); /// Open an output stream for sequential writing. /// @@ -266,8 +282,11 @@ class ARROW_EXPORT SubTreeFileSystem : public FileSystem { Result> OpenInputStream( const std::string& path) override; + Result> OpenInputStream(const FileInfo& info) override; Result> OpenInputFile( const std::string& path) override; + Result> OpenInputFile( + const FileInfo& info) override; Result> OpenOutputStream( const std::string& path) override; Result> OpenAppendStream( @@ -318,8 +337,11 @@ class ARROW_EXPORT SlowFileSystem : public FileSystem { Result> OpenInputStream( const std::string& path) override; + Result> OpenInputStream(const FileInfo& info) override; Result> OpenInputFile( const std::string& path) override; + Result> OpenInputFile( + const FileInfo& info) override; Result> OpenOutputStream( const std::string& path) override; Result> OpenAppendStream( diff --git a/cpp/src/arrow/filesystem/mockfs.cc b/cpp/src/arrow/filesystem/mockfs.cc index f5ae5e4e4c7f0..b724daae5803a 100644 --- a/cpp/src/arrow/filesystem/mockfs.cc +++ b/cpp/src/arrow/filesystem/mockfs.cc @@ -26,6 +26,7 @@ #include "arrow/buffer.h" #include "arrow/filesystem/mockfs.h" #include "arrow/filesystem/path_util.h" +#include "arrow/filesystem/util_internal.h" #include "arrow/io/interfaces.h" #include "arrow/io/memory.h" #include "arrow/util/logging.h" @@ -38,18 +39,6 @@ namespace internal { namespace { -Status PathNotFound(const std::string& path) { - return Status::IOError("Path does not exist '", path, "'"); -} - -Status NotADir(const std::string& path) { - return Status::IOError("Not a directory: '", path, "'"); -} - -Status NotAFile(const std::string& path) { - return Status::IOError("Not a regular file: '", path, "'"); -} - //////////////////////////////////////////////////////////////////////////// // Filesystem structure diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc index 70c87f46ec121..acdbd83b70459 100644 --- a/cpp/src/arrow/filesystem/s3fs.cc +++ b/cpp/src/arrow/filesystem/s3fs.cc @@ -64,6 +64,7 @@ #include "arrow/filesystem/filesystem.h" #include "arrow/filesystem/path_util.h" #include "arrow/filesystem/s3_internal.h" +#include "arrow/filesystem/util_internal.h" #include "arrow/io/interfaces.h" #include "arrow/io/memory.h" #include "arrow/io/util_internal.h" @@ -83,16 +84,16 @@ using ::Aws::Client::AWSError; using ::Aws::S3::S3Errors; namespace S3Model = Aws::S3::Model; -using ::arrow::fs::internal::ConnectRetryStrategy; -using ::arrow::fs::internal::ErrorToStatus; -using ::arrow::fs::internal::FromAwsDatetime; -using ::arrow::fs::internal::FromAwsString; -using ::arrow::fs::internal::IsAlreadyExists; -using ::arrow::fs::internal::IsNotFound; -using ::arrow::fs::internal::OutcomeToResult; -using ::arrow::fs::internal::OutcomeToStatus; -using ::arrow::fs::internal::ToAwsString; -using ::arrow::fs::internal::ToURLEncodedAwsString; +using internal::ConnectRetryStrategy; +using internal::ErrorToStatus; +using internal::FromAwsDatetime; +using internal::FromAwsString; +using internal::IsAlreadyExists; +using internal::IsNotFound; +using internal::OutcomeToResult; +using internal::OutcomeToStatus; +using internal::ToAwsString; +using internal::ToURLEncodedAwsString; const char* kS3DefaultRegion = "us-east-1"; @@ -278,24 +279,25 @@ struct S3Path { std::string key; std::vector key_parts; - static Status FromString(const std::string& s, S3Path* out) { + static Result FromString(const std::string& s) { const auto src = internal::RemoveTrailingSlash(s); auto first_sep = src.find_first_of(kSep); if (first_sep == 0) { return Status::Invalid("Path cannot start with a separator ('", s, "')"); } if (first_sep == std::string::npos) { - *out = {std::string(src), std::string(src), "", {}}; - return Status::OK(); + return S3Path{std::string(src), std::string(src), "", {}}; } - out->full_path = std::string(src); - out->bucket = std::string(src.substr(0, first_sep)); - out->key = std::string(src.substr(first_sep + 1)); - out->key_parts = internal::SplitAbstractPath(out->key); - return Validate(out); + S3Path path; + path.full_path = std::string(src); + path.bucket = std::string(src.substr(0, first_sep)); + path.key = std::string(src.substr(first_sep + 1)); + path.key_parts = internal::SplitAbstractPath(path.key); + RETURN_NOT_OK(Validate(&path)); + return path; } - static Status Validate(S3Path* path) { + static Status Validate(const S3Path* path) { auto result = internal::ValidateAbstractPathParts(path->key_parts); if (!result.ok()) { return Status::Invalid(result.message(), " in path ", path->full_path); @@ -335,15 +337,15 @@ struct S3Path { // XXX return in OutcomeToStatus instead? Status PathNotFound(const S3Path& path) { - return Status::IOError("Path does not exist '", path.full_path, "'"); + return ::arrow::fs::internal::PathNotFound(path.full_path); } Status PathNotFound(const std::string& bucket, const std::string& key) { - return Status::IOError("Path does not exist '", bucket, kSep, key, "'"); + return ::arrow::fs::internal::PathNotFound(bucket + kSep + key); } Status NotAFile(const S3Path& path) { - return Status::IOError("Not a regular file: '", path.full_path, "'"); + return ::arrow::fs::internal::NotAFile(path.full_path); } Status ValidateFilePath(const S3Path& path) { @@ -397,9 +399,17 @@ class ObjectInputFile : public io::RandomAccessFile { ObjectInputFile(Aws::S3::S3Client* client, const S3Path& path) : client_(client), path_(path) {} + ObjectInputFile(Aws::S3::S3Client* client, const S3Path& path, int64_t size) + : client_(client), path_(path), content_length_(size) {} + Status Init() { // Issue a HEAD Object to get the content-length and ensure any // errors (e.g. file not found) don't wait until the first Read() call. + if (content_length_ != kNoSize) { + DCHECK_GE(content_length_, 0); + return Status::OK(); + } + S3Model::HeadObjectRequest req; req.SetBucket(ToAwsString(path_.bucket)); req.SetKey(ToAwsString(path_.key)); @@ -518,7 +528,7 @@ class ObjectInputFile : public io::RandomAccessFile { S3Path path_; bool closed_ = false; int64_t pos_ = 0; - int64_t content_length_ = -1; + int64_t content_length_ = kNoSize; }; // Minimum size for each part of a multipart upload, except for the last part. @@ -1242,8 +1252,7 @@ bool S3FileSystem::Equals(const FileSystem& other) const { S3Options S3FileSystem::options() const { return impl_->options(); } Result S3FileSystem::GetFileInfo(const std::string& s) { - S3Path path; - RETURN_NOT_OK(S3Path::FromString(s, &path)); + ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s)); FileInfo info; info.set_path(s); @@ -1308,8 +1317,7 @@ Result S3FileSystem::GetFileInfo(const std::string& s) { } Result> S3FileSystem::GetFileInfo(const FileSelector& select) { - S3Path base_path; - RETURN_NOT_OK(S3Path::FromString(select.base_dir, &base_path)); + ARROW_ASSIGN_OR_RAISE(auto base_path, S3Path::FromString(select.base_dir)); std::vector results; @@ -1335,8 +1343,7 @@ Result> S3FileSystem::GetFileInfo(const FileSelector& sele } Status S3FileSystem::CreateDir(const std::string& s, bool recursive) { - S3Path path; - RETURN_NOT_OK(S3Path::FromString(s, &path)); + ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s)); if (path.key.empty()) { // Create bucket @@ -1377,8 +1384,7 @@ Status S3FileSystem::CreateDir(const std::string& s, bool recursive) { } Status S3FileSystem::DeleteDir(const std::string& s) { - S3Path path; - RETURN_NOT_OK(S3Path::FromString(s, &path)); + ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s)); if (path.empty()) { return Status::NotImplemented("Cannot delete all S3 buckets"); @@ -1400,8 +1406,7 @@ Status S3FileSystem::DeleteDir(const std::string& s) { } Status S3FileSystem::DeleteDirContents(const std::string& s) { - S3Path path; - RETURN_NOT_OK(S3Path::FromString(s, &path)); + ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s)); if (path.empty()) { return Status::NotImplemented("Cannot delete all S3 buckets"); @@ -1412,8 +1417,7 @@ Status S3FileSystem::DeleteDirContents(const std::string& s) { } Status S3FileSystem::DeleteFile(const std::string& s) { - S3Path path; - RETURN_NOT_OK(S3Path::FromString(s, &path)); + ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s)); RETURN_NOT_OK(ValidateFilePath(path)); // Check the object exists @@ -1443,10 +1447,9 @@ Status S3FileSystem::Move(const std::string& src, const std::string& dest) { // one must copy all directory contents one by one (including object data), // then delete the original contents. - S3Path src_path, dest_path; - RETURN_NOT_OK(S3Path::FromString(src, &src_path)); + ARROW_ASSIGN_OR_RAISE(auto src_path, S3Path::FromString(src)); RETURN_NOT_OK(ValidateFilePath(src_path)); - RETURN_NOT_OK(S3Path::FromString(dest, &dest_path)); + ARROW_ASSIGN_OR_RAISE(auto dest_path, S3Path::FromString(dest)); RETURN_NOT_OK(ValidateFilePath(dest_path)); if (src_path == dest_path) { @@ -1459,10 +1462,9 @@ Status S3FileSystem::Move(const std::string& src, const std::string& dest) { } Status S3FileSystem::CopyFile(const std::string& src, const std::string& dest) { - S3Path src_path, dest_path; - RETURN_NOT_OK(S3Path::FromString(src, &src_path)); + ARROW_ASSIGN_OR_RAISE(auto src_path, S3Path::FromString(src)); RETURN_NOT_OK(ValidateFilePath(src_path)); - RETURN_NOT_OK(S3Path::FromString(dest, &dest_path)); + ARROW_ASSIGN_OR_RAISE(auto dest_path, S3Path::FromString(dest)); RETURN_NOT_OK(ValidateFilePath(dest_path)); if (src_path == dest_path) { @@ -1473,8 +1475,7 @@ Status S3FileSystem::CopyFile(const std::string& src, const std::string& dest) { Result> S3FileSystem::OpenInputStream( const std::string& s) { - S3Path path; - RETURN_NOT_OK(S3Path::FromString(s, &path)); + ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s)); RETURN_NOT_OK(ValidateFilePath(path)); auto ptr = std::make_shared(impl_->client_.get(), path); @@ -1482,10 +1483,26 @@ Result> S3FileSystem::OpenInputStream( return ptr; } +Result> S3FileSystem::OpenInputStream( + const FileInfo& info) { + if (info.type() == FileType::NotFound) { + return ::arrow::fs::internal::PathNotFound(info.path()); + } + if (info.type() != FileType::File && info.type() != FileType::Unknown) { + return ::arrow::fs::internal::NotAFile(info.path()); + } + + ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(info.path())); + RETURN_NOT_OK(ValidateFilePath(path)); + + auto ptr = std::make_shared(impl_->client_.get(), path, info.size()); + RETURN_NOT_OK(ptr->Init()); + return ptr; +} + Result> S3FileSystem::OpenInputFile( const std::string& s) { - S3Path path; - RETURN_NOT_OK(S3Path::FromString(s, &path)); + ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s)); RETURN_NOT_OK(ValidateFilePath(path)); auto ptr = std::make_shared(impl_->client_.get(), path); @@ -1493,10 +1510,26 @@ Result> S3FileSystem::OpenInputFile( return ptr; } +Result> S3FileSystem::OpenInputFile( + const FileInfo& info) { + if (info.type() == FileType::NotFound) { + return ::arrow::fs::internal::PathNotFound(info.path()); + } + if (info.type() != FileType::File && info.type() != FileType::Unknown) { + return ::arrow::fs::internal::NotAFile(info.path()); + } + + ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(info.path())); + RETURN_NOT_OK(ValidateFilePath(path)); + + auto ptr = std::make_shared(impl_->client_.get(), path, info.size()); + RETURN_NOT_OK(ptr->Init()); + return ptr; +} + Result> S3FileSystem::OpenOutputStream( const std::string& s) { - S3Path path; - RETURN_NOT_OK(S3Path::FromString(s, &path)); + ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s)); RETURN_NOT_OK(ValidateFilePath(path)); auto ptr = diff --git a/cpp/src/arrow/filesystem/s3fs.h b/cpp/src/arrow/filesystem/s3fs.h index 35bfa88266a68..5aebc61368340 100644 --- a/cpp/src/arrow/filesystem/s3fs.h +++ b/cpp/src/arrow/filesystem/s3fs.h @@ -119,12 +119,23 @@ class ARROW_EXPORT S3FileSystem : public FileSystem { /// a custom readahead strategy to avoid idle waits. Result> OpenInputStream( const std::string& path) override; + /// Create a sequential input stream for reading from a S3 object. + /// + /// This override avoids a HEAD request by assuming the FileInfo + /// contains correct information. + Result> OpenInputStream(const FileInfo& info) override; /// Create a random access file for reading from a S3 object. /// /// See OpenInputStream for performance notes. Result> OpenInputFile( const std::string& path) override; + /// Create a random access file for reading from a S3 object. + /// + /// This override avoids a HEAD request by assuming the FileInfo + /// contains correct information. + Result> OpenInputFile( + const FileInfo& info) override; /// Create a sequential output stream for writing to a S3 object. /// diff --git a/cpp/src/arrow/filesystem/test_util.cc b/cpp/src/arrow/filesystem/test_util.cc index a688695fa7e43..1d90b303c85b9 100644 --- a/cpp/src/arrow/filesystem/test_util.cc +++ b/cpp/src/arrow/filesystem/test_util.cc @@ -826,6 +826,36 @@ void GenericFileSystemTest::TestOpenInputStream(FileSystem* fs) { ASSERT_RAISES(IOError, fs->OpenInputStream("AB")); } +void GenericFileSystemTest::TestOpenInputStreamWithFileInfo(FileSystem* fs) { + ASSERT_OK(fs->CreateDir("AB")); + CreateFile(fs, "AB/abc", "some data"); + + ASSERT_OK_AND_ASSIGN(auto info, fs->GetFileInfo("AB/abc")); + + ASSERT_OK_AND_ASSIGN(auto stream, fs->OpenInputStream(info)); + ASSERT_OK_AND_ASSIGN(auto buffer, stream->Read(9)); + AssertBufferEqual(*buffer, "some data"); + + // Passing an incomplete FileInfo should also work + info.set_type(FileType::Unknown); + info.set_size(kNoSize); + info.set_mtime(kNoTime); + ASSERT_OK_AND_ASSIGN(stream, fs->OpenInputStream(info)); + ASSERT_OK_AND_ASSIGN(buffer, stream->Read(4)); + AssertBufferEqual(*buffer, "some"); + + // File does not exist + ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo("zzzzt")); + ASSERT_RAISES(IOError, fs->OpenInputStream(info)); + // (same, with incomplete FileInfo) + info.set_type(FileType::Unknown); + ASSERT_RAISES(IOError, fs->OpenInputStream(info)); + + // Cannot open directory + ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo("AB")); + ASSERT_RAISES(IOError, fs->OpenInputStream(info)); +} + void GenericFileSystemTest::TestOpenInputFile(FileSystem* fs) { ASSERT_OK(fs->CreateDir("AB")); CreateFile(fs, "AB/abc", "some other data"); @@ -847,6 +877,38 @@ void GenericFileSystemTest::TestOpenInputFile(FileSystem* fs) { ASSERT_RAISES(IOError, fs->OpenInputFile("AB")); } +void GenericFileSystemTest::TestOpenInputFileWithFileInfo(FileSystem* fs) { + ASSERT_OK(fs->CreateDir("AB")); + CreateFile(fs, "AB/abc", "some data"); + + ASSERT_OK_AND_ASSIGN(auto info, fs->GetFileInfo("AB/abc")); + + ASSERT_OK_AND_ASSIGN(auto file, fs->OpenInputFile(info)); + ASSERT_OK_AND_EQ(9, file->GetSize()); + ASSERT_OK_AND_ASSIGN(auto buffer, file->Read(9)); + AssertBufferEqual(*buffer, "some data"); + + // Passing an incomplete FileInfo should also work + info.set_type(FileType::Unknown); + info.set_size(kNoSize); + info.set_mtime(kNoTime); + ASSERT_OK_AND_ASSIGN(file, fs->OpenInputFile(info)); + ASSERT_OK_AND_EQ(9, file->GetSize()); + ASSERT_OK_AND_ASSIGN(buffer, file->Read(4)); + AssertBufferEqual(*buffer, "some"); + + // File does not exist + ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo("zzzzt")); + ASSERT_RAISES(IOError, fs->OpenInputFile(info)); + // (same, with incomplete FileInfo) + info.set_type(FileType::Unknown); + ASSERT_RAISES(IOError, fs->OpenInputFile(info)); + + // Cannot open directory + ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo("AB")); + ASSERT_RAISES(IOError, fs->OpenInputFile(info)); +} + #define GENERIC_FS_TEST_DEFINE(FUNC_NAME) \ void GenericFileSystemTest::FUNC_NAME() { FUNC_NAME(GetEmptyFileSystem().get()); } @@ -867,7 +929,11 @@ GENERIC_FS_TEST_DEFINE(TestGetFileInfoSelectorWithRecursion) GENERIC_FS_TEST_DEFINE(TestOpenOutputStream) GENERIC_FS_TEST_DEFINE(TestOpenAppendStream) GENERIC_FS_TEST_DEFINE(TestOpenInputStream) +GENERIC_FS_TEST_DEFINE(TestOpenInputStreamWithFileInfo) GENERIC_FS_TEST_DEFINE(TestOpenInputFile) +GENERIC_FS_TEST_DEFINE(TestOpenInputFileWithFileInfo) + +#undef GENERIC_FS_TEST_DEFINE } // namespace fs } // namespace arrow diff --git a/cpp/src/arrow/filesystem/test_util.h b/cpp/src/arrow/filesystem/test_util.h index 084c9e813dbee..82af044dc1a2c 100644 --- a/cpp/src/arrow/filesystem/test_util.h +++ b/cpp/src/arrow/filesystem/test_util.h @@ -31,17 +31,11 @@ namespace fs { static constexpr double kTimeSlack = 2.0; // In seconds static inline FileInfo File(std::string path) { - FileInfo info; - info.set_type(FileType::File); - info.set_path(path); - return info; + return FileInfo(std::move(path), FileType::File); } static inline FileInfo Dir(std::string path) { - FileInfo info; - info.set_type(FileType::Directory); - info.set_path(path); - return info; + return FileInfo(std::move(path), FileType::Directory); } ARROW_TESTING_EXPORT @@ -116,7 +110,9 @@ class ARROW_TESTING_EXPORT GenericFileSystemTest { void TestOpenOutputStream(); void TestOpenAppendStream(); void TestOpenInputStream(); + void TestOpenInputStreamWithFileInfo(); void TestOpenInputFile(); + void TestOpenInputFileWithFileInfo(); protected: // This function should return the filesystem under test. @@ -154,7 +150,9 @@ class ARROW_TESTING_EXPORT GenericFileSystemTest { void TestOpenOutputStream(FileSystem* fs); void TestOpenAppendStream(FileSystem* fs); void TestOpenInputStream(FileSystem* fs); + void TestOpenInputStreamWithFileInfo(FileSystem* fs); void TestOpenInputFile(FileSystem* fs); + void TestOpenInputFileWithFileInfo(FileSystem* fs); }; #define GENERIC_FS_TEST_FUNCTION(TEST_MACRO, TEST_CLASS, NAME) \ @@ -178,7 +176,9 @@ class ARROW_TESTING_EXPORT GenericFileSystemTest { GENERIC_FS_TEST_FUNCTION(TEST_MACRO, TEST_CLASS, OpenOutputStream) \ GENERIC_FS_TEST_FUNCTION(TEST_MACRO, TEST_CLASS, OpenAppendStream) \ GENERIC_FS_TEST_FUNCTION(TEST_MACRO, TEST_CLASS, OpenInputStream) \ - GENERIC_FS_TEST_FUNCTION(TEST_MACRO, TEST_CLASS, OpenInputFile) + GENERIC_FS_TEST_FUNCTION(TEST_MACRO, TEST_CLASS, OpenInputStreamWithFileInfo) \ + GENERIC_FS_TEST_FUNCTION(TEST_MACRO, TEST_CLASS, OpenInputFile) \ + GENERIC_FS_TEST_FUNCTION(TEST_MACRO, TEST_CLASS, OpenInputFileWithFileInfo) #define GENERIC_FS_TEST_FUNCTIONS(TEST_CLASS) \ GENERIC_FS_TEST_FUNCTIONS_MACROS(TEST_F, TEST_CLASS) diff --git a/cpp/src/arrow/filesystem/util_internal.cc b/cpp/src/arrow/filesystem/util_internal.cc index f4d90baa79908..dc6381710f50a 100644 --- a/cpp/src/arrow/filesystem/util_internal.cc +++ b/cpp/src/arrow/filesystem/util_internal.cc @@ -47,6 +47,18 @@ Status CopyStream(const std::shared_ptr& src, return Status::OK(); } +Status PathNotFound(const std::string& path) { + return Status::IOError("Path does not exist '", path, "'"); +} + +Status NotADir(const std::string& path) { + return Status::IOError("Not a directory: '", path, "'"); +} + +Status NotAFile(const std::string& path) { + return Status::IOError("Not a regular file: '", path, "'"); +} + } // namespace internal } // namespace fs } // namespace arrow diff --git a/cpp/src/arrow/filesystem/util_internal.h b/cpp/src/arrow/filesystem/util_internal.h index 90437a45388c3..0f59a23cbd416 100644 --- a/cpp/src/arrow/filesystem/util_internal.h +++ b/cpp/src/arrow/filesystem/util_internal.h @@ -36,6 +36,15 @@ ARROW_EXPORT Status CopyStream(const std::shared_ptr& src, const std::shared_ptr& dest, int64_t chunk_size); +ARROW_EXPORT +Status PathNotFound(const std::string& path); + +ARROW_EXPORT +Status NotADir(const std::string& path); + +ARROW_EXPORT +Status NotAFile(const std::string& path); + } // namespace internal } // namespace fs } // namespace arrow