Skip to content

Commit

Permalink
Rebase main after connector level http parameter support
Browse files Browse the repository at this point in the history
Signed-off-by: zane-neo <[email protected]>
  • Loading branch information
zane-neo committed Mar 7, 2024
1 parent 1bdb3da commit 458b7e3
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import java.security.AccessController;
import java.security.PrivilegedExceptionAction;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand All @@ -36,7 +37,7 @@

@Log4j2
@ConnectorExecutor(AWS_SIGV4)
public class AwsConnectorExecutor implements RemoteConnectorExecutor {
public class AwsConnectorExecutor extends AbstractConnectorExecutor {

@Getter
private AwsConnector connector;
Expand All @@ -56,8 +57,12 @@ public class AwsConnectorExecutor implements RemoteConnectorExecutor {
private SdkAsyncHttpClient httpClient;

public AwsConnectorExecutor(Connector connector) {
super.initialize(connector);
this.connector = (AwsConnector) connector;
this.httpClient = MLHttpClientFactory.getAsyncHttpClient();
Duration connectionTimeout = Duration.ofSeconds(super.getConnectorClientConfig().getConnectionTimeout());
Duration readTimeout = Duration.ofSeconds(super.getConnectorClientConfig().getReadTimeout());
Integer maxConnection = super.getConnectorClientConfig().getMaxConnections();
this.httpClient = MLHttpClientFactory.getAsyncHttpClient(connectionTimeout, readTimeout, maxConnection);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import java.security.AccessController;
import java.security.PrivilegedExceptionAction;
import java.time.Duration;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -61,7 +62,10 @@ public class HttpJsonConnectorExecutor extends AbstractConnectorExecutor {
public HttpJsonConnectorExecutor(Connector connector) {
super.initialize(connector);
this.connector = (HttpConnector) connector;
this.httpClient = MLHttpClientFactory.getAsyncHttpClient();
Duration connectionTimeout = Duration.ofSeconds(super.getConnectorClientConfig().getConnectionTimeout());
Duration readTimeout = Duration.ofSeconds(super.getConnectorClientConfig().getReadTimeout());
Integer maxConnection = super.getConnectorClientConfig().getMaxConnections();
this.httpClient = MLHttpClientFactory.getAsyncHttpClient(connectionTimeout, readTimeout, maxConnection);
}

@Override
Expand Down Expand Up @@ -103,8 +107,4 @@ public void invokeRemoteModel(
actionListener.onFailure(new MLException("Fail to execute http connector", e));
}
}

public SdkAsyncHttpClient getHttpClient(int connectionTimeout, int readTimeout, int maxConnections) {
return MLHttpClientFactory.getAsyncHttpClient(connectionTimeout, readTimeout, maxConnections);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.time.Duration;
import java.util.Arrays;
import java.util.Locale;

Expand All @@ -21,11 +22,16 @@
@Log4j2
public class MLHttpClientFactory {

public static SdkAsyncHttpClient getAsyncHttpClient() {
public static SdkAsyncHttpClient getAsyncHttpClient(Duration connectionTimeout, Duration readTimeout, int maxConnections) {
try {
return AccessController
.doPrivileged(
(PrivilegedExceptionAction<SdkAsyncHttpClient>) () -> NettyNioAsyncHttpClient.builder().maxConcurrency(100).build()
(PrivilegedExceptionAction<SdkAsyncHttpClient>) () -> NettyNioAsyncHttpClient
.builder()
.connectionTimeout(connectionTimeout)
.readTimeout(readTimeout)
.maxConcurrency(maxConnections)
.build()
);
} catch (PrivilegedActionException e) {
return null;
Expand Down

0 comments on commit 458b7e3

Please sign in to comment.