Skip to content

Commit

Permalink
make CLI requests wait for last command sequence number.
Browse files Browse the repository at this point in the history
  • Loading branch information
vcrfxia committed Dec 15, 2018
1 parent 0687771 commit 9030ad7
Show file tree
Hide file tree
Showing 8 changed files with 243 additions and 60 deletions.
50 changes: 41 additions & 9 deletions ksql-cli/src/main/java/io/confluent/ksql/cli/Cli.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.rest.server.resources.Errors;
import io.confluent.ksql.rest.util.CommandStoreUtil;
import io.confluent.ksql.util.CliUtils;
import io.confluent.ksql.util.ErrorMessageUtil;
import io.confluent.ksql.util.KsqlConstants;
Expand All @@ -57,6 +59,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import org.jline.reader.EndOfFileException;
import org.jline.reader.UserInterruptException;
import org.jline.terminal.Terminal;
Expand All @@ -67,6 +70,8 @@ public class Cli implements Closeable {

private static final Logger LOGGER = LoggerFactory.getLogger(Cli.class);

static final int COMMAND_QUEUE_CATCHUP_TIMEOUT_RETRIES = 1;

private final ExecutorService queryStreamExecutorService;

private final Long streamedQueryRowLimit;
Expand Down Expand Up @@ -239,8 +244,7 @@ private void handleStatements(final String line)
}
if (consecutiveStatements.length() != 0) {
printKsqlResponse(
restClient.makeKsqlRequest(consecutiveStatements.toString())
);
makeKsqlRequest(consecutiveStatements.toString(), restClient::makeKsqlRequest));
}
}

Expand Down Expand Up @@ -275,7 +279,7 @@ private void runScript(
}
setProperty(KsqlConstants.RUN_SCRIPT_STATEMENTS_CONTENT, fileContent);
printKsqlResponse(
restClient.makeKsqlRequest(statementText)
makeKsqlRequest(statementText, restClient::makeKsqlRequest)
);
}

