Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

[Conjure Java Runtime] Part 10: Drive a truck through the AtlasDbHttpClients API #4264

Merged
merged 15 commits into from
Sep 30, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ public StreamStorePersistenceConfiguration streamStorePersistence() {
return StreamStorePersistenceConfiguration.DEFAULT_CONFIG;
}

@Value.Default
public RemotingClientConfig remotingClient() {
return ImmutableRemotingClientConfig.builder().build();
}

public static ImmutableAtlasDbRuntimeConfig defaultRuntimeConfig() {
return ImmutableAtlasDbRuntimeConfig.builder().build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,16 @@
import com.google.common.collect.Sets;
import com.google.common.net.HostAndPort;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.palantir.atlasdb.config.AuxiliaryRemotingParameters;
import com.palantir.atlasdb.config.LeaderConfig;
import com.palantir.atlasdb.config.LeaderRuntimeConfig;
import com.palantir.atlasdb.http.AtlasDbHttpClients;
import com.palantir.atlasdb.http.AtlasDbRemotingConstants;
import com.palantir.atlasdb.http.NotCurrentLeaderExceptionMapper;
import com.palantir.atlasdb.http.UserAgents;
import com.palantir.atlasdb.util.AtlasDbMetrics;
import com.palantir.atlasdb.util.MetricsManager;
import com.palantir.common.concurrent.PTExecutors;
import com.palantir.conjure.java.api.config.service.UserAgent;
import com.palantir.conjure.java.config.ssl.TrustContext;
import com.palantir.leader.AsyncLeadershipObserver;
import com.palantir.leader.BatchingLeaderElectionService;
Expand Down Expand Up @@ -75,17 +77,20 @@ private Leaders() {
*/
public static LeaderElectionService create(MetricsManager metricsManager,
Consumer<Object> env, LeaderConfig config, Supplier<LeaderRuntimeConfig> runtime) {
return create(metricsManager, env, config, runtime, UserAgents.DEFAULT_USER_AGENT);
return create(metricsManager, env, config, runtime, AtlasDbRemotingConstants.DEFAULT_USER_AGENT);
}

public static LeaderElectionService create(MetricsManager metricsManager,
Consumer<Object> env, LeaderConfig config, Supplier<LeaderRuntimeConfig> runtime, String userAgent) {
Consumer<Object> env, LeaderConfig config, Supplier<LeaderRuntimeConfig> runtime, UserAgent userAgent) {
return createAndRegisterLocalServices(metricsManager, env, config, runtime, userAgent).leaderElectionService();
}

public static LocalPaxosServices createAndRegisterLocalServices(
MetricsManager metricsManager, Consumer<Object> env, LeaderConfig config,
Supplier<LeaderRuntimeConfig> runtime, String userAgent) {
MetricsManager metricsManager,
Consumer<Object> env,
LeaderConfig config,
Supplier<LeaderRuntimeConfig> runtime,
UserAgent userAgent) {
LocalPaxosServices localPaxosServices = createInstrumentedLocalServices(
metricsManager, config, runtime, userAgent);

Expand All @@ -100,7 +105,7 @@ public static LocalPaxosServices createInstrumentedLocalServices(
MetricsManager metricsManager,
LeaderConfig config,
Supplier<LeaderRuntimeConfig> runtime,
String userAgent) {
UserAgent userAgent) {
Set<String> remoteLeaderUris = Sets.newHashSet(config.leaders());
remoteLeaderUris.remove(config.localServer());

Expand All @@ -117,7 +122,7 @@ public static LocalPaxosServices createInstrumentedLocalServices(
LeaderConfig config,
Supplier<LeaderRuntimeConfig> runtime,
RemotePaxosServerSpec remotePaxosServerSpec,
String userAgent) {
UserAgent userAgent) {
UUID leaderUuid = UUID.randomUUID();

AsyncLeadershipObserver leadershipObserver = AsyncLeadershipObserver.create();
Expand Down Expand Up @@ -212,10 +217,12 @@ public static <T> List<T> createProxyAndLocalList(
Set<String> remoteUris,
Optional<TrustContext> trustContext,
Class<T> clazz,
String userAgent) {
UserAgent userAgent) {

// TODO (jkong): Enable runtime config for leader election services.
List<T> remotes = remoteUris.stream()
.map(uri -> AtlasDbHttpClients.createProxy(metrics, trustContext, uri, clazz, userAgent, false))
.map(uri -> AtlasDbHttpClients.createProxy(metrics, trustContext, uri, clazz,
AuxiliaryRemotingParameters.builder().userAgent(userAgent).shouldLimitPayload(false).build()))
.collect(Collectors.toList());

return ImmutableList.copyOf(Iterables.concat(
Expand All @@ -227,14 +234,21 @@ public static Map<PingableLeader, HostAndPort> generatePingables(
MetricsManager metricsManager,
Collection<String> remoteEndpoints,
Optional<TrustContext> trustContext,
String userAgent) {
UserAgent userAgent) {
/* The interface used as a key here may be a proxy, which may have strange .equals() behavior.
* This is circumvented by using an IdentityHashMap which will just use native == for equality.
*/
Map<PingableLeader, HostAndPort> pingables = new IdentityHashMap<>();
for (String endpoint : remoteEndpoints) {
PingableLeader remoteInterface = AtlasDbHttpClients.createProxyWithoutRetrying(metricsManager.getRegistry(),
trustContext, endpoint, PingableLeader.class, userAgent, false);
PingableLeader remoteInterface = AtlasDbHttpClients.createProxyWithoutRetrying(
metricsManager.getRegistry(),
trustContext,
endpoint,
PingableLeader.class,
AuxiliaryRemotingParameters.builder() // TODO (jkong): Configurable remoting client config.
.shouldLimitPayload(false)
.userAgent(userAgent)
.build());
HostAndPort hostAndPort = HostAndPort.fromString(endpoint);
pingables.put(remoteInterface, hostAndPort);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,46 +29,56 @@
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.ImmutableList;
import com.google.common.net.HostAndPort;
import com.palantir.atlasdb.config.AuxiliaryRemotingParameters;
import com.palantir.atlasdb.config.RemotingClientConfig;
import com.palantir.atlasdb.config.ServerListConfig;
import com.palantir.atlasdb.http.AtlasDbHttpClients;
import com.palantir.atlasdb.util.AtlasDbMetrics;
import com.palantir.atlasdb.util.MetricsManager;
import com.palantir.conjure.java.api.config.service.ProxyConfiguration;
import com.palantir.conjure.java.api.config.service.UserAgent;
import com.palantir.conjure.java.api.config.ssl.SslConfiguration;
import com.palantir.conjure.java.config.ssl.SslSocketFactories;
import com.palantir.conjure.java.config.ssl.TrustContext;
import com.palantir.logsafe.exceptions.SafeIllegalArgumentException;

public final class ServiceCreator {
private final MetricsManager metricsManager;
private final String userAgent;
private final Supplier<ServerListConfig> servers;
private final boolean limitPayload;
private final AuxiliaryRemotingParameters parameters;

private ServiceCreator(MetricsManager metricsManager, String userAgent, Supplier<ServerListConfig> servers,
boolean limitPayload) {
private ServiceCreator(MetricsManager metricsManager,
Supplier<ServerListConfig> servers,
AuxiliaryRemotingParameters parameters) {
this.metricsManager = metricsManager;
this.userAgent = userAgent;
this.servers = servers;
this.limitPayload = limitPayload;
this.parameters = parameters;
}

/**
* Creates clients without client-side restrictions on payload size.
*/
public static ServiceCreator noPayloadLimiter(MetricsManager metrics, String agent,
Supplier<ServerListConfig> serverList) {
return new ServiceCreator(metrics, agent, serverList, false);
public static ServiceCreator noPayloadLimiter(
MetricsManager metrics,
Supplier<ServerListConfig> serverList,
UserAgent userAgent,
Supplier<RemotingClientConfig> remotingClientConfigSupplier) {
return new ServiceCreator(
metrics, serverList, toAuxiliaryRemotingParameters(userAgent, remotingClientConfigSupplier, false));
}

/**
* Creates clients that intercept requests with payload greater than
* {@link com.palantir.atlasdb.http.AtlasDbInterceptors#MAX_PAYLOAD_SIZE} bytes. This ServiceCreator should be used
* for clients to servers that impose payload limits.
*/
public static ServiceCreator withPayloadLimiter(MetricsManager metrics, String agent,
Supplier<ServerListConfig> serverList) {
return new ServiceCreator(metrics, agent, serverList, true);
public static ServiceCreator withPayloadLimiter(
MetricsManager metrics,
Supplier<ServerListConfig> serverList,
UserAgent userAgent,
Supplier<RemotingClientConfig> remotingClientConfigSupplier) {
return new ServiceCreator(
metrics, serverList, toAuxiliaryRemotingParameters(userAgent, remotingClientConfigSupplier, true));
}

public <T> T createService(Class<T> serviceClass) {
Expand All @@ -78,8 +88,7 @@ public <T> T createService(Class<T> serviceClass) {
SslSocketFactories::createTrustContext,
ServiceCreator::createProxySelector,
serviceClass,
userAgent,
limitPayload);
parameters);
}

/**
Expand All @@ -96,16 +105,14 @@ private static <T> T create(
Function<SslConfiguration, TrustContext> trustContextCreator,
Function<ProxyConfiguration, ProxySelector> proxySelectorCreator,
Class<T> type,
String userAgent,
boolean limitPayload) {
AuxiliaryRemotingParameters parameters) {
return AtlasDbHttpClients.createLiveReloadingProxyWithFailover(
metricsManager.getTaggedRegistry(),
serverListConfigSupplier,
trustContextCreator,
proxySelectorCreator,
type,
userAgent,
limitPayload);
parameters);
}

public static <T> T createInstrumentedService(MetricRegistry metricRegistry, T service, Class<T> serviceClass) {
Expand Down Expand Up @@ -146,6 +153,17 @@ public List<Proxy> select(URI uri) {
@Override
public void connectFailed(URI uri, SocketAddress sa, IOException ioe) {}
};
}


private static AuxiliaryRemotingParameters toAuxiliaryRemotingParameters(
UserAgent userAgent,
Supplier<RemotingClientConfig> remotingClientConfigSupplier,
boolean shouldLimitPayload) {
return AuxiliaryRemotingParameters.builder()
.remotingClientConfig(remotingClientConfigSupplier)
.userAgent(userAgent)
.shouldLimitPayload(shouldLimitPayload)
.build();
}
}
Loading