Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

[Dialogue] Part 1: BlockingSensitive -> TimeoutSensitive Nomenclature Changes #4761

Merged
merged 7 commits into from
May 12, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ public static <T> List<T> createProxyAndLocalList(
.userAgent(userAgent)
.shouldLimitPayload(false)
.shouldRetry(true)
.shouldSupportBlockingOperations(false)
.shouldUseExtendedTimeout(false)
.remotingClientConfig(remotingClientConfig)
.build()))
.collect(Collectors.toList());
Expand All @@ -296,7 +296,7 @@ public static List<LeaderPingerContext<PingableLeader>> generatePingables(
.userAgent(userAgent)
.shouldLimitPayload(false) // Guaranteed to be small, no need to limit.
.shouldRetry(false)
.shouldSupportBlockingOperations(false)
.shouldUseExtendedTimeout(false)
.remotingClientConfig(remotingClientConfig)
.build()))
.map(Leaders::convertAddressToHostAndPort)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ public <T> T createService(Class<T> serviceClass) {
return create(metricsManager, servers, serviceClass, parameters);
}

public <T> T createServiceWithoutBlockingOperations(Class<T> serviceClass) {
public <T> T createServiceWithShortTimeout(Class<T> serviceClass) {
AuxiliaryRemotingParameters blockingUnsupportedParameters
= ImmutableAuxiliaryRemotingParameters.copyOf(parameters).withShouldSupportBlockingOperations(false);
= ImmutableAuxiliaryRemotingParameters.copyOf(parameters).withShouldUseExtendedTimeout(false);
return create(metricsManager, servers, serviceClass, blockingUnsupportedParameters);
}

Expand Down Expand Up @@ -116,7 +116,7 @@ private static AuxiliaryRemotingParameters toAuxiliaryRemotingParameters(
.remotingClientConfig(remotingClientConfigSupplier)
.userAgent(userAgent)
.shouldLimitPayload(shouldLimitPayload)
.shouldSupportBlockingOperations(true) // TODO (jkong): Figure out when to migrate safely
.shouldUseExtendedTimeout(true) // TODO (jkong): Figure out when to migrate safely
.shouldRetry(true)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@
import com.palantir.atlasdb.factory.Leaders.LocalPaxosServices;
import com.palantir.atlasdb.factory.startup.ConsistencyCheckRunner;
import com.palantir.atlasdb.factory.startup.TimeLockMigrator;
import com.palantir.atlasdb.factory.timelock.BlockingAndNonBlockingServices;
import com.palantir.atlasdb.factory.timelock.BlockingSensitiveConjureTimelockService;
import com.palantir.atlasdb.factory.timelock.BlockingSensitiveLockRpcClient;
import com.palantir.atlasdb.factory.timelock.ShortAndLongTimeoutServices;
import com.palantir.atlasdb.factory.timelock.TimeoutSensitiveConjureTimelockService;
import com.palantir.atlasdb.factory.timelock.TimeoutSensitiveLockRpcClient;
import com.palantir.atlasdb.factory.timelock.TimestampCorroboratingTimelockService;
import com.palantir.atlasdb.factory.timestamp.FreshTimestampSupplierAdapter;
import com.palantir.atlasdb.http.AtlasDbHttpClients;
Expand Down Expand Up @@ -1008,16 +1008,16 @@ private static LockAndTimestampServices getLockAndTimestampServices(
ServiceCreator creator = ServiceCreator.withPayloadLimiter(
metricsManager, timelockServerListConfig, userAgent, remotingConfigSupplier);

LockRpcClient lockRpcClient = new BlockingSensitiveLockRpcClient(
BlockingAndNonBlockingServices.create(creator, LockRpcClient.class));
LockRpcClient lockRpcClient = new TimeoutSensitiveLockRpcClient(
ShortAndLongTimeoutServices.create(creator, LockRpcClient.class));

LockService lockService = AtlasDbMetrics.instrumentTimed(
metricsManager.getRegistry(),
LockService.class,
RemoteLockServiceAdapter.create(lockRpcClient, timelockNamespace));

ConjureTimelockService conjureTimelockService = new BlockingSensitiveConjureTimelockService(
BlockingAndNonBlockingServices.create(creator, ConjureTimelockService.class));
ConjureTimelockService conjureTimelockService = new TimeoutSensitiveConjureTimelockService(
ShortAndLongTimeoutServices.create(creator, ConjureTimelockService.class));

TimelockRpcClient timelockClient = creator.createService(TimelockRpcClient.class);

Expand All @@ -1040,7 +1040,7 @@ private static LockAndTimestampServices getLockAndTimestampServices(
RemoteTimelockServiceAdapter remoteTimelockServiceAdapter = RemoteTimelockServiceAdapter
.create(namespacedTimelockRpcClient, namespacedConjureTimelockService, lockWatchEventCache);
TimestampManagementService timestampManagementService = new RemoteTimestampManagementAdapter(
creator.createServiceWithoutBlockingOperations(TimestampManagementRpcClient.class), timelockNamespace);
creator.createServiceWithShortTimeout(TimestampManagementRpcClient.class), timelockNamespace);

return ImmutableLockAndTimestampServices.builder()
.lock(lockService)
Expand Down Expand Up @@ -1118,7 +1118,7 @@ private static LockAndTimestampServices createRawLeaderServices(
.userAgent(userAgent)
.shouldRetry(true)
.shouldLimitPayload(true)
.shouldSupportBlockingOperations(false)
.shouldUseExtendedTimeout(false)
.build());

// Determine asynchronously whether the remote services are talking to our local services.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@
import com.palantir.atlasdb.factory.ServiceCreator;

@Value.Immutable
public interface BlockingAndNonBlockingServices<T> {
T blocking();
T nonBlocking();
public interface ShortAndLongTimeoutServices<T> {
T longTimeout();
T shortTimeout();

static <T> BlockingAndNonBlockingServices<T> create(ServiceCreator serviceCreator, Class<T> clazz) {
return ImmutableBlockingAndNonBlockingServices.<T>builder()
.blocking(serviceCreator.createService(clazz))
.nonBlocking(serviceCreator.createServiceWithoutBlockingOperations(clazz))
static <T> ShortAndLongTimeoutServices<T> create(ServiceCreator serviceCreator, Class<T> clazz) {
return ImmutableShortAndLongTimeoutServices.<T>builder()
.longTimeout(serviceCreator.createService(clazz))
.shortTimeout(serviceCreator.createServiceWithShortTimeout(clazz))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,58 +37,58 @@
* Given two proxies to the same set of underlying TimeLock servers, one configured to expect longer-running operations
* on the server and one configured not to, routes calls appropriately.
*/
public final class BlockingSensitiveConjureTimelockService implements ConjureTimelockService {
private final ConjureTimelockService blocking;
private final ConjureTimelockService nonBlocking;
public final class TimeoutSensitiveConjureTimelockService implements ConjureTimelockService {
private final ConjureTimelockService longTimeoutProxy;
private final ConjureTimelockService shortTimeoutProxy;

public BlockingSensitiveConjureTimelockService(
BlockingAndNonBlockingServices<ConjureTimelockService> conjureTimelockServices) {
this.blocking = conjureTimelockServices.blocking();
this.nonBlocking = conjureTimelockServices.nonBlocking();
public TimeoutSensitiveConjureTimelockService(
ShortAndLongTimeoutServices<ConjureTimelockService> conjureTimelockServices) {
this.longTimeoutProxy = conjureTimelockServices.longTimeout();
this.shortTimeoutProxy = conjureTimelockServices.shortTimeout();
}

@Override
public ConjureStartTransactionsResponse startTransactions(AuthHeader authHeader, String namespace,
ConjureStartTransactionsRequest request) {
return nonBlocking.startTransactions(authHeader, namespace, request);
return shortTimeoutProxy.startTransactions(authHeader, namespace, request);
}

@Override
public ConjureGetFreshTimestampsResponse getFreshTimestamps(AuthHeader authHeader, String namespace,
ConjureGetFreshTimestampsRequest request) {
return nonBlocking.getFreshTimestamps(authHeader, namespace, request);
return shortTimeoutProxy.getFreshTimestamps(authHeader, namespace, request);
}

@Override
public LeaderTime leaderTime(AuthHeader authHeader, String namespace) {
return nonBlocking.leaderTime(authHeader, namespace);
return shortTimeoutProxy.leaderTime(authHeader, namespace);
}

@Override
public ConjureLockResponse lock(AuthHeader authHeader, String namespace, ConjureLockRequest request) {
return blocking.lock(authHeader, namespace, request);
return longTimeoutProxy.lock(authHeader, namespace, request);
}

@Override
public ConjureWaitForLocksResponse waitForLocks(AuthHeader authHeader, String namespace,
ConjureLockRequest request) {
return blocking.waitForLocks(authHeader, namespace, request);
return longTimeoutProxy.waitForLocks(authHeader, namespace, request);
}

@Override
public ConjureRefreshLocksResponse refreshLocks(AuthHeader authHeader, String namespace,
ConjureRefreshLocksRequest request) {
return nonBlocking.refreshLocks(authHeader, namespace, request);
return shortTimeoutProxy.refreshLocks(authHeader, namespace, request);
}

@Override
public ConjureUnlockResponse unlock(AuthHeader authHeader, String namespace, ConjureUnlockRequest request) {
return nonBlocking.unlock(authHeader, namespace, request);
return shortTimeoutProxy.unlock(authHeader, namespace, request);
}

@Override
public GetCommitTimestampsResponse getCommitTimestamps(AuthHeader authHeader, String namespace,
GetCommitTimestampsRequest request) {
return nonBlocking.getCommitTimestamps(authHeader, namespace, request);
return shortTimeoutProxy.getCommitTimestamps(authHeader, namespace, request);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,126 +34,126 @@
* Given two proxies to the same set of underlying remote lock servers, one configured to expect longer-running
* operations on the server and one not to, routes calls appropriately.
*/
public class BlockingSensitiveLockRpcClient implements LockRpcClient {
private final LockRpcClient blocking;
private final LockRpcClient nonBlocking;
public class TimeoutSensitiveLockRpcClient implements LockRpcClient {
private final LockRpcClient longTimeoutProxy;
private final LockRpcClient shortTimeoutProxy;

public BlockingSensitiveLockRpcClient(BlockingAndNonBlockingServices<LockRpcClient> services) {
this.blocking = services.blocking();
this.nonBlocking = services.nonBlocking();
public TimeoutSensitiveLockRpcClient(ShortAndLongTimeoutServices<LockRpcClient> services) {
this.longTimeoutProxy = services.longTimeout();
this.shortTimeoutProxy = services.shortTimeout();
}

@Override
public Optional<LockResponse> lockWithFullLockResponse(String namespace, LockClient client, LockRequest request)
throws InterruptedException {
return blocking.lockWithFullLockResponse(namespace, client, request);
return longTimeoutProxy.lockWithFullLockResponse(namespace, client, request);
}

@Override
public boolean unlock(String namespace, HeldLocksToken token) {
return nonBlocking.unlock(namespace, token);
return shortTimeoutProxy.unlock(namespace, token);
}

@Override
public boolean unlock(String namespace, LockRefreshToken token) {
return nonBlocking.unlock(namespace, token);
return shortTimeoutProxy.unlock(namespace, token);
}

@Override
public boolean unlockSimple(String namespace, SimpleHeldLocksToken token) {
return nonBlocking.unlockSimple(namespace, token);
return shortTimeoutProxy.unlockSimple(namespace, token);
}

@Override
public boolean unlockAndFreeze(String namespace, HeldLocksToken token) {
// TODO (jkong): It feels like this could be non-blocking but not 100% sure so going for the safe option.
return blocking.unlockAndFreeze(namespace, token);
return longTimeoutProxy.unlockAndFreeze(namespace, token);
}

@Override
public Set<HeldLocksToken> getTokens(String namespace, LockClient client) {
return nonBlocking.getTokens(namespace, client);
return shortTimeoutProxy.getTokens(namespace, client);
}

@Override
public Set<HeldLocksToken> refreshTokens(String namespace, Iterable<HeldLocksToken> tokens) {
return nonBlocking.refreshTokens(namespace, tokens);
return shortTimeoutProxy.refreshTokens(namespace, tokens);
}

@Override
public Optional<HeldLocksGrant> refreshGrant(String namespace, HeldLocksGrant grant) {
return nonBlocking.refreshGrant(namespace, grant);
return shortTimeoutProxy.refreshGrant(namespace, grant);
}

@Override
public Optional<HeldLocksGrant> refreshGrant(String namespace, BigInteger grantId) {
return nonBlocking.refreshGrant(namespace, grantId);
return shortTimeoutProxy.refreshGrant(namespace, grantId);
}

@Override
public HeldLocksGrant convertToGrant(String namespace, HeldLocksToken token) {
// TODO (jkong): It feels like this could be non-blocking but not 100% sure so going for the safe option.
return blocking.convertToGrant(namespace, token);
return longTimeoutProxy.convertToGrant(namespace, token);
}

@Override
public HeldLocksToken useGrant(String namespace, LockClient client, HeldLocksGrant grant) {
// TODO (jkong): It feels like this could be non-blocking but not 100% sure so going for the safe option.
return blocking.useGrant(namespace, client, grant);
return longTimeoutProxy.useGrant(namespace, client, grant);
}

@Override
public HeldLocksToken useGrant(String namespace, LockClient client, BigInteger grantId) {
// TODO (jkong): It feels like this could be non-blocking but not 100% sure so going for the safe option.
return blocking.useGrant(namespace, client, grantId);
return longTimeoutProxy.useGrant(namespace, client, grantId);
}

@Override
public Optional<Long> getMinLockedInVersionId(String namespace) {
return nonBlocking.getMinLockedInVersionId(namespace);
return shortTimeoutProxy.getMinLockedInVersionId(namespace);
}

@Override
public Optional<Long> getMinLockedInVersionId(String namespace, LockClient client) {
return nonBlocking.getMinLockedInVersionId(namespace, client);
return shortTimeoutProxy.getMinLockedInVersionId(namespace, client);
}

@Override
public Optional<Long> getMinLockedInVersionId(String namespace, String client) {
return nonBlocking.getMinLockedInVersionId(namespace, client);
return shortTimeoutProxy.getMinLockedInVersionId(namespace, client);
}

@Override
public LockServerOptions getLockServerOptions(String namespace) {
return nonBlocking.getLockServerOptions(namespace);
return shortTimeoutProxy.getLockServerOptions(namespace);
}

@Override
public Optional<LockRefreshToken> lock(String namespace, String client, LockRequest request)
throws InterruptedException {
return blocking.lock(namespace, client, request);
return longTimeoutProxy.lock(namespace, client, request);
}

@Override
public Optional<HeldLocksToken> lockAndGetHeldLocks(String namespace, String client, LockRequest request)
throws InterruptedException {
return blocking.lockAndGetHeldLocks(namespace, client, request);
return longTimeoutProxy.lockAndGetHeldLocks(namespace, client, request);
}

@Override
public Set<LockRefreshToken> refreshLockRefreshTokens(String namespace, Iterable<LockRefreshToken> tokens) {
return nonBlocking.refreshLockRefreshTokens(namespace, tokens);
return shortTimeoutProxy.refreshLockRefreshTokens(namespace, tokens);
}

@Override
public long currentTimeMillis(String namespace) {
return nonBlocking.currentTimeMillis(namespace);
return shortTimeoutProxy.currentTimeMillis(namespace);
}

@Override
public void logCurrentState(String namespace) {
// Even if this does take more than the non-blocking timeout, the request will fail while the server will
// dump its logs out.
nonBlocking.logCurrentState(namespace);
shortTimeoutProxy.logCurrentState(namespace);
}
}
Loading