Skip to content

Commit

Permalink
refactor: Refactor KsqlClient to use Vert.x (#4609)
Browse files Browse the repository at this point in the history
  • Loading branch information
purplefox authored Feb 27, 2020
1 parent 2ac521b commit 849255a
Show file tree
Hide file tree
Showing 66 changed files with 1,826 additions and 2,063 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

package io.confluent.ksql.api.tck;

import io.confluent.ksql.api.server.BaseSubscriber;
import io.confluent.ksql.reactive.BaseSubscriber;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

package io.confluent.ksql.api.tck;

import io.confluent.ksql.api.server.BaseSubscriber;
import io.confluent.ksql.reactive.BaseSubscriber;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

package io.confluent.ksql.api.tck;

import io.confluent.ksql.api.server.BufferedPublisher;
import io.confluent.ksql.reactive.BufferedPublisher;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
Expand Down
243 changes: 152 additions & 91 deletions ksql-cli/src/main/java/io/confluent/ksql/cli/Cli.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@

package io.confluent.ksql.cli;

import static java.nio.charset.StandardCharsets.UTF_8;

import io.confluent.ksql.cli.console.Console;
import io.confluent.ksql.cli.console.KsqlTerminal.StatusClosable;
import io.confluent.ksql.cli.console.OutputFormat;
Expand All @@ -31,10 +29,11 @@
import io.confluent.ksql.parser.SqlBaseParser.SetPropertyContext;
import io.confluent.ksql.parser.SqlBaseParser.StatementContext;
import io.confluent.ksql.parser.SqlBaseParser.UnsetPropertyContext;
import io.confluent.ksql.reactive.BaseSubscriber;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.client.KsqlRestClient;
import io.confluent.ksql.rest.client.QueryStream;
import io.confluent.ksql.rest.client.RestResponse;
import io.confluent.ksql.rest.client.StreamPublisher;
import io.confluent.ksql.rest.entity.CommandStatus;
import io.confluent.ksql.rest.entity.CommandStatusEntity;
import io.confluent.ksql.rest.entity.KsqlEntity;
Expand All @@ -47,26 +46,20 @@
import io.confluent.ksql.util.ParserUtil;
import io.confluent.ksql.util.Version;
import io.confluent.ksql.util.WelcomeMsgUtils;
import io.vertx.core.Context;
import java.io.Closeable;
import java.io.InputStream;
import java.io.PrintWriter;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Scanner;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import org.apache.commons.compress.utils.IOUtils;
import org.jline.reader.EndOfFileException;
import org.jline.reader.UserInterruptException;
import org.jline.terminal.Terminal;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -84,8 +77,6 @@ public class Cli implements KsqlRequestExecutor, Closeable {
.put(UnsetPropertyContext.class, Cli::unsetPropertyFromCtxt)
.build();

private final ExecutorService queryStreamExecutorService;

private final Long streamedQueryRowLimit;
private final Long streamedQueryTimeoutMs;

Expand Down Expand Up @@ -116,7 +107,6 @@ public static Cli build(
this.streamedQueryTimeoutMs = streamedQueryTimeoutMs;
this.restClient = restClient;
this.terminal = terminal;
this.queryStreamExecutorService = Executors.newSingleThreadExecutor();
this.remoteServerState = new RemoteServerState();

final Supplier<String> versionSuppler =
Expand Down Expand Up @@ -220,7 +210,6 @@ private void displayWelcomeMessage() {

@Override
public void close() {
queryStreamExecutorService.shutdownNow();
terminal.close();
}

Expand Down Expand Up @@ -310,68 +299,44 @@ private void printKsqlResponse(final RestResponse<KsqlEntityList> response) {
}
}

private void streamResults(final QueryStream queryStream) {
final Future<?> queryStreamFuture = queryStreamExecutorService.submit(() -> {
for (long rowsRead = 0; limitNotReached(rowsRead) && queryStream.hasNext(); ) {
final StreamedRow row = queryStream.next();

terminal.printStreamedRow(row);
if (row.isTerminal()) {
break;
}

if (row.getRow().isPresent()) {
rowsRead++;
}
}
});

terminal.handle(Terminal.Signal.INT, signal -> {
terminal.handle(Terminal.Signal.INT, Terminal.SignalHandler.SIG_IGN);
queryStream.close();
});

try {
if (streamedQueryTimeoutMs != null) {
try {
queryStreamFuture.get(streamedQueryTimeoutMs, TimeUnit.MILLISECONDS);
} catch (final TimeoutException exception) {
queryStream.close();
}
}

queryStreamFuture.get();
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
} catch (final ExecutionException e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException)e.getCause();
}
throw new RuntimeException(e.getCause());
} finally {
terminal.writer().println("Query terminated");
terminal.flush();
}
}

private boolean limitNotReached(final long rowsRead) {
return streamedQueryRowLimit == null || rowsRead < streamedQueryRowLimit;
}

@SuppressWarnings({"try", "unused"}) // ignored param is required to compile.
private void handleQuery(
final String statement,
final SqlBaseParser.QueryStatementContext query
) {
final RestResponse<QueryStream> queryResponse =
makeKsqlRequest(statement, restClient::makeQueryRequest);
final RestResponse<StreamPublisher<StreamedRow>> queryResponse =
makeKsqlRequest(statement, restClient::makeQueryRequestStreamed);

if (!queryResponse.isSuccessful()) {
terminal.printErrorMessage(queryResponse.getErrorMessage());
terminal.flush();
} else {
try (QueryStream queryStream = queryResponse.getResponse();
StatusClosable toClose = terminal.setStatusMessage("Press CTRL-C to interrupt")) {
streamResults(queryStream);
try (StatusClosable toClose = terminal.setStatusMessage("Press CTRL-C to interrupt")) {
final StreamPublisher<StreamedRow> publisher = queryResponse.getResponse();
final CompletableFuture<Void> future = new CompletableFuture<>();
final QueryStreamSubscriber subscriber = new QueryStreamSubscriber(publisher.getContext(),
future);
publisher.subscribe(subscriber);

terminal.handle(Terminal.Signal.INT, signal -> {
terminal.handle(Terminal.Signal.INT, Terminal.SignalHandler.SIG_IGN);
subscriber.close();
future.complete(null);
});

try {
if (streamedQueryTimeoutMs != null) {
future.get(streamedQueryTimeoutMs, TimeUnit.MILLISECONDS);
} else {
future.get();
}
} catch (Exception e) {
LOGGER.error("Unexpected exception in waiting for query", e);
} finally {
terminal.writer().println("Query terminated");
terminal.flush();
publisher.close();
}
}
}
}
Expand All @@ -381,42 +346,37 @@ private void handlePrintedTopic(
final String printTopic,
final SqlBaseParser.PrintTopicContext ignored
) {
final RestResponse<InputStream> topicResponse =
final RestResponse<StreamPublisher<String>> topicResponse =
makeKsqlRequest(printTopic, restClient::makePrintTopicRequest);

if (topicResponse.isSuccessful()) {
try (Scanner topicStreamScanner = new Scanner(topicResponse.getResponse(), UTF_8.name());
StatusClosable toClose = terminal.setStatusMessage("Press CTRL-C to interrupt")
) {
final Future<?> topicPrintFuture = queryStreamExecutorService.submit(() -> {
while (!Thread.currentThread().isInterrupted() && topicStreamScanner.hasNextLine()) {
final String line = topicStreamScanner.nextLine();
if (!line.isEmpty()) {
terminal.writer().println(line);
terminal.flush();
}
}
});

try (StatusClosable toClose = terminal.setStatusMessage("Press CTRL-C to interrupt")) {
final CompletableFuture<Void> future = new CompletableFuture<>();
final StreamPublisher<String> publisher = topicResponse.getResponse();
final PrintTopicSubscriber subscriber = new PrintTopicSubscriber(publisher.getContext(),
future);
publisher.subscribe(subscriber);

terminal.handle(Terminal.Signal.INT, signal -> {
terminal.handle(Terminal.Signal.INT, Terminal.SignalHandler.SIG_IGN);
topicPrintFuture.cancel(true);
subscriber.close();
future.complete(null);
});

try {
topicPrintFuture.get();
} catch (final CancellationException exception) {
IOUtils.closeQuietly(topicResponse.getResponse());
terminal.writer().println("Topic printing ceased");
terminal.flush();
} catch (final Exception e) {
throw new RuntimeException(e);
future.get();
} catch (Exception e) {
LOGGER.error("Unexpected exception in waiting for print topic completion", e);
} finally {
publisher.close();
}
terminal.writer().println("Topic printing ceased");
}
} else {
terminal.writer().println(topicResponse.getErrorMessage().getMessage());
terminal.flush();
}
terminal.flush();
}

@SuppressWarnings("unused")
Expand Down Expand Up @@ -511,4 +471,105 @@ private void setRequestPipelining(final boolean newSetting) {
requestPipelining = newSetting;
}
}

private class QueryStreamSubscriber extends BaseSubscriber<StreamedRow> {

private final CompletableFuture<Void> future;
private boolean closed;
private long rowsRead;

QueryStreamSubscriber(final Context context, final CompletableFuture<Void> future) {
super(context);
this.future = Objects.requireNonNull(future);
}

@Override
protected void afterSubscribe(final Subscription subscription) {
makeRequest(1);
}

@Override
protected synchronized void handleValue(final StreamedRow row) {
if (closed) {
return;
}
terminal.printStreamedRow(row);
terminal.flush();
if (row.isTerminal()) {
future.complete(null);
close();
return;
}
if (row.getRow().isPresent()) {
rowsRead++;
if (streamedQueryRowLimit != null && streamedQueryRowLimit == rowsRead) {
future.complete(null);
close();
return;
}
}
makeRequest(1);
}

@Override
protected void handleComplete() {
future.complete(null);
}

@Override
protected void handleError(final Throwable t) {
future.completeExceptionally(t);
}

synchronized void close() {
closed = true;
context.runOnContext(v -> cancel());
}
}

private class PrintTopicSubscriber extends BaseSubscriber<String> {

private final CompletableFuture<Void> future;
private boolean closed;

PrintTopicSubscriber(final Context context, final CompletableFuture<Void> future) {
super(context);
this.future = Objects.requireNonNull(future);
}

@Override
protected void afterSubscribe(final Subscription subscription) {
makeRequest(1);
}

@Override
protected synchronized void handleValue(final String line) {
if (closed) {
return;
}
if (line.isEmpty()) {
// Ignore it - the server can insert these
} else {
terminal.writer().println(line);
terminal.flush();
makeRequest(1);
}
}

@Override
protected void handleComplete() {
future.complete(null);
}

@Override
protected void handleError(final Throwable t) {
future.completeExceptionally(t);
}

synchronized void close() {
closed = true;
context.runOnContext(v -> cancel());
}
}

}
Loading

0 comments on commit 849255a

Please sign in to comment.