diff --git a/changelog/@unreleased/pr-5294.v2.yml b/changelog/@unreleased/pr-5294.v2.yml new file mode 100644 index 00000000000..061b7696d06 --- /dev/null +++ b/changelog/@unreleased/pr-5294.v2.yml @@ -0,0 +1,5 @@ +type: improvement +improvement: + description: AtlasDb client can now batch getCommitTimestamps requests across namespaces. + links: + - https://github.com/palantir/atlasdb/pull/5294 diff --git a/lock-api/build.gradle b/lock-api/build.gradle index 566e418dd7e..7b346d0a041 100644 --- a/lock-api/build.gradle +++ b/lock-api/build.gradle @@ -24,6 +24,7 @@ dependencies { compile group: 'com.palantir.safe-logging', name: 'safe-logging' compile group: 'com.palantir.safe-logging', name: 'preconditions' compile group: 'com.palantir.refreshable', name: 'refreshable' + compile group: 'one.util', name: 'streamex' annotationProcessor project(":atlasdb-processors") compileOnly project(":atlasdb-processors") diff --git a/lock-api/src/main/java/com/palantir/lock/client/MultiClientCommitTimestampGetter.java b/lock-api/src/main/java/com/palantir/lock/client/MultiClientCommitTimestampGetter.java new file mode 100644 index 00000000000..4779fd24466 --- /dev/null +++ b/lock-api/src/main/java/com/palantir/lock/client/MultiClientCommitTimestampGetter.java @@ -0,0 +1,197 @@ +/* + * (c) Copyright 2021 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.lock.client; + +import static com.palantir.lock.client.ConjureLockRequests.toConjure; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Streams; +import com.palantir.atlasdb.autobatch.Autobatchers; +import com.palantir.atlasdb.autobatch.BatchElement; +import com.palantir.atlasdb.autobatch.DisruptorAutobatcher; +import com.palantir.atlasdb.futures.AtlasFutures; +import com.palantir.atlasdb.timelock.api.GetCommitTimestampsRequest; +import com.palantir.atlasdb.timelock.api.GetCommitTimestampsResponse; +import com.palantir.atlasdb.timelock.api.Namespace; +import com.palantir.common.streams.KeyedStream; +import com.palantir.lock.v2.LockToken; +import com.palantir.lock.watch.ImmutableTransactionUpdate; +import com.palantir.lock.watch.LockWatchEventCache; +import com.palantir.lock.watch.LockWatchStateUpdate; +import com.palantir.lock.watch.LockWatchVersion; +import com.palantir.lock.watch.TransactionUpdate; +import java.util.ArrayDeque; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Queue; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.LongStream; +import org.immutables.value.Value; + +public final class MultiClientCommitTimestampGetter implements AutoCloseable { + private final DisruptorAutobatcher autobatcher; + + private MultiClientCommitTimestampGetter(DisruptorAutobatcher autobatcher) { + this.autobatcher = autobatcher; + } + + public static MultiClientCommitTimestampGetter create(InternalMultiClientConjureTimelockService delegate) { + DisruptorAutobatcher autobatcher = Autobatchers.independent(consumer(delegate)) + .safeLoggablePurpose("multi-client-commit-timestamp-getter") + .build(); + return new MultiClientCommitTimestampGetter(autobatcher); + } + + public long getCommitTimestamp(Namespace namespace, long startTs, LockToken commitLocksToken) { + return AtlasFutures.getUnchecked(autobatcher.apply(ImmutableNamespacedRequest.builder() + .namespace(namespace) + .startTs(startTs) + .commitLocksToken(commitLocksToken) + .build())); + } + + @VisibleForTesting + static Consumer>> consumer( + InternalMultiClientConjureTimelockService delegate) { + return batch -> { + BatchStateManager batchStateManager = BatchStateManager.createFromRequestBatch(batch); + while (batchStateManager.hasPendingRequests()) { + batchStateManager.processResponse(delegate.getCommitTimestamps(batchStateManager.getRequests())); + } + }; + } + + private static final class BatchStateManager { + private final Map requestMap; + + private BatchStateManager(Map requestMap) { + this.requestMap = requestMap; + } + + static BatchStateManager createFromRequestBatch(List> batch) { + Map requestMap = new HashMap<>(); + + for (BatchElement elem : batch) { + NamespacedRequest argument = elem.argument(); + Namespace namespace = argument.namespace(); + NamespacedBatchStateManager namespacedBatchStateManager = requestMap.computeIfAbsent( + namespace, _unused -> new NamespacedBatchStateManager(argument.cache())); + namespacedBatchStateManager.addRequest(elem); + } + + return new BatchStateManager(requestMap); + } + + private boolean hasPendingRequests() { + return requestMap.values().stream().anyMatch(NamespacedBatchStateManager::hasPendingRequests); + } + + private Map getRequests() { + return KeyedStream.stream(requestMap) + .filter(NamespacedBatchStateManager::hasPendingRequests) + .map(NamespacedBatchStateManager::getRequestForServer) + .collectToMap(); + } + + private void processResponse(Map responseMap) { + responseMap.forEach((namespace, getCommitTimestampsResponse) -> + requestMap.get(namespace).serviceRequests(getCommitTimestampsResponse)); + } + } + + private static final class NamespacedBatchStateManager { + private final Queue> pendingRequestQueue; + private final LockWatchEventCache cache; + private Optional lastKnownVersion; + + private NamespacedBatchStateManager(LockWatchEventCache cache) { + this.pendingRequestQueue = new ArrayDeque<>(); + this.cache = cache; + this.lastKnownVersion = Optional.empty(); + } + + private boolean hasPendingRequests() { + return !pendingRequestQueue.isEmpty(); + } + + private void addRequest(BatchElement elem) { + pendingRequestQueue.add(elem); + } + + private GetCommitTimestampsRequest getRequestForServer() { + return GetCommitTimestampsRequest.builder() + .numTimestamps(pendingRequestQueue.size()) + .lastKnownVersion(toConjure(updateAndGetLastKnownVersion())) + .build(); + } + + private Optional updateAndGetLastKnownVersion() { + lastKnownVersion = cache.lastKnownVersion(); + return lastKnownVersion; + } + + private void serviceRequests(GetCommitTimestampsResponse commitTimestampsResponse) { + List commitTimestamps = getCommitTimestampValues(commitTimestampsResponse); + + processLockWatchUpdate(commitTimestamps, commitTimestampsResponse.getLockWatchUpdate()); + LockWatchLogUtility.logTransactionEvents(lastKnownVersion, commitTimestampsResponse.getLockWatchUpdate()); + + for (Long commitTimestamp : commitTimestamps) { + pendingRequestQueue.poll().result().set(commitTimestamp); + } + } + + private List getCommitTimestampValues(GetCommitTimestampsResponse commitTimestampsResponse) { + return LongStream.rangeClosed( + commitTimestampsResponse.getInclusiveLower(), commitTimestampsResponse.getInclusiveUpper()) + .boxed() + .collect(Collectors.toList()); + } + + private void processLockWatchUpdate(List timestamps, LockWatchStateUpdate lockWatchUpdate) { + List transactionUpdates = Streams.zip( + timestamps.stream(), + pendingRequestQueue.stream(), + (commitTs, batchElement) -> ImmutableTransactionUpdate.builder() + .startTs(batchElement.argument().startTs()) + .commitTs(commitTs) + .writesToken(batchElement.argument().commitLocksToken()) + .build()) + .collect(Collectors.toList()); + cache.processGetCommitTimestampsUpdate(transactionUpdates, lockWatchUpdate); + } + } + + @Override + public void close() { + autobatcher.close(); + } + + @Value.Immutable + interface NamespacedRequest { + Namespace namespace(); + + long startTs(); + + LockToken commitLocksToken(); + + LockWatchEventCache cache(); + } +} diff --git a/lock-api/src/main/java/com/palantir/lock/client/MultiClientTransactionStarter.java b/lock-api/src/main/java/com/palantir/lock/client/MultiClientTransactionStarter.java index 3b64616ae2a..c86658e5fba 100644 --- a/lock-api/src/main/java/com/palantir/lock/client/MultiClientTransactionStarter.java +++ b/lock-api/src/main/java/com/palantir/lock/client/MultiClientTransactionStarter.java @@ -33,8 +33,8 @@ import com.palantir.common.streams.KeyedStream; import com.palantir.lock.v2.StartIdentifiedAtlasDbTransactionResponse; import com.palantir.lock.watch.StartTransactionsLockWatchEventCache; +import java.util.ArrayDeque; import java.util.HashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -278,8 +278,8 @@ private static final class ResponseHandler implements AutoCloseable { private final LockCleanupService lockCleanupService; ResponseHandler(LockCleanupService lockCleanupService) { - this.pendingFutures = new LinkedList<>(); - this.transientResponseList = new LinkedList<>(); + this.pendingFutures = new ArrayDeque<>(); + this.transientResponseList = new ArrayDeque<>(); this.lockCleanupService = lockCleanupService; } diff --git a/lock-api/src/main/java/com/palantir/lock/client/NamespacedCommitTimestampGetter.java b/lock-api/src/main/java/com/palantir/lock/client/NamespacedCommitTimestampGetter.java new file mode 100644 index 00000000000..ca3278c44eb --- /dev/null +++ b/lock-api/src/main/java/com/palantir/lock/client/NamespacedCommitTimestampGetter.java @@ -0,0 +1,40 @@ +/* + * (c) Copyright 2021 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.lock.client; + +import com.palantir.atlasdb.timelock.api.Namespace; +import com.palantir.lock.v2.LockToken; + +public class NamespacedCommitTimestampGetter implements CommitTimestampGetter { + private final Namespace namespace; + private final MultiClientCommitTimestampGetter batcher; + + public NamespacedCommitTimestampGetter(Namespace namespace, MultiClientCommitTimestampGetter batcher) { + this.namespace = namespace; + this.batcher = batcher; + } + + @Override + public long getCommitTimestamp(long startTs, LockToken commitLocksToken) { + return batcher.getCommitTimestamp(namespace, startTs, commitLocksToken); + } + + @Override + public void close() { + batcher.close(); + } +} diff --git a/lock-api/src/test/java/com/palantir/lock/client/MultiClientCommitTimestampGetterTest.java b/lock-api/src/test/java/com/palantir/lock/client/MultiClientCommitTimestampGetterTest.java new file mode 100644 index 00000000000..7f992915029 --- /dev/null +++ b/lock-api/src/test/java/com/palantir/lock/client/MultiClientCommitTimestampGetterTest.java @@ -0,0 +1,255 @@ +/* + * (c) Copyright 2021 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.lock.client; + +import static java.util.stream.Collectors.groupingBy; +import static java.util.stream.Collectors.toList; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.Futures; +import com.palantir.atlasdb.autobatch.BatchElement; +import com.palantir.atlasdb.autobatch.DisruptorAutobatcher.DisruptorFuture; +import com.palantir.atlasdb.timelock.api.GetCommitTimestampsRequest; +import com.palantir.atlasdb.timelock.api.GetCommitTimestampsResponse; +import com.palantir.atlasdb.timelock.api.Namespace; +import com.palantir.common.streams.KeyedStream; +import com.palantir.lock.client.MultiClientCommitTimestampGetter.NamespacedRequest; +import com.palantir.lock.v2.LockToken; +import com.palantir.lock.watch.LockWatchEventCache; +import com.palantir.lock.watch.LockWatchStateUpdate; +import com.palantir.logsafe.exceptions.SafeIllegalStateException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import one.util.streamex.StreamEx; +import org.junit.Test; + +public class MultiClientCommitTimestampGetterTest { + private static final int COMMIT_TS_LIMIT_PER_REQUEST = 5; + private static final SafeIllegalStateException EXCEPTION = new SafeIllegalStateException("Something went wrong!"); + + private final Map lowestStartTsMap = new HashMap<>(); + private final Map lockWatchEventCacheMap = new HashMap<>(); + + private final LockToken lockToken = mock(LockToken.class); + private final InternalMultiClientConjureTimelockService timelockService = + mock(InternalMultiClientConjureTimelockService.class); + private final LockWatchStateUpdate lockWatchStateUpdate = mock(LockWatchStateUpdate.class); + + private final Consumer>> consumer = + MultiClientCommitTimestampGetter.consumer(timelockService); + + @Test + public void canServiceOneClient() { + setupServiceAndAssertSanityOfResponse(getCommitTimestampRequestsForClients(1, COMMIT_TS_LIMIT_PER_REQUEST - 1)); + } + + @Test + public void canServiceOneClientInMultipleRequests() { + setupServiceAndAssertSanityOfResponse(getCommitTimestampRequestsForClients(1, COMMIT_TS_LIMIT_PER_REQUEST * 5)); + } + + @Test + public void canServiceMultipleClients() { + int clientCount = 50; + setupServiceAndAssertSanityOfResponse( + getCommitTimestampRequestsForClients(clientCount, (COMMIT_TS_LIMIT_PER_REQUEST - 1) * clientCount)); + } + + @Test + public void canServiceMultipleClientsWithMultipleServerCalls() { + int clientCount = 5; + setupServiceAndAssertSanityOfResponse( + getCommitTimestampRequestsForClients(clientCount, (COMMIT_TS_LIMIT_PER_REQUEST + 1) * clientCount)); + } + + @Test + public void updatesCacheWhileProcessingResponse() { + Namespace client = Namespace.of("Kitty"); + List> batchElements = IntStream.range(0, COMMIT_TS_LIMIT_PER_REQUEST * 2) + .mapToObj(ind -> batchElementForNamespace(client)) + .collect(toList()); + setupServiceAndAssertSanityOfResponse(batchElements); + + LockWatchEventCache cache = lockWatchEventCacheMap.get(client); + verify(cache, times(2)).processGetCommitTimestampsUpdate(any(), any()); + } + + @Test + public void doesNotUpdateCacheIfClientNotServed() { + Namespace alpha = Namespace.of("alpha" + UUID.randomUUID()); + Namespace beta = Namespace.of("beta" + UUID.randomUUID()); + + BatchElement requestForAlpha = batchElementForNamespace(alpha); + BatchElement requestForBeta = batchElementForNamespace(beta); + + List> allRequests = ImmutableList.of(requestForAlpha, requestForBeta); + List> alphaRequestList = ImmutableList.of(requestForAlpha); + Map responseMap = getCommitTimestamps(alphaRequestList); + + when(timelockService.getCommitTimestamps(any())).thenReturn(responseMap).thenThrow(EXCEPTION); + + assertThatThrownBy(() -> consumer.accept(allRequests)).isEqualTo(EXCEPTION); + + // assert requests made by client alpha are served + assertSanityOfResponse(alphaRequestList, ImmutableMap.of(alpha, ImmutableList.of(responseMap.get(alpha)))); + + LockWatchEventCache alphaCache = lockWatchEventCacheMap.get(alpha); + verify(alphaCache).processGetCommitTimestampsUpdate(any(), any()); + + assertThat(requestForBeta.result().isDone()) + .as("No requests made by client - beta were successful") + .isFalse(); + + LockWatchEventCache betaCache = lockWatchEventCacheMap.get(beta); + verify(betaCache, never()).processGetCommitTimestampsUpdate(any(), any()); + } + + private void setupServiceAndAssertSanityOfResponse(List> batch) { + Map> expectedResponseMap = new HashMap<>(); + + when(timelockService.getCommitTimestamps(any())).thenAnswer(invocation -> { + Map commitTimestamps = + getCommitTimestampResponse(invocation.getArgument(0)); + commitTimestamps.forEach((namespace, response) -> { + expectedResponseMap + .computeIfAbsent(namespace, _unused -> new ArrayList()) + .add(response); + }); + return commitTimestamps; + }); + + consumer.accept(batch); + assertSanityOfResponse(batch, expectedResponseMap); + } + + private void assertSanityOfResponse( + List> batch, + Map> expectedResponseMap) { + assertThat(batch.stream().filter(elem -> !elem.result().isDone()).collect(Collectors.toSet())) + .as("All requests must be served") + .isEmpty(); + + Map> partitionedResponseMap = batch.stream() + .collect(groupingBy( + elem -> elem.argument().namespace(), + Collectors.mapping(elem -> Futures.getUnchecked(elem.result()), toList()))); + + assertThat(partitionedResponseMap.keySet()).isEqualTo(expectedResponseMap.keySet()); + assertCorrectnessOfCompletedRequests(expectedResponseMap, partitionedResponseMap); + } + + private static void assertCorrectnessOfCompletedRequests( + Map> expectedResponseMap, + Map> partitionedResponseMap) { + KeyedStream.stream(partitionedResponseMap) + .forEach((namespace, commitTsList) -> + assertCorrectnessOfServedTimestamps(expectedResponseMap.get(namespace), commitTsList)); + } + + private static void assertCorrectnessOfServedTimestamps( + List expectedCommitTimestampsResponses, List commitTsList) { + long requestedCommitTsCount = expectedCommitTimestampsResponses.stream() + .mapToLong(resp -> resp.getInclusiveUpper() - resp.getInclusiveLower() + 1) + .sum(); + assertThat(requestedCommitTsCount) + .as("We should get as many commit timestamps as we asked for") + .isEqualTo(commitTsList.size()); + assertThat(ImmutableSet.copyOf(commitTsList)) + .as("There should be no duplicate timestamps") + .hasSameSizeAs(commitTsList); + assertThat(StreamEx.of(commitTsList) + .pairMap((first, second) -> first > second) + .anyMatch(x -> x)) + .as("Served timestamps should be in increasing order") + .isFalse(); + } + + private Map getCommitTimestamps( + List> batch) { + Map>> partitionedRequests = + batch.stream().collect(groupingBy(elem -> elem.argument().namespace(), toList())); + return getCommitTimestampResponse(KeyedStream.stream(partitionedRequests) + .map(requestList -> GetCommitTimestampsRequest.builder() + .numTimestamps(requestList.size()) + .build()) + .collectToMap()); + } + + private Map getCommitTimestampResponse( + Map requestMap) { + return KeyedStream.stream(requestMap) + .mapEntries((namespace, request) -> { + long inclusiveLower = getLowerBound(namespace); + long exclusiveUpper = + inclusiveLower + Math.min(request.getNumTimestamps(), COMMIT_TS_LIMIT_PER_REQUEST); + updateLowerBound(namespace, exclusiveUpper); + return Maps.immutableEntry( + namespace, + GetCommitTimestampsResponse.builder() + .inclusiveLower(inclusiveLower) + .inclusiveUpper(exclusiveUpper - 1) + .lockWatchUpdate(lockWatchStateUpdate) + .build()); + }) + .collectToMap(); + } + + private long getLowerBound(Namespace namespace) { + return lowestStartTsMap.getOrDefault(namespace, 1L); + } + + private void updateLowerBound(Namespace namespace, long numTimestamps) { + lowestStartTsMap.put(namespace, lowestStartTsMap.getOrDefault(namespace, 1L) + numTimestamps); + } + + private List> getCommitTimestampRequestsForClients( + int clientCount, int requestCount) { + List> test = IntStream.range(0, requestCount) + .mapToObj(ind -> batchElementForNamespace(Namespace.of("Test_" + (ind % clientCount)))) + .collect(Collectors.toList()); + return test; + } + + private BatchElement batchElementForNamespace(Namespace namespace) { + return BatchElement.of( + ImmutableNamespacedRequest.builder() + .namespace(namespace) + .startTs(1) + .cache(lockWatchEventCacheMap.computeIfAbsent( + namespace, _unused -> mock(LockWatchEventCache.class))) + .commitLocksToken(lockToken) + .build(), + new DisruptorFuture("test")); + } +}