Skip to content

Commit

Permalink
ARROW-7058: [C++] FileSystemDataSourceDiscovery should apply partition…
Browse files Browse the repository at this point in the history
… schemes relative to its base dir
  • Loading branch information
bkietz committed Nov 4, 2019
1 parent b07f5cd commit 1bae5ff
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 32 deletions.
21 changes: 9 additions & 12 deletions cpp/src/arrow/dataset/discovery.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,21 @@ namespace arrow {
namespace dataset {

FileSystemDataSourceDiscovery::FileSystemDataSourceDiscovery(
fs::FileSystem* filesystem, std::vector<fs::FileStats> files,
fs::FileSystem* filesystem, fs::Selector selector, std::vector<fs::FileStats> files,
std::shared_ptr<FileFormat> format)
: fs_(filesystem), files_(std::move(files)), format_(std::move(format)) {}

Status FileSystemDataSourceDiscovery::Make(fs::FileSystem* filesystem,
std::vector<fs::FileStats> files,
std::shared_ptr<FileFormat> format,
std::shared_ptr<DataSourceDiscovery>* out) {
out->reset(new FileSystemDataSourceDiscovery(filesystem, files, format));
return Status::OK();
}
: fs_(filesystem),
selector_(std::move(selector)),
files_(std::move(files)),
format_(std::move(format)) {}

/// \brief Create a discovery over the files matched by a selector.
///
/// Asks `filesystem` for the stats of every entry matched by `selector`, then
/// constructs the discovery from the selector together with those stats. The
/// selector is handed to the constructor (rather than being dropped after the
/// listing) because Finish() passes it on to ApplyPartitionScheme.
///
/// \param[in] filesystem filesystem to query; kept by the discovery as a raw pointer
/// \param[in] selector selects the files the discovery will cover
/// \param[in] format file format forwarded to the discovery
/// \param[out] out receives the newly created discovery on success
/// \return the error reported by GetTargetStats if the listing fails,
///         Status::OK() otherwise
Status FileSystemDataSourceDiscovery::Make(fs::FileSystem* filesystem,
                                           fs::Selector selector,
                                           std::shared_ptr<FileFormat> format,
                                           std::shared_ptr<DataSourceDiscovery>* out) {
  std::vector<fs::FileStats> files;
  RETURN_NOT_OK(filesystem->GetTargetStats(selector, &files));

  // Every argument was taken by value and is not used again below, so move
  // each one into the constructor instead of copying it a second time.
  out->reset(new FileSystemDataSourceDiscovery(filesystem, std::move(selector),
                                               std::move(files), std::move(format)));
  return Status::OK();
}

static inline Status InspectSchema(fs::FileSystem* fs,
Expand Down Expand Up @@ -85,7 +81,8 @@ Status FileSystemDataSourceDiscovery::Finish(std::shared_ptr<DataSource>* out) {
PathPartitions partitions;

if (partition_scheme_ != nullptr) {
RETURN_NOT_OK(ApplyPartitionScheme(*partition_scheme_, files_, &partitions));
RETURN_NOT_OK(
ApplyPartitionScheme(*partition_scheme_, selector_, files_, &partitions));
}

return FileSystemBasedDataSource::Make(fs_, files_, root_partition(),
Expand Down
7 changes: 2 additions & 5 deletions cpp/src/arrow/dataset/discovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,6 @@ class ARROW_DS_EXPORT DataSourceDiscovery {
/// of fs::FileStats or a fs::Selector.
class ARROW_DS_EXPORT FileSystemDataSourceDiscovery : public DataSourceDiscovery {
public:
static Status Make(fs::FileSystem* filesystem, std::vector<fs::FileStats> files,
std::shared_ptr<FileFormat> format,
std::shared_ptr<DataSourceDiscovery>* out);

static Status Make(fs::FileSystem* filesystem, fs::Selector selector,
std::shared_ptr<FileFormat> format,
std::shared_ptr<DataSourceDiscovery>* out);
Expand All @@ -102,11 +98,12 @@ class ARROW_DS_EXPORT FileSystemDataSourceDiscovery : public DataSourceDiscovery
Status Finish(std::shared_ptr<DataSource>* out) override;

protected:
FileSystemDataSourceDiscovery(fs::FileSystem* filesystem,
FileSystemDataSourceDiscovery(fs::FileSystem* filesystem, fs::Selector selector,
std::vector<fs::FileStats> files,
std::shared_ptr<FileFormat> format);

fs::FileSystem* fs_;
fs::Selector selector_;
std::vector<fs::FileStats> files_;
std::shared_ptr<FileFormat> format_;
};
Expand Down
32 changes: 21 additions & 11 deletions cpp/src/arrow/dataset/discovery_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include "arrow/dataset/partition.h"
#include "arrow/dataset/test_util.h"
#include "arrow/filesystem/test_util.h"

Expand All @@ -31,16 +32,11 @@ class FileSystemDataSourceDiscoveryTest : public TestFileSystemBasedDataSource {
void MakeDiscovery(const std::vector<fs::FileStats>& files) {
MakeFileSystem(files);
ASSERT_OK(
FileSystemDataSourceDiscovery::Make(fs_.get(), files, format_, &discovery_));
}

void MakeDiscovery(const std::vector<fs::FileStats>& files, fs::Selector selector) {
MakeFileSystem(files);
ASSERT_OK(
FileSystemDataSourceDiscovery::Make(fs_.get(), selector, format_, &discovery_));
FileSystemDataSourceDiscovery::Make(fs_.get(), selector_, format_, &discovery_));
}

protected:
fs::Selector selector_;
std::shared_ptr<DataSourceDiscovery> discovery_;
std::shared_ptr<FileFormat> format_ = std::make_shared<DummyFileFormat>();
};
Expand All @@ -53,15 +49,29 @@ TEST_F(FileSystemDataSourceDiscoveryTest, Basic) {
}

TEST_F(FileSystemDataSourceDiscoveryTest, Selector) {
// This test ensures that the Selector is enforced.
fs::Selector selector;
selector.base_dir = "A";
MakeDiscovery({fs::File("0"), fs::File("A/a")}, selector);
selector_.base_dir = "A";
MakeDiscovery({fs::File("0"), fs::File("A/a")});

ASSERT_OK(discovery_->Finish(&source_));
// "0" doesn't match selector, so it has been dropped:
AssertFragmentsAreFromPath(source_->GetFragments(options_), {"A/a"});
}

TEST_F(FileSystemDataSourceDiscoveryTest, Partition) {
  // NOTE(review): the base directory itself contains a hive-style "a=ignored"
  // segment, presumably so the test would notice if the partition scheme were
  // applied to the full path instead of the part below base_dir -- confirm.
  selector_.base_dir = "a=ignored/base";

  // Builds the path of an entry located under the selector's base directory.
  auto under_base_dir = [this](const char* suffix) {
    return selector_.base_dir + suffix;
  };

  MakeDiscovery({fs::File(under_base_dir("/a=1")), fs::File(under_base_dir("/a=2"))});

  // Hive partitioning over a single int32 field named "a".
  auto partition_scheme =
      std::make_shared<HivePartitionScheme>(schema({field("a", int32())}));
  ASSERT_OK(discovery_->SetPartitionScheme(partition_scheme));

  ASSERT_OK(discovery_->Finish(&source_));

  // Both files must be reported as fragments, under their full paths.
  AssertFragmentsAreFromPath(source_->GetFragments(options_),
                             {under_base_dir("/a=1"), under_base_dir("/a=2")});
}

TEST_F(FileSystemDataSourceDiscoveryTest, Inspect) {
auto s = schema({field("f64", float64())});
format_ = std::make_shared<DummyFileFormat>(s);
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/dataset/partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,10 @@ Result<std::shared_ptr<Expression>> HivePartitionScheme::Parse(
return ConvertPartitionKeys(GetUnconvertedKeys(path), *schema_);
}

Status ApplyPartitionScheme(const PartitionScheme& scheme,
Status ApplyPartitionScheme(const PartitionScheme& scheme, const fs::Selector& selector,
std::vector<fs::FileStats> files, PathPartitions* out) {
for (const auto& file : files) {
const auto& path = file.path();
const auto& path = file.path().substr(selector.base_dir.size());
std::shared_ptr<Expression> partition;
RETURN_NOT_OK(scheme.Parse(path, &partition));
out->emplace(path, std::move(partition));
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/arrow/dataset/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ namespace arrow {

namespace fs {
struct FileStats;
}
struct Selector;
} // namespace fs

namespace dataset {

Expand Down Expand Up @@ -168,7 +169,7 @@ class ARROW_DS_EXPORT FunctionPartitionScheme : public PartitionScheme {
/// \brief Mapping from path to partition expressions.
using PathPartitions = std::unordered_map<std::string, std::shared_ptr<Expression>>;

Status ApplyPartitionScheme(const PartitionScheme& scheme,
Status ApplyPartitionScheme(const PartitionScheme& scheme, const fs::Selector& selector,
std::vector<fs::FileStats> files, PathPartitions* out);

// TODO(bkietz) use RE2 and named groups to provide RegexpPartitionScheme
Expand Down

0 comments on commit 1bae5ff

Please sign in to comment.