Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
[CCB] | MultiClientCommitTimestampGetter refactor - part 2 (#5294)
Browse files Browse the repository at this point in the history
  • Loading branch information
sudiksha27 authored Mar 3, 2021
1 parent ab2128a commit 7f17418
Show file tree
Hide file tree
Showing 6 changed files with 501 additions and 3 deletions.
5 changes: 5 additions & 0 deletions changelog/@unreleased/pr-5294.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type: improvement
improvement:
description: AtlasDb client can now batch getCommitTimestamps requests across namespaces.
links:
- https://github.com/palantir/atlasdb/pull/5294
1 change: 1 addition & 0 deletions lock-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ dependencies {
compile group: 'com.palantir.safe-logging', name: 'safe-logging'
compile group: 'com.palantir.safe-logging', name: 'preconditions'
compile group: 'com.palantir.refreshable', name: 'refreshable'
compile group: 'one.util', name: 'streamex'

annotationProcessor project(":atlasdb-processors")
compileOnly project(":atlasdb-processors")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/*
* (c) Copyright 2021 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.lock.client;

import static com.palantir.lock.client.ConjureLockRequests.toConjure;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Streams;
import com.palantir.atlasdb.autobatch.Autobatchers;
import com.palantir.atlasdb.autobatch.BatchElement;
import com.palantir.atlasdb.autobatch.DisruptorAutobatcher;
import com.palantir.atlasdb.futures.AtlasFutures;
import com.palantir.atlasdb.timelock.api.GetCommitTimestampsRequest;
import com.palantir.atlasdb.timelock.api.GetCommitTimestampsResponse;
import com.palantir.atlasdb.timelock.api.Namespace;
import com.palantir.common.streams.KeyedStream;
import com.palantir.lock.v2.LockToken;
import com.palantir.lock.watch.ImmutableTransactionUpdate;
import com.palantir.lock.watch.LockWatchEventCache;
import com.palantir.lock.watch.LockWatchStateUpdate;
import com.palantir.lock.watch.LockWatchVersion;
import com.palantir.lock.watch.TransactionUpdate;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.immutables.value.Value;

public final class MultiClientCommitTimestampGetter implements AutoCloseable {
private final DisruptorAutobatcher<NamespacedRequest, Long> autobatcher;

private MultiClientCommitTimestampGetter(DisruptorAutobatcher<NamespacedRequest, Long> autobatcher) {
this.autobatcher = autobatcher;
}

public static MultiClientCommitTimestampGetter create(InternalMultiClientConjureTimelockService delegate) {
DisruptorAutobatcher<NamespacedRequest, Long> autobatcher = Autobatchers.independent(consumer(delegate))
.safeLoggablePurpose("multi-client-commit-timestamp-getter")
.build();
return new MultiClientCommitTimestampGetter(autobatcher);
}

public long getCommitTimestamp(Namespace namespace, long startTs, LockToken commitLocksToken) {
return AtlasFutures.getUnchecked(autobatcher.apply(ImmutableNamespacedRequest.builder()
.namespace(namespace)
.startTs(startTs)
.commitLocksToken(commitLocksToken)
.build()));
}

@VisibleForTesting
static Consumer<List<BatchElement<NamespacedRequest, Long>>> consumer(
InternalMultiClientConjureTimelockService delegate) {
return batch -> {
BatchStateManager batchStateManager = BatchStateManager.createFromRequestBatch(batch);
while (batchStateManager.hasPendingRequests()) {
batchStateManager.processResponse(delegate.getCommitTimestamps(batchStateManager.getRequests()));
}
};
}

private static final class BatchStateManager {
private final Map<Namespace, NamespacedBatchStateManager> requestMap;

private BatchStateManager(Map<Namespace, NamespacedBatchStateManager> requestMap) {
this.requestMap = requestMap;
}

static BatchStateManager createFromRequestBatch(List<BatchElement<NamespacedRequest, Long>> batch) {
Map<Namespace, NamespacedBatchStateManager> requestMap = new HashMap<>();

for (BatchElement<NamespacedRequest, Long> elem : batch) {
NamespacedRequest argument = elem.argument();
Namespace namespace = argument.namespace();
NamespacedBatchStateManager namespacedBatchStateManager = requestMap.computeIfAbsent(
namespace, _unused -> new NamespacedBatchStateManager(argument.cache()));
namespacedBatchStateManager.addRequest(elem);
}

return new BatchStateManager(requestMap);
}

private boolean hasPendingRequests() {
return requestMap.values().stream().anyMatch(NamespacedBatchStateManager::hasPendingRequests);
}

private Map<Namespace, GetCommitTimestampsRequest> getRequests() {
return KeyedStream.stream(requestMap)
.filter(NamespacedBatchStateManager::hasPendingRequests)
.map(NamespacedBatchStateManager::getRequestForServer)
.collectToMap();
}

private void processResponse(Map<Namespace, GetCommitTimestampsResponse> responseMap) {
responseMap.forEach((namespace, getCommitTimestampsResponse) ->
requestMap.get(namespace).serviceRequests(getCommitTimestampsResponse));
}
}

private static final class NamespacedBatchStateManager {
private final Queue<BatchElement<NamespacedRequest, Long>> pendingRequestQueue;
private final LockWatchEventCache cache;
private Optional<LockWatchVersion> lastKnownVersion;

private NamespacedBatchStateManager(LockWatchEventCache cache) {
this.pendingRequestQueue = new ArrayDeque<>();
this.cache = cache;
this.lastKnownVersion = Optional.empty();
}

private boolean hasPendingRequests() {
return !pendingRequestQueue.isEmpty();
}

private void addRequest(BatchElement<NamespacedRequest, Long> elem) {
pendingRequestQueue.add(elem);
}

private GetCommitTimestampsRequest getRequestForServer() {
return GetCommitTimestampsRequest.builder()
.numTimestamps(pendingRequestQueue.size())
.lastKnownVersion(toConjure(updateAndGetLastKnownVersion()))
.build();
}

private Optional<LockWatchVersion> updateAndGetLastKnownVersion() {
lastKnownVersion = cache.lastKnownVersion();
return lastKnownVersion;
}

private void serviceRequests(GetCommitTimestampsResponse commitTimestampsResponse) {
List<Long> commitTimestamps = getCommitTimestampValues(commitTimestampsResponse);

processLockWatchUpdate(commitTimestamps, commitTimestampsResponse.getLockWatchUpdate());
LockWatchLogUtility.logTransactionEvents(lastKnownVersion, commitTimestampsResponse.getLockWatchUpdate());

for (Long commitTimestamp : commitTimestamps) {
pendingRequestQueue.poll().result().set(commitTimestamp);
}
}

private List<Long> getCommitTimestampValues(GetCommitTimestampsResponse commitTimestampsResponse) {
return LongStream.rangeClosed(
commitTimestampsResponse.getInclusiveLower(), commitTimestampsResponse.getInclusiveUpper())
.boxed()
.collect(Collectors.toList());
}

private void processLockWatchUpdate(List<Long> timestamps, LockWatchStateUpdate lockWatchUpdate) {
List<TransactionUpdate> transactionUpdates = Streams.zip(
timestamps.stream(),
pendingRequestQueue.stream(),
(commitTs, batchElement) -> ImmutableTransactionUpdate.builder()
.startTs(batchElement.argument().startTs())
.commitTs(commitTs)
.writesToken(batchElement.argument().commitLocksToken())
.build())
.collect(Collectors.toList());
cache.processGetCommitTimestampsUpdate(transactionUpdates, lockWatchUpdate);
}
}

@Override
public void close() {
autobatcher.close();
}

@Value.Immutable
interface NamespacedRequest {
Namespace namespace();

long startTs();

LockToken commitLocksToken();

LockWatchEventCache cache();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
import com.palantir.common.streams.KeyedStream;
import com.palantir.lock.v2.StartIdentifiedAtlasDbTransactionResponse;
import com.palantir.lock.watch.StartTransactionsLockWatchEventCache;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -278,8 +278,8 @@ private static final class ResponseHandler implements AutoCloseable {
private final LockCleanupService lockCleanupService;

ResponseHandler(LockCleanupService lockCleanupService) {
this.pendingFutures = new LinkedList<>();
this.transientResponseList = new LinkedList<>();
this.pendingFutures = new ArrayDeque<>();
this.transientResponseList = new ArrayDeque<>();
this.lockCleanupService = lockCleanupService;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* (c) Copyright 2021 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.lock.client;

import com.palantir.atlasdb.timelock.api.Namespace;
import com.palantir.lock.v2.LockToken;

public class NamespacedCommitTimestampGetter implements CommitTimestampGetter {
private final Namespace namespace;
private final MultiClientCommitTimestampGetter batcher;

public NamespacedCommitTimestampGetter(Namespace namespace, MultiClientCommitTimestampGetter batcher) {
this.namespace = namespace;
this.batcher = batcher;
}

@Override
public long getCommitTimestamp(long startTs, LockToken commitLocksToken) {
return batcher.getCommitTimestamp(namespace, startTs, commitLocksToken);
}

@Override
public void close() {
batcher.close();
}
}
Loading

0 comments on commit 7f17418

Please sign in to comment.