Refactored Command Batch Cursors. #1198
Previously there was a QueryResult / QueryBatchCursor abstraction. It is no longer required, as only commands are used to create cursors.
Two new classes have been added:
1. SingleBatchCursor: used when commands return a single list of results but not an actual cursor.
2. CommandBatchCursor: used for commands that return a cursor; it contains all the logic to manage resources and issue getMore calls.
The construction and resource management have been simplified by reducing the number of constructors used when creating the cursor. This will simplify future refactorings.
The asynchronous cursor abstractions have been refactored to follow their synchronous counterparts more closely, reducing the cognitive cost of working on both cursor types.
JAVA-5159
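To illustrate the single-batch case described above, here is a minimal sketch. It is not the driver's SingleBatchCursor: the class name `SingleBatchCursorSketch` and its methods are hypothetical, and it only shows the idea of handing out one pre-fetched batch without ever issuing a getMore.

```java
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical stand-in (not the driver class): wraps one pre-fetched batch.
final class SingleBatchCursorSketch<T> implements AutoCloseable {
    private final List<T> batch;
    private boolean hasNext;
    private boolean closed;

    SingleBatchCursorSketch(List<T> batch) {
        this.batch = batch;
        // An empty batch means there is nothing to hand out at all.
        this.hasNext = !batch.isEmpty();
    }

    // Assumed convenience factory for commands that return no results.
    static <T> SingleBatchCursorSketch<T> empty() {
        return new SingleBatchCursorSketch<>(Collections.emptyList());
    }

    boolean hasNext() {
        if (closed) {
            throw new IllegalStateException("Cursor has been closed");
        }
        return hasNext;
    }

    List<T> next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        // The single batch is returned exactly once; no server round trip follows.
        hasNext = false;
        return batch;
    }

    @Override
    public void close() {
        closed = true;
    }
}
```

Because there is no server-side cursor in this case, the sketch has no resources to release and no getMore logic, which is the reason for keeping it separate from a cursor-backed class.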
* to this method will execute the callback with a null result to indicate that there are no more batches available and the cursor
* has been closed.
* Returns the next batch of results. A tailable cursor will block until another batch exists.
* Unlike the {@link BatchCursor} this method will automatically mark the cursor as closed when there are no more expected results.
The cursor no longer calls callback.onResult(null, null). As this class is internal and only consumed internally, there is no public API breakage or behaviour change.
NO_OP_FIELD_NAME_VALIDATOR, ReadPreference.primary(),
CommandResultDocumentCodec.create(decoder, NEXT_BATCH), assertNotNull(resourceManager.connectionSource),
(commandResult, t) -> {
if (t != null) {
I used multiple returns to make the code more readable and mimic the sync returns, even though we are using callbacks.
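As a rough illustration of the early-return style in a callback, the sketch below uses guard clauses so each outcome is handled and left immediately, much like a synchronous method would return. The class `EarlyReturnCallbackSketch` and the use of `BiConsumer` in place of the driver's callback type are assumptions made for this example.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical helper, not driver code: BiConsumer stands in for a (result, error) callback.
final class EarlyReturnCallbackSketch {
    static <T> void handle(List<T> result, Throwable t, BiConsumer<List<T>, Throwable> callback) {
        if (t != null) {
            // Failure: report the error and stop.
            callback.accept(null, t);
            return;
        }
        if (result == null) {
            // No batch available: report an empty list and stop.
            callback.accept(Collections.emptyList(), null);
            return;
        }
        // Success path reads last, with no nested else branches.
        callback.accept(result, null);
    }
}
```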
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;

class AsyncCommandBatchCursor<T> implements AsyncAggregateResponseBatchCursor<T> {
This follows the sync CommandBatchCursor approach and utilizes a ResourceManager. As such, it's very different from the previous AsyncQueryBatchCursor, which contained more branching logic.
I hope that working on sync or async cursors in the future will now be much easier for the developer.
Most of the PR is a large but fairly straightforward refactor, and looks good. Still in the process of reviewing the CommandBatchCursor classes, but wanted to see what you thought about consolidating the code further.
class CommandBatchCursorSpecification extends Specification {

def 'should generate expected command with batchSize and maxTimeMS'() {
Convert tests to Java?
Looking into it.
Done
private final ResourceManager resourceManager;
private int batchSize;

AsyncCommandBatchCursor(
Could the implementation be arranged to be the same as the sync CommandBatchCursor? I am opening the two classes side-by-side in my IDE, and the order differs (for example, maxWireVersion is set earlier here).
It seems that many of the sections in these classes are effectively identical, and might be extracted (into a superclass, possibly using generics?). For example, the fields are almost all identical, and the State enums are the same.
(This may apply to other sync-async class and method pairs.)
It could be made the same; however, there is no async Iterator API (which is why Publishers / Flows were invented and subsequently added to the JDK).
For async, the API would be very heavy and require nested callbacks to use:
hasNext(SingleResultCallback<Boolean>)
next(SingleResultCallback<List<T>>)
Which can be replaced by just:
next(SingleResultCallback<List<T>>)
isClosed()
This works because MongoDB cursors are self-describing: they report when they have reached the end of their results.
The other main difference is that the while loop used for getMores works for sync, but in async it becomes a nested callback loop.
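The difference between the two loops can be sketched roughly as follows. Everything here is hypothetical: `FakeServer` stands in for the server, `BiConsumer` stands in for the driver's callback type, and the real cursors also manage connections and cursor ids, which this sketch leaves out.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.function.BiConsumer;

final class GetMoreLoopSketch {
    // Hypothetical server stand-in: each getMore hands back the next queued batch.
    static final class FakeServer {
        private final Deque<List<Integer>> pending = new ArrayDeque<>();

        FakeServer(List<List<Integer>> batches) {
            pending.addAll(batches);
        }

        boolean isExhausted() {
            return pending.isEmpty();
        }

        List<Integer> getMore() {
            return pending.poll();
        }

        void getMoreAsync(BiConsumer<List<Integer>, Throwable> callback) {
            callback.accept(pending.poll(), null);
        }
    }

    // Sync: a plain while loop skips empty batches until results arrive or the cursor ends.
    static List<Integer> nextSync(FakeServer server) {
        List<Integer> batch = Collections.emptyList();
        while (batch.isEmpty() && !server.isExhausted()) {
            batch = server.getMore();
        }
        return batch;
    }

    // Async: the same loop expressed as a callback that re-invokes itself.
    static void nextAsync(FakeServer server, BiConsumer<List<Integer>, Throwable> callback) {
        if (server.isExhausted()) {
            callback.accept(Collections.emptyList(), null);
            return;
        }
        server.getMoreAsync((batch, t) -> {
            if (t != null) {
                callback.accept(null, t);
                return;
            }
            if (batch.isEmpty()) {
                nextAsync(server, callback); // loop again via the callback
                return;
            }
            callback.accept(batch, null);
        });
    }
}
```

In this sketch the fake server completes its callbacks synchronously, so the async version recurses on the calling thread; with real asynchronous I/O each iteration would typically start from a fresh stack.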
That said, there is no reason hasNext() couldn't be added instead of isClosed(). I'll take a look and try to increase the code reuse.
I opted to leave it as is, because the only consumer is internal (BatchCursorFlux), which already has this check when it recurses the cursor:
if (batchCursor.isClosed()) {
sink.complete();
} else {
// Fetches more results.
driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java
Changing to draft while I investigate the build errors.
ConnectionSource and Connection also need to be retained before kill cursors is issued and released in the kill cursors callback.
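A minimal sketch of that retain-then-release pattern, assuming simple reference counting. `RefCounted` is a hypothetical stand-in for ConnectionSource / Connection, and the `killCursors` parameter is an assumed hook that accepts a completion callback; neither is the driver's API.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class KillCursorsRetainSketch {
    // Hypothetical reference-counted resource; starts with one reference held by its owner.
    static final class RefCounted {
        private final AtomicInteger count = new AtomicInteger(1);

        void retain() {
            count.incrementAndGet();
        }

        void release() {
            count.decrementAndGet();
        }

        int getCount() {
            return count.get();
        }
    }

    // killCursors is an assumed async hook: it receives the completion callback to run later.
    static void closeCursor(RefCounted source, RefCounted connection, Consumer<Runnable> killCursors) {
        // Retain first, so the owner releasing its own references cannot free
        // the resources while the kill cursors command is still in flight.
        source.retain();
        connection.retain();
        killCursors.accept(() -> {
            // Released only once the command has completed.
            connection.release();
            source.release();
        });
    }
}
```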
Ready for review again @katcharov
I tried to review but got stuck with the difficulty of comparing CommandBatchCursor to QueryBatchCursor without the benefit of a diff.
Is this refactoring going to make CSOT significantly simpler? It's not clear to me that it will, but if so, we should proceed. If not I would prefer to defer it.
this.hasNext = !batch.isEmpty();
}

public List<T> getBatch() {
This method is unused, even in tests. Let's remove it.
Done
return rethrowIfNotNamespaceError(e, createEmptyBatchCursor(namespace, decoder,
source.getServerDescription().getAddress(), batchSize));
return rethrowIfNotNamespaceError(e,
SingleBatchCursor.createEmptyBatchCursor(source.getServerDescription().getAddress(), batchSize));
We generally use static imports for spots like this.
Renamed the methods and done.
import static java.util.Collections.emptyList;

class SingleBatchCursor<T> implements BatchCursor<T> {
Let's add a unit test for this class.
Done
import static com.mongodb.internal.operation.OperationHelper.LOGGER;
import static java.lang.String.format;

class CommandBatchCursor<T> implements AggregateResponseBatchCursor<T> {
I'm concerned that, as this appears as a new class in the diff, it's difficult to compare with QueryBatchCursor. Can you share some details about how this code was created? Was it mostly an auto-refactoring? Are our existing tests good enough to ensure that no regressions were introduced?
Unfortunately, Git's diff algorithm doesn't always get it right.
This was done in multiple parts:
- A simple rename.
- Updated tests, so both async and sync have the same tests. This led to some fixes for cases where resources weren't released in sync.
- A refactor to make the ResourceManager abstract and reusable across sync and async, as per previous PR feedback.
This will make CSOT simpler because it's now clear where the timeout context is needed. It also brings the async and sync command cursors much more into line, with lots of code reuse.
The construction and resource management of cursors have been simplified by reducing the number of constructors used when creating the cursor. The aim of this work was to simplify future refactorings, e.g. the CSOT work.
Unfortunately, it expanded into a bigger piece of work than initially thought, as I took the opportunity to unify the general approaches between sync and async cursors (with the caveat that async cursors are self-closing).
The core top-level changes are:
- CommandCursorResult, which replaces QueryResult and is used by both async and sync.
Synchronous changes:
- CommandBatchCursor: replaces QueryBatchCursor and is used for any commands that return a cursor object. This class contains all the logic to manage resources and issue getMore calls.
- SingleBatchCursor: used for any commands that return a single list of results but not an actual cursor. There was already an AsyncSingleBatchCursor, so this brings the abstractions in line.
Asynchronous changes:
- AsyncCommandBatchCursor: replaces AsyncQueryBatchCursor and is used for any commands that return a cursor object. This class contains all the logic to manage resources and issue getMore calls. This was a bigger refactor, and the class now closely follows the logic of the synchronous CommandBatchCursor. The core difference is that it is self-closing and uses a callback loop rather than a while loop when calling getMore.
The tests were updated so that both async and sync cursors are tested equally (previously async had much more comprehensive testing). This process uncovered some subtle sync bugs where resources would not be released in error scenarios, so the PR includes some potential memory leak fixes.
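The kind of leak mentioned above, a resource not being released when a command fails, is commonly guarded against with try/finally. The sketch below shows that general pattern only; `TrackedConnection` and `withConnection` are hypothetical names and do not reflect how the driver's ResourceManager is actually written.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

final class ReleaseOnErrorSketch {
    // Hypothetical connection stand-in that only counts outstanding references.
    static final class TrackedConnection {
        private final AtomicInteger references = new AtomicInteger();

        void retain() {
            references.incrementAndGet();
        }

        void release() {
            references.decrementAndGet();
        }

        int getCount() {
            return references.get();
        }
    }

    // Runs a command with the connection retained, releasing it on success and on failure.
    static <R> R withConnection(TrackedConnection connection, Function<TrackedConnection, R> command) {
        connection.retain();
        try {
            return command.apply(connection);
        } finally {
            // Runs even when the command throws, so the error path cannot leak.
            connection.release();
        }
    }
}
```

A test that forces the command to throw and then checks that the reference count is back to zero is one way to catch this class of bug in both sync and async code paths.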
JAVA-5159