Expand All @@ -285,7 +289,8 @@ private StringBuilder printOrDisplayQueryResults(
final String statementText
) throws InterruptedException, IOException, ExecutionException {
if (consecutiveStatements.length() != 0) {
printKsqlResponse(restClient.makeKsqlRequest(consecutiveStatements.toString()));
printKsqlResponse(
makeKsqlRequest(consecutiveStatements.toString(), restClient::makeKsqlRequest));
consecutiveStatements.setLength(0);
}
if (statementContext.statement() instanceof SqlBaseParser.QuerystatementContext) {
Expand All @@ -297,7 +302,8 @@ private StringBuilder printOrDisplayQueryResults(
}

private void listProperties(final String statementText) throws IOException {
final KsqlEntityList ksqlEntityList = restClient.makeKsqlRequest(statementText).getResponse();
final KsqlEntityList ksqlEntityList =
makeKsqlRequest(statementText, restClient::makeKsqlRequest).getResponse();
terminal.printKsqlEntityList(ksqlEntityList);
}

Expand Down Expand Up @@ -327,7 +333,7 @@ private void printKsqlResponse(final RestResponse<KsqlEntityList> response) thro
private void handleStreamedQuery(final String query) throws IOException {

final RestResponse<KsqlRestClient.QueryStream> queryResponse =
restClient.makeQueryRequest(query);
makeKsqlRequest(query, restClient::makeQueryRequest);

LOGGER.debug("Handling streamed query");

Expand Down Expand Up @@ -392,7 +398,7 @@ private boolean limitNotReached(final long rowsRead) {
private void handlePrintedTopic(final String printTopic)
throws InterruptedException, ExecutionException, IOException {
final RestResponse<InputStream> topicResponse =
restClient.makePrintTopicRequest(printTopic);
makeKsqlRequest(printTopic, restClient::makePrintTopicRequest);

if (topicResponse.isSuccessful()) {
try (Scanner topicStreamScanner = new Scanner(topicResponse.getResponse(), UTF_8.name());
Expand Down Expand Up @@ -461,8 +467,7 @@ private StringBuilder unsetProperty(
) throws IOException {
if (consecutiveStatements.length() != 0) {
printKsqlResponse(
restClient.makeKsqlRequest(consecutiveStatements.toString())
);
makeKsqlRequest(consecutiveStatements.toString(), restClient::makeKsqlRequest));
consecutiveStatements.setLength(0);
}
final SqlBaseParser.UnsetPropertyContext unsetPropertyContext =
Expand All @@ -482,4 +487,31 @@ private void unsetProperty(final String property) {
terminal.writer()
.printf("Successfully unset local property '%s' (value was '%s').%n", property, oldValue);
}

private <R> RestResponse<R> makeKsqlRequest(
final String ksql, final BiFunction<String, Long, RestResponse<R>> requestIssuer) {
return makeKsqlRequest(ksql, requestIssuer, COMMAND_QUEUE_CATCHUP_TIMEOUT_RETRIES);
}

private <R> RestResponse<R> makeKsqlRequest(
final String ksql,
final BiFunction<String, Long, RestResponse<R>> requestIssuer,
final int remainingRetries) {
final RestResponse<R> response =
requestIssuer.apply(ksql, CommandStoreUtil.LAST_SEQUENCE_NUMBER);
if (isSequenceNumberTimeout(response)) {
if (remainingRetries > 0) {
return makeKsqlRequest(ksql, requestIssuer, remainingRetries - 1);
} else {
return requestIssuer.apply(ksql, null);
}
}
return response;
}

private static boolean isSequenceNumberTimeout(final RestResponse<?> response) {
return response.isErroneous()
&& (response.getErrorMessage().getErrorCode()
== Errors.ERROR_CODE_COMMAND_QUEUE_CATCHUP_TIMEOUT);
}
}
109 changes: 89 additions & 20 deletions ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import static io.confluent.ksql.test.util.AssertEventually.assertThatEventually;
import static javax.ws.rs.core.Response.Status.NOT_ACCEPTABLE;
import static org.easymock.EasyMock.niceMock;
import static org.hamcrest.CoreMatchers.any;
import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.CoreMatchers.containsString;
Expand All @@ -25,6 +24,13 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableList;
import io.confluent.common.utils.IntegrationTest;
Expand All @@ -47,6 +53,7 @@
import io.confluent.ksql.rest.server.KsqlRestApplication;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.confluent.ksql.rest.server.resources.Errors;
import io.confluent.ksql.rest.util.CommandStoreUtil;
import io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster;
import io.confluent.ksql.test.util.TestKsqlRestApp;
import io.confluent.ksql.util.KsqlConfig;
Expand All @@ -69,7 +76,6 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.easymock.EasyMock;
import org.hamcrest.BaseMatcher;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Description;
Expand All @@ -86,10 +92,15 @@
import org.junit.experimental.categories.Category;
import org.junit.rules.RuleChain;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

/**
* Most tests in CliTest are end-to-end integration tests, so it may expect a long running time.
*/
@RunWith(MockitoJUnitRunner.class)
@Category({IntegrationTest.class})
public class CliTest {

Expand Down Expand Up @@ -133,6 +144,7 @@ public class CliTest {
private Console console;
private TestTerminal terminal;
private TestRowCaptor rowCaptor;
@Mock
private Supplier<String> lineSupplier;
private Cli localCli;

Expand All @@ -158,7 +170,6 @@ public static void classSetUp() throws Exception {
@SuppressWarnings("unchecked")
@Before
public void setUp() {
lineSupplier = niceMock(Supplier.class);
terminal = new TestTerminal(lineSupplier);
rowCaptor = new TestRowCaptor();
console = new Console(CLI_OUTPUT_FORMAT, () -> "v1.2.3", terminal, rowCaptor);
Expand Down Expand Up @@ -317,7 +328,7 @@ private void testCreateStreamAsSelect(String selectQuery, final Schema resultSch
}

private static void runStatement(final String statement, final KsqlRestClient restClient) {
final RestResponse response = restClient.makeKsqlRequest(statement);
final RestResponse response = restClient.makeKsqlRequest(statement, null);
Assert.assertThat(response.isSuccessful(), is(true));
final KsqlEntityList entityList = ((KsqlEntityList) response.get());
Assert.assertThat(entityList.size(), equalTo(1));
Expand Down Expand Up @@ -353,7 +364,7 @@ private static void terminateQuery(final String queryId) {
private static void dropStream(final String name) {
final String dropStatement = String.format("drop stream %s;", name);

final RestResponse response = restClient.makeKsqlRequest(dropStatement);
final RestResponse response = restClient.makeKsqlRequest(dropStatement, null);
if (response.isSuccessful()) {
return;
}
Expand Down Expand Up @@ -641,17 +652,14 @@ public void shouldHandleRegisterTopic() throws Exception {
public void shouldPrintErrorIfCantConnectToRestServer() throws Exception {
givenRunInteractivelyWillExit();

final KsqlRestClient mockRestClient = EasyMock.mock(KsqlRestClient.class);
EasyMock.expect(mockRestClient.makeRootRequest()).andThrow(new KsqlRestClientException("Boom", new ProcessingException("")));
EasyMock.expect(mockRestClient.getServerInfo()).andReturn(
RestResponse.of(new ServerInfo("1.x", "testClusterId", "testServiceId")));
EasyMock.expect(mockRestClient.getServerAddress()).andReturn(new URI("http://someserver:8008")).anyTimes();
EasyMock.replay(mockRestClient);
final KsqlRestClient mockRestClient = givenMockRestClient();
when(mockRestClient.makeRootRequest())
.thenThrow(new KsqlRestClientException("Boom", new ProcessingException("")));

new Cli(1L, 1L, mockRestClient, console)
.runInteractively();

assertThat(terminal.getOutputString(), containsString("Remote server address may not be valid"));
assertThat(terminal.getOutputString(),containsString("Remote server address may not be valid"));
}

@Test
Expand All @@ -665,16 +673,12 @@ public void shouldRegisterRemoteCommand() {
public void shouldPrintErrorOnUnsupportedAPI() throws Exception {
givenRunInteractivelyWillExit();

final KsqlRestClient mockRestClient = EasyMock.mock(KsqlRestClient.class);
EasyMock.expect(mockRestClient.makeRootRequest()).andReturn(
final KsqlRestClient mockRestClient = givenMockRestClient();
when(mockRestClient.makeRootRequest()).thenReturn(
RestResponse.erroneous(
new KsqlErrorMessage(
Errors.toErrorCode(NOT_ACCEPTABLE.getStatusCode()),
"Minimum supported client version: 1.0")));
EasyMock.expect(mockRestClient.getServerInfo()).andReturn(
RestResponse.of(new ServerInfo("1.x", "testClusterId", "testServiceId")));
EasyMock.expect(mockRestClient.getServerAddress()).andReturn(new URI("http://someserver:8008"));
EasyMock.replay(mockRestClient);

new Cli(1L, 1L, mockRestClient, console)
.runInteractively();
Expand Down Expand Up @@ -792,9 +796,74 @@ public void shouldPrintErrorIfCantFindFunction() throws Exception {
assertThat(terminal.getOutputString(), containsString(expectedOutput));
}

@Test
public void shouldRetryOnCommandQueueCatchupTimeoutUntilLimitReached() throws Exception {
// Given:
final String statementText = "list streams;";
final KsqlRestClient mockRestClient = givenMockRestClient();
when(mockRestClient.makeKsqlRequest(statementText, CommandStoreUtil.LAST_SEQUENCE_NUMBER))
.thenReturn(RestResponse.erroneous(
new KsqlErrorMessage(Errors.ERROR_CODE_COMMAND_QUEUE_CATCHUP_TIMEOUT, "timed out!")));
when(mockRestClient.makeKsqlRequest(statementText, null))
.thenReturn(RestResponse.successful(new KsqlEntityList()));

// When:
localCli.handleLine(statementText);

// Then:
final InOrder inOrder = inOrder(mockRestClient);
inOrder.verify(mockRestClient, times(Cli.COMMAND_QUEUE_CATCHUP_TIMEOUT_RETRIES + 1))
.makeKsqlRequest(statementText, CommandStoreUtil.LAST_SEQUENCE_NUMBER);
inOrder.verify(mockRestClient).makeKsqlRequest(statementText, null);
inOrder.verifyNoMoreInteractions();
}

@Test
public void shouldNotRetryOnSuccess() throws Exception {
// Given:
final String statementText = "list streams;";
final KsqlRestClient mockRestClient = givenMockRestClient();
when(mockRestClient.makeKsqlRequest(statementText, CommandStoreUtil.LAST_SEQUENCE_NUMBER))
.thenReturn(RestResponse.successful(new KsqlEntityList()));

// When:
localCli.handleLine(statementText);

// Then:
verify(mockRestClient, times(1)).makeKsqlRequest(anyString(), anyLong());
}

@Test
public void shouldNotRetryOnErrorThatIsNotCommandQueueCatchupTimeout() throws Exception {
// Given:
final String statementText = "list streams;";
final KsqlRestClient mockRestClient = givenMockRestClient();
when(mockRestClient.makeKsqlRequest(statementText, CommandStoreUtil.LAST_SEQUENCE_NUMBER))
.thenReturn(RestResponse.erroneous(
new KsqlErrorMessage(Errors.ERROR_CODE_SERVER_ERROR, "uh oh")));

// When:
localCli.handleLine(statementText);

// Then:
verify(mockRestClient, times(1)).makeKsqlRequest(anyString(), anyLong());
}

private void givenRunInteractivelyWillExit() {
EasyMock.expect(lineSupplier.get()).andReturn("eXiT");
EasyMock.replay(lineSupplier);
when(lineSupplier.get()).thenReturn("eXiT");
}

private KsqlRestClient givenMockRestClient() throws Exception {
final KsqlRestClient mockRestClient = mock(KsqlRestClient.class);

when(mockRestClient.getServerInfo()).thenReturn(
RestResponse.of(new ServerInfo("1.x", "testClusterId", "testServiceId")));
when(mockRestClient.getServerAddress()).thenReturn(new URI("http://someserver:8008"));

localCli = new Cli(
STREAMED_QUERY_ROW_LIMIT, STREAMED_QUERY_TIMEOUT_MS, mockRestClient, console);

return mockRestClient;
}

private static class TestRowCaptor implements RowCaptor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ public RestResponse<ServerInfo> getServerInfo() {
return getRequest("/info", ServerInfo.class);
}

public RestResponse<KsqlEntityList> makeKsqlRequest(final String ksql) {
final KsqlRequest jsonRequest = new KsqlRequest(ksql, localProperties.toMap(), null);
public RestResponse<KsqlEntityList> makeKsqlRequest(final String ksql, final Long commandSeqNum) {
final KsqlRequest jsonRequest = new KsqlRequest(ksql, localProperties.toMap(), commandSeqNum);
return postRequest("ksql", jsonRequest, Optional.empty(), true,
r -> r.readEntity(KsqlEntityList.class));
}
Expand All @@ -143,14 +143,15 @@ public RestResponse<CommandStatus> makeStatusRequest(final String commandId) {
return getRequest(String.format("status/%s", commandId), CommandStatus.class);
}

public RestResponse<QueryStream> makeQueryRequest(final String ksql) {
final KsqlRequest jsonRequest = new KsqlRequest(ksql, localProperties.toMap(), null);
public RestResponse<QueryStream> makeQueryRequest(final String ksql, final Long commandSeqNum) {
final KsqlRequest jsonRequest = new KsqlRequest(ksql, localProperties.toMap(), commandSeqNum);
final Optional<Integer> readTimeoutMs = Optional.of(QueryStream.READ_TIMEOUT_MS);
return postRequest("query", jsonRequest, readTimeoutMs, false, QueryStream::new);
}

public RestResponse<InputStream> makePrintTopicRequest(final String ksql) {
final KsqlRequest jsonRequest = new KsqlRequest(ksql, localProperties.toMap(), null);
public RestResponse<InputStream> makePrintTopicRequest(
final String ksql, final Long commandSeqNum) {
final KsqlRequest jsonRequest = new KsqlRequest(ksql, localProperties.toMap(), commandSeqNum);
return postRequest("query", jsonRequest, Optional.empty(), false,
r -> (InputStream) r.getEntity());
}
Expand Down
Loading

0 comments on commit 9030ad7

Please sign in to comment.