Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
[Conjure Java Runtime] Part 10: Drive a truck through the AtlasDbHttp…
Browse files Browse the repository at this point in the history
…Clients API (#4264)

* checkpoint i

* Drive a truck through the APIs

* Fix compiile breaks

* Merge conflicts and breaks

* Fix more breaks

* Add generated changelog entries

* Add generated changelog entries

* Pairing session

* Pairing session 2

* Other CR comments

* Update changelog to reflect structured User Agents

* Fix some tests

* bleh

* Fix test
  • Loading branch information
jeremyk-91 authored Sep 30, 2019
1 parent 356d452 commit 305c9d7
Show file tree
Hide file tree
Showing 29 changed files with 584 additions and 400 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ public StreamStorePersistenceConfiguration streamStorePersistence() {
return StreamStorePersistenceConfiguration.DEFAULT_CONFIG;
}

@Value.Default
public RemotingClientConfig remotingClient() {
return ImmutableRemotingClientConfig.builder().build();
}

public static ImmutableAtlasDbRuntimeConfig defaultRuntimeConfig() {
return ImmutableAtlasDbRuntimeConfig.builder().build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,16 @@
import com.google.common.collect.Sets;
import com.google.common.net.HostAndPort;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.palantir.atlasdb.config.AuxiliaryRemotingParameters;
import com.palantir.atlasdb.config.LeaderConfig;
import com.palantir.atlasdb.config.LeaderRuntimeConfig;
import com.palantir.atlasdb.http.AtlasDbHttpClients;
import com.palantir.atlasdb.http.AtlasDbRemotingConstants;
import com.palantir.atlasdb.http.NotCurrentLeaderExceptionMapper;
import com.palantir.atlasdb.http.UserAgents;
import com.palantir.atlasdb.util.AtlasDbMetrics;
import com.palantir.atlasdb.util.MetricsManager;
import com.palantir.common.concurrent.PTExecutors;
import com.palantir.conjure.java.api.config.service.UserAgent;
import com.palantir.conjure.java.config.ssl.TrustContext;
import com.palantir.leader.AsyncLeadershipObserver;
import com.palantir.leader.BatchingLeaderElectionService;
Expand Down Expand Up @@ -75,17 +77,20 @@ private Leaders() {
*/
public static LeaderElectionService create(MetricsManager metricsManager,
Consumer<Object> env, LeaderConfig config, Supplier<LeaderRuntimeConfig> runtime) {
return create(metricsManager, env, config, runtime, UserAgents.DEFAULT_USER_AGENT);
return create(metricsManager, env, config, runtime, AtlasDbRemotingConstants.DEFAULT_USER_AGENT);
}

public static LeaderElectionService create(MetricsManager metricsManager,
Consumer<Object> env, LeaderConfig config, Supplier<LeaderRuntimeConfig> runtime, String userAgent) {
Consumer<Object> env, LeaderConfig config, Supplier<LeaderRuntimeConfig> runtime, UserAgent userAgent) {
return createAndRegisterLocalServices(metricsManager, env, config, runtime, userAgent).leaderElectionService();
}

public static LocalPaxosServices createAndRegisterLocalServices(
MetricsManager metricsManager, Consumer<Object> env, LeaderConfig config,
Supplier<LeaderRuntimeConfig> runtime, String userAgent) {
MetricsManager metricsManager,
Consumer<Object> env,
LeaderConfig config,
Supplier<LeaderRuntimeConfig> runtime,
UserAgent userAgent) {
LocalPaxosServices localPaxosServices = createInstrumentedLocalServices(
metricsManager, config, runtime, userAgent);

Expand All @@ -100,7 +105,7 @@ public static LocalPaxosServices createInstrumentedLocalServices(
MetricsManager metricsManager,
LeaderConfig config,
Supplier<LeaderRuntimeConfig> runtime,
String userAgent) {
UserAgent userAgent) {
Set<String> remoteLeaderUris = Sets.newHashSet(config.leaders());
remoteLeaderUris.remove(config.localServer());

Expand All @@ -117,7 +122,7 @@ public static LocalPaxosServices createInstrumentedLocalServices(
LeaderConfig config,
Supplier<LeaderRuntimeConfig> runtime,
RemotePaxosServerSpec remotePaxosServerSpec,
String userAgent) {
UserAgent userAgent) {
UUID leaderUuid = UUID.randomUUID();

AsyncLeadershipObserver leadershipObserver = AsyncLeadershipObserver.create();
Expand Down Expand Up @@ -215,10 +220,20 @@ public static <T> List<T> createProxyAndLocalList(
Set<String> remoteUris,
Optional<TrustContext> trustContext,
Class<T> clazz,
String userAgent) {
UserAgent userAgent) {

// TODO (jkong): Enable runtime config for leader election services.
List<T> remotes = remoteUris.stream()
.map(uri -> AtlasDbHttpClients.createProxy(metrics, trustContext, uri, clazz, userAgent, false))
.map(uri -> AtlasDbHttpClients.createProxy(
metrics,
trustContext,
uri,
clazz,
AuxiliaryRemotingParameters.builder()
.userAgent(userAgent)
.shouldLimitPayload(false)
.shouldRetry(true)
.build()))
.collect(Collectors.toList());

return ImmutableList.copyOf(Iterables.concat(
Expand All @@ -230,14 +245,23 @@ public static Map<PingableLeader, HostAndPort> generatePingables(
MetricsManager metricsManager,
Collection<String> remoteEndpoints,
Optional<TrustContext> trustContext,
String userAgent) {
UserAgent userAgent) {
/* The interface used as a key here may be a proxy, which may have strange .equals() behavior.
* This is circumvented by using an IdentityHashMap which will just use native == for equality.
*/
Map<PingableLeader, HostAndPort> pingables = new IdentityHashMap<>();
for (String endpoint : remoteEndpoints) {
PingableLeader remoteInterface = AtlasDbHttpClients.createProxyWithoutRetrying(metricsManager.getRegistry(),
trustContext, endpoint, PingableLeader.class, userAgent, false);
PingableLeader remoteInterface = AtlasDbHttpClients.createProxy(
metricsManager.getRegistry(),
trustContext,
endpoint,
PingableLeader.class,
AuxiliaryRemotingParameters.builder() // TODO (jkong): Configurable remoting client config.
.shouldLimitPayload(false)
.userAgent(userAgent)
.shouldRetry(false)
.shouldLimitPayload(true)
.build());
HostAndPort hostAndPort = HostAndPort.fromString(endpoint);
pingables.put(remoteInterface, hostAndPort);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,71 +15,62 @@
*/
package com.palantir.atlasdb.factory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.ProxySelector;
import java.net.SocketAddress;
import java.net.URI;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;

import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.ImmutableList;
import com.google.common.net.HostAndPort;
import com.palantir.atlasdb.config.AuxiliaryRemotingParameters;
import com.palantir.atlasdb.config.RemotingClientConfig;
import com.palantir.atlasdb.config.ServerListConfig;
import com.palantir.atlasdb.http.AtlasDbHttpClients;
import com.palantir.atlasdb.util.AtlasDbMetrics;
import com.palantir.atlasdb.util.MetricsManager;
import com.palantir.conjure.java.api.config.service.ProxyConfiguration;
import com.palantir.conjure.java.api.config.service.UserAgent;
import com.palantir.conjure.java.api.config.ssl.SslConfiguration;
import com.palantir.conjure.java.config.ssl.SslSocketFactories;
import com.palantir.conjure.java.config.ssl.TrustContext;
import com.palantir.logsafe.exceptions.SafeIllegalArgumentException;

public final class ServiceCreator {
private final MetricsManager metricsManager;
private final String userAgent;
private final Supplier<ServerListConfig> servers;
private final boolean limitPayload;
private final AuxiliaryRemotingParameters parameters;

private ServiceCreator(MetricsManager metricsManager, String userAgent, Supplier<ServerListConfig> servers,
boolean limitPayload) {
private ServiceCreator(MetricsManager metricsManager,
Supplier<ServerListConfig> servers,
AuxiliaryRemotingParameters parameters) {
this.metricsManager = metricsManager;
this.userAgent = userAgent;
this.servers = servers;
this.limitPayload = limitPayload;
this.parameters = parameters;
}

/**
* Creates clients without client-side restrictions on payload size.
*/
public static ServiceCreator noPayloadLimiter(MetricsManager metrics, String agent,
Supplier<ServerListConfig> serverList) {
return new ServiceCreator(metrics, agent, serverList, false);
public static ServiceCreator noPayloadLimiter(
MetricsManager metrics,
Supplier<ServerListConfig> serverList,
UserAgent userAgent,
Supplier<RemotingClientConfig> remotingClientConfigSupplier) {
return new ServiceCreator(
metrics, serverList, toAuxiliaryRemotingParameters(userAgent, remotingClientConfigSupplier, false));
}

/**
* Creates clients that intercept requests with payload greater than
* {@link com.palantir.atlasdb.http.AtlasDbInterceptors#MAX_PAYLOAD_SIZE} bytes. This ServiceCreator should be used
* for clients to servers that impose payload limits.
*/
public static ServiceCreator withPayloadLimiter(MetricsManager metrics, String agent,
Supplier<ServerListConfig> serverList) {
return new ServiceCreator(metrics, agent, serverList, true);
public static ServiceCreator withPayloadLimiter(
MetricsManager metrics,
Supplier<ServerListConfig> serverList,
UserAgent userAgent,
Supplier<RemotingClientConfig> remotingClientConfigSupplier) {
return new ServiceCreator(
metrics, serverList, toAuxiliaryRemotingParameters(userAgent, remotingClientConfigSupplier, true));
}

public <T> T createService(Class<T> serviceClass) {
return create(
metricsManager,
servers,
SslSocketFactories::createTrustContext,
ServiceCreator::createProxySelector,
serviceClass,
userAgent,
limitPayload);
return create(metricsManager, servers, serviceClass, parameters);
}

/**
Expand All @@ -93,19 +84,13 @@ public static Optional<TrustContext> createTrustContext(Optional<SslConfiguratio
private static <T> T create(
MetricsManager metricsManager,
Supplier<ServerListConfig> serverListConfigSupplier,
Function<SslConfiguration, TrustContext> trustContextCreator,
Function<ProxyConfiguration, ProxySelector> proxySelectorCreator,
Class<T> type,
String userAgent,
boolean limitPayload) {
AuxiliaryRemotingParameters parameters) {
return AtlasDbHttpClients.createLiveReloadingProxyWithFailover(
metricsManager.getTaggedRegistry(),
serverListConfigSupplier,
trustContextCreator,
proxySelectorCreator,
type,
userAgent,
limitPayload);
parameters);
}

public static <T> T createInstrumentedService(MetricRegistry metricRegistry, T service, Class<T> serviceClass) {
Expand All @@ -116,36 +101,15 @@ public static <T> T createInstrumentedService(MetricRegistry metricRegistry, T s
MetricRegistry.name(serviceClass));
}

/**
* The code below is copied from http-remoting and should be removed when we switch the clients to use remoting.
*/
public static ProxySelector createProxySelector(ProxyConfiguration proxyConfig) {
switch (proxyConfig.type()) {
case DIRECT:
return fixedProxySelectorFor(Proxy.NO_PROXY);
case HTTP:
HostAndPort hostAndPort = HostAndPort.fromString(proxyConfig.hostAndPort()
.orElseThrow(() -> new SafeIllegalArgumentException(
"Expected to find proxy hostAndPort configuration for HTTP proxy")));
InetSocketAddress addr = new InetSocketAddress(hostAndPort.getHost(), hostAndPort.getPort());
return fixedProxySelectorFor(new Proxy(Proxy.Type.HTTP, addr));
default:
// fall through
}

throw new IllegalStateException("Failed to create ProxySelector for proxy configuration: " + proxyConfig);
}

private static ProxySelector fixedProxySelectorFor(Proxy proxy) {
return new ProxySelector() {
@Override
public List<Proxy> select(URI uri) {
return ImmutableList.of(proxy);
}

@Override
public void connectFailed(URI uri, SocketAddress sa, IOException ioe) {}
};

private static AuxiliaryRemotingParameters toAuxiliaryRemotingParameters(
UserAgent userAgent,
Supplier<RemotingClientConfig> remotingClientConfigSupplier,
boolean shouldLimitPayload) {
return AuxiliaryRemotingParameters.builder()
.remotingClientConfig(remotingClientConfigSupplier)
.userAgent(userAgent)
.shouldLimitPayload(shouldLimitPayload)
.shouldRetry(true)
.build();
}
}
Loading

0 comments on commit 305c9d7

Please sign in to comment.