
ARROW-5512: [C++] Rough API skeleton for C++ Datasets API / framework #4483

Closed
wants to merge 7 commits

Conversation

@wesm (Member) commented Jun 6, 2019

This is almost exclusively header files, so I caution all against debating small details like function signatures, names, or what kind of smart pointer to use (if any) in various places. Instead, does the high level structure seem reasonable (or at least, not horrible) as a starting point to do more work?

Some of the naming of things is inspired by related concepts in Apache Iceberg (incubating) (https://github.com/apache/incubator-iceberg), which is a vertically integrated dataset metastore and reading and writing system specialized for that metastore.

Here is the basic idea:

  • A Dataset (for reading, aka "scanning") consists of a schema (what kind of data you expect to receive) and one or more data sources
  • A DataSource abstractly yields an iterator of DataFragment
  • A DataFragment roughly represents an individual storage unit, such as a file (a rough sketch of these interfaces follows the list)
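
To make that shape concrete, here is a rough sketch of how the three interfaces might relate. This is illustrative only; the actual headers in this PR differ in names and signatures, and use Iterators rather than materialized vectors:

```cpp
// Illustrative sketch only; the real headers in this PR use Iterators and
// differ in names and signatures.
#include <memory>
#include <utility>
#include <vector>

namespace arrow {

class Schema;

namespace dataset {

// A granular unit of scannable data, e.g. one file or one Parquet row group.
class DataFragment {
 public:
  virtual ~DataFragment() = default;
};

// Abstractly yields a sequence of DataFragments.
class DataSource {
 public:
  virtual ~DataSource() = default;
  // The real API yields an Iterator so that lazy discovery is possible.
  virtual std::vector<std::shared_ptr<DataFragment>> GetFragments() = 0;
};

// A schema (what you expect to receive) plus one or more data sources.
class Dataset {
 public:
  Dataset(std::vector<std::shared_ptr<DataSource>> sources,
          std::shared_ptr<Schema> schema)
      : sources_(std::move(sources)), schema_(std::move(schema)) {}

  const std::shared_ptr<Schema>& schema() const { return schema_; }

 private:
  std::vector<std::shared_ptr<DataSource>> sources_;
  std::shared_ptr<Schema> schema_;
};

}  // namespace dataset
}  // namespace arrow
```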

Many of the interfaces involving collections are based around Iterators, so that we have the option of implementing "lazy" Datasets that continue to discover their structure after we have already started scanning. It is a common problem in data warehousing that the time to create a detailed manifest of what needs to be scanned grows linearly with the complexity of the dataset (e.g. the number of fragments).

I abstracted away the file-related logic from the high level interface since I would like to support data sources other than file-based ones:

  • Flight streams: each endpoint from a DoGet operation in Flight corresponds to a DataFragment
  • Database-like clients: e.g. the results of a SQL query form a Fragment

There are some object layering issues that aren't worked out yet, and I think the only way to work them out is to work on the implementation and refactor until we get things feeling right:

  • It is the job of a FileFormat implementation to translate between
  • Filtering can occur both at the Partition/Fragment level (i.e. "skip these files altogether") and at the post-materialization stage. In Iceberg these "post-materialization" filters are called "Residuals". For example, if the user wants filter1 & filter2 to be applied and only filter1 can be handled by the low-level file deserialization, we will have to apply filter2 against the unfiltered in-memory RecordBatch and return the filtered RecordBatch to the user (see the sketch after this list)
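
As a hedged illustration of the residual idea (the Filter/SplitFilters API below is made up for the example and is not in the proposed headers): the conjunction of user filters would be partitioned into the part the file reader can evaluate during deserialization and a residual that must still be applied to each materialized RecordBatch.

```cpp
// Made-up Filter interface for illustration; not part of the proposed headers.
#include <memory>
#include <utility>
#include <vector>

namespace sketch {

struct Filter {
  virtual ~Filter() = default;
  // True if the file deserializer can evaluate this filter itself.
  virtual bool CanPushDown() const = 0;
};

using FilterVector = std::vector<std::shared_ptr<Filter>>;

// Split a conjunction of filters into (pushed-down, residual). The residual
// is what must still be applied to each materialized RecordBatch.
std::pair<FilterVector, FilterVector> SplitFilters(const FilterVector& filters) {
  FilterVector pushed, residual;
  for (const auto& f : filters) {
    (f->CanPushDown() ? pushed : residual).push_back(f);
  }
  return {std::move(pushed), std::move(residual)};
}

}  // namespace sketch
```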

As another matter, an objective of this framework is to draw a distinction between the Schema of a file and the Schema of the Dataset. This isn't reflected fully in the headers yet. To give an example, suppose that we wish to obtain a Dataset with schema

a: int64 nullable
b: double nullable
c: string nullable

When reading files in the Dataset, we might encounter fields we don't want, or fields that are missing. We must conform the physical data to the Dataset's desired Schema. Much of the hard labor will be in the file format implementations, which must match up what's in the file with what the Dataset wants. We must also deal with other kinds of schema normalization issues, like one Parquet file declaring a field as "non-nullable" when the desired schema says "nullable".
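
A minimal sketch of that conformance step, assuming today's Arrow C++ API (Result, MakeArrayOfNull), which postdates this PR; casting mismatched types and nullability checks are elided:

```cpp
#include <memory>
#include <vector>

#include <arrow/api.h>

// Keep the columns the Dataset wants, fill in nulls for fields missing from
// the file, and drop anything else. (Casting mismatched types is elided.)
arrow::Result<std::shared_ptr<arrow::RecordBatch>> ConformBatch(
    const std::shared_ptr<arrow::RecordBatch>& batch,
    const std::shared_ptr<arrow::Schema>& dataset_schema) {
  std::vector<std::shared_ptr<arrow::Array>> columns;
  columns.reserve(dataset_schema->num_fields());
  for (const auto& field : dataset_schema->fields()) {
    const int i = batch->schema()->GetFieldIndex(field->name());
    if (i >= 0) {
      columns.push_back(batch->column(i));
    } else {
      // Field missing from the file: materialize an all-null column.
      ARROW_ASSIGN_OR_RAISE(
          auto nulls, arrow::MakeArrayOfNull(field->type(), batch->num_rows()));
      columns.push_back(nulls);
    }
  }
  return arrow::RecordBatch::Make(dataset_schema, batch->num_rows(),
                                  std::move(columns));
}
```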

Inferring the Schema of a Dataset when you don't know it outright is a whole separate matter. If you go to Scan a dataset without knowing its schema, you must necessarily do some amount of inference up front or just prior to scanning. We will need to offer both "low effort" (look at some, but not all, files, and do not expend too much energy on it -- e.g. in the case of CSV files you may reach a conclusion without parsing an entire file) and "high effort / exhaustive" Schema inference.
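
Purely as a strawman (nothing like this exists in the headers yet), the two inference modes might surface as an option on whatever API ends up doing discovery:

```cpp
// Hypothetical strawman; names are invented for illustration.
enum class SchemaInferenceLevel {
  kLowEffort,   // sample a few fragments and stop early (e.g. partial CSV parse)
  kExhaustive,  // inspect every fragment and unify all observed schemas
};
```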

As far as the actual Scan execution goes, we are likely to run into thread scheduling issues as soon as we try to Scan files in parallel, since IO and CPU work must be coordinated internally. The file reader implementations also have their own internal parallelism, so that's something to contemplate as well.

In any case, I suggest we start small: create minimalistic interfaces to CSV and Parquet files, implement simple dataset discovery like what we have now in pyarrow/parquet.py (but a bit more general), and then investigate the more advanced features described in https://docs.google.com/document/d/1bVhzifD38qDypnSjtf8exvpP3sSB5x_Kw9m-n66FB2c/edit piece by piece.

@wesm (Member Author) commented Jun 6, 2019

@pitrou @bkietz I posit that the csv/ and json/ subprojects could be nested under src/arrow/dataset as this project proceeds. I'd like to move src/parquet/arrow there also but that might be trickier since we have more users of that project who would be expecting to find the symbols in libparquet.so


/// \brief A granular piece of a Dataset, such as an individual file,
/// which can be read/scanned separately from other fragments
class ARROW_DS_EXPORT DataFragment {

Contributor:

Why the new macro?

Member Author:

This produces libarrow_dataset.so, which is a different shared library, so needs a different export header. One of the reasons to have a different shared library is that it needs to link to libparquet.so

Member:

Hmm... The dependency relationships between Arrow and Parquet are already complicated (PyArrow depends on Parquet which depends on Arrow).

What if we make file formats pluggable (with some kind of registry for file extensions for example, or something else) and so Parquet would extend the datasets registry in libparquet.so without Arrow knowing about it upfront?
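
A rough sketch of the pluggable-registry idea being floated here (hypothetical; no such registry exists in this PR): libparquet.so could register a FileFormat for the "parquet" extension at load time, without libarrow_dataset.so depending on Parquet.

```cpp
// Hypothetical registry; nothing like this exists in the PR.
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>

namespace sketch {

class FileFormat;  // e.g. a ParquetFileFormat defined in libparquet.so

class FileFormatRegistry {
 public:
  static FileFormatRegistry* Instance() {
    static FileFormatRegistry registry;
    return &registry;
  }

  void Register(const std::string& extension, std::shared_ptr<FileFormat> format) {
    formats_[extension] = std::move(format);
  }

  std::shared_ptr<FileFormat> Lookup(const std::string& extension) const {
    auto it = formats_.find(extension);
    return it == formats_.end() ? nullptr : it->second;
  }

 private:
  std::unordered_map<std::string, std::shared_ptr<FileFormat>> formats_;
};

}  // namespace sketch
```

Parquet's side could then call something like FileFormatRegistry::Instance()->Register("parquet", ...) from a static initializer in libparquet.so, so libarrow_dataset.so never needs to link against Parquet directly.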

@emkornfield (Contributor):

@wesm at a high level this seems reasonable to me, but I'm refraining from nit-picking individual APIs per your request :)

One question I had on the iterator concept: as you mentioned, it might take a while to iterate through all the fragments that need to be loaded. Does it make sense to expose some level of parallelism in the APIs so clients can consume fragments more quickly, or do you imagine this will happen behind the scenes?

@pitrou (Member) left a comment:

It is difficult for me to understand how the different concepts are supposed to be used in the end. Is the user supposed to build all the scaffolding by hand? Or is there some automation under the hood?

Some concepts are a bit vague to me. "Scan" == "read in full" or "walk directories for metadata"? Why do we have fragments and partitions and datasources and filesources?

I think it would help to start from a simple concrete example (tree of CSV files?) and explain what the different concepts relate to. This might also indicate that some concepts duplicate essentially the same underlying notion?

@nealrichardson

class ARROW_DS_EXPORT DataSource {
public:
enum Type {
SIMPLE, // Flat collection

Member:

What do you mean? A single file/fragment? Also, why is it useful for the caller to know this?

Member Author:

This is a placeholder. Let's actually implement some things

class ARROW_DS_EXPORT DataFragment {
public:
/// \brief Return true if the fragment can benefit from parallel
/// scanning

Member:

I'm not sure what that means. Scanning == reading the whole fragment?

Member Author:

Placeholder. Let's implement some splittable DataFragments (e.g. Parquet) and then figure out what APIs we need

const DataSelector& selector) = 0;
};

/// \brief A DataSource consisting of a flat sequence of DataFragments

Member:

Not sure I understand the difference, since DataSource is already able to give out a DataFragment iterator.

Member Author:

This implementation is not yet complete -- this is intended as a concrete subclass. Note that DataSource has pure virtual functions while this one's methods are marked "override".


/// \param[in] sources one or more input data sources
/// \param[in] schema a known schema to conform to, may be nullptr
explicit Dataset(const std::vector<std::shared_ptr<DataSource>>& sources,

Member:

Hmm... a DataSource can already yield multiple fragments, is there a reason to have a similar thing happening here?

At worst, if we want to combine DataSources together, shouldn't we have a CompositeDataSource instead? (That would be the moral equivalent of itertools.chain, perhaps?)

Member Author:

Sure, whatever we want. I don't think it's important to resolve this detail at this stage


std::shared_ptr<Schema> schema() const { return schema_; }

/// \brief Compute consensus schema from input data sources

Member:

This is different than the schema given in the constructor? Or it's only when that schema is null?

Also, it seems all reading should happen within a Scanner, so is DataSet the right place for an inference function?

Member Author:

I'm not sure where the right place is to handle the "objective schema", or where to do the workflow of:

  • Read this dataset (but I'm not sure what the schema is)
  • Infer the schema
  • Use the inferred schema to scan the dataset

Maybe the schema can be passed as part of the ScannerBuilder. I'm not sure. Let's work on implementing things.

};

/// \brief Read record batches from a range of a single data fragment
class ARROW_DS_EXPORT ScanTask {

Member:

I don't really understand why this is exposed. Does the user have to deal with the details of invoking Scanner and ScanTask? Also, what is the logical distinction between Scanner and ScanTask? If you invoke ScanTasks in parallel, then you get the record batches in arbitrary order?

Member Author:

This object is used for the implementation of scanning. Developers do not have to interact with this unless they want to (e.g. if they are scheduling scanning tasks themselves)

In general, in a Dataset the order of the record batches often does not matter (in database systems this is the norm). If it does, then you should track the order in which ScanTasks are produced from the Dataset and preserve that ordering yourself.

class PartitionKey {
public:
const std::vector<std::string>& fields() const { return fields_; }
const std::vector<std::shared_ptr<Scalar>>& values() const { return values_; }

Member:

I'm not sure what PartitionKey implements. Just AND(field==value for field, value in zip(fields, values))?

@wesm (Member Author) Jun 6, 2019:

Yes, that's probably just it. This part is not complete, I presume that generalization will be required as time goes on. In the simple case partitions have a single field and value (that's for Hive-style partitioning, but we already know that this is not general enough -- see Iceberg's partitioning scheme for example)
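
To pin down the semantics described above, here is a sketch only (not code from this PR), with arrow::Scalar replaced by a plain string for brevity:

```cpp
// Sketch only; arrow::Scalar is replaced by a plain string for brevity.
#include <cstddef>
#include <map>
#include <string>
#include <vector>

namespace sketch {

using Scalar = std::string;

// A PartitionKey matches a partition iff every listed field has exactly the
// listed value, i.e. AND(field == value for field, value in zip(fields, values)).
bool Matches(const std::vector<std::string>& fields,
             const std::vector<Scalar>& values,
             const std::map<std::string, Scalar>& partition_values) {
  for (std::size_t i = 0; i < fields.size(); ++i) {
    auto it = partition_values.find(fields[i]);
    if (it == partition_values.end() || it->second != values[i]) return false;
  }
  return true;
}

}  // namespace sketch
```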

};

/// \brief Container for a dataset partition, which consists of a
/// partition identifier, subpartitions, and some data fragments

Member:

I'm missing some high-level concepts here. A partition can contain several fragments? Until now I assumed partition and fragment were basically similar things. But apparently they're hierarchically organized?

I think it would help if you sketched out a simple example (e.g. a directory hierarchy of CSV files) and explained to what part of the example concretely each concept maps.

Member Author:

A fragment corresponds semantically to a single file, or a row group inside a Parquet file. It is an indivisible unit of data that can be scanned.

A partition contains a collection of fragments.

I'll add an example to the comments in the header file
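
For instance, the kind of example that might end up in the header comments (a hypothetical Hive-style layout; purely illustrative):

```cpp
// Hypothetical Hive-style layout (illustrative only):
//
//   dataset/
//     year=2018/                <- Partition (PartitionKey: year == 2018)
//       part-0.parquet          <- DataFragment
//       part-1.parquet          <- DataFragment
//     year=2019/                <- Partition (PartitionKey: year == 2019)
//       part-0.parquet          <- DataFragment
//
// Each directory level is a Partition holding subpartitions and/or fragments;
// each file (or row group within a Parquet file) is a DataFragment; the whole
// tree forms a single file-based DataSource of the Dataset.
```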

};

/// \brief Main interface for
class ARROW_DS_EXPORT Scanner {

Member:

Is the scanner responsible only for scanning through directories? Is the actual file / fragment reading handled somewhere else?

I think I would expect some kind of FragmentReader interface at some point, or perhaps that is what DataFragment is for.

Member Author:

Yes, it's what DataFragment is for. See FileBasedDataFragment -- it has a reference to an implementation of FileFormat which has the logic for scanning that kind of file

namespace arrow {
namespace dataset {

/// \brief A granular piece of a Dataset, such as an individual file,

Member:

I don't really understand what this class is used for. Can the user do something with it, apart from inspecting its properties?

Member Author:

It's a data structure for representing the internal structure of the dataset. Users in general will not have to interact with this data structure unless they wish to explore the physical topology of the dataset (e.g. iterating through partitions and listing files)

@wesm (Member Author) commented Jun 6, 2019

> Some concepts are a bit vague to me. "Scan" == "read in full" or "walk directories for metadata"? Why do we have fragments and partitions and datasources and filesources?

So "Scan" is essentially the universally-used term to describe "reading a dataset" in database systems. If you look at any database system, you will see "scan", "scan node", "scanner", etc. all referring to the process of iterating through materialized chunks of a dataset. See for example the HdfsScanner base class that Impala uses for all of its data source interfaces

https://github.com/apache/impala/blob/master/be/src/exec/hdfs-scanner.h#L74

> It is difficult for me to understand how the different concepts are supposed to be used in the end. Is the user supposed to build all the scaffolding by hand? Or is there some automation under the hood?

The API is currently lacking user-facing endpoints, which would be responsible for assembling the data structures and providing a ready-to-scan Dataset. I don't know yet what these APIs need to look like until we actually implement all of the parts of the complete workflow and stitch them together.

I'll respond to some of your other comments.

@wesm (Member Author) commented Jun 12, 2019

I'm going to add a few more code comments, get the CI passing, and then merge this if there is no objection. I will begin opening follow-up JIRA issues and tag them with the "dataset" label so that this project can move forward.

@wesm force-pushed the datasets-api-prototype branch from 461277b to 2f6440a on June 12, 2019 at 20:21

@wesm (Member Author) commented Jun 12, 2019

Updated this. Will merge on passing build

@wesm (Member Author) commented Jun 12, 2019

AppVeyor build is running on my fork; will keep an eye on https://ci.appveyor.com/project/wesm/arrow/builds/25238050

@pitrou (Member) commented Jun 13, 2019

> Updated this. Will merge on passing build

Sounds fine to me.
