Feast API: Sources #633

Closed
woop opened this issue Apr 19, 2020 · 6 comments

Comments

@woop
Member

woop commented Apr 19, 2020

This issue can be used to discuss the role of sources in Feast, and how we see the concept evolving in future versions.

Status quo
Feast currently supports only a single Source type, KafkaSource. This can be defined through a Feature Set or omitted. If users omit the source, Feast Core will fill in a default for them.

Contributor comment from @ches (link)

While I get the utility and convenience especially for deploying Feast with more distributed ownership in the organization, we ignore / work around a lot of the Source and Store configuration kept in the registry RDBMS, in favor of service discovery.

I also think that asking data scientists and data engineers to be concerned with operational infra configuration like Kafka broker addresses when registering feature sets is not an elegant separation of concerns.

Maybe we can take this as an opportunity to consider design alternatives as well.

@woop
Member Author

woop commented Apr 19, 2020

we ignore / work around a lot of the Source and Store configuration kept in the registry RDBMS, in favor of service discovery.

By "we" do you mean your current team? I'd love to get a better understanding of how you are currently doing it, especially if it means we can improve sources or stores.

I also think that asking data scientists and data engineers to be concerned with operational infra configuration like Kafka broker addresses when registering feature sets is not an elegant separation of concerns.

I agree 100%.

Part of the reason why we made sources optional was exactly for this reason. We didn't want a data scientist to have to think about configuring a source. In hindsight this created a lot of complexity for us that I think we should have delayed, but here we are.

Sources today are a glorified "connection string". If a source were to only ever stay a connection string, then I don't really see the point in exposing that to users. This can easily be configured behind the scenes by administrators and exposed through source names or some other, more human-friendly way. Users can then select the source they want to use. If I understand correctly, this is the largest part of what you think is a bad separation of concerns.

However, I do think there is value in having users configure some aspects of the sourcing of data. Perhaps source isn't the right term, but we can discuss that later.

Most of our data still lives in data lakes or data warehouses, which means federation is a natural next step. In a federated model we would probably opt to extend sources to allow new data sources to be accessed through Feast, especially without users having to export and reimport into Feast, and with Feast being lazy towards retrieval and exports (no long running jobs).

The next question is then, how does this influence the Feast API?

One extreme is the Uber approach towards data transformation. There are parts of that API that I like, and parts that I don't like. I think we are in agreement that we shouldn't be building a general purpose data transformation system, for example.

However, I see massive value in allowing users to define SQL queries. And this is the direction that I would like to take sources (or if not sources, another part of feature sets).

One of the main reasons why I see this as valuable is that all of our users are familiar with SQL. Using SQL improves the Feast user experience because they are able to validate and prove that their query works without Feast in the loop. They can then bring that query to Feast, publish it, and see the results. If there is a failure, then the problem is likely Feast. SQL is also supported by virtually all sources and stores.

name: driver_features
entities:
- name: driver_id
  type: INT64
features:
- name: avg_speed
  valueType: FLOAT
- name: avg_daily_trips
  valueType: FLOAT
sources:
  batch:
    type: BigQuerySQL
    sql: |
      SELECT driver_id, avg_speed, avg_daily_trips FROM `gcp_project.dataset.table`

There is a bit of leakiness here in that the project, dataset, and table are leaked through. In theory we could abstract away all of this SQL, but it would require a bit more development effort and detract from the development experience.
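The "validate the query without Feast in the loop" workflow can be sketched roughly as follows. This is only an illustration: SQLite stands in for BigQuery, and the `FEATURE_SET` dictionary, the `driver_stats` table, and the `missing_columns` helper are hypothetical names, not part of any Feast API.

```python
import sqlite3

# Hypothetical spec mirroring the driver_features example; not a Feast object.
FEATURE_SET = {
    "name": "driver_features",
    "entities": ["driver_id"],
    "features": ["avg_speed", "avg_daily_trips"],
    # SQLite has no project.dataset.table naming, so a plain table name stands in.
    "sql": "SELECT driver_id, avg_speed, avg_daily_trips FROM driver_stats",
}


def missing_columns(conn, spec):
    """Run the spec's query and return declared columns absent from its result."""
    cursor = conn.execute(spec["sql"])
    returned = {col[0] for col in cursor.description}
    declared = spec["entities"] + spec["features"]
    return [name for name in declared if name not in returned]


def make_demo_connection():
    """Build an in-memory table that stands in for the warehouse table."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE driver_stats "
        "(driver_id INTEGER, avg_speed REAL, avg_daily_trips REAL)"
    )
    conn.execute("INSERT INTO driver_stats VALUES (1001, 42.5, 17.0)")
    return conn


if __name__ == "__main__":
    # An empty list means every declared entity and feature is in the result.
    print(missing_columns(make_demo_connection(), FEATURE_SET))
```

A user could run a check like this against their own warehouse before publishing the feature set, so that a later failure points more clearly at Feast rather than at the query.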

Would love to have your thoughts on this @ches.

@ches
Member

ches commented Apr 19, 2020

Thanks for the thoughtful reply @woop. I didn't pay close attention to the 0.5 milestone on #632 so thanks for diverting it here, makes sense to open this discussion issue that parallels other API ones.

By "we" do you mean your current team? I'd love to get a better understanding of how you are currently doing it, especially if it means we can improve sources or stores.

Yes, at Agoda our Feast deployment is perhaps more centralized in the sense that:

  • There is one Store for warehouse (HDFS with Hive-interoperable engines)
  • There is one Store for online serving (Cassandra)
  • There is logically one Source Kafka topic for all feature ingestion (modulo internal idiosyncrasies not pertinent to the discussion)
  • Feast is offered to all clients as a managed service provided by one team, client teams do not deploy their own Stores or Serving API instances.

As I'm sure is true for Gojek too, some core entity types in our business domain have very high cardinality (e.g. customers). Most client teams serving online will use some features of these, and it isn't practical or economical for us to deploy many storage cluster islands that can support the scale. Cassandra is massively scalable; Kafka throughput is massively scalable; we have dedicated teams expert at doing those. There's also the merit that new clients don't need to provision new infrastructure to start using the system, this is one of the key problems we're solving from the status quo before Feast. (We track cost attribution in other ways, if anyone wonders about that aspect).

The one point above that I imagine could become more flexible over time is the Kafka topics, there may be use cases for special-purpose / priority ones, and I believe it should be straightforward to support that if the need arises.

That brings up a notable distinction in regard to the current Source configuration I think: if we did support this, it will be useful to declaratively associate feature sets with source topics (as Feast already allows). However, users will never need to think about the brokers, they will differ for the same topic name across DCs and our SDK wrapper and Ingestion get them from service discovery. I think this speaks to your thought that "there is value in having users configure some aspects of the sourcing of data".

Sources today are a glorified "connection string". If a source were to only ever stay a connection string, then I don't really see the point in exposing that to users. This can easily be configured behind the scenes by administrators and exposed through source names or some other, more human-friendly way. Users can then select the source they want to use. If I understand correctly, this is the largest part of what you think is a bad separation of concerns.

Yes I believe we're on the same page then. Roughly, an abstraction over operational or environmental details of infrastructure. Operators of a Feast deployment could plug service discovery into this abstraction, potentially.
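One way to picture this abstraction: users refer to a source by name and topic only, while brokers are looked up per data center by whatever the operator plugs in. The sketch below is an assumption about how that could look. `SourceRegistry`, `ResolvedSource`, the static lookup table, and the broker addresses are all hypothetical and do not correspond to existing Feast classes.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass(frozen=True)
class ResolvedSource:
    topic: str
    brokers: List[str]


class SourceRegistry:
    """Maps user-facing source names to topics; brokers come from a resolver."""

    def __init__(self, resolve_brokers: Callable[[str], List[str]]):
        self._topics: Dict[str, str] = {}
        # Operators plug in service discovery (or static config) here.
        self._resolve_brokers = resolve_brokers

    def register(self, name: str, topic: str) -> None:
        """Administrators register a named source; no broker addresses involved."""
        self._topics[name] = topic

    def resolve(self, name: str, datacenter: str) -> ResolvedSource:
        """Combine the declared topic with brokers for the caller's data center."""
        return ResolvedSource(self._topics[name], self._resolve_brokers(datacenter))


# Hypothetical stand-in for a real service discovery lookup.
STATIC_DISCOVERY = {
    "dc-a": ["kafka-a-1:9092"],
    "dc-b": ["kafka-b-1:9092"],
}


def static_resolver(datacenter: str) -> List[str]:
    return list(STATIC_DISCOVERY[datacenter])


if __name__ == "__main__":
    registry = SourceRegistry(static_resolver)
    registry.register("default", "feast-features")
    print(registry.resolve("default", "dc-a"))
    print(registry.resolve("default", "dc-b"))
```

The same topic name resolves to different brokers in each data center, which matches the case described above where feature sets are associated with topics declaratively but users never handle broker addresses.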

Most of our data still lives in data lakes or data warehouses, which means federation is a natural next step. In a federated model we would probably opt to extend sources to allow new data sources to be accessed through Feast, especially without users having to export and reimport into Feast, and with Feast being lazy towards retrieval and exports (no long running jobs).

I feel the federation is an elegant idea in theory, but I'm initially skeptical of how it will work out in practice. Not to say it isn't worth trying or to discourage it, just would urge breaking it off to an MVP to validate without disrupting Feast's ongoing technical improvement and data model refinement—it could be a year spent rearchitecting for such a pivot in vision, with considerable risk that it doesn't work out well or serve users markedly better.

Some of my concerns with it:

We may learn differently with more experience, but at the outset in our org I think we are content to bring data into managed feature store storage. There's a cultural expectation that it is "special" data, expected to be subjected to higher quality standards, stable maintenance, etc. that federated sources may not.

However, I see massive value in allowing users to define SQL queries. And this is the direction that I would like to take sources (or if not sources, another part of feature sets).

One of the main reasons why I see this as valuable is that all of our users are familiar with SQL. Using SQL improves the Feast user experience because they are able to validate and prove that their query works without Feast in the loop. They can then bring that query to Feast, publish it, and see the results. If there is a failure, then the problem is likely Feast. SQL is also supported by virtually all sources and stores.

I'm on board with looking for ways to use SQL as an interface to the system. It does make barriers vanish for many potential users, especially data scientists/engineers/analysts who can contribute new data sets to the feature store without more specialized development knowledge/skills. Indeed something that has happened even before Feast going live for us was another team eagerly building an integration with an in-house ETL tool we have to move data between engines with—you guessed it—SQL expressions of the input. So we've already "solved" this, in a proprietary way and with some overhead of redundant import/exports that you refer to with federation.

We (at least I 😇) have a vision/dream of a streaming platform where users are expressing Beam/Flink/Spark/Whatever SQL, with ability to include/join feature store data, and (optionally) choosing to ingest results into the feature store in the same engine DAG (no extra hop out through Kafka or the like). In theory we are not that far from the query part, the data is already in tables the engine can make available in PCollections or their analogues.

I may have lost the course a little bit there, but hopefully it gives color to ubiquity of SQL.

@woop
Member Author

woop commented Apr 19, 2020

at Agoda our Feast deployment is perhaps more centralized

This really strikes a chord on another note for me, perhaps outside of the Source discussion. I hope you will entertain this rant.

The notion of centralization versus decentralization. The question that I have in my mind is whether we ever even need multiple historical stores per Feast Core, or just one. I can see the need for multiple serving stores, but at least in the case of Gojek we can probably get away with just one.

Why is this important?

Well what happens when we move job management to Feast Serving? We will have a smart serving layer with its own persistence, and presumably Core will become nothing more than a registry. This makes a lot of sense in terms of local execution, access management, connectivity, safe upgrades, cost attribution.

However, it falls apart in another way. If a user ingests data, where do they ingest to? Do they ingest straight to one local Serving deployment or to topics managed centrally by Core? If Serving, then it means it's a localized ingestion that other stores don't benefit from. If Core, then it means Core still does data management (or stream management).

Also, where would a user interface go? If it sits on Core then which historical store's data does it show? Or does it show data from all stores? Users would find it very counter-intuitive if the feature store wasn't a single source of truth. The same holds true for statistics. We won't pull statistics from online stores, and we only run a single historical store, yet we are asking users to provide the store name because we theoretically support multiple historical stores.

Anyway, the bottom line is I think a lot of problems could be solved by only having a single historical store per Feast Core. If you keep poking holes at this you might wonder why Feast Serving (batch/historical) and Feast Core are even separate services in the first place.

and it isn't practical or economical for us to deploy many storage cluster islands that can support the scale.

I wish this was the case for us. We currently have to manage our own storage clusters unfortunately, which is becoming a little bit of a pain. We'll probably be doubling down on Redis Cluster once 0.5 goes live, and try to keep things as simple as possible.

However, users will never need to think about the brokers, they will differ for the same topic name across DCs and our SDK wrapper and Ingestion get them from service discovery. I think this speaks to your thought that "there is value in having users configure some aspects of the sourcing of data".

In some ways this ties to what I was saying above about having one historical store and what the architecture looks like when/if Core is made into a pure registry. Let's assume you have centrally registered feature sets, presumably you want to set configuration on the Serving deployment that can somehow uniquely configure the brokers, because Core isn't in the remote DC. However, that means ingestion doesn't make sense as a responsibility of Feast Core, it must then fall to Serving, unless you will have multiple Feast Core deployments. It feels a lot cleaner to me to have Feast Serving manage ingestion (and interpolate brokers based on local config) than have that centrally managed.

I feel the federation is an elegant idea in theory, but I'm initially skeptical of how it will work out in practice. Not to say it isn't worth trying or to discourage it, just would urge breaking it off to an MVP to validate without disrupting Feast's ongoing technical improvement and data model refinement—it could be a year spent rearchitecting for such a pivot in vision, with considerable risk that it doesn't work out well or serve users markedly better.

Given our track record of hitting deadlines, I am just as wary of scope creep here. Just to be clear, I would not pick up federation as the sole way of dealing with data, and I would not have it started if we can't build an MVP off the critical path.

"Jack of all trades, master of none" being difficult to give consistently optimal experience with multiple storage engines, e.g. not being able to push down filters and joins across stores. See also areas like #444 and more specifically the data locality-related discussion on #482 that I think possibly belongs under #444.

This is a valid concern. I'd want to limit the scope of this implementation as much as possible, and potentially add constraints. Let's take the BQ example from earlier. MVP: For every user that created a feature set with an external BQ source we create (1) a BigQuery view to the external table and (2) jobs that occasionally update online stores from this view.

What does that give us? Well it saves us a lot of money on Dataflow jobs and means we can manage less infrastructure. It means users don't have to ingest data (and can simply plug in their queries). On both online and batch serving nothing changes.
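The two MVP steps (a view over the external table, plus a periodic job that refreshes the online store from that view) might look something like the sketch below. SQLite stands in for BigQuery and a plain dictionary stands in for the online store. The function names and the `driver_stats` table are hypothetical, and the feature set name and user SQL are assumed to be trusted input.

```python
import sqlite3


def create_feature_view(conn, feature_set_name, user_sql):
    """Step 1: wrap the user's query in a view named after the feature set."""
    # Identifiers are interpolated directly, so they must come from trusted input.
    conn.execute(f"CREATE VIEW IF NOT EXISTS {feature_set_name} AS {user_sql}")


def sync_online_store(conn, feature_set_name, entity_column, online_store):
    """Step 2: copy the view's current rows into a key-value online store.

    Returns the number of rows written during this run.
    """
    cursor = conn.execute(f"SELECT * FROM {feature_set_name}")
    columns = [col[0] for col in cursor.description]
    written = 0
    for row in cursor:
        record = dict(zip(columns, row))
        key = (feature_set_name, record.pop(entity_column))
        online_store[key] = record
        written += 1
    return written


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE driver_stats (driver_id INTEGER, avg_speed REAL)")
    conn.execute("INSERT INTO driver_stats VALUES (1, 40.0)")
    create_feature_view(
        conn, "driver_features", "SELECT driver_id, avg_speed FROM driver_stats"
    )
    store = {}
    sync_online_store(conn, "driver_features", "driver_id", store)
    print(store)
```

In a real deployment the sync function would be run on a schedule. Because the view is evaluated lazily, no data is copied into Feast-managed batch storage in this sketch.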

Harder (and less efficient) to lazily impose data quality measures on data at rest.

This is true, but there is also a benefit: externalizing data ownership means the feature store doesn't become the source of truth. It reduces the operational burden by allowing the feature store team to "rebuild" their stores without much risk.

Hard to rely on sustained availability of data from sources controlled by many owners—producers controlling fate of consumers without a safety buffer.

I agree with this, but this problem doesn't go away in Gojek's case at least. Essentially we are adding one more step to "federation" by having the user extract the data and publish it. However, if the user isn't savvy then they will just propagate the problems from the source straight to Feast. Then, when something goes wrong they will ask why Feast doesn't have the right data. Tracing this back upstream then becomes harder because Feast doesn't have any context outside of rows landing on a stream.

I'm not saying one approach is definitely superior, but I do think that there are trade-offs here.

risk that it doesn't work out well or serve users markedly better

I think federation in and of itself doesn't really add much value to end-users. It adds value to the operators. How Feast implements "external data sourcing" can change over time; as long as the development API that we expose is more useful to our users, they will be happy. I can see that being the case with us exposing SQL through sources, especially on the batch-only side where our users are less savvy.

There's a cultural expectation that it is "special" data, expected to be subjected to higher quality standards, stable maintenance, etc. that federated sources may not.

I like this. I think we have touched on this idea before. I am hoping that the tooling that we develop on statistics and validation will eventually lend itself to higher trust from our users here, but many of our data scientists have asked for more tools around ingestion. They don't want to push bad data into Feast, so they want to know whether it is good or bad prior to pushing. Or they want tools to undo a bad ingestion once it fails or if the statistics look off. Something to ponder about.
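A pre-ingestion check of the kind data scientists are asking for could start as small as the sketch below. It is an assumption about what such a tool might do, not an existing Feast feature. The `SCHEMA` mapping and `validate_rows` function are hypothetical, and the checks cover only missing values and basic types.

```python
# Hypothetical schema: column name -> accepted Python types.
SCHEMA = {
    "driver_id": (int,),
    "avg_speed": (int, float),
    "avg_daily_trips": (int, float),
}


def validate_rows(rows, schema):
    """Return human-readable problems; an empty list means no issues were found."""
    problems = []
    for index, row in enumerate(rows):
        for column, types in schema.items():
            value = row.get(column)
            if value is None:
                problems.append(f"row {index}: missing {column}")
            # bool is a subclass of int in Python, so reject it explicitly.
            elif isinstance(value, bool) or not isinstance(value, types):
                problems.append(
                    f"row {index}: {column} has type {type(value).__name__}"
                )
    return problems


if __name__ == "__main__":
    batch = [
        {"driver_id": 1, "avg_speed": 40.0, "avg_daily_trips": 12},
        {"driver_id": "x", "avg_speed": 38.5},
    ]
    for problem in validate_rows(batch, SCHEMA):
        print(problem)
```

A caller would push the batch only when the returned list is empty. Undoing an ingestion that already happened is a separate problem that this sketch does not address.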

We (at least I 😇) have a vision/dream of a streaming platform where users are expressing Beam/Flink/Spark/Whatever SQL, with ability to include/join feature store data, and (optionally) choosing to ingest results into the feature store in the same engine DAG (no extra hop out through Kafka or the like). In theory we are not that far from the query part, the data is already in tables the engine can make available in PCollections or their analogues.

This sounds somewhat similar to what we have internally right now, but the integration with Feast does require that extra hop through Kafka.

The value add I can see right now is not so much on the streaming side, however. Even though that aspect is a dream state to aim for. We have enough tools to allow users to create new streams and streaming transformations and get them into Feast. I think the part that is still a bit painful is the export/import flow, but perhaps that can be made easier through better controls at ingestion time.

@ches
Member

ches commented May 11, 2020

The value add I can see right now is not so much on the streaming side, however. Even though that aspect is a dream state to aim for. We have enough tools to allow users to create new streams and streaming transformations and get them into Feast. I think the part that is still a bit painful is the export/import flow, but perhaps that can be made easier through better controls at ingestion time.

Oh yeah, I should have made clearer that in that dream vision I don't see Feast being the whole thing, but a component of it.

@woop
Member Author

woop commented Jun 3, 2020

I have been thinking about some recurring issues related to sources, job management, stores, and streams. These issues are not singular but they have bled through the questions people ask in various proposals.

Please see this document for architecture diagrams that illustrate the distinctions that I am making.

Problems

  1. Disconnect between pre- and post-stream ingestion jobs (producer, consumer/populator). Users push into a void (stream), but there is no continuity between the data a user pushes and the jobs that consume/populate stores. If a failure happens, our users don't see feedback, and it's even hard for us to provide that. They have no way to debug ingestion.
  2. Adding more online serving instances requires an increasing number of complex population/consumption jobs to be managed by Feast Core. These jobs only fulfill half of the ingestion process, and all of the complexity is after (not prior to) the streams. This means we have to use Beam between the feature streams and the stores that they write to, as opposed to simple consumers.
  3. The idea of a Source is confusing as it's implemented. As implemented, it is the source of a job that writes into a store, but the original idea was that the source would be external to Feast and the destination sink would be the Feature Stream. However, as implemented, the Source is sometimes the Feature Stream and sometimes an external stream, which means we don't differentiate the Feast Kafka stream from other streams. This means that implicitly there is an explosion of complex population jobs if there is an explosion of consumers, but much of this logic only needs to be applied once on a stream, not for every consumer (Feast Serving).
  4. Moving large amounts of data through streams is wasteful if it isn't used for online serving.

Proposal

  1. Impose a limit of only a single historical store per Feast deployment. This solves multiple problems like where to generate batch statistics, where to point a user interface at (as opposed to selecting one of many). But this constraint is imposed for ingestion purposes in this context.
  2. Ingestion is the act of taking data from a source and writing it into a sink. Sources and sinks are now defined as:
    1. Source types
      1. Anything (bounded, unbounded).
    2. Sink types
      1. The historical warehouse store (BQ, Hive). There will only be 1.
      2. Feature stream
  3. An ingestion job writes to both the stream and historical store. This job is a Beam/Spark job that is triggered through the Feast API (eventually). It could also be two jobs, but that should be abstracted from the user.
  4. An ingestion job is successful when it has written to both sinks. User gets feedback immediately. This solves the problem of user feedback and duality of ingestion jobs.
  5. Ingestion jobs contain all complexity (statistics generation, stream transforms, validation).
  6. Data gets into online stores through the Feature Stream. Data is pulled into the online serving databases (Redis for example) through consumer jobs.
    1. These consumer jobs are deployed alongside each Feast Serving deployment as sidecars or they run in process with Feast Serving.
    2. They do not run Beam and do not require complex ingestion jobs on Dataflow. They run local consumers.
    3. They stream in data only from the central Feature Stream. Can be deployed by any team, even end users. Any person that can stand up a docker container can hook into the stream and populate their online serving instance.
    4. Topics are determined for consumption by querying Feast Core.
    5. Feast has no awareness of serving deployments and they don't affect ingestion.
    6. Users can still connect to serving deployments through the SDK in order to see the state of ingestion, especially as it relates to specific ingestion ids.
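Items 3 and 4 of the proposal (one ingestion job, two sinks, success only when both are written) can be sketched as below. This is a minimal illustration under stated assumptions: `ListSink`, `IngestionResult`, and `run_ingestion` are hypothetical names, the sinks are in-memory lists rather than a warehouse and a Kafka topic, and a real job would run on Beam or Spark.

```python
import uuid
from dataclasses import dataclass


@dataclass
class IngestionResult:
    ingestion_id: str
    historical_ok: bool
    stream_ok: bool

    @property
    def succeeded(self) -> bool:
        # The job counts as successful only when both sinks were written.
        return self.historical_ok and self.stream_ok


class ListSink:
    """In-memory stand-in for the historical store or the feature stream."""

    def __init__(self, fail=False):
        self.rows = []
        self.fail = fail

    def write(self, ingestion_id, rows):
        if self.fail:
            raise IOError("sink unavailable")
        # Tag every row so later stages can trace it back to this ingestion.
        self.rows.extend({"ingestion_id": ingestion_id, **row} for row in rows)


def run_ingestion(rows, historical, stream):
    """Write the batch to both sinks and report the outcome for each."""
    ingestion_id = uuid.uuid4().hex
    outcomes = []
    for sink in (historical, stream):
        try:
            sink.write(ingestion_id, rows)
            outcomes.append(True)
        except IOError:
            outcomes.append(False)
    return IngestionResult(ingestion_id, *outcomes)


if __name__ == "__main__":
    result = run_ingestion([{"driver_id": 1, "avg_speed": 40.0}], ListSink(), ListSink())
    print(result.ingestion_id, result.succeeded)
```

The sketch does not roll back a partial write when one sink fails. Whether a real job should retry, roll back, or report partial success is left open by the proposal.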

Please see this document for more details on this proposal.
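The lightweight consumer described in item 6 of the proposal could be as simple as the loop sketched here. Everything in it is hypothetical: the message shape, the `fetch_topics_from_core` stub (standing in for a query to Feast Core), and the dictionaries that stand in for an online store such as Redis and for per-ingestion status.

```python
def fetch_topics_from_core():
    """Hypothetical stub for asking Feast Core which topics to consume."""
    return {"driver_features"}


def consume(stream, topics, online_store, status):
    """Apply stream messages for the given topics to a key-value online store."""
    for message in stream:
        if message["topic"] not in topics:
            continue
        key = (message["topic"], message["entity_id"])
        # Later messages for the same entity overwrite earlier ones.
        online_store[key] = message["features"]
        # Count rows per ingestion id so users can check on a specific ingestion.
        ingestion_id = message["ingestion_id"]
        status[ingestion_id] = status.get(ingestion_id, 0) + 1


if __name__ == "__main__":
    messages = [
        {"topic": "driver_features", "entity_id": 1, "ingestion_id": "a",
         "features": {"avg_speed": 40.0}},
        {"topic": "driver_features", "entity_id": 1, "ingestion_id": "a",
         "features": {"avg_speed": 55.0}},
    ]
    store, status = {}, {}
    consume(messages, fetch_topics_from_core(), store, status)
    print(store, status)
```

Nothing in this loop needs Beam or Dataflow, which is the point of moving the complexity in front of the stream: any team that can run a container could run a consumer like this next to its own serving instance.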

@stale

stale bot commented Aug 2, 2020

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@stale stale bot added the wontfix label Aug 2, 2020
@woop woop added the keep-open label Aug 2, 2020
@stale stale bot removed the wontfix label Aug 2, 2020
@woop woop removed the keep-open label Feb 8, 2021
@woop woop closed this as completed Feb 8, 2021