Skip to content

Commit

Permalink
back out changes for waiting for last offset. implement CLI read-afte…
Browse files Browse the repository at this point in the history
…r-write consistency differently.
  • Loading branch information
vcrfxia committed Dec 21, 2018
1 parent f0cb4c0 commit 8a96569
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 98 deletions.
21 changes: 19 additions & 2 deletions ksql-cli/src/main/java/io/confluent/ksql/cli/Cli.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.rest.server.resources.Errors;
import io.confluent.ksql.rest.util.CommandStoreUtil;
import io.confluent.ksql.util.CliUtils;
import io.confluent.ksql.util.ErrorMessageUtil;
import io.confluent.ksql.util.KsqlConstants;
Expand Down Expand Up @@ -80,6 +79,8 @@ public class Cli implements Closeable {
private final KsqlRestClient restClient;
private final Console terminal;

private long lastCommandSequenceNumber;

public static Cli build(
final Long streamedQueryRowLimit,
final Long streamedQueryTimeoutMs,
Expand All @@ -104,6 +105,7 @@ public static Cli build(
this.restClient = restClient;
this.terminal = terminal;
this.queryStreamExecutorService = Executors.newSingleThreadExecutor();
this.lastCommandSequenceNumber = -1;

terminal
.registerCliSpecificCommand(new RemoteServerSpecificCommand(restClient, terminal.writer()));
Expand Down Expand Up @@ -498,13 +500,15 @@ private <R> RestResponse<R> makeKsqlRequest(
final BiFunction<String, Long, RestResponse<R>> requestIssuer,
final int remainingRetries) {
final RestResponse<R> response =
requestIssuer.apply(ksql, CommandStoreUtil.LAST_SEQUENCE_NUMBER);
requestIssuer.apply(ksql, lastCommandSequenceNumber);
if (isSequenceNumberTimeout(response)) {
if (remainingRetries > 0) {
return makeKsqlRequest(ksql, requestIssuer, remainingRetries - 1);
} else {
return requestIssuer.apply(ksql, null);
}
} else if (isKsqlEntityList(response)) {
updateLastCommandSequenceNumber((KsqlEntityList)response.getResponse());
}
return response;
}
Expand All @@ -514,4 +518,17 @@ private static boolean isSequenceNumberTimeout(final RestResponse<?> response) {
&& (response.getErrorMessage().getErrorCode()
== Errors.ERROR_CODE_COMMAND_QUEUE_CATCHUP_TIMEOUT);
}

private static boolean isKsqlEntityList(final RestResponse<?> response) {
return response.isSuccessful() && response.getResponse() instanceof KsqlEntityList;
}

private void updateLastCommandSequenceNumber(final KsqlEntityList entities) {
final Optional<Long> lastSeqNum = entities.stream()
.filter(entity -> entity instanceof CommandStatusEntity)
.map(entity -> (CommandStatusEntity)entity)
.map(CommandStatusEntity::getCommandSequenceNumber)
.reduce(Long::max);
lastSeqNum.ifPresent(seqNum -> lastCommandSequenceNumber = seqNum);
}
}
80 changes: 75 additions & 5 deletions ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
Expand All @@ -52,8 +53,8 @@
import io.confluent.ksql.rest.entity.ServerInfo;
import io.confluent.ksql.rest.server.KsqlRestApplication;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.confluent.ksql.rest.server.computation.CommandId;
import io.confluent.ksql.rest.server.resources.Errors;
import io.confluent.ksql.rest.util.CommandStoreUtil;
import io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster;
import io.confluent.ksql.test.util.TestKsqlRestApp;
import io.confluent.ksql.util.KsqlConfig;
Expand All @@ -64,6 +65,7 @@
import io.confluent.ksql.util.TopicProducer;
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -801,7 +803,7 @@ public void shouldRetryOnCommandQueueCatchupTimeoutUntilLimitReached() throws Ex
// Given:
final String statementText = "list streams;";
final KsqlRestClient mockRestClient = givenMockRestClient();
when(mockRestClient.makeKsqlRequest(statementText, CommandStoreUtil.LAST_SEQUENCE_NUMBER))
when(mockRestClient.makeKsqlRequest(statementText, -1L))
.thenReturn(RestResponse.erroneous(
new KsqlErrorMessage(Errors.ERROR_CODE_COMMAND_QUEUE_CATCHUP_TIMEOUT, "timed out!")));
when(mockRestClient.makeKsqlRequest(statementText, null))
Expand All @@ -813,7 +815,7 @@ public void shouldRetryOnCommandQueueCatchupTimeoutUntilLimitReached() throws Ex
// Then:
final InOrder inOrder = inOrder(mockRestClient);
inOrder.verify(mockRestClient, times(Cli.COMMAND_QUEUE_CATCHUP_TIMEOUT_RETRIES + 1))
.makeKsqlRequest(statementText, CommandStoreUtil.LAST_SEQUENCE_NUMBER);
.makeKsqlRequest(statementText, -1L);
inOrder.verify(mockRestClient).makeKsqlRequest(statementText, null);
inOrder.verifyNoMoreInteractions();
}
Expand All @@ -823,7 +825,7 @@ public void shouldNotRetryOnSuccess() throws Exception {
// Given:
final String statementText = "list streams;";
final KsqlRestClient mockRestClient = givenMockRestClient();
when(mockRestClient.makeKsqlRequest(statementText, CommandStoreUtil.LAST_SEQUENCE_NUMBER))
when(mockRestClient.makeKsqlRequest(statementText, -1L))
.thenReturn(RestResponse.successful(new KsqlEntityList()));

// When:
Expand All @@ -838,7 +840,7 @@ public void shouldNotRetryOnErrorThatIsNotCommandQueueCatchupTimeout() throws Ex
// Given:
final String statementText = "list streams;";
final KsqlRestClient mockRestClient = givenMockRestClient();
when(mockRestClient.makeKsqlRequest(statementText, CommandStoreUtil.LAST_SEQUENCE_NUMBER))
when(mockRestClient.makeKsqlRequest(statementText, -1L))
.thenReturn(RestResponse.erroneous(
new KsqlErrorMessage(Errors.ERROR_CODE_SERVER_ERROR, "uh oh")));

Expand All @@ -849,6 +851,65 @@ public void shouldNotRetryOnErrorThatIsNotCommandQueueCatchupTimeout() throws Ex
verify(mockRestClient, times(1)).makeKsqlRequest(anyString(), anyLong());
}

@Test
public void shouldUpdateCommandSequenceNumber() throws Exception {
// Given:
final String statementText = "create stream foo;";
final KsqlRestClient mockRestClient = givenMockRestClient();
final CommandStatusEntity stubEntity = stubCommandStatusEntityWithSeqNum(12L);
when(mockRestClient.makeKsqlRequest(anyString(), anyLong()))
.thenReturn(RestResponse.successful(new KsqlEntityList(
Collections.singletonList(stubEntity))));

// When:
localCli.handleLine(statementText);

final String secondStatement = "list streams;";
localCli.handleLine(secondStatement);

// Then:
verify(mockRestClient).makeKsqlRequest(secondStatement, 12L);
}

@Test
public void shouldUpdateCommandSequenceNumberOnMultipleCommandStatusEntities() throws Exception {
// Given:
final String statementText = "create stream foo;";
final KsqlRestClient mockRestClient = givenMockRestClient();
final CommandStatusEntity firstEntity = stubCommandStatusEntityWithSeqNum(12L);
final CommandStatusEntity secondEntity = stubCommandStatusEntityWithSeqNum(14L);
when(mockRestClient.makeKsqlRequest(anyString(), anyLong()))
.thenReturn(RestResponse.successful(new KsqlEntityList(
ImmutableList.of(firstEntity, secondEntity))));

// When:
localCli.handleLine(statementText);

final String secondStatement = "list streams;";
localCli.handleLine(secondStatement);

// Then:
verify(mockRestClient).makeKsqlRequest(secondStatement, 14L);
}

@Test
public void shouldNotUpdateCommandSequenceNumberIfNoCommandStatusEntities() throws Exception {
// Given:
final String statementText = "create stream foo;";
final KsqlRestClient mockRestClient = givenMockRestClient();
when(mockRestClient.makeKsqlRequest(anyString(), anyLong()))
.thenReturn(RestResponse.successful(new KsqlEntityList()));

// When:
localCli.handleLine(statementText);

final String secondStatement = "list streams;";
localCli.handleLine(secondStatement);

// Then:
verify(mockRestClient).makeKsqlRequest(secondStatement, -1L);
}

private void givenRunInteractivelyWillExit() {
when(lineSupplier.get()).thenReturn("eXiT");
}
Expand All @@ -866,6 +927,15 @@ private KsqlRestClient givenMockRestClient() throws Exception {
return mockRestClient;
}

private CommandStatusEntity stubCommandStatusEntityWithSeqNum(final long seqNum) {
return new CommandStatusEntity(
"stub",
new CommandId(CommandId.Type.STREAM, "stub", CommandId.Action.CREATE),
new CommandStatus(CommandStatus.Status.SUCCESS, "stub"),
seqNum
);
}

private static class TestRowCaptor implements RowCaptor {
private TestResult.Builder output = new TestResult.Builder();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.rest.util.CommandStoreUtil;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import java.io.Closeable;
Expand Down Expand Up @@ -63,7 +62,6 @@ public class CommandStore implements CommandQueue, Closeable {
private final CommandIdAssigner commandIdAssigner;
private final Map<CommandId, CommandStatusFuture> commandStatusMap;
private final SequenceNumberFutureStore sequenceNumberFutureStore;
private long lastCommandSequenceNumber;

public CommandStore(
final String commandTopic,
Expand Down Expand Up @@ -94,7 +92,6 @@ public CommandStore(
this.commandStatusMap = Maps.newConcurrentMap();
this.sequenceNumberFutureStore =
Objects.requireNonNull(sequenceNumberFutureStore, "sequenceNumberFutureStore");
updateLastCommandSequenceNumber();

commandConsumer.assign(Collections.singleton(topicPartition));
}
Expand Down Expand Up @@ -168,7 +165,6 @@ public QueuedCommandStatus enqueueCommand(
*/
public List<QueuedCommand> getNewCommands() {
completeSatisfiedSequenceNumberFutures();
updateLastCommandSequenceNumber();

final List<QueuedCommand> queuedCommands = Lists.newArrayList();
commandConsumer.poll(Duration.ofMillis(Long.MAX_VALUE)).forEach(
Expand Down Expand Up @@ -220,22 +216,20 @@ public List<QueuedCommand> getRestoreCommands() {
public void ensureConsumedPast(final long seqNum, final Duration timeout)
throws InterruptedException, TimeoutException {
final CompletableFuture<Void> future =
sequenceNumberFutureStore.getFutureForSequenceNumber(getSequenceNumberToWaitFor(seqNum));
sequenceNumberFutureStore.getFutureForSequenceNumber(seqNum);
try {
future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (final ExecutionException e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException)e.getCause();
}
throw new RuntimeException(
"Error waiting for command sequence number of "
+ CommandStoreUtil.getSequenceNumberString(seqNum),
e.getCause());
"Error waiting for command sequence number of " + seqNum, e.getCause());
} catch (final TimeoutException e) {
throw new TimeoutException(
String.format(
"Timeout reached while waiting for command sequence number of %s. (Timeout: %d ms)",
CommandStoreUtil.getSequenceNumberString(seqNum),
"Timeout reached while waiting for command sequence number of %d. (Timeout: %d ms)",
seqNum,
timeout.toMillis()));
}
}
Expand All @@ -250,16 +244,6 @@ private Collection<TopicPartition> getTopicPartitionsForTopic(final String topic
return result;
}

private long getSequenceNumberToWaitFor(final long seqNum) {
return seqNum == CommandStoreUtil.LAST_SEQUENCE_NUMBER ? lastCommandSequenceNumber : seqNum;
}

private void updateLastCommandSequenceNumber() {
final long lastOffsetPlusOne =
commandConsumer.endOffsets(Collections.singleton(topicPartition)).get(topicPartition);
lastCommandSequenceNumber = lastOffsetPlusOne - 1;
}

private void completeSatisfiedSequenceNumberFutures() {
final long consumerPosition = commandConsumer.position(topicPartition);
sequenceNumberFutureStore.completeFuturesUpToAndIncludingSequenceNumber(consumerPosition - 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.concurrent.TimeoutException;

public final class CommandStoreUtil {
public static final long LAST_SEQUENCE_NUMBER = Long.MAX_VALUE;

private CommandStoreUtil() {
}
Expand Down Expand Up @@ -57,8 +56,4 @@ public static void waitForCommandSequenceNumber(
commandQueue.ensureConsumedPast(seqNum, timeout);
}
}

public static String getSequenceNumberString(final long seqNum) {
return seqNum == LAST_SEQUENCE_NUMBER ? "LAST_SEQUENCE_NUMBER" : String.valueOf(seqNum);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import com.google.common.collect.ImmutableList;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.rest.entity.CommandStatus;
import io.confluent.ksql.rest.util.CommandStoreUtil;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.Pair;
Expand Down Expand Up @@ -133,11 +132,16 @@ public void setUp() throws Exception {
.thenReturn(ImmutableList.of(
new PartitionInfo(COMMAND_TOPIC, 0, node, new Node[]{node}, new Node[]{node})
));
setUpLastSequenceNumber(-1);

when(sequenceNumberFutureStore.getFutureForSequenceNumber(anyLong())).thenReturn(future);

setUpCommandStore();
commandStore = new CommandStore(
COMMAND_TOPIC,
commandConsumer,
commandProducer,
commandIdAssigner,
sequenceNumberFutureStore
);
}

@Test
Expand Down Expand Up @@ -341,49 +345,6 @@ public void shouldCompleteFuturesWhenGettingNewCommands() {
inOrder.verify(commandConsumer).poll(any());
}

@Test
public void shouldUseLastSequenceNumber() throws Exception {
// Given:
givenLastSequenceNumber(10L);

// When:
commandStore.ensureConsumedPast(CommandStoreUtil.LAST_SEQUENCE_NUMBER, TIMEOUT);

// Then:
verify(sequenceNumberFutureStore).getFutureForSequenceNumber(10L);
}

@Test
public void shouldUseUpdatedLastSequenceNumber() throws Exception {
// Given:
setUpLastSequenceNumber(10L);
commandStore.getNewCommands();

// When:
commandStore.ensureConsumedPast(CommandStoreUtil.LAST_SEQUENCE_NUMBER, TIMEOUT);

// Then:
verify(sequenceNumberFutureStore).getFutureForSequenceNumber(10L);
}

@Test
public void shouldThrowExceptionWithMeaningfulMessageOnTimeoutWaitingForLastSequenceNumber()
throws Exception {
// Given:
givenLastSequenceNumber(10L);
when(future.get(anyLong(), any(TimeUnit.class))).thenThrow(new TimeoutException());

// Expect:
expectedException.expect(TimeoutException.class);
expectedException.expectMessage(String.format(
"Timeout reached while waiting for command sequence number of LAST_SEQUENCE_NUMBER. "
+ "(Timeout: %d ms)",
TIMEOUT.toMillis()));

// When:
commandStore.ensureConsumedPast(CommandStoreUtil.LAST_SEQUENCE_NUMBER, TIMEOUT);
}

private static List<Pair<CommandId, Command>> getPriorCommands(final CommandStore commandStore) {
return commandStore.getRestoreCommands().stream()
.map(
Expand All @@ -403,24 +364,4 @@ private static ConsumerRecords<CommandId, Command> buildRecords(final Object ...
}
return new ConsumerRecords<>(Collections.singletonMap(COMMAND_TOPIC_PARTITION, records));
}

private void givenLastSequenceNumber(final long seqNum) {
setUpLastSequenceNumber(seqNum);
setUpCommandStore();
}

private void setUpLastSequenceNumber(final long seqNum) {
when(commandConsumer.endOffsets(Collections.singleton(COMMAND_TOPIC_PARTITION)))
.thenReturn(Collections.singletonMap(COMMAND_TOPIC_PARTITION, seqNum + 1));
}

private void setUpCommandStore() {
commandStore = new CommandStore(
COMMAND_TOPIC,
commandConsumer,
commandProducer,
commandIdAssigner,
sequenceNumberFutureStore
);
}
}

0 comments on commit 8a96569

Please sign in to comment.