Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make CLI requests wait for last command sequence number #2280

Merged
merged 20 commits into from
Feb 1, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 58 additions & 9 deletions ksql-cli/src/main/java/io/confluent/ksql/cli/Cli.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.rest.server.resources.Errors;
import io.confluent.ksql.util.CliUtils;
import io.confluent.ksql.util.ErrorMessageUtil;
import io.confluent.ksql.util.KsqlConstants;
Expand All @@ -57,6 +58,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import org.jline.reader.EndOfFileException;
import org.jline.reader.UserInterruptException;
import org.jline.terminal.Terminal;
Expand All @@ -67,6 +69,8 @@ public class Cli implements Closeable {

private static final Logger LOGGER = LoggerFactory.getLogger(Cli.class);

static final int COMMAND_QUEUE_CATCHUP_TIMEOUT_RETRIES = 1;
big-andy-coates marked this conversation as resolved.
Show resolved Hide resolved

private final ExecutorService queryStreamExecutorService;

private final Long streamedQueryRowLimit;
Expand All @@ -75,6 +79,8 @@ public class Cli implements Closeable {
private final KsqlRestClient restClient;
private final Console terminal;

private long lastCommandSequenceNumber;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest putting both lastCommandSequenceNumber and shouldWaitForPreviousCommand into a new object, (just an inner class), as both represent state associated with the remote server.

private static final RemoveServerState{ 
   private long lastCommandSequenceNumber;
   private boolean requestPipelining;

   private RemoveServerState() {
      reset();
   }

   private void reset() {
      lastCommandSequenceNumber = -1L
      requestPipelining = false;
   }
}

If/when the user switches to a different KSQL server, (using the server command), both of these should be reset. Having in a separate ServerState object, with a reset method that is called when switching servers and in its own the constructor, should ensure a nice pattern for ensuring bad state does not persist when switching servers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, except doing so violated the class data abstraction coupling limit for Cli.java, which I fixed by rearranging the various calls to registerCliSpecificCommand (specifically, moving them all into CliCommandRegisterUtil. Unfortunately this resulted in the class data abstraction coupling limit for CliCommandRegisterUtil being exceeded, but I think that's OK given the nature of the class?


public static Cli build(
final Long streamedQueryRowLimit,
final Long streamedQueryTimeoutMs,
Expand All @@ -99,6 +105,7 @@ public static Cli build(
this.restClient = restClient;
this.terminal = terminal;
this.queryStreamExecutorService = Executors.newSingleThreadExecutor();
this.lastCommandSequenceNumber = -1;

terminal
.registerCliSpecificCommand(new RemoteServerSpecificCommand(restClient, terminal.writer()));
Expand Down Expand Up @@ -239,8 +246,7 @@ private void handleStatements(final String line)
}
if (consecutiveStatements.length() != 0) {
printKsqlResponse(
restClient.makeKsqlRequest(consecutiveStatements.toString())
);
makeKsqlRequest(consecutiveStatements.toString(), restClient::makeKsqlRequest));
}
}

Expand Down Expand Up @@ -275,7 +281,7 @@ private void runScript(
}
setProperty(KsqlConstants.RUN_SCRIPT_STATEMENTS_CONTENT, fileContent);
printKsqlResponse(
restClient.makeKsqlRequest(statementText)
makeKsqlRequest(statementText, restClient::makeKsqlRequest)
);
}

Expand All @@ -285,7 +291,8 @@ private StringBuilder printOrDisplayQueryResults(
final String statementText
) throws InterruptedException, IOException, ExecutionException {
if (consecutiveStatements.length() != 0) {
printKsqlResponse(restClient.makeKsqlRequest(consecutiveStatements.toString()));
printKsqlResponse(
makeKsqlRequest(consecutiveStatements.toString(), restClient::makeKsqlRequest));
consecutiveStatements.setLength(0);
}
if (statementContext.statement() instanceof SqlBaseParser.QuerystatementContext) {
Expand All @@ -297,7 +304,8 @@ private StringBuilder printOrDisplayQueryResults(
}

private void listProperties(final String statementText) throws IOException {
final KsqlEntityList ksqlEntityList = restClient.makeKsqlRequest(statementText).getResponse();
final KsqlEntityList ksqlEntityList =
makeKsqlRequest(statementText, restClient::makeKsqlRequest).getResponse();
terminal.printKsqlEntityList(ksqlEntityList);
}

Expand Down Expand Up @@ -327,7 +335,7 @@ private void printKsqlResponse(final RestResponse<KsqlEntityList> response) thro
private void handleStreamedQuery(final String query) throws IOException {

final RestResponse<KsqlRestClient.QueryStream> queryResponse =
restClient.makeQueryRequest(query);
makeKsqlRequest(query, restClient::makeQueryRequest);

LOGGER.debug("Handling streamed query");

Expand Down Expand Up @@ -392,7 +400,7 @@ private boolean limitNotReached(final long rowsRead) {
private void handlePrintedTopic(final String printTopic)
throws InterruptedException, ExecutionException, IOException {
final RestResponse<InputStream> topicResponse =
restClient.makePrintTopicRequest(printTopic);
makeKsqlRequest(printTopic, restClient::makePrintTopicRequest);

if (topicResponse.isSuccessful()) {
try (Scanner topicStreamScanner = new Scanner(topicResponse.getResponse(), UTF_8.name());
Expand Down Expand Up @@ -461,8 +469,7 @@ private StringBuilder unsetProperty(
) throws IOException {
if (consecutiveStatements.length() != 0) {
printKsqlResponse(
restClient.makeKsqlRequest(consecutiveStatements.toString())
);
makeKsqlRequest(consecutiveStatements.toString(), restClient::makeKsqlRequest));
consecutiveStatements.setLength(0);
}
final SqlBaseParser.UnsetPropertyContext unsetPropertyContext =
Expand All @@ -482,4 +489,46 @@ private void unsetProperty(final String property) {
terminal.writer()
.printf("Successfully unset local property '%s' (value was '%s').%n", property, oldValue);
}

private <R> RestResponse<R> makeKsqlRequest(
final String ksql, final BiFunction<String, Long, RestResponse<R>> requestIssuer) {
return makeKsqlRequest(ksql, requestIssuer, COMMAND_QUEUE_CATCHUP_TIMEOUT_RETRIES);
}

private <R> RestResponse<R> makeKsqlRequest(
final String ksql,
final BiFunction<String, Long, RestResponse<R>> requestIssuer,
final int remainingRetries) {
final RestResponse<R> response =
requestIssuer.apply(ksql, lastCommandSequenceNumber);
if (isSequenceNumberTimeout(response)) {
if (remainingRetries > 0) {
return makeKsqlRequest(ksql, requestIssuer, remainingRetries - 1);
} else {
return requestIssuer.apply(ksql, null);
big-andy-coates marked this conversation as resolved.
Show resolved Hide resolved
}
} else if (isKsqlEntityList(response)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about if response is a plain KsqlEntity? (Is this possible?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not currently possible, since KsqlRestClient.makeKsqlRequest() returns RestResponse<KsqlEntityList>. Not sure if there's a good way to set up my change to prevent errors, should this no longer be the case in the future... open to suggestions.

updateLastCommandSequenceNumber((KsqlEntityList)response.getResponse());
}
return response;
}

private static boolean isSequenceNumberTimeout(final RestResponse<?> response) {
return response.isErroneous()
&& (response.getErrorMessage().getErrorCode()
== Errors.ERROR_CODE_COMMAND_QUEUE_CATCHUP_TIMEOUT);
}

private static boolean isKsqlEntityList(final RestResponse<?> response) {
return response.isSuccessful() && response.getResponse() instanceof KsqlEntityList;
}

private void updateLastCommandSequenceNumber(final KsqlEntityList entities) {
final Optional<Long> lastSeqNum = entities.stream()
.filter(entity -> entity instanceof CommandStatusEntity)
.map(entity -> (CommandStatusEntity)entity)
.map(CommandStatusEntity::getCommandSequenceNumber)
Copy link
Contributor

@big-andy-coates big-andy-coates Jan 9, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.map(CommandStatusEntity::getCommandSequenceNumber)
.mapToLong(CommandStatusEntity::getCommandSequenceNumber)
.max()
.ifPresent(seqNum -> lastCommandSequenceNumber = seqNum)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't seem to get this proposed change to compile. mapToLong results in a stream of OptionalLongs, but max() only works on Optional<T>. Am I missing something?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what you're doing, but this works for me:

final long max = Stream.of("10")
        .mapToLong(Long::valueOf)
        .max().getAsLong();

:D

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you're right. Sorry for the brain fart. (mapToLong results in a LongStream, to which max() can be applied just fine.) Fixed.

.reduce(Long::max);
lastSeqNum.ifPresent(seqNum -> lastCommandSequenceNumber = seqNum);
}
}
Loading