Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Remove LockWatchStateUpdate#Failed. #4820

Merged
merged 11 commits into from
Jun 25, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,12 @@

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes({
@JsonSubTypes.Type(value = LockWatchStateUpdate.Failed.class, name = LockWatchStateUpdate.Failed.TYPE),
@JsonSubTypes.Type(value = LockWatchStateUpdate.Success.class, name = LockWatchStateUpdate.Success.TYPE),
@JsonSubTypes.Type(value = LockWatchStateUpdate.Snapshot.class, name = LockWatchStateUpdate.Snapshot.TYPE)})
public interface LockWatchStateUpdate {
UUID logId();
<T> T accept(Visitor<T> visitor);

static Failed failed(UUID logId) {
return ImmutableFailed.builder().logId(logId).build();
}

static Success success(UUID logId, long version, List<LockWatchEvent> events) {
return ImmutableSuccess.builder().logId(logId).lastKnownVersion(version).events(events).build();
}
Expand All @@ -57,24 +52,6 @@ static Snapshot snapshot(UUID logId, long version, Set<LockDescriptor> locked,
.build();
}

/**
* A failed update denotes that we were unable to get the difference since last known version, and we were also
* unable to compute a snapshot update.
*/
@Value.Immutable
@Value.Style(visibility = Value.Style.ImplementationVisibility.PACKAGE)
@JsonSerialize(as = ImmutableFailed.class)
@JsonDeserialize(as = ImmutableFailed.class)
@JsonTypeName(Failed.TYPE)
interface Failed extends LockWatchStateUpdate {
String TYPE = "failed";

@Override
default <T> T accept(Visitor<T> visitor) {
return visitor.visit(this);
}
}

/**
* A successful update is an update containing information about all lock watch events occurring since the previous
* last known version.
Expand Down Expand Up @@ -119,7 +96,6 @@ default <T> T accept(Visitor<T> visitor) {
}

interface Visitor<T> {
T visit(Failed failed);
T visit(Success success);
T visit(Snapshot snapshot);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.mockito.Mockito.when;

import java.time.Duration;
import java.util.Collections;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -202,7 +203,8 @@ private ConjureStartTransactionsResponse startTransactionsResponseWith(ConjureLo
LockToken.of(lockToken.getRequestId())))
.timestamps(partitionedTimestamps)
.lease(lease)
.lockWatchUpdate(LockWatchStateUpdate.failed(UUID.randomUUID()))
.lockWatchUpdate(LockWatchStateUpdate.snapshot(UUID.randomUUID(), -1, Collections.emptySet(),
Collections.emptySet()))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import com.palantir.atlasdb.timelock.lock.Leased;
import com.palantir.atlasdb.timelock.lock.LockLog;
import com.palantir.atlasdb.timelock.lock.TimeLimit;
import com.palantir.atlasdb.timelock.lock.watch.ValueAndVersion;
import com.palantir.atlasdb.timelock.lock.watch.AtomicValue;
import com.palantir.atlasdb.timelock.transaction.timestamp.ClientAwareManagedTimestampService;
import com.palantir.atlasdb.timelock.transaction.timestamp.DelegatingClientAwareManagedTimestampService;
import com.palantir.lock.LockDescriptor;
Expand Down Expand Up @@ -200,23 +200,24 @@ private Leased<LockImmutableTimestampResponse> lockImmutableTimestampWithLease(U
}

@Override
public ListenableFuture<StartTransactionResponseV5> startTransactionsWithWatches(StartTransactionRequestV5 request) {
public ListenableFuture<StartTransactionResponseV5> startTransactionsWithWatches(
StartTransactionRequestV5 request) {
return Futures.immediateFuture(startTransactionsWithWatchesSync(request));
}

private StartTransactionResponseV5 startTransactionsWithWatchesSync(StartTransactionRequestV5 request) {
Leased<LockImmutableTimestampResponse> leasedLockImmutableTimestampResponse =
lockImmutableTimestampWithLease(request.requestId());

ValueAndVersion<PartitionedTimestamps> timestampsAndVersion = lockService.getLockWatchingService()
.runTaskAndAtomicallyReturnLockWatchVersion(() ->
AtomicValue<PartitionedTimestamps> timestampsAndVersion = lockService.getLockWatchingService()
.runTaskAndAtomicallyReturnLockWatchStateUpdate(request.lastKnownLockLogVersion(), () ->
timestampService.getFreshTimestampsForClient(request.requestorId(), request.numTransactions()));

return StartTransactionResponseV5.of(
leasedLockImmutableTimestampResponse.value(),
timestampsAndVersion.value(),
leasedLockImmutableTimestampResponse.lease(),
getWatchStateUpdate(request.lastKnownLockLogVersion(), timestampsAndVersion.version()));
timestampsAndVersion.lockWatchStateUpdate());
}

@Override
Expand Down Expand Up @@ -270,12 +271,8 @@ public LockWatchStateUpdate getWatchStateUpdate(Optional<IdentifiedVersion> last
}

@Override
public LockWatchStateUpdate getWatchStateUpdate(Optional<IdentifiedVersion> lastKnownVersion, long endVersion) {
return lockService.getLockWatchingService().getWatchStateUpdate(lastKnownVersion, endVersion);
}

@Override
public <T> ValueAndVersion<T> runTaskAndAtomicallyReturnLockWatchVersion(Supplier<T> task) {
public <T> AtomicValue<T> runTaskAndAtomicallyReturnLockWatchStateUpdate(
Optional<IdentifiedVersion> lastKnownVersion, Supplier<T> task) {
throw new UnsupportedOperationException("Exposing this method is too dangerous.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,16 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

import javax.annotation.concurrent.ThreadSafe;

import com.google.common.math.LongMath;
import com.google.common.primitives.Ints;
import com.palantir.atlasdb.timelock.lock.watch.LockEventLogImpl.LockWatchCreatedEventReplayer;
import com.palantir.lock.watch.LockWatchCreatedEvent;
import com.palantir.lock.watch.LockWatchEvent;
import com.palantir.logsafe.Preconditions;

@ThreadSafe
public class ArrayLockEventSlidingWindow {
jkozlowski marked this conversation as resolved.
Show resolved Hide resolved
private final LockWatchEvent[] buffer;
private final int maxSize;
private volatile long nextSequence = 0;
private long nextSequence = 0;

ArrayLockEventSlidingWindow(int maxSize) {
this.buffer = new LockWatchEvent[maxSize];
Expand All @@ -44,92 +38,43 @@ long lastVersion() {
return nextSequence - 1;
}

/**
* Adds an event to the sliding window. Assigns a unique sequence to the event.
*
* Note on concurrency:
* 1. Each write to buffer is followed by a write to nextSequence, which is volatile.
*/
synchronized void add(LockWatchEvent.Builder eventBuilder) {
void add(LockWatchEvent.Builder eventBuilder) {
LockWatchEvent event = eventBuilder.build(nextSequence);
buffer[LongMath.mod(nextSequence, maxSize)] = event;
nextSequence++;
}

synchronized void finalizeAndAddSnapshot(long startVersion, LockWatchCreatedEventReplayer eventReplayer) {
Optional<List<LockWatchEvent>> remaining = getFromVersion(startVersion);
if (remaining.isPresent()) {
remaining.get().forEach(eventReplayer::replay);
add(LockWatchCreatedEvent.builder(eventReplayer.getReferences(), eventReplayer.getLockedDescriptors()));
}
boolean contains(long version) {
return !validateVersion(version);
jkozlowski marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Warning: this will block all lock and unlock requests until the task is done. Improper use of this method can
* result in a deadlock.
*/
synchronized <T> ValueAndVersion<T> runTaskAndAtomicallyReturnVersion(Supplier<T> task) {
return ValueAndVersion.of(lastVersion(), task.get());
}

/**
* Returns a list of all events that occurred immediately after the requested version up to the most recent version,
* ordered by consecutively increasing sequence numbers. If the list cannot be created, either a priori or because
* new events are added to the window during execution of this method causing eviction an event before it was read,
* the method will return {@link Optional#empty()}.
*
* Note on concurrency:
* 2. Before reading from buffer, we read nextSequence.
*
* 1. and 2. ensure that calls to this method have an up to date view of buffer, containing all updates made so
* far. The buffer may be updated after the volatile read of nextSequence, and these updates may or may not be
* visible. This does not affect correctness:
* a) the newer updates are not expected to be reflected in the returned list
* b) if (some of) the newer updates are visible and overwrite a value that should have been included in the
* returned list, it may end up included in the candidate result. This will be detected by
* validateConsistencyOrReturnEmpty, and {@link Optional#empty()} will be returned. This correctly reflects
* the state where, even though all the necessary events were in the requested window at the start of executing
* this method, that is no longer the case when the method returns.
*/
public Optional<List<LockWatchEvent>> getFromVersion(long version) {
return getFromTo(version, lastVersion());
}

public Optional<List<LockWatchEvent>> getFromTo(long startVersion, long endVersion) {
if (versionInTheFuture(startVersion, endVersion) || versionTooOld(startVersion, endVersion)) {
return Optional.empty();
}

int startIndex = LongMath.mod(startVersion + 1, maxSize);
int windowSize = Ints.saturatedCast(endVersion - startVersion);
public List<LockWatchEvent> getFromVersion(long version) {
Preconditions.checkArgument(contains(version), "Version not in the log");
int startIndex = LongMath.mod(version + 1, maxSize);
int windowSize = Ints.saturatedCast(lastVersion() - version);
List<LockWatchEvent> events = new ArrayList<>(windowSize);

for (int i = startIndex; events.size() < windowSize; i = incrementAndMod(i)) {
events.add(buffer[i]);
}

return validateConsistencyOrReturnEmpty(startVersion, events);
return events;
}

private int incrementAndMod(int num) {
num++;
return num >= maxSize ? num % maxSize : num;
}

private boolean versionInTheFuture(long lastVersion, long lastWrittenSequence) {
return lastVersion > lastWrittenSequence;
private boolean validateVersion(long version) {
jkozlowski marked this conversation as resolved.
Show resolved Hide resolved
return versionInTheFuture(version) || versionTooOld(version);
}

private boolean versionTooOld(long lastVersion, long lastWrittenSequence) {
return lastWrittenSequence - lastVersion > maxSize;
private boolean versionInTheFuture(long version) {
return version > lastVersion();
}

private Optional<List<LockWatchEvent>> validateConsistencyOrReturnEmpty(long version, List<LockWatchEvent> events) {
for (int i = 0; i < events.size(); i++) {
if (events.get(i).sequence() != i + version + 1) {
return Optional.empty();
}
}
return Optional.of(events);
private boolean versionTooOld(long version) {
return lastVersion() - version > maxSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,17 @@

import org.immutables.value.Value;

import com.palantir.lock.watch.LockWatchStateUpdate;

@Value.Immutable
@Value.Style(visibility = Value.Style.ImplementationVisibility.PACKAGE)
public interface ValueAndVersion<T> {
public interface AtomicValue<T> {
@Value.Parameter
long version();
LockWatchStateUpdate lockWatchStateUpdate();
@Value.Parameter
T value();

static <R> ValueAndVersion<R> of(long version, R result) {
return ImmutableValueAndVersion.of(version, result);
static <R> AtomicValue<R> of(LockWatchStateUpdate lockWatchStateUpdate, R result) {
return ImmutableAtomicValue.of(lockWatchStateUpdate, result);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@

public interface LockEventLog {
LockWatchStateUpdate getLogDiff(Optional<IdentifiedVersion> fromVersion);
LockWatchStateUpdate getLogDiff(Optional<IdentifiedVersion> fromVersion, long toVersion);
<T> ValueAndVersion<T> runTaskAndAtomicallyReturnVersion(Supplier<T> task);
<T> AtomicValue<T> runTaskAndAtomicallyReturnLockWatchStateUpdate(
Optional<IdentifiedVersion> lastKnownVersion, Supplier<T> task);
void logLock(Set<LockDescriptor> locksTakenOut, LockToken lockToken);
void logUnlock(Set<LockDescriptor> locksUnlocked);
void logLockWatchCreated(LockWatches newWatches);
Expand Down
Loading