Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make CLI requests wait for last command sequence number #2280

Merged
merged 20 commits into from
Feb 1, 2019
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 95 additions & 13 deletions ksql-cli/src/main/java/io/confluent/ksql/cli/Cli.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import io.confluent.ksql.cli.console.Console;
import io.confluent.ksql.cli.console.KsqlTerminal.StatusClosable;
import io.confluent.ksql.cli.console.OutputFormat;
import io.confluent.ksql.cli.console.cmd.CliCommandRegisterUtil;
import io.confluent.ksql.cli.console.cmd.RemoteServerSpecificCommand;
import io.confluent.ksql.cli.console.cmd.RequestPipeliningCommand;
import io.confluent.ksql.ddl.DdlConfig;
import io.confluent.ksql.parser.AstBuilder;
import io.confluent.ksql.parser.KsqlParser;
Expand All @@ -34,6 +36,7 @@
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.rest.server.resources.Errors;
import io.confluent.ksql.util.CliUtils;
import io.confluent.ksql.util.ErrorMessageUtil;
import io.confluent.ksql.util.KsqlConstants;
Expand All @@ -57,12 +60,14 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import org.jline.reader.EndOfFileException;
import org.jline.reader.UserInterruptException;
import org.jline.terminal.Terminal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class Cli implements Closeable {

private static final Logger LOGGER = LoggerFactory.getLogger(Cli.class);
Expand All @@ -74,14 +79,15 @@ public class Cli implements Closeable {

private final KsqlRestClient restClient;
private final Console terminal;
private final RemoteServerState remoteServerState;

public static Cli build(
final Long streamedQueryRowLimit,
final Long streamedQueryTimeoutMs,
final OutputFormat outputFormat,
final KsqlRestClient restClient
) {
final Console console = Console.build(outputFormat, restClient);
final Console console = Console.build(outputFormat);
return new Cli(streamedQueryRowLimit, streamedQueryTimeoutMs, restClient, console);
}

Expand All @@ -99,9 +105,14 @@ public static Cli build(
this.restClient = restClient;
this.terminal = terminal;
this.queryStreamExecutorService = Executors.newSingleThreadExecutor();

terminal
.registerCliSpecificCommand(new RemoteServerSpecificCommand(restClient, terminal.writer()));
this.remoteServerState = new RemoteServerState();

CliCommandRegisterUtil.registerDefaultCliCommands(
terminal,
restClient,
remoteServerState::reset,
remoteServerState::getRequestPipelining,
remoteServerState::setRequestPipelining);
}

public void runInteractively() {
Expand Down Expand Up @@ -239,8 +250,7 @@ private void handleStatements(final String line)
}
if (consecutiveStatements.length() != 0) {
printKsqlResponse(
restClient.makeKsqlRequest(consecutiveStatements.toString())
);
makeKsqlRequest(consecutiveStatements.toString(), restClient::makeKsqlRequest));
}
}

Expand Down Expand Up @@ -275,7 +285,7 @@ private void runScript(
}
setProperty(KsqlConstants.RUN_SCRIPT_STATEMENTS_CONTENT, fileContent);
printKsqlResponse(
restClient.makeKsqlRequest(statementText)
makeKsqlRequest(statementText, restClient::makeKsqlRequest)
);
}

