Skip to content

Commit

Permalink
make base_dir of partition schemes explicitly optional
Browse files Browse the repository at this point in the history
  • Loading branch information
bkietz committed Nov 5, 2019
1 parent f2f9689 commit c0edfa5
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 15 deletions.
26 changes: 21 additions & 5 deletions cpp/src/arrow/dataset/discovery.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,37 @@ namespace arrow {
namespace dataset {

FileSystemDataSourceDiscovery::FileSystemDataSourceDiscovery(
fs::FileSystem* filesystem, fs::Selector selector, std::vector<fs::FileStats> files,
fs::FileSystem* filesystem, std::string base_dir, std::vector<fs::FileStats> files,
std::shared_ptr<FileFormat> format)
: fs_(filesystem),
selector_(std::move(selector)),
base_dir_(std::move(base_dir)),
files_(std::move(files)),
format_(std::move(format)) {}

Status FileSystemDataSourceDiscovery::Make(fs::FileSystem* filesystem,
std::string base_dir,
std::vector<fs::FileStats> files,
std::shared_ptr<FileFormat> format,
std::shared_ptr<DataSourceDiscovery>* out) {
out->reset(
new FileSystemDataSourceDiscovery(filesystem, std::move(base_dir), files, format));
return Status::OK();
}

Status FileSystemDataSourceDiscovery::Make(fs::FileSystem* filesystem,
std::vector<fs::FileStats> files,
std::shared_ptr<FileFormat> format,
std::shared_ptr<DataSourceDiscovery>* out) {
return Make(filesystem, "", std::move(files), std::move(format), out);
}

Status FileSystemDataSourceDiscovery::Make(fs::FileSystem* filesystem,
fs::Selector selector,
std::shared_ptr<FileFormat> format,
std::shared_ptr<DataSourceDiscovery>* out) {
std::vector<fs::FileStats> files;
RETURN_NOT_OK(filesystem->GetTargetStats(selector, &files));
out->reset(new FileSystemDataSourceDiscovery(filesystem, selector, files, format));
return Status::OK();
return Make(filesystem, std::move(selector.base_dir), std::move(files), format, out);
}

static inline Status InspectSchema(fs::FileSystem* fs,
Expand Down Expand Up @@ -82,7 +98,7 @@ Status FileSystemDataSourceDiscovery::Finish(std::shared_ptr<DataSource>* out) {

if (partition_scheme_ != nullptr) {
RETURN_NOT_OK(
ApplyPartitionScheme(*partition_scheme_, selector_, files_, &partitions));
ApplyPartitionScheme(*partition_scheme_, base_dir_, files_, &partitions));
}

return FileSystemBasedDataSource::Make(fs_, files_, root_partition(),
Expand Down
12 changes: 10 additions & 2 deletions cpp/src/arrow/dataset/discovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ class ARROW_DS_EXPORT DataSourceDiscovery {
/// of fs::FileStats or a fs::Selector.
class ARROW_DS_EXPORT FileSystemDataSourceDiscovery : public DataSourceDiscovery {
public:
static Status Make(fs::FileSystem* filesystem, std::string base_dir,
std::vector<fs::FileStats> files, std::shared_ptr<FileFormat> format,
std::shared_ptr<DataSourceDiscovery>* out);

static Status Make(fs::FileSystem* filesystem, std::vector<fs::FileStats> files,
std::shared_ptr<FileFormat> format,
std::shared_ptr<DataSourceDiscovery>* out);

static Status Make(fs::FileSystem* filesystem, fs::Selector selector,
std::shared_ptr<FileFormat> format,
std::shared_ptr<DataSourceDiscovery>* out);
Expand All @@ -98,12 +106,12 @@ class ARROW_DS_EXPORT FileSystemDataSourceDiscovery : public DataSourceDiscovery
Status Finish(std::shared_ptr<DataSource>* out) override;

protected:
FileSystemDataSourceDiscovery(fs::FileSystem* filesystem, fs::Selector selector,
FileSystemDataSourceDiscovery(fs::FileSystem* filesystem, std::string base_dir,
std::vector<fs::FileStats> files,
std::shared_ptr<FileFormat> format);

fs::FileSystem* fs_;
fs::Selector selector_;
std::string base_dir_;
std::vector<fs::FileStats> files_;
std::shared_ptr<FileFormat> format_;
};
Expand Down
16 changes: 10 additions & 6 deletions cpp/src/arrow/dataset/partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,20 @@ Result<std::shared_ptr<Expression>> HivePartitionScheme::Parse(
return ConvertPartitionKeys(GetUnconvertedKeys(path), *schema_);
}

Status ApplyPartitionScheme(const PartitionScheme& scheme, const fs::Selector& selector,
Status ApplyPartitionScheme(const PartitionScheme& scheme,
std::vector<fs::FileStats> files, PathPartitions* out) {
return ApplyPartitionScheme(scheme, "", std::move(files), out);
}

Status ApplyPartitionScheme(const PartitionScheme& scheme, const std::string& base_dir,
std::vector<fs::FileStats> files, PathPartitions* out) {
for (const auto& file : files) {
// XXX is this the right way to drop the base dir?
DCHECK(std::equal(selector.base_dir.begin(), selector.base_dir.end(),
file.path().begin()));
const auto& path = file.path().substr(selector.base_dir.size());
if (file.path().substr(0, base_dir.size()) != base_dir) continue;
auto path = file.path().substr(base_dir.size());

std::shared_ptr<Expression> partition;
RETURN_NOT_OK(scheme.Parse(path, &partition));
out->emplace(path, std::move(partition));
out->emplace(std::move(path), std::move(partition));
}

return Status::OK();
Expand Down
5 changes: 4 additions & 1 deletion cpp/src/arrow/dataset/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,10 @@ class ARROW_DS_EXPORT FunctionPartitionScheme : public PartitionScheme {
/// \brief Mapping from path to partition expressions.
using PathPartitions = std::unordered_map<std::string, std::shared_ptr<Expression>>;

Status ApplyPartitionScheme(const PartitionScheme& scheme, const fs::Selector& selector,
Status ApplyPartitionScheme(const PartitionScheme& scheme,
std::vector<fs::FileStats> files, PathPartitions* out);

Status ApplyPartitionScheme(const PartitionScheme& scheme, const std::string& base_dir,
std::vector<fs::FileStats> files, PathPartitions* out);

// TODO(bkietz) use RE2 and named groups to provide RegexpPartitionScheme
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/filesystem/filesystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ struct ARROW_EXPORT FileStats {
void set_type(FileType type) { type_ = type; }

/// The full file path in the filesystem
std::string path() const { return path_; }
const std::string& path() const { return path_; }
void set_path(const std::string& path) { path_ = path; }

/// The file base name (component after the last directory separator)
Expand Down

0 comments on commit c0edfa5

Please sign in to comment.