[RFC] Streaming ingestion (pull based) #16495

Open
yupeng9 opened this issue Oct 27, 2024 · 22 comments
Labels
enhancement Enhancement or improvement to existing feature or request Indexing Indexing, Bulk Indexing and anything related to indexing

Comments

@yupeng9

yupeng9 commented Oct 27, 2024

Is your feature request related to a problem? Please describe

Today, OpenSearch exposes an HTTP-based API for indexing, in which users invoke the endpoint to push changes. It also has a “_bulk” API to group multiple operations in a single request.

There are some shortcomings with the push-based API, especially for complex applications at scale:

  1. Ingestion spikes beyond the server capacity can result in request rejections and backpressure to the clients, which adds complexity to the application producers that must handle the backpressure properly.
  2. In complex applications, not all index requests are equally important. However, there is currently no good way to differentiate index requests by priority with the HTTP API. This is particularly challenging for heavy ingestion workloads.
  3. Some cases require replaying indexing requests. For example, when a cluster is restored from a previous snapshot, the ingestion history needs to be replayed. Another example is live cluster migration, in which indexing needs to be applied to two clusters.
  4. In the segment-replication mode, the translog provides durability for staged index changes before they are committed, but under heavy ingestion an excessive translog can overflow. In the document-replication mode, requests must wait for synchronous replication of each document to complete. Neither the translog nor synchronous replication is necessary in the streaming ingestion mode, because the streaming buffer can provide the durability guarantee.

Describe the solution you'd like

In general, a streaming ingestion solution could provide several benefits and address the aforementioned challenges:

  1. By introducing a streaming buffer, it's possible to decouple the producers (i.e., the applications that generate index operations) from the consumers (i.e., the OpenSearch server). With that, ingestion spikes can be smoothed and buffered without blocking the producers (a minimal consumer sketch follows this list).
  2. With streaming ingestion, it's possible to make ingestion priority-aware by using multiple Kafka topics to isolate events by priority and applying different rate-limit thresholds to prioritize important events.
  3. A streaming platform like Kafka stores messages in a durable log and allows consumers to re-read past messages via offsets, a powerful feature that enables reprocessing and replay.
  4. A durable messaging system like Kafka can guarantee message durability while maintaining high write throughput, so the ingestion component in OpenSearch can be simplified by delegating the durability problem to the messaging system.
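
To make the decoupling and replay points above concrete, here is a minimal sketch of a per-shard pull loop against Kafka using the standard consumer API. The one-partition-per-shard assignment, topic name, and offset checkpointing shown here are illustrative assumptions, not the proposed OpenSearch implementation.

```java
// Illustrative only: a per-shard poll loop against Kafka using the standard consumer API.
// The one-partition-per-shard assignment and the offset checkpointing are assumptions.
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ShardPollLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false"); // the shard tracks its own offset checkpoint

        TopicPartition partition = new TopicPartition("index-ingest", /* shardId = */ 0);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(List.of(partition));
            consumer.seek(partition, 0L); // replay: resume from a checkpoint persisted with the shard
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // Stand-in for applying the operation to the local shard.
                    System.out.printf("index doc %s at offset %d%n", record.key(), record.offset());
                }
                // Persist (last processed offset + 1) as the new checkpoint -- omitted for brevity.
            }
        }
    }
}
```

Because producers only append to the topic, an ingestion spike is absorbed by the log: the consumer simply falls behind and catches up later, and replay is just a seek to an earlier offset.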

More details of the solution can be found in this document

Related component

Indexing

Describe alternatives you've considered

As an alternative, a plugin-based approach starts a streaming-ingester process as a sidecar to the OpenSearch server on the same host, which is described in this section.

Additional context

No response

@yupeng9 yupeng9 added enhancement Enhancement or improvement to existing feature or request untriaged labels Oct 27, 2024
@github-actions github-actions bot added the Indexing Indexing, Bulk Indexing and anything related to indexing label Oct 27, 2024
@RS146BIJAY
Contributor

[Triage]
@yupeng9 It seems this issue is the same as this RFC. Tagging @reta to share any thoughts on this.

@reta
Collaborator

reta commented Oct 28, 2024

Thanks @RS146BIJAY. @yupeng9, please take a look at and comment on the existing RFC; I am closing this one as a duplicate, since a large amount of work toward streaming ingestion has already been done.

@reta reta closed this as completed Oct 28, 2024
@reta
Collaborator

reta commented Oct 28, 2024

@msfroh I have reread the proposal, and it sounds more like the pull-based ingestion you've been discussing on multiple occasions. @yupeng9 I closed the issue prematurely (sorry about that); I am reopening it and clarifying in the title that this is a different model.

@reta reta reopened this Oct 28, 2024
@reta reta changed the title [RFC] Streaming ingestion [RFC] Pull based Streaming ingestion Oct 28, 2024
@reta reta changed the title [RFC] Pull based Streaming ingestion [RFC] Streaming ingestion (pull based) Oct 28, 2024
@yupeng9
Author

yupeng9 commented Oct 28, 2024

Thanks. Yes, this is different from adding streaming to the HTTP protocol. It aims to pull from streaming systems like Kafka, Kinesis, Pulsar, Redpanda, etc.

@reta reta removed the untriaged label Oct 28, 2024
@Bukhtawar
Collaborator

Thanks for starting the discussion. It would be interesting to see how we decouple partitions or streams from the shards, especially with the work on online shard split.
+1 on the benefits of dropping translogs.

@sachinpkale
Member

Thanks for the RFC @yupeng9. This would help bring proactive backpressure to OpenSearch ingestion.

It aims to pull from streaming systems like Kafka, Kinesis, Pulsar, Redpanda etc

Does the source need to be a streaming system? Could it be a database storing records, where OpenSearch ingestion workers ingest new records based on a maintained checkpoint? I understand that it won't be as efficient as, say, Kafka, but it would be more generic.

Also, if I compare this to Data Prepper (https://opensearch.org/docs/latest/data-prepper/), the advantage of pull-based ingestion would be that it is built into the server, so it would know the server's health and capacity better. Is this a correct understanding?

@msfroh
Collaborator

msfroh commented Oct 30, 2024

Does the source need to be a streaming system? Could it be a database storing records, where OpenSearch ingestion workers ingest new records based on a maintained checkpoint? I understand that it won't be as efficient as, say, Kafka, but it would be more generic.

Ultimately, as long as something supports the API described in the Google Doc (or at least whatever the final version of it is), we should be able to ingest.

I'm excited to try implementing a Parquet (or maybe JSON file in a blob store) source that would do a one-time import, where each shard fetches a subset of the input as fast as it can.

Something else I would like to support is a combined source that includes both a database and an event stream. One of the Amazon systems that I worked on previously had an upstream system of record implemented in DynamoDB. That system would receive partial updates for records and apply them to DynamoDB using optimistic locking. Once the (versioned) update succeeded, it would be sent on a Kinesis stream for live updates. The search system that I worked on would periodically rebuild the whole index by backfilling from DynamoDB while simultaneously processing live updates from Kinesis (to make sure that updates applied to records already read from DynamoDB would not be missed). Once the backfill was done, it would continue processing updates from Kinesis. From our perspective on the search side, we were just pulling documents from "somewhere" and didn't care whether they came from DynamoDB or Kinesis. A composable ingest source (that pulls from multiple ingest sources) should be pretty easy.
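
To illustrate the kind of composition described above, here is a hedged sketch; the IngestSource and IngestOp types are hypothetical and not part of any existing OpenSearch or plugin API.

```java
// Hypothetical sketch; none of these types exist in OpenSearch today.
import java.util.ArrayList;
import java.util.List;

/** A pull-based source of index operations, read by a single shard. */
interface IngestSource {
    List<IngestOp> poll(int maxOps);   // return up to maxOps operations, possibly empty
    boolean exhausted();               // true for finite sources (e.g. a backfill) once drained
}

/** An index operation carrying a document version for conflict resolution. */
record IngestOp(String docId, long version, String payload) {}

/**
 * Combines a finite backfill source (e.g. a DynamoDB scan) with a live stream
 * (e.g. Kinesis). Both are polled; versioned writes into the engine ensure a
 * stale backfill record never overwrites a newer live update.
 */
class CompositeIngestSource implements IngestSource {
    private final IngestSource backfill;
    private final IngestSource liveStream;

    CompositeIngestSource(IngestSource backfill, IngestSource liveStream) {
        this.backfill = backfill;
        this.liveStream = liveStream;
    }

    @Override
    public List<IngestOp> poll(int maxOps) {
        List<IngestOp> ops = new ArrayList<>(liveStream.poll(maxOps / 2));
        if (!backfill.exhausted()) {
            ops.addAll(backfill.poll(maxOps - ops.size()));
        }
        return ops;
    }

    @Override
    public boolean exhausted() {
        return false; // the live stream never ends
    }
}
```

The key property is that the indexing engine only sees versioned operations, so a stale backfill record can never overwrite a newer live update.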

@gbbafna
Collaborator

gbbafna commented Oct 30, 2024

Thanks @yupeng9 for the detailed RFC.

I liked the alternative approach of using an ingestion plugin as well. That gels easily with the current model of abstracting shards away from the end user. It also makes it easy for newer features like online shard split to carry out their activities without disturbing ingestion. As for the listed con that the request will be forwarded to the server hosting the primary shard and thus incur additional overhead, I am not sure how large that overhead will be.

@ashking94
Member

Thanks @yupeng9 for this detailed RFC. I really like the idea of using a Kafka-like queue between the producer and OpenSearch. This should take a lot of pressure off clients, so they don't need to build their own complex logic or queues when OpenSearch can't keep up with ingestion. I'm most familiar with Kafka, so I'll use that as an example, but the same ideas apply to other streaming systems too.

A couple things I'm curious about:

How flexible can we make the scaling between Kafka partitions and OS shards? Do we really need to stick to a 1:1 mapping, or can we let them scale independently? Both systems already have their own ways to scale, so it'd be cool if we could take advantage of that.

Can we keep the queue stuff "under the hood" as much as possible from the OS cluster's perspective? It'd be great if we could keep the learning curve low for users. Also, how can we make sure the default setup works well for most use cases without needing a ton of tweaking?

Just thinking about how this might work in real-world setups. Looking forward to seeing how this develops!

@msfroh
Collaborator

msfroh commented Oct 30, 2024

Can we keep the queue stuff "under the hood" as much as possible from the OS cluster's perspective?

Check out the linked Google Doc. There's an interface that essentially gives OpenSearch an "iterator" over the ingest source. Configuring the ingest source is part of the index configuration and is handled by the ingest source plugin (so it's exactly as complicated as whatever the ingest source requires, but is opaque to OpenSearch itself).
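
As a rough illustration of that shape (all names here are made up; the actual interface is the one in the linked design doc), the plugin could own both the interpretation of the source settings and the per-shard iterator:

```java
// Hypothetical shapes only; the real interface is defined in the linked design doc.
import java.util.List;
import java.util.Map;

/** Implemented by an ingest-source plugin (Kafka, Kinesis, Pulsar, ...). */
interface IngestSourcePlugin {
    /**
     * The settings map comes straight from the index configuration and is opaque to
     * OpenSearch core; only the plugin knows how to interpret it.
     */
    ShardIngestIterator createIterator(Map<String, String> sourceSettings, int shardId);
}

/** An "iterator" over the source, consumed by a single shard's ingestion engine. */
interface ShardIngestIterator {
    /** Return up to maxMessages messages at or after the given pointer (offset/sequence number). */
    List<Message> readNext(String pointer, int maxMessages);

    /** The pointer to persist so ingestion can resume after restart or recovery. */
    String currentPointer();
}

record Message(String pointer, byte[] payload) {}
```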

How flexible can we make the scaling between Kafka partitions and OS shards? Do we really need to stick to a 1:1 mapping, or can we let them scale independently?

The main challenge is making sure that the producer and shards agree on the strategy used to route documents to stream partitions.

The approach I've seen work (though it's probably not the only solution) is a consistent hash-range strategy. Essentially, with N stream partitions, you divide your hash space into N contiguous ranges (and can update that split as N changes). The producer hashes the doc ID (or custom routing value), sees which range that lands in, and writes to the appropriate partition. On the shard side, if you have M shards, you similarly split the hash space into M ranges (and can update as M changes, like if you do an online shard split). When M and N are not multiples of one another, the partition ranges and shard ranges won't line up perfectly, but shards can read documents from all partitions whose ranges overlap the shard's range. Any documents outside the shard's range are ignored. I know at least two highly-scalable production search systems that use this strategy.
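
A small sketch of that hash-range bookkeeping, assuming a fixed 32-bit hash space and equal-width ranges (both simplifications):

```java
// Illustrative sketch of the hash-range routing described above; names are hypothetical.
import java.util.ArrayList;
import java.util.List;

final class HashRangeRouting {
    // Work in a fixed hash space [0, HASH_SPACE); docHash is assumed to lie in this range.
    static final long HASH_SPACE = 1L << 32;

    /** Producer side: pick the stream partition whose range contains hash(docId). */
    static int partitionFor(long docHash, int numPartitions) {
        long rangeSize = HASH_SPACE / numPartitions;
        return (int) Math.min(docHash / rangeSize, numPartitions - 1);
    }

    /** Shard side: list every partition whose hash range overlaps this shard's range. */
    static List<Integer> partitionsForShard(int shardId, int numShards, int numPartitions) {
        long shardRange = HASH_SPACE / numShards;
        long shardStart = shardId * shardRange;
        long shardEnd = (shardId == numShards - 1) ? HASH_SPACE : shardStart + shardRange;

        long partRange = HASH_SPACE / numPartitions;
        List<Integer> overlapping = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            long partStart = p * partRange;
            long partEnd = (p == numPartitions - 1) ? HASH_SPACE : partStart + partRange;
            if (partStart < shardEnd && shardStart < partEnd) {
                overlapping.add(p);
            }
        }
        return overlapping;
    }

    /** While consuming an overlapping partition, a shard drops documents outside its own range. */
    static boolean ownedByShard(long docHash, int shardId, int numShards) {
        long shardRange = HASH_SPACE / numShards;
        long shardStart = shardId * shardRange;
        long shardEnd = (shardId == numShards - 1) ? HASH_SPACE : shardStart + shardRange;
        return docHash >= shardStart && docHash < shardEnd;
    }
}
```

For example, with N = 3 partitions and M = 2 shards, shard 0 overlaps partitions 0 and 1 but ignores any document from partition 1 whose hash falls in the upper half of the space.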

Anyway, we should be able to ship with a 1:1 mapping constraint to start (since that already helps from a scaling perspective). We can solve the N:M mapping case later (probably by providing configuration to communicate the document hashing strategy) and remove that constraint.

Just thinking about how this might work in real-world setups. Looking forward to seeing how this develops!

Using stream-based ingestion is very common in lots of real-world setups, including at least three different search systems that operate within Amazon. It's so much easier to scale than a system that pretends to be a database. You can also check out what Slack did with their Astra system for log search: https://www.youtube.com/watch?v=iZt-eL1GUKo

@andrross
Member

Can we keep the queue stuff "under the hood" as much as possible from the OS cluster's perspective?

I see this as an opportunity to lean into data prepper (or other ingestion tools) as the front door for end users. These tools allow for more sophisticated and flexible features as compared to what is possible to do in an indexing coordinator inside the cluster itself. The specifics of how that ingestion tool sends data to cluster then really is "under the hood" from the user's perspective.

@reta
Collaborator

reta commented Oct 30, 2024

The main challenge is making sure that the producer and shards agree on the strategy used to route documents to stream partitions.

I think there is another challenge in this category, probably the harder one (which @yupeng9 and @Bukhtawar mentioned): how to keep locality with the primary shard(s) when relocation happens for whatever reason (this would eliminate the network hops that other ingestion tools would exhibit).

@yupeng9
Author

yupeng9 commented Oct 30, 2024

How flexible can we make the scaling between Kafka partitions and OS shards? Do we really need to stick to a 1:1 mapping, or can we let them scale independently? Both systems already have their own ways to scale, so it'd be cool if we could take advantage of that.

This is a good question. At Uber, our current search systems in production already use pull-based ingestion, and we use a 1:1 mapping between Kafka partitions and shards. Our learning is that this provides a lot of simplicity, as we can trace data from a search shard back to the input Kafka partition sharing the same sharding key, which lets us build dashboards for observability and tooling for debuggability. And yes, resharding is a very involved procedure with this setup. However, we realized that resharding is a very infrequent operation, so we want to optimize the system for frequent operations rather than infrequent ones, as the latter can introduce significant complexity.

Can we keep the queue stuff "under the hood" as much as possible from the OS cluster's perspective? It'd be great if we could keep the learning curve low for users. Also, how can we make sure the default setup works well for most use cases without needing a ton of tweaking?

Yes, that's the intention. We want to make this feature pluggable, so that a separate indexing engine is used to ingest from streaming sources without disrupting the existing indexing code flow. When pull-based ingestion is enabled, we can disable the HTTP-based index API.

Just thinking about how this might work in real-world setups. Looking forward to seeing how this develops!

@yupeng9
Author

yupeng9 commented Oct 30, 2024

Just thinking about how this might work in real-world setups. Looking forward to seeing how this develops!

Using stream-based ingestion is very common in lots of real-world setups, including at least three different search systems that operate within Amazon. It's so much easier to scale than a system that pretends to be a database. You can also check out what Slack did with their Astra system for log search: https://www.youtube.com/watch?v=iZt-eL1GUKo

I gave a talk at Community over Code NA last year, and this deck shows the current search architecture at Uber (slide 17) and how pull-based streaming is used in Uber's production environment. It includes some other interesting features too, such as real-time indexing (compared to NRT in Lucene).

@yupeng9
Author

yupeng9 commented Oct 30, 2024

Can we keep the queue stuff "under the hood" as much as possible from the OS cluster's perspective?

I see this as an opportunity to lean into data prepper (or other ingestion tools) as the front door for end users. These tools allow for more sophisticated and flexible features as compared to what is possible to do in an indexing coordinator inside the cluster itself. The specifics of how that ingestion tool sends data to cluster then really is "under the hood" from the user's perspective.

That's the option described in the alternative solution. I feel a truly native pull-based ingestion built into the cluster with a separate engine will achieve much better performance, as we have already observed in Uber production. We also use an ingester similar to Logstash to ingest from Kafka into Elasticsearch. In our benchmarks, native pull-based ingestion not only showed much better ingestion throughput but also significantly reduced cost by removing the compute resources of the ingesters.

@gaobinlong
Collaborator

Just curious: how do we store the mappings between partitions and shards? Is that part of the cluster metadata? Another question: in the log scenario, users traditionally write documents to an alias, a data stream, or a concrete index with a date suffix, and these targets roll over based on age, shard size, or index size via an ISM policy. Does streaming ingestion have an impact on this usage?

@yupeng9
Author

yupeng9 commented Oct 31, 2024

Just curious: how do we store the mappings between partitions and shards? Is that part of the cluster metadata? Another question: in the log scenario, users traditionally write documents to an alias, a data stream, or a concrete index with a date suffix, and these targets roll over based on age, shard size, or index size via an ISM policy. Does streaming ingestion have an impact on this usage?

In the initial phase, I don't plan to store these mappings, but instead go with the convention of a 1:1 mapping between partitions and shards and have validation enforcing it. In the future, we can relax this constraint.
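
A minimal sketch of what that validation could look like at index creation time (the class and method names are hypothetical, not actual OpenSearch code):

```java
// Hypothetical sketch of the proposed 1:1 validation at index creation time.
final class PullIngestionValidator {
    static void validatePartitionShardMapping(int streamPartitions, int indexShards) {
        if (streamPartitions != indexShards) {
            throw new IllegalArgumentException(
                "Pull-based ingestion currently requires a 1:1 partition-to-shard mapping: the stream has "
                    + streamPartitions + " partitions but the index is configured with " + indexShards + " shards");
        }
    }
}
```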

For the second question, do you mean whether the retention policy from the streaming system should be carried over to OpenSearch? I think the two are decoupled: a separate TTL within OpenSearch controls the lifecycle of the data, independent of the streaming source.

@andrross
Member

Can we keep the queue stuff "under the hood" as much as possible from the OS cluster's perspective?

I see this as an opportunity to lean into data prepper (or other ingestion tools) as the front door for end users. These tools allow for more sophisticated and flexible features as compared to what is possible to do in an indexing coordinator inside the cluster itself. The specifics of how that ingestion tool sends data to cluster then really is "under the hood" from the user's perspective.

That's the option described in the alternative solution. I feel a truly native pull-based ingestion built into the cluster with a separate engine will achieve much better performance, as we have already observed in Uber production. We also use an ingester similar to Logstash to ingest from Kafka into Elasticsearch. In our benchmarks, native pull-based ingestion not only showed much better ingestion throughput but also significantly reduced cost by removing the compute resources of the ingesters.

@yupeng9 I'm referring to how the data gets into the event stream (Kafka) in the first place. Users will likely need support for things like filtering, enriching, transforming, etc and that would be provided by a tool like data prepper, which would then write to the event stream. How the server pulls data from that event stream (i.e. either solution you described in the doc) becomes an implementation detail for the user as the front end API is the same no matter what. Does that make sense?

@yupeng9
Author

yupeng9 commented Oct 31, 2024

Can we keep the queue stuff "under the hood" as much as possible from the OS cluster's perspective?

I see this as an opportunity to lean into data prepper (or other ingestion tools) as the front door for end users. These tools allow for more sophisticated and flexible features as compared to what is possible to do in an indexing coordinator inside the cluster itself. The specifics of how that ingestion tool sends data to cluster then really is "under the hood" from the user's perspective.

That's the option described in the alternative solution. I feel a truly native pull-based ingestion built into the cluster with a separate engine will achieve much better performance, as we have already observed in Uber production. We also use an ingester similar to Logstash to ingest from Kafka into Elasticsearch. In our benchmarks, native pull-based ingestion not only showed much better ingestion throughput but also significantly reduced cost by removing the compute resources of the ingesters.

@yupeng9 I'm referring to how the data gets into the event stream (Kafka) in the first place. Users will likely need support for things like filtering, enriching, transforming, etc and that would be provided by a tool like data prepper, which would then write to the event stream. How the server pulls data from that event stream (i.e. either solution you described in the doc) becomes an implementation detail for the user as the front end API is the same no matter what. Does that make sense?

I see. I think that's a separate problem, and a nice thing about a streaming system is that it decouples the producers and consumers (i.e., OpenSearch). In the industry, there are various ways to produce to the streaming system. Stream processing systems like Flink, Samza, and Storm can be used for such filtering, enriching, and other kinds of preprocessing. And yes, Data Prepper is also one of them and can be enhanced for more native support.

@andrross
Member

a nice thing about a streaming system is that it decouples the producers and consumers (i.e., OpenSearch)

Agreed! The point I'm making is that the way you keep the queue functionality "under the hood" is by offering an end-to-end solution coupled with a producer. The OpenSearch Project could offer a complete, easy-to-use solution using Data Prepper. Managed vendors could offer solutions that make sense in their ecosystems. Companies like Uber would have the flexibility to integrate OpenSearch into existing systems that have completely different producers.

@yupeng9
Author

yupeng9 commented Oct 31, 2024

a nice thing about a streaming system is that it decouples the producers and consumers (i.e., OpenSearch)

Agreed! The point I'm making is that the way you keep the queue functionality "under the hood" is by offering an end-to-end solution coupled with a producer. The OpenSearch Project could offer a complete, easy-to-use solution using Data Prepper. Managed vendors could offer solutions that make sense in their ecosystems. Companies like Uber would have the flexibility to integrate OpenSearch into existing systems that have completely different producers.

Yes, that makes a lot of sense. I believe a lot more can be built and extended on top of this feature.

@arjunnambiartc

Data Prepper today supports OpenSearch API termination and an Apache Kafka-based persistent buffer. Hence, while managed vendors could offer solutions that bake in other streaming offerings like Kinesis, Google Pub/Sub, Redpanda, etc., it makes sense for the OpenSearch Project to offer Data Prepper as a first-class citizen for pull-based indexing. With OpenSearch API termination in Data Prepper, clients do not even need to be updated: the same HTTP push request can be intercepted by Data Prepper, written to Kafka topics in Data Prepper's buffer, and then read by the shards via pull-based indexing. The other advantage is that Data Prepper already has native connectors to Kinesis, DynamoDB, MongoDB, and S3, and also supports ingesting data from HTTP clients like Fluent Bit/Fluentd as well as OTel shippers.
