-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make CLI requests wait for last command sequence number #2280
Conversation
This PR relates to #2268 which suggests we always default to using An open question if we go with the suggestion in #2268 is what the behavior should be if a CLI request times out while waiting for the command queue to catch up -- should we simply return the timeout error, or should we retry some number of times? Regardless of whether we go with #2268 or not, my proposed implementation for how to wait for the command queue to catch up would be the one in this PR, which is why I have opened this PR now for feedback on the approach. |
@@ -245,8 +251,18 @@ public void ensureConsumedPast(final long seqNum, final Duration timeout) | |||
return result; | |||
} | |||
|
|||
private long getSequenceNumberToWaitFor(final long seqNum) { | |||
return seqNum == CommandStoreUtil.LAST_SEQUENCE_NUMBER ? lastCommandSequenceNumber : seqNum; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of tracking lastCommandSequenceNumber
and using that value here when seqNum == LAST_SEQUENCE_NUMBER
, ideally we would call commandConsumer.endOffsets
here directly, to truly get the current last sequence number in the command queue. However, commandConsumer
does not support multi-threading and calls to this function are made by server threads issuing requests, in contrast to all other calls to commandConsumer
which are made by the single command runner thread. Thus, tracking lastCommandSequenceNumber
and using it here was the best workaround I could think of. Open to other suggestions!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ask discussed in the meeting, this won't work as there is a race condition between the threads, i.e. its possible to receive a request that wasn't to be executed after all currently queued requests. So the code checks lastCommandSequenceNumber
upon receiving the request, but it hasn't been updated since another command was posted, so its actually out of date... not good!
As discussed, use a separate thread with its own consumer. Use a blocking queue to post CompleteableFuture<Offset>
to the thread. The thread can then block on the queue, then drain it, query the head offset and complete all the drained futures. Wrap all of this into a suitable class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, I'll implement the blocking queue approach in a separate PR. For now, I've retargeted this PR to focus on having CLI requests wait until all previously-issued requests (from the particular CLI) have finished, which @apurvam pointed out has a simpler implementation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @vcrfxia
As discussed, I think the default behaviour if no command sequence is supplied should be to use the latest offset.
Also, there is a race condition in the current solution. but we've already discussed the solution.
Let me know when this is ready for another look.
@@ -245,8 +251,18 @@ public void ensureConsumedPast(final long seqNum, final Duration timeout) | |||
return result; | |||
} | |||
|
|||
private long getSequenceNumberToWaitFor(final long seqNum) { | |||
return seqNum == CommandStoreUtil.LAST_SEQUENCE_NUMBER ? lastCommandSequenceNumber : seqNum; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ask discussed in the meeting, this won't work as there is a race condition between the threads, i.e. its possible to receive a request that wasn't to be executed after all currently queued requests. So the code checks lastCommandSequenceNumber
upon receiving the request, but it hasn't been updated since another command was posted, so its actually out of date... not good!
As discussed, use a separate thread with its own consumer. Use a blocking queue to post CompleteableFuture<Offset>
to the thread. The thread can then block on the queue, then drain it, query the head offset and complete all the drained futures. Wrap all of this into a suitable class.
…r-write consistency differently.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After discussions with @big-andy-coates, @hjafarpour and @apurvam , I've removed the parts of this PR for adding a special sequence number value LAST_SEQUENCE_NUMBER
to indicate that the command queue should be fully caught up before executing the new request, and also changed the new CLI behavior in this PR from using the special value to instead keep track of the last sequence number of any request issued by the particular CLI instance, and issue subsequent commands using this value as the sequence number to wait for. I've updated the PR description accordingly.
@@ -245,8 +251,18 @@ public void ensureConsumedPast(final long seqNum, final Duration timeout) | |||
return result; | |||
} | |||
|
|||
private long getSequenceNumberToWaitFor(final long seqNum) { | |||
return seqNum == CommandStoreUtil.LAST_SEQUENCE_NUMBER ? lastCommandSequenceNumber : seqNum; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, I'll implement the blocking queue approach in a separate PR. For now, I've retargeted this PR to focus on having CLI requests wait until all previously-issued requests (from the particular CLI) have finished, which @apurvam pointed out has a simpler implementation.
9030ad7
to
8a96569
Compare
|
||
/** | ||
* Most tests in CliTest are end-to-end integration tests, so it may expect a long running time. | ||
*/ | ||
@RunWith(MockitoJUnitRunner.class) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems a little weird that this test has both integration tests and unit tests in it. (I switched the existing unit tests over to Mockito and also added more.) Would it make more sense to have separate files for the CLI integration and CLI units tests?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will do in a follow-up. Thanks.
} | ||
|
||
private CommandStatusEntity stubCommandStatusEntityWithSeqNum(final long seqNum) { | ||
return new CommandStatusEntity( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wanted this to return a mock and initially tried
private CommandStatusEntity mockCommandStatusEntityWithSeqNum(final long seqNum) {
final CommandStatusEntity mockEntity = mock(CommandStatusEntity.class);
when(mockEntity.getCommandSequenceNumber()).thenReturn(seqNum);
when(mockEntity.getCommandStatus()).thenReturn(mock(CommandStatus.class));
return mockEntity;
}
but the newly added tests were failing with
java.lang.RuntimeException: Unexpected KsqlEntity class: 'io.confluent.ksql.rest.entity.CommandStatusEntity$MockitoMock$564394681'
at io.confluent.ksql.cli.console.Console.printAsTable(Console.java:350)
at io.confluent.ksql.cli.console.Console.printKsqlEntityList(Console.java:294)
at io.confluent.ksql.cli.Cli.printKsqlResponse(Cli.java:328)
at io.confluent.ksql.cli.Cli.handleStatements(Cli.java:248)
at io.confluent.ksql.cli.Cli.handleLine(Cli.java:181)
at io.confluent.ksql.cli.CliTest.shouldUpdateCommandSequenceNumber(CliTest.java:865)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
which confuses me since Console.java
certainly has a handler for the KsqlEntity CommandStatusEntity
. Perhaps the class of the mock is subtly different from the class of a regular CommandStatusEntity
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does .getClass
on the mock return?
I think you'll find it's a different type, i.e. io.confluent.ksql.rest.entity.CommandStatusEntity$MockitoMock$564394681
So... rather than using a mock, can you just create a CommandStatusEntity
in the right state?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, that seems to be the problem, and your proposed solution is what stubCommandStatusEntityWithSeqNum
implements. Good to know I wasn't just using Mockito incorrectly. Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @vcrfxia
Looking good 'cept I don't think we need a retry. See comment inline. This will also simplify the change even more ;)
final Optional<Long> lastSeqNum = entities.stream() | ||
.filter(entity -> entity instanceof CommandStatusEntity) | ||
.map(entity -> (CommandStatusEntity)entity) | ||
.map(CommandStatusEntity::getCommandSequenceNumber) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.map(CommandStatusEntity::getCommandSequenceNumber) | |
.mapToLong(CommandStatusEntity::getCommandSequenceNumber) | |
.max() | |
.ifPresent(seqNum -> lastCommandSequenceNumber = seqNum) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can't seem to get this proposed change to compile. mapToLong
results in a stream of OptionalLong
s, but max()
only works on Optional<T>
. Am I missing something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure what you're doing, but this works for me:
final long max = Stream.of("10")
.mapToLong(Long::valueOf)
.max().getAsLong();
:D
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, you're right. Sorry for the brain fart. (mapToLong
results in a LongStream
, to which max()
can be applied just fine.) Fixed.
As discussed. Let's remove the retry and replace it with a more descriptive error message. Secondly, let's add a Finally, if the user switches servers within the CLI. (Via the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed with @big-andy-coates and @rodesai offline (and summarized by @big-andy-coates above), the latest revision of this PR removes automatic retries from the CLI and instead implements a CLI-specific command wait-for-previous-command
(suggestions for new names appreciated...) that can be used to control whether the CLI waits for previous commands to finish executing before making new requests. Its usage is wait-for-previous-command <ON/OFF>
. The setting defaults to ON and resets when the server backing the CLI changes (via the server
command).
The current setting of the flag is kept in the Cli
. In the future we might like to move this flag (along with the Cli's lastCommandSequenceNumber
) into an object tracked by the CLI as @big-andy-coates suggested above, since these are the two state values that need to be reset when the underlying server is changed.
The latest revision additionally changes the command queue catchup timeout used by StreamedQueryResource
to be consistent with the timeouts used by KsqlResource
and WSQueryEndpoint
, since the previous discrepancy doesn't make sense.
} | ||
|
||
private CommandStatusEntity stubCommandStatusEntityWithSeqNum(final long seqNum) { | ||
return new CommandStatusEntity( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, that seems to be the problem, and your proposed solution is what stubCommandStatusEntityWithSeqNum
implements. Good to know I wasn't just using Mockito incorrectly. Thanks!
|
||
/** | ||
* Most tests in CliTest are end-to-end integration tests, so it may expect a long running time. | ||
*/ | ||
@RunWith(MockitoJUnitRunner.class) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will do in a follow-up. Thanks.
final Optional<Long> lastSeqNum = entities.stream() | ||
.filter(entity -> entity instanceof CommandStatusEntity) | ||
.map(entity -> (CommandStatusEntity)entity) | ||
.map(CommandStatusEntity::getCommandSequenceNumber) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can't seem to get this proposed change to compile. mapToLong
results in a stream of OptionalLong
s, but max()
only works on Optional<T>
. Am I missing something?
Suggested name for the settings... "request pipelining". If turned on we don't wait for a response. This is the same as HTTP pipelining, so will be a familiar concept for some. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking good. Thanks @vcrfxia
Few comments below, but LGTM. There's a name suggestion in there too.
@@ -75,6 +78,9 @@ | |||
private final KsqlRestClient restClient; | |||
private final Console terminal; | |||
|
|||
private long lastCommandSequenceNumber; | |||
private boolean shouldWaitForPreviousCommand; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as per previous comment, consider
private boolean shouldWaitForPreviousCommand; | |
private boolean requestPipelining; |
(Though note that this is the opposite of the current name, i.e. we should wait for previous command to finish if requestPipelining
is false.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
@@ -75,6 +78,9 @@ | |||
private final KsqlRestClient restClient; | |||
private final Console terminal; | |||
|
|||
private long lastCommandSequenceNumber; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would suggest putting both lastCommandSequenceNumber
and shouldWaitForPreviousCommand
into a new object, (just an inner class), as both represent state associated with the remote server.
private static final RemoveServerState{
private long lastCommandSequenceNumber;
private boolean requestPipelining;
private RemoveServerState() {
reset();
}
private void reset() {
lastCommandSequenceNumber = -1L
requestPipelining = false;
}
}
If/when the user switches to a different KSQL server, (using the server
command), both of these should be reset. Having in a separate ServerState
object, with a reset method that is called when switching servers and in its own the constructor, should ensure a nice pattern for ensuring bad state does not persist when switching servers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, except doing so violated the class data abstraction coupling limit for Cli.java
, which I fixed by rearranging the various calls to registerCliSpecificCommand
(specifically, moving them all into CliCommandRegisterUtil
. Unfortunately this resulted in the class data abstraction coupling limit for CliCommandRegisterUtil
being exceeded, but I think that's OK given the nature of the class?
final Optional<Long> lastSeqNum = entities.stream() | ||
.filter(entity -> entity instanceof CommandStatusEntity) | ||
.map(entity -> (CommandStatusEntity)entity) | ||
.map(CommandStatusEntity::getCommandSequenceNumber) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure what you're doing, but this works for me:
final long max = Stream.of("10")
.mapToLong(Long::valueOf)
.max().getAsLong();
:D
|
||
public RemoteServerSpecificCommand( | ||
final KsqlRestClient restClient, | ||
final PrintWriter writer) { | ||
final PrintWriter writer, | ||
final Consumer<Void> resetCliForNewServer) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should just add a generic 'Event' interface to our common lib. Do you fancy doing it? :D
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. Slipped it in as ksql-common/src/main/java/io/confluent/ksql/util/Event.java
. (Would another subfolder be more appropriate?)
import java.util.function.Consumer; | ||
import java.util.function.Supplier; | ||
|
||
public class WaitForPreviousCommand implements CliSpecificCommand { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename to RequestPipeliningCommand
obvs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
+ "If you wish to execute new commands without waiting for " | ||
+ "prior commands to finish, run the command '%s OFF'.%n", | ||
WaitForPreviousCommand.NAME); | ||
} else if (isKsqlEntityList(response)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about if response
is a plain KsqlEntity
? (Is this possible?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not currently possible, since KsqlRestClient.makeKsqlRequest()
returns RestResponse<KsqlEntityList>
. Not sure if there's a good way to set up my change to prevent errors, should this no longer be the case in the future... open to suggestions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the review @big-andy-coates ! Implemented your suggestions.
A little weirdness that may be worth looking at again is how I've refactored where various CLI-specific commands get registered. Previously, help
, clear
, output
, history
, version
, and exit
were being registered in the Console, while server
and requestPipelining
were registered in the Cli. I've moved version
to be registered in the Cli since it feels it belongs on that side of the Console/Cli divide. But perhaps we shouldn't have the divide at all.
@@ -75,6 +78,9 @@ | |||
private final KsqlRestClient restClient; | |||
private final Console terminal; | |||
|
|||
private long lastCommandSequenceNumber; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, except doing so violated the class data abstraction coupling limit for Cli.java
, which I fixed by rearranging the various calls to registerCliSpecificCommand
(specifically, moving them all into CliCommandRegisterUtil
. Unfortunately this resulted in the class data abstraction coupling limit for CliCommandRegisterUtil
being exceeded, but I think that's OK given the nature of the class?
@@ -75,6 +78,9 @@ | |||
private final KsqlRestClient restClient; | |||
private final Console terminal; | |||
|
|||
private long lastCommandSequenceNumber; | |||
private boolean shouldWaitForPreviousCommand; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
+ "If you wish to execute new commands without waiting for " | ||
+ "prior commands to finish, run the command '%s OFF'.%n", | ||
WaitForPreviousCommand.NAME); | ||
} else if (isKsqlEntityList(response)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not currently possible, since KsqlRestClient.makeKsqlRequest()
returns RestResponse<KsqlEntityList>
. Not sure if there's a good way to set up my change to prevent errors, should this no longer be the case in the future... open to suggestions.
final Optional<Long> lastSeqNum = entities.stream() | ||
.filter(entity -> entity instanceof CommandStatusEntity) | ||
.map(entity -> (CommandStatusEntity)entity) | ||
.map(CommandStatusEntity::getCommandSequenceNumber) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, you're right. Sorry for the brain fart. (mapToLong
results in a LongStream
, to which max()
can be applied just fine.) Fixed.
|
||
public RemoteServerSpecificCommand( | ||
final KsqlRestClient restClient, | ||
final PrintWriter writer) { | ||
final PrintWriter writer, | ||
final Consumer<Void> resetCliForNewServer) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. Slipped it in as ksql-common/src/main/java/io/confluent/ksql/util/Event.java
. (Would another subfolder be more appropriate?)
import java.util.function.Consumer; | ||
import java.util.function.Supplier; | ||
|
||
public class WaitForPreviousCommand implements CliSpecificCommand { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
Non-binding comment: i've personally always found the whole cli/console/terminal thing to be confusing and unnecessary when i've been in there too, perhaps there's a consensus on how it could be better ? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Great tests!
Thanks for the feedback and reviews, all! I like this final version so much better than the first. Updated the PR description to describe the latest version. Will hold off to merge until #2331 gets in since merge conflicts should be easier to resolve on this side. |
Description
This PR introduces a new CLI flag
requestPipelining
that controls whether requests issued through the CLI will wait for previous requests issued through the CLI to complete before processing new requests. This behavior (requestPipelining=false
) is the default, and is motivated by the goal to ensure read-after-write consistency for requests issued through the CLI. WhenrequestPipelining=true
, CLI requests will not wait for previous requests to complete and will instead begin execution immediately (the current behavior). The user can change between modes using a newly-added CLI-specific command:requestPipelining
. Its usage isrequestPipelining
to show the current setting of the flag, andrequestPipelining ON
orrequestPipelining OFF
to set the flag to a particular value.To implement the behavior of
requestPipelining=false
, the CLI now keeps track of the last sequence number of any request issued by the CLI, by updating this value as appropriate each time a response is received from the server. WhenrequestPipelining=false
, this sequence number is passed viacommandSequenceNumber
for the next request. Values of therequestPipelining
flag and the last command sequence number of the CLI are reset if the server backing the CLI changes (via theserver
command).Testing done
Added unit tests.
Reviewer checklist