diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 7f2f7812e3cd5..8fc120c780764 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -877,7 +877,7 @@ add_dependencies(arrow_test_dependencies toolchain-tests) if(ARROW_STATIC_LINK_LIBS) add_dependencies(arrow_dependencies ${ARROW_STATIC_LINK_LIBS}) - if(ARROW_HDFS OR ARROW_ORC) + if(ARROW_FILESYSTEM OR ARROW_ORC) if(NOT MSVC_TOOLCHAIN) list(APPEND ARROW_STATIC_LINK_LIBS ${CMAKE_DL_LIBS}) list(APPEND ARROW_STATIC_INSTALL_INTERFACE_LIBS ${CMAKE_DL_LIBS}) diff --git a/cpp/examples/arrow/CMakeLists.txt b/cpp/examples/arrow/CMakeLists.txt index 4625f130565e7..5652fff834e70 100644 --- a/cpp/examples/arrow/CMakeLists.txt +++ b/cpp/examples/arrow/CMakeLists.txt @@ -186,3 +186,18 @@ if(ARROW_GANDIVA) endif() add_arrow_example(gandiva_example EXTRA_LINK_LIBS ${GANDIVA_EXAMPLE_LINK_LIBS}) endif() + +if(ARROW_FILESYSTEM) + add_library(filesystem_definition_example SHARED filesystem_definition_example.cc) + + if(ARROW_BUILD_SHARED) + target_link_libraries(filesystem_definition_example arrow_shared) + else() + target_link_libraries(filesystem_definition_example arrow_static) + endif() + + add_arrow_example(filesystem_usage_example) + target_compile_definitions(filesystem-usage-example + PUBLIC LIBPATH="$" + ) +endif() diff --git a/cpp/examples/arrow/filesystem_definition_example.cc b/cpp/examples/arrow/filesystem_definition_example.cc new file mode 100644 index 0000000000000..4afa11d69cab2 --- /dev/null +++ b/cpp/examples/arrow/filesystem_definition_example.cc @@ -0,0 +1,150 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include + +// Demonstrate registering a user-defined Arrow FileSystem outside +// of the Arrow source tree. + +using arrow::Result; +using arrow::Status; +namespace io = arrow::io; +namespace fs = arrow::fs; + +class ExampleFileSystem : public fs::FileSystem { + public: + explicit ExampleFileSystem(const io::IOContext& io_context) + : fs::FileSystem{io_context} {} + + // This is a mock filesystem whose root directory contains a single file. + // All operations which would mutate will simply raise an error. + static constexpr std::string_view kPath = "example_file"; + static constexpr std::string_view kContents = "hello world"; + static fs::FileInfo info() { + fs::FileInfo info; + info.set_path(std::string{kPath}); + info.set_type(fs::FileType::File); + info.set_size(kContents.size()); + return info; + } + + static Status DoesntExist(std::string_view path) { + return Status::IOError("Path does not exist '", path, "'"); + } + + static Status NoMutation() { + return Status::IOError("operations which would mutate are not permitted"); + } + + Result PathFromUri(const std::string& uri_string) const override { + ARROW_ASSIGN_OR_RAISE(auto uri, arrow::util::Uri::FromString(uri_string)); + return uri.path(); + } + + std::string type_name() const override { return "example"; } + + bool Equals(const FileSystem& other) const override { + return type_name() == other.type_name(); + } + + /// \cond FALSE + using FileSystem::CreateDir; + using FileSystem::DeleteDirContents; + using FileSystem::GetFileInfo; + using FileSystem::OpenAppendStream; + using FileSystem::OpenOutputStream; + /// \endcond + + Result GetFileInfo(const std::string& path) override { + if (path == kPath) { + return info(); + } + return DoesntExist(path); + } + + Result> GetFileInfo(const fs::FileSelector& select) override { + if (select.base_dir == "/" || select.base_dir == "") { + return std::vector{info()}; + } + if (select.allow_not_found) { + return std::vector{}; + } + return DoesntExist(select.base_dir); + } + + Status CreateDir(const std::string& path, bool recursive) override { + return NoMutation(); + } + + Status DeleteDir(const std::string& path) override { return NoMutation(); } + + Status DeleteDirContents(const std::string& path, bool missing_dir_ok) override { + return NoMutation(); + } + + Status DeleteRootDirContents() override { return NoMutation(); } + + Status DeleteFile(const std::string& path) override { return NoMutation(); } + + Status Move(const std::string& src, const std::string& dest) override { + return NoMutation(); + } + + Status CopyFile(const std::string& src, const std::string& dest) override { + return NoMutation(); + } + + Result> OpenInputStream( + const std::string& path) override { + return OpenInputFile(path); + } + + Result> OpenInputFile( + const std::string& path) override { + if (path == kPath) { + return io::BufferReader::FromString(std::string{kContents}); + } + return DoesntExist(path); + } + + Result> OpenOutputStream( + const std::string& path, + const std::shared_ptr& metadata) override { + return NoMutation(); + } + + Result> OpenAppendStream( + const std::string& path, + const std::shared_ptr& metadata) override { + return NoMutation(); + } +}; + +fs::FileSystemRegistrar kExampleFileSystemModule{ + "example", + [](const arrow::util::Uri& uri, const io::IOContext& io_context, + std::string* out_path) -> Result> { + auto fs = std::make_shared(io_context); + if (out_path) { + ARROW_ASSIGN_OR_RAISE(*out_path, fs->PathFromUri(uri.ToString())); + } + return fs; + }, +}; diff --git a/cpp/examples/arrow/filesystem_usage_example.cc b/cpp/examples/arrow/filesystem_usage_example.cc new file mode 100644 index 0000000000000..c32057cfa0c3a --- /dev/null +++ b/cpp/examples/arrow/filesystem_usage_example.cc @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include +#include + +namespace fs = arrow::fs; + +// Demonstrate dynamically loading a user-defined Arrow FileSystem + +arrow::Status Execute() { + ARROW_RETURN_NOT_OK(arrow::fs::LoadFileSystemFactories(LIBPATH)); + + std::string uri = "example:///example_file"; + std::cout << "Uri: " << uri << std::endl; + + std::string path; + ARROW_ASSIGN_OR_RAISE(auto fs, arrow::fs::FileSystemFromUri(uri, &path)); + std::cout << "Path: " << path << std::endl; + + fs::FileSelector sel; + sel.base_dir = "/"; + ARROW_ASSIGN_OR_RAISE(auto infos, fs->GetFileInfo(sel)); + + std::cout << "Root directory contains:" << std::endl; + for (const auto& info : infos) { + std::cout << "- " << info << std::endl; + } + return arrow::Status::OK(); +} + +int main() { + auto status = Execute(); + if (!status.ok()) { + std::cerr << "Error occurred : " << status.message() << std::endl; + return EXIT_FAILURE; + } + return EXIT_SUCCESS; +} diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index c1fafeebc035d..199ae672e8080 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -231,6 +231,7 @@ set(ARROW_SRCS util/hashing.cc util/int_util.cc util/io_util.cc + util/library.cc util/list_util.cc util/logging.cc util/key_value_metadata.cc diff --git a/cpp/src/arrow/dataset/partition.cc b/cpp/src/arrow/dataset/partition.cc index bf7145ca9368d..4d5c6e8ce7a79 100644 --- a/cpp/src/arrow/dataset/partition.cc +++ b/cpp/src/arrow/dataset/partition.cc @@ -53,7 +53,7 @@ namespace dataset { namespace { /// Apply UriUnescape, then ensure the results are valid UTF-8. Result SafeUriUnescape(std::string_view encoded) { - auto decoded = ::arrow::internal::UriUnescape(encoded); + auto decoded = ::arrow::util::UriUnescape(encoded); if (!util::ValidateUTF8(decoded)) { return Status::Invalid("Partition segment was not valid UTF-8 after URL decoding: ", encoded); @@ -755,7 +755,7 @@ Result HivePartitioning::FormatValues( // field_index <-> path nesting relation segments[i] = name + "=" + hive_options_.null_fallback; } else { - segments[i] = name + "=" + arrow::internal::UriEscape(values[i]->ToString()); + segments[i] = name + "=" + arrow::util::UriEscape(values[i]->ToString()); } } diff --git a/cpp/src/arrow/dataset/partition_test.cc b/cpp/src/arrow/dataset/partition_test.cc index 1b71be15d19f5..9f0bd7c0be040 100644 --- a/cpp/src/arrow/dataset/partition_test.cc +++ b/cpp/src/arrow/dataset/partition_test.cc @@ -935,7 +935,7 @@ TEST_F(TestPartitioning, WriteHiveWithSlashesInValues) { "experiment/A/f.csv", "experiment/B/f.csv", "experiment/C/k.csv", "experiment/M/i.csv"}; for (auto partition : unique_partitions) { - encoded_paths.push_back("part=" + arrow::internal::UriEscape(partition)); + encoded_paths.push_back("part=" + arrow::util::UriEscape(partition)); } ASSERT_EQ(all_dirs.size(), encoded_paths.size()); diff --git a/cpp/src/arrow/engine/substrait/relation_internal.cc b/cpp/src/arrow/engine/substrait/relation_internal.cc index 73f55c27ee834..f15f1a5527b7b 100644 --- a/cpp/src/arrow/engine/substrait/relation_internal.cc +++ b/cpp/src/arrow/engine/substrait/relation_internal.cc @@ -67,7 +67,7 @@ namespace arrow { using internal::checked_cast; using internal::StartsWith; using internal::ToChars; -using internal::UriFromAbsolutePath; +using util::UriFromAbsolutePath; namespace engine { @@ -463,7 +463,7 @@ Result FromProto(const substrait::Rel& rel, const ExtensionSet& } // Extract and parse the read relation's source URI - ::arrow::internal::Uri item_uri; + ::arrow::util::Uri item_uri; switch (item.path_type_case()) { case substrait::ReadRel::LocalFiles::FileOrFiles::kUriPath: RETURN_NOT_OK(item_uri.Parse(item.uri_path())); diff --git a/cpp/src/arrow/filesystem/CMakeLists.txt b/cpp/src/arrow/filesystem/CMakeLists.txt index a42a8d0f8c1b6..5b5a9b992c139 100644 --- a/cpp/src/arrow/filesystem/CMakeLists.txt +++ b/cpp/src/arrow/filesystem/CMakeLists.txt @@ -28,6 +28,14 @@ add_arrow_test(filesystem-test EXTRA_LABELS filesystem) +if(ARROW_BUILD_TESTS) + add_library(arrow_filesystem_example SHARED examplefs.cc) + target_link_libraries(arrow_filesystem_example arrow_${ARROW_TEST_LINKAGE}) + target_compile_definitions(arrow-filesystem-test + PUBLIC ARROW_FILESYSTEM_EXAMPLE_LIBPATH="$" + ) +endif() + if(ARROW_BUILD_BENCHMARKS) add_arrow_benchmark(localfs_benchmark PREFIX diff --git a/cpp/src/arrow/filesystem/azurefs.h b/cpp/src/arrow/filesystem/azurefs.h index 2a131e40c05bf..770f698818943 100644 --- a/cpp/src/arrow/filesystem/azurefs.h +++ b/cpp/src/arrow/filesystem/azurefs.h @@ -196,15 +196,23 @@ class ARROW_EXPORT AzureFileSystem : public FileSystem { bool Equals(const FileSystem& other) const override; + /// \cond FALSE + using FileSystem::CreateDir; + using FileSystem::DeleteDirContents; + using FileSystem::GetFileInfo; + using FileSystem::OpenAppendStream; + using FileSystem::OpenOutputStream; + /// \endcond + Result GetFileInfo(const std::string& path) override; Result GetFileInfo(const FileSelector& select) override; - Status CreateDir(const std::string& path, bool recursive = true) override; + Status CreateDir(const std::string& path, bool recursive) override; Status DeleteDir(const std::string& path) override; - Status DeleteDirContents(const std::string& path, bool missing_dir_ok = false) override; + Status DeleteDirContents(const std::string& path, bool missing_dir_ok) override; Status DeleteRootDirContents() override; @@ -246,11 +254,11 @@ class ARROW_EXPORT AzureFileSystem : public FileSystem { Result> OpenOutputStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; Result> OpenAppendStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; }; } // namespace arrow::fs diff --git a/cpp/src/arrow/filesystem/examplefs.cc b/cpp/src/arrow/filesystem/examplefs.cc new file mode 100644 index 0000000000000..9129b852808f6 --- /dev/null +++ b/cpp/src/arrow/filesystem/examplefs.cc @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/filesystem/filesystem.h" +#include "arrow/result.h" +#include "arrow/util/uri.h" + +namespace arrow::fs { + +FileSystemRegistrar kExampleFileSystemModule{ + "example", + [](const Uri& uri, const io::IOContext& io_context, + std::string* out_path) -> Result> { + constexpr std::string_view kScheme = "example"; + auto local_uri = "file" + uri.ToString().substr(kScheme.size()); + return FileSystemFromUri(local_uri, io_context, out_path); + }, +}; + +} // namespace arrow::fs diff --git a/cpp/src/arrow/filesystem/filesystem.cc b/cpp/src/arrow/filesystem/filesystem.cc index 810e9c179b156..99b26d2dda9d2 100644 --- a/cpp/src/arrow/filesystem/filesystem.cc +++ b/cpp/src/arrow/filesystem/filesystem.cc @@ -15,12 +15,17 @@ // specific language governing permissions and limitations // under the License. +#include +#include #include +#include #include +#include "arrow/type_fwd.h" #include "arrow/util/config.h" #include "arrow/filesystem/filesystem.h" +#include "arrow/util/library.h" #ifdef ARROW_HDFS #include "arrow/filesystem/hdfs.h" #endif @@ -51,11 +56,12 @@ namespace arrow { using internal::checked_pointer_cast; using internal::TaskHints; -using internal::Uri; using io::internal::SubmitIO; +using util::Uri; namespace fs { +using arrow::internal::GetEnvVar; using internal::ConcatAbstractPath; using internal::EnsureTrailingSlash; using internal::GetAbstractPathParent; @@ -128,7 +134,7 @@ std::string FileInfo::extension() const { ////////////////////////////////////////////////////////////////////////// // FileSystem default method implementations -FileSystem::~FileSystem() {} +FileSystem::~FileSystem() = default; Result FileSystem::NormalizePath(std::string path) { return path; } @@ -203,6 +209,10 @@ Future<> FileSystem::DeleteDirContentsAsync(const std::string& path, }); } +Future<> FileSystem::DeleteDirContentsAsync(const std::string& path) { + return DeleteDirContentsAsync(path, false); +} + Result> FileSystem::OpenInputStream( const FileInfo& info) { RETURN_NOT_OK(ValidateInputFileInfo(info)); @@ -279,7 +289,7 @@ SubTreeFileSystem::SubTreeFileSystem(const std::string& base_path, base_path_(NormalizeBasePath(base_path, base_fs).ValueOrDie()), base_fs_(base_fs) {} -SubTreeFileSystem::~SubTreeFileSystem() {} +SubTreeFileSystem::~SubTreeFileSystem() = default; Result SubTreeFileSystem::NormalizeBasePath( std::string base_path, const std::shared_ptr& base_fs) { @@ -674,6 +684,53 @@ Status CopyFiles(const std::shared_ptr& source_fs, return CopyFiles(sources, destinations, io_context, chunk_size, use_threads); } +struct Registry { + std::shared_mutex mutex; + std::unordered_map scheme_to_factory; + std::vector finalizers; + bool finalized = false; +}; + +extern "C" ARROW_EXPORT auto* GetFileSystemFactoryRegistry() { + // Check if this function is the one linked to main + static Registry registry; + return ®istry; +} + +extern "C" { +ARROW_EXPORT void MergeFileSystemRegistry(void* main_registry) { + if (GetFileSystemFactoryRegistry() == main_registry) return; + + auto& [mutex, scheme_to_factory, finalizers, _f] = *GetFileSystemFactoryRegistry(); + auto& [main_mutex, main_scheme_to_factory, main_finalizers, _mf] = + *static_cast(main_registry); + + std::unique_lock lock{mutex}, main_lock{main_mutex}; + + for (auto& [scheme, factory] : scheme_to_factory) { + auto& ref = main_scheme_to_factory[scheme]; + if (ref) continue; + ref = factory; + } + scheme_to_factory.clear(); + + for (auto* finalizer : finalizers) { + main_finalizers.push_back(finalizer); + } + finalizers.clear(); +} +} + +Status LoadFileSystemFactories(const char* libpath) { + ARROW_ASSIGN_OR_RAISE(void* lib, util::LoadDynamicLibrary(libpath)); + + if (void* merge = util::GetSymbol(lib, "MergeFileSystemRegistry").ValueOr(nullptr)) { + reinterpret_cast(merge)(GetFileSystemFactoryRegistry()); + } + + return Status::OK(); +} + namespace { Result> FileSystemFromUriReal(const Uri& uri, @@ -682,6 +739,19 @@ Result> FileSystemFromUriReal(const Uri& uri, std::string* out_path) { const auto scheme = uri.scheme(); + { + auto& [mutex, scheme_to_factory, _, finalized] = *GetFileSystemFactoryRegistry(); + std::shared_lock lock{mutex}; + if (finalized) { + return Status::Invalid("FileSystem factories were already finalized!"); + } + + auto it = scheme_to_factory.find(scheme); + if (it != scheme_to_factory.end()) { + return it->second(uri, io_context, out_path); + } + } + if (scheme == "file") { std::string path; ARROW_ASSIGN_OR_RAISE(auto options, LocalFileSystemOptions::FromUri(uri, &path)); @@ -737,6 +807,31 @@ Result> FileSystemFromUriReal(const Uri& uri, } // namespace +void RegisterFileSystemFactory(std::string scheme, FileSystem::Factory factory, + void finalizer()) { + auto& [mutex, scheme_to_factory, finalizers, finalized] = + *GetFileSystemFactoryRegistry(); + std::unique_lock lock{mutex}; + if (finalized) return; + + auto& ref = scheme_to_factory[scheme]; + if (ref) return; + ref = factory; + + finalizers.push_back(finalizer); +} + +void EnsureFinalized() { + auto& [mutex, scheme_to_factory, finalizers, finalized] = + *GetFileSystemFactoryRegistry(); + std::unique_lock lock{mutex}; + if (finalized) return; + + for (auto finalizer : finalizers) { + finalizer(); + } +} + Result> FileSystemFromUri(const std::string& uri_string, std::string* out_path) { return FileSystemFromUri(uri_string, io::default_io_context(), out_path); diff --git a/cpp/src/arrow/filesystem/filesystem.h b/cpp/src/arrow/filesystem/filesystem.h index 559f1335f12c1..b846dad38bfbd 100644 --- a/cpp/src/arrow/filesystem/filesystem.h +++ b/cpp/src/arrow/filesystem/filesystem.h @@ -36,8 +36,15 @@ #include "arrow/util/windows_fixup.h" namespace arrow { + +namespace util { +class Uri; +} + namespace fs { +using arrow::util::Uri; + // A system clock time point expressed as a 64-bit (or more) number of // nanoseconds since the epoch. using TimePoint = @@ -156,7 +163,11 @@ struct IterationTraits { namespace fs { /// \brief Abstract file system API -class ARROW_EXPORT FileSystem : public std::enable_shared_from_this { +class ARROW_EXPORT FileSystem + /// \cond false + : public std::enable_shared_from_this +/// \endcond +{ // NOLINT public: virtual ~FileSystem(); @@ -225,7 +236,8 @@ class ARROW_EXPORT FileSystem : public std::enable_shared_from_this /// Create a directory and subdirectories. /// /// This function succeeds if the directory already exists. - virtual Status CreateDir(const std::string& path, bool recursive = true) = 0; + virtual Status CreateDir(const std::string& path, bool recursive) = 0; + Status CreateDir(const std::string& path) { return CreateDir(path, true); } /// Delete a directory and its contents, recursively. virtual Status DeleteDir(const std::string& path) = 0; @@ -234,12 +246,18 @@ class ARROW_EXPORT FileSystem : public std::enable_shared_from_this /// /// Like DeleteDir, but doesn't delete the directory itself. /// Passing an empty path ("" or "/") is disallowed, see DeleteRootDirContents. - virtual Status DeleteDirContents(const std::string& path, - bool missing_dir_ok = false) = 0; + virtual Status DeleteDirContents(const std::string& path, bool missing_dir_ok) = 0; + Status DeleteDirContents(const std::string& path) { + return DeleteDirContents(path, false); + } + + /// Async version of DeleteDirContents. + virtual Future<> DeleteDirContentsAsync(const std::string& path, bool missing_dir_ok); /// Async version of DeleteDirContents. - virtual Future<> DeleteDirContentsAsync(const std::string& path, - bool missing_dir_ok = false); + /// + /// This overload allows missing directories. + Future<> DeleteDirContentsAsync(const std::string& path); /// EXPERIMENTAL: Delete the root directory's contents, recursively. /// @@ -272,6 +290,7 @@ class ARROW_EXPORT FileSystem : public std::enable_shared_from_this /// Open an input stream for sequential reading. virtual Result> OpenInputStream( const std::string& path) = 0; + /// Open an input stream for sequential reading. /// /// This override assumes the given FileInfo validly represents the file's @@ -282,6 +301,7 @@ class ARROW_EXPORT FileSystem : public std::enable_shared_from_this /// Open an input file for random access reading. virtual Result> OpenInputFile( const std::string& path) = 0; + /// Open an input file for random access reading. /// /// This override assumes the given FileInfo validly represents the file's @@ -293,6 +313,7 @@ class ARROW_EXPORT FileSystem : public std::enable_shared_from_this /// Async version of OpenInputStream virtual Future> OpenInputStreamAsync( const std::string& path); + /// Async version of OpenInputStream virtual Future> OpenInputStreamAsync( const FileInfo& info); @@ -300,6 +321,7 @@ class ARROW_EXPORT FileSystem : public std::enable_shared_from_this /// Async version of OpenInputFile virtual Future> OpenInputFileAsync( const std::string& path); + /// Async version of OpenInputFile virtual Future> OpenInputFileAsync( const FileInfo& info); @@ -324,6 +346,10 @@ class ARROW_EXPORT FileSystem : public std::enable_shared_from_this const std::shared_ptr& metadata) = 0; Result> OpenAppendStream(const std::string& path); + using Factory = Result>(const Uri& uri, + const io::IOContext& io_context, + std::string* out_path); + protected: explicit FileSystem(io::IOContext io_context = io::default_io_context()) : io_context_(std::move(io_context)) {} @@ -361,17 +387,22 @@ class ARROW_EXPORT SubTreeFileSystem : public FileSystem { bool Equals(const FileSystem& other) const override; /// \cond FALSE + using FileSystem::CreateDir; + using FileSystem::DeleteDirContents; using FileSystem::GetFileInfo; + using FileSystem::OpenAppendStream; + using FileSystem::OpenOutputStream; /// \endcond + Result GetFileInfo(const std::string& path) override; Result GetFileInfo(const FileSelector& select) override; FileInfoGenerator GetFileInfoGenerator(const FileSelector& select) override; - Status CreateDir(const std::string& path, bool recursive = true) override; + Status CreateDir(const std::string& path, bool recursive) override; Status DeleteDir(const std::string& path) override; - Status DeleteDirContents(const std::string& path, bool missing_dir_ok = false) override; + Status DeleteDirContents(const std::string& path, bool missing_dir_ok) override; Status DeleteRootDirContents() override; Status DeleteFile(const std::string& path) override; @@ -399,13 +430,13 @@ class ARROW_EXPORT SubTreeFileSystem : public FileSystem { Result> OpenOutputStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; Result> OpenAppendStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; protected: - SubTreeFileSystem() {} + SubTreeFileSystem() = default; const std::string base_path_; std::shared_ptr base_fs_; @@ -433,14 +464,21 @@ class ARROW_EXPORT SlowFileSystem : public FileSystem { bool Equals(const FileSystem& other) const override; Result PathFromUri(const std::string& uri_string) const override; + /// \cond FALSE + using FileSystem::CreateDir; + using FileSystem::DeleteDirContents; using FileSystem::GetFileInfo; + using FileSystem::OpenAppendStream; + using FileSystem::OpenOutputStream; + /// \endcond + Result GetFileInfo(const std::string& path) override; Result GetFileInfo(const FileSelector& select) override; - Status CreateDir(const std::string& path, bool recursive = true) override; + Status CreateDir(const std::string& path, bool recursive) override; Status DeleteDir(const std::string& path) override; - Status DeleteDirContents(const std::string& path, bool missing_dir_ok = false) override; + Status DeleteDirContents(const std::string& path, bool missing_dir_ok) override; Status DeleteRootDirContents() override; Status DeleteFile(const std::string& path) override; @@ -458,16 +496,25 @@ class ARROW_EXPORT SlowFileSystem : public FileSystem { const FileInfo& info) override; Result> OpenOutputStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; Result> OpenAppendStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; protected: std::shared_ptr base_fs_; std::shared_ptr latencies_; }; +/// \brief Ensure all registered filesystem implementations are finalized. +/// +/// Individual finalizers may wait for concurrent calls to finish so as to avoid +/// race conditions. After this function has been called, all filesystem APIs +/// will fail with an error. +/// +/// The user is responsible for synchronization of calls to this function. +void EnsureFinalized(); + /// \defgroup filesystem-factories Functions for creating FileSystem instances /// /// @{ @@ -477,6 +524,8 @@ class ARROW_EXPORT SlowFileSystem : public FileSystem { /// Recognized schemes are "file", "mock", "hdfs", "viewfs", "s3", /// "gs" and "gcs". /// +/// Support for other schemes can be added using RegisterFileSystemFactory. +/// /// \param[in] uri a URI-based path, ex: file:///some/local/path /// \param[out] out_path (optional) Path inside the filesystem. /// \return out_fs FileSystem instance. @@ -489,6 +538,8 @@ Result> FileSystemFromUri(const std::string& uri, /// Recognized schemes are "file", "mock", "hdfs", "viewfs", "s3", /// "gs" and "gcs". /// +/// Support for other schemes can be added using RegisterFileSystemFactory. +/// /// \param[in] uri a URI-based path, ex: file:///some/local/path /// \param[in] io_context an IOContext which will be associated with the filesystem /// \param[out] out_path (optional) Path inside the filesystem. @@ -500,6 +551,8 @@ Result> FileSystemFromUri(const std::string& uri, /// \brief Create a new FileSystem by URI /// +/// Support for other schemes can be added using RegisterFileSystemFactory. +/// /// Same as FileSystemFromUri, but in addition also recognize non-URIs /// and treat them as local filesystem paths. Only absolute local filesystem /// paths are allowed. @@ -509,6 +562,8 @@ Result> FileSystemFromUriOrPath( /// \brief Create a new FileSystem by URI with a custom IO context /// +/// Support for other schemes can be added using RegisterFileSystemFactory. +/// /// Same as FileSystemFromUri, but in addition also recognize non-URIs /// and treat them as local filesystem paths. Only absolute local filesystem /// paths are allowed. @@ -519,6 +574,82 @@ Result> FileSystemFromUriOrPath( /// @} +/// \defgroup filesystem-factory-registration Helpers for FileSystem registration +/// +/// @{ + +/// \brief Register a Filesystem factory +/// +/// Support for custom Uri schemes can be added by registering a factory +/// for the corresponding FileSystem. +/// +/// \param[in] scheme a Uri scheme which the factory will handle. +/// If a factory has already been registered for a scheme, +/// the new factory will be ignored. +/// \param[in] factory a function which can produce a FileSystem for Uris which match +/// scheme. +/// \param[in] finalizer a function which must be called to finalize the factory before +/// the process exits, or nullptr if no finalization is necessary. +ARROW_EXPORT void RegisterFileSystemFactory(std::string scheme, + FileSystem::Factory factory, + void finalizer() = NULLPTR); + +/// \brief Register Filesystem factories from a shared library +/// +/// In addition to dynamically loading the indicated library, registries are merged if +/// necessary (static linkage can produce isolated registries). +ARROW_EXPORT Status LoadFileSystemFactories(const char* libpath); + +struct FileSystemRegistrar { + /// \brief Register a Filesystem factory at load time + /// + /// Support for custom Uri schemes can be added by registering a factory for the + /// corresponding FileSystem. An instance of this helper can be defined at namespace + /// scope to cause the factory to be registered at load time. + /// + /// Global constructors will finish execution before main() starts if the registrar is + /// linked into the same binary as main(), or before dlopen()/LoadLibrary() returns if + /// the library in which the registrar is defined is dynamically loaded. + /// + /// \code + /// FileSystemRegistrar kSlowFileSystemModule{ + /// "slow+file", + /// [](const Uri& uri, const io::IOContext& io_context, std::string* out_path) + /// ->Result> { + /// constexpr std::string_view kPrefix = "slow+"; + /// auto local_uri = uri.ToString().substr(kPrefix.size()); + /// ARROW_ASSIGN_OR_RAISE(auto base_fs, + /// FileSystemFromUri(local_uri, io_context, out_path)); + /// double average_latency = 1; + /// int32_t seed = 0xDEADBEEF; + /// ARROW_ASSIGN_OR_RAISE(auto params, uri.query_item()); + /// for (const auto& [key, value] : params) { + /// if (key == "average_latency") { + /// average_latency = std::stod(value); + /// } + /// if (key == "seed") { + /// seed = std::stoi(value, nullptr, /*base=*/16); + /// } + /// } + /// return std::make_shared(base_fs, average_latency, seed); + /// })); + /// \endcode + /// + /// \param[in] scheme a Uri scheme which the factory will handle. + /// If a factory has already been registered for a scheme, the + /// new factory will be ignored. + /// \param[in] factory a function which can produce a FileSystem for Uris which match + /// scheme. + /// \param[in] finalizer a function which must be called to finalize the factory before + /// the process exits, or nullptr if no finalization is necessary. + FileSystemRegistrar(std::string scheme, FileSystem::Factory factory, + void finalizer() = NULLPTR) { + RegisterFileSystemFactory(std::move(scheme), std::move(factory), finalizer); + } +}; + +/// @} + /// \brief Copy files, including from one FileSystem to another /// /// If a source and destination are resident in the same FileSystem FileSystem::CopyFile diff --git a/cpp/src/arrow/filesystem/gcsfs.cc b/cpp/src/arrow/filesystem/gcsfs.cc index d41cb49022c0c..8eaa923feddb1 100644 --- a/cpp/src/arrow/filesystem/gcsfs.cc +++ b/cpp/src/arrow/filesystem/gcsfs.cc @@ -751,7 +751,7 @@ GcsOptions GcsOptions::FromServiceAccountCredentials(const std::string& json_obj return options; } -Result GcsOptions::FromUri(const arrow::internal::Uri& uri, +Result GcsOptions::FromUri(const arrow::util::Uri& uri, std::string* out_path) { const auto bucket = uri.host(); auto path = uri.path(); @@ -815,7 +815,7 @@ Result GcsOptions::FromUri(const arrow::internal::Uri& uri, Result GcsOptions::FromUri(const std::string& uri_string, std::string* out_path) { - arrow::internal::Uri uri; + arrow::util::Uri uri; RETURN_NOT_OK(uri.Parse(uri_string)); return FromUri(uri, out_path); } diff --git a/cpp/src/arrow/filesystem/gcsfs.h b/cpp/src/arrow/filesystem/gcsfs.h index e4a1edfd6bfb9..f1fbc95bf957c 100644 --- a/cpp/src/arrow/filesystem/gcsfs.h +++ b/cpp/src/arrow/filesystem/gcsfs.h @@ -146,8 +146,7 @@ struct ARROW_EXPORT GcsOptions { static GcsOptions FromServiceAccountCredentials(const std::string& json_object); /// Initialize from URIs such as "gs://bucket/object". - static Result FromUri(const arrow::internal::Uri& uri, - std::string* out_path); + static Result FromUri(const arrow::util::Uri& uri, std::string* out_path); static Result FromUri(const std::string& uri, std::string* out_path); }; diff --git a/cpp/src/arrow/filesystem/hdfs.cc b/cpp/src/arrow/filesystem/hdfs.cc index b227aae65d9cd..d59b2a342d723 100644 --- a/cpp/src/arrow/filesystem/hdfs.cc +++ b/cpp/src/arrow/filesystem/hdfs.cc @@ -35,7 +35,7 @@ namespace arrow { using internal::ErrnoFromStatus; using internal::ParseValue; -using internal::Uri; +using util::Uri; namespace fs { diff --git a/cpp/src/arrow/filesystem/hdfs.h b/cpp/src/arrow/filesystem/hdfs.h index 798aac0ea9075..25604a39e3ace 100644 --- a/cpp/src/arrow/filesystem/hdfs.h +++ b/cpp/src/arrow/filesystem/hdfs.h @@ -25,8 +25,7 @@ #include "arrow/io/hdfs.h" #include "arrow/util/uri.h" -namespace arrow { -namespace fs { +namespace arrow::fs { /// Options for the HDFS implementation. struct ARROW_EXPORT HdfsOptions { @@ -51,7 +50,7 @@ struct ARROW_EXPORT HdfsOptions { bool Equals(const HdfsOptions& other) const; - static Result FromUri(const ::arrow::internal::Uri& uri); + static Result FromUri(const ::arrow::util::Uri& uri); static Result FromUri(const std::string& uri); }; @@ -69,16 +68,21 @@ class ARROW_EXPORT HadoopFileSystem : public FileSystem { Result PathFromUri(const std::string& uri_string) const override; /// \cond FALSE + using FileSystem::CreateDir; + using FileSystem::DeleteDirContents; using FileSystem::GetFileInfo; + using FileSystem::OpenAppendStream; + using FileSystem::OpenOutputStream; /// \endcond + Result GetFileInfo(const std::string& path) override; Result> GetFileInfo(const FileSelector& select) override; - Status CreateDir(const std::string& path, bool recursive = true) override; + Status CreateDir(const std::string& path, bool recursive) override; Status DeleteDir(const std::string& path) override; - Status DeleteDirContents(const std::string& path, bool missing_dir_ok = false) override; + Status DeleteDirContents(const std::string& path, bool missing_dir_ok) override; Status DeleteRootDirContents() override; @@ -94,10 +98,10 @@ class ARROW_EXPORT HadoopFileSystem : public FileSystem { const std::string& path) override; Result> OpenOutputStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; Result> OpenAppendStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; /// Create a HdfsFileSystem instance from the given options. static Result> Make( @@ -110,5 +114,4 @@ class ARROW_EXPORT HadoopFileSystem : public FileSystem { std::unique_ptr impl_; }; -} // namespace fs -} // namespace arrow +} // namespace arrow::fs diff --git a/cpp/src/arrow/filesystem/hdfs_test.cc b/cpp/src/arrow/filesystem/hdfs_test.cc index 7ad9e6cd40e65..db5cefef37488 100644 --- a/cpp/src/arrow/filesystem/hdfs_test.cc +++ b/cpp/src/arrow/filesystem/hdfs_test.cc @@ -34,7 +34,7 @@ namespace arrow { using internal::ErrnoFromStatus; -using internal::Uri; +using util::Uri; namespace fs { diff --git a/cpp/src/arrow/filesystem/localfs.cc b/cpp/src/arrow/filesystem/localfs.cc index 01ac946379119..fbb33fd00868b 100644 --- a/cpp/src/arrow/filesystem/localfs.cc +++ b/cpp/src/arrow/filesystem/localfs.cc @@ -24,10 +24,10 @@ #ifdef _WIN32 #include "arrow/util/windows_compatibility.h" #else -#include #include -#include #include +#include +#include #endif #include "arrow/filesystem/filesystem.h" @@ -42,8 +42,7 @@ #include "arrow/util/uri.h" #include "arrow/util/windows_fixup.h" -namespace arrow { -namespace fs { +namespace arrow::fs { using ::arrow::internal::IOErrorFromErrno; #ifdef _WIN32 @@ -217,9 +216,7 @@ Status StatSelector(const PlatformFilename& dir_fn, const FileSelector& select, } // namespace -LocalFileSystemOptions LocalFileSystemOptions::Defaults() { - return LocalFileSystemOptions(); -} +LocalFileSystemOptions LocalFileSystemOptions::Defaults() { return {}; } bool LocalFileSystemOptions::Equals(const LocalFileSystemOptions& other) const { return use_mmap == other.use_mmap && directory_readahead == other.directory_readahead && @@ -227,7 +224,7 @@ bool LocalFileSystemOptions::Equals(const LocalFileSystemOptions& other) const { } Result LocalFileSystemOptions::FromUri( - const ::arrow::internal::Uri& uri, std::string* out_path) { + const ::arrow::util::Uri& uri, std::string* out_path) { if (!uri.username().empty() || !uri.password().empty()) { return Status::Invalid("Unsupported username or password in local URI: '", uri.ToString(), "'"); @@ -260,7 +257,7 @@ LocalFileSystem::LocalFileSystem(const LocalFileSystemOptions& options, const io::IOContext& io_context) : FileSystem(io_context), options_(options) {} -LocalFileSystem::~LocalFileSystem() {} +LocalFileSystem::~LocalFileSystem() = default; Result LocalFileSystem::NormalizePath(std::string path) { return DoNormalizePath(std::move(path)); @@ -689,5 +686,4 @@ Result> LocalFileSystem::OpenAppendStream( return OpenOutputStreamGeneric(path, truncate, append); } -} // namespace fs -} // namespace arrow +} // namespace arrow::fs diff --git a/cpp/src/arrow/filesystem/localfs.h b/cpp/src/arrow/filesystem/localfs.h index 108530c2b2778..45a3da317f663 100644 --- a/cpp/src/arrow/filesystem/localfs.h +++ b/cpp/src/arrow/filesystem/localfs.h @@ -62,7 +62,7 @@ struct ARROW_EXPORT LocalFileSystemOptions { bool Equals(const LocalFileSystemOptions& other) const; - static Result FromUri(const ::arrow::internal::Uri& uri, + static Result FromUri(const ::arrow::util::Uri& uri, std::string* out_path); }; @@ -89,16 +89,21 @@ class ARROW_EXPORT LocalFileSystem : public FileSystem { LocalFileSystemOptions options() const { return options_; } /// \cond FALSE + using FileSystem::CreateDir; + using FileSystem::DeleteDirContents; using FileSystem::GetFileInfo; + using FileSystem::OpenAppendStream; + using FileSystem::OpenOutputStream; /// \endcond + Result GetFileInfo(const std::string& path) override; Result> GetFileInfo(const FileSelector& select) override; FileInfoGenerator GetFileInfoGenerator(const FileSelector& select) override; - Status CreateDir(const std::string& path, bool recursive = true) override; + Status CreateDir(const std::string& path, bool recursive) override; Status DeleteDir(const std::string& path) override; - Status DeleteDirContents(const std::string& path, bool missing_dir_ok = false) override; + Status DeleteDirContents(const std::string& path, bool missing_dir_ok) override; Status DeleteRootDirContents() override; Status DeleteFile(const std::string& path) override; @@ -113,10 +118,10 @@ class ARROW_EXPORT LocalFileSystem : public FileSystem { const std::string& path) override; Result> OpenOutputStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; Result> OpenAppendStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; protected: LocalFileSystemOptions options_; diff --git a/cpp/src/arrow/filesystem/localfs_test.cc b/cpp/src/arrow/filesystem/localfs_test.cc index 7ce2a56968679..106834d76bee0 100644 --- a/cpp/src/arrow/filesystem/localfs_test.cc +++ b/cpp/src/arrow/filesystem/localfs_test.cc @@ -33,16 +33,15 @@ #include "arrow/testing/gtest_util.h" #include "arrow/testing/matchers.h" #include "arrow/util/io_util.h" +#include "arrow/util/library.h" #include "arrow/util/uri.h" -namespace arrow { -namespace fs { -namespace internal { +namespace arrow::fs::internal { using ::arrow::internal::FileDescriptor; using ::arrow::internal::PlatformFilename; using ::arrow::internal::TemporaryDir; -using ::arrow::internal::UriFromAbsolutePath; +using ::arrow::util::UriFromAbsolutePath; class LocalFSTestMixin : public ::testing::Test { public: @@ -86,6 +85,52 @@ Result> FSFromUriOrPath(const std::string& uri, //////////////////////////////////////////////////////////////////////////// // Misc tests +FileSystemRegistrar kSlowFileSystemModule{ + "slow+file", + [](const Uri& uri, const io::IOContext& io_context, + std::string* out_path) -> Result> { + constexpr std::string_view kPrefix = "slow+"; + auto local_uri = uri.ToString().substr(kPrefix.size()); + ARROW_ASSIGN_OR_RAISE(auto base_fs, + FileSystemFromUri(local_uri, io_context, out_path)); + double average_latency = 1; + int32_t seed = 0xDEADBEEF; + ARROW_ASSIGN_OR_RAISE(auto params, uri.query_items()); + for (const auto& [key, value] : params) { + if (key == "average_latency") { + average_latency = std::stod(value); + } + if (key == "seed") { + seed = std::stoi(value, nullptr, /*base=*/16); + } + } + return std::make_shared(base_fs, average_latency, seed); + }, +}; + +TEST(FileSystemFromUri, LinkedRegisteredFactory) { + // Since the registrar's definition is in this translation unit (which is linked to the + // unit test executable), its factory will be registered be loaded automatically before + // main() is entered. + std::string path; + ASSERT_OK_AND_ASSIGN(auto fs, FileSystemFromUri("slow+file:///hey/yo", &path)); + EXPECT_EQ(path, "/hey/yo"); + EXPECT_EQ(fs->type_name(), "slow"); +} + +TEST(FileSystemFromUri, LoadedRegisteredFactory) { + // Since the registrar's definition is in libarrow_filesystem_example.so, + // its factory will be registered only after the library is dynamically loaded. + std::string path; + EXPECT_THAT(FileSystemFromUri("example:///hey/yo", &path), Raises(StatusCode::Invalid)); + + EXPECT_THAT(LoadFileSystemFactories(ARROW_FILESYSTEM_EXAMPLE_LIBPATH), Ok()); + + ASSERT_OK_AND_ASSIGN(auto fs, FileSystemFromUri("example:///hey/yo", &path)); + EXPECT_EQ(path, "/hey/yo"); + EXPECT_EQ(fs->type_name(), "local"); +} + TEST(DetectAbsolutePath, Basics) { ASSERT_TRUE(DetectAbsolutePath("/")); ASSERT_TRUE(DetectAbsolutePath("/foo")); @@ -164,7 +209,7 @@ GENERIC_FS_TEST_FUNCTIONS(TestLocalFSGenericMMap); template class TestLocalFS : public LocalFSTestMixin { public: - void SetUp() { + void SetUp() override { LocalFSTestMixin::SetUp(); path_formatter_ = PathFormatter(); local_path_ = EnsureTrailingSlash(path_formatter_(temp_dir_->path().ToString())); @@ -494,9 +539,4 @@ TYPED_TEST(TestLocalFS, StressGetFileInfoGenerator) { } } -// TODO Should we test backslash paths on Windows? -// SubTreeFileSystem isn't compatible with them. - -} // namespace internal -} // namespace fs -} // namespace arrow +} // namespace arrow::fs::internal diff --git a/cpp/src/arrow/filesystem/mockfs.h b/cpp/src/arrow/filesystem/mockfs.h index 32d06e5910dfe..5626560e08363 100644 --- a/cpp/src/arrow/filesystem/mockfs.h +++ b/cpp/src/arrow/filesystem/mockfs.h @@ -26,9 +26,7 @@ #include "arrow/filesystem/filesystem.h" #include "arrow/util/windows_fixup.h" -namespace arrow { -namespace fs { -namespace internal { +namespace arrow::fs::internal { struct MockDirInfo { std::string full_path; @@ -68,16 +66,21 @@ class ARROW_EXPORT MockFileSystem : public FileSystem { bool Equals(const FileSystem& other) const override; Result PathFromUri(const std::string& uri_string) const override; - // XXX It's not very practical to have to explicitly declare inheritance - // of default overrides. + /// \cond FALSE + using FileSystem::CreateDir; + using FileSystem::DeleteDirContents; using FileSystem::GetFileInfo; + using FileSystem::OpenAppendStream; + using FileSystem::OpenOutputStream; + /// \endcond + Result GetFileInfo(const std::string& path) override; Result> GetFileInfo(const FileSelector& select) override; - Status CreateDir(const std::string& path, bool recursive = true) override; + Status CreateDir(const std::string& path, bool recursive) override; Status DeleteDir(const std::string& path) override; - Status DeleteDirContents(const std::string& path, bool missing_dir_ok = false) override; + Status DeleteDirContents(const std::string& path, bool missing_dir_ok) override; Status DeleteRootDirContents() override; Status DeleteFile(const std::string& path) override; @@ -92,10 +95,10 @@ class ARROW_EXPORT MockFileSystem : public FileSystem { const std::string& path) override; Result> OpenOutputStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; Result> OpenAppendStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; // Contents-dumping helpers to ease testing. // Output is lexicographically-ordered by full path. @@ -128,6 +131,4 @@ class ARROW_EXPORT MockAsyncFileSystem : public MockFileSystem { FileInfoGenerator GetFileInfoGenerator(const FileSelector& select) override; }; -} // namespace internal -} // namespace fs -} // namespace arrow +} // namespace arrow::fs::internal diff --git a/cpp/src/arrow/filesystem/path_util.cc b/cpp/src/arrow/filesystem/path_util.cc index 9c895ae76c7b8..4e9628d3dc241 100644 --- a/cpp/src/arrow/filesystem/path_util.cc +++ b/cpp/src/arrow/filesystem/path_util.cc @@ -344,7 +344,7 @@ bool IsLikelyUri(std::string_view v) { // with 36 characters. return false; } - return ::arrow::internal::IsValidUriScheme(v.substr(0, pos)); + return ::arrow::util::IsValidUriScheme(v.substr(0, pos)); } struct Globber::Impl { diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc index a987d63a6d247..db134f581ae2e 100644 --- a/cpp/src/arrow/filesystem/s3fs.cc +++ b/cpp/src/arrow/filesystem/s3fs.cc @@ -132,8 +132,8 @@ namespace arrow { using internal::TaskGroup; using internal::ToChars; -using internal::Uri; using io::internal::SubmitIO; +using util::Uri; namespace fs { diff --git a/cpp/src/arrow/filesystem/s3fs.h b/cpp/src/arrow/filesystem/s3fs.h index 13a0abde32318..82d08bc5ea89a 100644 --- a/cpp/src/arrow/filesystem/s3fs.h +++ b/cpp/src/arrow/filesystem/s3fs.h @@ -51,7 +51,7 @@ struct ARROW_EXPORT S3ProxyOptions { /// Initialize from URI such as http://username:password@host:port /// or http://host:port static Result FromUri(const std::string& uri); - static Result FromUri(const ::arrow::internal::Uri& uri); + static Result FromUri(const ::arrow::util::Uri& uri); bool Equals(const S3ProxyOptions& other) const; }; @@ -232,7 +232,7 @@ struct ARROW_EXPORT S3Options { /// generate temporary credentials. static S3Options FromAssumeRoleWithWebIdentity(); - static Result FromUri(const ::arrow::internal::Uri& uri, + static Result FromUri(const ::arrow::util::Uri& uri, std::string* out_path = NULLPTR); static Result FromUri(const std::string& uri, std::string* out_path = NULLPTR); @@ -258,19 +258,24 @@ class ARROW_EXPORT S3FileSystem : public FileSystem { Result PathFromUri(const std::string& uri_string) const override; /// \cond FALSE + using FileSystem::CreateDir; + using FileSystem::DeleteDirContents; + using FileSystem::DeleteDirContentsAsync; using FileSystem::GetFileInfo; + using FileSystem::OpenAppendStream; + using FileSystem::OpenOutputStream; /// \endcond + Result GetFileInfo(const std::string& path) override; Result> GetFileInfo(const FileSelector& select) override; FileInfoGenerator GetFileInfoGenerator(const FileSelector& select) override; - Status CreateDir(const std::string& path, bool recursive = true) override; + Status CreateDir(const std::string& path, bool recursive) override; Status DeleteDir(const std::string& path) override; - Status DeleteDirContents(const std::string& path, bool missing_dir_ok = false) override; - Future<> DeleteDirContentsAsync(const std::string& path, - bool missing_dir_ok = false) override; + Status DeleteDirContents(const std::string& path, bool missing_dir_ok) override; + Future<> DeleteDirContentsAsync(const std::string& path, bool missing_dir_ok) override; Status DeleteRootDirContents() override; Status DeleteFile(const std::string& path) override; @@ -312,11 +317,11 @@ class ARROW_EXPORT S3FileSystem : public FileSystem { /// implementing your own background execution strategy. Result> OpenOutputStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; Result> OpenAppendStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; /// Create a S3FileSystem instance from the given options. static Result> Make( diff --git a/cpp/src/arrow/filesystem/s3fs_narrative_test.cc b/cpp/src/arrow/filesystem/s3fs_narrative_test.cc index bbb3c32ee6bd2..9322beb45ead2 100644 --- a/cpp/src/arrow/filesystem/s3fs_narrative_test.cc +++ b/cpp/src/arrow/filesystem/s3fs_narrative_test.cc @@ -48,8 +48,7 @@ DEFINE_string(region, "", "AWS region"); DEFINE_string(endpoint, "", "Endpoint override (e.g. '127.0.0.1:9000')"); DEFINE_string(scheme, "https", "Connection scheme"); -namespace arrow { -namespace fs { +namespace arrow::fs { #define ASSERT_RAISES_PRINT(context_msg, error_type, expr) \ do { \ @@ -247,8 +246,7 @@ void TestMain(int argc, char** argv) { ASSERT_OK(FinalizeS3()); } -} // namespace fs -} // namespace arrow +} // namespace arrow::fs int main(int argc, char** argv) { std::stringstream ss; diff --git a/cpp/src/arrow/filesystem/s3fs_test.cc b/cpp/src/arrow/filesystem/s3fs_test.cc index 33e9712a666cd..810b10d417d2c 100644 --- a/cpp/src/arrow/filesystem/s3fs_test.cc +++ b/cpp/src/arrow/filesystem/s3fs_test.cc @@ -75,8 +75,8 @@ namespace fs { using ::arrow::internal::checked_pointer_cast; using ::arrow::internal::PlatformFilename; using ::arrow::internal::ToChars; -using ::arrow::internal::UriEscape; using ::arrow::internal::Zip; +using ::arrow::util::UriEscape; using ::arrow::fs::internal::ConnectRetryStrategy; using ::arrow::fs::internal::ErrorToStatus; diff --git a/cpp/src/arrow/filesystem/util_internal.cc b/cpp/src/arrow/filesystem/util_internal.cc index 8747f9683b90f..d69f6c896d08e 100644 --- a/cpp/src/arrow/filesystem/util_internal.cc +++ b/cpp/src/arrow/filesystem/util_internal.cc @@ -30,7 +30,7 @@ namespace arrow { using internal::StatusDetailFromErrno; -using internal::Uri; +using util::Uri; namespace fs { namespace internal { diff --git a/cpp/src/arrow/filesystem/util_internal.h b/cpp/src/arrow/filesystem/util_internal.h index 96cc5178a9f31..74ddf015432d8 100644 --- a/cpp/src/arrow/filesystem/util_internal.h +++ b/cpp/src/arrow/filesystem/util_internal.h @@ -28,7 +28,7 @@ #include "arrow/util/visibility.h" namespace arrow { -using internal::Uri; +using util::Uri; namespace fs { namespace internal { diff --git a/cpp/src/arrow/flight/cookie_internal.cc b/cpp/src/arrow/flight/cookie_internal.cc index 921f017313611..8f41106ebce5c 100644 --- a/cpp/src/arrow/flight/cookie_internal.cc +++ b/cpp/src/arrow/flight/cookie_internal.cc @@ -159,8 +159,8 @@ CookiePair Cookie::ParseCookieAttribute(const std::string& cookie_header_value, } // Key/Value may be URI-encoded. - out_key = arrow::internal::UriUnescape(out_key); - out_value = arrow::internal::UriUnescape(out_value); + out_key = arrow::util::UriUnescape(out_key); + out_value = arrow::util::UriUnescape(out_value); // Strip outer quotes on the value. if (out_value.size() >= 2 && out_value[0] == '"' && diff --git a/cpp/src/arrow/flight/transport.h b/cpp/src/arrow/flight/transport.h index ee7bd01720730..4029aa5223deb 100644 --- a/cpp/src/arrow/flight/transport.h +++ b/cpp/src/arrow/flight/transport.h @@ -168,7 +168,7 @@ class ARROW_FLIGHT_EXPORT ClientTransport { /// Initialize the client. virtual Status Init(const FlightClientOptions& options, const Location& location, - const arrow::internal::Uri& uri) = 0; + const arrow::util::Uri& uri) = 0; /// Close the client. Once this returns, the client is no longer usable. virtual Status Close() = 0; diff --git a/cpp/src/arrow/flight/transport/grpc/grpc_client.cc b/cpp/src/arrow/flight/transport/grpc/grpc_client.cc index f7612759e8de6..f799ba761c40d 100644 --- a/cpp/src/arrow/flight/transport/grpc/grpc_client.cc +++ b/cpp/src/arrow/flight/transport/grpc/grpc_client.cc @@ -684,13 +684,13 @@ class GrpcClientImpl : public internal::ClientTransport { } Status Init(const FlightClientOptions& options, const Location& location, - const arrow::internal::Uri& uri) override { + const arrow::util::Uri& uri) override { const std::string& scheme = location.scheme(); std::stringstream grpc_uri; std::shared_ptr<::grpc::ChannelCredentials> creds; if (scheme == kSchemeGrpc || scheme == kSchemeGrpcTcp || scheme == kSchemeGrpcTls) { - grpc_uri << arrow::internal::UriEncodeHost(uri.host()) << ':' << uri.port_text(); + grpc_uri << arrow::util::UriEncodeHost(uri.host()) << ':' << uri.port_text(); if (scheme == kSchemeGrpcTls) { if (options.disable_server_verification) { diff --git a/cpp/src/arrow/flight/transport/grpc/grpc_server.cc b/cpp/src/arrow/flight/transport/grpc/grpc_server.cc index a9780b5eeb77e..28fc736aa0088 100644 --- a/cpp/src/arrow/flight/transport/grpc/grpc_server.cc +++ b/cpp/src/arrow/flight/transport/grpc/grpc_server.cc @@ -575,8 +575,7 @@ class GrpcServerTransport : public internal::ServerTransport { new GrpcServerTransport(base, std::move(memory_manager))); } - Status Init(const FlightServerOptions& options, - const arrow::internal::Uri& uri) override { + Status Init(const FlightServerOptions& options, const arrow::util::Uri& uri) override { grpc_service_.reset( new GrpcServiceHandler(options.auth_handler, options.middleware, this)); @@ -588,7 +587,7 @@ class GrpcServerTransport : public internal::ServerTransport { int port = 0; if (scheme == kSchemeGrpc || scheme == kSchemeGrpcTcp || scheme == kSchemeGrpcTls) { std::stringstream address; - address << arrow::internal::UriEncodeHost(uri.host()) << ':' << uri.port_text(); + address << arrow::util::UriEncodeHost(uri.host()) << ':' << uri.port_text(); std::shared_ptr<::grpc::ServerCredentials> creds; if (scheme == kSchemeGrpcTls) { @@ -635,12 +634,10 @@ class GrpcServerTransport : public internal::ServerTransport { if (scheme == kSchemeGrpcTls) { ARROW_ASSIGN_OR_RAISE( - location_, - Location::ForGrpcTls(arrow::internal::UriEncodeHost(uri.host()), port)); + location_, Location::ForGrpcTls(arrow::util::UriEncodeHost(uri.host()), port)); } else if (scheme == kSchemeGrpc || scheme == kSchemeGrpcTcp) { ARROW_ASSIGN_OR_RAISE( - location_, - Location::ForGrpcTcp(arrow::internal::UriEncodeHost(uri.host()), port)); + location_, Location::ForGrpcTcp(arrow::util::UriEncodeHost(uri.host()), port)); } return Status::OK(); } diff --git a/cpp/src/arrow/flight/transport/ucx/ucx_client.cc b/cpp/src/arrow/flight/transport/ucx/ucx_client.cc index cd9ddaa85a6f6..32c2fd776f32b 100644 --- a/cpp/src/arrow/flight/transport/ucx/ucx_client.cc +++ b/cpp/src/arrow/flight/transport/ucx/ucx_client.cc @@ -77,7 +77,7 @@ class ClientConnection { ARROW_DEFAULT_MOVE_AND_ASSIGN(ClientConnection); ~ClientConnection() { DCHECK(!driver_) << "Connection was not closed!"; } - Status Init(std::shared_ptr ucp_context, const arrow::internal::Uri& uri) { + Status Init(std::shared_ptr ucp_context, const arrow::util::Uri& uri) { auto status = InitImpl(std::move(ucp_context), uri); // Clean up after-the-fact if we fail to initialize if (!status.ok()) { @@ -91,8 +91,7 @@ class ClientConnection { return status; } - Status InitImpl(std::shared_ptr ucp_context, - const arrow::internal::Uri& uri) { + Status InitImpl(std::shared_ptr ucp_context, const arrow::util::Uri& uri) { { ucs_status_t status; ucp_worker_params_t worker_params; @@ -521,7 +520,7 @@ class UcxClientImpl : public arrow::flight::internal::ClientTransport { } Status Init(const FlightClientOptions& options, const Location& location, - const arrow::internal::Uri& uri) override { + const arrow::util::Uri& uri) override { RETURN_NOT_OK(uri_.Parse(uri.ToString())); { ucp_config_t* ucp_config; @@ -721,7 +720,7 @@ class UcxClientImpl : public arrow::flight::internal::ClientTransport { private: static constexpr size_t kMaxOpenConnections = 3; - arrow::internal::Uri uri_; + arrow::util::Uri uri_; std::shared_ptr ucp_context_; std::mutex connections_mutex_; std::deque connections_; diff --git a/cpp/src/arrow/flight/transport/ucx/ucx_server.cc b/cpp/src/arrow/flight/transport/ucx/ucx_server.cc index b20f8f286e3bc..cb9c8948ccf1e 100644 --- a/cpp/src/arrow/flight/transport/ucx/ucx_server.cc +++ b/cpp/src/arrow/flight/transport/ucx/ucx_server.cc @@ -198,8 +198,7 @@ class UcxServerImpl : public arrow::flight::internal::ServerTransport { } } - Status Init(const FlightServerOptions& options, - const arrow::internal::Uri& uri) override { + Status Init(const FlightServerOptions& options, const arrow::util::Uri& uri) override { const auto max_threads = std::max(8, std::thread::hardware_concurrency()); ARROW_ASSIGN_OR_RAISE(rpc_pool_, arrow::internal::ThreadPool::Make(max_threads)); diff --git a/cpp/src/arrow/flight/transport/ucx/util_internal.cc b/cpp/src/arrow/flight/transport/ucx/util_internal.cc index acaa4f5872343..2db7d4e2630ff 100644 --- a/cpp/src/arrow/flight/transport/ucx/util_internal.cc +++ b/cpp/src/arrow/flight/transport/ucx/util_internal.cc @@ -50,7 +50,7 @@ ucs_status_t FlightUcxStatusDetail::Unwrap(const Status& status) { return dynamic_cast(status.detail().get())->status_; } -arrow::Result UriToSockaddr(const arrow::internal::Uri& uri, +arrow::Result UriToSockaddr(const arrow::util::Uri& uri, struct sockaddr_storage* addr) { std::string host = uri.host(); if (host.empty()) { diff --git a/cpp/src/arrow/flight/transport/ucx/util_internal.h b/cpp/src/arrow/flight/transport/ucx/util_internal.h index 84e84ba071154..958868d59d4f5 100644 --- a/cpp/src/arrow/flight/transport/ucx/util_internal.h +++ b/cpp/src/arrow/flight/transport/ucx/util_internal.h @@ -71,7 +71,7 @@ static inline bool IsIgnorableDisconnectError(ucs_status_t ucs_status) { /// /// \return The length of the sockaddr ARROW_FLIGHT_EXPORT -arrow::Result UriToSockaddr(const arrow::internal::Uri& uri, +arrow::Result UriToSockaddr(const arrow::util::Uri& uri, struct sockaddr_storage* addr); ARROW_FLIGHT_EXPORT diff --git a/cpp/src/arrow/flight/transport_server.h b/cpp/src/arrow/flight/transport_server.h index 51105a89304f4..8e5fe3e710c13 100644 --- a/cpp/src/arrow/flight/transport_server.h +++ b/cpp/src/arrow/flight/transport_server.h @@ -69,7 +69,7 @@ class ARROW_FLIGHT_EXPORT ServerTransport { /// This method should launch the server in a background thread, i.e. it /// should not block. Once this returns, the server should be active. virtual Status Init(const FlightServerOptions& options, - const arrow::internal::Uri& uri) = 0; + const arrow::util::Uri& uri) = 0; /// \brief Shutdown the server. /// /// This should wait for active RPCs to finish. Once this returns, the diff --git a/cpp/src/arrow/flight/types.cc b/cpp/src/arrow/flight/types.cc index 11b2baafad220..6682e300d19c9 100644 --- a/cpp/src/arrow/flight/types.cc +++ b/cpp/src/arrow/flight/types.cc @@ -821,7 +821,7 @@ arrow::Result CloseSessionResult::Deserialize( return out; } -Location::Location() { uri_ = std::make_shared(); } +Location::Location() { uri_ = std::make_shared(); } arrow::Result Location::Parse(const std::string& uri_string) { Location location; diff --git a/cpp/src/arrow/flight/types.h b/cpp/src/arrow/flight/types.h index 4b17149aa2d46..7dd707d3e2fa2 100644 --- a/cpp/src/arrow/flight/types.h +++ b/cpp/src/arrow/flight/types.h @@ -52,11 +52,11 @@ class DictionaryMemo; } // namespace ipc -namespace internal { +namespace util { class Uri; -} // namespace internal +} // namespace util namespace flight { @@ -466,7 +466,7 @@ struct ARROW_FLIGHT_EXPORT Location { private: friend class FlightClient; friend class FlightServerBase; - std::shared_ptr uri_; + std::shared_ptr uri_; }; /// \brief A flight ticket and list of locations where the ticket can be diff --git a/cpp/src/arrow/util/library.cc b/cpp/src/arrow/util/library.cc new file mode 100644 index 0000000000000..93858109ea685 --- /dev/null +++ b/cpp/src/arrow/util/library.cc @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/util/library.h" + +#include "arrow/util/io_util.h" + +#ifdef _WIN32 +#include +#else +#include +#endif + +namespace arrow::util { + +// TODO(bkietz) unify with the utilities in hdfs_internal.cc +Result LoadDynamicLibrary(const char* path) { +#ifdef _WIN32 + if (void* handle = LoadLibraryA(path)) return handle; + return arrow::internal::IOErrorFromWinError(GetLastError(), "LoadLibrary failed."); +#else + constexpr int kFlags = + // All undefined symbols in the shared object are resolved before dlopen() returns. + RTLD_NOW + // Symbols defined in this shared object are not made available to + // resolve references in subsequently loaded shared objects. + | RTLD_LOCAL; + if (void* handle = dlopen(path, kFlags)) return handle; + return Status::IOError("dlopen failed: ", dlerror()); +#endif +} + +Result GetSymbol(void* handle, const char* name) { +#ifdef _WIN32 + if (void* sym = reinterpret_cast( + GetProcAddress(reinterpret_cast(handle), name))) { + return sym; + } + return arrow::internal::IOErrorFromWinError(GetLastError(), "GetProcAddress failed."); +#else + if (void* sym = dlsym(handle, name)) return sym; + return Status::IOError("dlsym failed: ", dlerror()); +#endif +} +} // namespace arrow::util diff --git a/cpp/src/arrow/util/library.h b/cpp/src/arrow/util/library.h new file mode 100644 index 0000000000000..d1faa688d95ee --- /dev/null +++ b/cpp/src/arrow/util/library.h @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "arrow/result.h" +#include "arrow/util/visibility.h" + +namespace arrow::util { + +/// \brief Load a dynamic library +/// +/// This wraps dlopen() except on Windows, where LoadLibrary() is called. +ARROW_EXPORT Result LoadDynamicLibrary(const char* name); + +/// \brief Retrieve a symbol by name from a library handle. +ARROW_EXPORT Result GetSymbol(void* handle, const char* name); + +} // namespace arrow::util diff --git a/cpp/src/arrow/util/uri.cc b/cpp/src/arrow/util/uri.cc index b291ee3d7f1f2..d1b54e78aa909 100644 --- a/cpp/src/arrow/util/uri.cc +++ b/cpp/src/arrow/util/uri.cc @@ -27,8 +27,7 @@ #include "arrow/util/value_parsing.h" #include "arrow/vendored/uriparser/Uri.h" -namespace arrow { -namespace internal { +namespace arrow::util { namespace { @@ -111,7 +110,7 @@ bool IsValidUriScheme(std::string_view s) { } struct Uri::Impl { - Impl() : string_rep_(""), port_(-1) { memset(&uri_, 0, sizeof(uri_)); } + Impl() { memset(&uri_, 0, sizeof(uri_)); } ~Impl() { uriFreeUriMembersA(&uri_); } @@ -133,7 +132,7 @@ struct Uri::Impl { // Keep alive strings that uriparser stores pointers to std::vector data_; std::string string_rep_; - int32_t port_; + int32_t port_ = -1; std::vector path_segments_; bool is_file_uri_; bool is_absolute_path_; @@ -141,7 +140,7 @@ struct Uri::Impl { Uri::Uri() : impl_(new Impl) {} -Uri::~Uri() {} +Uri::~Uri() = default; Uri::Uri(Uri&& u) : impl_(std::move(u.impl_)) {} @@ -169,21 +168,19 @@ int32_t Uri::port() const { return impl_->port_; } std::string Uri::username() const { auto userpass = TextRangeToView(impl_->uri_.userInfo); auto sep_pos = userpass.find_first_of(':'); - if (sep_pos == std::string_view::npos) { - return UriUnescape(userpass); - } else { - return UriUnescape(userpass.substr(0, sep_pos)); + if (sep_pos != std::string_view::npos) { + userpass = userpass.substr(0, sep_pos); } + return UriUnescape(userpass); } std::string Uri::password() const { auto userpass = TextRangeToView(impl_->uri_.userInfo); auto sep_pos = userpass.find_first_of(':'); if (sep_pos == std::string_view::npos) { - return std::string(); - } else { - return UriUnescape(userpass.substr(sep_pos + 1)); + return ""; } + return UriUnescape(userpass.substr(sep_pos + 1)); } std::string Uri::path() const { @@ -218,6 +215,9 @@ std::string Uri::path() const { std::string Uri::query_string() const { return TextRangeToString(impl_->uri_.query); } Result>> Uri::query_items() const { + // XXX would it be worthwhile to fold this parsing into Uri::parse() or maybe + // cache these lazily in an unordered_map? Then we could provide + // Uri::query_item(std::string name) const auto& query = impl_->uri_.query; UriQueryListA* query_list; int item_count; @@ -301,7 +301,8 @@ Status Uri::Parse(const std::string& uri_string) { auto port_text = TextRangeToView(impl_->uri_.portText); if (port_text.size()) { uint16_t port_num; - if (!ParseValue(port_text.data(), port_text.size(), &port_num)) { + if (!::arrow::internal::ParseValue(port_text.data(), port_text.size(), + &port_num)) { return Status::Invalid("Invalid port number '", port_text, "' in URI '", uri_string, "'"); } @@ -311,6 +312,12 @@ Status Uri::Parse(const std::string& uri_string) { return Status::OK(); } +Result Uri::FromString(const std::string& uri_string) { + Uri uri; + ARROW_RETURN_NOT_OK(uri.Parse(uri_string)); + return uri; +} + Result UriFromAbsolutePath(std::string_view path) { if (path.empty()) { return Status::Invalid( @@ -336,5 +343,4 @@ Result UriFromAbsolutePath(std::string_view path) { return out; } -} // namespace internal -} // namespace arrow +} // namespace arrow::util diff --git a/cpp/src/arrow/util/uri.h b/cpp/src/arrow/util/uri.h index 855a61408da99..74dbe924ff237 100644 --- a/cpp/src/arrow/util/uri.h +++ b/cpp/src/arrow/util/uri.h @@ -27,8 +27,7 @@ #include "arrow/type_fwd.h" #include "arrow/util/visibility.h" -namespace arrow { -namespace internal { +namespace arrow::util { /// \brief A parsed URI class ARROW_EXPORT Uri { @@ -86,6 +85,9 @@ class ARROW_EXPORT Uri { /// Factory function to parse a URI from its string representation. Status Parse(const std::string& uri_string); + /// Factory function to parse a URI from its string representation. + static Result FromString(const std::string& uri_string); + private: struct Impl; std::unique_ptr impl_; @@ -114,5 +116,4 @@ bool IsValidUriScheme(std::string_view s); ARROW_EXPORT Result UriFromAbsolutePath(std::string_view path); -} // namespace internal -} // namespace arrow +} // namespace arrow::util diff --git a/cpp/src/arrow/util/uri_test.cc b/cpp/src/arrow/util/uri_test.cc index 4293dc73b01cb..36e09b1b2e879 100644 --- a/cpp/src/arrow/util/uri_test.cc +++ b/cpp/src/arrow/util/uri_test.cc @@ -26,8 +26,7 @@ #include "arrow/util/logging.h" #include "arrow/util/uri.h" -namespace arrow { -namespace internal { +namespace arrow::util { TEST(UriEscape, Basics) { ASSERT_EQ(UriEscape(""), ""); @@ -371,5 +370,4 @@ TEST(UriFromAbsolutePath, Basics) { #endif } -} // namespace internal -} // namespace arrow +} // namespace arrow::util diff --git a/docs/source/cpp/api/filesystem.rst b/docs/source/cpp/api/filesystem.rst index 8132af42e2495..4ab6664e6d681 100644 --- a/docs/source/cpp/api/filesystem.rst +++ b/docs/source/cpp/api/filesystem.rst @@ -19,6 +19,8 @@ Filesystems =========== +.. _cpp-api-filesystems: + Interface ========= @@ -33,12 +35,20 @@ Interface .. doxygenclass:: arrow::fs::FileSystem :members: -High-level factory function -=========================== +.. doxygenfunction:: arrow::fs::EnsureFinalized() + +High-level factory functions +============================ .. doxygengroup:: filesystem-factories :content-only: +Factory registration functions +============================== + +.. doxygengroup:: filesystem-factory-registration + :content-only: + Concrete implementations ======================== diff --git a/docs/source/cpp/io.rst b/docs/source/cpp/io.rst index 28ab5d783a23d..f514d7926bb31 100644 --- a/docs/source/cpp/io.rst +++ b/docs/source/cpp/io.rst @@ -73,6 +73,9 @@ The :class:`filesystem interface ` allows abstracted access over various data storage backends such as the local filesystem or a S3 bucket. It provides input and output streams as well as directory operations. +.. seealso:: + :ref:`FileSystems API reference `. + The filesystem interface exposes a simplified view of the underlying data storage. Data paths are represented as *abstract paths*, which are ``/``-separated, even on Windows, and shouldn't include special path @@ -81,7 +84,14 @@ underlying storage, are automatically dereferenced. Only basic :class:`metadata ` about file entries, such as the file size and modification time, is made available. -Concrete implementations are available for +Filesystem instances are constructed from URI strings using one of the +`High-level factory functions`_, which dispatch to implementation-specific +factories based on the URI's ``scheme``. Other properties for the new instance +are extracted from the URI's other properties such as the ``hostname``, ``username``, +etc. Arrow supports runtime registration of new filesystems, and provides built-in +support for several filesystems. + +Which built-in filesystems are supported is configured at build time and may include :class:`local filesystem access `, :class:`HDFS `, :class:`Amazon S3-compatible storage ` and @@ -91,4 +101,45 @@ Concrete implementations are available for Tasks that use filesystems will typically run on the :ref:`I/O thread pool`. For filesystems that support high levels - of concurrency you may get a benefit from increasing the size of the I/O thread pool. \ No newline at end of file + of concurrency you may get a benefit from increasing the size of the I/O thread pool. + +Defining new FileSystems +======================== + +Build complexity can be decreased by compartmentalizing a FileSystem +implementation into a separate shared library, which applications may +link or load dynamically. Arrow's built-in FileSystem implementations +also follow this pattern. Before a scheme can be used with any of the +`High-level factory functions`_ the library which contains it must be +loaded, requiring a call to ``dlopen()`` or an equivalent (see +:func:`~arrow::util::LoadDynamicLibrary`) if the library is not +linked to the application. + +In addition to loading any libraries, +:func:`~arrow:fs::RegisterFileSystemFactory` +must be called to register a factory for a new URI scheme. However +this requires the consumer to probe libraries for symbols and invoke +these correctly. It is usually preferred that registration be an +automatic addendum to the library's loading. An instance of +:class:`~arrow::fs::FileSystemRegistrar` can be defined +at namespace scope inside the library to accomplish this +automatic registration: + +.. code-block:: cpp + + arrow::fs::FileSystemRegistrar kExampleFileSystemModule{ + "example", + [](const Uri& uri, const io::IOContext& io_context, + std::string* out_path) -> Result> { + EnsureExampleFileSystemInitialized(); + return std::make_shared(); + }, + &EnsureExampleFileSystemFinalized, + }; + +If a filesystem implementation requires initialization before any instances +may be constructed, this should be included in the corresponding factory or +otherwise automatically ensured before the factory is invoked. Likewise if +a filesystem implementation requires tear down before the process ends, this +can be wrapped in a function and registered alongside the factory. All +finalizers will be called by :func:`~arrow::fs::EnsureFinalized`. diff --git a/python/pyarrow/src/arrow/python/filesystem.h b/python/pyarrow/src/arrow/python/filesystem.h index 003fd5cb80551..194b226ac5c35 100644 --- a/python/pyarrow/src/arrow/python/filesystem.h +++ b/python/pyarrow/src/arrow/python/filesystem.h @@ -26,9 +26,7 @@ #include "arrow/python/visibility.h" #include "arrow/util/macros.h" -namespace arrow { -namespace py { -namespace fs { +namespace arrow::py::fs { class ARROW_PYTHON_EXPORT PyFileSystemVtable { public: @@ -83,16 +81,24 @@ class ARROW_PYTHON_EXPORT PyFileSystem : public arrow::fs::FileSystem { bool Equals(const FileSystem& other) const override; + /// \cond FALSE + using FileSystem::CreateDir; + using FileSystem::DeleteDirContents; + using FileSystem::GetFileInfo; + using FileSystem::OpenAppendStream; + using FileSystem::OpenOutputStream; + /// \endcond + Result GetFileInfo(const std::string& path) override; Result> GetFileInfo( const std::vector& paths) override; Result> GetFileInfo( const arrow::fs::FileSelector& select) override; - Status CreateDir(const std::string& path, bool recursive = true) override; + Status CreateDir(const std::string& path, bool recursive) override; Status DeleteDir(const std::string& path) override; - Status DeleteDirContents(const std::string& path, bool missing_dir_ok = false) override; + Status DeleteDirContents(const std::string& path, bool missing_dir_ok) override; Status DeleteRootDirContents() override; Status DeleteFile(const std::string& path) override; @@ -107,10 +113,10 @@ class ARROW_PYTHON_EXPORT PyFileSystem : public arrow::fs::FileSystem { const std::string& path) override; Result> OpenOutputStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; Result> OpenAppendStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; Result NormalizePath(std::string path) override; @@ -121,6 +127,4 @@ class ARROW_PYTHON_EXPORT PyFileSystem : public arrow::fs::FileSystem { PyFileSystemVtable vtable_; }; -} // namespace fs -} // namespace py -} // namespace arrow +} // namespace arrow::py::fs