Skip to content

Commit

Permalink
apacheGH-40342: [C++] move LocalFileSystem to the registry
Browse files Browse the repository at this point in the history
  • Loading branch information
bkietz committed Mar 19, 2024
1 parent b6cc951 commit 46758bc
Show file tree
Hide file tree
Showing 15 changed files with 77 additions and 165 deletions.
3 changes: 0 additions & 3 deletions cpp/examples/arrow/filesystem_definition_example.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@
// under the License.

#include <arrow/filesystem/filesystem.h>
<<<<<<< HEAD
#include <arrow/filesystem/filesystem_library.h>
=======
>>>>>>> e447cfaac (GH-38309: [C++] build filesystems as separate modules)
#include <arrow/io/memory.h>
#include <arrow/result.h>
#include <arrow/util/uri.h>
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/arrow/filesystem/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ add_arrow_test(filesystem-test
EXTRA_LABELS
filesystem
DEFINITIONS
ARROW_FILESYSTEM_EXAMPLE_LIBPATH="$<TARGET_FILE:arrow_filesystem_example>")
ARROW_FILESYSTEM_EXAMPLE_LIBPATH="$<TARGET_FILE:arrow_filesystem_example>"
EXTRA_DEPENDENCIES
arrow_filesystem_example)

if(ARROW_BUILD_BENCHMARKS)
add_arrow_benchmark(localfs_benchmark
Expand Down
13 changes: 5 additions & 8 deletions cpp/src/arrow/filesystem/filesystem.cc
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,11 @@ Result<std::string> FileSystem::PathFromUri(const std::string& uri_string) const
return Status::NotImplemented("PathFromUri is not yet supported on this filesystem");
}

/// Default MakeUri: reports NotImplemented, naming the concrete filesystem type
/// (via type_name()) in the error message. Subclasses that can be expressed as
/// a URI override this.
Result<std::string> FileSystem::MakeUri(std::string path) const {
  Status unsupported = Status::NotImplemented("MakeUri is not yet supported for ",
                                              type_name(), " filesystems");
  return unsupported;
}

//////////////////////////////////////////////////////////////////////////
// SubTreeFileSystem implementation

Expand Down Expand Up @@ -853,14 +858,6 @@ Result<std::shared_ptr<FileSystem>> FileSystemFromUriReal(const Uri& uri,
}
}

if (scheme == "file") {
std::string path;
ARROW_ASSIGN_OR_RAISE(auto options, LocalFileSystemOptions::FromUri(uri, &path));
if (out_path != nullptr) {
*out_path = path;
}
return std::make_shared<LocalFileSystem>(options, io_context);
}
if (scheme == "abfs" || scheme == "abfss") {
#ifdef ARROW_AZURE
ARROW_ASSIGN_OR_RAISE(auto options, AzureOptions::FromUri(uri, out_path));
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/arrow/filesystem/filesystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,11 @@ class ARROW_EXPORT FileSystem
/// \return The path inside the filesystem that is indicated by the URI.
virtual Result<std::string> PathFromUri(const std::string& uri_string) const;

/// \brief Make a URI from which FileSystemFromUri produces an equivalent filesystem
/// \param path The path component to use in the resulting URI
/// \return A URI string, or an error if an equivalent URI cannot be produced
virtual Result<std::string> MakeUri(std::string path) const;

virtual bool Equals(const FileSystem& other) const = 0;

virtual bool Equals(const std::shared_ptr<FileSystem>& other) const {
Expand Down
37 changes: 35 additions & 2 deletions cpp/src/arrow/filesystem/localfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "arrow/io/type_fwd.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/io_util.h"
#include "arrow/util/string.h"
#include "arrow/util/uri.h"
#include "arrow/util/windows_fixup.h"

Expand Down Expand Up @@ -246,8 +247,20 @@ Result<LocalFileSystemOptions> LocalFileSystemOptions::FromUri(
std::string(internal::RemoveTrailingSlash(uri.path(), /*preserve_root=*/true));
}

// TODO handle use_mmap option
return LocalFileSystemOptions();
LocalFileSystemOptions options;
ARROW_ASSIGN_OR_RAISE(auto params, uri.query_items());
for (const auto& [key, value] : params) {
if (key == "use_mmap") {
if (value.empty()) {
options.use_mmap = true;
continue;
} else {
ARROW_ASSIGN_OR_RAISE(options.use_mmap, ::arrow::internal::ParseBoolean(value));
}
break;
}
}
return options;
}

LocalFileSystem::LocalFileSystem(const io::IOContext& io_context)
Expand All @@ -273,6 +286,11 @@ Result<std::string> LocalFileSystem::PathFromUri(const std::string& uri_string)
authority_handling);
}

