Skip to content

Commit

Permalink
feat: pass auth header to connect client for RBAC integration (#3492)
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra authored Oct 11, 2019
1 parent 0b8ccc1 commit cef0ea3
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,18 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import javax.ws.rs.core.HttpHeaders;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.fluent.Request;
import org.apache.http.entity.ContentType;
import org.apache.http.message.BasicHeader;
import org.apache.http.util.EntityUtils;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
Expand All @@ -59,9 +63,14 @@ public class DefaultConnectClient implements ConnectClient {
private static final int MAX_ATTEMPTS = 3;

private final URI connectUri;
private final Optional<String> authHeader;

public DefaultConnectClient(final String connectUri) {
public DefaultConnectClient(
final String connectUri,
final Optional<String> authHeader
) {
Objects.requireNonNull(connectUri, "connectUri");
this.authHeader = Objects.requireNonNull(authHeader, "authHeader");

try {
this.connectUri = new URI(connectUri);
Expand All @@ -84,6 +93,7 @@ public ConnectResponse<ConnectorInfo> create(

final ConnectResponse<ConnectorInfo> connectResponse = withRetries(() -> Request
.Post(connectUri.resolve(CONNECTORS))
.setHeaders(headers())
.socketTimeout(DEFAULT_TIMEOUT_MS)
.connectTimeout(DEFAULT_TIMEOUT_MS)
.bodyString(
Expand Down Expand Up @@ -114,6 +124,7 @@ public ConnectResponse<List<String>> connectors() {

final ConnectResponse<List<String>> connectResponse = withRetries(() -> Request
.Get(connectUri.resolve(CONNECTORS))
.setHeaders(headers())
.socketTimeout(DEFAULT_TIMEOUT_MS)
.connectTimeout(DEFAULT_TIMEOUT_MS)
.execute()
Expand All @@ -138,6 +149,7 @@ public ConnectResponse<ConnectorStateInfo> status(final String connector) {

final ConnectResponse<ConnectorStateInfo> connectResponse = withRetries(() -> Request
.Get(connectUri.resolve(CONNECTORS + "/" + connector + STATUS))
.setHeaders(headers())
.socketTimeout(DEFAULT_TIMEOUT_MS)
.connectTimeout(DEFAULT_TIMEOUT_MS)
.execute()
Expand All @@ -162,6 +174,7 @@ public ConnectResponse<ConnectorInfo> describe(final String connector) {

final ConnectResponse<ConnectorInfo> connectResponse = withRetries(() -> Request
.Get(connectUri.resolve(String.format("%s/%s", CONNECTORS, connector)))
.setHeaders(headers())
.socketTimeout(DEFAULT_TIMEOUT_MS)
.connectTimeout(DEFAULT_TIMEOUT_MS)
.execute()
Expand All @@ -185,6 +198,7 @@ public ConnectResponse<String> delete(final String connector) {

final ConnectResponse<String> connectResponse = withRetries(() -> Request
.Delete(connectUri.resolve(String.format("%s/%s", CONNECTORS, connector)))
.setHeaders(headers())
.socketTimeout(DEFAULT_TIMEOUT_MS)
.connectTimeout(DEFAULT_TIMEOUT_MS)
.execute()
Expand All @@ -200,6 +214,12 @@ public ConnectResponse<String> delete(final String connector) {
}
}

/**
 * Builds the HTTP headers to attach to every Connect REST request.
 *
 * @return a single-element array carrying the {@code Authorization} header when an
 *         auth header value was supplied at construction time, otherwise an empty array.
 */
private Header[] headers() {
  // Idiomatic Optional: map the present value instead of isPresent()/get().
  return authHeader
      .map(value -> new Header[]{new BasicHeader(HttpHeaders.AUTHORIZATION, value)})
      .orElseGet(() -> new Header[0]);
}

@SuppressWarnings("unchecked")
private static <T> ConnectResponse<T> withRetries(final Callable<ConnectResponse<T>> action) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.confluent.ksql.schema.registry.KsqlSchemaRegistryClientFactory;
import io.confluent.ksql.util.KsqlConfig;
import java.util.Collections;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.streams.KafkaClientSupplier;
Expand All @@ -35,6 +36,9 @@ public static ServiceContext create(
ksqlConfig,
new DefaultKafkaClientSupplier(),
new KsqlSchemaRegistryClientFactory(ksqlConfig, Collections.emptyMap())::get,
new DefaultConnectClient(
ksqlConfig.getString(KsqlConfig.CONNECT_URL_PROPERTY),
Optional.empty()),
ksqlClient
);
}
Expand All @@ -43,6 +47,7 @@ public static ServiceContext create(
final KsqlConfig ksqlConfig,
final KafkaClientSupplier kafkaClientSupplier,
final Supplier<SchemaRegistryClient> srClientFactory,
final ConnectClient connectClient,
final SimpleKsqlClient ksqlClient
) {
final Admin adminClient = kafkaClientSupplier.getAdmin(
Expand All @@ -54,7 +59,7 @@ public static ServiceContext create(
adminClient,
new KafkaTopicClientImpl(adminClient),
srClientFactory,
new DefaultConnectClient(ksqlConfig.getString(KsqlConfig.CONNECT_URL_PROPERTY)),
connectClient,
ksqlClient
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.confluent.ksql.services.TestServiceContext;
import io.confluent.ksql.statement.Injectors;
import io.confluent.ksql.util.KsqlConfig;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.streams.KafkaClientSupplier;
Expand Down Expand Up @@ -57,7 +58,9 @@ public static KsqlContext create(
adminClient,
kafkaTopicClient,
() -> schemaRegistryClient,
new DefaultConnectClient(ksqlConfig.getString(KsqlConfig.CONNECT_URL_PROPERTY))
new DefaultConnectClient(
ksqlConfig.getString(KsqlConfig.CONNECT_URL_PROPERTY),
Optional.empty())
);

final String metricsPrefix = "instance-" + COUNTER.getAndIncrement() + "-";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@
import com.github.tomakehurst.wiremock.client.WireMock;
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
import com.github.tomakehurst.wiremock.junit.WireMockRule;
import com.github.tomakehurst.wiremock.matching.EqualToPattern;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.json.JsonMapper;
import io.confluent.ksql.metastore.model.MetaStoreMatchers.OptionalMatchers;
import io.confluent.ksql.services.ConnectClient.ConnectResponse;
import java.util.List;
import java.util.Optional;
import javax.ws.rs.core.HttpHeaders;
import org.apache.http.HttpStatus;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
Expand Down Expand Up @@ -57,23 +60,27 @@ public class DefaultConnectClientTest {
),
ConnectorType.SOURCE
);
private static final String AUTH_HEADER = "Basic FOOBAR";

@Rule
public WireMockRule wireMockRule = new WireMockRule(
WireMockConfiguration.wireMockConfig().dynamicPort());
WireMockConfiguration.wireMockConfig()
.dynamicPort()
);

private ConnectClient client;

@Before
public void setup() {
  // Construct the client against the WireMock server's dynamic port, supplying the
  // auth header so every test can verify it is forwarded on each request.
  client = new DefaultConnectClient(
      "http://localhost:" + wireMockRule.port(),
      Optional.of(AUTH_HEADER));
}

@Test
public void testCreate() throws JsonProcessingException {
// Given:
WireMock.stubFor(
WireMock.post(WireMock.urlEqualTo("/connectors"))
.withHeader(HttpHeaders.AUTHORIZATION, new EqualToPattern(AUTH_HEADER))
.willReturn(WireMock.aResponse()
.withStatus(HttpStatus.SC_CREATED)
.withBody(MAPPER.writeValueAsString(SAMPLE_INFO)))
Expand All @@ -93,6 +100,7 @@ public void testCreateWithError() throws JsonProcessingException {
// Given:
WireMock.stubFor(
WireMock.post(WireMock.urlEqualTo("/connectors"))
.withHeader(HttpHeaders.AUTHORIZATION, new EqualToPattern(AUTH_HEADER))
.willReturn(WireMock.aResponse()
.withStatus(HttpStatus.SC_INTERNAL_SERVER_ERROR)
.withBody("Oh no!"))
Expand All @@ -112,6 +120,7 @@ public void testList() throws JsonProcessingException {
// Given:
WireMock.stubFor(
WireMock.get(WireMock.urlEqualTo("/connectors"))
.withHeader(HttpHeaders.AUTHORIZATION, new EqualToPattern(AUTH_HEADER))
.willReturn(WireMock.aResponse()
.withStatus(HttpStatus.SC_OK)
.withBody(MAPPER.writeValueAsString(ImmutableList.of("one", "two"))))
Expand All @@ -130,6 +139,7 @@ public void testDescribe() throws JsonProcessingException {
// Given:
WireMock.stubFor(
WireMock.get(WireMock.urlEqualTo("/connectors/foo"))
.withHeader(HttpHeaders.AUTHORIZATION, new EqualToPattern(AUTH_HEADER))
.willReturn(WireMock.aResponse()
.withStatus(HttpStatus.SC_OK)
.withBody(MAPPER.writeValueAsString(SAMPLE_INFO)))
Expand All @@ -148,6 +158,7 @@ public void testStatus() throws JsonProcessingException {
// Given:
WireMock.stubFor(
WireMock.get(WireMock.urlEqualTo("/connectors/foo/status"))
.withHeader(HttpHeaders.AUTHORIZATION, new EqualToPattern(AUTH_HEADER))
.willReturn(WireMock.aResponse()
.withStatus(HttpStatus.SC_OK)
.withBody(MAPPER.writeValueAsString(SAMPLE_STATUS)))
Expand All @@ -174,6 +185,7 @@ public void testDelete() throws JsonProcessingException {
// Given:
WireMock.stubFor(
WireMock.delete(WireMock.urlEqualTo("/connectors/foo"))
.withHeader(HttpHeaders.AUTHORIZATION, new EqualToPattern(AUTH_HEADER))
.willReturn(WireMock.aResponse()
.withStatus(HttpStatus.SC_NO_CONTENT))
);
Expand All @@ -191,6 +203,7 @@ public void testListShouldRetryOnFailure() throws JsonProcessingException {
// Given:
WireMock.stubFor(
WireMock.get(WireMock.urlEqualTo("/connectors"))
.withHeader(HttpHeaders.AUTHORIZATION, new EqualToPattern(AUTH_HEADER))
.willReturn(WireMock.aResponse()
.withStatus(HttpStatus.SC_INTERNAL_SERVER_ERROR)
.withBody("Encountered an error!"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.confluent.ksql.util.FakeKafkaClientSupplier;
import io.confluent.ksql.util.KsqlConfig;
import java.util.Collections;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.streams.KafkaClientSupplier;
Expand Down Expand Up @@ -63,7 +64,7 @@ public static ServiceContext create(
new FakeKafkaClientSupplier().getAdmin(Collections.emptyMap()),
topicClient,
srClientFactory,
new DefaultConnectClient("http://localhost:8083")
new DefaultConnectClient("http://localhost:8083", Optional.empty())
);
}

Expand All @@ -80,7 +81,9 @@ public static ServiceContext create(
adminClient,
new KafkaTopicClientImpl(adminClient),
srClientFactory,
new DefaultConnectClient(ksqlConfig.getString(KsqlConfig.CONNECT_URL_PROPERTY))
new DefaultConnectClient(
ksqlConfig.getString(KsqlConfig.CONNECT_URL_PROPERTY),
Optional.empty())
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ static ServiceContext getServiceContext() {
new StubKafkaClientSupplier().getAdmin(Collections.emptyMap()),
new StubKafkaTopicClient(),
() -> schemaRegistryClient,
new DefaultConnectClient("http://localhost:8083"),
new DefaultConnectClient("http://localhost:8083", Optional.empty()),
DisabledKsqlClient.instance()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.schema.registry.KsqlSchemaRegistryClientFactory;
import io.confluent.ksql.services.DefaultConnectClient;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.services.ServiceContextFactory;
import io.confluent.ksql.util.KsqlConfig;
Expand Down Expand Up @@ -71,6 +72,7 @@ public static ServiceContext create(
ksqlConfig,
kafkaClientSupplier,
srClientFactory,
new DefaultConnectClient(ksqlConfig.getString(KsqlConfig.CONNECT_URL_PROPERTY), authHeader),
new DefaultKsqlClient(authHeader)
);
}
Expand Down

0 comments on commit cef0ea3

Please sign in to comment.