diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2889b9d3..c827f6b2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,10 @@
## [Unreleased]
+### Added
+
+- Added support for OIDC Bearer tokens.
+
## [0.15.0] - 2024-07-30
### Added
diff --git a/README.md b/README.md
index 3b3ce57a..3374f642 100644
--- a/README.md
+++ b/README.md
@@ -337,6 +337,9 @@ CREATE TABLE http (
)
```
+Note that when using OIDC, it adds an `Authentication` header with the bearer token; this will override
+an existing `Authorization` header specified in configuration.
+
#### Custom request/response callback
- Http Sink processes responses that it gets from the HTTP endpoint along their respective requests. One can customize the
@@ -410,13 +413,30 @@ In this special case, you can configure connector to trust all certificates with
To enable this option use `gid.connector.http.security.cert.server.allowSelfSigned` property setting its value to `true`.
## Basic Authentication
-The connector supports Basic Authentication mechanism using HTTP `Authorization` header.
+The connector supports Basic Authentication using a HTTP `Authorization` header.
The header value can be set via properties, similarly as for other headers. The connector converts the passed value to Base64 and uses it for the request.
If the used value starts with the prefix `Basic `, or `gid.connector.http.source.lookup.use-raw-authorization-header`
is set to `'true'`, it will be used as header value as is, without any extra modification.
+## OIDC Bearer Authentication
+The connector supports Bearer Authentication using a HTTP `Authorization` header. The [OAuth 2.0 rcf](https://datatracker.ietf.org/doc/html/rfc6749) mentions [Obtaining Authorization](https://datatracker.ietf.org/doc/html/rfc6749#section-4)
+and an authorization grant. OIDC makes use of this [authorisation grant](https://datatracker.ietf.org/doc/html/rfc6749#section-1.3) in a [Token Request](https://openid.net/specs/openid-connect-core-1_0.html#TokenRequest) by including a [OAuth grant type](https://oauth.net/2/grant-types/) and associated properties, the response is the [token response](https://openid.net/specs/openid-connect-core-1_0.html#TokenResponse).
+
+If you want to use this authorization then you should supply the `Token Request` body in `application/x-www-form-urlencoded` encoding
+in configuration property `gid.connector.http.security.oidc.token.request`. See [grant extension](https://datatracker.ietf.org/doc/html/rfc6749#section-4.5) for
+an example of a customised grant type token request. The supplied `token request` will be issued to the
+[token end point](https://datatracker.ietf.org/doc/html/rfc6749#section-3.2), whose url should be supplied in configuration property
+`gid.connector.http.security.oidc.token.endpoint.url`. The returned `access token` is then cached and used for subsequent requests; if the token has expired then
+ a new one is requested. There is a property `gid.connector.http.security.oidc.token.expiry.reduction`, that defaults to 1 second; new tokens will
+be requested if the current time is later than the cached token expiry time minus `gid.connector.http.security.oidc.token.expiry.reduction`.
+
+### Restrictions at this time
+* No authentication is applied to the token request.
+* The processing does not use the refresh token if it present.
+
## Table API Connector Options
### HTTP TableLookup Source
+
| Option | Required | Description/Value |
|---------------------------------------------------------------|----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| connector | required | The Value should be set to _rest-lookup_ |
@@ -436,19 +456,22 @@ is set to `'true'`, it will be used as header value as is, without any extra mod
| gid.connector.http.security.cert.client | optional | Path to trusted certificate that should be used by connector's HTTP client for mTLS communication. |
| gid.connector.http.security.key.client | optional | Path to trusted private key that should be used by connector's HTTP client for mTLS communication. |
| gid.connector.http.security.cert.server.allowSelfSigned | optional | Accept untrusted certificates for TLS communication. |
+| gid.connector.http.security.oidc.token.request | optional | OIDC `Token Request` body in `application/x-www-form-urlencoded` encoding |
+| gid.connector.http.security.oidc.token.endpoint.url | optional | OIDC `Token Endpoint` url, to which the token request will be issued |
+| gid.connector.http.security.oidc.token.expiry.reduction | optional | OIDC tokens will be requested if the current time is later than the cached token expiry time minus this value. |
| gid.connector.http.source.lookup.request.timeout | optional | Sets HTTP request timeout in seconds. If not specified, the default value of 30 seconds will be used. |
| gid.connector.http.source.lookup.request.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup request processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 8 threads will be used. |
| gid.connector.http.source.lookup.response.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup response processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 4 threads will be used. |
| gid.connector.http.source.lookup.use-raw-authorization-header | optional | If set to `'true'`, uses the raw value set for the `Authorization` header, without transformation for Basic Authentication (base64, addition of "Basic " prefix). If not specified, defaults to `'false'`. |
| gid.connector.http.source.lookup.request-callback | optional | Specify which `HttpLookupPostRequestCallback` implementation to use. By default, it is set to `slf4j-lookup-logger` corresponding to `Slf4jHttpLookupPostRequestCallback`. |
-
### HTTP Sink
+
| Option | Required | Description/Value |
|---------------------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| connector | required | Specify what connector to use. For HTTP Sink it should be set to _'http-sink'_. |
-| url | required | The base URL that should be use for HTTP requests. For example _http://localhost:8080/client_. |
| format | required | Specify what format to use. |
+| url | required | The base URL that should be use for HTTP requests. For example _http://localhost:8080/client_. |
| insert-method | optional | Specify which HTTP method to use in the request. The value should be set either to `POST` or `PUT`. |
| sink.batch.max-size | optional | Maximum number of elements that may be passed in a batch to be written downstream. |
| sink.requests.max-inflight | optional | The maximum number of in flight requests that may exist, if any more in flight requests need to be initiated once the maximum has been reached, then it will be blocked until some have completed. |
diff --git a/pom.xml b/pom.xml
index c160e556..eed3f5a4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -297,6 +297,8 @@ under the License.
maven-surefire-plugin
3.0.0-M5
+
+ --add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.lang=ALL-UNNAMED
diff --git a/src/main/java/com/getindata/connectors/http/HttpSinkBuilder.java b/src/main/java/com/getindata/connectors/http/HttpSinkBuilder.java
index 399eb35b..208bcf01 100644
--- a/src/main/java/com/getindata/connectors/http/HttpSinkBuilder.java
+++ b/src/main/java/com/getindata/connectors/http/HttpSinkBuilder.java
@@ -67,7 +67,7 @@ public class HttpSinkBuilder extends
DEFAULT_POST_REQUEST_CALLBACK = new Slf4jHttpPostRequestCallback();
private static final HeaderPreprocessor DEFAULT_HEADER_PREPROCESSOR =
- HttpHeaderUtils.createDefaultHeaderPreprocessor();
+ HttpHeaderUtils.createBasicAuthorizationHeaderPreprocessor();
private final Properties properties = new Properties();
diff --git a/src/main/java/com/getindata/connectors/http/internal/BasicAuthHeaderValuePreprocessor.java b/src/main/java/com/getindata/connectors/http/internal/BasicAuthHeaderValuePreprocessor.java
index 8c18c1c1..2c7e63e7 100644
--- a/src/main/java/com/getindata/connectors/http/internal/BasicAuthHeaderValuePreprocessor.java
+++ b/src/main/java/com/getindata/connectors/http/internal/BasicAuthHeaderValuePreprocessor.java
@@ -5,7 +5,6 @@
/**
* Header processor for HTTP Basic Authentication mechanism.
- * Only "Basic" authentication is supported currently.
*/
public class BasicAuthHeaderValuePreprocessor implements HeaderValuePreprocessor {
diff --git a/src/main/java/com/getindata/connectors/http/internal/ComposeHeaderPreprocessor.java b/src/main/java/com/getindata/connectors/http/internal/ComposeHeaderPreprocessor.java
index 8dd62023..0b87655f 100644
--- a/src/main/java/com/getindata/connectors/http/internal/ComposeHeaderPreprocessor.java
+++ b/src/main/java/com/getindata/connectors/http/internal/ComposeHeaderPreprocessor.java
@@ -4,10 +4,13 @@
import java.util.HashMap;
import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+
/**
* This implementation of {@link HeaderPreprocessor} acts as a registry for all {@link
* HeaderValuePreprocessor} that should be applied on HTTP request.
*/
+@Slf4j
public class ComposeHeaderPreprocessor implements HeaderPreprocessor {
/**
@@ -37,6 +40,8 @@ public ComposeHeaderPreprocessor(Map valuePrepr
@Override
public String preprocessValueForHeader(String headerName, String headerRawValue) {
+ log.debug("preprocessValueForHeader headerName=" + headerName
+ + ", headerRawValue" + headerRawValue);
return valuePreprocessors
.getOrDefault(headerName, DEFAULT_VALUE_PREPROCESSOR)
.preprocessHeaderValue(headerRawValue);
diff --git a/src/main/java/com/getindata/connectors/http/internal/OIDCAuthHeaderValuePreprocessor.java b/src/main/java/com/getindata/connectors/http/internal/OIDCAuthHeaderValuePreprocessor.java
new file mode 100644
index 00000000..68f861ca
--- /dev/null
+++ b/src/main/java/com/getindata/connectors/http/internal/OIDCAuthHeaderValuePreprocessor.java
@@ -0,0 +1,50 @@
+package com.getindata.connectors.http.internal;
+
+import java.net.http.HttpClient;
+import java.time.Duration;
+import java.util.Optional;
+
+import lombok.extern.slf4j.Slf4j;
+
+import com.getindata.connectors.http.internal.auth.OidcAccessTokenManager;
+
+/**
+ * Header processor for HTTP OIDC Authentication mechanism.
+ */
+@Slf4j
+public class OIDCAuthHeaderValuePreprocessor implements HeaderValuePreprocessor {
+
+
+ private final String oidcAuthURL;
+ private final String oidcTokenRequest;
+ private Duration oidcExpiryReduction = Duration.ofSeconds(1);
+ /**
+ * Add the access token to the request using OidcAuth authenticate method that
+ * gives us a valid access token.
+ * @param oidcAuthURL OIDC token endpoint
+ * @param oidcTokenRequest OIDC Token Request
+ * @param oidcExpiryReduction OIDC token expiry reduction
+ */
+
+ public OIDCAuthHeaderValuePreprocessor(Optional oidcAuthURL,
+ Optional oidcTokenRequest,
+ Optional oidcExpiryReduction) {
+ this.oidcAuthURL = oidcAuthURL.get();
+ this.oidcTokenRequest = oidcTokenRequest.get();
+ if (oidcExpiryReduction.isPresent()) {
+ this.oidcExpiryReduction = oidcExpiryReduction.get();
+ }
+ }
+
+ @Override
+ public String preprocessHeaderValue(String rawValue) {
+ OidcAccessTokenManager auth = new OidcAccessTokenManager(
+ HttpClient.newBuilder().build(),
+ oidcTokenRequest,
+ oidcAuthURL,
+ oidcExpiryReduction
+ );
+ // apply the OIDC authentication by adding the dynamically calculated header value.
+ return "BEARER " + auth.authenticate();
+ }
+}
diff --git a/src/main/java/com/getindata/connectors/http/internal/auth/OidcAccessTokenManager.java b/src/main/java/com/getindata/connectors/http/internal/auth/OidcAccessTokenManager.java
new file mode 100644
index 00000000..9334870c
--- /dev/null
+++ b/src/main/java/com/getindata/connectors/http/internal/auth/OidcAccessTokenManager.java
@@ -0,0 +1,150 @@
+
+/*
+ * Copyright 2020 Red Hat
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.getindata.connectors.http.internal.auth;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.time.Instant;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * This class is inspired by
+ * https://github.com/Apicurio/apicurio-common-rest-client/blob/
+ * 944ac9eb527c291a6083bd10ee012388e1684d20/rest-client-common/src/main/java/io/
+ * apicurio/rest/client/auth/OidcAuth.java.
+ *
+ * The OIDC access token manager encapsulates the caching of an OIDC access token,
+ * which can be short lived, for example an hour. The authenticate method will return an
+ * un-expired access token, either from the cache or by requesting a new access token.
+ */
+@Slf4j
+public class OidcAccessTokenManager {
+
+ private static final Duration DEFAULT_TOKEN_EXPIRATION_REDUCTION = Duration.ofSeconds(1);
+ private final HttpClient httpClient;
+ private final String tokenRequest;
+
+ private final String url;
+ private final Duration tokenExpirationReduction;
+
+ private String cachedAccessToken;
+ private Instant cachedAccessTokenExp;
+
+ /**
+ * Construct an Oidc access token manager with the default token expiration reduction
+ * @param httpClient httpClient to use to call the token endpoint.
+ * @param tokenRequest token request
+ * @param url token endpoint url
+ */
+ public OidcAccessTokenManager(HttpClient httpClient, String tokenRequest, String url) {
+ this(httpClient, tokenRequest, url, DEFAULT_TOKEN_EXPIRATION_REDUCTION);
+ }
+ /**
+ * Construct an Oidc access token manager with the supplied token expiration reduction
+ * @param httpClient httpClient to use to call the token endpoint.
+ * @param tokenRequest token request this need to be form urlencoded
+ * @param url token endpoint url
+ * @param tokenExpirationReduction token expiry reduction, request a new token if the
+ * current time is later than the cached access token
+ * expiry time reduced by this value. This means that
+ * we will not use the cached token if it is about
+ * to expire.
+ */
+ public OidcAccessTokenManager(HttpClient httpClient, String tokenRequest, String url,
+ Duration tokenExpirationReduction) {
+ this.tokenRequest = tokenRequest;
+ this.httpClient = httpClient;
+ this.url = url;
+ if (null == tokenExpirationReduction) {
+ this.tokenExpirationReduction = DEFAULT_TOKEN_EXPIRATION_REDUCTION;
+ } else {
+ this.tokenExpirationReduction = tokenExpirationReduction;
+ }
+ }
+
+ /**
+ * Request an access token from the token endpoint
+ */
+ private void requestAccessToken() {
+ try {
+ HttpRequest httpRequest =
+ HttpRequest.newBuilder()
+ .uri(URI.create(url))
+ .header("Content-Type", "application/x-www-form-urlencoded")
+ .method("POST", HttpRequest.BodyPublishers.ofString(tokenRequest))
+ .build();
+
+ HttpResponse response = httpClient.send(httpRequest,
+ HttpResponse.BodyHandlers.ofByteArray());
+ //create ObjectMapper instance
+ final ObjectMapper objectMapper = new ObjectMapper();
+ if (200 == response.statusCode()) {
+ byte[] bytes = response.body();
+ JsonNode rootNode = objectMapper.readTree(bytes);
+ JsonNode tokenNode = rootNode.path("access_token");
+ JsonNode expiresInNode = rootNode.path("expires_in");
+ this.cachedAccessToken = tokenNode.textValue();
+ /*
+ expiresIn is in seconds
+ */
+ Duration expiresIn = Duration.ofSeconds(expiresInNode.asInt());
+ if (expiresIn.compareTo(this.tokenExpirationReduction) > 0) {
+ //expiresIn is greater than tokenExpirationReduction
+ expiresIn = expiresIn.minus(this.tokenExpirationReduction);
+ }
+ this.cachedAccessTokenExp = Instant.now().plus(expiresIn);
+ } else {
+ throw new IllegalStateException("Attempted to get an access token but got http" +
+ " status code " + response.statusCode());
+ }
+ } catch (JsonProcessingException e) {
+ throw new IllegalStateException("Error found while trying to request a new token");
+ } catch (IOException e) {
+ throw new IllegalStateException("IO Exception occurred", e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("Interrupted Exception occurred", e);
+ }
+ }
+
+ /**
+ * Get a valid unexpired access token.
+ * @return access token.
+ */
+ public String authenticate() {
+ if (isAccessTokenRequired()) {
+ requestAccessToken();
+ }
+ return cachedAccessToken;
+ }
+
+ private boolean isAccessTokenRequired() {
+ return null == cachedAccessToken || isTokenExpired();
+ }
+
+ private boolean isTokenExpired() {
+ return Instant.now().isAfter(this.cachedAccessTokenExp);
+ }
+}
diff --git a/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java b/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java
index 5415a159..b501b29b 100644
--- a/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java
+++ b/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java
@@ -27,6 +27,14 @@ public final class HttpConnectorConfigConstants {
public static final String LOOKUP_SOURCE_HEADER_PREFIX = GID_CONNECTOR_HTTP
+ "source.lookup.header.";
+ public static final String OIDC_AUTH_TOKEN_REQUEST = GID_CONNECTOR_HTTP
+ + "security.oidc.token.request";
+
+ public static final String OIDC_AUTH_TOKEN_ENDPOINT_URL = GID_CONNECTOR_HTTP
+ + "security.oidc.token.endpoint.url";
+
+ public static final String OIDC_AUTH_TOKEN_EXPIRY_REDUCTION = GID_CONNECTOR_HTTP
+ + "security.oidc.token.expiry.reduction";
/**
* Whether to use the raw value of the Authorization header. If set, it prevents
* the special treatment of the header for Basic Authentication, thus preserving the passed
diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java
index 9a7b4320..fc2bc65b 100644
--- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java
+++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java
@@ -1,9 +1,14 @@
package com.getindata.connectors.http.internal.table.lookup;
+import java.time.Duration;
+
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.LOOKUP_SOURCE_HEADER_USE_RAW;
+import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.OIDC_AUTH_TOKEN_ENDPOINT_URL;
+import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.OIDC_AUTH_TOKEN_EXPIRY_REDUCTION;
+import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.OIDC_AUTH_TOKEN_REQUEST;
import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.SOURCE_LOOKUP_QUERY_CREATOR_IDENTIFIER;
import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.SOURCE_LOOKUP_REQUEST_CALLBACK_IDENTIFIER;
@@ -53,4 +58,25 @@ public class HttpLookupConnectorOptions {
ConfigOptions.key(SOURCE_LOOKUP_REQUEST_CALLBACK_IDENTIFIER)
.stringType()
.defaultValue(Slf4jHttpLookupPostRequestCallbackFactory.IDENTIFIER);
+
+ public static final ConfigOption SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL =
+ ConfigOptions.key(OIDC_AUTH_TOKEN_ENDPOINT_URL)
+ .stringType()
+ .noDefaultValue()
+ .withDescription("OIDC Token endpoint url.");
+
+ public static final ConfigOption SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST =
+ ConfigOptions.key(OIDC_AUTH_TOKEN_REQUEST)
+ .stringType()
+ .noDefaultValue()
+ .withDescription("OIDC token request.");
+
+ public static final ConfigOption SOURCE_LOOKUP_OIDC_AUTH_TOKEN_EXPIRY_REDUCTION =
+ ConfigOptions.key(OIDC_AUTH_TOKEN_EXPIRY_REDUCTION)
+ .durationType()
+ .defaultValue(Duration.ofSeconds(1))
+ .withDescription("OIDC authorization access token expiry" +
+ " reduction as a Duration." +
+ " A new access token is obtained if the token" +
+ " is older than it's expiry time minus this value.");
}
diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSource.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSource.java
index c077f21c..2d8f8cee 100644
--- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSource.java
+++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSource.java
@@ -37,7 +37,7 @@
import com.getindata.connectors.http.internal.table.lookup.querycreators.GenericGetQueryCreatorFactory;
import com.getindata.connectors.http.internal.table.lookup.querycreators.GenericJsonQueryCreatorFactory;
import com.getindata.connectors.http.internal.utils.HttpHeaderUtils;
-import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.LOOKUP_QUERY_CREATOR_IDENTIFIER;
+import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.*;
import static com.getindata.connectors.http.internal.table.lookup.HttpLookupTableSourceFactory.row;
@Slf4j
@@ -79,36 +79,40 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContex
LookupRow lookupRow = extractLookupRow(lookupContext.getKeys());
DeserializationSchema responseSchemaDecoder =
- decodingFormat.createRuntimeDecoder(lookupContext, physicalRowDataType);
+ decodingFormat.createRuntimeDecoder(lookupContext, physicalRowDataType);
LookupQueryCreatorFactory lookupQueryCreatorFactory =
- FactoryUtil.discoverFactory(
- this.dynamicTableFactoryContext.getClassLoader(),
- LookupQueryCreatorFactory.class,
- lookupConfig.getReadableConfig().getOptional(LOOKUP_QUERY_CREATOR_IDENTIFIER)
- .orElse(
- (lookupConfig.getLookupMethod().equalsIgnoreCase("GET") ?
- GenericGetQueryCreatorFactory.IDENTIFIER :
- GenericJsonQueryCreatorFactory.IDENTIFIER)
- )
- );
+ FactoryUtil.discoverFactory(
+ this.dynamicTableFactoryContext.getClassLoader(),
+ LookupQueryCreatorFactory.class,
+ lookupConfig.getReadableConfig().getOptional(
+ LOOKUP_QUERY_CREATOR_IDENTIFIER)
+ .orElse(
+ (lookupConfig.getLookupMethod()
+ .equalsIgnoreCase("GET") ?
+ GenericGetQueryCreatorFactory.IDENTIFIER :
+ GenericJsonQueryCreatorFactory.IDENTIFIER)
+ )
+ );
ReadableConfig readableConfig = lookupConfig.getReadableConfig();
LookupQueryCreator lookupQueryCreator =
- lookupQueryCreatorFactory.createLookupQueryCreator(
- readableConfig,
- lookupRow,
- dynamicTableFactoryContext
- );
+ lookupQueryCreatorFactory.createLookupQueryCreator(
+ readableConfig,
+ lookupRow,
+ dynamicTableFactoryContext
+ );
PollingClientFactory pollingClientFactory =
- createPollingClientFactory(lookupQueryCreator, lookupConfig);
+ createPollingClientFactory(lookupQueryCreator, lookupConfig);
return getLookupRuntimeProvider(lookupRow, responseSchemaDecoder, pollingClientFactory);
}
protected LookupRuntimeProvider getLookupRuntimeProvider(LookupRow lookupRow,
- DeserializationSchema responseSchemaDecoder,
- PollingClientFactory pollingClientFactory) {
+ DeserializationSchema
+ responseSchemaDecoder,
+ PollingClientFactory
+ pollingClientFactory) {
HttpTableLookupFunction dataLookupFunction =
new HttpTableLookupFunction(
@@ -141,11 +145,11 @@ protected LookupRuntimeProvider getLookupRuntimeProvider(LookupRow lookupRow,
@Override
public DynamicTableSource copy() {
return new HttpLookupTableSource(
- physicalRowDataType,
- lookupConfig,
- decodingFormat,
- dynamicTableFactoryContext,
- cache
+ physicalRowDataType,
+ lookupConfig,
+ decodingFormat,
+ dynamicTableFactoryContext,
+ cache
);
}
@@ -167,25 +171,22 @@ private PollingClientFactory createPollingClientFactory(
LookupQueryCreator lookupQueryCreator,
HttpLookupConfig lookupConfig) {
- boolean useRawAuthHeader =
- lookupConfig.getReadableConfig().get(HttpLookupConnectorOptions.USE_RAW_AUTH_HEADER);
-
- HeaderPreprocessor headerPreprocessor =
- HttpHeaderUtils.createDefaultHeaderPreprocessor(useRawAuthHeader);
+ HeaderPreprocessor headerPreprocessor = HttpHeaderUtils.createHeaderPreprocessor(
+ lookupConfig.getReadableConfig());
String lookupMethod = lookupConfig.getLookupMethod();
HttpRequestFactory requestFactory = (lookupMethod.equalsIgnoreCase("GET")) ?
- new GetRequestFactory(
- lookupQueryCreator,
- headerPreprocessor,
- lookupConfig) :
- new BodyBasedRequestFactory(
- lookupMethod,
- lookupQueryCreator,
- headerPreprocessor,
- lookupConfig
- );
-
+ new GetRequestFactory(
+ lookupQueryCreator,
+ headerPreprocessor,
+ lookupConfig) :
+ new BodyBasedRequestFactory(
+ lookupMethod,
+ lookupQueryCreator,
+ headerPreprocessor,
+ lookupConfig
+ );
+ log.info("requestFactory is " + requestFactory);
return new JavaNetHttpPollingClientFactory(requestFactory);
}
diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactory.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactory.java
index c9f8f8c2..6c2edf20 100644
--- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactory.java
+++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactory.java
@@ -59,6 +59,7 @@ public DynamicTableSource createDynamicTableSource(Context dynamicTableContext)
HttpConnectorConfigConstants.GID_CONNECTOR_HTTP,
LOOKUP_REQUEST_FORMAT.key()
);
+ validateHttpLookupSourceOptions(readable);
DecodingFormat> decodingFormat =
helper.discoverDecodingFormat(
@@ -81,6 +82,17 @@ public DynamicTableSource createDynamicTableSource(Context dynamicTableContext)
getLookupCache(readable)
);
}
+ protected void validateHttpLookupSourceOptions(ReadableConfig tableOptions)
+ throws IllegalArgumentException {
+ // ensure that there is an OIDC token request if we have an OIDC token endpoint
+ tableOptions.getOptional(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL).ifPresent(url -> {
+ if (tableOptions.getOptional(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST).isEmpty()) {
+ throw new IllegalArgumentException("Config option " +
+ SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST.key() + " is required, if " +
+ SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL.key() + " is configured.");
+ }
+ });
+ }
@Override
public String factoryIdentifier() {
@@ -94,7 +106,6 @@ public Set> requiredOptions() {
@Override
public Set> optionalOptions() {
-
return Set.of(
URL_ARGS,
ASYNC_POLLING,
@@ -105,7 +116,11 @@ public Set> optionalOptions() {
LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE,
LookupOptions.PARTIAL_CACHE_MAX_ROWS,
LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY,
- LookupOptions.MAX_RETRIES);
+ LookupOptions.MAX_RETRIES,
+ SOURCE_LOOKUP_OIDC_AUTH_TOKEN_EXPIRY_REDUCTION,
+ SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST,
+ SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL
+ );
}
private HttpLookupConfig getHttpLookupOptions(Context context, ReadableConfig readableConfig) {
diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java
index ce3a31cc..77520caa 100644
--- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java
+++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java
@@ -35,6 +35,8 @@ public class JavaNetHttpPollingClient implements PollingClient {
private final HttpRequestFactory requestFactory;
+ private final HttpLookupConfig options;
+
private final HttpPostRequestCallback httpPostRequestCallback;
public JavaNetHttpPollingClient(
@@ -46,6 +48,7 @@ public JavaNetHttpPollingClient(
this.httpClient = httpClient;
this.responseBodyDecoder = responseBodyDecoder;
this.requestFactory = requestFactory;
+ this.options = options;
this.httpPostRequestCallback = options.getHttpPostRequestCallback();
@@ -78,6 +81,7 @@ public Optional pull(RowData lookupRow) {
private Optional queryAndProcess(RowData lookupData) throws Exception {
HttpLookupSourceRequestEntry request = requestFactory.buildLookupRequest(lookupData);
+
HttpResponse response = httpClient.send(
request.getHttpRequest(),
BodyHandlers.ofString()
diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/RequestFactoryBase.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/RequestFactoryBase.java
index f6c19f62..44689063 100644
--- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/RequestFactoryBase.java
+++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/RequestFactoryBase.java
@@ -4,7 +4,11 @@
import java.net.http.HttpRequest.Builder;
import java.util.Arrays;
import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.FlinkRuntimeException;
@@ -14,10 +18,12 @@
import com.getindata.connectors.http.internal.HeaderPreprocessor;
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
import com.getindata.connectors.http.internal.utils.HttpHeaderUtils;
+import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL;
/**
* Base class for {@link HttpRequest} factories.
*/
+@Slf4j
public abstract class RequestFactoryBase implements HttpRequestFactory {
public static final String DEFAULT_REQUEST_TIMEOUT_SECONDS = "30";
@@ -35,6 +41,7 @@ public abstract class RequestFactoryBase implements HttpRequestFactory {
* HTTP headers that should be used for {@link HttpRequest} created by factory.
*/
private final String[] headersAndValues;
+ private final HttpLookupConfig options;
public RequestFactoryBase(
LookupQueryCreator lookupQueryCreator,
@@ -43,6 +50,21 @@ public RequestFactoryBase(
this.baseUrl = options.getUrl();
this.lookupQueryCreator = lookupQueryCreator;
+ this.options = options;
+
+ Properties properties = options.getProperties();
+ /*
+ * For OIDC, the preprocessor will fully specify the Authentication header value,
+ * as a bearer token. But the preprocessors only amend existing headers, so in this case
+ * if there is no existing authorization header then we add a dummy one to the properties,
+ * so the preprocessor will be driven and will provide the value.
+ */
+ Optional oidcAuthURL = options.getReadableConfig()
+ .getOptional(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL);
+ if (oidcAuthURL.isPresent()) {
+ properties.put(HttpConnectorConfigConstants.LOOKUP_SOURCE_HEADER_PREFIX
+ + HttpHeaderUtils.AUTHORIZATION, "Dummy");
+ }
var headerMap = HttpHeaderUtils
.prepareHeaderMap(
@@ -52,6 +74,11 @@ public RequestFactoryBase(
);
this.headersAndValues = HttpHeaderUtils.toHeaderAndValueArray(headerMap);
+
+ log.debug("RequestFactoryBase headersAndValues: " +
+ Arrays.stream(headersAndValues)
+ .map(Object::toString)
+ .collect(Collectors.joining(",")));
this.httpRequestTimeOutSeconds = Integer.parseInt(
options.getProperties().getProperty(
HttpConnectorConfigConstants.LOOKUP_HTTP_TIMEOUT_SECONDS,
@@ -67,7 +94,6 @@ public HttpLookupSourceRequestEntry buildLookupRequest(RowData lookupRow) {
getLogger().debug("Created Http lookup query: " + lookupQueryInfo);
Builder requestBuilder = setUpRequestMethod(lookupQueryInfo);
-
if (headersAndValues.length != 0) {
requestBuilder.headers(headersAndValues);
}
diff --git a/src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicSink.java b/src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicSink.java
index ec7c53bd..b2c0b661 100644
--- a/src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicSink.java
+++ b/src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicSink.java
@@ -20,6 +20,7 @@
import com.getindata.connectors.http.HttpPostRequestCallback;
import com.getindata.connectors.http.HttpSink;
import com.getindata.connectors.http.HttpSinkBuilder;
+import com.getindata.connectors.http.internal.HeaderPreprocessor;
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest;
import com.getindata.connectors.http.internal.sink.httpclient.JavaNetSinkHttpClient;
@@ -123,13 +124,16 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
var insertMethod = tableOptions.get(INSERT_METHOD);
+ HeaderPreprocessor headerPreprocessor = HttpHeaderUtils.createHeaderPreprocessor(
+ tableOptions);
+
HttpSinkBuilder builder = HttpSink
.builder()
.setEndpointUrl(tableOptions.get(URL))
.setSinkHttpClientBuilder(JavaNetSinkHttpClient::new)
.setHttpPostRequestCallback(httpPostRequestCallback)
// In future header preprocessor could be set via custom factory
- .setHttpHeaderPreprocessor(HttpHeaderUtils.createDefaultHeaderPreprocessor())
+ .setHttpHeaderPreprocessor(headerPreprocessor)
.setElementConverter(
new SerializationSchemaElementConverter(insertMethod, serializationSchema))
.setProperties(properties);
diff --git a/src/main/java/com/getindata/connectors/http/internal/utils/HttpHeaderUtils.java b/src/main/java/com/getindata/connectors/http/internal/utils/HttpHeaderUtils.java
index 2e737a6c..a63d6254 100644
--- a/src/main/java/com/getindata/connectors/http/internal/utils/HttpHeaderUtils.java
+++ b/src/main/java/com/getindata/connectors/http/internal/utils/HttpHeaderUtils.java
@@ -1,22 +1,30 @@
package com.getindata.connectors.http.internal.utils;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+import java.time.Duration;
+import java.util.*;
import java.util.Map.Entry;
-import java.util.Properties;
import java.util.stream.Stream;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.configuration.ReadableConfig;
import com.getindata.connectors.http.internal.BasicAuthHeaderValuePreprocessor;
import com.getindata.connectors.http.internal.ComposeHeaderPreprocessor;
import com.getindata.connectors.http.internal.HeaderPreprocessor;
+import com.getindata.connectors.http.internal.OIDCAuthHeaderValuePreprocessor;
+import com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions;
+import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.*;
+
+
@NoArgsConstructor(access = AccessLevel.NONE)
+@Slf4j
public final class HttpHeaderUtils {
+ public static final String AUTHORIZATION = "Authorization";
+
public static Map prepareHeaderMap(
String headerKeyPrefix,
Properties properties,
@@ -32,11 +40,12 @@ public static Map prepareHeaderMap(
for (Entry headerAndValue : propertyHeaderMap.entrySet()) {
String propertyName = headerAndValue.getKey();
String headerValue = headerAndValue.getValue();
-
String headerName = ConfigUtils.extractPropertyLastElement(propertyName);
+ String preProcessedHeader =
+ headerPreprocessor.preprocessValueForHeader(headerName, headerValue);
headerMap.put(
headerName,
- headerPreprocessor.preprocessValueForHeader(headerName, headerValue)
+ preProcessedHeader
);
}
return headerMap;
@@ -67,14 +76,57 @@ public static String[] toHeaderAndValueArray(Map headerMap) {
.toArray(String[]::new);
}
- public static HeaderPreprocessor createDefaultHeaderPreprocessor() {
- return createDefaultHeaderPreprocessor(false);
+ public static HeaderPreprocessor createBasicAuthorizationHeaderPreprocessor() {
+ return createBasicAuthorizationHeaderPreprocessor(false);
}
- public static HeaderPreprocessor createDefaultHeaderPreprocessor(boolean useRawAuthHeader) {
+ public static HeaderPreprocessor createBasicAuthorizationHeaderPreprocessor(
+ boolean useRawAuthHeader) {
return new ComposeHeaderPreprocessor(
Collections.singletonMap(
- "Authorization", new BasicAuthHeaderValuePreprocessor(useRawAuthHeader))
+ AUTHORIZATION, new BasicAuthHeaderValuePreprocessor(useRawAuthHeader))
+ );
+ }
+
+ public static HeaderPreprocessor createOIDCAuthorizationHeaderPreprocessor(
+ Optional oidcAuthURL,
+ Optional oidcTokenRequest,
+ Optional oidcExpiryReduction
+ ) {
+ return new ComposeHeaderPreprocessor(
+ Collections.singletonMap(
+ AUTHORIZATION, new OIDCAuthHeaderValuePreprocessor(oidcAuthURL,
+ oidcTokenRequest, oidcExpiryReduction))
);
}
+
+ public static HeaderPreprocessor createHeaderPreprocessor(ReadableConfig readableConfig) {
+ HeaderPreprocessor headerPreprocessor;
+ Optional oidcAuthURL = readableConfig
+ .getOptional(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL);
+
+ if(oidcAuthURL.isPresent()) {
+ Optional oidcTokenRequest = readableConfig
+ .getOptional(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST);
+
+ Optional oidcExpiryReduction = readableConfig
+ .getOptional(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_EXPIRY_REDUCTION);
+ headerPreprocessor = HttpHeaderUtils.createOIDCAuthorizationHeaderPreprocessor(
+ oidcAuthURL, oidcTokenRequest, oidcExpiryReduction);
+ log.info("created HeaderPreprocessor " + headerPreprocessor
+ + " for OIDC oidcAuthURL=" + oidcAuthURL
+ + ", oidcTokenRequest=" + oidcTokenRequest
+ + ", oidcExpiryReduction=" + oidcExpiryReduction);
+ } else {
+ boolean useRawAuthHeader =
+ readableConfig.get(HttpLookupConnectorOptions.USE_RAW_AUTH_HEADER);
+
+ headerPreprocessor =
+ HttpHeaderUtils.createBasicAuthorizationHeaderPreprocessor(
+ useRawAuthHeader);
+ log.info("created HeaderPreprocessor for basic useRawAuthHeader=" + useRawAuthHeader);
+ }
+ log.info("returning HeaderPreprocessor " + headerPreprocessor);
+ return headerPreprocessor;
+ }
}
diff --git a/src/main/java/com/getindata/connectors/http/internal/utils/JavaNetHttpClientFactory.java b/src/main/java/com/getindata/connectors/http/internal/utils/JavaNetHttpClientFactory.java
index 4ffaf4fb..a04e3551 100644
--- a/src/main/java/com/getindata/connectors/http/internal/utils/JavaNetHttpClientFactory.java
+++ b/src/main/java/com/getindata/connectors/http/internal/utils/JavaNetHttpClientFactory.java
@@ -95,7 +95,7 @@ private static SSLContext getSslContext(Properties properties) {
}
if (!StringUtils.isNullOrWhitespaceOnly(clientCert)
- && !StringUtils.isNullOrWhitespaceOnly(clientPrivateKey)) {
+ && !StringUtils.isNullOrWhitespaceOnly(clientPrivateKey)) {
securityContext.addMTlsCerts(clientCert, clientPrivateKey);
}
diff --git a/src/test/java/com/getindata/connectors/http/internal/HttpsConnectionTestBase.java b/src/test/java/com/getindata/connectors/http/internal/HttpsConnectionTestBase.java
index 6855a86c..28cba6be 100644
--- a/src/test/java/com/getindata/connectors/http/internal/HttpsConnectionTestBase.java
+++ b/src/test/java/com/getindata/connectors/http/internal/HttpsConnectionTestBase.java
@@ -36,7 +36,7 @@ public abstract class HttpsConnectionTestBase {
public void setUp() {
this.properties = new Properties();
- this.headerPreprocessor = HttpHeaderUtils.createDefaultHeaderPreprocessor();
+ this.headerPreprocessor = HttpHeaderUtils.createBasicAuthorizationHeaderPreprocessor();
}
public void tearDown() {
diff --git a/src/test/java/com/getindata/connectors/http/internal/auth/OidcAccessTokenManagerTest.java b/src/test/java/com/getindata/connectors/http/internal/auth/OidcAccessTokenManagerTest.java
new file mode 100644
index 00000000..437c9120
--- /dev/null
+++ b/src/test/java/com/getindata/connectors/http/internal/auth/OidcAccessTokenManagerTest.java
@@ -0,0 +1,251 @@
+package com.getindata.connectors.http.internal.auth;
+
+import java.net.*;
+import java.net.http.HttpClient;
+import java.net.http.HttpHeaders;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLParameters;
+import javax.net.ssl.SSLSession;
+
+import net.minidev.json.JSONObject;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.getindata.connectors.http.internal.HeaderPreprocessor;
+import com.getindata.connectors.http.internal.table.lookup.*;
+import com.getindata.connectors.http.internal.utils.HttpHeaderUtils;
+
+
+
+public class OidcAccessTokenManagerTest {
+
+ private HeaderPreprocessor headerPreprocessor;
+
+ private HttpLookupConfig options;
+
+ private static final String BASE_URL = "http://localhost/aaa";
+
+ @BeforeEach
+ public void setUp() {
+ this.headerPreprocessor = HttpHeaderUtils.createBasicAuthorizationHeaderPreprocessor();
+ this.options = HttpLookupConfig.builder().url(BASE_URL).build();
+ }
+
+ @Test
+ public void testAuthenticate() throws InterruptedException {
+
+ MockHttpClient authHttpClient = new MockHttpClient();
+
+ authHttpClient.setIsExpired(1);
+ authHttpClient.setAccessToken("Access1");
+ String url = "http://localhost";
+ OidcAccessTokenManager oidcAuth = new OidcAccessTokenManager(authHttpClient, "abc", url);
+
+ // apply the authorization to the httpRequest
+ String token1 = oidcAuth.authenticate();
+ assertThat(token1).isNotNull();
+ String token2 = oidcAuth.authenticate();
+ assertThat(token2).isNotNull();
+ // check the token is cached
+ assertThat(token1).isEqualTo(token2);
+ Thread.sleep(2000);
+ // check the token is different after first token has expired
+ String token3 = oidcAuth.authenticate();
+ assertThat(token3).isNotNull();
+ assertThat(token3).isNotEqualTo(token2);
+ }
+
+ @Test
+ public void testAuthenticateWithBadStatusCode() throws InterruptedException {
+
+ MockHttpClient authHttpClient = new MockHttpClient();
+
+ authHttpClient.setIsExpired(1);
+ authHttpClient.setAccessToken("Access1");
+ authHttpClient.setStatus(500);
+ String url = "http://localhost";
+ OidcAccessTokenManager oidcAuth = new OidcAccessTokenManager(authHttpClient, "abc", url);
+
+ try {
+ oidcAuth.authenticate();
+ assertTrue(false, "Bad status code should result in an exception.");
+ } catch (IllegalStateException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testAuthenticateWithExpiryReduction() throws InterruptedException {
+
+ MockHttpClient authHttpClient = new MockHttpClient();
+
+ authHttpClient.setIsExpired(1);
+ authHttpClient.setAccessToken("Access1");
+ String url = "http://localhost";
+ OidcAccessTokenManager oidcAuth = new OidcAccessTokenManager(authHttpClient,
+ "abc", url, Duration.ofSeconds(5));
+
+ // apply the authorization to the httpRequest
+ String token1 = oidcAuth.authenticate();
+ assertThat(token1).isNotNull();
+ String token2 = oidcAuth.authenticate();
+ assertThat(token2).isNotNull();
+ }
+
+ class MockHttpClient extends HttpClient {
+ private int isExpired;
+ private String accessToken;
+ private int count = 0;
+ private int status = 200;
+
+ @Override
+ public Optional cookieHandler() {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional connectTimeout() {
+ return Optional.empty();
+ }
+
+ @Override
+ public Redirect followRedirects() {
+ return null;
+ }
+
+ @Override
+ public Optional proxy() {
+ return Optional.empty();
+ }
+
+ @Override
+ public SSLContext sslContext() {
+ return null;
+ }
+
+ @Override
+ public SSLParameters sslParameters() {
+ return null;
+ }
+
+ @Override
+ public Optional authenticator() {
+ return Optional.empty();
+ }
+
+ @Override
+ public Version version() {
+ return null;
+ }
+
+ @Override
+ public Optional executor() {
+ return Optional.empty();
+ }
+
+ @Override
+ public HttpResponse send(HttpRequest request,
+ HttpResponse.BodyHandler responseBodyHandler) {
+
+ JSONObject json = new JSONObject();
+
+ json.put("expires_in", 2);
+ json.put("access_token", "dummy_token_" + this.count++);
+ byte[] bytes = json.toJSONString().getBytes();
+
+ MockHttpResponse mockHttpResponse = new MockHttpResponse();
+ mockHttpResponse.setStatusCode(status);
+ mockHttpResponse.setBody(bytes);
+
+ return (HttpResponse) mockHttpResponse;
+ }
+
+ @Override
+ public CompletableFuture> sendAsync(
+ HttpRequest request,
+ HttpResponse.BodyHandler responseBodyHandler) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture> sendAsync(
+ HttpRequest request,
+ HttpResponse.BodyHandler responseBodyHandler,
+ HttpResponse.PushPromiseHandler pushPromiseHandler) {
+ return null;
+ }
+
+ public void setIsExpired(int isExpired) {
+ this.isExpired = isExpired;
+ }
+
+ public void setAccessToken(String accesstoken) {
+ this.accessToken = accesstoken;
+ }
+
+ public void setStatus(int status) {
+ this.status = status;
+ }
+
+ class MockHttpResponse implements HttpResponse {
+ int statusCode = 0;
+ byte[] body = new byte[0];
+
+ @Override
+ public int statusCode() {
+ return statusCode;
+ }
+
+ @Override
+ public HttpRequest request() {
+ return null;
+ }
+
+ @Override
+ public Optional> previousResponse() {
+ return Optional.empty();
+ }
+
+ @Override
+ public HttpHeaders headers() {
+ return null;
+ }
+
+ @Override
+ public byte[] body() {
+ return body;
+ }
+
+ @Override
+ public Optional sslSession() {
+ return Optional.empty();
+ }
+
+ @Override
+ public URI uri() {
+ return null;
+ }
+
+ @Override
+ public Version version() {
+ return null;
+ }
+
+ public void setStatusCode(int statusCode) {
+ this.statusCode = statusCode;
+ }
+
+ public void setBody(byte[] body) {
+ this.body = body;
+ }
+ }
+ }
+}
diff --git a/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClientTest.java b/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClientTest.java
index b7f57733..c12b2699 100644
--- a/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClientTest.java
+++ b/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClientTest.java
@@ -50,7 +50,7 @@ public static void afterAll() {
@BeforeEach
public void setUp() {
postRequestCallback = new Slf4jHttpPostRequestCallback();
- headerPreprocessor = HttpHeaderUtils.createDefaultHeaderPreprocessor();
+ headerPreprocessor = HttpHeaderUtils.createBasicAuthorizationHeaderPreprocessor();
httpClientStaticMock.when(HttpClient::newBuilder).thenReturn(httpClientBuilder);
when(httpClientBuilder.followRedirects(any())).thenReturn(httpClientBuilder);
when(httpClientBuilder.sslContext(any())).thenReturn(httpClientBuilder);
diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactoryTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactoryTest.java
index 31236460..82811004 100644
--- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactoryTest.java
+++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactoryTest.java
@@ -1,12 +1,9 @@
package com.getindata.connectors.http.internal.table.lookup;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
@@ -16,6 +13,7 @@
import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.junit.jupiter.api.Assertions.assertFalse;
public class HttpLookupTableSourceFactoryTest {
@@ -37,6 +35,29 @@ public class HttpLookupTableSourceFactoryTest {
UniqueConstraint.primaryKey("id", List.of("id"))
);
+ @Test
+ void validateHttpLookupSourceOptions() {
+
+ HttpLookupTableSourceFactory httpLookupTableSourceFactory
+ = new HttpLookupTableSourceFactory();
+ TableConfig tableConfig = new TableConfig();
+ httpLookupTableSourceFactory.validateHttpLookupSourceOptions(tableConfig);
+ tableConfig.set(HttpLookupConnectorOptions
+ .SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL.key(), "aaa");
+
+ try {
+ httpLookupTableSourceFactory.validateHttpLookupSourceOptions(tableConfig);
+ assertFalse(true, "Expected an error.");
+ } catch (IllegalArgumentException e) {
+ // expected
+ }
+ // should now work.
+ tableConfig.set(HttpLookupConnectorOptions
+ .SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST.key(), "bbb");
+
+ httpLookupTableSourceFactory.validateHttpLookupSourceOptions(tableConfig);
+ }
+
@Test
void shouldCreateForMandatoryFields() {
Map options = getMandatoryOptions();
diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java
index df7b069b..27f62df0 100644
--- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java
+++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java
@@ -325,7 +325,7 @@ private GetRequestFactory setUpGetRequestFactory(Properties properties) {
return new GetRequestFactory(
new GenericGetQueryCreator(lookupRow),
- HttpHeaderUtils.createDefaultHeaderPreprocessor(useRawAuthHeader),
+ HttpHeaderUtils.createBasicAuthorizationHeaderPreprocessor(useRawAuthHeader),
HttpLookupConfig.builder()
.url(getBaseUrl())
.properties(properties)
@@ -348,7 +348,7 @@ private BodyBasedRequestFactory setUpBodyRequestFactory(
return new BodyBasedRequestFactory(
methodName,
new GenericJsonQueryCreator(jsonSerializer),
- HttpHeaderUtils.createDefaultHeaderPreprocessor(useRawAuthHeader),
+ HttpHeaderUtils.createBasicAuthorizationHeaderPreprocessor(useRawAuthHeader),
HttpLookupConfig.builder()
.url(getBaseUrl())
.properties(properties)
diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientHttpsConnectionTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientHttpsConnectionTest.java
index 683f9261..01ce70c1 100644
--- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientHttpsConnectionTest.java
+++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientHttpsConnectionTest.java
@@ -1,7 +1,9 @@
package com.getindata.connectors.http.internal.table.lookup;
import java.io.File;
+import java.time.Duration;
import java.util.List;
+import java.util.Optional;
import java.util.Properties;
import com.github.tomakehurst.wiremock.WireMockServer;
@@ -32,6 +34,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import com.getindata.connectors.http.internal.HeaderPreprocessor;
import com.getindata.connectors.http.internal.HttpsConnectionTestBase;
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
import com.getindata.connectors.http.internal.table.lookup.querycreators.GenericGetQueryCreator;
@@ -96,11 +99,9 @@ public void testHttpsConnectionWithSelfSignedCert() {
wireMockServer.start();
setupServerStub();
- setUpPollingClientFactory(wireMockServer.baseUrl());
-
properties.setProperty(HttpConnectorConfigConstants.ALLOW_SELF_SIGNED, "true");
- testPollingClientConnection();
+ setupAndTestConnection();
}
@ParameterizedTest
@@ -120,14 +121,11 @@ public void testHttpsConnectionWithAddedCerts(String certName) {
wireMockServer.start();
setupServerStub();
- setUpPollingClientFactory(wireMockServer.baseUrl());
-
properties.setProperty(
- HttpConnectorConfigConstants.SERVER_TRUSTED_CERT,
- trustedCert.getAbsolutePath()
+ HttpConnectorConfigConstants.SERVER_TRUSTED_CERT,
+ trustedCert.getAbsolutePath()
);
-
- testPollingClientConnection();
+ setupAndTestConnection();
}
@ParameterizedTest
@@ -154,22 +152,19 @@ public void testMTlsConnection(String clientPrivateKeyName) {
wireMockServer.start();
setupServerStub();
- setUpPollingClientFactory(wireMockServer.baseUrl());
-
properties.setProperty(
- HttpConnectorConfigConstants.SERVER_TRUSTED_CERT,
- serverTrustedCert.getAbsolutePath()
+ HttpConnectorConfigConstants.SERVER_TRUSTED_CERT,
+ serverTrustedCert.getAbsolutePath()
);
properties.setProperty(
- HttpConnectorConfigConstants.CLIENT_CERT,
- clientCert.getAbsolutePath()
+ HttpConnectorConfigConstants.CLIENT_CERT,
+ clientCert.getAbsolutePath()
);
properties.setProperty(
- HttpConnectorConfigConstants.CLIENT_PRIVATE_KEY,
- clientPrivateKey.getAbsolutePath()
+ HttpConnectorConfigConstants.CLIENT_PRIVATE_KEY,
+ clientPrivateKey.getAbsolutePath()
);
-
- testPollingClientConnection();
+ setupAndTestConnection();
}
@Test
@@ -198,21 +193,38 @@ public void testMTlsConnectionUsingKeyStore() {
wireMockServer.start();
setupServerStub();
- setUpPollingClientFactory(wireMockServer.baseUrl());
-
properties.setProperty(
- HttpConnectorConfigConstants.KEY_STORE_PASSWORD,
- password
+ HttpConnectorConfigConstants.KEY_STORE_PASSWORD,
+ password
);
properties.setProperty(
- HttpConnectorConfigConstants.KEY_STORE_PATH,
- clientKeyStoreFile.getAbsolutePath()
+ HttpConnectorConfigConstants.KEY_STORE_PATH,
+ clientKeyStoreFile.getAbsolutePath()
);
properties.setProperty(
- HttpConnectorConfigConstants.SERVER_TRUSTED_CERT,
- serverTrustedCert.getAbsolutePath()
+ HttpConnectorConfigConstants.SERVER_TRUSTED_CERT,
+ serverTrustedCert.getAbsolutePath()
);
+ setupAndTestConnection();
+ }
+
+ private void setupAndTestConnection() {
+ // test with basic auth
+ setupAndTestConnectionWithAuth(
+ HttpHeaderUtils.createBasicAuthorizationHeaderPreprocessor());
+ // test with OIDC auth
+ setupAndTestConnectionWithAuth(
+ HttpHeaderUtils.createOIDCAuthorizationHeaderPreprocessor(
+ Optional.of("http://abc"),
+ Optional.of("aaa"),
+ Optional.of(Duration.ofSeconds(5))
+ )
+ );
+ }
+ private void setupAndTestConnectionWithAuth(HeaderPreprocessor headerPreprocessor) {
+ setUpPollingClientFactory(wireMockServer.baseUrl(),
+ headerPreprocessor);
testPollingClientConnection();
}
@@ -299,7 +311,7 @@ private void setupServerStub() {
.withBody(readTestFile(SAMPLES_FOLDER + "HttpResult.json"))));
}
- private void setUpPollingClientFactory(String baseUrl) {
+ private void setUpPollingClientFactory(String baseUrl, HeaderPreprocessor headerPreprocessor) {
LookupRow lookupRow = new LookupRow()
.addLookupEntry(
@@ -317,7 +329,7 @@ private void setUpPollingClientFactory(String baseUrl) {
GetRequestFactory requestFactory = new GetRequestFactory(
new GenericGetQueryCreator(lookupRow),
- HttpHeaderUtils.createDefaultHeaderPreprocessor(),
+ headerPreprocessor,
HttpLookupConfig.builder()
.url(baseUrl + ENDPOINT)
.build()
diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientTest.java
index 6d6feac1..f5c9f4bf 100644
--- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientTest.java
+++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientTest.java
@@ -53,7 +53,7 @@ public class JavaNetHttpPollingClientTest {
@BeforeEach
public void setUp() {
- this.headerPreprocessor = HttpHeaderUtils.createDefaultHeaderPreprocessor();
+ this.headerPreprocessor = HttpHeaderUtils.createBasicAuthorizationHeaderPreprocessor();
this.options = HttpLookupConfig.builder().url(BASE_URL).build();
}
@@ -155,7 +155,7 @@ public void shouldBuildBodyBasedClientUri() {
BodyBasedRequestFactory requestFactory = new BodyBasedRequestFactory(
"POST",
new GenericJsonQueryCreator(jsonSerializer),
- HttpHeaderUtils.createDefaultHeaderPreprocessor(),
+ HttpHeaderUtils.createBasicAuthorizationHeaderPreprocessor(),
HttpLookupConfig.builder()
.url(BASE_URL)
.build()