Skip to content

Commit

Permalink
GH-38597: [C++] Implement GetFileInfo(selector) for Azure filesystem (#…
Browse files Browse the repository at this point in the history
…39009)

### Rationale for this change

Part of Azure FS implementation.

### What changes are included in this PR?

The version of `GetFileInfo` that takes a prefix and can optionally
recurse into directories.

### Are these changes tested?

By unit tests present in this PR. Separate from this PR, I'm thinking of
way to fuzz-test the FS API.
* Closes: #38597
  • Loading branch information
felipecrv authored Dec 8, 2023
1 parent 081b402 commit 476c78f
Show file tree
Hide file tree
Showing 8 changed files with 481 additions and 37 deletions.
212 changes: 210 additions & 2 deletions cpp/src/arrow/filesystem/azurefs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ namespace fs {
// -----------------------------------------------------------------------
// AzureOptions Implementation

AzureOptions::AzureOptions() {}
AzureOptions::AzureOptions() = default;

bool AzureOptions::Equals(const AzureOptions& other) const {
return (account_dfs_url == other.account_dfs_url &&
Expand Down Expand Up @@ -820,6 +820,209 @@ class AzureFileSystem::Impl {
}
}

private:
template <typename OnContainer>
Status VisitContainers(const Azure::Core::Context& context,
OnContainer&& on_container) const {
Azure::Storage::Blobs::ListBlobContainersOptions options;
try {
auto container_list_response =
blob_service_client_->ListBlobContainers(options, context);
for (; container_list_response.HasPage();
container_list_response.MoveToNextPage(context)) {
for (const auto& container : container_list_response.BlobContainers) {
RETURN_NOT_OK(on_container(container));
}
}
} catch (const Azure::Storage::StorageException& exception) {
return internal::ExceptionToStatus("Failed to list account containers.", exception);
}
return Status::OK();
}

static FileInfo FileInfoFromBlob(const std::string& container,
const Azure::Storage::Blobs::Models::BlobItem& blob) {
auto path = internal::ConcatAbstractPath(container, blob.Name);
if (internal::HasTrailingSlash(blob.Name)) {
return DirectoryFileInfoFromPath(path);
}
FileInfo info{std::move(path), FileType::File};
info.set_size(blob.BlobSize);
info.set_mtime(std::chrono::system_clock::time_point{blob.Details.LastModified});
return info;
}

static FileInfo DirectoryFileInfoFromPath(const std::string& path) {
return FileInfo{std::string{internal::RemoveTrailingSlash(path)},
FileType::Directory};
}

static std::string_view BasenameView(std::string_view s) {
DCHECK(!internal::HasTrailingSlash(s));
auto offset = s.find_last_of(internal::kSep);
auto result = (offset == std::string_view::npos) ? s : s.substr(offset);
DCHECK(!result.empty() && result.back() != internal::kSep);
return result;
}

/// \brief List the blobs at the root of a container or some dir in a container.
///
/// \pre container_client is the client for the container named like the first
/// segment of select.base_dir.
Status GetFileInfoWithSelectorFromContainer(
const Azure::Storage::Blobs::BlobContainerClient& container_client,
const Azure::Core::Context& context, Azure::Nullable<int32_t> page_size_hint,
const FileSelector& select, FileInfoVector* acc_results) {
ARROW_ASSIGN_OR_RAISE(auto base_location, AzureLocation::FromString(select.base_dir));

bool found = false;
Azure::Storage::Blobs::ListBlobsOptions options;
if (internal::IsEmptyPath(base_location.path)) {
// If the base_dir is the root of the container, then we want to list all blobs in
// the container and the Prefix should be empty and not even include the trailing
// slash because the container itself represents the `<container>/` directory.
options.Prefix = {};
found = true; // Unless the container itself is not found later!
} else {
options.Prefix = internal::EnsureTrailingSlash(base_location.path);
}
options.PageSizeHint = page_size_hint;
options.Include = Azure::Storage::Blobs::Models::ListBlobsIncludeFlags::Metadata;

auto recurse = [&](const std::string& blob_prefix) noexcept -> Status {
if (select.recursive && select.max_recursion > 0) {
FileSelector sub_select;
sub_select.base_dir = internal::ConcatAbstractPath(
base_location.container, internal::RemoveTrailingSlash(blob_prefix));
sub_select.allow_not_found = true;
sub_select.recursive = true;
sub_select.max_recursion = select.max_recursion - 1;
return GetFileInfoWithSelectorFromContainer(
container_client, context, page_size_hint, sub_select, acc_results);
}
return Status::OK();
};

auto process_blob =
[&](const Azure::Storage::Blobs::Models::BlobItem& blob) noexcept {
// blob.Name has trailing slash only when Prefix is an empty
// directory marker blob for the directory we're listing
// from, and we should skip it.
if (!internal::HasTrailingSlash(blob.Name)) {
acc_results->push_back(FileInfoFromBlob(base_location.container, blob));
}
};
auto process_prefix = [&](const std::string& prefix) noexcept -> Status {
const auto path = internal::ConcatAbstractPath(base_location.container, prefix);
acc_results->push_back(DirectoryFileInfoFromPath(path));
return recurse(prefix);
};

try {
auto list_response =
container_client.ListBlobsByHierarchy(/*delimiter=*/"/", options, context);
for (; list_response.HasPage(); list_response.MoveToNextPage(context)) {
if (list_response.Blobs.empty() && list_response.BlobPrefixes.empty()) {
continue;
}
found = true;
// Blob and BlobPrefixes are sorted by name, so we can merge-iterate
// them to ensure returned results are all sorted.
size_t blob_index = 0;
size_t blob_prefix_index = 0;
while (blob_index < list_response.Blobs.size() &&
blob_prefix_index < list_response.BlobPrefixes.size()) {
const auto& blob = list_response.Blobs[blob_index];
const auto& prefix = list_response.BlobPrefixes[blob_prefix_index];
const int cmp = blob.Name.compare(prefix);
if (cmp < 0) {
process_blob(blob);
blob_index += 1;
} else if (cmp > 0) {
RETURN_NOT_OK(process_prefix(prefix));
blob_prefix_index += 1;
} else {
DCHECK_EQ(blob.Name, prefix);
RETURN_NOT_OK(process_prefix(prefix));
blob_index += 1;
blob_prefix_index += 1;
// If the container has an empty dir marker blob and another blob starting
// with this blob name as a prefix, the blob doesn't appear in the listing
// that also contains the prefix, so AFAICT this branch in unreachable. The
// code above is kept just in case, but if this DCHECK(false) is ever reached,
// we should refactor this loop to ensure no duplicate entries are ever
// reported.
DCHECK(false)
<< "Unexpected blob/prefix name collision on the same listing request";
}
}
for (; blob_index < list_response.Blobs.size(); blob_index++) {
process_blob(list_response.Blobs[blob_index]);
}
for (; blob_prefix_index < list_response.BlobPrefixes.size();
blob_prefix_index++) {
RETURN_NOT_OK(process_prefix(list_response.BlobPrefixes[blob_prefix_index]));
}
}
} catch (const Azure::Storage::StorageException& exception) {
if (exception.ErrorCode == "ContainerNotFound") {
found = false;
} else {
return internal::ExceptionToStatus(
"Failed to list blobs in a directory: " + select.base_dir + ": " +
container_client.GetUrl(),
exception);
}
}

return found || select.allow_not_found
? Status::OK()
: ::arrow::fs::internal::PathNotFound(select.base_dir);
}

public:
Status GetFileInfoWithSelector(const Azure::Core::Context& context,
Azure::Nullable<int32_t> page_size_hint,
const FileSelector& select,
FileInfoVector* acc_results) {
ARROW_ASSIGN_OR_RAISE(auto base_location, AzureLocation::FromString(select.base_dir));

if (base_location.container.empty()) {
// Without a container, the base_location is equivalent to the filesystem
// root -- `/`. FileSelector::allow_not_found doesn't matter in this case
// because the root always exists.
auto on_container =
[&](const Azure::Storage::Blobs::Models::BlobContainerItem& container) {
// Deleted containers are not listed by ListContainers.
DCHECK(!container.IsDeleted);

// Every container is considered a directory.
FileInfo info{container.Name, FileType::Directory};
info.set_mtime(
std::chrono::system_clock::time_point{container.Details.LastModified});
acc_results->push_back(std::move(info));

// Recurse into containers (subdirectories) if requested.
if (select.recursive && select.max_recursion > 0) {
FileSelector sub_select;
sub_select.base_dir = container.Name;
sub_select.allow_not_found = true;
sub_select.recursive = true;
sub_select.max_recursion = select.max_recursion - 1;
ARROW_RETURN_NOT_OK(GetFileInfoWithSelector(context, page_size_hint,
sub_select, acc_results));
}
return Status::OK();
};
return VisitContainers(context, std::move(on_container));
}

auto container_client =
blob_service_client_->GetBlobContainerClient(base_location.container);
return GetFileInfoWithSelectorFromContainer(container_client, context, page_size_hint,
select, acc_results);
}

Result<std::shared_ptr<ObjectInputFile>> OpenInputFile(const AzureLocation& location,
AzureFileSystem* fs) {
RETURN_NOT_OK(ValidateFileLocation(location));
Expand Down Expand Up @@ -1196,7 +1399,12 @@ Result<FileInfo> AzureFileSystem::GetFileInfo(const std::string& path) {
}

Result<FileInfoVector> AzureFileSystem::GetFileInfo(const FileSelector& select) {
return Status::NotImplemented("The Azure FileSystem is not fully implemented");
Azure::Core::Context context;
Azure::Nullable<int32_t> page_size_hint; // unspecified
FileInfoVector results;
RETURN_NOT_OK(
impl_->GetFileInfoWithSelector(context, page_size_hint, select, &results));
return {std::move(results)};
}

Status AzureFileSystem::CreateDir(const std::string& path, bool recursive) {
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/filesystem/azurefs.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ class ARROW_EXPORT AzureFileSystem : public FileSystem {
const AzureOptions& options, const io::IOContext& = io::default_io_context());

private:
explicit AzureFileSystem(const AzureOptions& options, const io::IOContext& io_context);
AzureFileSystem(const AzureOptions& options, const io::IOContext& io_context);

class Impl;
std::unique_ptr<Impl> impl_;
Expand Down
Loading

0 comments on commit 476c78f

Please sign in to comment.