Expand All @@ -285,7 +295,8 @@ private StringBuilder printOrDisplayQueryResults(
final String statementText
) throws InterruptedException, IOException, ExecutionException {
if (consecutiveStatements.length() != 0) {
printKsqlResponse(restClient.makeKsqlRequest(consecutiveStatements.toString()));
printKsqlResponse(
makeKsqlRequest(consecutiveStatements.toString(), restClient::makeKsqlRequest));
consecutiveStatements.setLength(0);
}
if (statementContext.statement() instanceof SqlBaseParser.QuerystatementContext) {
Expand All @@ -297,7 +308,8 @@ private StringBuilder printOrDisplayQueryResults(
}

private void listProperties(final String statementText) throws IOException {
final KsqlEntityList ksqlEntityList = restClient.makeKsqlRequest(statementText).getResponse();
final KsqlEntityList ksqlEntityList =
makeKsqlRequest(statementText, restClient::makeKsqlRequest).getResponse();
terminal.printKsqlEntityList(ksqlEntityList);
}

Expand Down Expand Up @@ -327,7 +339,7 @@ private void printKsqlResponse(final RestResponse<KsqlEntityList> response) thro
private void handleStreamedQuery(final String query) throws IOException {

final RestResponse<KsqlRestClient.QueryStream> queryResponse =
restClient.makeQueryRequest(query);
makeKsqlRequest(query, restClient::makeQueryRequest);

LOGGER.debug("Handling streamed query");

Expand Down Expand Up @@ -392,7 +404,7 @@ private boolean limitNotReached(final long rowsRead) {
private void handlePrintedTopic(final String printTopic)
throws InterruptedException, ExecutionException, IOException {
final RestResponse<InputStream> topicResponse =
restClient.makePrintTopicRequest(printTopic);
makeKsqlRequest(printTopic, restClient::makePrintTopicRequest);

if (topicResponse.isSuccessful()) {
try (Scanner topicStreamScanner = new Scanner(topicResponse.getResponse(), UTF_8.name());
Expand Down Expand Up @@ -461,8 +473,7 @@ private StringBuilder unsetProperty(
) throws IOException {
if (consecutiveStatements.length() != 0) {
printKsqlResponse(
restClient.makeKsqlRequest(consecutiveStatements.toString())
);
makeKsqlRequest(consecutiveStatements.toString(), restClient::makeKsqlRequest));
consecutiveStatements.setLength(0);
}
final SqlBaseParser.UnsetPropertyContext unsetPropertyContext =
Expand All @@ -481,5 +492,76 @@ private void unsetProperty(final String property) {

terminal.writer()
.printf("Successfully unset local property '%s' (value was '%s').%n", property, oldValue);
terminal.flush();
}

private <R> RestResponse<R> makeKsqlRequest(
final String ksql,
final BiFunction<String, Long, RestResponse<R>> requestIssuer) {
final Long commandSequenceNumberToWaitFor = remoteServerState.getRequestPipelining()
? null
: remoteServerState.getLastCommandSequenceNumber();
final RestResponse<R> response = requestIssuer.apply(ksql, commandSequenceNumberToWaitFor);

if (isSequenceNumberTimeout(response)) {
terminal.writer().printf(
"Error: command not executed since the server timed out "
+ "while waiting for prior commands to finish executing.%n"
+ "If you wish to execute new commands without waiting for "
+ "prior commands to finish, run the command '%s ON'.%n",
RequestPipeliningCommand.NAME);
} else if (isKsqlEntityList(response)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about if response is a plain KsqlEntity? (Is this possible?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not currently possible, since KsqlRestClient.makeKsqlRequest() returns RestResponse<KsqlEntityList>. Not sure if there's a good way to set up my change to prevent errors, should this no longer be the case in the future... open to suggestions.

updateLastCommandSequenceNumber((KsqlEntityList)response.getResponse());
}
return response;
}

private static boolean isSequenceNumberTimeout(final RestResponse<?> response) {
return response.isErroneous()
&& (response.getErrorMessage().getErrorCode()
== Errors.ERROR_CODE_COMMAND_QUEUE_CATCHUP_TIMEOUT);
}

private static boolean isKsqlEntityList(final RestResponse<?> response) {
return response.isSuccessful() && response.getResponse() instanceof KsqlEntityList;
}

private void updateLastCommandSequenceNumber(final KsqlEntityList entities) {
entities.stream()
.filter(entity -> entity instanceof CommandStatusEntity)
.map(entity -> (CommandStatusEntity)entity)
.mapToLong(CommandStatusEntity::getCommandSequenceNumber)
.max()
.ifPresent(remoteServerState::setLastCommandSequenceNumber);
}

private static final class RemoteServerState {
private long lastCommandSequenceNumber;
private boolean requestPipelining;

private RemoteServerState() {
reset();
}

private void reset() {
lastCommandSequenceNumber = -1L;
requestPipelining = false;
}

private long getLastCommandSequenceNumber() {
return lastCommandSequenceNumber;
}

private boolean getRequestPipelining() {
return requestPipelining;
}

private void setLastCommandSequenceNumber(final long seqNum) {
lastCommandSequenceNumber = seqNum;
}

private void setRequestPipelining(final boolean newSetting) {
requestPipelining = newSetting;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import io.confluent.ksql.cli.console.table.builder.TableBuilder;
import io.confluent.ksql.cli.console.table.builder.TablesListTableBuilder;
import io.confluent.ksql.cli.console.table.builder.TopicDescriptionTableBuilder;
import io.confluent.ksql.rest.client.KsqlRestClient;
import io.confluent.ksql.rest.entity.CommandStatusEntity;
import io.confluent.ksql.rest.entity.ExecutionPlan;
import io.confluent.ksql.rest.entity.FieldInfo;
Expand Down Expand Up @@ -151,7 +150,7 @@ public interface RowCaptor {
void addRows(List<List<String>> fields);
}

public static Console build(final OutputFormat outputFormat, final KsqlRestClient restClient) {
public static Console build(final OutputFormat outputFormat) {
final AtomicReference<Console> consoleRef = new AtomicReference<>();
final Predicate<String> isCliCommand = line -> {
final Console theConsole = consoleRef.get();
Expand All @@ -166,19 +165,15 @@ public static Console build(final OutputFormat outputFormat, final KsqlRestClien

final KsqlTerminal terminal = new JLineTerminal(isCliCommand, historyFilePath);

final Supplier<String> versionSuppler =
() -> restClient.getServerInfo().getResponse().getVersion();

final Console console = new Console(
outputFormat, versionSuppler, terminal, new NoOpRowCaptor());
outputFormat, terminal, new NoOpRowCaptor());

consoleRef.set(console);
return console;
}

public Console(
final OutputFormat outputFormat,
final Supplier<String> versionSuppler,
final KsqlTerminal terminal,
final RowCaptor rowCaptor
) {
Expand All @@ -188,7 +183,7 @@ public Console(
this.cliSpecificCommands = Maps.newLinkedHashMap();
this.objectMapper = new ObjectMapper().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);

CliCommandRegisterUtil.registerDefaultCommands(this, versionSuppler);
CliCommandRegisterUtil.registerDefaultConsoleCommands(this);
}

public PrintWriter writer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,24 @@
package io.confluent.ksql.cli.console.cmd;

import io.confluent.ksql.cli.console.Console;
import io.confluent.ksql.rest.client.KsqlRestClient;
import io.confluent.ksql.util.Event;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
* God class for registering Cli commands
*/
// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling
public final class CliCommandRegisterUtil {
// CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling

private CliCommandRegisterUtil() {
}

public static void registerDefaultCommands(
final Console console,
final Supplier<String> versionSuppler) {

public static void registerDefaultConsoleCommands(
final Console console
) {
console.registerCliSpecificCommand(new Help(console));

console.registerCliSpecificCommand(new Clear(console));
Expand All @@ -37,8 +41,24 @@ public static void registerDefaultCommands(

console.registerCliSpecificCommand(new History(console));

console.registerCliSpecificCommand(new Exit(console));
}

public static void registerDefaultCliCommands(
final Console console,
final KsqlRestClient restClient,
final Event resetCliForNewServer,
final Supplier<Boolean> requestPipeliningSupplier,
final Consumer<Boolean> requestPipeliningConsumer
) {
final Supplier<String> versionSuppler =
() -> restClient.getServerInfo().getResponse().getVersion();
console.registerCliSpecificCommand(new Version(console, versionSuppler));

console.registerCliSpecificCommand(new Exit(console));
console.registerCliSpecificCommand(new RemoteServerSpecificCommand(
restClient, console.writer(), resetCliForNewServer));

console.registerCliSpecificCommand(new RequestPipeliningCommand(
console.writer(), requestPipeliningSupplier, requestPipeliningConsumer));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
import io.confluent.ksql.rest.server.resources.Errors;
import io.confluent.ksql.util.ErrorMessageUtil;
import io.confluent.ksql.util.Event;
import java.io.PrintWriter;
import java.util.Objects;
import javax.ws.rs.ProcessingException;
Expand All @@ -30,12 +31,16 @@ public class RemoteServerSpecificCommand implements CliSpecificCommand {

private final KsqlRestClient restClient;
private final PrintWriter writer;
private final Event resetCliForNewServer;

public RemoteServerSpecificCommand(
RemoteServerSpecificCommand(
final KsqlRestClient restClient,
final PrintWriter writer) {
final PrintWriter writer,
final Event resetCliForNewServer) {
this.restClient = Objects.requireNonNull(restClient, "restClient");
this.writer = Objects.requireNonNull(writer, "writer");
this.resetCliForNewServer =
Objects.requireNonNull(resetCliForNewServer, "resetCliForNewServer");
}

@Override
Expand All @@ -60,6 +65,7 @@ public void execute(final String command) {
final String serverAddress = command.trim();
restClient.setServerAddress(serverAddress);
writer.write("Server now: " + command);
resetCliForNewServer.fire();
}

validateClient(writer, restClient);
Expand Down
Loading