Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
Remove LockWatchStateUpdate#Failed. (#4820)
Browse files Browse the repository at this point in the history
* Naive fix for this.

* Remove fancy synchronization.

* Fixup

* Cleanup some more.

* Fix test

* Add a bit of comments and rename methods.

* Cleanup test

* Reflow.

* CR
  • Loading branch information
jkozlowski authored Jun 25, 2020
1 parent 1cb674b commit 27dbba7
Show file tree
Hide file tree
Showing 16 changed files with 131 additions and 418 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,6 @@ private void processFailed() {
}

private class ProcessingVisitor implements LockWatchStateUpdate.Visitor<CacheUpdate> {
@Override
public CacheUpdate visit(LockWatchStateUpdate.Failed failed) {
processFailed();
return CacheUpdate.FAILED;
}

@Override
public CacheUpdate visit(LockWatchStateUpdate.Success success) {
Expand All @@ -180,11 +175,6 @@ public CacheUpdate visit(LockWatchStateUpdate.Snapshot snapshotUpdate) {
}

private class NewLeaderVisitor implements LockWatchStateUpdate.Visitor<CacheUpdate> {
@Override
public CacheUpdate visit(LockWatchStateUpdate.Failed failed) {
processFailed();
return CacheUpdate.FAILED;
}

@Override
public CacheUpdate visit(LockWatchStateUpdate.Success success) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,13 +237,6 @@ public void getEventsForTransactionsReturnsSnapshotWithOldEvents() {
LOCK_EVENT_2);
}

@Test
public void failedUpdateClearsAllCaches() {
setupInitialState();
eventCache.processStartTransactionsUpdate(TIMESTAMPS_2, LockWatchStateUpdate.failed(LEADER));
verifyStage();
}

@Test
public void leaderChangeClearsCaches() {
setupInitialState();
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,12 @@

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes({
@JsonSubTypes.Type(value = LockWatchStateUpdate.Failed.class, name = LockWatchStateUpdate.Failed.TYPE),
@JsonSubTypes.Type(value = LockWatchStateUpdate.Success.class, name = LockWatchStateUpdate.Success.TYPE),
@JsonSubTypes.Type(value = LockWatchStateUpdate.Snapshot.class, name = LockWatchStateUpdate.Snapshot.TYPE)})
public interface LockWatchStateUpdate {
UUID logId();
<T> T accept(Visitor<T> visitor);

static Failed failed(UUID logId) {
return ImmutableFailed.builder().logId(logId).build();
}

static Success success(UUID logId, long version, List<LockWatchEvent> events) {
return ImmutableSuccess.builder().logId(logId).lastKnownVersion(version).events(events).build();
}
Expand All @@ -57,24 +52,6 @@ static Snapshot snapshot(UUID logId, long version, Set<LockDescriptor> locked,
.build();
}

/**
* A failed update denotes that we were unable to get the difference since last known version, and we were also
* unable to compute a snapshot update.
*/
@Value.Immutable
@Value.Style(visibility = Value.Style.ImplementationVisibility.PACKAGE)
@JsonSerialize(as = ImmutableFailed.class)
@JsonDeserialize(as = ImmutableFailed.class)
@JsonTypeName(Failed.TYPE)
interface Failed extends LockWatchStateUpdate {
String TYPE = "failed";

@Override
default <T> T accept(Visitor<T> visitor) {
return visitor.visit(this);
}
}

/**
* A successful update is an update containing information about all lock watch events occurring since the previous
* last known version.
Expand Down Expand Up @@ -119,7 +96,6 @@ default <T> T accept(Visitor<T> visitor) {
}

interface Visitor<T> {
T visit(Failed failed);
T visit(Success success);
T visit(Snapshot snapshot);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.mockito.Mockito.when;

import java.time.Duration;
import java.util.Collections;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -202,7 +203,8 @@ private ConjureStartTransactionsResponse startTransactionsResponseWith(ConjureLo
LockToken.of(lockToken.getRequestId())))
.timestamps(partitionedTimestamps)
.lease(lease)
.lockWatchUpdate(LockWatchStateUpdate.failed(UUID.randomUUID()))
.lockWatchUpdate(LockWatchStateUpdate.snapshot(UUID.randomUUID(), -1, Collections.emptySet(),
Collections.emptySet()))
.build();
}

Expand Down
1 change: 1 addition & 0 deletions timelock-impl/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ dependencies {
testCompile project(":atlasdb-tests-shared")

testCompile group: 'com.palantir.remoting2', name: 'jersey-servers'
testCompile group: 'com.palantir.safe-logging', name: 'preconditions-assertj'
testCompile group: 'com.palantir.conjure.java.api', name: 'test-utils'
testCompile group: 'org.assertj', name: 'assertj-core'
testCompile group: 'org.mockito', name: 'mockito-core'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import com.palantir.atlasdb.timelock.lock.Leased;
import com.palantir.atlasdb.timelock.lock.LockLog;
import com.palantir.atlasdb.timelock.lock.TimeLimit;
import com.palantir.atlasdb.timelock.lock.watch.ValueAndVersion;
import com.palantir.atlasdb.timelock.lock.watch.ValueAndLockWatchStateUpdate;
import com.palantir.atlasdb.timelock.transaction.timestamp.ClientAwareManagedTimestampService;
import com.palantir.atlasdb.timelock.transaction.timestamp.DelegatingClientAwareManagedTimestampService;
import com.palantir.lock.LockDescriptor;
Expand Down Expand Up @@ -200,23 +200,24 @@ private Leased<LockImmutableTimestampResponse> lockImmutableTimestampWithLease(U
}

@Override
public ListenableFuture<StartTransactionResponseV5> startTransactionsWithWatches(StartTransactionRequestV5 request) {
public ListenableFuture<StartTransactionResponseV5> startTransactionsWithWatches(
StartTransactionRequestV5 request) {
return Futures.immediateFuture(startTransactionsWithWatchesSync(request));
}

private StartTransactionResponseV5 startTransactionsWithWatchesSync(StartTransactionRequestV5 request) {
Leased<LockImmutableTimestampResponse> leasedLockImmutableTimestampResponse =
lockImmutableTimestampWithLease(request.requestId());

ValueAndVersion<PartitionedTimestamps> timestampsAndVersion = lockService.getLockWatchingService()
.runTaskAndAtomicallyReturnLockWatchVersion(() ->
ValueAndLockWatchStateUpdate<PartitionedTimestamps> timestampsAndUpdate = lockService.getLockWatchingService()
.runTask(request.lastKnownLockLogVersion(), () ->
timestampService.getFreshTimestampsForClient(request.requestorId(), request.numTransactions()));

return StartTransactionResponseV5.of(
leasedLockImmutableTimestampResponse.value(),
timestampsAndVersion.value(),
timestampsAndUpdate.value(),
leasedLockImmutableTimestampResponse.lease(),
getWatchStateUpdate(request.lastKnownLockLogVersion(), timestampsAndVersion.version()));
timestampsAndUpdate.lockWatchStateUpdate());
}

@Override
Expand Down Expand Up @@ -270,12 +271,7 @@ public LockWatchStateUpdate getWatchStateUpdate(Optional<IdentifiedVersion> last
}

@Override
public LockWatchStateUpdate getWatchStateUpdate(Optional<IdentifiedVersion> lastKnownVersion, long endVersion) {
return lockService.getLockWatchingService().getWatchStateUpdate(lastKnownVersion, endVersion);
}

@Override
public <T> ValueAndVersion<T> runTaskAndAtomicallyReturnLockWatchVersion(Supplier<T> task) {
public <T> ValueAndLockWatchStateUpdate<T> runTask(Optional<IdentifiedVersion> lastKnownVersion, Supplier<T> task) {
throw new UnsupportedOperationException("Exposing this method is too dangerous.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,18 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

import javax.annotation.concurrent.ThreadSafe;
import javax.annotation.concurrent.NotThreadSafe;

import com.google.common.math.LongMath;
import com.google.common.primitives.Ints;
import com.palantir.atlasdb.timelock.lock.watch.LockEventLogImpl.LockWatchCreatedEventReplayer;
import com.palantir.lock.watch.LockWatchCreatedEvent;
import com.palantir.lock.watch.LockWatchEvent;

@ThreadSafe
@NotThreadSafe
public class ArrayLockEventSlidingWindow {
private final LockWatchEvent[] buffer;
private final int maxSize;
private volatile long nextSequence = 0;
private long nextSequence = 0;

ArrayLockEventSlidingWindow(int maxSize) {
this.buffer = new LockWatchEvent[maxSize];
Expand All @@ -44,92 +41,37 @@ long lastVersion() {
return nextSequence - 1;
}

/**
* Adds an event to the sliding window. Assigns a unique sequence to the event.
*
* Note on concurrency:
* 1. Each write to buffer is followed by a write to nextSequence, which is volatile.
*/
synchronized void add(LockWatchEvent.Builder eventBuilder) {
void add(LockWatchEvent.Builder eventBuilder) {
LockWatchEvent event = eventBuilder.build(nextSequence);
buffer[LongMath.mod(nextSequence, maxSize)] = event;
nextSequence++;
}

synchronized void finalizeAndAddSnapshot(long startVersion, LockWatchCreatedEventReplayer eventReplayer) {
Optional<List<LockWatchEvent>> remaining = getFromVersion(startVersion);
if (remaining.isPresent()) {
remaining.get().forEach(eventReplayer::replay);
add(LockWatchCreatedEvent.builder(eventReplayer.getReferences(), eventReplayer.getLockedDescriptors()));
}
}

/**
* Warning: this will block all lock and unlock requests until the task is done. Improper use of this method can
* result in a deadlock.
*/
synchronized <T> ValueAndVersion<T> runTaskAndAtomicallyReturnVersion(Supplier<T> task) {
return ValueAndVersion.of(lastVersion(), task.get());
}

/**
* Returns a list of all events that occurred immediately after the requested version up to the most recent version,
* ordered by consecutively increasing sequence numbers. If the list cannot be created, either a priori or because
* new events are added to the window during execution of this method causing eviction an event before it was read,
* the method will return {@link Optional#empty()}.
*
* Note on concurrency:
* 2. Before reading from buffer, we read nextSequence.
*
* 1. and 2. ensure that calls to this method have an up to date view of buffer, containing all updates made so
* far. The buffer may be updated after the volatile read of nextSequence, and these updates may or may not be
* visible. This does not affect correctness:
* a) the newer updates are not expected to be reflected in the returned list
* b) if (some of) the newer updates are visible and overwrite a value that should have been included in the
* returned list, it may end up included in the candidate result. This will be detected by
* validateConsistencyOrReturnEmpty, and {@link Optional#empty()} will be returned. This correctly reflects
* the state where, even though all the necessary events were in the requested window at the start of executing
* this method, that is no longer the case when the method returns.
*/
public Optional<List<LockWatchEvent>> getFromVersion(long version) {
return getFromTo(version, lastVersion());
}

public Optional<List<LockWatchEvent>> getFromTo(long startVersion, long endVersion) {
if (versionInTheFuture(startVersion, endVersion) || versionTooOld(startVersion, endVersion)) {
public Optional<List<LockWatchEvent>> getNextEvents(long version) {
if (versionInTheFuture(version) || versionTooOld(version)) {
return Optional.empty();
}

int startIndex = LongMath.mod(startVersion + 1, maxSize);
int windowSize = Ints.saturatedCast(endVersion - startVersion);
int startIndex = LongMath.mod(version + 1, maxSize);
int windowSize = Ints.saturatedCast(lastVersion() - version);
List<LockWatchEvent> events = new ArrayList<>(windowSize);

for (int i = startIndex; events.size() < windowSize; i = incrementAndMod(i)) {
events.add(buffer[i]);
}

return validateConsistencyOrReturnEmpty(startVersion, events);
return Optional.of(events);
}

private int incrementAndMod(int num) {
num++;
return num >= maxSize ? num % maxSize : num;
}

private boolean versionInTheFuture(long lastVersion, long lastWrittenSequence) {
return lastVersion > lastWrittenSequence;
}

private boolean versionTooOld(long lastVersion, long lastWrittenSequence) {
return lastWrittenSequence - lastVersion > maxSize;
private boolean versionInTheFuture(long version) {
return version > lastVersion();
}

private Optional<List<LockWatchEvent>> validateConsistencyOrReturnEmpty(long version, List<LockWatchEvent> events) {
for (int i = 0; i < events.size(); i++) {
if (events.get(i).sequence() != i + version + 1) {
return Optional.empty();
}
}
return Optional.of(events);
private boolean versionTooOld(long version) {
return lastVersion() - version > maxSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@

public interface LockEventLog {
LockWatchStateUpdate getLogDiff(Optional<IdentifiedVersion> fromVersion);
LockWatchStateUpdate getLogDiff(Optional<IdentifiedVersion> fromVersion, long toVersion);
<T> ValueAndVersion<T> runTaskAndAtomicallyReturnVersion(Supplier<T> task);
<T> ValueAndLockWatchStateUpdate<T> runTask(Optional<IdentifiedVersion> lastKnownVersion, Supplier<T> task);
void logLock(Set<LockDescriptor> locksTakenOut, LockToken lockToken);
void logUnlock(Set<LockDescriptor> locksUnlocked);
void logLockWatchCreated(LockWatches newWatches);
Expand Down
Loading

0 comments on commit 27dbba7

Please sign in to comment.