From f016aa8b240dcd7c8f3f1f34e7e863612fceb77d Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Mon, 4 Dec 2023 11:37:13 -0500 Subject: [PATCH] GH-38309: [C++] build filesystems as separate modules --- cpp/examples/arrow/CMakeLists.txt | 15 ++ .../arrow/filesystem_definition_example.cc | 150 +++++++++++++++++ .../arrow/filesystem_usage_example.cc | 55 ++++++ cpp/src/arrow/dataset/partition.cc | 4 +- cpp/src/arrow/dataset/partition_test.cc | 2 +- .../engine/substrait/relation_internal.cc | 4 +- cpp/src/arrow/filesystem/CMakeLists.txt | 8 + cpp/src/arrow/filesystem/azurefs.h | 16 +- cpp/src/arrow/filesystem/examplefs.cc | 34 ++++ cpp/src/arrow/filesystem/filesystem.cc | 153 ++++++++++++++++- cpp/src/arrow/filesystem/filesystem.h | 158 ++++++++++++++++-- cpp/src/arrow/filesystem/gcsfs.cc | 4 +- cpp/src/arrow/filesystem/gcsfs.h | 3 +- cpp/src/arrow/filesystem/hdfs.cc | 2 +- cpp/src/arrow/filesystem/hdfs.h | 21 ++- cpp/src/arrow/filesystem/hdfs_test.cc | 2 +- cpp/src/arrow/filesystem/localfs.cc | 18 +- cpp/src/arrow/filesystem/localfs.h | 15 +- cpp/src/arrow/filesystem/localfs_test.cc | 97 +++++++++-- cpp/src/arrow/filesystem/mockfs.h | 25 +-- cpp/src/arrow/filesystem/path_util.cc | 2 +- cpp/src/arrow/filesystem/s3fs.cc | 2 +- cpp/src/arrow/filesystem/s3fs.h | 21 ++- .../arrow/filesystem/s3fs_narrative_test.cc | 6 +- cpp/src/arrow/filesystem/s3fs_test.cc | 2 +- cpp/src/arrow/filesystem/util_internal.cc | 2 +- cpp/src/arrow/filesystem/util_internal.h | 2 +- cpp/src/arrow/flight/cookie_internal.cc | 4 +- cpp/src/arrow/flight/transport.h | 2 +- .../flight/transport/grpc/grpc_client.cc | 4 +- .../flight/transport/grpc/grpc_server.cc | 11 +- .../arrow/flight/transport/ucx/ucx_client.cc | 9 +- .../arrow/flight/transport/ucx/ucx_server.cc | 3 +- .../flight/transport/ucx/util_internal.cc | 2 +- .../flight/transport/ucx/util_internal.h | 2 +- cpp/src/arrow/flight/transport_server.h | 2 +- cpp/src/arrow/flight/types.cc | 2 +- cpp/src/arrow/flight/types.h | 6 +- cpp/src/arrow/io/hdfs_internal.cc | 124 ++++---------- cpp/src/arrow/io/hdfs_internal.h | 14 +- cpp/src/arrow/util/io_util.cc | 68 +++++++- cpp/src/arrow/util/io_util.h | 41 ++++- cpp/src/arrow/util/type_fwd.h | 1 + cpp/src/arrow/util/uri.cc | 33 ++-- cpp/src/arrow/util/uri.h | 9 +- cpp/src/arrow/util/uri_test.cc | 6 +- docs/source/cpp/api/filesystem.rst | 14 +- docs/source/cpp/io.rst | 52 +++++- python/pyarrow/src/arrow/python/filesystem.h | 24 +-- 49 files changed, 985 insertions(+), 271 deletions(-) create mode 100644 cpp/examples/arrow/filesystem_definition_example.cc create mode 100644 cpp/examples/arrow/filesystem_usage_example.cc create mode 100644 cpp/src/arrow/filesystem/examplefs.cc diff --git a/cpp/examples/arrow/CMakeLists.txt b/cpp/examples/arrow/CMakeLists.txt index a092a31733f72..d45f851db860c 100644 --- a/cpp/examples/arrow/CMakeLists.txt +++ b/cpp/examples/arrow/CMakeLists.txt @@ -195,3 +195,18 @@ if(ARROW_GANDIVA) endif() add_arrow_example(gandiva_example EXTRA_LINK_LIBS ${GANDIVA_EXAMPLE_LINK_LIBS}) endif() + +if(ARROW_FILESYSTEM) + add_library(filesystem_definition_example MODULE filesystem_definition_example.cc) + + if(ARROW_BUILD_SHARED) + target_link_libraries(filesystem_definition_example arrow_shared) + else() + target_link_libraries(filesystem_definition_example arrow_static) + endif() + + add_arrow_example(filesystem_usage_example) + target_compile_definitions(filesystem-usage-example + PUBLIC LIBPATH="$" + ) +endif() diff --git a/cpp/examples/arrow/filesystem_definition_example.cc b/cpp/examples/arrow/filesystem_definition_example.cc new file mode 100644 index 0000000000000..a39be28434c43 --- /dev/null +++ b/cpp/examples/arrow/filesystem_definition_example.cc @@ -0,0 +1,150 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include + +// Demonstrate registering a user-defined Arrow FileSystem outside +// of the Arrow source tree. + +using arrow::Result; +using arrow::Status; +namespace io = arrow::io; +namespace fs = arrow::fs; + +class ExampleFileSystem : public fs::FileSystem { + public: + explicit ExampleFileSystem(const io::IOContext& io_context) + : fs::FileSystem{io_context} {} + + // This is a mock filesystem whose root directory contains a single file. + // All operations which would mutate will simply raise an error. + static constexpr std::string_view kPath = "example_file"; + static constexpr std::string_view kContents = "hello world"; + static fs::FileInfo info() { + fs::FileInfo info; + info.set_path(std::string{kPath}); + info.set_type(fs::FileType::File); + info.set_size(kContents.size()); + return info; + } + + static Status NotFound(std::string_view path) { + return Status::IOError("Path does not exist '", path, "'"); + } + + static Status NoMutation() { + return Status::IOError("operations which would mutate are not permitted"); + } + + Result PathFromUri(const std::string& uri_string) const override { + ARROW_ASSIGN_OR_RAISE(auto uri, arrow::util::Uri::FromString(uri_string)); + return uri.path(); + } + + std::string type_name() const override { return "example"; } + + bool Equals(const FileSystem& other) const override { + return type_name() == other.type_name(); + } + + /// \cond FALSE + using FileSystem::CreateDir; + using FileSystem::DeleteDirContents; + using FileSystem::GetFileInfo; + using FileSystem::OpenAppendStream; + using FileSystem::OpenOutputStream; + /// \endcond + + Result GetFileInfo(const std::string& path) override { + if (path == kPath) { + return info(); + } + return NotFound(path); + } + + Result> GetFileInfo(const fs::FileSelector& select) override { + if (select.base_dir == "/" || select.base_dir == "") { + return std::vector{info()}; + } + if (select.allow_not_found) { + return std::vector{}; + } + return NotFound(select.base_dir); + } + + Status CreateDir(const std::string& path, bool recursive) override { + return NoMutation(); + } + + Status DeleteDir(const std::string& path) override { return NoMutation(); } + + Status DeleteDirContents(const std::string& path, bool missing_dir_ok) override { + return NoMutation(); + } + + Status DeleteRootDirContents() override { return NoMutation(); } + + Status DeleteFile(const std::string& path) override { return NoMutation(); } + + Status Move(const std::string& src, const std::string& dest) override { + return NoMutation(); + } + + Status CopyFile(const std::string& src, const std::string& dest) override { + return NoMutation(); + } + + Result> OpenInputStream( + const std::string& path) override { + return OpenInputFile(path); + } + + Result> OpenInputFile( + const std::string& path) override { + if (path == kPath) { + return io::BufferReader::FromString(std::string{kContents}); + } + return NotFound(path); + } + + Result> OpenOutputStream( + const std::string& path, + const std::shared_ptr& metadata) override { + return NoMutation(); + } + + Result> OpenAppendStream( + const std::string& path, + const std::shared_ptr& metadata) override { + return NoMutation(); + } +}; + +fs::FileSystemRegistrar kExampleFileSystemModule{ + "example", + [](const arrow::util::Uri& uri, const io::IOContext& io_context, + std::string* out_path) -> Result> { + auto fs = std::make_shared(io_context); + if (out_path) { + ARROW_ASSIGN_OR_RAISE(*out_path, fs->PathFromUri(uri.ToString())); + } + return fs; + }, +}; diff --git a/cpp/examples/arrow/filesystem_usage_example.cc b/cpp/examples/arrow/filesystem_usage_example.cc new file mode 100644 index 0000000000000..66b8e63c18191 --- /dev/null +++ b/cpp/examples/arrow/filesystem_usage_example.cc @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include +#include + +namespace fs = arrow::fs; + +// Demonstrate dynamically loading a user-defined Arrow FileSystem + +arrow::Status Execute() { + ARROW_RETURN_NOT_OK(arrow::fs::LoadFileSystemFactories(LIBPATH)); + + std::string uri = "example:///example_file"; + std::cout << "Uri: " << uri << std::endl; + + std::string path; + ARROW_ASSIGN_OR_RAISE(auto fs, arrow::fs::FileSystemFromUri(uri, &path)); + std::cout << "Path: " << path << std::endl; + + fs::FileSelector sel; + sel.base_dir = "/"; + ARROW_ASSIGN_OR_RAISE(auto infos, fs->GetFileInfo(sel)); + + std::cout << "Root directory contains:" << std::endl; + for (const auto& info : infos) { + std::cout << "- " << info << std::endl; + } + return arrow::Status::OK(); +} + +int main() { + auto status = Execute(); + if (!status.ok()) { + std::cerr << "Error occurred : " << status.message() << std::endl; + return EXIT_FAILURE; + } + return EXIT_SUCCESS; +} diff --git a/cpp/src/arrow/dataset/partition.cc b/cpp/src/arrow/dataset/partition.cc index bf7145ca9368d..4d5c6e8ce7a79 100644 --- a/cpp/src/arrow/dataset/partition.cc +++ b/cpp/src/arrow/dataset/partition.cc @@ -53,7 +53,7 @@ namespace dataset { namespace { /// Apply UriUnescape, then ensure the results are valid UTF-8. Result SafeUriUnescape(std::string_view encoded) { - auto decoded = ::arrow::internal::UriUnescape(encoded); + auto decoded = ::arrow::util::UriUnescape(encoded); if (!util::ValidateUTF8(decoded)) { return Status::Invalid("Partition segment was not valid UTF-8 after URL decoding: ", encoded); @@ -755,7 +755,7 @@ Result HivePartitioning::FormatValues( // field_index <-> path nesting relation segments[i] = name + "=" + hive_options_.null_fallback; } else { - segments[i] = name + "=" + arrow::internal::UriEscape(values[i]->ToString()); + segments[i] = name + "=" + arrow::util::UriEscape(values[i]->ToString()); } } diff --git a/cpp/src/arrow/dataset/partition_test.cc b/cpp/src/arrow/dataset/partition_test.cc index 1b71be15d19f5..9f0bd7c0be040 100644 --- a/cpp/src/arrow/dataset/partition_test.cc +++ b/cpp/src/arrow/dataset/partition_test.cc @@ -935,7 +935,7 @@ TEST_F(TestPartitioning, WriteHiveWithSlashesInValues) { "experiment/A/f.csv", "experiment/B/f.csv", "experiment/C/k.csv", "experiment/M/i.csv"}; for (auto partition : unique_partitions) { - encoded_paths.push_back("part=" + arrow::internal::UriEscape(partition)); + encoded_paths.push_back("part=" + arrow::util::UriEscape(partition)); } ASSERT_EQ(all_dirs.size(), encoded_paths.size()); diff --git a/cpp/src/arrow/engine/substrait/relation_internal.cc b/cpp/src/arrow/engine/substrait/relation_internal.cc index 73f55c27ee834..f15f1a5527b7b 100644 --- a/cpp/src/arrow/engine/substrait/relation_internal.cc +++ b/cpp/src/arrow/engine/substrait/relation_internal.cc @@ -67,7 +67,7 @@ namespace arrow { using internal::checked_cast; using internal::StartsWith; using internal::ToChars; -using internal::UriFromAbsolutePath; +using util::UriFromAbsolutePath; namespace engine { @@ -463,7 +463,7 @@ Result FromProto(const substrait::Rel& rel, const ExtensionSet& } // Extract and parse the read relation's source URI - ::arrow::internal::Uri item_uri; + ::arrow::util::Uri item_uri; switch (item.path_type_case()) { case substrait::ReadRel::LocalFiles::FileOrFiles::kUriPath: RETURN_NOT_OK(item_uri.Parse(item.uri_path())); diff --git a/cpp/src/arrow/filesystem/CMakeLists.txt b/cpp/src/arrow/filesystem/CMakeLists.txt index b9ed11e7608f3..8997ad13c8cc1 100644 --- a/cpp/src/arrow/filesystem/CMakeLists.txt +++ b/cpp/src/arrow/filesystem/CMakeLists.txt @@ -28,6 +28,14 @@ add_arrow_test(filesystem-test EXTRA_LABELS filesystem) +if(ARROW_BUILD_TESTS) + add_library(arrow_filesystem_example MODULE examplefs.cc) + target_link_libraries(arrow_filesystem_example arrow_${ARROW_TEST_LINKAGE}) + target_compile_definitions(arrow-filesystem-test + PUBLIC ARROW_FILESYSTEM_EXAMPLE_LIBPATH="$" + ) +endif() + if(ARROW_BUILD_BENCHMARKS) add_arrow_benchmark(localfs_benchmark PREFIX diff --git a/cpp/src/arrow/filesystem/azurefs.h b/cpp/src/arrow/filesystem/azurefs.h index 2a131e40c05bf..770f698818943 100644 --- a/cpp/src/arrow/filesystem/azurefs.h +++ b/cpp/src/arrow/filesystem/azurefs.h @@ -196,15 +196,23 @@ class ARROW_EXPORT AzureFileSystem : public FileSystem { bool Equals(const FileSystem& other) const override; + /// \cond FALSE + using FileSystem::CreateDir; + using FileSystem::DeleteDirContents; + using FileSystem::GetFileInfo; + using FileSystem::OpenAppendStream; + using FileSystem::OpenOutputStream; + /// \endcond + Result GetFileInfo(const std::string& path) override; Result GetFileInfo(const FileSelector& select) override; - Status CreateDir(const std::string& path, bool recursive = true) override; + Status CreateDir(const std::string& path, bool recursive) override; Status DeleteDir(const std::string& path) override; - Status DeleteDirContents(const std::string& path, bool missing_dir_ok = false) override; + Status DeleteDirContents(const std::string& path, bool missing_dir_ok) override; Status DeleteRootDirContents() override; @@ -246,11 +254,11 @@ class ARROW_EXPORT AzureFileSystem : public FileSystem { Result> OpenOutputStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; Result> OpenAppendStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; }; } // namespace arrow::fs diff --git a/cpp/src/arrow/filesystem/examplefs.cc b/cpp/src/arrow/filesystem/examplefs.cc new file mode 100644 index 0000000000000..9129b852808f6 --- /dev/null +++ b/cpp/src/arrow/filesystem/examplefs.cc @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/filesystem/filesystem.h" +#include "arrow/result.h" +#include "arrow/util/uri.h" + +namespace arrow::fs { + +FileSystemRegistrar kExampleFileSystemModule{ + "example", + [](const Uri& uri, const io::IOContext& io_context, + std::string* out_path) -> Result> { + constexpr std::string_view kScheme = "example"; + auto local_uri = "file" + uri.ToString().substr(kScheme.size()); + return FileSystemFromUri(local_uri, io_context, out_path); + }, +}; + +} // namespace arrow::fs diff --git a/cpp/src/arrow/filesystem/filesystem.cc b/cpp/src/arrow/filesystem/filesystem.cc index 810e9c179b156..274f0d146b447 100644 --- a/cpp/src/arrow/filesystem/filesystem.cc +++ b/cpp/src/arrow/filesystem/filesystem.cc @@ -15,12 +15,17 @@ // specific language governing permissions and limitations // under the License. +#include +#include #include +#include #include +#include "arrow/type_fwd.h" #include "arrow/util/config.h" #include "arrow/filesystem/filesystem.h" +#include "arrow/util/string.h" #ifdef ARROW_HDFS #include "arrow/filesystem/hdfs.h" #endif @@ -51,11 +56,12 @@ namespace arrow { using internal::checked_pointer_cast; using internal::TaskHints; -using internal::Uri; using io::internal::SubmitIO; +using util::Uri; namespace fs { +using arrow::internal::GetEnvVar; using internal::ConcatAbstractPath; using internal::EnsureTrailingSlash; using internal::GetAbstractPathParent; @@ -128,7 +134,7 @@ std::string FileInfo::extension() const { ////////////////////////////////////////////////////////////////////////// // FileSystem default method implementations -FileSystem::~FileSystem() {} +FileSystem::~FileSystem() = default; Result FileSystem::NormalizePath(std::string path) { return path; } @@ -203,6 +209,10 @@ Future<> FileSystem::DeleteDirContentsAsync(const std::string& path, }); } +Future<> FileSystem::DeleteDirContentsAsync(const std::string& path) { + return DeleteDirContentsAsync(path, false); +} + Result> FileSystem::OpenInputStream( const FileInfo& info) { RETURN_NOT_OK(ValidateInputFileInfo(info)); @@ -279,7 +289,7 @@ SubTreeFileSystem::SubTreeFileSystem(const std::string& base_path, base_path_(NormalizeBasePath(base_path, base_fs).ValueOrDie()), base_fs_(base_fs) {} -SubTreeFileSystem::~SubTreeFileSystem() {} +SubTreeFileSystem::~SubTreeFileSystem() = default; Result SubTreeFileSystem::NormalizeBasePath( std::string base_path, const std::shared_ptr& base_fs) { @@ -674,6 +684,134 @@ Status CopyFiles(const std::shared_ptr& source_fs, return CopyFiles(sources, destinations, io_context, chunk_size, use_threads); } +class FileSystemFactoryRegistry { + public: + static FileSystemFactoryRegistry* GetInstance() { + static FileSystemFactoryRegistry registry; + return ®istry; + } + + Result FactoryForScheme(const std::string& scheme) { + std::shared_lock lock{mutex_}; + if (finalized_) return AlreadyFinalized(); + + auto it = scheme_to_factory_.find(scheme); + if (it == scheme_to_factory_.end()) return nullptr; + + return it->second.Map([](const auto& r) { return r.factory; }); + } + + Status MergeInto(FileSystemFactoryRegistry* main_registry) { + std::shared_lock lock{mutex_}; + if (finalized_) return AlreadyFinalized(); + + auto& [main_mutex, main_scheme_to_factory, _] = *main_registry; + std::unique_lock main_lock{main_mutex}; + + std::vector duplicated_schemes; + for (auto& [scheme, registered] : scheme_to_factory_) { + if (!registered.ok()) { + duplicated_schemes.emplace_back(scheme); + continue; + } + + auto [it, success] = main_scheme_to_factory.emplace(std::move(scheme), registered); + if (success) continue; + + duplicated_schemes.emplace_back(it->first); + } + scheme_to_factory_.clear(); + + if (duplicated_schemes.empty()) return Status::OK(); + return Status::KeyError("Attempted to register ", duplicated_schemes.size(), + " factories for schemes ['", + arrow::internal::JoinStrings(duplicated_schemes, "', '"), + "'] but those schemes were already registered."); + } + + void EnsureFinalized() { + std::unique_lock lock{mutex_}; + if (finalized_) return; + + for (const auto& [_, registered_or_error] : scheme_to_factory_) { + if (!registered_or_error.ok()) continue; + registered_or_error->finalizer(); + } + finalized_ = true; + } + + Status RegisterFactory(std::string scheme, FileSystemFactory factory, void finalizer(), + bool defer_error) { + std::unique_lock lock{mutex_}; + if (finalized_) return AlreadyFinalized(); + + auto [it, success] = + scheme_to_factory_.emplace(std::move(scheme), Registered{factory, finalizer}); + if (success) { + return Status::OK(); + } + + auto st = Status::KeyError("Attempted to register factory for scheme '", it->first, + "' but that scheme is already registered."); + if (!defer_error) return st; + + it->second = std::move(st); + return Status::OK(); + } + + private: + struct Registered { + FileSystemFactory* factory; + void (*finalizer)(); + }; + + static Status AlreadyFinalized() { + return Status::Invalid("FileSystem factories were already finalized!"); + } + + std::shared_mutex mutex_; + std::unordered_map> scheme_to_factory_; + bool finalized_ = false; +}; + +Status RegisterFileSystemFactory(std::string scheme, FileSystemFactory factory, + void finalizer()) { + return FileSystemFactoryRegistry::GetInstance()->RegisterFactory( + std::move(scheme), factory, finalizer, /*defer_error=*/false); +} + +void EnsureFinalized() { FileSystemFactoryRegistry::GetInstance()->EnsureFinalized(); } + +FileSystemRegistrar::FileSystemRegistrar(std::string scheme, FileSystemFactory factory, + void finalizer()) { + DCHECK_OK(FileSystemFactoryRegistry::GetInstance()->RegisterFactory( + std::move(scheme), std::move(factory), finalizer, /*defer_error=*/true)); +} + +extern "C" { +ARROW_EXPORT void* Arrow_FileSystem_GetRegistry() { + return FileSystemFactoryRegistry::GetInstance(); +} +} +constexpr auto kGetRegistryName = "Arrow_FileSystem_GetRegistry"; + +Status LoadFileSystemFactories(const char* libpath) { + using ::arrow::internal::GetSymbolAs; + using ::arrow::internal::LoadDynamicLibrary; + + ARROW_ASSIGN_OR_RAISE(void* lib, LoadDynamicLibrary(libpath)); + + if (auto* get_instance = GetSymbolAs(lib, kGetRegistryName).ValueOr(nullptr)) { + auto* lib_registry = static_cast(get_instance()); + auto* main_registry = FileSystemFactoryRegistry::GetInstance(); + if (lib_registry != main_registry) { + RETURN_NOT_OK(lib_registry->MergeInto(main_registry)); + } + } + + return Status::OK(); +} + namespace { Result> FileSystemFromUriReal(const Uri& uri, @@ -682,6 +820,15 @@ Result> FileSystemFromUriReal(const Uri& uri, std::string* out_path) { const auto scheme = uri.scheme(); + { + ARROW_ASSIGN_OR_RAISE( + auto* factory, + FileSystemFactoryRegistry::GetInstance()->FactoryForScheme(scheme)); + if (factory != nullptr) { + return factory(uri, io_context, out_path); + } + } + if (scheme == "file") { std::string path; ARROW_ASSIGN_OR_RAISE(auto options, LocalFileSystemOptions::FromUri(uri, &path)); diff --git a/cpp/src/arrow/filesystem/filesystem.h b/cpp/src/arrow/filesystem/filesystem.h index 559f1335f12c1..745b6516b3302 100644 --- a/cpp/src/arrow/filesystem/filesystem.h +++ b/cpp/src/arrow/filesystem/filesystem.h @@ -38,6 +38,8 @@ namespace arrow { namespace fs { +using arrow::util::Uri; + // A system clock time point expressed as a 64-bit (or more) number of // nanoseconds since the epoch. using TimePoint = @@ -156,7 +158,11 @@ struct IterationTraits { namespace fs { /// \brief Abstract file system API -class ARROW_EXPORT FileSystem : public std::enable_shared_from_this { +class ARROW_EXPORT FileSystem + /// \cond false + : public std::enable_shared_from_this +/// \endcond +{ // NOLINT public: virtual ~FileSystem(); @@ -225,7 +231,8 @@ class ARROW_EXPORT FileSystem : public std::enable_shared_from_this /// Create a directory and subdirectories. /// /// This function succeeds if the directory already exists. - virtual Status CreateDir(const std::string& path, bool recursive = true) = 0; + virtual Status CreateDir(const std::string& path, bool recursive) = 0; + Status CreateDir(const std::string& path) { return CreateDir(path, true); } /// Delete a directory and its contents, recursively. virtual Status DeleteDir(const std::string& path) = 0; @@ -234,12 +241,18 @@ class ARROW_EXPORT FileSystem : public std::enable_shared_from_this /// /// Like DeleteDir, but doesn't delete the directory itself. /// Passing an empty path ("" or "/") is disallowed, see DeleteRootDirContents. - virtual Status DeleteDirContents(const std::string& path, - bool missing_dir_ok = false) = 0; + virtual Status DeleteDirContents(const std::string& path, bool missing_dir_ok) = 0; + Status DeleteDirContents(const std::string& path) { + return DeleteDirContents(path, false); + } /// Async version of DeleteDirContents. - virtual Future<> DeleteDirContentsAsync(const std::string& path, - bool missing_dir_ok = false); + virtual Future<> DeleteDirContentsAsync(const std::string& path, bool missing_dir_ok); + + /// Async version of DeleteDirContents. + /// + /// This overload allows missing directories. + Future<> DeleteDirContentsAsync(const std::string& path); /// EXPERIMENTAL: Delete the root directory's contents, recursively. /// @@ -272,6 +285,7 @@ class ARROW_EXPORT FileSystem : public std::enable_shared_from_this /// Open an input stream for sequential reading. virtual Result> OpenInputStream( const std::string& path) = 0; + /// Open an input stream for sequential reading. /// /// This override assumes the given FileInfo validly represents the file's @@ -282,6 +296,7 @@ class ARROW_EXPORT FileSystem : public std::enable_shared_from_this /// Open an input file for random access reading. virtual Result> OpenInputFile( const std::string& path) = 0; + /// Open an input file for random access reading. /// /// This override assumes the given FileInfo validly represents the file's @@ -293,6 +308,7 @@ class ARROW_EXPORT FileSystem : public std::enable_shared_from_this /// Async version of OpenInputStream virtual Future> OpenInputStreamAsync( const std::string& path); + /// Async version of OpenInputStream virtual Future> OpenInputStreamAsync( const FileInfo& info); @@ -300,6 +316,7 @@ class ARROW_EXPORT FileSystem : public std::enable_shared_from_this /// Async version of OpenInputFile virtual Future> OpenInputFileAsync( const std::string& path); + /// Async version of OpenInputFile virtual Future> OpenInputFileAsync( const FileInfo& info); @@ -335,6 +352,9 @@ class ARROW_EXPORT FileSystem : public std::enable_shared_from_this bool default_async_is_sync_ = true; }; +using FileSystemFactory = Result>( + const Uri& uri, const io::IOContext& io_context, std::string* out_path); + /// \brief A FileSystem implementation that delegates to another /// implementation after prepending a fixed base path. /// @@ -361,17 +381,22 @@ class ARROW_EXPORT SubTreeFileSystem : public FileSystem { bool Equals(const FileSystem& other) const override; /// \cond FALSE + using FileSystem::CreateDir; + using FileSystem::DeleteDirContents; using FileSystem::GetFileInfo; + using FileSystem::OpenAppendStream; + using FileSystem::OpenOutputStream; /// \endcond + Result GetFileInfo(const std::string& path) override; Result GetFileInfo(const FileSelector& select) override; FileInfoGenerator GetFileInfoGenerator(const FileSelector& select) override; - Status CreateDir(const std::string& path, bool recursive = true) override; + Status CreateDir(const std::string& path, bool recursive) override; Status DeleteDir(const std::string& path) override; - Status DeleteDirContents(const std::string& path, bool missing_dir_ok = false) override; + Status DeleteDirContents(const std::string& path, bool missing_dir_ok) override; Status DeleteRootDirContents() override; Status DeleteFile(const std::string& path) override; @@ -399,13 +424,13 @@ class ARROW_EXPORT SubTreeFileSystem : public FileSystem { Result> OpenOutputStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; Result> OpenAppendStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; protected: - SubTreeFileSystem() {} + SubTreeFileSystem() = default; const std::string base_path_; std::shared_ptr base_fs_; @@ -433,14 +458,21 @@ class ARROW_EXPORT SlowFileSystem : public FileSystem { bool Equals(const FileSystem& other) const override; Result PathFromUri(const std::string& uri_string) const override; + /// \cond FALSE + using FileSystem::CreateDir; + using FileSystem::DeleteDirContents; using FileSystem::GetFileInfo; + using FileSystem::OpenAppendStream; + using FileSystem::OpenOutputStream; + /// \endcond + Result GetFileInfo(const std::string& path) override; Result GetFileInfo(const FileSelector& select) override; - Status CreateDir(const std::string& path, bool recursive = true) override; + Status CreateDir(const std::string& path, bool recursive) override; Status DeleteDir(const std::string& path) override; - Status DeleteDirContents(const std::string& path, bool missing_dir_ok = false) override; + Status DeleteDirContents(const std::string& path, bool missing_dir_ok) override; Status DeleteRootDirContents() override; Status DeleteFile(const std::string& path) override; @@ -458,16 +490,25 @@ class ARROW_EXPORT SlowFileSystem : public FileSystem { const FileInfo& info) override; Result> OpenOutputStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; Result> OpenAppendStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; protected: std::shared_ptr base_fs_; std::shared_ptr latencies_; }; +/// \brief Ensure all registered filesystem implementations are finalized. +/// +/// Individual finalizers may wait for concurrent calls to finish so as to avoid +/// race conditions. After this function has been called, all filesystem APIs +/// will fail with an error. +/// +/// The user is responsible for synchronization of calls to this function. +void EnsureFinalized(); + /// \defgroup filesystem-factories Functions for creating FileSystem instances /// /// @{ @@ -477,6 +518,8 @@ class ARROW_EXPORT SlowFileSystem : public FileSystem { /// Recognized schemes are "file", "mock", "hdfs", "viewfs", "s3", /// "gs" and "gcs". /// +/// Support for other schemes can be added using RegisterFileSystemFactory. +/// /// \param[in] uri a URI-based path, ex: file:///some/local/path /// \param[out] out_path (optional) Path inside the filesystem. /// \return out_fs FileSystem instance. @@ -489,6 +532,8 @@ Result> FileSystemFromUri(const std::string& uri, /// Recognized schemes are "file", "mock", "hdfs", "viewfs", "s3", /// "gs" and "gcs". /// +/// Support for other schemes can be added using RegisterFileSystemFactory. +/// /// \param[in] uri a URI-based path, ex: file:///some/local/path /// \param[in] io_context an IOContext which will be associated with the filesystem /// \param[out] out_path (optional) Path inside the filesystem. @@ -500,6 +545,8 @@ Result> FileSystemFromUri(const std::string& uri, /// \brief Create a new FileSystem by URI /// +/// Support for other schemes can be added using RegisterFileSystemFactory. +/// /// Same as FileSystemFromUri, but in addition also recognize non-URIs /// and treat them as local filesystem paths. Only absolute local filesystem /// paths are allowed. @@ -509,6 +556,8 @@ Result> FileSystemFromUriOrPath( /// \brief Create a new FileSystem by URI with a custom IO context /// +/// Support for other schemes can be added using RegisterFileSystemFactory. +/// /// Same as FileSystemFromUri, but in addition also recognize non-URIs /// and treat them as local filesystem paths. Only absolute local filesystem /// paths are allowed. @@ -519,6 +568,85 @@ Result> FileSystemFromUriOrPath( /// @} +/// \defgroup filesystem-factory-registration Helpers for FileSystem registration +/// +/// @{ + +/// \brief Register a Filesystem factory +/// +/// Support for custom Uri schemes can be added by registering a factory +/// for the corresponding FileSystem. +/// +/// \param[in] scheme a Uri scheme which the factory will handle. +/// If a factory has already been registered for a scheme, +/// the new factory will be ignored. +/// \param[in] factory a function which can produce a FileSystem for Uris which match +/// scheme. +/// \param[in] finalizer a function which must be called to finalize the factory before +/// the process exits, or nullptr if no finalization is necessary. +/// \return raises KeyError if a name collision occurs. +ARROW_EXPORT Status RegisterFileSystemFactory(std::string scheme, + FileSystemFactory factory, + void finalizer() = NULLPTR); + +/// \brief Register Filesystem factories from a shared library +/// +/// The library should register factories as part of its initialization. +/// FileSystemRegistrar is provided to make this registration straightforward, +/// each instance at namespace scope in the library will register a factory for a scheme. +/// +/// In addition to dynamically loading the indicated library, registries are merged if +/// necessary (static linkage to arrow can produce isolated registries). +ARROW_EXPORT Status LoadFileSystemFactories(const char* libpath); + +struct FileSystemRegistrar { + /// \brief Register a Filesystem factory at load time + /// + /// Support for custom Uri schemes can be added by registering a factory for the + /// corresponding FileSystem. An instance of this helper can be defined at namespace + /// scope to cause the factory to be registered at load time. + /// + /// Global constructors will finish execution before main() starts if the registrar is + /// linked into the same binary as main(), or before dlopen()/LoadLibrary() returns if + /// the library in which the registrar is defined is dynamically loaded. + /// + /// \code + /// FileSystemRegistrar kSlowFileSystemModule{ + /// "slow+file", + /// [](const Uri& uri, const io::IOContext& io_context, std::string* out_path) + /// ->Result> { + /// constexpr std::string_view kPrefix = "slow+"; + /// auto local_uri = uri.ToString().substr(kPrefix.size()); + /// ARROW_ASSIGN_OR_RAISE(auto base_fs, + /// FileSystemFromUri(local_uri, io_context, out_path)); + /// double average_latency = 1; + /// int32_t seed = 0xDEADBEEF; + /// ARROW_ASSIGN_OR_RAISE(auto params, uri.query_item()); + /// for (const auto& [key, value] : params) { + /// if (key == "average_latency") { + /// average_latency = std::stod(value); + /// } + /// if (key == "seed") { + /// seed = std::stoi(value, nullptr, /*base=*/16); + /// } + /// } + /// return std::make_shared(base_fs, average_latency, seed); + /// })); + /// \endcode + /// + /// \param[in] scheme a Uri scheme which the factory will handle. + /// If a factory has already been registered for a scheme, the + /// new factory will be ignored. + /// \param[in] factory a function which can produce a FileSystem for Uris which match + /// scheme. + /// \param[in] finalizer a function which must be called to finalize the factory before + /// the process exits, or nullptr if no finalization is necessary. + FileSystemRegistrar(std::string scheme, FileSystemFactory factory, + void finalizer() = NULLPTR); +}; + +/// @} + /// \brief Copy files, including from one FileSystem to another /// /// If a source and destination are resident in the same FileSystem FileSystem::CopyFile diff --git a/cpp/src/arrow/filesystem/gcsfs.cc b/cpp/src/arrow/filesystem/gcsfs.cc index d41cb49022c0c..8eaa923feddb1 100644 --- a/cpp/src/arrow/filesystem/gcsfs.cc +++ b/cpp/src/arrow/filesystem/gcsfs.cc @@ -751,7 +751,7 @@ GcsOptions GcsOptions::FromServiceAccountCredentials(const std::string& json_obj return options; } -Result GcsOptions::FromUri(const arrow::internal::Uri& uri, +Result GcsOptions::FromUri(const arrow::util::Uri& uri, std::string* out_path) { const auto bucket = uri.host(); auto path = uri.path(); @@ -815,7 +815,7 @@ Result GcsOptions::FromUri(const arrow::internal::Uri& uri, Result GcsOptions::FromUri(const std::string& uri_string, std::string* out_path) { - arrow::internal::Uri uri; + arrow::util::Uri uri; RETURN_NOT_OK(uri.Parse(uri_string)); return FromUri(uri, out_path); } diff --git a/cpp/src/arrow/filesystem/gcsfs.h b/cpp/src/arrow/filesystem/gcsfs.h index e4a1edfd6bfb9..f1fbc95bf957c 100644 --- a/cpp/src/arrow/filesystem/gcsfs.h +++ b/cpp/src/arrow/filesystem/gcsfs.h @@ -146,8 +146,7 @@ struct ARROW_EXPORT GcsOptions { static GcsOptions FromServiceAccountCredentials(const std::string& json_object); /// Initialize from URIs such as "gs://bucket/object". - static Result FromUri(const arrow::internal::Uri& uri, - std::string* out_path); + static Result FromUri(const arrow::util::Uri& uri, std::string* out_path); static Result FromUri(const std::string& uri, std::string* out_path); }; diff --git a/cpp/src/arrow/filesystem/hdfs.cc b/cpp/src/arrow/filesystem/hdfs.cc index b227aae65d9cd..d59b2a342d723 100644 --- a/cpp/src/arrow/filesystem/hdfs.cc +++ b/cpp/src/arrow/filesystem/hdfs.cc @@ -35,7 +35,7 @@ namespace arrow { using internal::ErrnoFromStatus; using internal::ParseValue; -using internal::Uri; +using util::Uri; namespace fs { diff --git a/cpp/src/arrow/filesystem/hdfs.h b/cpp/src/arrow/filesystem/hdfs.h index 798aac0ea9075..25604a39e3ace 100644 --- a/cpp/src/arrow/filesystem/hdfs.h +++ b/cpp/src/arrow/filesystem/hdfs.h @@ -25,8 +25,7 @@ #include "arrow/io/hdfs.h" #include "arrow/util/uri.h" -namespace arrow { -namespace fs { +namespace arrow::fs { /// Options for the HDFS implementation. struct ARROW_EXPORT HdfsOptions { @@ -51,7 +50,7 @@ struct ARROW_EXPORT HdfsOptions { bool Equals(const HdfsOptions& other) const; - static Result FromUri(const ::arrow::internal::Uri& uri); + static Result FromUri(const ::arrow::util::Uri& uri); static Result FromUri(const std::string& uri); }; @@ -69,16 +68,21 @@ class ARROW_EXPORT HadoopFileSystem : public FileSystem { Result PathFromUri(const std::string& uri_string) const override; /// \cond FALSE + using FileSystem::CreateDir; + using FileSystem::DeleteDirContents; using FileSystem::GetFileInfo; + using FileSystem::OpenAppendStream; + using FileSystem::OpenOutputStream; /// \endcond + Result GetFileInfo(const std::string& path) override; Result> GetFileInfo(const FileSelector& select) override; - Status CreateDir(const std::string& path, bool recursive = true) override; + Status CreateDir(const std::string& path, bool recursive) override; Status DeleteDir(const std::string& path) override; - Status DeleteDirContents(const std::string& path, bool missing_dir_ok = false) override; + Status DeleteDirContents(const std::string& path, bool missing_dir_ok) override; Status DeleteRootDirContents() override; @@ -94,10 +98,10 @@ class ARROW_EXPORT HadoopFileSystem : public FileSystem { const std::string& path) override; Result> OpenOutputStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; Result> OpenAppendStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; /// Create a HdfsFileSystem instance from the given options. static Result> Make( @@ -110,5 +114,4 @@ class ARROW_EXPORT HadoopFileSystem : public FileSystem { std::unique_ptr impl_; }; -} // namespace fs -} // namespace arrow +} // namespace arrow::fs diff --git a/cpp/src/arrow/filesystem/hdfs_test.cc b/cpp/src/arrow/filesystem/hdfs_test.cc index 7ad9e6cd40e65..db5cefef37488 100644 --- a/cpp/src/arrow/filesystem/hdfs_test.cc +++ b/cpp/src/arrow/filesystem/hdfs_test.cc @@ -34,7 +34,7 @@ namespace arrow { using internal::ErrnoFromStatus; -using internal::Uri; +using util::Uri; namespace fs { diff --git a/cpp/src/arrow/filesystem/localfs.cc b/cpp/src/arrow/filesystem/localfs.cc index 01ac946379119..fbb33fd00868b 100644 --- a/cpp/src/arrow/filesystem/localfs.cc +++ b/cpp/src/arrow/filesystem/localfs.cc @@ -24,10 +24,10 @@ #ifdef _WIN32 #include "arrow/util/windows_compatibility.h" #else -#include #include -#include #include +#include +#include #endif #include "arrow/filesystem/filesystem.h" @@ -42,8 +42,7 @@ #include "arrow/util/uri.h" #include "arrow/util/windows_fixup.h" -namespace arrow { -namespace fs { +namespace arrow::fs { using ::arrow::internal::IOErrorFromErrno; #ifdef _WIN32 @@ -217,9 +216,7 @@ Status StatSelector(const PlatformFilename& dir_fn, const FileSelector& select, } // namespace -LocalFileSystemOptions LocalFileSystemOptions::Defaults() { - return LocalFileSystemOptions(); -} +LocalFileSystemOptions LocalFileSystemOptions::Defaults() { return {}; } bool LocalFileSystemOptions::Equals(const LocalFileSystemOptions& other) const { return use_mmap == other.use_mmap && directory_readahead == other.directory_readahead && @@ -227,7 +224,7 @@ bool LocalFileSystemOptions::Equals(const LocalFileSystemOptions& other) const { } Result LocalFileSystemOptions::FromUri( - const ::arrow::internal::Uri& uri, std::string* out_path) { + const ::arrow::util::Uri& uri, std::string* out_path) { if (!uri.username().empty() || !uri.password().empty()) { return Status::Invalid("Unsupported username or password in local URI: '", uri.ToString(), "'"); @@ -260,7 +257,7 @@ LocalFileSystem::LocalFileSystem(const LocalFileSystemOptions& options, const io::IOContext& io_context) : FileSystem(io_context), options_(options) {} -LocalFileSystem::~LocalFileSystem() {} +LocalFileSystem::~LocalFileSystem() = default; Result LocalFileSystem::NormalizePath(std::string path) { return DoNormalizePath(std::move(path)); @@ -689,5 +686,4 @@ Result> LocalFileSystem::OpenAppendStream( return OpenOutputStreamGeneric(path, truncate, append); } -} // namespace fs -} // namespace arrow +} // namespace arrow::fs diff --git a/cpp/src/arrow/filesystem/localfs.h b/cpp/src/arrow/filesystem/localfs.h index 108530c2b2778..45a3da317f663 100644 --- a/cpp/src/arrow/filesystem/localfs.h +++ b/cpp/src/arrow/filesystem/localfs.h @@ -62,7 +62,7 @@ struct ARROW_EXPORT LocalFileSystemOptions { bool Equals(const LocalFileSystemOptions& other) const; - static Result FromUri(const ::arrow::internal::Uri& uri, + static Result FromUri(const ::arrow::util::Uri& uri, std::string* out_path); }; @@ -89,16 +89,21 @@ class ARROW_EXPORT LocalFileSystem : public FileSystem { LocalFileSystemOptions options() const { return options_; } /// \cond FALSE + using FileSystem::CreateDir; + using FileSystem::DeleteDirContents; using FileSystem::GetFileInfo; + using FileSystem::OpenAppendStream; + using FileSystem::OpenOutputStream; /// \endcond + Result GetFileInfo(const std::string& path) override; Result> GetFileInfo(const FileSelector& select) override; FileInfoGenerator GetFileInfoGenerator(const FileSelector& select) override; - Status CreateDir(const std::string& path, bool recursive = true) override; + Status CreateDir(const std::string& path, bool recursive) override; Status DeleteDir(const std::string& path) override; - Status DeleteDirContents(const std::string& path, bool missing_dir_ok = false) override; + Status DeleteDirContents(const std::string& path, bool missing_dir_ok) override; Status DeleteRootDirContents() override; Status DeleteFile(const std::string& path) override; @@ -113,10 +118,10 @@ class ARROW_EXPORT LocalFileSystem : public FileSystem { const std::string& path) override; Result> OpenOutputStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; Result> OpenAppendStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; protected: LocalFileSystemOptions options_; diff --git a/cpp/src/arrow/filesystem/localfs_test.cc b/cpp/src/arrow/filesystem/localfs_test.cc index 7ce2a56968679..a49e6442522ec 100644 --- a/cpp/src/arrow/filesystem/localfs_test.cc +++ b/cpp/src/arrow/filesystem/localfs_test.cc @@ -35,14 +35,12 @@ #include "arrow/util/io_util.h" #include "arrow/util/uri.h" -namespace arrow { -namespace fs { -namespace internal { +namespace arrow::fs::internal { using ::arrow::internal::FileDescriptor; using ::arrow::internal::PlatformFilename; using ::arrow::internal::TemporaryDir; -using ::arrow::internal::UriFromAbsolutePath; +using ::arrow::util::UriFromAbsolutePath; class LocalFSTestMixin : public ::testing::Test { public: @@ -86,6 +84,88 @@ Result> FSFromUriOrPath(const std::string& uri, //////////////////////////////////////////////////////////////////////////// // Misc tests +Result> SlowFileSystemFactory(const Uri& uri, + const io::IOContext& io_context, + std::string* out_path) { + auto local_uri = "file" + uri.ToString().substr(uri.scheme().size()); + ARROW_ASSIGN_OR_RAISE(auto base_fs, FileSystemFromUri(local_uri, io_context, out_path)); + double average_latency = 1; + int32_t seed = 0xDEADBEEF; + ARROW_ASSIGN_OR_RAISE(auto params, uri.query_items()); + for (const auto& [key, value] : params) { + if (key == "average_latency") { + average_latency = std::stod(value); + } + if (key == "seed") { + seed = std::stoi(value, nullptr, /*base=*/16); + } + } + return std::make_shared(base_fs, average_latency, seed); +} +FileSystemRegistrar kSlowFileSystemModule{ + "slowfile", + SlowFileSystemFactory, +}; + +TEST(FileSystemFromUri, LinkedRegisteredFactory) { + // Since the registrar's definition is in this translation unit (which is linked to the + // unit test executable), its factory will be registered be loaded automatically before + // main() is entered. + std::string path; + ASSERT_OK_AND_ASSIGN(auto fs, FileSystemFromUri("slowfile:///hey/yo", &path)); + EXPECT_EQ(path, "/hey/yo"); + EXPECT_EQ(fs->type_name(), "slow"); +} + +TEST(FileSystemFromUri, LoadedRegisteredFactory) { + // Since the registrar's definition is in libarrow_filesystem_example.so, + // its factory will be registered only after the library is dynamically loaded. + std::string path; + EXPECT_THAT(FileSystemFromUri("example:///hey/yo", &path), Raises(StatusCode::Invalid)); + + EXPECT_THAT(LoadFileSystemFactories(ARROW_FILESYSTEM_EXAMPLE_LIBPATH), Ok()); + + ASSERT_OK_AND_ASSIGN(auto fs, FileSystemFromUri("example:///hey/yo", &path)); + EXPECT_EQ(path, "/hey/yo"); + EXPECT_EQ(fs->type_name(), "local"); +} + +TEST(FileSystemFromUri, RuntimeRegisteredFactory) { + std::string path; + EXPECT_THAT(FileSystemFromUri("slowfile2:///hey/yo", &path), + Raises(StatusCode::Invalid)); + + EXPECT_THAT(RegisterFileSystemFactory("slowfile2", SlowFileSystemFactory), Ok()); + + ASSERT_OK_AND_ASSIGN(auto fs, FileSystemFromUri("slowfile2:///hey/yo", &path)); + EXPECT_EQ(path, "/hey/yo"); + EXPECT_EQ(fs->type_name(), "slow"); +} + +TEST(FileSystemFromUri, RuntimeRegisteredFactoryNameCollision) { + EXPECT_THAT( + RegisterFileSystemFactory("slowfile2", SlowFileSystemFactory), + Raises(StatusCode::KeyError, + testing::HasSubstr("Attempted to register factory for scheme 'slowfile2' " + "but that scheme is already registered"))); +} + +FileSystemRegistrar kSegfaultFileSystemModule[]{ + {"segfault", nullptr}, + {"segfault", nullptr}, + {"segfault", nullptr}, +}; +TEST(FileSystemFromUri, LinkedRegisteredFactoryNameCollision) { + // Since multiple registrars are defined in this translation unit which all + // register factories for the 'segfault' scheme, using that scheme in FileSystemFromUri + // is invalidated and raises KeyError. + std::string path; + EXPECT_THAT(FileSystemFromUri("segfault:///hey/yo", &path), + Raises(StatusCode::KeyError)); + // other schemes are not affected by the collision + EXPECT_THAT(FileSystemFromUri("slowfile:///hey/yo", &path), Ok()); +} + TEST(DetectAbsolutePath, Basics) { ASSERT_TRUE(DetectAbsolutePath("/")); ASSERT_TRUE(DetectAbsolutePath("/foo")); @@ -164,7 +244,7 @@ GENERIC_FS_TEST_FUNCTIONS(TestLocalFSGenericMMap); template class TestLocalFS : public LocalFSTestMixin { public: - void SetUp() { + void SetUp() override { LocalFSTestMixin::SetUp(); path_formatter_ = PathFormatter(); local_path_ = EnsureTrailingSlash(path_formatter_(temp_dir_->path().ToString())); @@ -494,9 +574,4 @@ TYPED_TEST(TestLocalFS, StressGetFileInfoGenerator) { } } -// TODO Should we test backslash paths on Windows? -// SubTreeFileSystem isn't compatible with them. - -} // namespace internal -} // namespace fs -} // namespace arrow +} // namespace arrow::fs::internal diff --git a/cpp/src/arrow/filesystem/mockfs.h b/cpp/src/arrow/filesystem/mockfs.h index 32d06e5910dfe..5626560e08363 100644 --- a/cpp/src/arrow/filesystem/mockfs.h +++ b/cpp/src/arrow/filesystem/mockfs.h @@ -26,9 +26,7 @@ #include "arrow/filesystem/filesystem.h" #include "arrow/util/windows_fixup.h" -namespace arrow { -namespace fs { -namespace internal { +namespace arrow::fs::internal { struct MockDirInfo { std::string full_path; @@ -68,16 +66,21 @@ class ARROW_EXPORT MockFileSystem : public FileSystem { bool Equals(const FileSystem& other) const override; Result PathFromUri(const std::string& uri_string) const override; - // XXX It's not very practical to have to explicitly declare inheritance - // of default overrides. + /// \cond FALSE + using FileSystem::CreateDir; + using FileSystem::DeleteDirContents; using FileSystem::GetFileInfo; + using FileSystem::OpenAppendStream; + using FileSystem::OpenOutputStream; + /// \endcond + Result GetFileInfo(const std::string& path) override; Result> GetFileInfo(const FileSelector& select) override; - Status CreateDir(const std::string& path, bool recursive = true) override; + Status CreateDir(const std::string& path, bool recursive) override; Status DeleteDir(const std::string& path) override; - Status DeleteDirContents(const std::string& path, bool missing_dir_ok = false) override; + Status DeleteDirContents(const std::string& path, bool missing_dir_ok) override; Status DeleteRootDirContents() override; Status DeleteFile(const std::string& path) override; @@ -92,10 +95,10 @@ class ARROW_EXPORT MockFileSystem : public FileSystem { const std::string& path) override; Result> OpenOutputStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; Result> OpenAppendStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; // Contents-dumping helpers to ease testing. // Output is lexicographically-ordered by full path. @@ -128,6 +131,4 @@ class ARROW_EXPORT MockAsyncFileSystem : public MockFileSystem { FileInfoGenerator GetFileInfoGenerator(const FileSelector& select) override; }; -} // namespace internal -} // namespace fs -} // namespace arrow +} // namespace arrow::fs::internal diff --git a/cpp/src/arrow/filesystem/path_util.cc b/cpp/src/arrow/filesystem/path_util.cc index 9c895ae76c7b8..4e9628d3dc241 100644 --- a/cpp/src/arrow/filesystem/path_util.cc +++ b/cpp/src/arrow/filesystem/path_util.cc @@ -344,7 +344,7 @@ bool IsLikelyUri(std::string_view v) { // with 36 characters. return false; } - return ::arrow::internal::IsValidUriScheme(v.substr(0, pos)); + return ::arrow::util::IsValidUriScheme(v.substr(0, pos)); } struct Globber::Impl { diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc index 5fefe6b7cb016..f1ffc5ec816ae 100644 --- a/cpp/src/arrow/filesystem/s3fs.cc +++ b/cpp/src/arrow/filesystem/s3fs.cc @@ -132,8 +132,8 @@ namespace arrow { using internal::TaskGroup; using internal::ToChars; -using internal::Uri; using io::internal::SubmitIO; +using util::Uri; namespace fs { diff --git a/cpp/src/arrow/filesystem/s3fs.h b/cpp/src/arrow/filesystem/s3fs.h index 13a0abde32318..82d08bc5ea89a 100644 --- a/cpp/src/arrow/filesystem/s3fs.h +++ b/cpp/src/arrow/filesystem/s3fs.h @@ -51,7 +51,7 @@ struct ARROW_EXPORT S3ProxyOptions { /// Initialize from URI such as http://username:password@host:port /// or http://host:port static Result FromUri(const std::string& uri); - static Result FromUri(const ::arrow::internal::Uri& uri); + static Result FromUri(const ::arrow::util::Uri& uri); bool Equals(const S3ProxyOptions& other) const; }; @@ -232,7 +232,7 @@ struct ARROW_EXPORT S3Options { /// generate temporary credentials. static S3Options FromAssumeRoleWithWebIdentity(); - static Result FromUri(const ::arrow::internal::Uri& uri, + static Result FromUri(const ::arrow::util::Uri& uri, std::string* out_path = NULLPTR); static Result FromUri(const std::string& uri, std::string* out_path = NULLPTR); @@ -258,19 +258,24 @@ class ARROW_EXPORT S3FileSystem : public FileSystem { Result PathFromUri(const std::string& uri_string) const override; /// \cond FALSE + using FileSystem::CreateDir; + using FileSystem::DeleteDirContents; + using FileSystem::DeleteDirContentsAsync; using FileSystem::GetFileInfo; + using FileSystem::OpenAppendStream; + using FileSystem::OpenOutputStream; /// \endcond + Result GetFileInfo(const std::string& path) override; Result> GetFileInfo(const FileSelector& select) override; FileInfoGenerator GetFileInfoGenerator(const FileSelector& select) override; - Status CreateDir(const std::string& path, bool recursive = true) override; + Status CreateDir(const std::string& path, bool recursive) override; Status DeleteDir(const std::string& path) override; - Status DeleteDirContents(const std::string& path, bool missing_dir_ok = false) override; - Future<> DeleteDirContentsAsync(const std::string& path, - bool missing_dir_ok = false) override; + Status DeleteDirContents(const std::string& path, bool missing_dir_ok) override; + Future<> DeleteDirContentsAsync(const std::string& path, bool missing_dir_ok) override; Status DeleteRootDirContents() override; Status DeleteFile(const std::string& path) override; @@ -312,11 +317,11 @@ class ARROW_EXPORT S3FileSystem : public FileSystem { /// implementing your own background execution strategy. Result> OpenOutputStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; Result> OpenAppendStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; /// Create a S3FileSystem instance from the given options. static Result> Make( diff --git a/cpp/src/arrow/filesystem/s3fs_narrative_test.cc b/cpp/src/arrow/filesystem/s3fs_narrative_test.cc index bbb3c32ee6bd2..9322beb45ead2 100644 --- a/cpp/src/arrow/filesystem/s3fs_narrative_test.cc +++ b/cpp/src/arrow/filesystem/s3fs_narrative_test.cc @@ -48,8 +48,7 @@ DEFINE_string(region, "", "AWS region"); DEFINE_string(endpoint, "", "Endpoint override (e.g. '127.0.0.1:9000')"); DEFINE_string(scheme, "https", "Connection scheme"); -namespace arrow { -namespace fs { +namespace arrow::fs { #define ASSERT_RAISES_PRINT(context_msg, error_type, expr) \ do { \ @@ -247,8 +246,7 @@ void TestMain(int argc, char** argv) { ASSERT_OK(FinalizeS3()); } -} // namespace fs -} // namespace arrow +} // namespace arrow::fs int main(int argc, char** argv) { std::stringstream ss; diff --git a/cpp/src/arrow/filesystem/s3fs_test.cc b/cpp/src/arrow/filesystem/s3fs_test.cc index ad7aaa1bd43cf..b581be735026f 100644 --- a/cpp/src/arrow/filesystem/s3fs_test.cc +++ b/cpp/src/arrow/filesystem/s3fs_test.cc @@ -75,8 +75,8 @@ namespace fs { using ::arrow::internal::checked_pointer_cast; using ::arrow::internal::PlatformFilename; using ::arrow::internal::ToChars; -using ::arrow::internal::UriEscape; using ::arrow::internal::Zip; +using ::arrow::util::UriEscape; using ::arrow::fs::internal::ConnectRetryStrategy; using ::arrow::fs::internal::ErrorToStatus; diff --git a/cpp/src/arrow/filesystem/util_internal.cc b/cpp/src/arrow/filesystem/util_internal.cc index 8747f9683b90f..d69f6c896d08e 100644 --- a/cpp/src/arrow/filesystem/util_internal.cc +++ b/cpp/src/arrow/filesystem/util_internal.cc @@ -30,7 +30,7 @@ namespace arrow { using internal::StatusDetailFromErrno; -using internal::Uri; +using util::Uri; namespace fs { namespace internal { diff --git a/cpp/src/arrow/filesystem/util_internal.h b/cpp/src/arrow/filesystem/util_internal.h index 96cc5178a9f31..74ddf015432d8 100644 --- a/cpp/src/arrow/filesystem/util_internal.h +++ b/cpp/src/arrow/filesystem/util_internal.h @@ -28,7 +28,7 @@ #include "arrow/util/visibility.h" namespace arrow { -using internal::Uri; +using util::Uri; namespace fs { namespace internal { diff --git a/cpp/src/arrow/flight/cookie_internal.cc b/cpp/src/arrow/flight/cookie_internal.cc index 921f017313611..8f41106ebce5c 100644 --- a/cpp/src/arrow/flight/cookie_internal.cc +++ b/cpp/src/arrow/flight/cookie_internal.cc @@ -159,8 +159,8 @@ CookiePair Cookie::ParseCookieAttribute(const std::string& cookie_header_value, } // Key/Value may be URI-encoded. - out_key = arrow::internal::UriUnescape(out_key); - out_value = arrow::internal::UriUnescape(out_value); + out_key = arrow::util::UriUnescape(out_key); + out_value = arrow::util::UriUnescape(out_value); // Strip outer quotes on the value. if (out_value.size() >= 2 && out_value[0] == '"' && diff --git a/cpp/src/arrow/flight/transport.h b/cpp/src/arrow/flight/transport.h index ee7bd01720730..4029aa5223deb 100644 --- a/cpp/src/arrow/flight/transport.h +++ b/cpp/src/arrow/flight/transport.h @@ -168,7 +168,7 @@ class ARROW_FLIGHT_EXPORT ClientTransport { /// Initialize the client. virtual Status Init(const FlightClientOptions& options, const Location& location, - const arrow::internal::Uri& uri) = 0; + const arrow::util::Uri& uri) = 0; /// Close the client. Once this returns, the client is no longer usable. virtual Status Close() = 0; diff --git a/cpp/src/arrow/flight/transport/grpc/grpc_client.cc b/cpp/src/arrow/flight/transport/grpc/grpc_client.cc index f7612759e8de6..f799ba761c40d 100644 --- a/cpp/src/arrow/flight/transport/grpc/grpc_client.cc +++ b/cpp/src/arrow/flight/transport/grpc/grpc_client.cc @@ -684,13 +684,13 @@ class GrpcClientImpl : public internal::ClientTransport { } Status Init(const FlightClientOptions& options, const Location& location, - const arrow::internal::Uri& uri) override { + const arrow::util::Uri& uri) override { const std::string& scheme = location.scheme(); std::stringstream grpc_uri; std::shared_ptr<::grpc::ChannelCredentials> creds; if (scheme == kSchemeGrpc || scheme == kSchemeGrpcTcp || scheme == kSchemeGrpcTls) { - grpc_uri << arrow::internal::UriEncodeHost(uri.host()) << ':' << uri.port_text(); + grpc_uri << arrow::util::UriEncodeHost(uri.host()) << ':' << uri.port_text(); if (scheme == kSchemeGrpcTls) { if (options.disable_server_verification) { diff --git a/cpp/src/arrow/flight/transport/grpc/grpc_server.cc b/cpp/src/arrow/flight/transport/grpc/grpc_server.cc index a9780b5eeb77e..28fc736aa0088 100644 --- a/cpp/src/arrow/flight/transport/grpc/grpc_server.cc +++ b/cpp/src/arrow/flight/transport/grpc/grpc_server.cc @@ -575,8 +575,7 @@ class GrpcServerTransport : public internal::ServerTransport { new GrpcServerTransport(base, std::move(memory_manager))); } - Status Init(const FlightServerOptions& options, - const arrow::internal::Uri& uri) override { + Status Init(const FlightServerOptions& options, const arrow::util::Uri& uri) override { grpc_service_.reset( new GrpcServiceHandler(options.auth_handler, options.middleware, this)); @@ -588,7 +587,7 @@ class GrpcServerTransport : public internal::ServerTransport { int port = 0; if (scheme == kSchemeGrpc || scheme == kSchemeGrpcTcp || scheme == kSchemeGrpcTls) { std::stringstream address; - address << arrow::internal::UriEncodeHost(uri.host()) << ':' << uri.port_text(); + address << arrow::util::UriEncodeHost(uri.host()) << ':' << uri.port_text(); std::shared_ptr<::grpc::ServerCredentials> creds; if (scheme == kSchemeGrpcTls) { @@ -635,12 +634,10 @@ class GrpcServerTransport : public internal::ServerTransport { if (scheme == kSchemeGrpcTls) { ARROW_ASSIGN_OR_RAISE( - location_, - Location::ForGrpcTls(arrow::internal::UriEncodeHost(uri.host()), port)); + location_, Location::ForGrpcTls(arrow::util::UriEncodeHost(uri.host()), port)); } else if (scheme == kSchemeGrpc || scheme == kSchemeGrpcTcp) { ARROW_ASSIGN_OR_RAISE( - location_, - Location::ForGrpcTcp(arrow::internal::UriEncodeHost(uri.host()), port)); + location_, Location::ForGrpcTcp(arrow::util::UriEncodeHost(uri.host()), port)); } return Status::OK(); } diff --git a/cpp/src/arrow/flight/transport/ucx/ucx_client.cc b/cpp/src/arrow/flight/transport/ucx/ucx_client.cc index cd9ddaa85a6f6..32c2fd776f32b 100644 --- a/cpp/src/arrow/flight/transport/ucx/ucx_client.cc +++ b/cpp/src/arrow/flight/transport/ucx/ucx_client.cc @@ -77,7 +77,7 @@ class ClientConnection { ARROW_DEFAULT_MOVE_AND_ASSIGN(ClientConnection); ~ClientConnection() { DCHECK(!driver_) << "Connection was not closed!"; } - Status Init(std::shared_ptr ucp_context, const arrow::internal::Uri& uri) { + Status Init(std::shared_ptr ucp_context, const arrow::util::Uri& uri) { auto status = InitImpl(std::move(ucp_context), uri); // Clean up after-the-fact if we fail to initialize if (!status.ok()) { @@ -91,8 +91,7 @@ class ClientConnection { return status; } - Status InitImpl(std::shared_ptr ucp_context, - const arrow::internal::Uri& uri) { + Status InitImpl(std::shared_ptr ucp_context, const arrow::util::Uri& uri) { { ucs_status_t status; ucp_worker_params_t worker_params; @@ -521,7 +520,7 @@ class UcxClientImpl : public arrow::flight::internal::ClientTransport { } Status Init(const FlightClientOptions& options, const Location& location, - const arrow::internal::Uri& uri) override { + const arrow::util::Uri& uri) override { RETURN_NOT_OK(uri_.Parse(uri.ToString())); { ucp_config_t* ucp_config; @@ -721,7 +720,7 @@ class UcxClientImpl : public arrow::flight::internal::ClientTransport { private: static constexpr size_t kMaxOpenConnections = 3; - arrow::internal::Uri uri_; + arrow::util::Uri uri_; std::shared_ptr ucp_context_; std::mutex connections_mutex_; std::deque connections_; diff --git a/cpp/src/arrow/flight/transport/ucx/ucx_server.cc b/cpp/src/arrow/flight/transport/ucx/ucx_server.cc index b20f8f286e3bc..cb9c8948ccf1e 100644 --- a/cpp/src/arrow/flight/transport/ucx/ucx_server.cc +++ b/cpp/src/arrow/flight/transport/ucx/ucx_server.cc @@ -198,8 +198,7 @@ class UcxServerImpl : public arrow::flight::internal::ServerTransport { } } - Status Init(const FlightServerOptions& options, - const arrow::internal::Uri& uri) override { + Status Init(const FlightServerOptions& options, const arrow::util::Uri& uri) override { const auto max_threads = std::max(8, std::thread::hardware_concurrency()); ARROW_ASSIGN_OR_RAISE(rpc_pool_, arrow::internal::ThreadPool::Make(max_threads)); diff --git a/cpp/src/arrow/flight/transport/ucx/util_internal.cc b/cpp/src/arrow/flight/transport/ucx/util_internal.cc index acaa4f5872343..2db7d4e2630ff 100644 --- a/cpp/src/arrow/flight/transport/ucx/util_internal.cc +++ b/cpp/src/arrow/flight/transport/ucx/util_internal.cc @@ -50,7 +50,7 @@ ucs_status_t FlightUcxStatusDetail::Unwrap(const Status& status) { return dynamic_cast(status.detail().get())->status_; } -arrow::Result UriToSockaddr(const arrow::internal::Uri& uri, +arrow::Result UriToSockaddr(const arrow::util::Uri& uri, struct sockaddr_storage* addr) { std::string host = uri.host(); if (host.empty()) { diff --git a/cpp/src/arrow/flight/transport/ucx/util_internal.h b/cpp/src/arrow/flight/transport/ucx/util_internal.h index 84e84ba071154..958868d59d4f5 100644 --- a/cpp/src/arrow/flight/transport/ucx/util_internal.h +++ b/cpp/src/arrow/flight/transport/ucx/util_internal.h @@ -71,7 +71,7 @@ static inline bool IsIgnorableDisconnectError(ucs_status_t ucs_status) { /// /// \return The length of the sockaddr ARROW_FLIGHT_EXPORT -arrow::Result UriToSockaddr(const arrow::internal::Uri& uri, +arrow::Result UriToSockaddr(const arrow::util::Uri& uri, struct sockaddr_storage* addr); ARROW_FLIGHT_EXPORT diff --git a/cpp/src/arrow/flight/transport_server.h b/cpp/src/arrow/flight/transport_server.h index 51105a89304f4..8e5fe3e710c13 100644 --- a/cpp/src/arrow/flight/transport_server.h +++ b/cpp/src/arrow/flight/transport_server.h @@ -69,7 +69,7 @@ class ARROW_FLIGHT_EXPORT ServerTransport { /// This method should launch the server in a background thread, i.e. it /// should not block. Once this returns, the server should be active. virtual Status Init(const FlightServerOptions& options, - const arrow::internal::Uri& uri) = 0; + const arrow::util::Uri& uri) = 0; /// \brief Shutdown the server. /// /// This should wait for active RPCs to finish. Once this returns, the diff --git a/cpp/src/arrow/flight/types.cc b/cpp/src/arrow/flight/types.cc index 11b2baafad220..6682e300d19c9 100644 --- a/cpp/src/arrow/flight/types.cc +++ b/cpp/src/arrow/flight/types.cc @@ -821,7 +821,7 @@ arrow::Result CloseSessionResult::Deserialize( return out; } -Location::Location() { uri_ = std::make_shared(); } +Location::Location() { uri_ = std::make_shared(); } arrow::Result Location::Parse(const std::string& uri_string) { Location location; diff --git a/cpp/src/arrow/flight/types.h b/cpp/src/arrow/flight/types.h index 4b17149aa2d46..7dd707d3e2fa2 100644 --- a/cpp/src/arrow/flight/types.h +++ b/cpp/src/arrow/flight/types.h @@ -52,11 +52,11 @@ class DictionaryMemo; } // namespace ipc -namespace internal { +namespace util { class Uri; -} // namespace internal +} // namespace util namespace flight { @@ -466,7 +466,7 @@ struct ARROW_FLIGHT_EXPORT Location { private: friend class FlightClient; friend class FlightServerBase; - std::shared_ptr uri_; + std::shared_ptr uri_; }; /// \brief A flight ticket and list of locations where the ticket can be diff --git a/cpp/src/arrow/io/hdfs_internal.cc b/cpp/src/arrow/io/hdfs_internal.cc index 4592392b806b5..5619dd2435acc 100644 --- a/cpp/src/arrow/io/hdfs_internal.cc +++ b/cpp/src/arrow/io/hdfs_internal.cc @@ -37,6 +37,7 @@ #include #include #include +#include "arrow/util/basic_decimal.h" #ifndef _WIN32 #include @@ -51,53 +52,27 @@ namespace arrow { using internal::GetEnvVarNative; using internal::PlatformFilename; -#ifdef _WIN32 -using internal::WinErrorMessage; -#endif -namespace io { -namespace internal { +namespace io::internal { namespace { -void* GetLibrarySymbol(LibraryHandle handle, const char* symbol) { - if (handle == NULL) return NULL; -#ifndef _WIN32 - return dlsym(handle, symbol); -#else +template +Status SetSymbol(void* handle, char const* name, T** symbol) { + if (*symbol != nullptr) return Status::OK(); - void* ret = reinterpret_cast(GetProcAddress(handle, symbol)); - if (ret == NULL) { - // logstream(LOG_INFO) << "GetProcAddress error: " - // << get_last_err_str(GetLastError()) << std::endl; + auto maybe_symbol = ::arrow::internal::GetSymbolAs(handle, name); + if (Required || maybe_symbol.ok()) { + ARROW_ASSIGN_OR_RAISE(*symbol, maybe_symbol); } - return ret; -#endif + return Status::OK(); } -#define GET_SYMBOL_REQUIRED(SHIM, SYMBOL_NAME) \ - do { \ - if (!SHIM->SYMBOL_NAME) { \ - *reinterpret_cast(&SHIM->SYMBOL_NAME) = \ - GetLibrarySymbol(SHIM->handle, "" #SYMBOL_NAME); \ - } \ - if (!SHIM->SYMBOL_NAME) \ - return Status::IOError("Getting symbol " #SYMBOL_NAME "failed"); \ - } while (0) - -#define GET_SYMBOL(SHIM, SYMBOL_NAME) \ - if (!SHIM->SYMBOL_NAME) { \ - *reinterpret_cast(&SHIM->SYMBOL_NAME) = \ - GetLibrarySymbol(SHIM->handle, "" #SYMBOL_NAME); \ - } +#define GET_SYMBOL_REQUIRED(SHIM, SYMBOL_NAME) \ + RETURN_NOT_OK(SetSymbol(SHIM->handle, #SYMBOL_NAME, &SHIM->SYMBOL_NAME)); -LibraryHandle libjvm_handle = nullptr; - -// Helper functions for dlopens -Result> get_potential_libjvm_paths(); -Result> get_potential_libhdfs_paths(); -Result try_dlopen(const std::vector& potential_paths, - const char* name); +#define GET_SYMBOL(SHIM, SYMBOL_NAME) \ + DCHECK_OK(SetSymbol(SHIM->handle, #SYMBOL_NAME, &SHIM->SYMBOL_NAME)); Result> MakeFilenameVector( const std::vector& names) { @@ -244,46 +219,18 @@ Result> get_potential_libjvm_paths() { return potential_paths; } -#ifndef _WIN32 -Result try_dlopen(const std::vector& potential_paths, - const char* name) { - std::string error_message = "unknown error"; - LibraryHandle handle; - - for (const auto& p : potential_paths) { - handle = dlopen(p.ToNative().c_str(), RTLD_NOW | RTLD_LOCAL); - - if (handle != NULL) { - return handle; - } else { - const char* err_msg = dlerror(); - if (err_msg != NULL) { - error_message = err_msg; - } - } - } - - return Status::IOError("Unable to load ", name, ": ", error_message); -} - -#else -Result try_dlopen(const std::vector& potential_paths, - const char* name) { - std::string error_message; - LibraryHandle handle; +Result try_dlopen(const std::vector& potential_paths, + std::string name) { + std::string error_message = "Unable to load " + name; for (const auto& p : potential_paths) { - handle = LoadLibraryW(p.ToNative().c_str()); - if (handle != NULL) { - return handle; - } else { - error_message = WinErrorMessage(GetLastError()); - } + auto maybe_handle = arrow::internal::LoadDynamicLibrary(p); + if (maybe_handle.ok()) return *maybe_handle; + error_message += "\n" + maybe_handle.status().message(); } - return Status::IOError("Unable to load ", name, ": ", error_message); + return Status(StatusCode::IOError, std::move(error_message)); } -#endif // _WIN32 LibHdfsShim libhdfs_shim; @@ -335,7 +282,7 @@ Status ConnectLibHdfs(LibHdfsShim** driver) { shim->Initialize(); ARROW_ASSIGN_OR_RAISE(auto libjvm_potential_paths, get_potential_libjvm_paths()); - ARROW_ASSIGN_OR_RAISE(libjvm_handle, try_dlopen(libjvm_potential_paths, "libjvm")); + RETURN_NOT_OK(try_dlopen(libjvm_potential_paths, "libjvm")); ARROW_ASSIGN_OR_RAISE(auto libhdfs_potential_paths, get_potential_libhdfs_paths()); ARROW_ASSIGN_OR_RAISE(shim->handle, try_dlopen(libhdfs_potential_paths, "libhdfs")); @@ -350,7 +297,7 @@ Status ConnectLibHdfs(LibHdfsShim** driver) { /////////////////////////////////////////////////////////////////////////// // HDFS thin wrapper methods -hdfsBuilder* LibHdfsShim::NewBuilder(void) { return this->hdfsNewBuilder(); } +hdfsBuilder* LibHdfsShim::NewBuilder() { return this->hdfsNewBuilder(); } void LibHdfsShim::BuilderSetNameNode(hdfsBuilder* bld, const char* nn) { this->hdfsBuilderSetNameNode(bld, nn); @@ -426,26 +373,29 @@ int LibHdfsShim::Flush(hdfsFS fs, hdfsFile file) { return this->hdfsFlush(fs, fi int LibHdfsShim::Available(hdfsFS fs, hdfsFile file) { GET_SYMBOL(this, hdfsAvailable); - if (this->hdfsAvailable) + if (this->hdfsAvailable) { return this->hdfsAvailable(fs, file); - else + } else { return 0; + } } int LibHdfsShim::Copy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) { GET_SYMBOL(this, hdfsCopy); - if (this->hdfsCopy) + if (this->hdfsCopy) { return this->hdfsCopy(srcFS, src, dstFS, dst); - else + } else { return 0; + } } int LibHdfsShim::Move(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) { GET_SYMBOL(this, hdfsMove); - if (this->hdfsMove) + if (this->hdfsMove) { return this->hdfsMove(srcFS, src, dstFS, dst); - else + } else { return 0; + } } int LibHdfsShim::Delete(hdfsFS fs, const char* path, int recursive) { @@ -454,10 +404,11 @@ int LibHdfsShim::Delete(hdfsFS fs, const char* path, int recursive) { int LibHdfsShim::Rename(hdfsFS fs, const char* oldPath, const char* newPath) { GET_SYMBOL(this, hdfsRename); - if (this->hdfsRename) + if (this->hdfsRename) { return this->hdfsRename(fs, oldPath, newPath); - else + } else { return 0; + } } char* LibHdfsShim::GetWorkingDirectory(hdfsFS fs, char* buffer, size_t bufferSize) { @@ -465,7 +416,7 @@ char* LibHdfsShim::GetWorkingDirectory(hdfsFS fs, char* buffer, size_t bufferSiz if (this->hdfsGetWorkingDirectory) { return this->hdfsGetWorkingDirectory(fs, buffer, bufferSize); } else { - return NULL; + return nullptr; } } @@ -509,7 +460,7 @@ char*** LibHdfsShim::GetHosts(hdfsFS fs, const char* path, tOffset start, if (this->hdfsGetHosts) { return this->hdfsGetHosts(fs, path, start, length); } else { - return NULL; + return nullptr; } } @@ -551,6 +502,5 @@ int LibHdfsShim::Utime(hdfsFS fs, const char* path, tTime mtime, tTime atime) { } } -} // namespace internal -} // namespace io +} // namespace io::internal } // namespace arrow diff --git a/cpp/src/arrow/io/hdfs_internal.h b/cpp/src/arrow/io/hdfs_internal.h index 590e3a4835932..4b6b4884c00c9 100644 --- a/cpp/src/arrow/io/hdfs_internal.h +++ b/cpp/src/arrow/io/hdfs_internal.h @@ -33,18 +33,11 @@ namespace arrow { class Status; -namespace io { -namespace internal { - -#ifndef _WIN32 -typedef void* LibraryHandle; -#else -typedef HINSTANCE LibraryHandle; -#endif +namespace io::internal { // NOTE(wesm): cpplint does not like use of short and other imprecise C types struct LibHdfsShim { - LibraryHandle handle; + void* handle; hdfsBuilder* (*hdfsNewBuilder)(void); void (*hdfsBuilderSetNameNode)(hdfsBuilder* bld, const char* nn); @@ -217,6 +210,5 @@ struct LibHdfsShim { // TODO(wesm): Remove these exports when we are linking statically ARROW_EXPORT Status ConnectLibHdfs(LibHdfsShim** driver); -} // namespace internal -} // namespace io +} // namespace io::internal } // namespace arrow diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc index b693336e09921..096aab624c843 100644 --- a/cpp/src/arrow/util/io_util.cc +++ b/cpp/src/arrow/util/io_util.cc @@ -116,11 +116,13 @@ #include #endif -namespace arrow { - -using internal::checked_cast; +#ifdef _WIN32 +#include +#else +#include +#endif -namespace internal { +namespace arrow::internal { namespace { @@ -2215,5 +2217,59 @@ int64_t GetTotalMemoryBytes() { #endif } -} // namespace internal -} // namespace arrow +Result LoadDynamicLibrary(const char* path) { +#ifdef _WIN32 + ARROW_ASSIGN_OR_RAISE(auto platform_path, PlatformFilename::FromString(path)); + return LoadDynamicLibrary(platform_path); +#else + constexpr int kFlags = + // All undefined symbols in the shared object are resolved before dlopen() returns. + RTLD_NOW + // Symbols defined in this shared object are not made available to + // resolve references in subsequently loaded shared objects. + | RTLD_LOCAL; + if (void* handle = dlopen(path, kFlags)) return handle; + // dlopen(3) man page: "If dlopen() fails for any reason, it returns NULL." + // There is no null-returning non-error condition. + auto* error = dlerror(); + return Status::IOError("dlopen(", path, ") failed: ", error ? error : "unknown error"); +#endif +} + +Result LoadDynamicLibrary(const PlatformFilename& path) { +#ifdef _WIN32 + if (void* handle = LoadLibraryW(path.ToNative().c_str())) { + return handle; + } + // win32 api doc: "If the function fails, the return value is NULL." + // There is no null-returning non-error condition. + return IOErrorFromWinError(GetLastError(), "LoadLibrary(", path.ToString(), + ") failed."); +#else + return LoadDynamicLibrary(path.ToNative().c_str()); +#endif +} + +Result GetSymbol(void* handle, const char* name) { + if (handle == nullptr) { + return Status::Invalid("Attempting to retrieve symbol '", name, + "' from null library handle"); + } +#ifdef _WIN32 + if (void* sym = reinterpret_cast( + GetProcAddress(reinterpret_cast(handle), name))) { + return sym; + } + // win32 api doc: "If the function fails, the return value is NULL." + // There is no null-returning non-error condition. + return IOErrorFromWinError(GetLastError(), "GetProcAddress(", name, ") failed."); +#else + if (void* sym = dlsym(handle, name)) return sym; + // dlsym(3) man page: "On failure, they return NULL" + // There is no null-returning non-error condition. + auto* error = dlerror(); + return Status::IOError("dlsym(", name, ") failed: ", error ? error : "unknown error"); +#endif +} + +} // namespace arrow::internal diff --git a/cpp/src/arrow/util/io_util.h b/cpp/src/arrow/util/io_util.h index bba71c0d80ad4..5f5bbd169e2eb 100644 --- a/cpp/src/arrow/util/io_util.h +++ b/cpp/src/arrow/util/io_util.h @@ -29,16 +29,16 @@ #include #if ARROW_HAVE_SIGACTION -#include // Needed for struct sigaction +#include // Needed for struct sigaction #endif +#include "arrow/result.h" #include "arrow/status.h" #include "arrow/type_fwd.h" #include "arrow/util/macros.h" #include "arrow/util/windows_fixup.h" -namespace arrow { -namespace internal { +namespace arrow::internal { // NOTE: 8-bit path strings on Windows are encoded using UTF-8. // Using MBCS would fail encoding some paths. @@ -338,7 +338,7 @@ class ARROW_EXPORT TemporaryDir { class ARROW_EXPORT SignalHandler { public: - typedef void (*Callback)(int); + using Callback = void (*)(int); SignalHandler(); explicit SignalHandler(Callback cb); @@ -419,5 +419,34 @@ int64_t GetCurrentRSS(); ARROW_EXPORT int64_t GetTotalMemoryBytes(); -} // namespace internal -} // namespace arrow +/// \brief Load a dynamic library +/// +/// This wraps dlopen() except on Windows, where LoadLibrary() is called. +/// These two platforms handle absolute paths consistently; relative paths +/// or the library's bare name may be handled but inconsistently. +/// +/// \return An opaque handle for the dynamic library, which can be used for +/// subsequent symbol lookup. Nullptr will never be returned; instead +/// an error will be raised. +ARROW_EXPORT Result LoadDynamicLibrary(const PlatformFilename& path); + +/// \brief Load a dynamic library +/// +/// An overload taking null terminated string. +ARROW_EXPORT Result LoadDynamicLibrary(const char* path); + +/// \brief Retrieve a symbol by name from a library handle. +/// +/// This wraps dlsym() except on Windows, where GetProcAddress() is called. +/// +/// \return The address associated with the named symbol. Nullptr will never be +/// returned; instead an error will be raised. +ARROW_EXPORT Result GetSymbol(void* handle, const char* name); + +template +Result GetSymbolAs(void* handle, const char* name) { + ARROW_ASSIGN_OR_RAISE(void* sym, GetSymbol(handle, name)); + return reinterpret_cast(sym); +} + +} // namespace arrow::internal diff --git a/cpp/src/arrow/util/type_fwd.h b/cpp/src/arrow/util/type_fwd.h index 6d904f19b11b5..3174881f4d018 100644 --- a/cpp/src/arrow/util/type_fwd.h +++ b/cpp/src/arrow/util/type_fwd.h @@ -64,6 +64,7 @@ class AsyncTaskScheduler; class Compressor; class Decompressor; class Codec; +class Uri; } // namespace util } // namespace arrow diff --git a/cpp/src/arrow/util/uri.cc b/cpp/src/arrow/util/uri.cc index b291ee3d7f1f2..9c0f7f9a59630 100644 --- a/cpp/src/arrow/util/uri.cc +++ b/cpp/src/arrow/util/uri.cc @@ -27,8 +27,7 @@ #include "arrow/util/value_parsing.h" #include "arrow/vendored/uriparser/Uri.h" -namespace arrow { -namespace internal { +namespace arrow::util { namespace { @@ -111,7 +110,7 @@ bool IsValidUriScheme(std::string_view s) { } struct Uri::Impl { - Impl() : string_rep_(""), port_(-1) { memset(&uri_, 0, sizeof(uri_)); } + Impl() { memset(&uri_, 0, sizeof(uri_)); } ~Impl() { uriFreeUriMembersA(&uri_); } @@ -133,7 +132,7 @@ struct Uri::Impl { // Keep alive strings that uriparser stores pointers to std::vector data_; std::string string_rep_; - int32_t port_; + int32_t port_ = -1; std::vector path_segments_; bool is_file_uri_; bool is_absolute_path_; @@ -141,7 +140,7 @@ struct Uri::Impl { Uri::Uri() : impl_(new Impl) {} -Uri::~Uri() {} +Uri::~Uri() = default; Uri::Uri(Uri&& u) : impl_(std::move(u.impl_)) {} @@ -169,21 +168,19 @@ int32_t Uri::port() const { return impl_->port_; } std::string Uri::username() const { auto userpass = TextRangeToView(impl_->uri_.userInfo); auto sep_pos = userpass.find_first_of(':'); - if (sep_pos == std::string_view::npos) { - return UriUnescape(userpass); - } else { - return UriUnescape(userpass.substr(0, sep_pos)); + if (sep_pos != std::string_view::npos) { + userpass = userpass.substr(0, sep_pos); } + return UriUnescape(userpass); } std::string Uri::password() const { auto userpass = TextRangeToView(impl_->uri_.userInfo); auto sep_pos = userpass.find_first_of(':'); if (sep_pos == std::string_view::npos) { - return std::string(); - } else { - return UriUnescape(userpass.substr(sep_pos + 1)); + return ""; } + return UriUnescape(userpass.substr(sep_pos + 1)); } std::string Uri::path() const { @@ -301,7 +298,8 @@ Status Uri::Parse(const std::string& uri_string) { auto port_text = TextRangeToView(impl_->uri_.portText); if (port_text.size()) { uint16_t port_num; - if (!ParseValue(port_text.data(), port_text.size(), &port_num)) { + if (!::arrow::internal::ParseValue(port_text.data(), port_text.size(), + &port_num)) { return Status::Invalid("Invalid port number '", port_text, "' in URI '", uri_string, "'"); } @@ -311,6 +309,12 @@ Status Uri::Parse(const std::string& uri_string) { return Status::OK(); } +Result Uri::FromString(const std::string& uri_string) { + Uri uri; + ARROW_RETURN_NOT_OK(uri.Parse(uri_string)); + return uri; +} + Result UriFromAbsolutePath(std::string_view path) { if (path.empty()) { return Status::Invalid( @@ -336,5 +340,4 @@ Result UriFromAbsolutePath(std::string_view path) { return out; } -} // namespace internal -} // namespace arrow +} // namespace arrow::util diff --git a/cpp/src/arrow/util/uri.h b/cpp/src/arrow/util/uri.h index 855a61408da99..74dbe924ff237 100644 --- a/cpp/src/arrow/util/uri.h +++ b/cpp/src/arrow/util/uri.h @@ -27,8 +27,7 @@ #include "arrow/type_fwd.h" #include "arrow/util/visibility.h" -namespace arrow { -namespace internal { +namespace arrow::util { /// \brief A parsed URI class ARROW_EXPORT Uri { @@ -86,6 +85,9 @@ class ARROW_EXPORT Uri { /// Factory function to parse a URI from its string representation. Status Parse(const std::string& uri_string); + /// Factory function to parse a URI from its string representation. + static Result FromString(const std::string& uri_string); + private: struct Impl; std::unique_ptr impl_; @@ -114,5 +116,4 @@ bool IsValidUriScheme(std::string_view s); ARROW_EXPORT Result UriFromAbsolutePath(std::string_view path); -} // namespace internal -} // namespace arrow +} // namespace arrow::util diff --git a/cpp/src/arrow/util/uri_test.cc b/cpp/src/arrow/util/uri_test.cc index 4293dc73b01cb..36e09b1b2e879 100644 --- a/cpp/src/arrow/util/uri_test.cc +++ b/cpp/src/arrow/util/uri_test.cc @@ -26,8 +26,7 @@ #include "arrow/util/logging.h" #include "arrow/util/uri.h" -namespace arrow { -namespace internal { +namespace arrow::util { TEST(UriEscape, Basics) { ASSERT_EQ(UriEscape(""), ""); @@ -371,5 +370,4 @@ TEST(UriFromAbsolutePath, Basics) { #endif } -} // namespace internal -} // namespace arrow +} // namespace arrow::util diff --git a/docs/source/cpp/api/filesystem.rst b/docs/source/cpp/api/filesystem.rst index 8132af42e2495..4ab6664e6d681 100644 --- a/docs/source/cpp/api/filesystem.rst +++ b/docs/source/cpp/api/filesystem.rst @@ -19,6 +19,8 @@ Filesystems =========== +.. _cpp-api-filesystems: + Interface ========= @@ -33,12 +35,20 @@ Interface .. doxygenclass:: arrow::fs::FileSystem :members: -High-level factory function -=========================== +.. doxygenfunction:: arrow::fs::EnsureFinalized() + +High-level factory functions +============================ .. doxygengroup:: filesystem-factories :content-only: +Factory registration functions +============================== + +.. doxygengroup:: filesystem-factory-registration + :content-only: + Concrete implementations ======================== diff --git a/docs/source/cpp/io.rst b/docs/source/cpp/io.rst index 28ab5d783a23d..7febeb53498ae 100644 --- a/docs/source/cpp/io.rst +++ b/docs/source/cpp/io.rst @@ -73,6 +73,9 @@ The :class:`filesystem interface ` allows abstracted access over various data storage backends such as the local filesystem or a S3 bucket. It provides input and output streams as well as directory operations. +.. seealso:: + :ref:`FileSystems API reference `. + The filesystem interface exposes a simplified view of the underlying data storage. Data paths are represented as *abstract paths*, which are ``/``-separated, even on Windows, and shouldn't include special path @@ -81,7 +84,14 @@ underlying storage, are automatically dereferenced. Only basic :class:`metadata ` about file entries, such as the file size and modification time, is made available. -Concrete implementations are available for +Filesystem instances are constructed from URI strings using one of the +`High-level factory functions`_, which dispatch to implementation-specific +factories based on the URI's ``scheme``. Other properties for the new instance +are extracted from the URI's other properties such as the ``hostname``, ``username``, +etc. Arrow supports runtime registration of new filesystems, and provides built-in +support for several filesystems. + +Which built-in filesystems are supported is configured at build time and may include :class:`local filesystem access `, :class:`HDFS `, :class:`Amazon S3-compatible storage ` and @@ -91,4 +101,42 @@ Concrete implementations are available for Tasks that use filesystems will typically run on the :ref:`I/O thread pool`. For filesystems that support high levels - of concurrency you may get a benefit from increasing the size of the I/O thread pool. \ No newline at end of file + of concurrency you may get a benefit from increasing the size of the I/O thread pool. + +Defining new FileSystems +======================== + +Build complexity can be decreased by compartmentalizing a FileSystem +implementation into a separate shared library, which applications may +link or load dynamically. Arrow's built-in FileSystem implementations +also follow this pattern. Before a scheme can be used with any of the +`High-level factory functions`_ the library which contains it must be +loaded, requiring a call to :func:`~arrow::fs::LoadFileSystemFactories` +if the library is not linked to the application. + +Factories for new URI schemes are registered with +:func:`~arrow:fs::RegisterFileSystemFactory`. However an explicit +function call requires the consumer to probe libraries for symbols and +invoke these correctly. It is usually preferred that registration be +an automatic addendum to the library's loading. An instance of +:class:`~arrow::fs::FileSystemRegistrar` can be defined at namespace +scope inside the library to accomplish this automatic registration: + +.. code-block:: cpp + + arrow::fs::FileSystemRegistrar kExampleFileSystemModule{ + "example", + [](const Uri& uri, const io::IOContext& io_context, + std::string* out_path) -> Result> { + EnsureExampleFileSystemInitialized(); + return std::make_shared(); + }, + &EnsureExampleFileSystemFinalized, + }; + +If a filesystem implementation requires initialization before any instances +may be constructed, this should be included in the corresponding factory or +otherwise automatically ensured before the factory is invoked. Likewise if +a filesystem implementation requires tear down before the process ends, this +can be wrapped in a function and registered alongside the factory. All +finalizers will be called by :func:`~arrow::fs::EnsureFinalized`. diff --git a/python/pyarrow/src/arrow/python/filesystem.h b/python/pyarrow/src/arrow/python/filesystem.h index 003fd5cb80551..194b226ac5c35 100644 --- a/python/pyarrow/src/arrow/python/filesystem.h +++ b/python/pyarrow/src/arrow/python/filesystem.h @@ -26,9 +26,7 @@ #include "arrow/python/visibility.h" #include "arrow/util/macros.h" -namespace arrow { -namespace py { -namespace fs { +namespace arrow::py::fs { class ARROW_PYTHON_EXPORT PyFileSystemVtable { public: @@ -83,16 +81,24 @@ class ARROW_PYTHON_EXPORT PyFileSystem : public arrow::fs::FileSystem { bool Equals(const FileSystem& other) const override; + /// \cond FALSE + using FileSystem::CreateDir; + using FileSystem::DeleteDirContents; + using FileSystem::GetFileInfo; + using FileSystem::OpenAppendStream; + using FileSystem::OpenOutputStream; + /// \endcond + Result GetFileInfo(const std::string& path) override; Result> GetFileInfo( const std::vector& paths) override; Result> GetFileInfo( const arrow::fs::FileSelector& select) override; - Status CreateDir(const std::string& path, bool recursive = true) override; + Status CreateDir(const std::string& path, bool recursive) override; Status DeleteDir(const std::string& path) override; - Status DeleteDirContents(const std::string& path, bool missing_dir_ok = false) override; + Status DeleteDirContents(const std::string& path, bool missing_dir_ok) override; Status DeleteRootDirContents() override; Status DeleteFile(const std::string& path) override; @@ -107,10 +113,10 @@ class ARROW_PYTHON_EXPORT PyFileSystem : public arrow::fs::FileSystem { const std::string& path) override; Result> OpenOutputStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; Result> OpenAppendStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; Result NormalizePath(std::string path) override; @@ -121,6 +127,4 @@ class ARROW_PYTHON_EXPORT PyFileSystem : public arrow::fs::FileSystem { PyFileSystemVtable vtable_; }; -} // namespace fs -} // namespace py -} // namespace arrow +} // namespace arrow::py::fs