From 05060b23a630271049fd88dbeed3a3e43c566280 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A1stor=20Rodr=C3=ADguez?= Date: Sat, 7 Nov 2020 16:54:40 +0100 Subject: [PATCH] Http authentication support including None and Basic authentication --- CHANGELOG.md | 1 + README.md | 43 ++++++++++++ examples/jira-issues-search.md | 5 +- .../http/auth/BasicHttpAuthenticator.java | 62 +++++++++++++++++ .../auth/BasicHttpAuthenticatorConfig.java | 53 +++++++++++++++ .../auth/ConfigurableHttpAuthenticator.java | 55 +++++++++++++++ .../ConfigurableHttpAuthenticatorConfig.java | 61 +++++++++++++++++ .../http/auth/NoneHttpAuthenticator.java | 33 +++++++++ .../http/auth/spi/HttpAuthenticationType.java | 25 +++++++ .../http/auth/spi/HttpAuthenticator.java | 36 ++++++++++ .../http/client/okhttp/OkHttpClient.java | 17 ++++- .../client/okhttp/OkHttpClientConfig.java | 14 +++- .../BasicHttpAuthenticatorConfigTest.java | 57 ++++++++++++++++ .../http/auth/BasicHttpAuthenticatorTest.java | 68 +++++++++++++++++++ ...nfigurableHttpAuthenticatorConfigTest.java | 51 ++++++++++++++ .../ConfigurableHttpAuthenticatorTest.java | 62 +++++++++++++++++ .../http/auth/NoneHttpAuthenticatorTest.java | 33 +++++++++ .../client/okhttp/OkHttpClientConfigTest.java | 16 ++++- ...ka.connect.http.auth.spi.HttpAuthenticator | 2 + src/test/resources/logback.xml | 1 + 20 files changed, 687 insertions(+), 8 deletions(-) create mode 100644 src/main/java/com/github/castorm/kafka/connect/http/auth/BasicHttpAuthenticator.java create mode 100644 src/main/java/com/github/castorm/kafka/connect/http/auth/BasicHttpAuthenticatorConfig.java create mode 100644 src/main/java/com/github/castorm/kafka/connect/http/auth/ConfigurableHttpAuthenticator.java create mode 100644 src/main/java/com/github/castorm/kafka/connect/http/auth/ConfigurableHttpAuthenticatorConfig.java create mode 100644 src/main/java/com/github/castorm/kafka/connect/http/auth/NoneHttpAuthenticator.java create mode 100644 src/main/java/com/github/castorm/kafka/connect/http/auth/spi/HttpAuthenticationType.java create mode 100644 src/main/java/com/github/castorm/kafka/connect/http/auth/spi/HttpAuthenticator.java create mode 100644 src/test/java/com/github/castorm/kafka/connect/http/auth/BasicHttpAuthenticatorConfigTest.java create mode 100644 src/test/java/com/github/castorm/kafka/connect/http/auth/BasicHttpAuthenticatorTest.java create mode 100644 src/test/java/com/github/castorm/kafka/connect/http/auth/ConfigurableHttpAuthenticatorConfigTest.java create mode 100644 src/test/java/com/github/castorm/kafka/connect/http/auth/ConfigurableHttpAuthenticatorTest.java create mode 100644 src/test/java/com/github/castorm/kafka/connect/http/auth/NoneHttpAuthenticatorTest.java create mode 100644 src/test/resources/META-INF/services/com.github.castorm.kafka.connect.http.auth.spi.HttpAuthenticator diff --git a/CHANGELOG.md b/CHANGELOG.md index 5db16935..380ac377 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### v0.8.0 (TBD) - Provided different log levels for `OkHttpClient`.`TRACE`: `BODY`, `DEBUG`: `BASIC`, `*`: `NONE` - Refactored throttler adding the notion of timer +- Support for authentication extension. Initial implementations for None and Basic authentication types ### v0.7.6 (05/28/2020) - Fix typo in `http.throttler.catchup.interval.millis` configuration property diff --git a/README.md b/README.md index 2f497773..3be550f7 100644 --- a/README.md +++ b/README.md @@ -265,6 +265,49 @@ Uses a [OkHttp](https://square.github.io/okhttp/) client. > Maximum number of idle connections in the connection pool > * Type: `Integer` > * Default: `1` +--- + + +### HttpAuthenticator: Authenticating a HttpRequest +When executing the request, authentication might be required. The HttpAuthenticator is responsible for resolving the authentication header +to be included in the request +`HttpAuthenticator` + +> #### `http.auth` +> ```java +> public interface HttpAuthenticator extends Configurable { +> +> Optional getAuthorizationHeader(); +> } +> ``` +> * Type: `Class` +> * Default: `com.github.castorm.kafka.connect.http.auth.ConfigurableHttpAuthenticator` +> * Available implementations: +> * `com.github.castorm.kafka.connect.http.auth.ConfigurableHttpAuthenticator` +> * `com.github.castorm.kafka.connect.http.auth.NoneHttpAuthenticator` +> * `com.github.castorm.kafka.connect.http.auth.BasicHttpAuthenticator` + +#### Authenticating a HttpRequest with ConfigurableHttpAuthenticator +Allows selecting the athentication type via configuration property + +> ##### `http.auth.type` +> Type of authentication +> * Type: `String` +> * Default: `None` +> * Available options: +> * `None` +> * `Basic` + +#### Authenticating a HttpRequest with BasicHttpAuthenticator +Allows selecting the athentication type via configuration property + +> ##### `http.auth.user` +> * Type: `String` +> * Default: `` +> +> ##### `http.auth.password` +> * Type: `String` +> * Default: `` --- diff --git a/examples/jira-issues-search.md b/examples/jira-issues-search.md index 71b9fe5a..f447d524 100644 --- a/examples/jira-issues-search.md +++ b/examples/jira-issues-search.md @@ -48,8 +48,11 @@ And based on the results we would be updating the `updated` filter for subsequen "tasks.max": "1", "http.offset.initial": "timestamp=2020-05-08T07:55:44Z", "http.request.url": "https://your-host-here/rest/api/2/search", - "http.request.headers": "Authorization: Basic TBD, Accept: application/json", + "http.request.headers": "Accept: application/json", "http.request.params": "jql=updated>=\"${offset.timestamp?datetime.iso?string['yyyy/MM/dd HH:mm']}\" ORDER BY updated ASC&maxResults=100", + "http.auth.type": "Basic", + "http.auth.user": "username", + "http.auth.password": "password", "http.response.list.pointer": "/issues", "http.response.record.key.pointer": "/id", "http.response.record.offset.pointer": "timestamp=/fields/updated", diff --git a/src/main/java/com/github/castorm/kafka/connect/http/auth/BasicHttpAuthenticator.java b/src/main/java/com/github/castorm/kafka/connect/http/auth/BasicHttpAuthenticator.java new file mode 100644 index 00000000..89b7b5df --- /dev/null +++ b/src/main/java/com/github/castorm/kafka/connect/http/auth/BasicHttpAuthenticator.java @@ -0,0 +1,62 @@ +package com.github.castorm.kafka.connect.http.auth; + +/*- + * #%L + * Kafka Connect HTTP + * %% + * Copyright (C) 2020 CastorM + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.github.castorm.kafka.connect.http.auth.spi.HttpAuthenticator; +import okhttp3.Credentials; + +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; + +import static org.apache.commons.lang.StringUtils.isEmpty; + +public class BasicHttpAuthenticator implements HttpAuthenticator { + + private final Function, BasicHttpAuthenticatorConfig> configFactory; + + Optional header; + + public BasicHttpAuthenticator() { + this(BasicHttpAuthenticatorConfig::new); + } + + public BasicHttpAuthenticator(Function, BasicHttpAuthenticatorConfig> configFactory) { + this.configFactory = configFactory; + } + + @Override + public void configure(Map configs) { + + BasicHttpAuthenticatorConfig config = configFactory.apply(configs); + + if (!isEmpty(config.getUser()) || !isEmpty(config.getPassword().value())) { + header = Optional.of(Credentials.basic(config.getUser(), config.getPassword().value())); + } else { + header = Optional.empty(); + } + } + + @Override + public Optional getAuthorizationHeader() { + return header; + } +} diff --git a/src/main/java/com/github/castorm/kafka/connect/http/auth/BasicHttpAuthenticatorConfig.java b/src/main/java/com/github/castorm/kafka/connect/http/auth/BasicHttpAuthenticatorConfig.java new file mode 100644 index 00000000..ed93aeeb --- /dev/null +++ b/src/main/java/com/github/castorm/kafka/connect/http/auth/BasicHttpAuthenticatorConfig.java @@ -0,0 +1,53 @@ +package com.github.castorm.kafka.connect.http.auth; + +/*- + * #%L + * kafka-connect-http + * %% + * Copyright (C) 2020 CastorM + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import lombok.Getter; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.types.Password; + +import java.util.Map; + +import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH; +import static org.apache.kafka.common.config.ConfigDef.Type.STRING; + +@Getter +public class BasicHttpAuthenticatorConfig extends AbstractConfig { + + private static final String USER = "http.auth.user"; + private static final String PASSWORD = "http.auth.password"; + + private final String user; + private final Password password; + + BasicHttpAuthenticatorConfig(Map originals) { + super(config(), originals); + user = getString(USER); + password = getPassword(PASSWORD); + } + + public static ConfigDef config() { + return new ConfigDef() + .define(USER, STRING, "", HIGH, "Username") + .define(PASSWORD, ConfigDef.Type.PASSWORD, "", HIGH, "Password"); + } +} diff --git a/src/main/java/com/github/castorm/kafka/connect/http/auth/ConfigurableHttpAuthenticator.java b/src/main/java/com/github/castorm/kafka/connect/http/auth/ConfigurableHttpAuthenticator.java new file mode 100644 index 00000000..ee2b5198 --- /dev/null +++ b/src/main/java/com/github/castorm/kafka/connect/http/auth/ConfigurableHttpAuthenticator.java @@ -0,0 +1,55 @@ +package com.github.castorm.kafka.connect.http.auth; + +/*- + * #%L + * Kafka Connect HTTP + * %% + * Copyright (C) 2020 CastorM + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.github.castorm.kafka.connect.http.auth.spi.HttpAuthenticator; + +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; + +public class ConfigurableHttpAuthenticator implements HttpAuthenticator { + + private final Function, ConfigurableHttpAuthenticatorConfig> configFactory; + + private HttpAuthenticator delegate; + + public ConfigurableHttpAuthenticator() { + this(ConfigurableHttpAuthenticatorConfig::new); + } + + public ConfigurableHttpAuthenticator(Function, ConfigurableHttpAuthenticatorConfig> configFactory) { + this.configFactory = configFactory; + } + + @Override + public void configure(Map configs) { + + ConfigurableHttpAuthenticatorConfig config = configFactory.apply(configs); + + delegate = config.getAuthenticator(); + } + + @Override + public Optional getAuthorizationHeader() { + return delegate.getAuthorizationHeader(); + } +} diff --git a/src/main/java/com/github/castorm/kafka/connect/http/auth/ConfigurableHttpAuthenticatorConfig.java b/src/main/java/com/github/castorm/kafka/connect/http/auth/ConfigurableHttpAuthenticatorConfig.java new file mode 100644 index 00000000..fb2d3577 --- /dev/null +++ b/src/main/java/com/github/castorm/kafka/connect/http/auth/ConfigurableHttpAuthenticatorConfig.java @@ -0,0 +1,61 @@ +package com.github.castorm.kafka.connect.http.auth; + +/*- + * #%L + * kafka-connect-http + * %% + * Copyright (C) 2020 CastorM + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.github.castorm.kafka.connect.http.auth.spi.HttpAuthenticationType; +import com.github.castorm.kafka.connect.http.auth.spi.HttpAuthenticator; +import lombok.Getter; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; + +import java.util.Map; + +import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM; +import static org.apache.kafka.common.config.ConfigDef.Type.STRING; + +@Getter +public class ConfigurableHttpAuthenticatorConfig extends AbstractConfig { + + private static final String AUTH_TYPE = "http.auth.type"; + + private final HttpAuthenticator authenticator; + + ConfigurableHttpAuthenticatorConfig(Map originals) { + super(config(), originals); + authenticator = getAuthenticator(originals); + } + + private HttpAuthenticator getAuthenticator(Map originals) { + switch (HttpAuthenticationType.valueOf(getString(AUTH_TYPE).toUpperCase())) { + case BASIC: + BasicHttpAuthenticator auth = new BasicHttpAuthenticator(); + auth.configure(originals); + return auth; + default: + return new NoneHttpAuthenticator(); + } + } + + public static ConfigDef config() { + return new ConfigDef() + .define(AUTH_TYPE, STRING, HttpAuthenticationType.NONE.name(), MEDIUM, "Authentication Type"); + } +} diff --git a/src/main/java/com/github/castorm/kafka/connect/http/auth/NoneHttpAuthenticator.java b/src/main/java/com/github/castorm/kafka/connect/http/auth/NoneHttpAuthenticator.java new file mode 100644 index 00000000..df4f4467 --- /dev/null +++ b/src/main/java/com/github/castorm/kafka/connect/http/auth/NoneHttpAuthenticator.java @@ -0,0 +1,33 @@ +package com.github.castorm.kafka.connect.http.auth; + +/*- + * #%L + * Kafka Connect HTTP + * %% + * Copyright (C) 2020 CastorM + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.github.castorm.kafka.connect.http.auth.spi.HttpAuthenticator; + +import java.util.Optional; + +public class NoneHttpAuthenticator implements HttpAuthenticator { + + @Override + public Optional getAuthorizationHeader() { + return Optional.empty(); + } +} diff --git a/src/main/java/com/github/castorm/kafka/connect/http/auth/spi/HttpAuthenticationType.java b/src/main/java/com/github/castorm/kafka/connect/http/auth/spi/HttpAuthenticationType.java new file mode 100644 index 00000000..4b9d4099 --- /dev/null +++ b/src/main/java/com/github/castorm/kafka/connect/http/auth/spi/HttpAuthenticationType.java @@ -0,0 +1,25 @@ +package com.github.castorm.kafka.connect.http.auth.spi; + +/*- + * #%L + * Kafka Connect HTTP + * %% + * Copyright (C) 2020 CastorM + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +public enum HttpAuthenticationType { + NONE, BASIC +} diff --git a/src/main/java/com/github/castorm/kafka/connect/http/auth/spi/HttpAuthenticator.java b/src/main/java/com/github/castorm/kafka/connect/http/auth/spi/HttpAuthenticator.java new file mode 100644 index 00000000..96bb95d1 --- /dev/null +++ b/src/main/java/com/github/castorm/kafka/connect/http/auth/spi/HttpAuthenticator.java @@ -0,0 +1,36 @@ +package com.github.castorm.kafka.connect.http.auth.spi; + +/*- + * #%L + * kafka-connect-http + * %% + * Copyright (C) 2020 CastorM + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.kafka.common.Configurable; + +import java.util.Map; +import java.util.Optional; + +@FunctionalInterface +public interface HttpAuthenticator extends Configurable { + + Optional getAuthorizationHeader(); + + default void configure(Map map) { + // Do nothing + } +} diff --git a/src/main/java/com/github/castorm/kafka/connect/http/client/okhttp/OkHttpClient.java b/src/main/java/com/github/castorm/kafka/connect/http/client/okhttp/OkHttpClient.java index cf1c5e11..7f57c727 100644 --- a/src/main/java/com/github/castorm/kafka/connect/http/client/okhttp/OkHttpClient.java +++ b/src/main/java/com/github/castorm/kafka/connect/http/client/okhttp/OkHttpClient.java @@ -9,9 +9,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -20,6 +20,7 @@ * #L% */ +import com.github.castorm.kafka.connect.http.auth.spi.HttpAuthenticator; import com.github.castorm.kafka.connect.http.client.spi.HttpClient; import com.github.castorm.kafka.connect.http.model.HttpRequest; import com.github.castorm.kafka.connect.http.model.HttpResponse; @@ -40,6 +41,7 @@ import static java.util.Optional.empty; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static javax.ws.rs.core.HttpHeaders.AUTHORIZATION; import static okhttp3.HttpUrl.parse; import static okhttp3.RequestBody.create; import static okhttp3.logging.HttpLoggingInterceptor.Level.BASIC; @@ -51,20 +53,31 @@ public class OkHttpClient implements HttpClient { private okhttp3.OkHttpClient client; + private HttpAuthenticator authenticator; + @Override public void configure(Map configs) { OkHttpClientConfig config = new OkHttpClientConfig(configs); + authenticator = config.getAuthenticator(); client = new okhttp3.OkHttpClient.Builder() .connectionPool(new ConnectionPool(config.getMaxIdleConnections(), config.getKeepAliveDuration(), MILLISECONDS)) .connectTimeout(config.getConnectionTimeoutMillis(), MILLISECONDS) .readTimeout(config.getReadTimeoutMillis(), MILLISECONDS) .retryOnConnectionFailure(true) .addInterceptor(createLoggingInterceptor()) + .addInterceptor(chain -> chain.proceed(authorize(chain.request()))) + .authenticator((route, response) -> authorize(response.request())) .build(); } + private Request authorize(Request request) { + return authenticator.getAuthorizationHeader() + .map(header -> request.newBuilder().header(AUTHORIZATION, header).build()) + .orElse(request); + } + private static HttpLoggingInterceptor createLoggingInterceptor() { if (log.isTraceEnabled()) { return new HttpLoggingInterceptor(log::trace).setLevel(BODY); diff --git a/src/main/java/com/github/castorm/kafka/connect/http/client/okhttp/OkHttpClientConfig.java b/src/main/java/com/github/castorm/kafka/connect/http/client/okhttp/OkHttpClientConfig.java index c74664d9..96ccf3eb 100644 --- a/src/main/java/com/github/castorm/kafka/connect/http/client/okhttp/OkHttpClientConfig.java +++ b/src/main/java/com/github/castorm/kafka/connect/http/client/okhttp/OkHttpClientConfig.java @@ -9,9 +9,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -20,6 +20,8 @@ * #L% */ +import com.github.castorm.kafka.connect.http.auth.ConfigurableHttpAuthenticator; +import com.github.castorm.kafka.connect.http.auth.spi.HttpAuthenticator; import lombok.Getter; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; @@ -27,6 +29,8 @@ import java.util.Map; import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH; +import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM; +import static org.apache.kafka.common.config.ConfigDef.Type.CLASS; import static org.apache.kafka.common.config.ConfigDef.Type.INT; import static org.apache.kafka.common.config.ConfigDef.Type.LONG; @@ -37,11 +41,13 @@ public class OkHttpClientConfig extends AbstractConfig { private static final String READ_TIMEOUT_MILLIS = "http.client.read.timeout.millis"; private static final String CONNECTION_KEEP_ALIVE_DURATION_MILLIS = "http.client.ttl.millis"; private static final String CONNECTION_MAX_IDLE = "http.client.max-idle"; + private static final String AUTHENTICATOR = "http.auth"; private final Long connectionTimeoutMillis; private final Long readTimeoutMillis; private final Long keepAliveDuration; private final Integer maxIdleConnections; + private final HttpAuthenticator authenticator; OkHttpClientConfig(Map originals) { super(config(), originals); @@ -49,6 +55,7 @@ public class OkHttpClientConfig extends AbstractConfig { readTimeoutMillis = getLong(READ_TIMEOUT_MILLIS); keepAliveDuration = getLong(CONNECTION_KEEP_ALIVE_DURATION_MILLIS); maxIdleConnections = getInt(CONNECTION_MAX_IDLE); + authenticator = getConfiguredInstance(AUTHENTICATOR, HttpAuthenticator.class); } public static ConfigDef config() { @@ -56,6 +63,7 @@ public static ConfigDef config() { .define(CONNECTION_TIMEOUT_MILLIS, LONG, 2000, HIGH, "Connection Timeout Millis") .define(READ_TIMEOUT_MILLIS, LONG, 2000, HIGH, "Read Timeout Millis") .define(CONNECTION_KEEP_ALIVE_DURATION_MILLIS, LONG, 300000, HIGH, "Keep Alive Duration Millis") - .define(CONNECTION_MAX_IDLE, INT, 1, HIGH, "Max Idle Connections"); + .define(CONNECTION_MAX_IDLE, INT, 1, HIGH, "Max Idle Connections") + .define(AUTHENTICATOR, CLASS, ConfigurableHttpAuthenticator.class, MEDIUM, "Custom Authenticator"); } } diff --git a/src/test/java/com/github/castorm/kafka/connect/http/auth/BasicHttpAuthenticatorConfigTest.java b/src/test/java/com/github/castorm/kafka/connect/http/auth/BasicHttpAuthenticatorConfigTest.java new file mode 100644 index 00000000..6be6224b --- /dev/null +++ b/src/test/java/com/github/castorm/kafka/connect/http/auth/BasicHttpAuthenticatorConfigTest.java @@ -0,0 +1,57 @@ +package com.github.castorm.kafka.connect.http.auth; + +/*- + * #%L + * Kafka Connect HTTP + * %% + * Copyright (C) 2020 CastorM + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.google.common.collect.ImmutableMap; +import org.apache.kafka.common.config.types.Password; +import org.junit.jupiter.api.Test; + +import java.util.Map; + +import static java.util.Collections.emptyMap; +import static org.assertj.core.api.Assertions.assertThat; + +class BasicHttpAuthenticatorConfigTest { + + @Test + void whenNoUser_thenDefault() { + assertThat(config(emptyMap()).getUser()).isEqualTo(""); + } + + @Test + void whenUser_thenInitialized() { + assertThat(config(ImmutableMap.of("http.auth.user", "user")).getUser()).isEqualTo("user"); + } + + @Test + void whenNoPassword_thenDefault() { + assertThat(config(emptyMap()).getPassword()).isEqualTo(new Password("")); + } + + @Test + void whenPassword_thenInitialized() { + assertThat(config(ImmutableMap.of("http.auth.password", "password")).getPassword()).isEqualTo(new Password("password")); + } + + private static BasicHttpAuthenticatorConfig config(Map config) { + return new BasicHttpAuthenticatorConfig(config); + } +} diff --git a/src/test/java/com/github/castorm/kafka/connect/http/auth/BasicHttpAuthenticatorTest.java b/src/test/java/com/github/castorm/kafka/connect/http/auth/BasicHttpAuthenticatorTest.java new file mode 100644 index 00000000..26ab68a7 --- /dev/null +++ b/src/test/java/com/github/castorm/kafka/connect/http/auth/BasicHttpAuthenticatorTest.java @@ -0,0 +1,68 @@ +package com.github.castorm.kafka.connect.http.auth; + +/*- + * #%L + * Kafka Connect HTTP + * %% + * Copyright (C) 2020 CastorM + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.kafka.common.config.types.Password; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import static java.util.Collections.emptyMap; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.BDDMockito.given; + +@ExtendWith(MockitoExtension.class) +class BasicHttpAuthenticatorTest { + + @Mock + BasicHttpAuthenticatorConfig config; + + BasicHttpAuthenticator authenticator; + + @BeforeEach + void setUp() { + authenticator = new BasicHttpAuthenticator(__ -> config); + } + + @Test + void whenCredentials_thenHeader() { + + given(config.getUser()).willReturn("user"); + given(config.getPassword()).willReturn(new Password("password")); + + authenticator.configure(emptyMap()); + + assertThat(authenticator.getAuthorizationHeader()).contains("Basic dXNlcjpwYXNzd29yZA=="); + } + + @Test + void whenNoCredentials_thenHeaderEmpty() { + + given(config.getUser()).willReturn(""); + given(config.getPassword()).willReturn(new Password("")); + + authenticator.configure(emptyMap()); + + assertThat(authenticator.getAuthorizationHeader()).isEmpty(); + } +} diff --git a/src/test/java/com/github/castorm/kafka/connect/http/auth/ConfigurableHttpAuthenticatorConfigTest.java b/src/test/java/com/github/castorm/kafka/connect/http/auth/ConfigurableHttpAuthenticatorConfigTest.java new file mode 100644 index 00000000..f0fdf85c --- /dev/null +++ b/src/test/java/com/github/castorm/kafka/connect/http/auth/ConfigurableHttpAuthenticatorConfigTest.java @@ -0,0 +1,51 @@ +package com.github.castorm.kafka.connect.http.auth; + +/*- + * #%L + * Kafka Connect HTTP + * %% + * Copyright (C) 2020 CastorM + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.google.common.collect.ImmutableMap; +import org.junit.jupiter.api.Test; + +import java.util.Map; + +import static java.util.Collections.emptyMap; +import static org.assertj.core.api.Assertions.assertThat; + +class ConfigurableHttpAuthenticatorConfigTest { + + @Test + void whenNoType_thenDefault() { + assertThat(config(emptyMap()).getAuthenticator()).isInstanceOf(NoneHttpAuthenticator.class); + } + + @Test + void whenNoneType_thenBasic() { + assertThat(config(ImmutableMap.of("http.auth.type", "None")).getAuthenticator()).isInstanceOf(NoneHttpAuthenticator.class); + } + + @Test + void whenBasicType_thenBasic() { + assertThat(config(ImmutableMap.of("http.auth.type", "Basic")).getAuthenticator()).isInstanceOf(BasicHttpAuthenticator.class); + } + + private static ConfigurableHttpAuthenticatorConfig config(Map config) { + return new ConfigurableHttpAuthenticatorConfig(config); + } +} diff --git a/src/test/java/com/github/castorm/kafka/connect/http/auth/ConfigurableHttpAuthenticatorTest.java b/src/test/java/com/github/castorm/kafka/connect/http/auth/ConfigurableHttpAuthenticatorTest.java new file mode 100644 index 00000000..79350fb6 --- /dev/null +++ b/src/test/java/com/github/castorm/kafka/connect/http/auth/ConfigurableHttpAuthenticatorTest.java @@ -0,0 +1,62 @@ +package com.github.castorm.kafka.connect.http.auth; + +/*- + * #%L + * Kafka Connect HTTP + * %% + * Copyright (C) 2020 CastorM + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.github.castorm.kafka.connect.http.auth.spi.HttpAuthenticator; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.Optional; + +import static java.util.Collections.emptyMap; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.BDDMockito.given; + +@ExtendWith(MockitoExtension.class) +class ConfigurableHttpAuthenticatorTest { + + @Mock + ConfigurableHttpAuthenticatorConfig config; + + @Mock + HttpAuthenticator delegate; + + ConfigurableHttpAuthenticator authenticator; + + @BeforeEach + void setUp() { + authenticator = new ConfigurableHttpAuthenticator(__ -> config); + } + + @Test + void whenHeader_thenDelegated() { + + given(config.getAuthenticator()).willReturn(delegate); + given(delegate.getAuthorizationHeader()).willReturn(Optional.of("header")); + + authenticator.configure(emptyMap()); + + assertThat(authenticator.getAuthorizationHeader()).contains("header"); + } +} diff --git a/src/test/java/com/github/castorm/kafka/connect/http/auth/NoneHttpAuthenticatorTest.java b/src/test/java/com/github/castorm/kafka/connect/http/auth/NoneHttpAuthenticatorTest.java new file mode 100644 index 00000000..0d6ace54 --- /dev/null +++ b/src/test/java/com/github/castorm/kafka/connect/http/auth/NoneHttpAuthenticatorTest.java @@ -0,0 +1,33 @@ +package com.github.castorm.kafka.connect.http.auth; + +/*- + * #%L + * Kafka Connect HTTP + * %% + * Copyright (C) 2020 CastorM + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +class NoneHttpAuthenticatorTest { + + @Test + void whenGetHeader_thenEmpty() { + assertThat(new NoneHttpAuthenticator().getAuthorizationHeader()).isEmpty(); + } +} diff --git a/src/test/java/com/github/castorm/kafka/connect/http/client/okhttp/OkHttpClientConfigTest.java b/src/test/java/com/github/castorm/kafka/connect/http/client/okhttp/OkHttpClientConfigTest.java index b75c1270..1e7d189d 100644 --- a/src/test/java/com/github/castorm/kafka/connect/http/client/okhttp/OkHttpClientConfigTest.java +++ b/src/test/java/com/github/castorm/kafka/connect/http/client/okhttp/OkHttpClientConfigTest.java @@ -9,9 +9,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -20,6 +20,8 @@ * #L% */ +import com.github.castorm.kafka.connect.http.auth.BasicHttpAuthenticator; +import com.github.castorm.kafka.connect.http.auth.ConfigurableHttpAuthenticator; import com.google.common.collect.ImmutableMap; import org.junit.jupiter.api.Test; @@ -70,6 +72,16 @@ void whenMaxIdleConnections_thenInitialized() { assertThat(config(ImmutableMap.of("http.client.max-idle", "42")).getMaxIdleConnections()).isEqualTo(42L); } + @Test + void whenAuthenticator_thenDefault() { + assertThat(config(emptyMap()).getAuthenticator()).isInstanceOf(ConfigurableHttpAuthenticator.class); + } + + @Test + void whenAuthenticator_thenInitialized() { + assertThat(config(ImmutableMap.of("http.auth", "com.github.castorm.kafka.connect.http.auth.BasicHttpAuthenticator")).getAuthenticator()).isInstanceOf(BasicHttpAuthenticator.class); + } + private static OkHttpClientConfig config(Map config) { return new OkHttpClientConfig(config); } diff --git a/src/test/resources/META-INF/services/com.github.castorm.kafka.connect.http.auth.spi.HttpAuthenticator b/src/test/resources/META-INF/services/com.github.castorm.kafka.connect.http.auth.spi.HttpAuthenticator new file mode 100644 index 00000000..01c9f82d --- /dev/null +++ b/src/test/resources/META-INF/services/com.github.castorm.kafka.connect.http.auth.spi.HttpAuthenticator @@ -0,0 +1,2 @@ +com.github.castorm.kafka.connect.http.auth.NoneHttpAuthenticator +com.github.castorm.kafka.connect.http.auth.BasicHttpAuthenticator diff --git a/src/test/resources/logback.xml b/src/test/resources/logback.xml index 0f12fbf7..713494a8 100644 --- a/src/test/resources/logback.xml +++ b/src/test/resources/logback.xml @@ -28,6 +28,7 @@ +