From 8a9165324c77c8e6c4b84ea66481f6870e336a49 Mon Sep 17 00:00:00 2001 From: Andy Coates Date: Wed, 2 Oct 2019 15:53:41 -0700 Subject: [PATCH 1/3] fix: improve print topic error message when topic does not exist The message incorrectly stated that KSQL treats unquoted topic names as uppercase. This is not the case. Also, the suggested alternative was just the opposite case to the supplied, which is fairly arbitrary, e.g. if the user issues `print PageView` and the topic does not exist the error message will suggest `pAGEvIEW`, even though no such topic exists. With this change the error message will include any other topics that exist in the cluster with the same name, but different case. --- .../streaming/StreamedQueryResource.java | 50 ++++++++++--------- 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java index 56607d7a65d5..3edd11ba6887 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java @@ -40,9 +40,11 @@ import io.confluent.ksql.util.TransientQueryMetadata; import io.confluent.ksql.version.metrics.ActivenessRegistrar; import java.time.Duration; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; import javax.ws.rs.Consumes; import javax.ws.rs.POST; import javax.ws.rs.Path; @@ -232,24 +234,28 @@ private Response handlePrintTopic( final PreparedStatement statement ) { final PrintTopic printTopic = statement.getStatement(); - final String topicName = printTopic.getTopic().toString(); + final String topicName = printTopic.getTopic(); if (!serviceContext.getTopicClient().isTopicExists(topicName)) { - String reverseSuggestion = ""; - final String nameReversedCase = reverseCase(topicName); - if (serviceContext.getTopicClient().isTopicExists(nameReversedCase)) { - reverseSuggestion = "Did you mean '" + nameReversedCase + "'?" + System.lineSeparator(); - } + final Collection possibleAlternatives = + findPossibleTopicMatches(topicName, serviceContext); + + final String reverseSuggestion = possibleAlternatives.isEmpty() + ? "" + : possibleAlternatives.stream() + .map(name -> "\tprint " + name + ";") + .collect(Collectors.joining( + System.lineSeparator(), + System.lineSeparator() + "Did you mean:" + System.lineSeparator(), + "" + )); + throw new KsqlRestException( - Errors.badRequest(String.format( - "Could not find topic '%s', " + Errors.badRequest( + "Could not find topic '" + topicName + "', " + "or the KSQL user does not have permissions to list the topic." - + System.lineSeparator() + reverseSuggestion - + "KSQL will treat unquoted topic names as uppercase." - + System.lineSeparator() - + "To print a case-sensitive topic use quotes, for example: print \'Topic\';", - topicName))); + )); } final Map propertiesWithOverrides = @@ -267,17 +273,13 @@ private Response handlePrintTopic( return Response.ok().entity(topicStreamWriter).build(); } - private String reverseCase(final String str) { - final char[] chars = str.toCharArray(); - for (int i = 0; i < chars.length; i++) { - final char c = chars[i]; - if (Character.isUpperCase(c)) { - chars[i] = Character.toLowerCase(c); - } else { - chars[i] = Character.toUpperCase(c); - } - } - return new String(chars); + private static Collection findPossibleTopicMatches( + final String topicName, + final ServiceContext serviceContext + ) { + return serviceContext.getTopicClient().listTopicNames().stream() + .filter(name -> name.equalsIgnoreCase(topicName)) + .collect(Collectors.toSet()); } } From f854c9b2dc17d2e9c7ffe5f11b496857024de022 Mon Sep 17 00:00:00 2001 From: Andy Coates Date: Wed, 2 Oct 2019 16:24:11 -0700 Subject: [PATCH 2/3] test: add tests --- .../streaming/StreamedQueryResource.java | 3 +- .../streaming/StreamedQueryResourceTest.java | 214 +++++++++--------- 2 files changed, 103 insertions(+), 114 deletions(-) diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java index 3edd11ba6887..b7801a373773 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java @@ -253,7 +253,8 @@ private Response handlePrintTopic( throw new KsqlRestException( Errors.badRequest( "Could not find topic '" + topicName + "', " - + "or the KSQL user does not have permissions to list the topic." + + "or the KSQL user does not have permissions to list the topic. " + + "Topic names are case-sensitive." + reverseSuggestion )); } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java index ce3b7ee60021..49dc27488ff3 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java @@ -20,23 +20,21 @@ import static io.confluent.ksql.rest.entity.KsqlErrorMessageMatchers.errorMessage; import static io.confluent.ksql.rest.server.resources.KsqlRestExceptionMatchers.exceptionErrorMessage; import static io.confluent.ksql.rest.server.resources.KsqlRestExceptionMatchers.exceptionStatusCode; -import static org.easymock.EasyMock.anyLong; -import static org.easymock.EasyMock.anyObject; -import static org.easymock.EasyMock.anyString; -import static org.easymock.EasyMock.eq; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; -import static org.easymock.EasyMock.mock; -import static org.easymock.EasyMock.niceMock; -import static org.easymock.EasyMock.replay; -import static org.easymock.EasyMock.reset; -import static org.easymock.EasyMock.verify; -import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import io.confluent.ksql.GenericRow; import io.confluent.ksql.KsqlExecutionContext.ExecuteResult; import io.confluent.ksql.engine.KsqlEngine; @@ -47,8 +45,6 @@ import io.confluent.ksql.parser.tree.PrintTopic; import io.confluent.ksql.parser.tree.Query; import io.confluent.ksql.parser.tree.Statement; -import io.confluent.ksql.planner.PlanSourceExtractorVisitor; -import io.confluent.ksql.planner.plan.OutputNode; import io.confluent.ksql.rest.Errors; import io.confluent.ksql.rest.entity.KsqlErrorMessage; import io.confluent.ksql.rest.entity.KsqlRequest; @@ -87,10 +83,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; -import org.easymock.EasyMock; -import org.easymock.EasyMockRunner; -import org.easymock.Mock; -import org.easymock.MockType; +import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.HttpStatus.Code; import org.hamcrest.Matchers; import org.junit.Before; @@ -98,8 +91,10 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; -@RunWith(EasyMockRunner.class) +@RunWith(MockitoJUnitRunner.class) public class StreamedQueryResourceTest { private static final Duration DISCONNECT_CHECK_INTERVAL = Duration.ofMillis(1000); @@ -115,38 +110,34 @@ public class StreamedQueryResourceTest { @Rule public final ExpectedException expectedException = ExpectedException.none(); - - @Mock(MockType.NICE) + @Mock private KsqlEngine mockKsqlEngine; - @Mock(MockType.NICE) + @Mock private ServiceContext serviceContext; - @Mock(MockType.NICE) + @Mock private KafkaTopicClient mockKafkaTopicClient; - @Mock(MockType.NICE) + @Mock private StatementParser mockStatementParser; @Mock private CommandQueue commandQueue; - @Mock(MockType.NICE) + @Mock private ActivenessRegistrar activenessRegistrar; @Mock private Consumer queryCloseCallback; - @Mock(MockType.NICE) + @Mock private KsqlAuthorizationValidator authorizationValidator; private StreamedQueryResource testResource; - private final static String queryString = "SELECT * FROM test_stream;"; + private final static String queryString = "SELECT * FROM test_stream EMIT CHANGES;"; private final static String printString = "Print TEST_TOPIC;"; private final static String topicName = "test_stream"; private PreparedStatement statement; @Before public void setup() { - expect(serviceContext.getTopicClient()).andReturn(mockKafkaTopicClient); - expect(mockKsqlEngine.hasActiveQueries()).andReturn(false); + when(serviceContext.getTopicClient()).thenReturn(mockKafkaTopicClient); statement = PreparedStatement.of("s", mock(Statement.class)); - expect(mockStatementParser.parseSingleStatement(queryString)) - .andReturn(statement); - replay(mockKsqlEngine, mockStatementParser); + when(mockStatementParser.parseSingleStatement(queryString)).thenReturn(statement); testResource = new StreamedQueryResource( mockKsqlEngine, @@ -199,11 +190,8 @@ public void shouldThrowOnHandleStatementIfNotConfigured() { @Test public void shouldReturn400OnBadStatement() { // Given: - reset(mockStatementParser); - expect(mockStatementParser.parseSingleStatement(anyString())) - .andThrow(new IllegalArgumentException("some error message")); - - replay(mockStatementParser); + when(mockStatementParser.parseSingleStatement(any())) + .thenThrow(new IllegalArgumentException("some error message")); // Expect expectedException.expect(KsqlRestException.class); @@ -220,10 +208,7 @@ public void shouldReturn400OnBadStatement() { } @Test - public void shouldNotWaitIfCommandSequenceNumberSpecified() { - // Given: - replay(commandQueue); - + public void shouldNotWaitIfCommandSequenceNumberSpecified() throws Exception { // When: testResource.streamQuery( serviceContext, @@ -231,17 +216,11 @@ public void shouldNotWaitIfCommandSequenceNumberSpecified() { ); // Then: - verify(commandQueue); + verify(commandQueue, never()).ensureConsumedPast(anyLong(), any()); } @Test public void shouldWaitIfCommandSequenceNumberSpecified() throws Exception { - // Given: - commandQueue.ensureConsumedPast(eq(3L), anyObject()); - expectLastCall(); - - replay(commandQueue); - // When: testResource.streamQuery( serviceContext, @@ -249,17 +228,15 @@ public void shouldWaitIfCommandSequenceNumberSpecified() throws Exception { ); // Then: - verify(commandQueue); + verify(commandQueue).ensureConsumedPast(eq(3L), any()); } @Test public void shouldReturnServiceUnavailableIfTimeoutWaitingForCommandSequenceNumber() throws Exception { // Given: - commandQueue.ensureConsumedPast(anyLong(), anyObject()); - expectLastCall().andThrow(new TimeoutException("whoops")); - - replay(commandQueue); + doThrow(new TimeoutException("whoops")) + .when(commandQueue).ensureConsumedPast(anyLong(), any()); // Expect expectedException.expect(KsqlRestException.class); @@ -308,27 +285,12 @@ public void shouldStreamRowsCorrectly() throws Throwable { rowQueuePopulatorThread.start(); final KafkaStreams mockKafkaStreams = mock(KafkaStreams.class); - mockKafkaStreams.start(); - expectLastCall(); - mockKafkaStreams.setUncaughtExceptionHandler(anyObject(Thread.UncaughtExceptionHandler.class)); - expectLastCall(); - mockKafkaStreams.cleanUp(); - expectLastCall(); - mockKafkaStreams.close(); - expectLastCall(); - - final OutputNode mockOutputNode = niceMock(OutputNode.class); - expect(mockOutputNode.accept(anyObject(PlanSourceExtractorVisitor.class), anyObject())) - .andReturn(null); final Map requestStreamsProperties = Collections.emptyMap(); - reset(mockStatementParser); statement = PreparedStatement.of("query", mock(Query.class)); - expect(mockStatementParser.parseSingleStatement(queryString)) - .andReturn(statement); - - reset(mockKsqlEngine); + when(mockStatementParser.parseSingleStatement(queryString)) + .thenReturn(statement); final TransientQueryMetadata transientQueryMetadata = new TransientQueryMetadata( @@ -345,12 +307,10 @@ public void shouldStreamRowsCorrectly() throws Throwable { Collections.emptyMap(), Collections.emptyMap(), queryCloseCallback); - reset(mockOutputNode); - expect(mockKsqlEngine.execute(serviceContext, - ConfiguredStatement.of(statement, requestStreamsProperties, VALID_CONFIG))) - .andReturn(ExecuteResult.of(transientQueryMetadata)); - replay(mockKsqlEngine, mockStatementParser, mockKafkaStreams, mockOutputNode); + when(mockKsqlEngine.execute(serviceContext, + ConfiguredStatement.of(statement, requestStreamsProperties, VALID_CONFIG))) + .thenReturn(ExecuteResult.of(transientQueryMetadata)); final Response response = testResource.streamQuery( @@ -399,7 +359,10 @@ public void shouldStreamRowsCorrectly() throws Throwable { rowQueuePopulatorThread.join(); // Definitely want to make sure that the Kafka Streams instance has been closed and cleaned up - verify(mockKafkaStreams); + verify(mockKafkaStreams).start(); + verify(mockKafkaStreams).setUncaughtExceptionHandler(any()); + verify(mockKafkaStreams).cleanUp(); + verify(mockKafkaStreams).close(); // If one of the other threads has somehow managed to throw an exception without breaking things up until this // point, we throw that exception now in the main thread and cause the test to fail @@ -471,35 +434,26 @@ public void write(final int b) throws IOException { @Test public void shouldUpdateTheLastRequestTime() { - // Given: - activenessRegistrar.updateLastRequestTime(); - EasyMock.expectLastCall(); - - EasyMock.replay(activenessRegistrar); - - // When: + /// When: testResource.streamQuery( serviceContext, new KsqlRequest(queryString, Collections.emptyMap(), null) ); // Then: - EasyMock.verify(activenessRegistrar); + verify(activenessRegistrar).updateLastRequestTime(); } @Test public void shouldReturnForbiddenKafkaAccessIfKsqlTopicAuthorizationException() { // Given: - reset(mockStatementParser, authorizationValidator); - statement = PreparedStatement.of("query", mock(Query.class)); - expect(mockStatementParser.parseSingleStatement(queryString)) - .andReturn(statement); - authorizationValidator.checkAuthorization(anyObject(), anyObject(), anyObject()); - expectLastCall().andThrow( - new KsqlTopicAuthorizationException(AclOperation.READ, Collections.singleton(topicName))); + when(mockStatementParser.parseSingleStatement(queryString)) + .thenReturn(statement); - replay(mockStatementParser, authorizationValidator); + doThrow( + new KsqlTopicAuthorizationException(AclOperation.READ, Collections.singleton(topicName))) + .when(authorizationValidator).checkAuthorization(any(), any(), any()); // When: final Response response = testResource.streamQuery( @@ -519,19 +473,13 @@ public void shouldReturnForbiddenKafkaAccessIfKsqlTopicAuthorizationException() @Test public void shouldReturnForbiddenKafkaAccessIfRootCauseKsqlTopicAuthorizationException() { // Given: - reset(mockStatementParser, authorizationValidator); - statement = PreparedStatement.of("query", mock(Query.class)); - expect(mockStatementParser.parseSingleStatement(queryString)) - .andReturn(statement); - authorizationValidator.checkAuthorization(anyObject(), anyObject(), anyObject()); - expectLastCall().andThrow( - new KsqlException( - "", - new KsqlTopicAuthorizationException(AclOperation.READ, Collections.singleton(topicName) - ))); - - replay(mockStatementParser, authorizationValidator); + when(mockStatementParser.parseSingleStatement(queryString)) + .thenReturn(statement); + doThrow(new KsqlException( + "", + new KsqlTopicAuthorizationException(AclOperation.READ, Collections.singleton(topicName)))) + .when(authorizationValidator).checkAuthorization(any(), any(), any()); // When: final Response response = testResource.streamQuery( @@ -551,18 +499,15 @@ public void shouldReturnForbiddenKafkaAccessIfRootCauseKsqlTopicAuthorizationExc } @Test - public void shouldReturnForbiddenKafkaAccessIfPrintTopicKsqlTopicAuthorizationException() throws Exception { + public void shouldReturnForbiddenKafkaAccessIfPrintTopicKsqlTopicAuthorizationException() { // Given: - reset(mockStatementParser, authorizationValidator); - statement = PreparedStatement.of("print", mock(PrintTopic.class)); - expect(mockStatementParser.parseSingleStatement(printString)) - .andReturn(statement); - authorizationValidator.checkAuthorization(anyObject(), anyObject(), anyObject()); - expectLastCall().andThrow( - new KsqlTopicAuthorizationException(AclOperation.READ, Collections.singleton(topicName))); + when(mockStatementParser.parseSingleStatement(printString)) + .thenReturn(statement); - replay(mockStatementParser, authorizationValidator); + doThrow( + new KsqlTopicAuthorizationException(AclOperation.READ, Collections.singleton(topicName))) + .when(authorizationValidator).checkAuthorization(any(), any(), any()); // When: final Response response = testResource.streamQuery( @@ -576,4 +521,47 @@ public void shouldReturnForbiddenKafkaAccessIfPrintTopicKsqlTopicAuthorizationEx assertEquals(response.getStatus(), expected.getStatus()); assertEquals(response.getEntity(), expected.getEntity()); } + + @Test + public void shouldSuggestAlternativesIfPrintTopicDoesNotExist() { + // Given: + final PrintTopic cmd = mock(PrintTopic.class); + when(cmd.getTopic()).thenReturn("TEST_TOPIC"); + statement = PreparedStatement.of("print", cmd); + when(mockStatementParser.parseSingleStatement(any())) + .thenReturn(statement); + + when(mockKafkaTopicClient.isTopicExists(any())).thenReturn(false); + when(mockKafkaTopicClient.listTopicNames()).thenReturn(ImmutableSet.of( + "aTopic", + "test_topic", + "Test_Topic" + )); + + // Then: + expectedException.expect(KsqlRestException.class); + expectedException.expect(exceptionStatusCode(is(HttpStatus.Code.BAD_REQUEST))); + expectedException.expect(exceptionErrorMessage( + errorMessage(containsString( + "Could not find topic 'TEST_TOPIC', " + + "or the KSQL user does not have permissions to list the topic. " + + "Topic names are case-sensitive." + + System.lineSeparator() + + "Did you mean:" + )))); + + expectedException.expect(exceptionErrorMessage( + errorMessage(containsString("\tprint test_topic;" + )))); + + expectedException.expect(exceptionErrorMessage( + errorMessage(containsString("\tprint Test_Topic;" + )))); + + // When: + testResource.streamQuery( + serviceContext, + new KsqlRequest(printString, Collections.emptyMap(), null) + ); + } } From 3ac356f1d7f2f6fa92e842210d763f347cdf747c Mon Sep 17 00:00:00 2001 From: Andy Coates Date: Wed, 2 Oct 2019 16:26:01 -0700 Subject: [PATCH 3/3] test: increase test timeout as build servers are slow --- .../io/confluent/ksql/materialization/ks/KsStateStoreTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ksql-engine/src/test/java/io/confluent/ksql/materialization/ks/KsStateStoreTest.java b/ksql-engine/src/test/java/io/confluent/ksql/materialization/ks/KsStateStoreTest.java index 0ca04c513970..9498ca3e68c7 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/materialization/ks/KsStateStoreTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/materialization/ks/KsStateStoreTest.java @@ -65,7 +65,7 @@ public class KsStateStoreTest { .build(); @Rule - public final Timeout timeout = Timeout.seconds(1); + public final Timeout timeout = Timeout.seconds(10); @Rule public final ExpectedException expectedException = ExpectedException.none();