Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
Revert "[TTS] Readonly transactions on TTS (#6212)" (#6254)
Browse files Browse the repository at this point in the history
  • Loading branch information
sudiksha27 authored Sep 30, 2022
1 parent 9587c74 commit 153b09b
Show file tree
Hide file tree
Showing 43 changed files with 334 additions and 937 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,8 @@
import com.palantir.atlasdb.transaction.impl.SweepStrategyManagers;
import com.palantir.atlasdb.transaction.impl.TransactionConstants;
import com.palantir.atlasdb.transaction.impl.TransactionTables;
import com.palantir.atlasdb.transaction.knowledge.TransactionKnowledgeComponents;
import com.palantir.atlasdb.transaction.service.TransactionService;
import com.palantir.atlasdb.transaction.service.TransactionServices;
import com.palantir.atlasdb.util.MetricsManager;
import com.palantir.atlasdb.util.MetricsManagers;
import com.palantir.timelock.paxos.InMemoryTimeLockRule;
import com.palantir.timestamp.ManagedTimestampService;
Expand Down Expand Up @@ -347,11 +345,8 @@ public void atomicTablesDelegatedToSourceAreNotDropped() {

private static AtlasDbServices createMock(KeyValueService kvs, InMemoryTimeLockRule timeLock) {
ManagedTimestampService timestampService = timeLock.getManagedTimestampService();
MetricsManager metricsManager = MetricsManagers.createForTests();

TransactionTables.createTables(kvs);
TransactionKnowledgeComponents knowledge =
TransactionKnowledgeComponents.createForTests(kvs, metricsManager.getTaggedRegistry());
TransactionService transactionService = spy(TransactionServices.createRaw(kvs, timestampService, false));

AtlasDbServices mockServices = mock(AtlasDbServices.class);
Expand All @@ -360,7 +355,7 @@ private static AtlasDbServices createMock(KeyValueService kvs, InMemoryTimeLockR
when(mockServices.getKeyValueService()).thenReturn(kvs);
TargetedSweeper sweeper = TargetedSweeper.createUninitializedForTest(() -> 1);
SerializableTransactionManager txManager = SerializableTransactionManager.createForTest(
metricsManager,
MetricsManagers.createForTests(),
kvs,
timeLock.getLegacyTimelockService(),
timestampService,
Expand All @@ -373,8 +368,7 @@ private static AtlasDbServices createMock(KeyValueService kvs, InMemoryTimeLockR
new NoOpCleaner(),
16,
4,
sweeper,
knowledge);
sweeper);
sweeper.initialize(txManager);
when(mockServices.getTransactionManager()).thenReturn(txManager);
return mockServices;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.palantir.atlasdb.transaction.impl;

import com.google.common.annotations.VisibleForTesting;
import com.palantir.atlasdb.transaction.service.TransactionStatus;
import com.palantir.atlasdb.transaction.service.TransactionStatuses;
import com.palantir.logsafe.SafeArg;
Expand All @@ -34,11 +33,7 @@ public static TransactionStatus fromTimestamp(long timestamp) {
return TransactionStatuses.committed(timestamp);
}

/**
* This helper is only meant to be used for transactions with schema < 4. For schemas >= 4,
* use {@link #getCommitTsFromStatus(long, TransactionStatus, Function)}
* */
public static long getCommitTimestampIfKnown(TransactionStatus status) {
public static long getCommitTimestampOrThrow(TransactionStatus status) {
return TransactionStatuses.caseOf(status)
.committed(Function.identity())
.aborted_(TransactionConstants.FAILED_COMMIT_TS)
Expand All @@ -47,32 +42,10 @@ public static long getCommitTimestampIfKnown(TransactionStatus status) {
});
}

/**
* This helper is only meant to be used for transactions with schema < 4. For schemas >= 4,
* use {@link #getCommitTsFromStatus(long, TransactionStatus, Function)}
* */
public static Optional<Long> maybeGetCommitTs(TransactionStatus status) {
return TransactionStatuses.caseOf(status)
.committed(Function.identity())
.aborted_(TransactionConstants.FAILED_COMMIT_TS)
.otherwiseEmpty();
}

public static Long getCommitTsFromStatus(
long startTs, TransactionStatus status, Function<Long, Boolean> abortedCheck) {
return TransactionStatuses.caseOf(status)
.unknown(() -> getCommitTsForConcludedTransaction(startTs, abortedCheck))
.otherwise(() -> TransactionStatusUtils.maybeGetCommitTs(status).orElse(null));
}

public static long getCommitTsForConcludedTransaction(long startTs, Function<Long, Boolean> isAborted) {
return isAborted.apply(startTs)
? TransactionConstants.FAILED_COMMIT_TS
: getCommitTsForNonAbortedUnknownTransaction(startTs);
}

@VisibleForTesting
static long getCommitTsForNonAbortedUnknownTransaction(long startTs) {
return startTs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@
import com.palantir.atlasdb.transaction.impl.consistency.ImmutableTimestampCorroborationConsistencyCheck;
import com.palantir.atlasdb.transaction.impl.metrics.DefaultMetricsFilterEvaluationContext;
import com.palantir.atlasdb.transaction.impl.metrics.MetricsFilterEvaluationContext;
import com.palantir.atlasdb.transaction.knowledge.TransactionKnowledgeComponents;
import com.palantir.atlasdb.transaction.service.TransactionService;
import com.palantir.atlasdb.transaction.service.TransactionServices;
import com.palantir.atlasdb.util.AtlasDbMetrics;
Expand Down Expand Up @@ -426,11 +425,8 @@ private TransactionManager serializableInternal(@Output List<AutoCloseable> clos
TransactionManagersInitializer initializer = TransactionManagersInitializer.createInitialTables(
keyValueService, schemas(), config().initializeAsync(), allSafeForLogging());

TransactionKnowledgeComponents knowledge = TransactionKnowledgeComponents.create(
keyValueService, metricsManager.getTaggedRegistry(), config().internalSchema());

TransactionComponents components = createTransactionComponents(
closeables, metricsManager, knowledge, lockAndTimestampServices, keyValueService, runtime);
closeables, metricsManager, lockAndTimestampServices, keyValueService, runtime);
TransactionService transactionService = components.transactionService();
ConflictDetectionManager conflictManager = ConflictDetectionManagers.create(keyValueService);
SweepStrategyManager sweepStrategyManager = SweepStrategyManagers.createDefault(keyValueService);
Expand Down Expand Up @@ -506,8 +502,7 @@ private TransactionManager serializableInternal(@Output List<AutoCloseable> clos
transactionConfigSupplier,
conflictTracer,
metricsFilterEvaluationContext(),
installConfig.sharedResourcesConfig().map(SharedResourcesConfig::sharedGetRangesPoolSize),
knowledge),
installConfig.sharedResourcesConfig().map(SharedResourcesConfig::sharedGetRangesPoolSize)),
closeables);

transactionManager.registerClosingCallback(runtimeConfigRefreshable::close);
Expand Down Expand Up @@ -677,21 +672,20 @@ private static boolean targetedSweepIsEnabled(Supplier<AtlasDbRuntimeConfig> run
private TransactionComponents createTransactionComponents(
@Output List<AutoCloseable> closeables,
MetricsManager metricsManager,
TransactionKnowledgeComponents knowledgeCache,
LockAndTimestampServices lockAndTimestampServices,
KeyValueService keyValueService,
Supplier<AtlasDbRuntimeConfig> runtimeConfigSupplier) {
CoordinationService<InternalSchemaMetadata> coordinationService =
getSchemaMetadataCoordinationService(metricsManager, lockAndTimestampServices, keyValueService);
TransactionSchemaManager transactionSchemaManager = new TransactionSchemaManager(coordinationService);

TransactionService transactionService = initializeCloseable(
() -> AtlasDbMetrics.instrumentTimed(
metricsManager.getRegistry(),
TransactionService.class,
TransactionServices.createTransactionService(
keyValueService,
transactionSchemaManager,
knowledgeCache,
metricsManager.getTaggedRegistry(),
() -> runtimeConfigSupplier
.get()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.palantir.atlasdb.keyvalue.api.KeyValueService;
import com.palantir.atlasdb.sweep.SweepTaskRunner;
import com.palantir.atlasdb.transaction.impl.SerializableTransactionManager;
import com.palantir.atlasdb.transaction.knowledge.TransactionKnowledgeComponents;
import com.palantir.atlasdb.transaction.service.TransactionService;
import com.palantir.lock.LockService;
import com.palantir.lock.v2.TimelockService;
Expand Down Expand Up @@ -61,8 +60,6 @@ public abstract class AtlasDbServices implements AutoCloseable {

public abstract TransactionService getTransactionService();

public abstract TransactionKnowledgeComponents getTransactionKnowledgeComponents();

@Override
public void close() {
getTransactionManager().close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package com.palantir.atlasdb.services;

import com.google.common.collect.ImmutableSet;
import com.palantir.atlasdb.config.AtlasDbConfig;
import com.palantir.atlasdb.config.SweepConfig;
import com.palantir.atlasdb.coordination.CoordinationService;
import com.palantir.atlasdb.internalschema.InternalSchemaMetadata;
Expand All @@ -37,7 +36,6 @@
import com.palantir.atlasdb.transaction.impl.SweepStrategyManager;
import com.palantir.atlasdb.transaction.impl.SweepStrategyManagers;
import com.palantir.atlasdb.transaction.impl.TransactionTables;
import com.palantir.atlasdb.transaction.knowledge.TransactionKnowledgeComponents;
import com.palantir.atlasdb.transaction.service.TransactionService;
import com.palantir.atlasdb.transaction.service.TransactionServices;
import com.palantir.atlasdb.util.AtlasDbMetrics;
Expand Down Expand Up @@ -87,18 +85,8 @@ public KeyValueService provideWrappedKeyValueService(
@Provides
@Singleton
public TransactionService provideTransactionService(
@Named("kvs") KeyValueService kvs,
CoordinationService<InternalSchemaMetadata> coordinationService,
TransactionKnowledgeComponents knowledge) {
return TransactionServices.createTransactionService(
kvs, new TransactionSchemaManager(coordinationService), knowledge);
}

@Provides
@Singleton
public TransactionKnowledgeComponents provideTransactionKnowledgeComponents(
@Named("kvs") KeyValueService kvs, MetricsManager metricsManager, AtlasDbConfig config) {
return TransactionKnowledgeComponents.create(kvs, metricsManager.getTaggedRegistry(), config.internalSchema());
@Named("kvs") KeyValueService kvs, CoordinationService<InternalSchemaMetadata> coordinationService) {
return TransactionServices.createTransactionService(kvs, new TransactionSchemaManager(coordinationService));
}

@Provides
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import com.palantir.atlasdb.transaction.impl.SerializableTransactionManager;
import com.palantir.atlasdb.transaction.impl.SweepStrategyManager;
import com.palantir.atlasdb.transaction.impl.metrics.DefaultMetricsFilterEvaluationContext;
import com.palantir.atlasdb.transaction.knowledge.TransactionKnowledgeComponents;
import com.palantir.atlasdb.transaction.service.TransactionService;
import com.palantir.atlasdb.util.MetricsManager;
import com.palantir.common.concurrent.NamedThreadFactory;
Expand Down Expand Up @@ -111,12 +110,12 @@ public SerializableTransactionManager provideTransactionManager(
ServicesConfig config,
@Named("kvs") KeyValueService kvs,
LockAndTimestampServices lts,
LockClient lockClient,
TransactionService transactionService,
ConflictDetectionManager conflictManager,
SweepStrategyManager sweepStrategyManager,
Cleaner cleaner,
@Internal DerivedSnapshotConfig derivedSnapshotConfig,
TransactionKnowledgeComponents knowledge) {
@Internal DerivedSnapshotConfig derivedSnapshotConfig) {
// todo(gmaretic): should this be using a real sweep queue?
return new SerializableTransactionManager(
metricsManager,
Expand All @@ -142,7 +141,6 @@ public SerializableTransactionManager provideTransactionManager(
() -> config.atlasDbRuntimeConfig().transaction(),
ConflictTracer.NO_OP,
DefaultMetricsFilterEvaluationContext.createDefault(),
Optional.empty(),
knowledge);
Optional.empty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import com.palantir.atlasdb.transaction.impl.SerializableTransactionManager;
import com.palantir.atlasdb.transaction.impl.SweepStrategyManager;
import com.palantir.atlasdb.transaction.impl.metrics.DefaultMetricsFilterEvaluationContext;
import com.palantir.atlasdb.transaction.knowledge.TransactionKnowledgeComponents;
import com.palantir.atlasdb.transaction.service.TransactionService;
import com.palantir.atlasdb.util.MetricsManager;
import com.palantir.atlasdb.util.MetricsManagers;
Expand Down Expand Up @@ -112,12 +111,12 @@ public SerializableTransactionManager provideTransactionManager(
ServicesConfig config,
@Named("kvs") KeyValueService kvs,
LockAndTimestampServices lts,
LockClient lockClient,
TransactionService transactionService,
ConflictDetectionManager conflictManager,
SweepStrategyManager sweepStrategyManager,
Cleaner cleaner,
@Internal DerivedSnapshotConfig derivedSnapshotConfig,
TransactionKnowledgeComponents knowledge) {
@Internal DerivedSnapshotConfig derivedSnapshotConfig) {
return new SerializableTransactionManager(
metricsManager,
kvs,
Expand All @@ -141,7 +140,6 @@ public SerializableTransactionManager provideTransactionManager(
() -> config.atlasDbRuntimeConfig().transaction(),
ConflictTracer.NO_OP,
DefaultMetricsFilterEvaluationContext.createDefault(),
Optional.empty(),
knowledge);
Optional.empty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.palantir.atlasdb.futures.AtlasFutures;
import com.palantir.atlasdb.keyvalue.api.KeyAlreadyExistsException;
import com.palantir.atlasdb.transaction.impl.TransactionConstants;
import com.palantir.atlasdb.transaction.impl.TransactionStatusUtils;
import com.palantir.atlasdb.transaction.knowledge.KnownAbortedTransactions;
import com.palantir.atlasdb.transaction.knowledge.KnownConcludedTransactions;
import com.palantir.atlasdb.transaction.knowledge.TransactionKnowledgeComponents;
import com.palantir.atlasdb.transaction.service.TransactionStatus;
import com.palantir.atlasdb.transaction.service.TransactionStatuses;
import com.palantir.common.streams.KeyedStream;
import java.util.Comparator;
import java.util.Map;
Expand All @@ -38,10 +39,12 @@ public class KnowledgeableTimestampExtractingAtomicTable implements AtomicTable<
private final KnownAbortedTransactions knownAbortedTransactions;

public KnowledgeableTimestampExtractingAtomicTable(
AtomicTable<Long, TransactionStatus> delegate, TransactionKnowledgeComponents knowledge) {
AtomicTable<Long, TransactionStatus> delegate,
KnownConcludedTransactions knownConcludedTransactions,
KnownAbortedTransactions knownAbortedTransactions) {
this.delegate = delegate;
this.knownConcludedTransactions = knowledge.concluded();
this.knownAbortedTransactions = knowledge.aborted();
this.knownConcludedTransactions = knownConcludedTransactions;
this.knownAbortedTransactions = knownAbortedTransactions;
}

@Override
Expand All @@ -58,12 +61,12 @@ public void updateMultiple(Map<Long, Long> keyValues) throws KeyAlreadyExistsExc

/**
* Returns commit timestamp for the start timestamp supplied as arg.
* For transaction with a known commit timestamp, returns the respective commit timestamp.
* For transaction that is aborted, returns -1.
* For transaction that is known to be committed but have unknown commitTs, returns startTs as commitTs for
* read-write transaction. For read-only transactions, only returns if the greatestSeenCommitTS < startTs,
* otherwise throws.
* For transactions that are in-progress, returns a void future.
* For transactions with a known commit timestamp, returns the respective commit timestamps.
* For transactions that are aborted, returns -1.
* For transactions that are known ot be committed but have unknown commitTs, returns startTs as commitTs for
* read-write transactions.
* For read-only transactions, only returns if the greatestSeenCommitTS < startTs, otherwise throws.
* Start timestamps for transactions that are in progress return a void future.
* */
@Override
public ListenableFuture<Long> get(Long startTimestamp) {
Expand All @@ -88,15 +91,23 @@ public ListenableFuture<Map<Long, Long>> get(Iterable<Long> keys) {
ListenableFuture<Long> getInternal(long startTimestamp) {
if (knownConcludedTransactions.isKnownConcluded(
startTimestamp, KnownConcludedTransactions.Consistency.LOCAL_READ)) {
return Futures.immediateFuture(TransactionStatusUtils.getCommitTsForConcludedTransaction(
startTimestamp, knownAbortedTransactions::isKnownAborted));
return Futures.immediateFuture(getCommitTsForConcludedTransaction(startTimestamp));
} else {
ListenableFuture<TransactionStatus> presentValueFuture = delegate.get(startTimestamp);
return Futures.transform(
presentValueFuture,
presentValue -> TransactionStatusUtils.getCommitTsFromStatus(
startTimestamp, presentValue, knownAbortedTransactions::isKnownAborted),
presentValue -> getCommitTsFromStatus(startTimestamp, presentValue),
MoreExecutors.directExecutor());
}
}

private Long getCommitTsFromStatus(long startTs, TransactionStatus status) {
return TransactionStatuses.caseOf(status)
.unknown(() -> getCommitTsForConcludedTransaction(startTs))
.otherwise(() -> TransactionStatusUtils.maybeGetCommitTs(status).orElse(null));
}

private long getCommitTsForConcludedTransaction(long startTs) {
return knownAbortedTransactions.isKnownAborted(startTs) ? TransactionConstants.FAILED_COMMIT_TS : startTs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ private Map<Long, TransactionStatus> processReads(Map<Cell, byte[]> reads, Map<L
}
try {
Instant startTime = clock.instant();
long commitTs = TransactionStatusUtils.getCommitTimestampIfKnown(commitStatus);
long commitTs = TransactionStatusUtils.getCommitTimestampOrThrow(commitStatus);
resultBuilder.put(startTs, touchCache.get(ImmutableCellInfo.of(cell, startTs, commitTs, actual)));
Duration timeTaken = Duration.between(startTime, clock.instant());
if (timeTaken.compareTo(COMMIT_THRESHOLD) >= 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,4 @@ public abstract class InternalSchemaInstallConfig {
public int versionFourAbortedTransactionsCacheSize() {
return KnownAbortedTransactionsImpl.MAXIMUM_CACHE_WEIGHT;
}

public static InternalSchemaInstallConfig getDefault() {
return ImmutableInternalSchemaInstallConfig.builder().build();
}
}
Loading

0 comments on commit 153b09b

Please sign in to comment.