Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
Reduce allocations in VersionedEventStore#retentionEvents (#7118)
Browse files Browse the repository at this point in the history
Reduce allocations in VersionedEventStore#retentionEvents
  • Loading branch information
schlosna authored Jun 24, 2024
1 parent bc2f357 commit caed2dd
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ private LockWatchEventLog(ClientLockWatchSnapshot snapshot, CacheMetrics metrics

CacheUpdate processUpdate(LockWatchStateUpdate update) {
if (latestVersion.isEmpty()
|| !update.logId().equals(latestVersion.get().id())) {
|| !update.logId().equals(latestVersion.orElseThrow().id())) {
return update.accept(new NewLeaderVisitor());
} else {
return update.accept(new ProcessingVisitor());
Expand Down Expand Up @@ -134,11 +134,9 @@ private ClientLogEvents getEventsBetweenVersionsInternal(VersionBounds versionBo
+ "transactions"));
return ClientLogEvents.builder()
.clearCache(false)
.events(LockWatchEvents.builder()
.addAllEvents(eventStore.getEventsBetweenVersionsInclusive(
Optional.of(startVersion.get().version()),
versionBounds.endVersion().version()))
.build())
.events(LockWatchEvents.of(eventStore.getEventsBetweenVersionsInclusive(
Optional.of(startVersion.get().version()),
versionBounds.endVersion().version())))
.build();
}
}
Expand Down Expand Up @@ -174,8 +172,7 @@ private LockWatchEvent getCompressedSnapshot(VersionBounds versionBounds) {
long snapshotVersion = versionBounds.snapshotVersion();
Collection<LockWatchEvent> collapsibleEvents =
eventStore.getEventsBetweenVersionsInclusive(Optional.empty(), snapshotVersion);
LockWatchEvents events =
LockWatchEvents.builder().addAllEvents(collapsibleEvents).build();
LockWatchEvents events = LockWatchEvents.of(collapsibleEvents);

return LockWatchCreatedEvent.fromSnapshot(snapshot.getSnapshotWithEvents(events, versionBounds.leader()));
}
Expand Down Expand Up @@ -211,7 +208,7 @@ private LockWatchVersion createStartVersion(LockWatchVersion startVersion) {

private LockWatchVersion getLatestVersionAndVerify(LockWatchVersion endVersion) {
Preconditions.checkState(latestVersion.isPresent(), "Cannot get events when log does not know its version");
LockWatchVersion currentVersion = latestVersion.get();
LockWatchVersion currentVersion = latestVersion.orElseThrow();
Preconditions.checkArgument(
endVersion.version() <= currentVersion.version(),
"Transactions' view of the world is more up-to-date than the log");
Expand Down Expand Up @@ -243,9 +240,8 @@ private void processSuccessInternal(LockWatchStateUpdate.Success success) {
+ " should only happen very rarely.");
}

if (success.lastKnownVersion() > latestVersion.get().version()) {
LockWatchEvents events =
LockWatchEvents.builder().events(success.events()).build();
if (success.lastKnownVersion() > latestVersion.orElseThrow().version()) {
LockWatchEvents events = LockWatchEvents.of(success.events());
if (events.events().isEmpty()) {
throw new TransactionLockWatchFailedException("Success event has a later version than the current "
+ "version, but has no events to bridge the gap. The transaction should be retried, but this "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.palantir.atlasdb.keyvalue.api.watch;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Range;
import com.palantir.lock.watch.LockWatchEvent;
import com.palantir.lock.watch.LockWatchVersion;
Expand All @@ -27,6 +28,8 @@

@Value.Immutable
public interface LockWatchEvents {

@Value.Parameter
List<LockWatchEvent> events();

@Value.Derived
Expand Down Expand Up @@ -55,7 +58,7 @@ default void contiguousSequence() {
@Value.Check
default void rangeOnlyPresentIffEventsAre() {
if (events().isEmpty()) {
Preconditions.checkState(!versionRange().isPresent(), "Cannot have a version range with no events");
Preconditions.checkState(versionRange().isEmpty(), "Cannot have a version range with no events");
} else {
Preconditions.checkState(versionRange().isPresent(), "Non-empty events must have a version range");
}
Expand All @@ -67,7 +70,7 @@ default void assertNoEventsAreMissingAfterLatestVersion(Optional<LockWatchVersio
}

if (latestVersion.isPresent()) {
long firstVersion = versionRange().get().lowerEndpoint();
long firstVersion = versionRange().orElseThrow().lowerEndpoint();
Preconditions.checkArgument(
firstVersion <= latestVersion.get().version()
|| latestVersion.get().version() + 1 == firstVersion,
Expand All @@ -80,4 +83,12 @@ default void assertNoEventsAreMissingAfterLatestVersion(Optional<LockWatchVersio
static ImmutableLockWatchEvents.Builder builder() {
return ImmutableLockWatchEvents.builder();
}

static LockWatchEvents of(Iterable<LockWatchEvent> events) {
return ImmutableLockWatchEvents.of(events);
}

static LockWatchEvents empty() {
return of(ImmutableList.of());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@
import com.palantir.logsafe.UnsafeArg;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class VersionedEventStore {
private static final boolean INCLUSIVE = true;
Expand Down Expand Up @@ -60,41 +61,31 @@ Collection<LockWatchEvent> getEventsBetweenVersionsInclusive(Optional<Long> mayb
}

LockWatchEvents retentionEvents(Optional<Sequence> earliestSequenceToKeep) {
if (eventMap.size() < minEvents) {
return LockWatchEvents.builder().build();
int numToRetention = eventMap.size() - minEvents;
if (numToRetention <= 0) {
return LockWatchEvents.empty();
}

// Guarantees that we remove some events while still also potentially performing further retention - note
// that each call to retentionEventsInternal modifies eventMap.
Stream<LockWatchEvent> events = retentionEvents(numToRetention, earliestSequenceToKeep.orElse(MAX_VERSION));

// Guarantees that we remove some events while still also potentially performing further retention.
// Note that consuming elements from retentionEvents stream removes them from eventMap.
if (eventMap.size() > maxEvents) {
List<LockWatchEvent> overMaxSizeEvents = retentionEventsInternal(eventMap.size() - maxEvents, MAX_VERSION);
List<LockWatchEvent> restOfEvents =
retentionEventsInternal(eventMap.size() - minEvents, earliestSequenceToKeep.orElse(MAX_VERSION));
return ImmutableLockWatchEvents.builder()
.addAllEvents(overMaxSizeEvents)
.addAllEvents(restOfEvents)
.build();
} else {
return ImmutableLockWatchEvents.builder()
.addAllEvents(retentionEventsInternal(
eventMap.size() - minEvents, earliestSequenceToKeep.orElse(MAX_VERSION)))
.build();
Stream<LockWatchEvent> overMaxSizeEvents = retentionEvents(eventMap.size() - maxEvents, MAX_VERSION);
return ImmutableLockWatchEvents.of(Stream.concat(overMaxSizeEvents, events)
.collect(Collectors.toCollection(() -> new ArrayList<>(maxEvents))));
}
return ImmutableLockWatchEvents.of(events.collect(Collectors.toCollection(() -> new ArrayList<>(minEvents))));
}

private List<LockWatchEvent> retentionEventsInternal(int numToRetention, Sequence maxVersion) {
List<LockWatchEvent> events = new ArrayList<>(numToRetention);

private Stream<LockWatchEvent> retentionEvents(int numToRetention, Sequence maxVersion) {
// The correctness of this depends upon eventMap's entrySet returning entries in ascending sorted order.
eventMap.entrySet().stream()
.takeWhile(entry -> entry.getKey().value() < maxVersion.value())
return eventMap.headMap(maxVersion).entrySet().stream()
.limit(numToRetention)
.forEachOrdered(entry -> {
.map(entry -> {
eventMap.remove(entry.getKey());
events.add(entry.getValue());
return entry.getValue();
});

return events;
}

boolean containsEntryLessThanOrEqualTo(long version) {
Expand Down
5 changes: 5 additions & 0 deletions changelog/@unreleased/pr-7118.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type: improvement
improvement:
description: Reduce allocations in VersionedEventStore#retentionEvents
links:
- https://github.com/palantir/atlasdb/pull/7118

0 comments on commit caed2dd

Please sign in to comment.