diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java index 45d5ebecfbf9..5d4858d94c00 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java @@ -274,7 +274,8 @@ private void checkAuthorization(final Session session) { try { provider.checkEndpointAccess(user, method, path); } catch (final Throwable t) { - log.warn(String.format("User:%s is denied access \"%s %s\"", user, method, path), t); + log.warn(String.format("User:%s is denied access to Websocket " + + "query endpoint", user), t); throw new KsqlException(t); } } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/AuthorizationFunctionalTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/AuthorizationFunctionalTest.java index 549041ea31b3..08407b33f41e 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/AuthorizationFunctionalTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/AuthorizationFunctionalTest.java @@ -20,10 +20,10 @@ import io.confluent.ksql.integration.IntegrationTestHarness; import io.confluent.ksql.rest.entity.CommandStatus; import io.confluent.ksql.rest.entity.CommandStatusEntity; +import io.confluent.ksql.rest.entity.KafkaTopicInfo; +import io.confluent.ksql.rest.entity.KafkaTopicsList; import io.confluent.ksql.rest.entity.KsqlEntity; -import io.confluent.ksql.rest.entity.KsqlErrorMessage; import io.confluent.ksql.rest.server.TestKsqlRestApp; -import io.confluent.ksql.rest.server.resources.Errors; import io.confluent.ksql.rest.server.security.KsqlAuthorizationProvider; import io.confluent.ksql.rest.server.security.KsqlSecurityExtension; import io.confluent.ksql.services.ServiceContext; @@ -35,6 +35,7 @@ import org.glassfish.hk2.utilities.binding.AbstractBinder; import org.glassfish.jersey.process.internal.RequestScoped; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; @@ -45,7 +46,6 @@ import javax.ws.rs.core.Configurable; import java.security.Principal; -import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Optional; @@ -54,7 +54,6 @@ import static io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster.JAAS_KAFKA_PROPS_NAME; import static io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster.VALID_USER1; -import static io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster.VALID_USER2; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; @@ -62,6 +61,10 @@ public class AuthorizationFunctionalTest { private static final IntegrationTestHarness TEST_HARNESS = IntegrationTestHarness.build(); + private static final Credentials USER1 = VALID_USER1; + + private static final String TOPIC_1 = "topic_1"; + private static final TestKsqlRestApp REST_APP = TestKsqlRestApp .builder(TEST_HARNESS::kafkaBootstrapServers) .withProperty("authentication.method", "BASIC") @@ -104,6 +107,11 @@ public void dispose(final ServiceContext serviceContext) { @Rule public final ExpectedException expectedException = ExpectedException.none(); + @BeforeClass + public static void setUpClass() { + TEST_HARNESS.ensureTopics(TOPIC_1); + } + @Before public void setUp() { allowedUsers.clear(); @@ -122,39 +130,31 @@ public void shouldDenyAccess() { // Then: expectedException.expect(AssertionError.class); expectedException.expectMessage( - String.format("Access denied to User:%s", VALID_USER1.username) + String.format("Access denied to User:%s", USER1.username) ); // When: - makeKsqlRequest(VALID_USER1, "SHOW TOPICS;"); + makeKsqlRequest(USER1, "SHOW TOPICS;"); } @Test public void shouldAllowAccess() { // Given: - givenAuthorizedUser(VALID_USER1); + givenAuthorizedUser(USER1); // When: - final List results = makeKsqlRequest(VALID_USER1, "SHOW TOPICS;"); + final List results = makeKsqlRequest(USER1, "SHOW TOPICS;"); // Then: - assertSuccessful(results); + final List topics = ((KafkaTopicsList)results.get(0)).getTopics(); + assertThat(topics.size(), is(1)); + assertThat(topics.get(0).getName(), is(TOPIC_1)); } private void givenAuthorizedUser(final Credentials user) { allowedUsers.add(user.username); } - private static void assertSuccessful(final List results) { - results.stream() - .filter(e -> e instanceof CommandStatusEntity) - .map(CommandStatusEntity.class::cast) - .forEach(r -> assertThat( - r.getStatementText() + " : " + r.getCommandStatus().getMessage(), - r.getCommandStatus().getStatus(), - is(CommandStatus.Status.SUCCESS))); - } - private List makeKsqlRequest(final Credentials credentials, final String sql) { return RestIntegrationTestUtil.makeKsqlRequest(REST_APP, sql, Optional.of(credentials)); }