
Add concurrency to addIndexes(CodecReader…) API [LUCENE-10216] #11253

Closed

asfimport opened this issue Nov 2, 2021 · 17 comments

asfimport commented Nov 2, 2021

I work at Amazon Product Search, and we use Lucene to power search for the e-commerce platform. I’m working on a project that involves applying metadata+ETL transforms and indexing documents on n different indexing boxes, combining them into a single index on a separate reducer box, and making it available for queries on m different search boxes (replicas). Segments are asynchronously copied from indexers to reducers to searchers as they become available for the next layer to consume.

I am using the addIndexes API to combine multiple indexes into one on the reducer boxes. Since we also have taxonomy data, we need to remap facet field ordinals, which means I need to use the addIndexes(CodecReader…) version of this API. The API leverages SegmentMerger.merge() to create segments with new ordinal values while also merging all provided segments in the process.

This is, however, a blocking call that runs in a single thread. Until we have written the segments with new ordinal values, we cannot copy them to searcher boxes, which increases the time to make documents available for search.

I was playing around with the API by creating multiple concurrent merges, each with only a single reader - effectively a concurrently running 1:1 conversion from old segments to new ones (with new ordinal values). We follow this up with non-blocking background merges. This lets us copy the segments to searchers and replicas as soon as they are available, and later replace them with merged segments as the background jobs complete. On the Amazon dataset I profiled, this gave us around a 2.5 to 3x improvement in addIndexes() time. Each call was given about 5 readers to add on average.

This might be a useful addition to Lucene. We could create another addIndexes() API with a boolean flag for concurrency, which internally submits multiple merge jobs (each with a single reader) to the ConcurrentMergeScheduler and waits for them to complete before returning.

While this is doable outside Lucene by using your own thread pool, starting multiple addIndexes() calls and waiting for them to complete, it requires some understanding of what addIndexes does, why you need to wait on the merge, and why it makes sense to pass a single reader per addIndexes call.
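For context, here is a minimal sketch of this do-it-yourself approach (the helper class, pool sizing, and error handling are illustrative, not taken from the issue):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.lucene.index.CodecReader;
import org.apache.lucene.index.IndexWriter;

public final class ConcurrentAddIndexes {
  // Submit one addIndexes(CodecReader...) call per reader from our own pool,
  // then wait for all of them; at that point every remapped segment exists.
  public static void addIndexesConcurrently(IndexWriter writer, List<CodecReader> readers)
      throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, readers.size()));
    try {
      List<Future<Void>> futures = new ArrayList<>();
      for (CodecReader reader : readers) {
        futures.add(
            pool.submit(
                (Callable<Void>)
                    () -> {
                      writer.addIndexes(reader); // 1:1 merge: one new segment per reader
                      return null;
                    }));
      }
      for (Future<Void> f : futures) {
        f.get(); // propagate any merge failure; block until all segments are written
      }
    } finally {
      pool.shutdown();
    }
  }
}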

Out-of-the-box support in Lucene could simplify this for folks with a similar use case.


Migrated from LUCENE-10216 by Vigya Sharma (@vigyasharma), resolved Aug 01 2022
Pull requests: #633


Adrien Grand (@jpountz) (migrated from JIRA)

One bit from this proposal I'm not fully comfortable with is the fact that Lucene would merge each reader independently when the flag is set. While I understand why this makes sense for Amazon product search, since you want to have many segments to make search as concurrent as possible, it is also a bit wasteful: you'd be paying the cost of doing a merge without getting the benefit of reducing the segment count.

What about a slightly different proposal where merges would always be executed via the configured merge scheduler, and addIndexes(CodecReader[]) would get an additional boolean doWait parameter like IndexWriter#forceMerge?

Then you could still do what you want by using ConcurrentMergeScheduler, passing codec readers one by one to IndexWriter#addIndexes with doWait=false, and get the resulting remapped segments as quickly as possible. Yet Lucene would retain the current behaviour where addIndexes also tries to avoid doing wasteful singleton merges?
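Under that proposal, the expert usage might look roughly like this (the doWait parameter on addIndexes is hypothetical; it is only being suggested here):

// Hypothetical: addIndexes overload with a doWait flag, as suggested above.
for (CodecReader reader : readers) {
  writer.addIndexes(new CodecReader[] {reader}, /* doWait= */ false); // returns immediately
}
// The ConcurrentMergeScheduler performs the merges; remapped segments
// become available one by one as each merge completes.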


Vigya Sharma (@vigyasharma) (migrated from JIRA)

Thanks for going through the proposal, @jpountz.

One bit from this proposal I'm not fully comfortable with is the fact that Lucene would merge each reader independently when the flag is set.

I understand your concern; I am also a bit iffy about losing the opportunity to merge segments.

Ideally, I would like to land on a sweet middle ground - not run single segment merges, but also not add all provided segments into a single merge. Leverage some concurrency within addIndexes, and let background merges bring the segment count down further.

How do you feel about making this configurable in the API, with a param that defines the number of segments merged together (MergeFactor?)? We could make it a flag to keep things simple for consumers, with values like ONE, THREAD_WIDE, and ALL, where ONE = single-segment merges, ALL = all segments in one merge, and THREAD_WIDE = each merge gets (readers.length/numThreads) segments. The default here could be ALL to retain current behavior.
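A hypothetical sketch of that flag (these names exist nowhere in Lucene; they only illustrate the proposal):

// Hypothetical flag controlling how addIndexes groups readers into merges.
enum AddIndexesMergeFactor { ONE, THREAD_WIDE, ALL }

static int readersPerMerge(AddIndexesMergeFactor factor, int numReaders, int numThreads) {
  switch (factor) {
    case ONE:
      return 1; // one reader per merge: maximum concurrency
    case THREAD_WIDE:
      return Math.max(1, numReaders / numThreads); // one merge per scheduler thread
    case ALL:
    default:
      return numReaders; // a single merge over all readers: current behavior
  }
}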

 

Then you could still do what you want by using ConcurrentMergeScheduler, passing codec readers one by one to CodecReader#addIndexes with doWait=false and get resulting remapped segments as quickly as possible.

The doWait flag would make the API non-blocking, but it would add extra steps for users to track when the merges triggered by addIndexes have completed. For segment replication, the output of addIndexes is not usable until its merges complete: the merge within addIndexes() is what creates segments with recomputed ordinal values, and until that is done, there are no segments available to copy to searcher (and replica) hosts.

We also want to bring the segment count down as low as possible at Amazon product search. The tradeoff we are making here is to first go with a higher segment count to make documents quickly available for search, then run background merges to bring the segment count down. This is similar to the other variant of addIndexes - the addIndexes(Directory[]) API, which copies all segments from the provided directories into the current index directory and then triggers a background merge.

I feel making segments available quickly would be useful for anyone who has multiple replica search hosts and uses segment replication.

 


Adrien Grand (@jpountz) (migrated from JIRA)

How do you feel about making this configurable in the API with a param that defines the number of segments merged together

I worry that it would make the API too complicated.

Thinking about it more: ideally, addIndexes would do the right thing based on the configured MergePolicy (to know how to group CodecReaders together) and MergeScheduler (to know how many threads it may use), so that we could hopefully keep the same API as today. Expert users like Amazon product search could then use a MergePolicy that favors many segments for search concurrency, plus possibly a ConcurrentMergeScheduler that is allowed to use many threads, and get the desired behavior. This might be a high-hanging fruit though.


Vigya Sharma (@vigyasharma) (migrated from JIRA)

I like the idea of handling everything via MergePolicy and MergeScheduler, without changing the API. One wrinkle is that the existing merge framework (policy and scheduler) is written to work with segments, while the API only has readers.

I have a high-level plan for handling this based on early code reading. Let me know your thoughts, and whether you have any ideas to simplify/generalize it. If it makes sense, I can follow it up with a draft PR...

We could create a new API in MergePolicy that does findMerges using readers - findMerges(MergeTrigger mergeTrigger, CodecReader[] readers, MergeContext mergeContext) - and returns a MergeSpecification object, in which OneMerge objects are grouped based on how the policy chooses to merge these readers. Users can override this API based on the concurrency they'd like in addIndexes. The default impl. can just have a single OneMerge with all readers grouped together.

While OneMerge accepts (List&lt;SegmentCommitInfo&gt; segments) in its constructor, the SegmentMerger actually works on readers. OneMerge converts segments to MergeReaders via initMergeReaders(). Perhaps we could add another constructor to OneMerge that directly initializes its mergeReaders?
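A rough sketch of the proposed default implementation, assuming both the new findMerges signature and the reader-based OneMerge constructor suggested above (neither exists in Lucene at this point):

// Proposed MergePolicy hook (sketch, inside a MergePolicy subclass):
// group the incoming readers into OneMerge objects. The default keeps
// today's behavior: a single OneMerge containing all readers.
public MergeSpecification findMerges(
    MergeTrigger mergeTrigger, CodecReader[] readers, MergeContext mergeContext)
    throws IOException {
  MergeSpecification spec = new MergeSpecification();
  spec.add(new OneMerge(readers)); // assumes the reader-accepting OneMerge ctor proposed above
  return spec;
}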

Once we have a mergeSpec from the policy, we can either use the existing MergeSource in IW and just add each OneMerge to pendingMerges, or create a new MergeSource. I think we should at least override the MergeSource.merge() called from addIndexes, as addIndexes seems to require less work after the actual merging than IndexWriter does (e.g. no segments are rendered obsolete after an addIndexes merge, so we don't need to update the segments in SegmentInfos.applyMergeChanges() after the merge completes).

We want to wait for all merges triggered by addIndexes to complete before we return, which we could do by calling await() on the MergeSpec returned by policy.

I was also thinking that we could add a new MergeTrigger to identify merges triggered by addIndexes, and handle them separately in the merge policy (and possibly the scheduler?). We may not need it if the findMerges(..., readers) API is only used for addIndexes, but it may be a good idea regardless.


Michael McCandless (@mikemccand) (migrated from JIRA)

I like this plan (extending MergePolicy so it also has purview over how merging is done in addIndexes(CodecReader[])).

Reference counting might get tricky if OneMerge, or an IndexWriter holding completed OneMerge instances, tries to decRef readers.

To improve testing, we could create a new LuceneTestCase method to addIndexes from a Directory[] that randomly does so with both impls, and fix tests to sometimes use that for adding indices.


Vigya Sharma (@vigyasharma) (migrated from JIRA)

I took a shot at the MergePolicy + MergeScheduler idea and have raised a draft, work-in-progress PR - #633 - to get some high-level feedback on the approach. Existing tests pass; new tests specific to this change are pending.


Vigya Sharma (@vigyasharma) (migrated from JIRA)

Had some thoughts and questions about the transaction model for this API... Currently, it either succeeds and adds all provided readers, or fails and adds none of them.

With a merge policy splitting the provided readers into groups of smaller OneMerge objects, this is slightly harder to implement. A OneMerge on a subset of readers may complete in the background and add itself to the writer's SegmentInfos, while others running in parallel could fail.

One approach could be to expose this failure information to the user - the exception can contain the list of readers merged and pending. This could simplify the overall implementation.

My current thinking, however, is that the present transaction logic is important. It is hard for users to parse the exception message, figure out which readers are pending, and retry them, as opposed to retrying the entire API call (with all the readers), which their upstream system probably understands as a single unit. Still, I wanted to check if loosening the transaction model for this API is a palatable approach.

To retain the all-or-none, single-transaction model, I am thinking that we can join on all merges at the end of the addIndexes() API, and then write their segment info files under a common lock.
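In rough pseudocode, the all-or-none flow might look like this (helper names such as waitForAll and anyFailed are invented for illustration; this is not the actual IndexWriter code):

// Illustrative pseudocode: join on every addIndexes merge, then publish atomically.
List<MergePolicy.OneMerge> merges = scheduleAddIndexesMerges(readers); // runs on MergeScheduler
waitForAll(merges); // block until every OneMerge completes or fails

synchronized (writerLock) { // single critical section: all-or-none visibility
  if (anyFailed(merges)) {
    deleteInterimFiles(merges); // no partially merged segments become visible
    throw firstFailure(merges);
  }
  for (MergePolicy.OneMerge merge : merges) {
    registerSegment(merge); // add each new segment to the writer's SegmentInfos
  }
}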

Would like to hear more thoughts or suggestions on this.


Vigya Sharma (@vigyasharma) (migrated from JIRA)

Updated the PR to retain transactionality, while using the MergePolicy and configured MergeScheduler to trigger merges for this API.


Vigya Sharma (@vigyasharma) (migrated from JIRA)

While making this change, I found an existing TODO, which I think we're able to address now:

// TODO: somehow we should fix this merge so it's      
// abortable so that IW.close(false) is able to stop it 

Earlier, addIndexes(CodecReader...) triggered merges directly via SegmentMerger. The running merges could be tracked in Set&lt;SegmentMerger&gt; runningAddIndexesMerges, but pending merge operations could not be aborted preemptively.

Now, merges are defined via MergePolicy.OneMerge objects and scheduled by the MergeScheduler. We are able to leverage the OneMerge abstraction to abort any pending merges; they are aborted when the IndexWriter is rolled back or closed.


Vigya Sharma (@vigyasharma) (migrated from JIRA)

I think the PR is ready for review, with existing tests passing and new tests added for the changes.

OneMerge distribution is now provided by a new findMerges(CodecReader[]) API in MergePolicy, and executed by MergeScheduler threads. I've also modified MockRandomMergePolicy to randomly pick a highly concurrent (one segment per reader) findMerges(...) implementation 50% of the time, and confirmed manually that tests pass in both scenarios (the new impl. as well as the default impl. being picked) - thanks Michael McCandless for the suggestion.
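For illustration, the highly concurrent variant boils down to something like this (sketched against the new reader-based findMerges API; the reader-accepting OneMerge constructor is an assumption carried over from the discussion above):

// Sketch: one OneMerge per incoming reader, so every reader can be
// remapped and written as its own segment on a separate scheduler thread.
public MergeSpecification findMerges(CodecReader... readers) throws IOException {
  MergeSpecification spec = new MergeSpecification();
  for (CodecReader reader : readers) {
    spec.add(new OneMerge(reader)); // assumes a reader-accepting OneMerge ctor
  }
  return spec;
}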


Robert Muir (@rmuir) (migrated from JIRA)

I've also modified the MockRandomMergePolicy to randomly pick a highly concurrent, (one segment per reader), findMerges(...) implementation 50% of the time.

Is this going to make 50% of our test runs non-reproducible? Please, let's consider the need to reproduce failing runs and not pick such highly concurrent stuff for the unit tests. We should be able to test it another way.


ASF subversion and git services (migrated from JIRA)

Commit 698f40a in lucene's branch refs/heads/main from Vigya Sharma
https://gitbox.apache.org/repos/asf?p=lucene.git;h=698f40ad51a

LUCENE-10216: Use MergeScheduler and MergePolicy to run addIndexes(CodecReader[]) merges. (#633)

  • Use merge policy and merge scheduler to run addIndexes merges

  • wrapped reader does not see deletes - debug

  • Partially fixed tests in TestAddIndexes

  • Use writer object to invoke addIndexes merge

  • Use merge object info

  • Add javadocs for new methods

  • TestAddIndexes passing

  • verify field info schemas upfront from incoming readers

  • rename flag to track pooled readers

  • Keep addIndexes API transactional

  • Maintain transactionality - register segments with iw after all merges complete

  • fix checkstyle

  • PR comments

  • Fix pendingDocs - numDocs mismatch bug

  • Tests with 1-1 merges and partial merge failures

  • variable renaming and better comments

  • add test for partial merge failures. change tests to use 1-1 findmerges

  • abort pending merges gracefully

  • test null and empty merge specs

  • test interim files are deleted

  • test with empty readers

  • test cascading merges triggered

  • remove nocommits

  • gradle check errors

  • remove unused line

  • remove printf

  • spotless apply

  • update TestIndexWriterOnDiskFull to accept mergeException from failing addIndexes calls

  • return singleton reader mergespec in NoMergePolicy

  • rethrow exceptions seen in merge threads on failure

  • spotless apply

  • update test to new exception type thrown

  • spotlessApply

  • test for maxDoc limit in IndexWriter

  • spotlessApply

  • Use DocValuesIterator instead of DocValuesFieldExistsQuery for counting soft deletes

  • spotless apply

  • change exception message for closed IW

  • remove non-essential comments

  • update api doc string

  • doc string update

  • spotless

  • Changes file entry

  • simplify findMerges API, add 1-1 merges to MockRandomMergePolicy

  • update merge policies to new api

  • remove unused imports

  • spotless apply

  • move changes entry to end of list

  • fix testAddIndicesWithSoftDeletes

  • test with 1-1 merge policy always enabled

  • please spotcheck

  • tidy

  • test - never use 1-1 merge policy

  • use 1-1 merge policy randomly

  • Remove concurrent addIndexes findMerges from MockRandomMergePolicy

  • Bug Fix: RuntimeException in addIndexes

Aborted pending merges were slipping through the merge exception check in
API, and getting caught later in the RuntimeException check.

  • tidy

  • Rebase on main. Move changes to 10.0

  • Synchronize IW.AddIndexesMergeSource on outer class IW object

  • tidy


Adrien Grand (@jpountz) (migrated from JIRA)

Can this issue be resolved?


Michael McCandless (@mikemccand) (migrated from JIRA)

I think we can backport to 9.x? But we should not rush it for 9.3. It has baked for quite a while in main, and @vigyasharma has fixed some follow-on build failures.


Vigya Sharma (@vigyasharma) (migrated from JIRA)

Raised a PR for backporting the changes to 9.x - #1051

The commits included are the main PR (#633) and a related follow-on bug fix (#1012).


asfimport commented Aug 1, 2022

ASF subversion and git services (migrated from JIRA)

Commit 5dd8e9b in lucene's branch refs/heads/branch_9x from Vigya Sharma
https://gitbox.apache.org/repos/asf?p=lucene.git;h=5dd8e9bdc5a

LUCENE-10216: Use MergeScheduler and MergePolicy to run addIndexes(CodecReader[]) merges. (#1051)

Use merge policy and merge scheduler to run addIndexes merges.

This is a backport of the following commits from main:


Michael McCandless (@mikemccand) (migrated from JIRA)

Awesome! I think we can close this now @vigyasharma?
