Skip to content

Commit

Permalink
ARROW-7058: [C++] FileSystemDataSourceDiscovery should apply partitio…
Browse files Browse the repository at this point in the history
…n schemes relative to its base dir

@nealrichardson

Closes #5772 from bkietz/7058-FileSystemDataSourceDisco and squashes the following commits:

c0edfa5 <Benjamin Kietzman> make base_dir of partition schemes explicitly optional
f2f9689 <Benjamin Kietzman> add DCHECK for path containing selector
1bae5ff <Benjamin Kietzman> ARROW-7058:  FilSystemDataSourceDiscovery should apply partition schemes relative to its base dir

Authored-by: Benjamin Kietzman <[email protected]>
Signed-off-by: François Saint-Jacques <[email protected]>
  • Loading branch information
bkietz authored and fsaintjacques committed Nov 6, 2019
1 parent d7d4c5c commit d22f800
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 21 deletions.
23 changes: 18 additions & 5 deletions cpp/src/arrow/dataset/discovery.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,37 @@ namespace arrow {
namespace dataset {

FileSystemDataSourceDiscovery::FileSystemDataSourceDiscovery(
fs::FileSystem* filesystem, std::vector<fs::FileStats> files,
fs::FileSystem* filesystem, std::string base_dir, std::vector<fs::FileStats> files,
std::shared_ptr<FileFormat> format)
: fs_(filesystem), files_(std::move(files)), format_(std::move(format)) {}
: fs_(filesystem),
base_dir_(std::move(base_dir)),
files_(std::move(files)),
format_(std::move(format)) {}

Status FileSystemDataSourceDiscovery::Make(fs::FileSystem* filesystem,
std::string base_dir,
std::vector<fs::FileStats> files,
std::shared_ptr<FileFormat> format,
std::shared_ptr<DataSourceDiscovery>* out) {
out->reset(new FileSystemDataSourceDiscovery(filesystem, files, format));
out->reset(
new FileSystemDataSourceDiscovery(filesystem, std::move(base_dir), files, format));
return Status::OK();
}

Status FileSystemDataSourceDiscovery::Make(fs::FileSystem* filesystem,
std::vector<fs::FileStats> files,
std::shared_ptr<FileFormat> format,
std::shared_ptr<DataSourceDiscovery>* out) {
return Make(filesystem, "", std::move(files), std::move(format), out);
}

Status FileSystemDataSourceDiscovery::Make(fs::FileSystem* filesystem,
fs::Selector selector,
std::shared_ptr<FileFormat> format,
std::shared_ptr<DataSourceDiscovery>* out) {
std::vector<fs::FileStats> files;
RETURN_NOT_OK(filesystem->GetTargetStats(selector, &files));
return Make(filesystem, files, format, out);
return Make(filesystem, std::move(selector.base_dir), std::move(files), format, out);
}

static inline Status InspectSchema(fs::FileSystem* fs,
Expand Down Expand Up @@ -85,7 +97,8 @@ Status FileSystemDataSourceDiscovery::Finish(std::shared_ptr<DataSource>* out) {
PathPartitions partitions;

if (partition_scheme_ != nullptr) {
RETURN_NOT_OK(ApplyPartitionScheme(*partition_scheme_, files_, &partitions));
RETURN_NOT_OK(
ApplyPartitionScheme(*partition_scheme_, base_dir_, files_, &partitions));
}

return FileSystemBasedDataSource::Make(fs_, files_, root_partition(),
Expand Down
7 changes: 6 additions & 1 deletion cpp/src/arrow/dataset/discovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ class ARROW_DS_EXPORT DataSourceDiscovery {
/// of fs::FileStats or a fs::Selector.
class ARROW_DS_EXPORT FileSystemDataSourceDiscovery : public DataSourceDiscovery {
public:
static Status Make(fs::FileSystem* filesystem, std::string base_dir,
std::vector<fs::FileStats> files, std::shared_ptr<FileFormat> format,
std::shared_ptr<DataSourceDiscovery>* out);

static Status Make(fs::FileSystem* filesystem, std::vector<fs::FileStats> files,
std::shared_ptr<FileFormat> format,
std::shared_ptr<DataSourceDiscovery>* out);
Expand All @@ -102,11 +106,12 @@ class ARROW_DS_EXPORT FileSystemDataSourceDiscovery : public DataSourceDiscovery
Status Finish(std::shared_ptr<DataSource>* out) override;

protected:
FileSystemDataSourceDiscovery(fs::FileSystem* filesystem,
FileSystemDataSourceDiscovery(fs::FileSystem* filesystem, std::string base_dir,
std::vector<fs::FileStats> files,
std::shared_ptr<FileFormat> format);

fs::FileSystem* fs_;
std::string base_dir_;
std::vector<fs::FileStats> files_;
std::shared_ptr<FileFormat> format_;
};
Expand Down
32 changes: 21 additions & 11 deletions cpp/src/arrow/dataset/discovery_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include "arrow/dataset/partition.h"
#include "arrow/dataset/test_util.h"
#include "arrow/filesystem/test_util.h"

Expand All @@ -31,16 +32,11 @@ class FileSystemDataSourceDiscoveryTest : public TestFileSystemBasedDataSource {
void MakeDiscovery(const std::vector<fs::FileStats>& files) {
MakeFileSystem(files);
ASSERT_OK(
FileSystemDataSourceDiscovery::Make(fs_.get(), files, format_, &discovery_));
}

void MakeDiscovery(const std::vector<fs::FileStats>& files, fs::Selector selector) {
MakeFileSystem(files);
ASSERT_OK(
FileSystemDataSourceDiscovery::Make(fs_.get(), selector, format_, &discovery_));
FileSystemDataSourceDiscovery::Make(fs_.get(), selector_, format_, &discovery_));
}

protected:
fs::Selector selector_;
std::shared_ptr<DataSourceDiscovery> discovery_;
std::shared_ptr<FileFormat> format_ = std::make_shared<DummyFileFormat>();
};
Expand All @@ -53,15 +49,29 @@ TEST_F(FileSystemDataSourceDiscoveryTest, Basic) {
}

TEST_F(FileSystemDataSourceDiscoveryTest, Selector) {
// This test ensure that the Selector is enforced.
fs::Selector selector;
selector.base_dir = "A";
MakeDiscovery({fs::File("0"), fs::File("A/a")}, selector);
selector_.base_dir = "A";
MakeDiscovery({fs::File("0"), fs::File("A/a")});

ASSERT_OK(discovery_->Finish(&source_));
// "0" doesn't match selector, so it has been dropped:
AssertFragmentsAreFromPath(source_->GetFragments(options_), {"A/a"});
}

TEST_F(FileSystemDataSourceDiscoveryTest, Partition) {
selector_.base_dir = "a=ignored/base";
MakeDiscovery(
{fs::File(selector_.base_dir + "/a=1"), fs::File(selector_.base_dir + "/a=2")});

auto partition_scheme =
std::make_shared<HivePartitionScheme>(schema({field("a", int32())}));

ASSERT_OK(discovery_->SetPartitionScheme(partition_scheme));
ASSERT_OK(discovery_->Finish(&source_));

AssertFragmentsAreFromPath(source_->GetFragments(options_),
{selector_.base_dir + "/a=1", selector_.base_dir + "/a=2"});
}

TEST_F(FileSystemDataSourceDiscoveryTest, Inspect) {
auto s = schema({field("f64", float64())});
format_ = std::make_shared<DummyFileFormat>(s);
Expand Down
11 changes: 9 additions & 2 deletions cpp/src/arrow/dataset/partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,18 @@ Result<std::shared_ptr<Expression>> HivePartitionScheme::Parse(

Status ApplyPartitionScheme(const PartitionScheme& scheme,
std::vector<fs::FileStats> files, PathPartitions* out) {
return ApplyPartitionScheme(scheme, "", std::move(files), out);
}

Status ApplyPartitionScheme(const PartitionScheme& scheme, const std::string& base_dir,
std::vector<fs::FileStats> files, PathPartitions* out) {
for (const auto& file : files) {
const auto& path = file.path();
if (file.path().substr(0, base_dir.size()) != base_dir) continue;
auto path = file.path().substr(base_dir.size());

std::shared_ptr<Expression> partition;
RETURN_NOT_OK(scheme.Parse(path, &partition));
out->emplace(path, std::move(partition));
out->emplace(std::move(path), std::move(partition));
}

return Status::OK();
Expand Down
6 changes: 5 additions & 1 deletion cpp/src/arrow/dataset/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ namespace arrow {

namespace fs {
struct FileStats;
}
struct Selector;
} // namespace fs

namespace dataset {

Expand Down Expand Up @@ -171,6 +172,9 @@ using PathPartitions = std::unordered_map<std::string, std::shared_ptr<Expressio
Status ApplyPartitionScheme(const PartitionScheme& scheme,
std::vector<fs::FileStats> files, PathPartitions* out);

Status ApplyPartitionScheme(const PartitionScheme& scheme, const std::string& base_dir,
std::vector<fs::FileStats> files, PathPartitions* out);

// TODO(bkietz) use RE2 and named groups to provide RegexpPartitionScheme

} // namespace dataset
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/filesystem/filesystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ struct ARROW_EXPORT FileStats {
void set_type(FileType type) { type_ = type; }

/// The full file path in the filesystem
std::string path() const { return path_; }
const std::string& path() const { return path_; }
void set_path(const std::string& path) { path_ = path; }

/// The file base name (component after the last directory separator)
Expand Down

0 comments on commit d22f800

Please sign in to comment.