/// \brief Build a "file://" URI from which FileSystemFromUri produces an
/// equivalent LocalFileSystem.
///
/// \param path The local path to embed in the URI; it is normalized first and
///   must then be absolute.
/// \return The URI string, with "?use_mmap" appended when this filesystem has
///   options_.use_mmap set; Invalid if the normalized path is not absolute, or
///   whatever error normalization / URI construction reports.
Result<std::string> LocalFileSystem::MakeUri(std::string path) const {
  ARROW_ASSIGN_OR_RAISE(path, DoNormalizePath(std::move(path)));
  // A relative path concatenated after "file://" would have its first segment
  // parsed as the URI authority, so reject it up front.
  if (!internal::DetectAbsolutePath(path)) {
    return Status::Invalid("MakeUri requires an absolute path, got ", path);
  }
  // File URIs are percent-decoded when parsed, so the path has to be encoded
  // here rather than pasted verbatim; otherwise ' ', '%', '?' or '#' in the
  // path would not survive a round trip through FileSystemFromUri.
  ARROW_ASSIGN_OR_RAISE(auto uri, ::arrow::util::UriFromAbsolutePath(path));
  return uri + (options_.use_mmap ? "?use_mmap" : "");
}

bool LocalFileSystem::Equals(const FileSystem& other) const {
if (other.type_name() != type_name()) {
return false;
Expand Down Expand Up @@ -686,4 +704,19 @@ Result<std::shared_ptr<io::OutputStream>> LocalFileSystem::OpenAppendStream(
return OpenOutputStreamGeneric(path, truncate, append);
}

/// Factory for LocalFileSystem instances created from a URI.
///
/// Derives LocalFileSystemOptions from `uri` and, when `out_path` is non-null,
/// stores the path that FromUri extracted from the URI there.
static Result<std::shared_ptr<fs::FileSystem>> LocalFileSystemFactory(
    const arrow::util::Uri& uri, const io::IOContext& io_context, std::string* out_path) {
  std::string local_path;
  ARROW_ASSIGN_OR_RAISE(auto fs_options,
                        LocalFileSystemOptions::FromUri(uri, &local_path));
  if (out_path) {
    out_path->assign(std::move(local_path));
  }
  return std::make_shared<LocalFileSystem>(fs_options, io_context);
}

// Registers LocalFileSystemFactory under two URI schemes, "file" and "local",
// so both resolve to the same factory.
FileSystemRegistrar kLocalFileSystemModule[]{
{"file", LocalFileSystemFactory},
{"local", LocalFileSystemFactory},
};

} // namespace arrow::fs
1 change: 1 addition & 0 deletions cpp/src/arrow/filesystem/localfs.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class ARROW_EXPORT LocalFileSystem : public FileSystem {

Result<std::string> NormalizePath(std::string path) override;
Result<std::string> PathFromUri(const std::string& uri_string) const override;
Result<std::string> MakeUri(std::string path) const override;

bool Equals(const FileSystem& other) const override;

Expand Down
90 changes: 8 additions & 82 deletions cpp/src/arrow/filesystem/localfs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
// specific language governing permissions and limitations
// under the License.

#include <cerrno>
#include <chrono>
#include <memory>
#include <sstream>
#include <string>
Expand Down Expand Up @@ -186,86 +184,6 @@ TEST(FileSystemFromUri, LinkedRegisteredFactoryNameCollision) {
////////////////////////////////////////////////////////////////////////////
// Misc tests

/// Test factory: wraps the filesystem that the equivalent "file" URI resolves
/// to in a SlowFileSystem. Recognized query parameters:
///   average_latency - parsed with std::stod (defaults to 1)
///   seed            - parsed as hexadecimal with std::stoi (defaults to 0xDEADBEEF)
Result<std::shared_ptr<FileSystem>> SlowFileSystemFactory(const Uri& uri,
                                                          const io::IOContext& io_context,
                                                          std::string* out_path) {
  // Swap the scheme for "file" and keep the rest of the URI text untouched.
  const std::string after_scheme = uri.ToString().substr(uri.scheme().size());
  ARROW_ASSIGN_OR_RAISE(auto wrapped_fs,
                        FileSystemFromUri("file" + after_scheme, io_context, out_path));

  double latency = 1;
  int32_t rng_seed = 0xDEADBEEF;
  ARROW_ASSIGN_OR_RAISE(auto query_items, uri.query_items());
  for (const auto& [name, text] : query_items) {
    if (name == "average_latency") {
      latency = std::stod(text);
    } else if (name == "seed") {
      rng_seed = std::stoi(text, nullptr, /*base=*/16);
    }
  }
  return std::make_shared<SlowFileSystem>(wrapped_fs, latency, rng_seed);
}
// Registers SlowFileSystemFactory for the "slowfile" URI scheme.
FileSystemRegistrar kSlowFileSystemModule{
"slowfile",
SlowFileSystemFactory,
};

TEST(FileSystemFromUri, LinkedRegisteredFactory) {
  // The "slowfile" registrar is defined in this translation unit, which is linked
  // into the unit test executable, so its factory is registered automatically
  // before main() is entered.
  std::string out_path;
  ASSERT_OK_AND_ASSIGN(auto slow_fs, FileSystemFromUri("slowfile:///hey/yo", &out_path));
  EXPECT_EQ(out_path, "/hey/yo");
  EXPECT_EQ(slow_fs->type_name(), "slow");
}

// Checks that a factory defined in a separately built library only becomes
// usable once that library has been loaded with LoadFileSystemFactories.
TEST(FileSystemFromUri, LoadedRegisteredFactory) {
// Since the registrar's definition is in libarrow_filesystem_example.so,
// its factory will be registered only after the library is dynamically loaded.
std::string path;
// Before loading: the "example" scheme is rejected with Invalid.
EXPECT_THAT(FileSystemFromUri("example:///hey/yo", &path), Raises(StatusCode::Invalid));

// Load the library whose path is supplied by the ARROW_FILESYSTEM_EXAMPLE_LIBPATH
// compile definition.
EXPECT_THAT(LoadFileSystemFactories(ARROW_FILESYSTEM_EXAMPLE_LIBPATH), Ok());

// After loading: the same URI resolves, and the resulting filesystem reports
// the "local" type name.
ASSERT_OK_AND_ASSIGN(auto fs, FileSystemFromUri("example:///hey/yo", &path));
EXPECT_EQ(path, "/hey/yo");
EXPECT_EQ(fs->type_name(), "local");
}

// Checks registration through RegisterFileSystemFactory at runtime: the scheme
// is unknown beforehand, usable afterwards, and cannot be registered twice.
TEST(FileSystemFromUri, RuntimeRegisteredFactory) {
std::string path;
// Before registration: "slowfile2" is rejected with Invalid.
EXPECT_THAT(FileSystemFromUri("slowfile2:///hey/yo", &path),
Raises(StatusCode::Invalid));

EXPECT_THAT(RegisterFileSystemFactory("slowfile2", SlowFileSystemFactory), Ok());

// After registration: the URI resolves to a filesystem of type "slow".
ASSERT_OK_AND_ASSIGN(auto fs, FileSystemFromUri("slowfile2:///hey/yo", &path));
EXPECT_EQ(path, "/hey/yo");
EXPECT_EQ(fs->type_name(), "slow");

// Registering the same scheme a second time must fail with KeyError.
EXPECT_THAT(
RegisterFileSystemFactory("slowfile2", SlowFileSystemFactory),
Raises(StatusCode::KeyError,
testing::HasSubstr("Attempted to register factory for scheme 'slowfile2' "
"but that scheme is already registered")));
}

// Three registrars that all claim the "segfault" scheme, each with a null
// factory. The duplicate registrations create a scheme name collision, which
// the LinkedRegisteredFactoryNameCollision test expects to surface as KeyError.
FileSystemRegistrar kSegfaultFileSystemModule[]{
{"segfault", nullptr},
{"segfault", nullptr},
{"segfault", nullptr},
};
TEST(FileSystemFromUri, LinkedRegisteredFactoryNameCollision) {
  // Several registrars in this translation unit all claim the 'segfault' scheme.
  // That collision invalidates the scheme, so using it in FileSystemFromUri
  // raises KeyError.
  std::string out_path;
  EXPECT_THAT(FileSystemFromUri("segfault:///hey/yo", &out_path),
              Raises(StatusCode::KeyError));
  // Schemes that are not part of the collision keep working.
  EXPECT_THAT(FileSystemFromUri("slowfile:///hey/yo", &out_path), Ok());
}

TEST(DetectAbsolutePath, Basics) {
ASSERT_TRUE(DetectAbsolutePath("/"));
ASSERT_TRUE(DetectAbsolutePath("/foo"));
Expand Down Expand Up @@ -389,6 +307,7 @@ class TestLocalFS : public LocalFSTestMixin {
std::string path;
ASSERT_OK_AND_ASSIGN(fs_, fs_from_uri(uri, &path));
ASSERT_EQ(fs_->type_name(), "local");
local_fs_ = ::arrow::internal::checked_pointer_cast<LocalFileSystem>(fs_);
ASSERT_EQ(path, expected_path);
ASSERT_OK_AND_ASSIGN(path, fs_->PathFromUri(uri));
ASSERT_EQ(path, expected_path);
Expand Down Expand Up @@ -500,8 +419,15 @@ TYPED_TEST(TestLocalFS, FileSystemFromUriFile) {

// Variations
this->TestLocalUri("file:/foo/bar", "/foo/bar");
ASSERT_FALSE(this->local_fs_->options().use_mmap);
this->TestLocalUri("file:///foo/bar", "/foo/bar");
this->TestLocalUri("file:///some%20path/%25percent", "/some path/%percent");

this->TestLocalUri("file:///_?use_mmap", "/_");
ASSERT_TRUE(this->local_fs_->options().use_mmap);
ASSERT_OK_AND_ASSIGN(auto uri, this->fs_->MakeUri("/_"));
EXPECT_EQ(uri, "file:///_?use_mmap");

#ifdef _WIN32
this->TestLocalUri("file:/C:/foo/bar", "C:/foo/bar");
this->TestLocalUri("file:///C:/foo/bar", "C:/foo/bar");
Expand Down
9 changes: 0 additions & 9 deletions cpp/src/arrow/testing/examplefs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,20 @@
// under the License.

#include "arrow/filesystem/filesystem.h"
<<<<<<< HEAD
#include "arrow/filesystem/filesystem_library.h"
#include "arrow/result.h"
#include "arrow/util/uri.h"

#include <gtest/gtest.h>

=======
#include "arrow/result.h"
#include "arrow/util/uri.h"

>>>>>>> e447cfaac (GH-38309: [C++] build filesystems as separate modules)
namespace arrow::fs {

FileSystemRegistrar kExampleFileSystemModule{
"example",
[](const Uri& uri, const io::IOContext& io_context,
std::string* out_path) -> Result<std::shared_ptr<FileSystem>> {
constexpr std::string_view kScheme = "example";
<<<<<<< HEAD
EXPECT_EQ(uri.scheme(), kScheme);
=======
>>>>>>> e447cfaac (GH-38309: [C++] build filesystems as separate modules)
auto local_uri = "file" + uri.ToString().substr(kScheme.size());
return FileSystemFromUri(local_uri, io_context, out_path);
},
Expand Down
3 changes: 0 additions & 3 deletions python/pyarrow/_fs.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,6 @@ cdef class FileSystem(_Weakrefable):


cdef class LocalFileSystem(FileSystem):
cdef:
CLocalFileSystem* localfs

cdef init(self, const shared_ptr[CFileSystem]& wrapped)


Expand Down
33 changes: 12 additions & 21 deletions python/pyarrow/_fs.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
# cython: language_level = 3

from cpython.datetime cimport datetime, PyDateTime_DateTime
from cython cimport binding

from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow_python cimport PyDateTime_to_TimePoint
Expand Down Expand Up @@ -421,6 +420,11 @@ cdef class FileSystem(_Weakrefable):
"the subclasses instead: LocalFileSystem or "
"SubTreeFileSystem")

@staticmethod
def _from_uri(uri):
    """
    Build a filesystem from a URI, discarding the path component.

    Thin wrapper over ``FileSystem.from_uri`` that returns only the
    filesystem half of the ``(filesystem, path)`` pair.
    """
    filesystem, _unused_path = FileSystem.from_uri(uri)
    return filesystem

@staticmethod
def from_uri(uri):
"""
Expand Down Expand Up @@ -1097,30 +1101,17 @@ cdef class LocalFileSystem(FileSystem):

def __init__(self, *, use_mmap=False):
cdef:
CLocalFileSystemOptions opts
shared_ptr[CLocalFileSystem] fs

opts = CLocalFileSystemOptions.Defaults()
opts.use_mmap = use_mmap
shared_ptr[CFileSystem] fs
c_string c_uri

fs = make_shared[CLocalFileSystem](opts)
c_uri = tobytes(f"file:///_?use_mmap={int(use_mmap)}")
with nogil:
fs = GetResultValue(CFileSystemFromUri(c_uri))
self.init(<shared_ptr[CFileSystem]> fs)

cdef init(self, const shared_ptr[CFileSystem]& c_fs):
FileSystem.init(self, c_fs)
self.localfs = <CLocalFileSystem*> c_fs.get()

@staticmethod
@binding(True) # Required for cython < 3
def _reconstruct(kwargs):
# __reduce__ doesn't allow passing named arguments directly to the
# reconstructor, hence this wrapper.
return LocalFileSystem(**kwargs)

def __reduce__(self):
cdef CLocalFileSystemOptions opts = self.localfs.options()
return LocalFileSystem._reconstruct, (dict(
use_mmap=opts.use_mmap),)
uri = frombytes(GetResultValue(self.fs.MakeUri(b"/_")))
return FileSystem._from_uri, (uri,)


cdef class SubTreeFileSystem(FileSystem):
Expand Down
16 changes: 3 additions & 13 deletions python/pyarrow/includes/libarrow_fs.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ cdef extern from "arrow/filesystem/api.h" namespace "arrow::fs" nogil:
shared_ptr[CFileSystem] shared_from_this()
c_string type_name() const
CResult[c_string] NormalizePath(c_string path)
CResult[c_string] MakeUri(c_string path)
CResult[CFileInfo] GetFileInfo(const c_string& path)
CResult[vector[CFileInfo]] GetFileInfo(
const vector[c_string]& paths)
Expand All @@ -84,6 +85,8 @@ cdef extern from "arrow/filesystem/api.h" namespace "arrow::fs" nogil:
c_bool Equals(const CFileSystem& other)
c_bool Equals(shared_ptr[CFileSystem] other)

CResult[shared_ptr[CFileSystem]] CFileSystemFromUri \
"arrow::fs::FileSystemFromUri"(const c_string& uri)
CResult[shared_ptr[CFileSystem]] CFileSystemFromUri \
"arrow::fs::FileSystemFromUri"(const c_string& uri, c_string* out_path)
CResult[shared_ptr[CFileSystem]] CFileSystemFromUriOrPath \
Expand All @@ -98,19 +101,6 @@ cdef extern from "arrow/filesystem/api.h" namespace "arrow::fs" nogil:
CStatus CFileSystemsInitialize "arrow::fs::Initialize" \
(const CFileSystemGlobalOptions& options)

cdef cppclass CLocalFileSystemOptions "arrow::fs::LocalFileSystemOptions":
c_bool use_mmap

@staticmethod
CLocalFileSystemOptions Defaults()

c_bool Equals(const CLocalFileSystemOptions& other)

cdef cppclass CLocalFileSystem "arrow::fs::LocalFileSystem"(CFileSystem):
CLocalFileSystem()
CLocalFileSystem(CLocalFileSystemOptions)
CLocalFileSystemOptions options()

cdef cppclass CSubTreeFileSystem \
"arrow::fs::SubTreeFileSystem"(CFileSystem):
CSubTreeFileSystem(const c_string& base_path,
Expand Down
4 changes: 0 additions & 4 deletions r/R/arrowExports.R

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 46758bc

Please sign in to comment.