diff --git a/atlasdb-cli/src/test/java/com/palantir/atlasdb/cli/command/KeyValueServiceMigratorsTest.java b/atlasdb-cli/src/test/java/com/palantir/atlasdb/cli/command/KeyValueServiceMigratorsTest.java index 93a137e6768..d4d6ecf3f2f 100644 --- a/atlasdb-cli/src/test/java/com/palantir/atlasdb/cli/command/KeyValueServiceMigratorsTest.java +++ b/atlasdb-cli/src/test/java/com/palantir/atlasdb/cli/command/KeyValueServiceMigratorsTest.java @@ -50,10 +50,8 @@ import com.palantir.atlasdb.transaction.impl.SweepStrategyManagers; import com.palantir.atlasdb.transaction.impl.TransactionConstants; import com.palantir.atlasdb.transaction.impl.TransactionTables; -import com.palantir.atlasdb.transaction.knowledge.TransactionKnowledgeComponents; import com.palantir.atlasdb.transaction.service.TransactionService; import com.palantir.atlasdb.transaction.service.TransactionServices; -import com.palantir.atlasdb.util.MetricsManager; import com.palantir.atlasdb.util.MetricsManagers; import com.palantir.timelock.paxos.InMemoryTimeLockRule; import com.palantir.timestamp.ManagedTimestampService; @@ -347,11 +345,8 @@ public void atomicTablesDelegatedToSourceAreNotDropped() { private static AtlasDbServices createMock(KeyValueService kvs, InMemoryTimeLockRule timeLock) { ManagedTimestampService timestampService = timeLock.getManagedTimestampService(); - MetricsManager metricsManager = MetricsManagers.createForTests(); TransactionTables.createTables(kvs); - TransactionKnowledgeComponents knowledge = - TransactionKnowledgeComponents.createForTests(kvs, metricsManager.getTaggedRegistry()); TransactionService transactionService = spy(TransactionServices.createRaw(kvs, timestampService, false)); AtlasDbServices mockServices = mock(AtlasDbServices.class); @@ -360,7 +355,7 @@ private static AtlasDbServices createMock(KeyValueService kvs, InMemoryTimeLockR when(mockServices.getKeyValueService()).thenReturn(kvs); TargetedSweeper sweeper = TargetedSweeper.createUninitializedForTest(() -> 1); SerializableTransactionManager txManager = SerializableTransactionManager.createForTest( - metricsManager, + MetricsManagers.createForTests(), kvs, timeLock.getLegacyTimelockService(), timestampService, @@ -373,8 +368,7 @@ private static AtlasDbServices createMock(KeyValueService kvs, InMemoryTimeLockR new NoOpCleaner(), 16, 4, - sweeper, - knowledge); + sweeper); sweeper.initialize(txManager); when(mockServices.getTransactionManager()).thenReturn(txManager); return mockServices; diff --git a/atlasdb-client/src/main/java/com/palantir/atlasdb/transaction/impl/TransactionStatusUtils.java b/atlasdb-client/src/main/java/com/palantir/atlasdb/transaction/impl/TransactionStatusUtils.java index 6d9185e5d33..9ea59a198e8 100644 --- a/atlasdb-client/src/main/java/com/palantir/atlasdb/transaction/impl/TransactionStatusUtils.java +++ b/atlasdb-client/src/main/java/com/palantir/atlasdb/transaction/impl/TransactionStatusUtils.java @@ -16,7 +16,6 @@ package com.palantir.atlasdb.transaction.impl; -import com.google.common.annotations.VisibleForTesting; import com.palantir.atlasdb.transaction.service.TransactionStatus; import com.palantir.atlasdb.transaction.service.TransactionStatuses; import com.palantir.logsafe.SafeArg; @@ -34,11 +33,7 @@ public static TransactionStatus fromTimestamp(long timestamp) { return TransactionStatuses.committed(timestamp); } - /** - * This helper is only meant to be used for transactions with schema < 4. For schemas >= 4, - * use {@link #getCommitTsFromStatus(long, TransactionStatus, Function)} - * */ - public static long getCommitTimestampIfKnown(TransactionStatus status) { + public static long getCommitTimestampOrThrow(TransactionStatus status) { return TransactionStatuses.caseOf(status) .committed(Function.identity()) .aborted_(TransactionConstants.FAILED_COMMIT_TS) @@ -47,32 +42,10 @@ public static long getCommitTimestampIfKnown(TransactionStatus status) { }); } - /** - * This helper is only meant to be used for transactions with schema < 4. For schemas >= 4, - * use {@link #getCommitTsFromStatus(long, TransactionStatus, Function)} - * */ public static Optional maybeGetCommitTs(TransactionStatus status) { return TransactionStatuses.caseOf(status) .committed(Function.identity()) .aborted_(TransactionConstants.FAILED_COMMIT_TS) .otherwiseEmpty(); } - - public static Long getCommitTsFromStatus( - long startTs, TransactionStatus status, Function abortedCheck) { - return TransactionStatuses.caseOf(status) - .unknown(() -> getCommitTsForConcludedTransaction(startTs, abortedCheck)) - .otherwise(() -> TransactionStatusUtils.maybeGetCommitTs(status).orElse(null)); - } - - public static long getCommitTsForConcludedTransaction(long startTs, Function isAborted) { - return isAborted.apply(startTs) - ? TransactionConstants.FAILED_COMMIT_TS - : getCommitTsForNonAbortedUnknownTransaction(startTs); - } - - @VisibleForTesting - static long getCommitTsForNonAbortedUnknownTransaction(long startTs) { - return startTs; - } } diff --git a/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java b/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java index 569485e35df..7054e721d07 100644 --- a/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java +++ b/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java @@ -104,7 +104,6 @@ import com.palantir.atlasdb.transaction.impl.consistency.ImmutableTimestampCorroborationConsistencyCheck; import com.palantir.atlasdb.transaction.impl.metrics.DefaultMetricsFilterEvaluationContext; import com.palantir.atlasdb.transaction.impl.metrics.MetricsFilterEvaluationContext; -import com.palantir.atlasdb.transaction.knowledge.TransactionKnowledgeComponents; import com.palantir.atlasdb.transaction.service.TransactionService; import com.palantir.atlasdb.transaction.service.TransactionServices; import com.palantir.atlasdb.util.AtlasDbMetrics; @@ -426,11 +425,8 @@ private TransactionManager serializableInternal(@Output List clos TransactionManagersInitializer initializer = TransactionManagersInitializer.createInitialTables( keyValueService, schemas(), config().initializeAsync(), allSafeForLogging()); - TransactionKnowledgeComponents knowledge = TransactionKnowledgeComponents.create( - keyValueService, metricsManager.getTaggedRegistry(), config().internalSchema()); - TransactionComponents components = createTransactionComponents( - closeables, metricsManager, knowledge, lockAndTimestampServices, keyValueService, runtime); + closeables, metricsManager, lockAndTimestampServices, keyValueService, runtime); TransactionService transactionService = components.transactionService(); ConflictDetectionManager conflictManager = ConflictDetectionManagers.create(keyValueService); SweepStrategyManager sweepStrategyManager = SweepStrategyManagers.createDefault(keyValueService); @@ -506,8 +502,7 @@ private TransactionManager serializableInternal(@Output List clos transactionConfigSupplier, conflictTracer, metricsFilterEvaluationContext(), - installConfig.sharedResourcesConfig().map(SharedResourcesConfig::sharedGetRangesPoolSize), - knowledge), + installConfig.sharedResourcesConfig().map(SharedResourcesConfig::sharedGetRangesPoolSize)), closeables); transactionManager.registerClosingCallback(runtimeConfigRefreshable::close); @@ -677,13 +672,13 @@ private static boolean targetedSweepIsEnabled(Supplier run private TransactionComponents createTransactionComponents( @Output List closeables, MetricsManager metricsManager, - TransactionKnowledgeComponents knowledgeCache, LockAndTimestampServices lockAndTimestampServices, KeyValueService keyValueService, Supplier runtimeConfigSupplier) { CoordinationService coordinationService = getSchemaMetadataCoordinationService(metricsManager, lockAndTimestampServices, keyValueService); TransactionSchemaManager transactionSchemaManager = new TransactionSchemaManager(coordinationService); + TransactionService transactionService = initializeCloseable( () -> AtlasDbMetrics.instrumentTimed( metricsManager.getRegistry(), @@ -691,7 +686,6 @@ private TransactionComponents createTransactionComponents( TransactionServices.createTransactionService( keyValueService, transactionSchemaManager, - knowledgeCache, metricsManager.getTaggedRegistry(), () -> runtimeConfigSupplier .get() diff --git a/atlasdb-dagger/src/main/java/com/palantir/atlasdb/services/AtlasDbServices.java b/atlasdb-dagger/src/main/java/com/palantir/atlasdb/services/AtlasDbServices.java index 01a110dd858..66b9905a7f6 100644 --- a/atlasdb-dagger/src/main/java/com/palantir/atlasdb/services/AtlasDbServices.java +++ b/atlasdb-dagger/src/main/java/com/palantir/atlasdb/services/AtlasDbServices.java @@ -20,7 +20,6 @@ import com.palantir.atlasdb.keyvalue.api.KeyValueService; import com.palantir.atlasdb.sweep.SweepTaskRunner; import com.palantir.atlasdb.transaction.impl.SerializableTransactionManager; -import com.palantir.atlasdb.transaction.knowledge.TransactionKnowledgeComponents; import com.palantir.atlasdb.transaction.service.TransactionService; import com.palantir.lock.LockService; import com.palantir.lock.v2.TimelockService; @@ -61,8 +60,6 @@ public abstract class AtlasDbServices implements AutoCloseable { public abstract TransactionService getTransactionService(); - public abstract TransactionKnowledgeComponents getTransactionKnowledgeComponents(); - @Override public void close() { getTransactionManager().close(); diff --git a/atlasdb-dagger/src/main/java/com/palantir/atlasdb/services/KeyValueServiceModule.java b/atlasdb-dagger/src/main/java/com/palantir/atlasdb/services/KeyValueServiceModule.java index 470f0802e1c..c9d06c83836 100644 --- a/atlasdb-dagger/src/main/java/com/palantir/atlasdb/services/KeyValueServiceModule.java +++ b/atlasdb-dagger/src/main/java/com/palantir/atlasdb/services/KeyValueServiceModule.java @@ -16,7 +16,6 @@ package com.palantir.atlasdb.services; import com.google.common.collect.ImmutableSet; -import com.palantir.atlasdb.config.AtlasDbConfig; import com.palantir.atlasdb.config.SweepConfig; import com.palantir.atlasdb.coordination.CoordinationService; import com.palantir.atlasdb.internalschema.InternalSchemaMetadata; @@ -37,7 +36,6 @@ import com.palantir.atlasdb.transaction.impl.SweepStrategyManager; import com.palantir.atlasdb.transaction.impl.SweepStrategyManagers; import com.palantir.atlasdb.transaction.impl.TransactionTables; -import com.palantir.atlasdb.transaction.knowledge.TransactionKnowledgeComponents; import com.palantir.atlasdb.transaction.service.TransactionService; import com.palantir.atlasdb.transaction.service.TransactionServices; import com.palantir.atlasdb.util.AtlasDbMetrics; @@ -87,18 +85,8 @@ public KeyValueService provideWrappedKeyValueService( @Provides @Singleton public TransactionService provideTransactionService( - @Named("kvs") KeyValueService kvs, - CoordinationService coordinationService, - TransactionKnowledgeComponents knowledge) { - return TransactionServices.createTransactionService( - kvs, new TransactionSchemaManager(coordinationService), knowledge); - } - - @Provides - @Singleton - public TransactionKnowledgeComponents provideTransactionKnowledgeComponents( - @Named("kvs") KeyValueService kvs, MetricsManager metricsManager, AtlasDbConfig config) { - return TransactionKnowledgeComponents.create(kvs, metricsManager.getTaggedRegistry(), config.internalSchema()); + @Named("kvs") KeyValueService kvs, CoordinationService coordinationService) { + return TransactionServices.createTransactionService(kvs, new TransactionSchemaManager(coordinationService)); } @Provides diff --git a/atlasdb-dagger/src/main/java/com/palantir/atlasdb/services/TransactionManagerModule.java b/atlasdb-dagger/src/main/java/com/palantir/atlasdb/services/TransactionManagerModule.java index a64ae6ca6ad..a09b28ec85a 100644 --- a/atlasdb-dagger/src/main/java/com/palantir/atlasdb/services/TransactionManagerModule.java +++ b/atlasdb-dagger/src/main/java/com/palantir/atlasdb/services/TransactionManagerModule.java @@ -38,7 +38,6 @@ import com.palantir.atlasdb.transaction.impl.SerializableTransactionManager; import com.palantir.atlasdb.transaction.impl.SweepStrategyManager; import com.palantir.atlasdb.transaction.impl.metrics.DefaultMetricsFilterEvaluationContext; -import com.palantir.atlasdb.transaction.knowledge.TransactionKnowledgeComponents; import com.palantir.atlasdb.transaction.service.TransactionService; import com.palantir.atlasdb.util.MetricsManager; import com.palantir.common.concurrent.NamedThreadFactory; @@ -111,12 +110,12 @@ public SerializableTransactionManager provideTransactionManager( ServicesConfig config, @Named("kvs") KeyValueService kvs, LockAndTimestampServices lts, + LockClient lockClient, TransactionService transactionService, ConflictDetectionManager conflictManager, SweepStrategyManager sweepStrategyManager, Cleaner cleaner, - @Internal DerivedSnapshotConfig derivedSnapshotConfig, - TransactionKnowledgeComponents knowledge) { + @Internal DerivedSnapshotConfig derivedSnapshotConfig) { // todo(gmaretic): should this be using a real sweep queue? return new SerializableTransactionManager( metricsManager, @@ -142,7 +141,6 @@ public SerializableTransactionManager provideTransactionManager( () -> config.atlasDbRuntimeConfig().transaction(), ConflictTracer.NO_OP, DefaultMetricsFilterEvaluationContext.createDefault(), - Optional.empty(), - knowledge); + Optional.empty()); } } diff --git a/atlasdb-dagger/src/main/java/com/palantir/atlasdb/services/test/TestTransactionManagerModule.java b/atlasdb-dagger/src/main/java/com/palantir/atlasdb/services/test/TestTransactionManagerModule.java index 4d785e748ce..a04bc3961bb 100644 --- a/atlasdb-dagger/src/main/java/com/palantir/atlasdb/services/test/TestTransactionManagerModule.java +++ b/atlasdb-dagger/src/main/java/com/palantir/atlasdb/services/test/TestTransactionManagerModule.java @@ -39,7 +39,6 @@ import com.palantir.atlasdb.transaction.impl.SerializableTransactionManager; import com.palantir.atlasdb.transaction.impl.SweepStrategyManager; import com.palantir.atlasdb.transaction.impl.metrics.DefaultMetricsFilterEvaluationContext; -import com.palantir.atlasdb.transaction.knowledge.TransactionKnowledgeComponents; import com.palantir.atlasdb.transaction.service.TransactionService; import com.palantir.atlasdb.util.MetricsManager; import com.palantir.atlasdb.util.MetricsManagers; @@ -112,12 +111,12 @@ public SerializableTransactionManager provideTransactionManager( ServicesConfig config, @Named("kvs") KeyValueService kvs, LockAndTimestampServices lts, + LockClient lockClient, TransactionService transactionService, ConflictDetectionManager conflictManager, SweepStrategyManager sweepStrategyManager, Cleaner cleaner, - @Internal DerivedSnapshotConfig derivedSnapshotConfig, - TransactionKnowledgeComponents knowledge) { + @Internal DerivedSnapshotConfig derivedSnapshotConfig) { return new SerializableTransactionManager( metricsManager, kvs, @@ -141,7 +140,6 @@ public SerializableTransactionManager provideTransactionManager( () -> config.atlasDbRuntimeConfig().transaction(), ConflictTracer.NO_OP, DefaultMetricsFilterEvaluationContext.createDefault(), - Optional.empty(), - knowledge); + Optional.empty()); } } diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/atomic/KnowledgeableTimestampExtractingAtomicTable.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/atomic/KnowledgeableTimestampExtractingAtomicTable.java index e44ef8a9d8d..466ff2e76de 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/atomic/KnowledgeableTimestampExtractingAtomicTable.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/atomic/KnowledgeableTimestampExtractingAtomicTable.java @@ -22,11 +22,12 @@ import com.google.common.util.concurrent.MoreExecutors; import com.palantir.atlasdb.futures.AtlasFutures; import com.palantir.atlasdb.keyvalue.api.KeyAlreadyExistsException; +import com.palantir.atlasdb.transaction.impl.TransactionConstants; import com.palantir.atlasdb.transaction.impl.TransactionStatusUtils; import com.palantir.atlasdb.transaction.knowledge.KnownAbortedTransactions; import com.palantir.atlasdb.transaction.knowledge.KnownConcludedTransactions; -import com.palantir.atlasdb.transaction.knowledge.TransactionKnowledgeComponents; import com.palantir.atlasdb.transaction.service.TransactionStatus; +import com.palantir.atlasdb.transaction.service.TransactionStatuses; import com.palantir.common.streams.KeyedStream; import java.util.Comparator; import java.util.Map; @@ -38,10 +39,12 @@ public class KnowledgeableTimestampExtractingAtomicTable implements AtomicTable< private final KnownAbortedTransactions knownAbortedTransactions; public KnowledgeableTimestampExtractingAtomicTable( - AtomicTable delegate, TransactionKnowledgeComponents knowledge) { + AtomicTable delegate, + KnownConcludedTransactions knownConcludedTransactions, + KnownAbortedTransactions knownAbortedTransactions) { this.delegate = delegate; - this.knownConcludedTransactions = knowledge.concluded(); - this.knownAbortedTransactions = knowledge.aborted(); + this.knownConcludedTransactions = knownConcludedTransactions; + this.knownAbortedTransactions = knownAbortedTransactions; } @Override @@ -58,12 +61,12 @@ public void updateMultiple(Map keyValues) throws KeyAlreadyExistsExc /** * Returns commit timestamp for the start timestamp supplied as arg. - * For transaction with a known commit timestamp, returns the respective commit timestamp. - * For transaction that is aborted, returns -1. - * For transaction that is known to be committed but have unknown commitTs, returns startTs as commitTs for - * read-write transaction. For read-only transactions, only returns if the greatestSeenCommitTS < startTs, - * otherwise throws. - * For transactions that are in-progress, returns a void future. + * For transactions with a known commit timestamp, returns the respective commit timestamps. + * For transactions that are aborted, returns -1. + * For transactions that are known ot be committed but have unknown commitTs, returns startTs as commitTs for + * read-write transactions. + * For read-only transactions, only returns if the greatestSeenCommitTS < startTs, otherwise throws. + * Start timestamps for transactions that are in progress return a void future. * */ @Override public ListenableFuture get(Long startTimestamp) { @@ -88,15 +91,23 @@ public ListenableFuture> get(Iterable keys) { ListenableFuture getInternal(long startTimestamp) { if (knownConcludedTransactions.isKnownConcluded( startTimestamp, KnownConcludedTransactions.Consistency.LOCAL_READ)) { - return Futures.immediateFuture(TransactionStatusUtils.getCommitTsForConcludedTransaction( - startTimestamp, knownAbortedTransactions::isKnownAborted)); + return Futures.immediateFuture(getCommitTsForConcludedTransaction(startTimestamp)); } else { ListenableFuture presentValueFuture = delegate.get(startTimestamp); return Futures.transform( presentValueFuture, - presentValue -> TransactionStatusUtils.getCommitTsFromStatus( - startTimestamp, presentValue, knownAbortedTransactions::isKnownAborted), + presentValue -> getCommitTsFromStatus(startTimestamp, presentValue), MoreExecutors.directExecutor()); } } + + private Long getCommitTsFromStatus(long startTs, TransactionStatus status) { + return TransactionStatuses.caseOf(status) + .unknown(() -> getCommitTsForConcludedTransaction(startTs)) + .otherwise(() -> TransactionStatusUtils.maybeGetCommitTs(status).orElse(null)); + } + + private long getCommitTsForConcludedTransaction(long startTs) { + return knownAbortedTransactions.isKnownAborted(startTs) ? TransactionConstants.FAILED_COMMIT_TS : startTs; + } } diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/atomic/ResilientCommitTimestampAtomicTable.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/atomic/ResilientCommitTimestampAtomicTable.java index 4d16f804b2f..186c0a68a07 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/atomic/ResilientCommitTimestampAtomicTable.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/atomic/ResilientCommitTimestampAtomicTable.java @@ -208,7 +208,7 @@ private Map processReads(Map reads, Map= 0) { diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/internalschema/InternalSchemaInstallConfig.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/internalschema/InternalSchemaInstallConfig.java index 3152160be75..62e868678f7 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/internalschema/InternalSchemaInstallConfig.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/internalschema/InternalSchemaInstallConfig.java @@ -40,8 +40,4 @@ public abstract class InternalSchemaInstallConfig { public int versionFourAbortedTransactionsCacheSize() { return KnownAbortedTransactionsImpl.MAXIMUM_CACHE_WEIGHT; } - - public static InternalSchemaInstallConfig getDefault() { - return ImmutableInternalSchemaInstallConfig.builder().build(); - } } diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/ShardProgress.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/ShardProgress.java index 4657cb47095..675844cd7ac 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/ShardProgress.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/ShardProgress.java @@ -121,11 +121,7 @@ public void updateLastSeenCommitTimestamp(ShardAndStrategy shardAndStrategy, lon tryUpdateLastSeenCommitTimestamp(shardAndStrategy, commitTimestamp); } - public long getLastSeenCommitTimestamp() { - return maybeGet(LAST_SEEN_COMMIT_TIMESTAMP).orElse(SweepQueueUtils.INITIAL_TIMESTAMP); - } - - public Optional getMaybeLastSeenCommitTimestamp() { + public Optional getLastSeenCommitTimestamp() { return maybeGet(LAST_SEEN_COMMIT_TIMESTAMP); } @@ -134,17 +130,19 @@ private void tryUpdateLastSeenCommitTimestamp(ShardAndStrategy shardAndStrategy, return; } - long previous = getLastSeenCommitTimestamp(); - boolean updateNeeded = previous < lastSeenCommitTs; + Optional previous = getLastSeenCommitTimestamp(); + boolean updateNeeded = + previous.map(persisted -> persisted < lastSeenCommitTs).orElse(true); while (updateNeeded) { byte[] colValNew = createColumnValue(lastSeenCommitTs); - CheckAndSetRequest casRequest = createRequest(LAST_SEEN_COMMIT_TIMESTAMP, previous, colValNew); + CheckAndSetRequest casRequest = createRequest( + LAST_SEEN_COMMIT_TIMESTAMP, previous.orElse(SweepQueueUtils.INITIAL_TIMESTAMP), colValNew); try { kvs.checkAndSet(casRequest); updateNeeded = false; } catch (CheckAndSetException exception) { - long current = getLastSeenCommitTimestamp(); - if (current == previous) { + Optional current = getLastSeenCommitTimestamp(); + if (current.equals(previous)) { log.warn( "Failed to update last seen commit timestamp. Values before and after CAS match.", SafeArg.of("previous", previous), @@ -154,7 +152,8 @@ private void tryUpdateLastSeenCommitTimestamp(ShardAndStrategy shardAndStrategy, throw exception; } previous = current; - updateNeeded = previous < lastSeenCommitTs; + updateNeeded = + previous.map(persisted -> persisted < lastSeenCommitTs).orElse(true); } } } diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepQueue.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepQueue.java index 834b7c04ea2..f34f957eba4 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepQueue.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepQueue.java @@ -37,7 +37,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.function.LongSupplier; import java.util.function.Supplier; public final class SweepQueue implements MultiTableSweepQueueWriter { @@ -247,7 +246,7 @@ static SweepQueueFactory create( Supplier shardsConfig, TransactionService transaction, ReadBatchingRuntimeContext readBatchingRuntimeContext) { - init(kvs); + Schemas.createTablesAndIndexes(TargetedSweepSchema.INSTANCE.getLatestSchema(), kvs); ShardProgress shardProgress = new ShardProgress(kvs); Supplier shards = createProgressUpdatingSupplier(shardsConfig, shardProgress, SweepQueueUtils.REFRESH_TIME); @@ -266,15 +265,6 @@ static SweepQueueFactory create( readBatchingRuntimeContext); } - public static LongSupplier getGetLastSeenCommitTsSupplier(KeyValueService kvs) { - init(kvs); - return new ShardProgress(kvs)::getLastSeenCommitTimestamp; - } - - private static void init(KeyValueService kvs) { - Schemas.createTablesAndIndexes(TargetedSweepSchema.INSTANCE.getLatestSchema(), kvs); - } - private SweepQueueWriter createWriter() { return new SweepQueueWriter(timestamps, cells, partitioner); } diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/CommitTimestampLoader.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/CommitTimestampLoader.java deleted file mode 100644 index e1b1131204e..00000000000 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/CommitTimestampLoader.java +++ /dev/null @@ -1,255 +0,0 @@ -/* - * (c) Copyright 2022 Palantir Technologies Inc. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.palantir.atlasdb.transaction.impl; - -import com.codahale.metrics.Timer; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; -import com.palantir.atlasdb.cache.TimestampCache; -import com.palantir.atlasdb.keyvalue.api.TableReference; -import com.palantir.atlasdb.logging.LoggingArgs; -import com.palantir.atlasdb.transaction.TransactionConfig; -import com.palantir.atlasdb.transaction.api.TransactionLockAcquisitionTimeoutException; -import com.palantir.atlasdb.transaction.knowledge.KnownAbortedTransactions; -import com.palantir.atlasdb.transaction.knowledge.TransactionKnowledgeComponents; -import com.palantir.atlasdb.transaction.service.AsyncTransactionService; -import com.palantir.atlasdb.transaction.service.TransactionStatus; -import com.palantir.atlasdb.transaction.service.TransactionStatuses; -import com.palantir.atlasdb.util.MetricsManager; -import com.palantir.lock.AtlasRowLockDescriptor; -import com.palantir.lock.LockDescriptor; -import com.palantir.lock.v2.LockToken; -import com.palantir.lock.v2.TimelockService; -import com.palantir.lock.v2.WaitForLocksRequest; -import com.palantir.lock.v2.WaitForLocksResponse; -import com.palantir.logsafe.Preconditions; -import com.palantir.logsafe.SafeArg; -import com.palantir.logsafe.UnsafeArg; -import com.palantir.logsafe.logger.SafeLogger; -import com.palantir.logsafe.logger.SafeLoggerFactory; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.function.LongSupplier; -import java.util.function.Supplier; -import javax.annotation.Nullable; - -public final class CommitTimestampLoader { - private static final SafeLogger log = SafeLoggerFactory.get(CommitTimestampLoader.class); - private static final SafeLogger perfLogger = SafeLoggerFactory.get("dualschema.perf"); - private final TimestampCache timestampCache; - private final Optional immutableTimestampLock; - private final Supplier startTimestampSupplier; - private final Supplier transactionConfig; - private final MetricsManager metricsManager; - private final TimelockService timelockService; - private final long immutableTimestamp; - private final LongSupplier lastSeenCommitTsSupplier; - - private final KnownAbortedTransactions abortedTransactionsCache; - - public CommitTimestampLoader( - TimestampCache timestampCache, - Optional immutableTimestampLock, - Supplier startTimestampSupplier, - Supplier transactionConfig, - MetricsManager metricsManager, - TimelockService timelockService, - long immutableTimestamp, - TransactionKnowledgeComponents knowledge) { - this.timestampCache = timestampCache; - this.immutableTimestampLock = immutableTimestampLock; - this.startTimestampSupplier = startTimestampSupplier; - this.transactionConfig = transactionConfig; - this.metricsManager = metricsManager; - this.timelockService = timelockService; - this.immutableTimestamp = immutableTimestamp; - this.lastSeenCommitTsSupplier = knowledge.lastSeenCommitSupplier(); - this.abortedTransactionsCache = knowledge.aborted(); - } - - /** - * Returns a map from start timestamp to commit timestamp. If a start timestamp wasn't committed, then it will be - * missing from the map. This method will block until the transactions for these start timestamps are complete. - */ - ListenableFuture> getCommitTimestamps( - @Nullable TableReference tableRef, - Iterable startTimestamps, - boolean shouldWaitForCommitterToComplete, - AsyncTransactionService asyncTransactionService) { - if (Iterables.isEmpty(startTimestamps)) { - return Futures.immediateFuture(ImmutableMap.of()); - } - - Set pendingGets = new HashSet<>(); - Map result = new HashMap<>(); - - for (Long startTs : startTimestamps) { - Long commitTs = timestampCache.getCommitTimestampIfPresent(startTs); - if (commitTs == null) { - pendingGets.add(startTs); - } else { - result.put(startTs, commitTs); - } - } - - if (pendingGets.isEmpty()) { - return Futures.immediateFuture(result); - } - - // Before we do the reads, we need to make sure the committer is done writing. - if (shouldWaitForCommitterToComplete) { - waitForCommitterToComplete(tableRef, startTimestamps); - } - - return Futures.transform( - loadCommitTimestamps(asyncTransactionService, pendingGets), - rawResults -> { - Map loadedCommitTs = cacheKnownLoadedValuesAndValidate(rawResults); - result.putAll(loadedCommitTs); - return result; - }, - MoreExecutors.directExecutor()); - } - - // We do not cache unknown transactions as they are already being cached at a lower level. - private Map cacheKnownLoadedValuesAndValidate(Map rawResults) { - Map results = new HashMap<>(); - boolean shouldValidate = false; - - // The method is written this way to avoid multiple scans on the result set as it is on a hot path. - for (Map.Entry entry : rawResults.entrySet()) { - long start = entry.getKey(); - TransactionStatus commitStatus = entry.getValue(); - - if (commitStatus.equals(TransactionStatuses.inProgress())) { - continue; - } - - long commitTs = TransactionStatusUtils.getCommitTsFromStatus( - start, commitStatus, abortedTransactionsCache::isKnownAborted); - if (commitStatus.equals(TransactionStatuses.unknown())) { - shouldValidate = true; - } else { - timestampCache.putAlreadyCommittedTransaction(start, commitTs); - } - results.put(start, commitTs); - } - - if (shouldValidate) { - throwIfTransactionsTableSweptBeyondReadOnlyTxn(); - } - - return results; - } - - /** - * We will block here until the passed transactions have released their lock. This means that the committing - * transaction is either complete or it has failed, and we are allowed to roll it back. - */ - private void waitForCommitToComplete(Iterable startTimestamps) { - Set lockDescriptors = new HashSet<>(); - for (long start : startTimestamps) { - if (start < immutableTimestamp) { - // We don't need to block in this case because this transaction is already complete - continue; - } - lockDescriptors.add(AtlasRowLockDescriptor.of( - TransactionConstants.TRANSACTION_TABLE.getQualifiedName(), - TransactionConstants.getValueForTimestamp(start))); - } - - if (lockDescriptors.isEmpty()) { - return; - } - - waitFor(lockDescriptors); - } - - private void waitFor(Set lockDescriptors) { - TransactionConfig currentTransactionConfig = transactionConfig.get(); - - // TODO(fdesouza): Revert this once PDS-95791 is resolved. - long lockAcquireTimeoutMillis = currentTransactionConfig.getLockAcquireTimeoutMillis(); - WaitForLocksRequest request = WaitForLocksRequest.of(lockDescriptors, lockAcquireTimeoutMillis); - WaitForLocksResponse response = timelockService.waitForLocks(request); - if (!response.wasSuccessful()) { - log.error( - "Timed out waiting for commits to complete. Timeout was {} ms. First ten locks were {}.", - SafeArg.of("requestId", request.getRequestId()), - SafeArg.of("acquireTimeoutMs", lockAcquireTimeoutMillis), - SafeArg.of("numberOfDescriptors", lockDescriptors.size()), - UnsafeArg.of("firstTenLockDescriptors", Iterables.limit(lockDescriptors, 10))); - throw new TransactionLockAcquisitionTimeoutException("Timed out waiting for commits to complete."); - } - } - - private void waitForCommitterToComplete(@Nullable TableReference tableRef, Iterable startTimestamps) { - Timer.Context timer = getTimer("waitForCommitTsMillis").time(); - waitForCommitToComplete(startTimestamps); - long waitForCommitTsMillis = TimeUnit.NANOSECONDS.toMillis(timer.stop()); - - if (tableRef != null) { - perfLogger.debug( - "Waited to get commit timestamps when reading from a known table.", - SafeArg.of("commitTsMillis", waitForCommitTsMillis), - LoggingArgs.tableRef(tableRef)); - } else { - perfLogger.debug("Waited to get commit timestamps.", SafeArg.of("commitTsMillis", waitForCommitTsMillis)); - } - } - - private Timer getTimer(String name) { - return metricsManager.registerOrGetTimer(CommitTimestampLoader.class, name); - } - - private static ListenableFuture> loadCommitTimestamps( - AsyncTransactionService asyncTransactionService, Set startTimestamps) { - // distinguish between a single timestamp and a batch, for more granular metrics - if (startTimestamps.size() == 1) { - Long singleTs = Iterables.getOnlyElement(startTimestamps); - return Futures.transform( - asyncTransactionService.getAsyncV2(singleTs), - commitState -> ImmutableMap.of(singleTs, commitState), - MoreExecutors.directExecutor()); - } else { - return asyncTransactionService.getAsyncV2(startTimestamps); - } - } - - private void throwIfTransactionsTableSweptBeyondReadOnlyTxn() { - long startTs = startTimestampSupplier.get(); - // The schema version of current transaction does not matter. If the current transaction does not hold - // immutableTs lock, and we were previously on schema 4 for a range of transactions, we cannot know the state - // of those writes consistently if sweep has progressed. - - if (immutableTimestampLock.isEmpty()) { - Preconditions.checkState( - lastSeenCommitTsSupplier.getAsLong() < startTs, - "Sweep has swept some entries with a commit TS after us, and now we cannot know the commit TS for" - + " a timestamp that has been TTSd, but it is greater than our start timestamp. This can " - + "happen if the transaction has been alive for more than an hour and is expected to be " - + "transient."); - } - } -} diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/ReadOnlyTransactionManager.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/ReadOnlyTransactionManager.java index 3cdf01e6c13..f45a9cbf8c0 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/ReadOnlyTransactionManager.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/ReadOnlyTransactionManager.java @@ -31,7 +31,6 @@ import com.palantir.atlasdb.transaction.api.TransactionFailedRetriableException; import com.palantir.atlasdb.transaction.api.TransactionReadSentinelBehavior; import com.palantir.atlasdb.transaction.api.TransactionTask; -import com.palantir.atlasdb.transaction.knowledge.TransactionKnowledgeComponents; import com.palantir.atlasdb.transaction.service.TransactionService; import com.palantir.atlasdb.util.MetricsManager; import com.palantir.lock.HeldLocksToken; @@ -59,9 +58,6 @@ public final class ReadOnlyTransactionManager extends AbstractLockAwareTransacti private final int defaultGetRangesConcurrency; private final Supplier transactionConfig; - private final TransactionKnowledgeComponents knowledge; - - // Todo(snanda): breaking change public ReadOnlyTransactionManager( MetricsManager metricsManager, KeyValueService keyValueService, @@ -72,8 +68,7 @@ public ReadOnlyTransactionManager( boolean allowHiddenTableAccess, int defaultGetRangesConcurrency, TimestampCache timestampCache, - Supplier transactionConfig, - TransactionKnowledgeComponents knowledge) { + Supplier transactionConfig) { super(metricsManager, timestampCache, () -> transactionConfig.get().retryStrategy()); this.metricsManager = metricsManager; this.keyValueService = keyValueService; @@ -84,7 +79,6 @@ public ReadOnlyTransactionManager( this.allowHiddenTableAccess = allowHiddenTableAccess; this.defaultGetRangesConcurrency = defaultGetRangesConcurrency; this.transactionConfig = transactionConfig; - this.knowledge = knowledge; } @Override @@ -228,8 +222,7 @@ public T runTaskWithCondi timestampValidationReadCache, MoreExecutors.newDirectExecutorService(), defaultGetRangesConcurrency, - transactionConfig, - knowledge); + transactionConfig); return runTaskThrowOnConflictWithCallback( transaction -> task.execute(transaction, condition), new ReadTransaction(txn, txn.sweepStrategyManager), diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SerializableTransaction.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SerializableTransaction.java index e888cdbb4cd..d4b5ada9322 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SerializableTransaction.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SerializableTransaction.java @@ -64,7 +64,6 @@ import com.palantir.atlasdb.transaction.api.TransactionReadSentinelBehavior; import com.palantir.atlasdb.transaction.api.TransactionSerializableConflictException; import com.palantir.atlasdb.transaction.impl.metrics.TableLevelMetricsController; -import com.palantir.atlasdb.transaction.knowledge.TransactionKnowledgeComponents; import com.palantir.atlasdb.transaction.service.AsyncTransactionService; import com.palantir.atlasdb.transaction.service.TransactionService; import com.palantir.atlasdb.util.ByteArrayUtilities; @@ -161,8 +160,7 @@ public SerializableTransaction( boolean validateLocksOnReads, Supplier transactionConfig, ConflictTracer conflictTracer, - TableLevelMetricsController tableLevelMetricsController, - TransactionKnowledgeComponents knowledge) { + TableLevelMetricsController tableLevelMetricsController) { super( metricsManager, keyValueService, @@ -188,8 +186,7 @@ public SerializableTransaction( validateLocksOnReads, transactionConfig, conflictTracer, - tableLevelMetricsController, - knowledge); + tableLevelMetricsController); } @Override @@ -268,7 +265,7 @@ private Iterator> wrapIteratorWithSortedColumnsBoundChec sortedColumnRangeEnds.computeIfAbsent(request, _key -> new AtomicReference<>()); ConcurrentNavigableMap readsForTable = getReadsForTable(tableRef); - return new AbstractIterator<>() { + return new AbstractIterator>() { @Override protected Map.Entry computeNext() { if (!sortedColumns.hasNext()) { @@ -861,7 +858,7 @@ private Transaction getReadOnlyTransaction(final long commitTs) { transactionReadTimeoutMillis, getReadSentinelBehavior(), allowHiddenTableAccess, - timestampCache, + timestampValidationReadCache, getRangesExecutor, defaultGetRangesConcurrency, sweepQueue, @@ -869,8 +866,7 @@ private Transaction getReadOnlyTransaction(final long commitTs) { validateLocksOnReads, transactionConfig, conflictTracer, - tableLevelMetricsController, - knowledge) { + tableLevelMetricsController) { @Override protected TransactionScopedCache getCache() { return lockWatchManager.getReadOnlyTransactionScopedCache(SerializableTransaction.this.getTimestamp()); diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SerializableTransactionManager.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SerializableTransactionManager.java index 7026d85206f..369ec5d2a7d 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SerializableTransactionManager.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SerializableTransactionManager.java @@ -34,7 +34,6 @@ import com.palantir.atlasdb.transaction.api.TransactionReadSentinelBehavior; import com.palantir.atlasdb.transaction.impl.metrics.DefaultMetricsFilterEvaluationContext; import com.palantir.atlasdb.transaction.impl.metrics.MetricsFilterEvaluationContext; -import com.palantir.atlasdb.transaction.knowledge.TransactionKnowledgeComponents; import com.palantir.atlasdb.transaction.service.TransactionService; import com.palantir.atlasdb.util.AtlasDbMetrics; import com.palantir.atlasdb.util.MetricsManager; @@ -241,8 +240,8 @@ public static TransactionManager createInstrumented( Supplier transactionConfig, ConflictTracer conflictTracer, MetricsFilterEvaluationContext metricsFilterEvaluationContext, - Optional sharedGetRangesPoolSize, - TransactionKnowledgeComponents knowledge) { + Optional sharedGetRangesPoolSize) { + return create( metricsManager, keyValueService, @@ -270,8 +269,7 @@ public static TransactionManager createInstrumented( true, conflictTracer, metricsFilterEvaluationContext, - sharedGetRangesPoolSize, - knowledge); + sharedGetRangesPoolSize); } public static TransactionManager create( @@ -298,8 +296,8 @@ public static TransactionManager create( Supplier transactionConfig, ConflictTracer conflictTracer, MetricsFilterEvaluationContext metricsFilterEvaluationContext, - Optional sharedGetRangesPoolSize, - TransactionKnowledgeComponents knowledge) { + Optional sharedGetRangesPoolSize) { + return create( metricsManager, keyValueService, @@ -326,8 +324,7 @@ public static TransactionManager create( transactionConfig, conflictTracer, metricsFilterEvaluationContext, - sharedGetRangesPoolSize, - knowledge); + sharedGetRangesPoolSize); } public static TransactionManager create( @@ -355,8 +352,7 @@ public static TransactionManager create( Supplier transactionConfig, ConflictTracer conflictTracer, MetricsFilterEvaluationContext metricsFilterEvaluationContext, - Optional sharedGetRangesPoolSize, - TransactionKnowledgeComponents knowledge) { + Optional sharedGetRangesPoolSize) { return create( metricsManager, keyValueService, @@ -383,8 +379,7 @@ public static TransactionManager create( false, conflictTracer, metricsFilterEvaluationContext, - sharedGetRangesPoolSize, - knowledge); + sharedGetRangesPoolSize); } private static TransactionManager create( @@ -413,8 +408,7 @@ private static TransactionManager create( boolean shouldInstrument, ConflictTracer conflictTracer, MetricsFilterEvaluationContext metricsFilterEvaluationContext, - Optional sharedGetRangesPoolSize, - TransactionKnowledgeComponents knowledge) { + Optional sharedGetRangesPoolSize) { TransactionManager transactionManager = new SerializableTransactionManager( metricsManager, keyValueService, @@ -437,8 +431,7 @@ private static TransactionManager create( transactionConfig, conflictTracer, metricsFilterEvaluationContext, - sharedGetRangesPoolSize, - knowledge); + sharedGetRangesPoolSize); if (shouldInstrument) { transactionManager = AtlasDbMetrics.instrumentTimed( @@ -468,8 +461,7 @@ public static SerializableTransactionManager createForTest( Cleaner cleaner, int concurrentGetRangesThreadPoolSize, int defaultGetRangesConcurrency, - MultiTableSweepQueueWriter sweepQueue, - TransactionKnowledgeComponents knowledge) { + MultiTableSweepQueueWriter sweepQueue) { return new SerializableTransactionManager( metricsManager, keyValueService, @@ -492,8 +484,7 @@ public static SerializableTransactionManager createForTest( () -> ImmutableTransactionConfig.builder().build(), ConflictTracer.NO_OP, DefaultMetricsFilterEvaluationContext.createDefault(), - Optional.empty(), - knowledge); + Optional.empty()); } public SerializableTransactionManager( @@ -518,8 +509,7 @@ public SerializableTransactionManager( Supplier transactionConfig, ConflictTracer conflictTracer, MetricsFilterEvaluationContext metricsFilterEvaluationContext, - Optional sharedGetRangesPoolSize, - TransactionKnowledgeComponents knowledge) { + Optional sharedGetRangesPoolSize) { super( metricsManager, keyValueService, @@ -542,8 +532,7 @@ public SerializableTransactionManager( transactionConfig, conflictTracer, metricsFilterEvaluationContext, - sharedGetRangesPoolSize, - knowledge); + sharedGetRangesPoolSize); this.conflictTracer = conflictTracer; } @@ -578,8 +567,7 @@ protected CallbackAwareTransaction createTransaction( validateLocksOnReads, transactionConfig, conflictTracer, - tableLevelMetricsController, - knowledge); + tableLevelMetricsController); } @VisibleForTesting diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/ShouldNotDeleteAndRollbackTransaction.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/ShouldNotDeleteAndRollbackTransaction.java index c21d1aa3124..6e9d07f1979 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/ShouldNotDeleteAndRollbackTransaction.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/ShouldNotDeleteAndRollbackTransaction.java @@ -25,7 +25,6 @@ import com.palantir.atlasdb.transaction.api.AtlasDbConstraintCheckingMode; import com.palantir.atlasdb.transaction.api.TransactionReadSentinelBehavior; import com.palantir.atlasdb.transaction.impl.metrics.SimpleTableLevelMetricsController; -import com.palantir.atlasdb.transaction.knowledge.TransactionKnowledgeComponents; import com.palantir.atlasdb.transaction.service.TransactionService; import com.palantir.atlasdb.util.MetricsManager; import java.util.List; @@ -81,8 +80,7 @@ public ShouldNotDeleteAndRollbackTransaction( TimestampCache timestampCache, ExecutorService getRangesExecutor, int defaultGetRangesConcurrency, - Supplier transactionConfig, - TransactionKnowledgeComponents knowledge) { + Supplier transactionConfig) { super( metricsManager, keyValueService, @@ -108,8 +106,7 @@ public ShouldNotDeleteAndRollbackTransaction( true, transactionConfig, ConflictTracer.NO_OP, - new SimpleTableLevelMetricsController(metricsManager), - knowledge); + new SimpleTableLevelMetricsController(metricsManager)); } @Override diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java index 2d9aea2532f..7bca754db50 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java @@ -98,7 +98,6 @@ import com.palantir.atlasdb.transaction.api.TransactionReadSentinelBehavior; import com.palantir.atlasdb.transaction.impl.metrics.TableLevelMetricsController; import com.palantir.atlasdb.transaction.impl.metrics.TransactionOutcomeMetrics; -import com.palantir.atlasdb.transaction.knowledge.TransactionKnowledgeComponents; import com.palantir.atlasdb.transaction.service.AsyncTransactionService; import com.palantir.atlasdb.transaction.service.TransactionService; import com.palantir.atlasdb.transaction.service.TransactionServices; @@ -120,10 +119,13 @@ import com.palantir.lock.AtlasRowLockDescriptor; import com.palantir.lock.LockDescriptor; import com.palantir.lock.v2.ClientLockingOptions; +import com.palantir.lock.v2.ImmutableLockRequest; import com.palantir.lock.v2.LockRequest; import com.palantir.lock.v2.LockResponse; import com.palantir.lock.v2.LockToken; import com.palantir.lock.v2.TimelockService; +import com.palantir.lock.v2.WaitForLocksRequest; +import com.palantir.lock.v2.WaitForLocksResponse; import com.palantir.logsafe.Preconditions; import com.palantir.logsafe.SafeArg; import com.palantir.logsafe.UnsafeArg; @@ -248,6 +250,7 @@ private enum State { private final TransactionReadSentinelBehavior readSentinelBehavior; private volatile long commitTsForScrubbing = TransactionConstants.FAILED_COMMIT_TS; protected final boolean allowHiddenTableAccess; + protected final TimestampCache timestampValidationReadCache; protected final ExecutorService getRangesExecutor; protected final int defaultGetRangesConcurrency; private final Set involvedTables = ConcurrentHashMap.newKeySet(); @@ -258,14 +261,9 @@ private enum State { protected final Supplier transactionConfig; protected final TableLevelMetricsController tableLevelMetricsController; protected final SuccessCallbackManager successCallbackManager = new SuccessCallbackManager(); - private final CommitTimestampLoader commitTimestampLoader; protected volatile boolean hasReads; - protected final TimestampCache timestampCache; - - protected final TransactionKnowledgeComponents knowledge; - /** * @param immutableTimestamp If we find a row written before the immutableTimestamp we don't need to grab a read * lock for it because we know that no writers exist. @@ -296,8 +294,7 @@ private enum State { boolean validateLocksOnReads, Supplier transactionConfig, ConflictTracer conflictTracer, - TableLevelMetricsController tableLevelMetricsController, - TransactionKnowledgeComponents knowledge) { + TableLevelMetricsController tableLevelMetricsController) { this.metricsManager = metricsManager; this.lockWatchManager = lockWatchManager; this.conflictTracer = conflictTracer; @@ -318,6 +315,7 @@ private enum State { this.transactionReadTimeoutMillis = transactionTimeoutMillis; this.readSentinelBehavior = readSentinelBehavior; this.allowHiddenTableAccess = allowHiddenTableAccess; + this.timestampValidationReadCache = timestampValidationReadCache; this.getRangesExecutor = getRangesExecutor; this.defaultGetRangesConcurrency = defaultGetRangesConcurrency; this.sweepQueue = sweepQueue; @@ -327,17 +325,6 @@ private enum State { this.validateLocksOnReads = validateLocksOnReads; this.transactionConfig = transactionConfig; this.tableLevelMetricsController = tableLevelMetricsController; - this.timestampCache = timestampValidationReadCache; - this.knowledge = knowledge; - this.commitTimestampLoader = new CommitTimestampLoader( - timestampValidationReadCache, - immutableTimestampLock, - this::getStartTimestamp, - transactionConfig, - metricsManager, - timelockService, - immutableTimestamp, - knowledge); } protected TransactionScopedCache getCache() { @@ -1097,9 +1084,10 @@ protected void batchAcceptSizeHint( } private void validatePreCommitRequirementsOnReadIfNecessary(TableReference tableRef, long timestamp) { - if (isValidationNecessaryOnReads(tableRef)) { - throwIfPreCommitRequirementsNotMet(null, timestamp); + if (!isValidationNecessaryOnReads(tableRef)) { + return; } + throwIfPreCommitRequirementsNotMet(null, timestamp); } private boolean isValidationNecessaryOnReads(TableReference tableRef) { @@ -2325,7 +2313,10 @@ protected LockToken acquireLocksForCommit() { // TODO(fdesouza): Revert this once PDS-95791 is resolved. long lockAcquireTimeoutMillis = currentTransactionConfig.getLockAcquireTimeoutMillis(); - LockRequest request = LockRequest.of(lockDescriptors, lockAcquireTimeoutMillis); + LockRequest request = ImmutableLockRequest.of( + lockDescriptors, + lockAcquireTimeoutMillis, + Optional.ofNullable(getStartTimestampAsClientDescription(currentTransactionConfig))); RuntimeException stackTraceSnapshot = new SafeRuntimeException("I exist to show you the stack trace"); LockResponse lockResponse = timelockService.lock( @@ -2348,15 +2339,6 @@ protected LockToken acquireLocksForCommit() { return lockResponse.getToken(); } - protected ListenableFuture> getCommitTimestamps( - TableReference tableRef, - Iterable startTimestamps, - boolean shouldWaitForCommitterToComplete, - AsyncTransactionService asyncTransactionService) { - return commitTimestampLoader.getCommitTimestamps( - tableRef, startTimestamps, shouldWaitForCommitterToComplete, asyncTransactionService); - } - private void logCommitLockTenureExceeded( Set lockDescriptors, TransactionConfig currentTransactionConfig, @@ -2397,6 +2379,62 @@ protected Set getLocksForWrites() { return result; } + /** + * We will block here until the passed transactions have released their lock. This means that the committing + * transaction is either complete or it has failed and we are allowed to roll it back. + */ + private void waitForCommitToComplete(Iterable startTimestamps) { + Set lockDescriptors = new HashSet<>(); + for (long start : startTimestamps) { + if (start < immutableTimestamp) { + // We don't need to block in this case because this transaction is already complete + continue; + } + lockDescriptors.add(AtlasRowLockDescriptor.of( + TransactionConstants.TRANSACTION_TABLE.getQualifiedName(), + TransactionConstants.getValueForTimestamp(start))); + } + + if (lockDescriptors.isEmpty()) { + return; + } + + waitFor(lockDescriptors); + } + + private void waitFor(Set lockDescriptors) { + TransactionConfig currentTransactionConfig = transactionConfig.get(); + String startTimestampAsDescription = getStartTimestampAsClientDescription(currentTransactionConfig); + + // TODO(fdesouza): Revert this once PDS-95791 is resolved. + long lockAcquireTimeoutMillis = currentTransactionConfig.getLockAcquireTimeoutMillis(); + WaitForLocksRequest request = + WaitForLocksRequest.of(lockDescriptors, lockAcquireTimeoutMillis, startTimestampAsDescription); + WaitForLocksResponse response = timelockService.waitForLocks(request); + if (!response.wasSuccessful()) { + log.error( + "Timed out waiting for commits to complete. Timeout was {} ms. First ten locks were {}.", + SafeArg.of("requestId", request.getRequestId()), + SafeArg.of("acquireTimeoutMs", lockAcquireTimeoutMillis), + SafeArg.of("numberOfDescriptors", lockDescriptors.size()), + UnsafeArg.of("firstTenLockDescriptors", Iterables.limit(lockDescriptors, 10))); + throw new TransactionLockAcquisitionTimeoutException("Timed out waiting for commits to complete."); + } + } + + /** + * TODO(fdesouza): Remove this once PDS-95791 is resolved. + * + * @deprecated Remove this once PDS-95791 is resolved. + */ + @Deprecated + @Nullable + private String getStartTimestampAsClientDescription(TransactionConfig currentTransactionConfig) { + return currentTransactionConfig.attachStartTimestampToLockRequestDescriptions() + ? Long.toString(getStartTimestamp()) + : null; + } + /////////////////////////////////////////////////////////////////////////// /// Commit timestamp management /////////////////////////////////////////////////////////////////////////// @@ -2415,6 +2453,109 @@ private Map getCommitTimestampsSync( tableRef, startTimestamps, waitForCommitterToComplete, immediateTransactionService)); } + /** + * Returns a map from start timestamp to commit timestamp. If a start timestamp wasn't committed, then it will be + * missing from the map. This method will block until the transactions for these start timestamps are complete. + */ + protected ListenableFuture> getCommitTimestamps( + @Nullable TableReference tableRef, + Iterable startTimestamps, + boolean shouldWaitForCommitterToComplete, + AsyncTransactionService asyncTransactionService) { + if (Iterables.isEmpty(startTimestamps)) { + return Futures.immediateFuture(ImmutableMap.of()); + } + Map startToCommitTimestamps = new HashMap<>(); + Set gets = new HashSet<>(); + for (Long startTs : startTimestamps) { + Long cached = timestampValidationReadCache.getCommitTimestampIfPresent(startTs); + if (cached != null) { + startToCommitTimestamps.put(startTs, cached); + } else { + gets.add(startTs); + } + } + + if (gets.isEmpty()) { + return Futures.immediateFuture(startToCommitTimestamps); + } + + // Before we do the reads, we need to make sure the committer is done writing. + if (shouldWaitForCommitterToComplete) { + waitForCommitterToComplete(tableRef, startTimestamps); + } + + traceGetCommitTimestamps(tableRef, gets); + + if (gets.size() > transactionConfig.get().getThresholdForLoggingLargeNumberOfTransactionLookups()) { + logLargeNumberOfTransactions(tableRef, gets); + } + + return Futures.transform( + loadCommitTimestamps(asyncTransactionService, gets), + rawResults -> { + for (Map.Entry e : rawResults.entrySet()) { + if (e.getValue() != null) { + Long startTs = e.getKey(); + Long commitTs = e.getValue(); + startToCommitTimestamps.put(startTs, commitTs); + timestampValidationReadCache.putAlreadyCommittedTransaction(startTs, commitTs); + } + } + return startToCommitTimestamps; + }, + MoreExecutors.directExecutor()); + } + + private void waitForCommitterToComplete(@Nullable TableReference tableRef, Iterable startTimestamps) { + Timer.Context timer = getTimer("waitForCommitTsMillis").time(); + waitForCommitToComplete(startTimestamps); + long waitForCommitTsMillis = TimeUnit.NANOSECONDS.toMillis(timer.stop()); + + if (tableRef != null) { + perfLogger.debug( + "Waited to get commit timestamps when reading from a known table.", + SafeArg.of("commitTsMillis", waitForCommitTsMillis), + LoggingArgs.tableRef(tableRef)); + } else { + perfLogger.debug("Waited to get commit timestamps.", SafeArg.of("commitTsMillis", waitForCommitTsMillis)); + } + } + + private void traceGetCommitTimestamps(@Nullable TableReference tableRef, Set gets) { + if (tableRef != null) { + log.trace( + "Getting commit timestamps for a read while reading table.", + SafeArg.of("numTimestamps", gets.size()), + LoggingArgs.tableRef(tableRef)); + return; + } + + log.trace("Getting commit timestamps.", SafeArg.of("numTimestamps", gets.size())); + } + + private void logLargeNumberOfTransactions(@Nullable TableReference tableRef, Set gets) { + log.info( + "Looking up a large number of transactions.", + SafeArg.of("numberOfTransactionIds", gets.size()), + tableRef == null ? SafeArg.of("tableRef", "no_table") : LoggingArgs.tableRef(tableRef)); + } + + private static ListenableFuture> loadCommitTimestamps( + AsyncTransactionService asyncTransactionService, Set startTimestamps) { + // distinguish between a single timestamp and a batch, for more granular metrics + if (startTimestamps.size() == 1) { + Long singleTs = startTimestamps.iterator().next(); + return Futures.transform( + asyncTransactionService.getAsync(singleTs), + commitTsOrNull -> + commitTsOrNull == null ? ImmutableMap.of() : ImmutableMap.of(singleTs, commitTsOrNull), + MoreExecutors.directExecutor()); + } else { + return asyncTransactionService.getAsync(startTimestamps); + } + } + /** * This will attempt to put the commitTimestamp into the DB. * diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManager.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManager.java index 38168edfceb..888f070f94c 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManager.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManager.java @@ -49,7 +49,6 @@ import com.palantir.atlasdb.transaction.impl.metrics.MetricsFilterEvaluationContext; import com.palantir.atlasdb.transaction.impl.metrics.TableLevelMetricsController; import com.palantir.atlasdb.transaction.impl.metrics.ToplistDeltaFilteringTableLevelMetricsController; -import com.palantir.atlasdb.transaction.knowledge.TransactionKnowledgeComponents; import com.palantir.atlasdb.transaction.service.TransactionService; import com.palantir.atlasdb.util.MetricsManager; import com.palantir.common.base.Throwables; @@ -109,8 +108,6 @@ private final ConflictTracer conflictTracer; - protected final TransactionKnowledgeComponents knowledge; - protected SnapshotTransactionManager( MetricsManager metricsManager, KeyValueService keyValueService, @@ -133,8 +130,7 @@ protected SnapshotTransactionManager( Supplier transactionConfig, ConflictTracer conflictTracer, MetricsFilterEvaluationContext metricsFilterEvaluationContext, - Optional sharedGetRangesPoolSize, - TransactionKnowledgeComponents knowledge) { + Optional sharedGetRangesPoolSize) { super(metricsManager, timestampCache, () -> transactionConfig.get().retryStrategy()); this.lockWatchManager = lockWatchManager; TimestampTracker.instrumentTimestamps(metricsManager, timelockService, cleaner); @@ -163,7 +159,6 @@ protected SnapshotTransactionManager( metricsManager, metricsFilterEvaluationContext)); this.openTransactionCounter = metricsManager.registerOrGetCounter(SnapshotTransactionManager.class, "openTransactionCounter"); - this.knowledge = knowledge; } @Override @@ -329,8 +324,7 @@ protected CallbackAwareTransaction createTransaction( validateLocksOnReads, transactionConfig, conflictTracer, - tableLevelMetricsController, - knowledge); + tableLevelMetricsController); } @Override @@ -372,8 +366,7 @@ private T runTaskWithCond validateLocksOnReads, transactionConfig, conflictTracer, - tableLevelMetricsController, - knowledge); + tableLevelMetricsController); return runTaskThrowOnConflictWithCallback( txn -> task.execute(txn, condition), new ReadTransaction(transaction, sweepStrategyManager), diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/knowledge/KnownAbortedTransactionsImpl.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/knowledge/KnownAbortedTransactionsImpl.java index eaffcee5da1..8342f9f9129 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/knowledge/KnownAbortedTransactionsImpl.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/knowledge/KnownAbortedTransactionsImpl.java @@ -25,6 +25,7 @@ import com.palantir.logsafe.SafeArg; import com.palantir.logsafe.exceptions.SafeIllegalStateException; import com.palantir.tritium.metrics.registry.TaggedMetricRegistry; +import java.util.Optional; import java.util.Set; import org.checkerframework.checker.index.qual.NonNegative; @@ -66,11 +67,15 @@ public static KnownAbortedTransactionsImpl create( KnownConcludedTransactions knownConcludedTransactions, AbandonedTimestampStore abandonedTimestampStore, TaggedMetricRegistry registry, - InternalSchemaInstallConfig config) { + Optional config) { AbortedTransactionSoftCache softCache = new AbortedTransactionSoftCache(abandonedTimestampStore, knownConcludedTransactions); return new KnownAbortedTransactionsImpl( - abandonedTimestampStore, softCache, registry, config.versionFourAbortedTransactionsCacheSize()); + abandonedTimestampStore, + softCache, + registry, + config.map(InternalSchemaInstallConfig::versionFourAbortedTransactionsCacheSize) + .orElse(MAXIMUM_CACHE_WEIGHT)); } @Override @@ -94,6 +99,7 @@ public boolean isKnownAborted(long startTimestamp) { @Override public void addAbortedTimestamps(Set abortedTimestamps) { + // todo(snanda): batching? abortedTimestamps.forEach(abandonedTimestampStore::markAbandoned); } diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/knowledge/TransactionKnowledgeComponents.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/knowledge/TransactionKnowledgeComponents.java deleted file mode 100644 index 630c778578b..00000000000 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/knowledge/TransactionKnowledgeComponents.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * (c) Copyright 2022 Palantir Technologies Inc. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.palantir.atlasdb.transaction.knowledge; - -import com.palantir.atlasdb.internalschema.InternalSchemaInstallConfig; -import com.palantir.atlasdb.keyvalue.api.KeyValueService; -import com.palantir.atlasdb.sweep.queue.SweepQueue.SweepQueueFactory; -import com.palantir.tritium.metrics.registry.TaggedMetricRegistry; -import java.util.function.LongSupplier; -import org.immutables.value.Value; - -@Value.Immutable -public interface TransactionKnowledgeComponents { - KnownConcludedTransactions concluded(); - - KnownAbortedTransactions aborted(); - - LongSupplier lastSeenCommitSupplier(); - - static TransactionKnowledgeComponents createForTests(KeyValueService kvs, TaggedMetricRegistry metricRegistry) { - return create(kvs, metricRegistry, InternalSchemaInstallConfig.getDefault()); - } - - static TransactionKnowledgeComponents create( - KeyValueService kvs, TaggedMetricRegistry metricRegistry, InternalSchemaInstallConfig config) { - return ImmutableTransactionKnowledgeComponents.builder() - .concluded(KnownConcludedTransactionsImpl.create( - KnownConcludedTransactionsStore.create(kvs), metricRegistry)) - .aborted(KnownAbortedTransactionsImpl.create( - KnownConcludedTransactionsImpl.create( - KnownConcludedTransactionsStore.create(kvs), metricRegistry), - new DefaultAbandonedTimestampStore(kvs), - metricRegistry, - config)) - .lastSeenCommitSupplier(SweepQueueFactory.getGetLastSeenCommitTsSupplier(kvs)) - .build(); - } -} diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/service/SimpleTransactionService.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/service/SimpleTransactionService.java index e6bc0e4c1b7..d469fce3506 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/service/SimpleTransactionService.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/service/SimpleTransactionService.java @@ -19,7 +19,6 @@ import com.palantir.atlasdb.atomic.AtomicTable; import com.palantir.atlasdb.atomic.ConsensusForgettingStore; import com.palantir.atlasdb.atomic.InstrumentedConsensusForgettingStore; -import com.palantir.atlasdb.atomic.KnowledgeableTimestampExtractingAtomicTable; import com.palantir.atlasdb.atomic.PueConsensusForgettingStore; import com.palantir.atlasdb.atomic.ResilientCommitTimestampAtomicTable; import com.palantir.atlasdb.atomic.SimpleCommitTimestampAtomicTable; @@ -33,9 +32,7 @@ import com.palantir.atlasdb.transaction.encoding.TransactionStatusEncodingStrategy; import com.palantir.atlasdb.transaction.encoding.TwoPhaseEncodingStrategy; import com.palantir.atlasdb.transaction.encoding.V1EncodingStrategy; -import com.palantir.atlasdb.transaction.encoding.V4ProgressEncodingStrategy; import com.palantir.atlasdb.transaction.impl.TransactionConstants; -import com.palantir.atlasdb.transaction.knowledge.TransactionKnowledgeComponents; import com.palantir.tritium.metrics.registry.TaggedMetricRegistry; import java.util.Map; import java.util.function.Supplier; @@ -76,23 +73,6 @@ public static SimpleTransactionService createV3( acceptStagingReadsAsCommitted); } - public static SimpleTransactionService createV4( - KeyValueService kvs, - TransactionKnowledgeComponents knowledge, - TaggedMetricRegistry metricRegistry, - Supplier acceptStagingReadsAsCommitted) { - if (kvs.getCheckAndSetCompatibility().consistentOnFailure()) { - return createSimple(kvs, TransactionConstants.TRANSACTIONS2_TABLE, TicketsEncodingStrategy.INSTANCE); - } - return createKnowledgeableResilient( - kvs, - metricRegistry, - TransactionConstants.TRANSACTIONS2_TABLE, - new TwoPhaseEncodingStrategy(V4ProgressEncodingStrategy.INSTANCE), - acceptStagingReadsAsCommitted, - knowledge); - } - private static SimpleTransactionService createSimple( KeyValueService kvs, TableReference tableRef, @@ -109,35 +89,12 @@ private static SimpleTransactionService createResilient( TwoPhaseEncodingStrategy encodingStrategy, TaggedMetricRegistry metricRegistry, Supplier acceptStagingReadsAsCommitted) { - ResilientCommitTimestampAtomicTable delegate = - getDelegate(kvs, tableRef, encodingStrategy, metricRegistry, acceptStagingReadsAsCommitted); - AtomicTable atomicTable = new TimestampExtractingAtomicTable(delegate); - return new SimpleTransactionService(atomicTable, delegate, encodingStrategy); - } - - private static SimpleTransactionService createKnowledgeableResilient( - KeyValueService kvs, - TaggedMetricRegistry metricRegistry, - TableReference tableRef, - TwoPhaseEncodingStrategy encodingStrategy, - Supplier acceptStagingReadsAsCommitted, - TransactionKnowledgeComponents knowledge) { - AtomicTable delegate = - getDelegate(kvs, tableRef, encodingStrategy, metricRegistry, acceptStagingReadsAsCommitted); - AtomicTable atomicTable = new KnowledgeableTimestampExtractingAtomicTable(delegate, knowledge); - return new SimpleTransactionService(atomicTable, delegate, encodingStrategy); - } - - private static ResilientCommitTimestampAtomicTable getDelegate( - KeyValueService kvs, - TableReference tableRef, - TwoPhaseEncodingStrategy encodingStrategy, - TaggedMetricRegistry metricRegistry, - Supplier acceptStagingReadsAsCommitted) { ConsensusForgettingStore store = InstrumentedConsensusForgettingStore.create( new PueConsensusForgettingStore(kvs, tableRef), metricRegistry); - return new ResilientCommitTimestampAtomicTable( + ResilientCommitTimestampAtomicTable delegate = new ResilientCommitTimestampAtomicTable( store, encodingStrategy, acceptStagingReadsAsCommitted, metricRegistry); + AtomicTable atomicTable = new TimestampExtractingAtomicTable(delegate); + return new SimpleTransactionService(atomicTable, delegate, encodingStrategy); } @Override diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/service/TransactionServices.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/service/TransactionServices.java index 780ebd9bb41..2e3b1a71306 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/service/TransactionServices.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/service/TransactionServices.java @@ -25,8 +25,6 @@ import com.palantir.atlasdb.keyvalue.api.CheckAndSetCompatibility; import com.palantir.atlasdb.keyvalue.api.KeyValueService; import com.palantir.atlasdb.transaction.impl.TransactionConstants; -import com.palantir.atlasdb.transaction.knowledge.TransactionKnowledgeComponents; -import com.palantir.atlasdb.util.MetricsManager; import com.palantir.atlasdb.util.MetricsManagers; import com.palantir.timestamp.TimestampService; import com.palantir.tritium.metrics.registry.DefaultTaggedMetricRegistry; @@ -40,28 +38,21 @@ private TransactionServices() { } public static TransactionService createTransactionService( - KeyValueService keyValueService, - TransactionSchemaManager transactionSchemaManager, - TransactionKnowledgeComponents knowledge) { + KeyValueService keyValueService, TransactionSchemaManager transactionSchemaManager) { // Should only be used for testing, or in contexts where users are not concerned about metrics return createTransactionService( - keyValueService, transactionSchemaManager, knowledge, new DefaultTaggedMetricRegistry(), () -> false); + keyValueService, transactionSchemaManager, new DefaultTaggedMetricRegistry(), () -> false); } public static TransactionService createTransactionService( KeyValueService keyValueService, TransactionSchemaManager transactionSchemaManager, - TransactionKnowledgeComponents knowledge, TaggedMetricRegistry metricRegistry, Supplier acceptStagingReadsOnVersionThree) { CheckAndSetCompatibility compatibility = keyValueService.getCheckAndSetCompatibility(); if (compatibility.supportsCheckAndSetOperations() && compatibility.supportsDetailOnFailure()) { return createSplitKeyTransactionService( - keyValueService, - transactionSchemaManager, - knowledge, - metricRegistry, - acceptStagingReadsOnVersionThree); + keyValueService, transactionSchemaManager, metricRegistry, acceptStagingReadsOnVersionThree); } return createV1TransactionService(keyValueService); } @@ -69,7 +60,6 @@ public static TransactionService createTransactionService( private static TransactionService createSplitKeyTransactionService( KeyValueService keyValueService, TransactionSchemaManager transactionSchemaManager, - TransactionKnowledgeComponents knowledge, TaggedMetricRegistry metricRegistry, Supplier acceptStagingReadsOnVersionThree) { // TODO (jkong): Is there a way to disallow DIRECT -> V2 transaction service in the map? @@ -81,10 +71,8 @@ private static TransactionService createSplitKeyTransactionService( TransactionConstants.TICKETS_ENCODING_TRANSACTIONS_SCHEMA_VERSION, createV2TransactionService(keyValueService), TransactionConstants.TWO_STAGE_ENCODING_TRANSACTIONS_SCHEMA_VERSION, - createV3TransactionService(keyValueService, metricRegistry, acceptStagingReadsOnVersionThree), - TransactionConstants.TTS_TRANSACTIONS_SCHEMA_VERSION, - createV4TransactionService( - keyValueService, knowledge, metricRegistry, acceptStagingReadsOnVersionThree)))); + createV3TransactionService( + keyValueService, metricRegistry, acceptStagingReadsOnVersionThree)))); } public static TransactionService createV1TransactionService(KeyValueService keyValueService) { @@ -104,15 +92,6 @@ private static TransactionService createV3TransactionService( SimpleTransactionService.createV3(keyValueService, metricRegistry, acceptStagingReadsAsCommitted))); } - private static TransactionService createV4TransactionService( - KeyValueService keyValueService, - TransactionKnowledgeComponents knowledge, - TaggedMetricRegistry metricRegistry, - Supplier acceptStagingReadsAsCommitted) { - return new PreStartHandlingTransactionService(SimpleTransactionService.createV4( - keyValueService, knowledge, metricRegistry, acceptStagingReadsAsCommitted)); - } - /** * This method should only be used to create {@link TransactionService}s for testing, because in production there * are intermediate services like the {@link CoordinationService} this creates where metrics or other forms of @@ -120,12 +99,9 @@ private static TransactionService createV4TransactionService( */ public static TransactionService createRaw( KeyValueService keyValueService, TimestampService timestampService, boolean initializeAsync) { - MetricsManager metricsManager = MetricsManagers.createForTests(); - TransactionKnowledgeComponents knowledge = - TransactionKnowledgeComponents.createForTests(keyValueService, metricsManager.getTaggedRegistry()); - CoordinationService coordinationService = - CoordinationServices.createDefault(keyValueService, timestampService, metricsManager, initializeAsync); - return createTransactionService(keyValueService, new TransactionSchemaManager(coordinationService), knowledge); + CoordinationService coordinationService = CoordinationServices.createDefault( + keyValueService, timestampService, MetricsManagers.createForTests(), initializeAsync); + return createTransactionService(keyValueService, new TransactionSchemaManager(coordinationService)); } /** diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/atomic/KnowledgeableTimestampExtractingAtomicTableTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/atomic/KnowledgeableTimestampExtractingAtomicTableTest.java index dada929d4c9..6fff5e219f3 100644 --- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/atomic/KnowledgeableTimestampExtractingAtomicTableTest.java +++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/atomic/KnowledgeableTimestampExtractingAtomicTableTest.java @@ -22,7 +22,6 @@ import com.google.common.util.concurrent.Futures; import com.palantir.atlasdb.transaction.impl.TransactionConstants; -import com.palantir.atlasdb.transaction.knowledge.ImmutableTransactionKnowledgeComponents; import com.palantir.atlasdb.transaction.knowledge.KnownAbortedTransactions; import com.palantir.atlasdb.transaction.knowledge.KnownConcludedTransactions; import com.palantir.atlasdb.transaction.service.TransactionStatus; @@ -36,12 +35,7 @@ public class KnowledgeableTimestampExtractingAtomicTableTest { private final KnownAbortedTransactions knownAbortedTransactions = mock(KnownAbortedTransactions.class); private final KnowledgeableTimestampExtractingAtomicTable tsExtractingTable = new KnowledgeableTimestampExtractingAtomicTable( - delegate, - ImmutableTransactionKnowledgeComponents.builder() - .concluded(knownConcludedTransactions) - .aborted(knownAbortedTransactions) - .lastSeenCommitSupplier(() -> Long.MAX_VALUE) - .build()); + delegate, knownConcludedTransactions, knownAbortedTransactions); @Test public void canGetTsOfConcludedTxn() throws ExecutionException, InterruptedException { diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/queue/ShardProgressTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/queue/ShardProgressTest.java index f8ac491c4ec..c112cc3709a 100644 --- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/queue/ShardProgressTest.java +++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/queue/ShardProgressTest.java @@ -235,13 +235,13 @@ public void repeatedlyFailingCasThrowsForReset() { @Test public void initialLastSeenCommitTimestampIsEmpty() { - assertThat(progress.getMaybeLastSeenCommitTimestamp()).isEmpty(); + assertThat(progress.getLastSeenCommitTimestamp()).isEmpty(); } @Test public void canUpdateLastCommitTimestamp() { progress.updateLastSeenCommitTimestamp(CONSERVATIVE_TEN, 1024L); - assertThat(progress.getMaybeLastSeenCommitTimestamp()).hasValue(1024L); + assertThat(progress.getLastSeenCommitTimestamp()).hasValue(1024L); } @Test @@ -253,26 +253,26 @@ public void attemptingToDecreaseLastSeenCommitTimestampIsNoop() { progress.updateLastSeenCommitTimestamp(CONSERVATIVE_TEN, 512L); // checkAndSet is only called the one time verify(kvs).checkAndSet(any()); - assertThat(progress.getMaybeLastSeenCommitTimestamp()).hasValue(1024L); + assertThat(progress.getLastSeenCommitTimestamp()).hasValue(1024L); } @Test public void updatingLastSeenTimestampForOneShardUpdatesGlobalValue() { - assertThat(progress.getMaybeLastSeenCommitTimestamp()).isEmpty(); + assertThat(progress.getLastSeenCommitTimestamp()).isEmpty(); progress.updateLastSeenCommitTimestamp(CONSERVATIVE_TEN, 1024L); - assertThat(progress.getMaybeLastSeenCommitTimestamp()).hasValue(1024L); + assertThat(progress.getLastSeenCommitTimestamp()).hasValue(1024L); progress.updateLastSeenCommitTimestamp(CONSERVATIVE_TWENTY, 2048L); - assertThat(progress.getMaybeLastSeenCommitTimestamp()).hasValue(2048L); + assertThat(progress.getLastSeenCommitTimestamp()).hasValue(2048L); } @Test public void onlyUpdatesLastSeenCommitTsForConservative() { - assertThat(progress.getMaybeLastSeenCommitTimestamp()).isEmpty(); + assertThat(progress.getLastSeenCommitTimestamp()).isEmpty(); progress.updateLastSeenCommitTimestamp(CONSERVATIVE_TEN, 128L); - assertThat(progress.getMaybeLastSeenCommitTimestamp()).hasValue(128L); + assertThat(progress.getLastSeenCommitTimestamp()).hasValue(128L); progress.updateLastSeenCommitTimestamp(THOROUGH_TEN, 256L); } @@ -280,13 +280,13 @@ public void onlyUpdatesLastSeenCommitTsForConservative() { @Test public void updatingLastSeenCommitTimestampsDoesNotAffectShardsAndViceVersa() { assertThat(progress.getNumberOfShards()).isEqualTo(AtlasDbConstants.LEGACY_DEFAULT_TARGETED_SWEEP_SHARDS); - assertThat(progress.getMaybeLastSeenCommitTimestamp()).isEmpty(); + assertThat(progress.getLastSeenCommitTimestamp()).isEmpty(); progress.updateNumberOfShards(64); progress.updateLastSeenCommitTimestamp(CONSERVATIVE_TEN, 32L); assertThat(progress.getNumberOfShards()).isEqualTo(64); - assertThat(progress.getMaybeLastSeenCommitTimestamp()).hasValue(32L); + assertThat(progress.getLastSeenCommitTimestamp()).hasValue(32L); } @Test @@ -295,7 +295,7 @@ public void canUpdateProgressForNonSweepable() { assertThat(progress.getLastSweptTimestamp(NON_SWEEPABLE)).isEqualTo(150L); progress.updateLastSeenCommitTimestamp(NON_SWEEPABLE, 200L); - assertThat(progress.getLastSeenCommitTimestamp()).isEqualTo(200L); + assertThat(progress.getLastSeenCommitTimestamp()).hasValue(200L); } @Test diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/transaction/impl/CommitTimestampLoaderTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/transaction/impl/CommitTimestampLoaderTest.java deleted file mode 100644 index caa1635feb8..00000000000 --- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/transaction/impl/CommitTimestampLoaderTest.java +++ /dev/null @@ -1,215 +0,0 @@ -/* - * (c) Copyright 2022 Palantir Technologies Inc. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.palantir.atlasdb.transaction.impl; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatExceptionOfType; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; - -import com.google.common.collect.ImmutableList; -import com.google.common.util.concurrent.Futures; -import com.palantir.atlasdb.cache.TimestampCache; -import com.palantir.atlasdb.keyvalue.api.TableReference; -import com.palantir.atlasdb.transaction.TransactionConfig; -import com.palantir.atlasdb.transaction.knowledge.ImmutableTransactionKnowledgeComponents; -import com.palantir.atlasdb.transaction.knowledge.KnownAbortedTransactions; -import com.palantir.atlasdb.transaction.knowledge.KnownConcludedTransactions; -import com.palantir.atlasdb.transaction.knowledge.TransactionKnowledgeComponents; -import com.palantir.atlasdb.transaction.service.AsyncTransactionService; -import com.palantir.atlasdb.transaction.service.TransactionStatus; -import com.palantir.atlasdb.transaction.service.TransactionStatuses; -import com.palantir.atlasdb.util.MetricsManager; -import com.palantir.atlasdb.util.MetricsManagers; -import com.palantir.lock.v2.LockToken; -import com.palantir.lock.v2.TimelockService; -import com.palantir.logsafe.exceptions.SafeIllegalStateException; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.ExecutionException; -import org.junit.Test; - -public class CommitTimestampLoaderTest { - private static final TableReference TABLE_REF = TableReference.fromString("table"); - private final TimestampCache timestampCache = mock(TimestampCache.class); - private final TransactionConfig transactionConfig = mock(TransactionConfig.class); - private final MetricsManager metricsManager = MetricsManagers.createForTests(); - private final TimelockService timelockService = mock(TimelockService.class); - private final AsyncTransactionService transactionService = mock(AsyncTransactionService.class); - - private final KnownAbortedTransactions knownAbortedTransactions = mock(KnownAbortedTransactions.class); - - private final KnownConcludedTransactions knownConcludedTransactions = mock(KnownConcludedTransactions.class); - - private void setup(long startTs, long commitTs) { - TransactionStatus commitStatus = TransactionStatuses.committed(commitTs); - setup(startTs, commitStatus, false); - } - - private void setup(long startTs, TransactionStatus commitStatus, boolean isAborted) { - when(timestampCache.getCommitTimestampIfPresent(anyLong())).thenReturn(null); - when(knownAbortedTransactions.isKnownAborted(anyLong())).thenReturn(isAborted); - when(transactionService.getAsyncV2(startTs)).thenReturn(Futures.immediateFuture(commitStatus)); - } - - @Test - public void readOnlyDoesNotThrowForUnsweptTTSCell() throws ExecutionException, InterruptedException { - long transactionTs = 27l; - long startTs = 5l; - long commitTs = startTs + 1; - - setup(startTs, commitTs); - - // no immutableTs lock for read-only transaction - CommitTimestampLoader commitTimestampLoader = getCommitTsLoader(Optional.empty(), transactionTs, commitTs - 1); - - assertCanGetCommitTs(startTs, commitTs, commitTimestampLoader); - } - - @Test - public void throwIfTTSBeyondReadOnlyForSweptTTSCell() { - long transactionTs = 27l; - long startTs = 5l; - TransactionStatus commitStatus = TransactionStatuses.unknown(); - - setup(startTs, commitStatus, true); - - // no immutableTs lock for read-only transaction - CommitTimestampLoader commitTimestampLoader = - getCommitTsLoader(Optional.empty(), transactionTs, transactionTs + 1); - - assertThatExceptionOfType(ExecutionException.class) - .isThrownBy(() -> commitTimestampLoader - .getCommitTimestamps(TABLE_REF, ImmutableList.of(startTs), false, transactionService) - .get()) - .withRootCauseInstanceOf(SafeIllegalStateException.class) - .withMessageContaining("Sweep has swept some entries with a commit TS after us"); - } - - @Test - public void doNotThrowIfTTSBeyondReadOnlyTxnForNonTTSCell() throws ExecutionException, InterruptedException { - long transactionTs = 27l; - long startTs = 5l; - long commitTs = startTs + 1; - - setup(startTs, commitTs); - - // no immutableTs lock for read-only transaction - CommitTimestampLoader commitTimestampLoader = - getCommitTsLoader(Optional.empty(), transactionTs, transactionTs + 1); - - assertCanGetCommitTs(startTs, commitTs, commitTimestampLoader); - } - - @Test - public void doNotThrowIfTTSBeyondReadWriteTxnForTTSCell() throws ExecutionException, InterruptedException { - long transactionTs = 27l; - long startTs = 5l; - long commitTs = TransactionStatusUtils.getCommitTsForNonAbortedUnknownTransaction(startTs); - - setup(startTs, commitTs); - - LockToken lock = mock(LockToken.class); - - // no immutableTs lock for read-only transaction - CommitTimestampLoader commitTimestampLoader = - getCommitTsLoader(Optional.of(lock), transactionTs, transactionTs + 1); - - // the transaction will eventually throw at commit time. In this test we are only concerned with per read - // validation. - assertCanGetCommitTs(startTs, commitTs, commitTimestampLoader); - } - - @Test - public void doNotThrowIfTTSBeyondReadWriteTxnForNonTTSCell() throws ExecutionException, InterruptedException { - long transactionTs = 27l; - long startTs = 5l; - long commitTs = startTs + 1; - - setup(startTs, commitTs); - - LockToken lock = mock(LockToken.class); - - // no immutableTs lock for read-only transaction - CommitTimestampLoader commitTimestampLoader = getCommitTsLoader(Optional.of(lock), transactionTs, commitTs + 1); - assertCanGetCommitTs(startTs, commitTs, commitTimestampLoader); - } - - @Test - public void doesNotCacheUnknownTransactions() throws ExecutionException, InterruptedException { - long transactionTs = 27l; - - long startTsKnown = 5l; - long commitTsKnown = startTsKnown + 1; - - long startTsUnknown = 7l; - TransactionStatus commitUnknown = TransactionStatuses.unknown(); - - CommitTimestampLoader commitTimestampLoader = - getCommitTsLoader(Optional.empty(), transactionTs, transactionTs - 1); - - setup(startTsKnown, commitTsKnown); - // the transaction will eventually throw at commit time. In this test we are only concerned with per read - // validation. - assertCanGetCommitTs(startTsKnown, commitTsKnown, commitTimestampLoader); - verify(timestampCache).getCommitTimestampIfPresent(startTsKnown); - verify(timestampCache).putAlreadyCommittedTransaction(startTsKnown, commitTsKnown); - - setup(startTsUnknown, commitUnknown, false); - assertCanGetCommitTs( - startTsUnknown, - TransactionStatusUtils.getCommitTsForNonAbortedUnknownTransaction(startTsUnknown), - commitTimestampLoader); - verify(timestampCache).getCommitTimestampIfPresent(startTsUnknown); - verifyNoMoreInteractions(timestampCache); - } - - private void assertCanGetCommitTs(long startTs, long commitTs, CommitTimestampLoader commitTimestampLoader) - throws InterruptedException, ExecutionException { - Map loadedCommitTs = commitTimestampLoader - .getCommitTimestamps(TABLE_REF, ImmutableList.of(startTs), false, transactionService) - .get(); - assertThat(loadedCommitTs).hasSize(1); - assertThat(loadedCommitTs.get(startTs)).isEqualTo(commitTs); - } - - private CommitTimestampLoader getCommitTsLoader( - Optional lock, long transactionTs, long lastSeenCommitTs) { - createKnowledgeComponents(lastSeenCommitTs); - CommitTimestampLoader commitTimestampLoader = new CommitTimestampLoader( - timestampCache, - lock, // commitTsLoader does not care if the lock expires. - () -> transactionTs, - () -> transactionConfig, - metricsManager, - timelockService, - 1l, - createKnowledgeComponents(lastSeenCommitTs)); - return commitTimestampLoader; - } - - private TransactionKnowledgeComponents createKnowledgeComponents(long lastSeenCommitTs) { - return ImmutableTransactionKnowledgeComponents.builder() - .aborted(knownAbortedTransactions) - .concluded(knownConcludedTransactions) - .lastSeenCommitSupplier(() -> lastSeenCommitTs) - .build(); - } -} diff --git a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/sweep/AbstractBackgroundSweeperIntegrationTest.java b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/sweep/AbstractBackgroundSweeperIntegrationTest.java index 1bfdeb80197..d5c336f6a4b 100644 --- a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/sweep/AbstractBackgroundSweeperIntegrationTest.java +++ b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/sweep/AbstractBackgroundSweeperIntegrationTest.java @@ -16,14 +16,10 @@ package com.palantir.atlasdb.sweep; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableList; import com.google.common.primitives.Ints; import com.palantir.atlasdb.AtlasDbConstants; -import com.palantir.atlasdb.internalschema.TransactionSchemaManager; import com.palantir.atlasdb.keyvalue.api.Cell; import com.palantir.atlasdb.keyvalue.api.KeyValueService; import com.palantir.atlasdb.keyvalue.api.RangeRequest; @@ -79,7 +75,6 @@ public abstract class AbstractBackgroundSweeperIntegrationTest { .maxCellTsPairsToExamine(1000) .build(); protected TransactionService txService; - protected TransactionSchemaManager txSchemaManager = mock(TransactionSchemaManager.class); SpecificTableSweeper specificTableSweeper; AdjustableSweepBatchConfigSource sweepBatchConfigSource; PeriodicTrueSupplier skipCellVersion = new PeriodicTrueSupplier(); @@ -131,7 +126,6 @@ public void setup() { SweepOutcomeMetrics.registerLegacy(metricsManager), new CountDownLatch(1), 0); - when(txSchemaManager.getTransactionsSchemaVersion(anyLong())).thenReturn(1); } @After diff --git a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/sweep/AbstractSweepTest.java b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/sweep/AbstractSweepTest.java index 66c8f8f7d30..93bd76ceeef 100644 --- a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/sweep/AbstractSweepTest.java +++ b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/sweep/AbstractSweepTest.java @@ -527,6 +527,6 @@ protected void createTable(TableReference tableReference, SweepStrategy sweepStr } protected Optional getLastSeenCommitTimestamp() { - return shardProgress.getMaybeLastSeenCommitTimestamp(); + return shardProgress.getLastSeenCommitTimestamp(); } } diff --git a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/sweep/SweepTestUtils.java b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/sweep/SweepTestUtils.java index 879f6ef7ba1..8c18452901b 100644 --- a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/sweep/SweepTestUtils.java +++ b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/sweep/SweepTestUtils.java @@ -33,7 +33,6 @@ import com.palantir.atlasdb.transaction.impl.SweepStrategyManager; import com.palantir.atlasdb.transaction.impl.SweepStrategyManagers; import com.palantir.atlasdb.transaction.impl.TransactionTables; -import com.palantir.atlasdb.transaction.knowledge.TransactionKnowledgeComponents; import com.palantir.atlasdb.transaction.service.TransactionService; import com.palantir.atlasdb.transaction.service.TransactionServices; import com.palantir.atlasdb.util.MetricsManager; @@ -74,8 +73,6 @@ public static TransactionManager setupTxManager( ConflictDetectionManager cdm = ConflictDetectionManagers.createWithoutWarmingCache(kvs); Cleaner cleaner = new NoOpCleaner(); MultiTableSweepQueueWriter writer = TargetedSweeper.createUninitializedForTest(() -> 1); - TransactionKnowledgeComponents knowledge = - TransactionKnowledgeComponents.createForTests(kvs, metricsManager.getTaggedRegistry()); TransactionManager txManager = SerializableTransactionManager.createForTest( metricsManager, kvs, @@ -90,8 +87,7 @@ public static TransactionManager setupTxManager( cleaner, AbstractTransactionTest.GET_RANGES_THREAD_POOL_SIZE, AbstractTransactionTest.DEFAULT_GET_RANGES_CONCURRENCY, - writer, - knowledge); + writer); setupTables(kvs); writer.initialize(txManager); return txManager; diff --git a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/AbstractSerializableTransactionTest.java b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/AbstractSerializableTransactionTest.java index 5c222907ab5..7567c2ca14e 100644 --- a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/AbstractSerializableTransactionTest.java +++ b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/AbstractSerializableTransactionTest.java @@ -124,8 +124,7 @@ protected TransactionManager createManager() { NoOpCleaner.INSTANCE, AbstractTransactionTest.GET_RANGES_THREAD_POOL_SIZE, AbstractTransactionTest.DEFAULT_GET_RANGES_CONCURRENCY, - sweepQueue, - knowledge); + sweepQueue); sweepQueue.initialize(txManager); return txManager; } @@ -166,8 +165,7 @@ private Transaction startTransactionWithOptions(TransactionOptions options) { true, () -> ImmutableTransactionConfig.builder().build(), ConflictTracer.NO_OP, - new SimpleTableLevelMetricsController(metricsManager), - knowledge) { + new SimpleTableLevelMetricsController(metricsManager)) { @Override protected Map transformGetsForTesting(Map map) { return Maps.transformValues(map, byte[]::clone); diff --git a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/CommitLockTest.java b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/CommitLockTest.java index 13e9190f657..5a7fca6dbbc 100644 --- a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/CommitLockTest.java +++ b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/CommitLockTest.java @@ -184,8 +184,7 @@ private Transaction startTransaction(PreCommitCondition preCommitCondition, Conf true, () -> TRANSACTION_CONFIG, ConflictTracer.NO_OP, - new SimpleTableLevelMetricsController(metricsManager), - knowledge) { + new SimpleTableLevelMetricsController(metricsManager)) { @Override protected Map transformGetsForTesting(Map map) { return Maps.transformValues(map, byte[]::clone); diff --git a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/TestTransactionManagerImpl.java b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/TestTransactionManagerImpl.java index 657f992b8cb..f273027a627 100644 --- a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/TestTransactionManagerImpl.java +++ b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/TestTransactionManagerImpl.java @@ -36,7 +36,6 @@ import com.palantir.atlasdb.transaction.api.Transaction; import com.palantir.atlasdb.transaction.api.TransactionReadSentinelBehavior; import com.palantir.atlasdb.transaction.impl.metrics.DefaultMetricsFilterEvaluationContext; -import com.palantir.atlasdb.transaction.knowledge.TransactionKnowledgeComponents; import com.palantir.atlasdb.transaction.service.TransactionService; import com.palantir.atlasdb.util.MetricsManager; import com.palantir.lock.LockService; @@ -70,7 +69,6 @@ public TestTransactionManagerImpl( SweepStrategyManager sweepStrategyManager, TimestampCache timestampCache, MultiTableSweepQueueWriter sweepQueue, - TransactionKnowledgeComponents knowledge, ExecutorService deleteExecutor) { this( metricsManager, @@ -84,8 +82,7 @@ public TestTransactionManagerImpl( sweepQueue, deleteExecutor, WrapperWithTracker.TRANSACTION_NO_OP, - WrapperWithTracker.KEY_VALUE_SERVICE_NO_OP, - knowledge); + WrapperWithTracker.KEY_VALUE_SERVICE_NO_OP); } @SuppressWarnings("Indentation") // Checkstyle complains about lambda in constructor. @@ -97,8 +94,7 @@ public TestTransactionManagerImpl( LockService lockService, LockWatchManagerInternal lockWatchManager, TransactionService transactionService, - AtlasDbConstraintCheckingMode constraintCheckingMode, - TransactionKnowledgeComponents knowledge) { + AtlasDbConstraintCheckingMode constraintCheckingMode) { super( metricsManager, createAssertKeyValue(keyValueService, lockService), @@ -121,8 +117,7 @@ public TestTransactionManagerImpl( () -> TRANSACTION_CONFIG, ConflictTracer.NO_OP, DefaultMetricsFilterEvaluationContext.createDefault(), - Optional.empty(), - knowledge); + Optional.empty()); this.transactionWrapper = WrapperWithTracker.TRANSACTION_NO_OP; this.keyValueServiceWrapper = WrapperWithTracker.KEY_VALUE_SERVICE_NO_OP; } @@ -140,8 +135,7 @@ public TestTransactionManagerImpl( MultiTableSweepQueueWriter sweepQueue, ExecutorService deleteExecutor, WrapperWithTracker transactionWrapper, - WrapperWithTracker keyValueServiceWrapper, - TransactionKnowledgeComponents knowledge) { + WrapperWithTracker keyValueServiceWrapper) { super( metricsManager, createAssertKeyValue(keyValueService, lockService), @@ -164,8 +158,7 @@ public TestTransactionManagerImpl( () -> TRANSACTION_CONFIG, ConflictTracer.NO_OP, DefaultMetricsFilterEvaluationContext.createDefault(), - Optional.empty(), - knowledge); + Optional.empty()); this.transactionWrapper = transactionWrapper; this.keyValueServiceWrapper = keyValueServiceWrapper; } @@ -223,8 +216,7 @@ protected CallbackAwareTransaction createTransaction( validateLocksOnReads, transactionConfig, ConflictTracer.NO_OP, - tableLevelMetricsController, - knowledge), + tableLevelMetricsController), pathTypeTracker); } diff --git a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/TransactionTestSetup.java b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/TransactionTestSetup.java index cebb97ae747..ae56613cffd 100644 --- a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/TransactionTestSetup.java +++ b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/TransactionTestSetup.java @@ -24,7 +24,6 @@ import com.palantir.atlasdb.cache.TimestampCache; import com.palantir.atlasdb.coordination.CoordinationService; import com.palantir.atlasdb.encoding.PtBytes; -import com.palantir.atlasdb.internalschema.ImmutableInternalSchemaInstallConfig; import com.palantir.atlasdb.internalschema.InternalSchemaMetadata; import com.palantir.atlasdb.internalschema.TransactionSchemaManager; import com.palantir.atlasdb.internalschema.persistence.CoordinationServices; @@ -45,7 +44,6 @@ import com.palantir.atlasdb.transaction.api.ConflictHandler; import com.palantir.atlasdb.transaction.api.Transaction; import com.palantir.atlasdb.transaction.api.TransactionManager; -import com.palantir.atlasdb.transaction.knowledge.TransactionKnowledgeComponents; import com.palantir.atlasdb.transaction.service.TransactionService; import com.palantir.atlasdb.util.MetricsManager; import com.palantir.atlasdb.util.MetricsManagers; @@ -122,8 +120,6 @@ public static void storageTearDown() throws Exception { protected TimestampCache timestampCache; - protected TransactionKnowledgeComponents knowledge; - @Rule public InMemoryTimeLockRule inMemoryTimeLockRule = new InMemoryTimeLockRule(); @@ -180,11 +176,7 @@ public void setUp() { CoordinationService coordinationService = CoordinationServices.createDefault(keyValueService, timestampService, metricsManager, false); transactionSchemaManager = new TransactionSchemaManager(coordinationService); - knowledge = TransactionKnowledgeComponents.create( - keyValueService, - metricsManager.getTaggedRegistry(), - ImmutableInternalSchemaInstallConfig.builder().build()); - transactionService = createTransactionService(keyValueService, transactionSchemaManager, knowledge); + transactionService = createTransactionService(keyValueService, transactionSchemaManager); conflictDetectionManager = ConflictDetectionManagers.createWithoutWarmingCache(keyValueService); sweepStrategyManager = SweepStrategyManagers.createDefault(keyValueService); txMgr = createAndRegisterManager(); @@ -215,7 +207,6 @@ protected TransactionManager createManager() { sweepStrategyManager, timestampCache, MultiTableSweepQueueWriter.NO_OP, - knowledge, MoreExecutors.newDirectExecutorService()); } diff --git a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/AtlasDbTestCase.java b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/AtlasDbTestCase.java index 825b784cb44..71b29aea9e6 100644 --- a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/AtlasDbTestCase.java +++ b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/AtlasDbTestCase.java @@ -37,7 +37,6 @@ import com.palantir.atlasdb.transaction.impl.TestTransactionManager; import com.palantir.atlasdb.transaction.impl.TestTransactionManagerImpl; import com.palantir.atlasdb.transaction.impl.TransactionTables; -import com.palantir.atlasdb.transaction.knowledge.TransactionKnowledgeComponents; import com.palantir.atlasdb.transaction.service.TransactionService; import com.palantir.atlasdb.transaction.service.TransactionServices; import com.palantir.atlasdb.util.AtlasDbMetrics; @@ -75,8 +74,6 @@ public class AtlasDbTestCase { protected SpecialTimestampsSupplier sweepTimestampSupplier; protected int sweepQueueShards = 128; - protected TransactionKnowledgeComponents knowledge; - @Rule public InMemoryTimeLockRule inMemoryTimeLockRule = new InMemoryTimeLockRule(CLIENT); @@ -91,8 +88,8 @@ public void setUp() throws Exception { transactionService = spy(TransactionServices.createRaw(keyValueService, timestampService, false)); conflictDetectionManager = ConflictDetectionManagers.createWithoutWarmingCache(keyValueService); sweepStrategyManager = SweepStrategyManagers.createDefault(keyValueService); + sweepQueue = spy(TargetedSweeper.createUninitializedForTest(() -> sweepQueueShards)); - knowledge = TransactionKnowledgeComponents.createForTests(keyValueService, metricsManager.getTaggedRegistry()); setUpTransactionManagers(); sweepQueue.initialize(serializableTxManager); sweepTimestampSupplier = new SpecialTimestampsSupplier( @@ -120,7 +117,6 @@ protected TestTransactionManager constructTestTransactionManager() { sweepStrategyManager, DefaultTimestampCache.createForTests(), sweepQueue, - knowledge, MoreExecutors.newDirectExecutorService()); } @@ -159,8 +155,7 @@ protected void setConstraintCheckingMode(AtlasDbConstraintCheckingMode mode) { lockService, inMemoryTimeLockRule.getLockWatchManager(), transactionService, - mode, - TransactionKnowledgeComponents.createForTests(keyValueService, metricsManager.getTaggedRegistry())); + mode); } protected void clearTablesWrittenTo() { diff --git a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/keyvalue/impl/TableTasksTest.java b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/keyvalue/impl/TableTasksTest.java index a42e5f3dc52..69f4e898021 100644 --- a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/keyvalue/impl/TableTasksTest.java +++ b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/keyvalue/impl/TableTasksTest.java @@ -39,7 +39,6 @@ import com.palantir.atlasdb.transaction.impl.SerializableTransactionManager; import com.palantir.atlasdb.transaction.impl.SweepStrategyManager; import com.palantir.atlasdb.transaction.impl.SweepStrategyManagers; -import com.palantir.atlasdb.transaction.knowledge.TransactionKnowledgeComponents; import com.palantir.atlasdb.transaction.service.TransactionService; import com.palantir.atlasdb.transaction.service.TransactionServices; import com.palantir.atlasdb.util.MetricsManager; @@ -65,16 +64,12 @@ public class TableTasksTest { private TransactionManager txManager; private TransactionService txService; - private TransactionKnowledgeComponents knowledge; - @Rule public InMemoryTimeLockRule inMemoryTimeLockRule = new InMemoryTimeLockRule(); @Before public void setup() { kvs = new InMemoryKeyValueService(true); - metricsManager = MetricsManagers.createForTests(); - knowledge = TransactionKnowledgeComponents.createForTests(kvs, metricsManager.getTaggedRegistry()); LockClient lockClient = LockClient.of("sweep client"); lockService = LockServiceImpl.create( @@ -85,6 +80,7 @@ public void setup() { ConflictDetectionManager cdm = ConflictDetectionManagers.createWithoutWarmingCache(kvs); SweepStrategyManager ssm = SweepStrategyManagers.createDefault(kvs); Cleaner cleaner = new NoOpCleaner(); + metricsManager = MetricsManagers.createForTests(); txManager = SerializableTransactionManager.createForTest( metricsManager, kvs, @@ -99,8 +95,7 @@ public void setup() { cleaner, AbstractTransactionTest.GET_RANGES_THREAD_POOL_SIZE, AbstractTransactionTest.DEFAULT_GET_RANGES_CONCURRENCY, - MultiTableSweepQueueWriter.NO_OP, - knowledge); + MultiTableSweepQueueWriter.NO_OP); } @After diff --git a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/schema/TableMigratorTest.java b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/schema/TableMigratorTest.java index 77c2a8ad2f9..bec2d3d7ddf 100644 --- a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/schema/TableMigratorTest.java +++ b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/schema/TableMigratorTest.java @@ -40,9 +40,9 @@ import com.palantir.atlasdb.transaction.impl.SweepStrategyManager; import com.palantir.atlasdb.transaction.impl.SweepStrategyManagers; import com.palantir.atlasdb.transaction.impl.TestTransactionManagerImpl; -import com.palantir.atlasdb.transaction.knowledge.TransactionKnowledgeComponents; import com.palantir.atlasdb.util.MetricsManager; import com.palantir.atlasdb.util.MetricsManagers; +import com.palantir.common.base.AbortingVisitor; import com.palantir.common.base.AbortingVisitors; import com.palantir.common.base.BatchingVisitable; import com.palantir.common.concurrent.PTExecutors; @@ -101,7 +101,6 @@ public void testMigrationToDifferentKvs() throws TableMappingNotFoundException { ssm2, DefaultTimestampCache.createForTests(), MultiTableSweepQueueWriter.NO_OP, - TransactionKnowledgeComponents.createForTests(kvs2, metricsManager.getTaggedRegistry()), MoreExecutors.newDirectExecutorService()); kvs2.createTable(tableRef, definition.toTableMetadata().persistToBytes()); kvs2.createTable(namespacedTableRef, definition.toTableMetadata().persistToBytes()); @@ -139,20 +138,23 @@ public void testMigrationToDifferentKvs() throws TableMappingNotFoundException { verifySsm, DefaultTimestampCache.createForTests(), MultiTableSweepQueueWriter.NO_OP, - TransactionKnowledgeComponents.createForTests(kvs2, metricsManager.getTaggedRegistry()), MoreExecutors.newDirectExecutorService()); final MutableLong count = new MutableLong(); for (final TableReference name : Lists.newArrayList(tableRef, namespacedTableRef)) { verifyTxManager.runTaskReadOnly((TransactionTask) txn -> { BatchingVisitable> bv = txn.getRange(name, RangeRequest.all()); - bv.batchAccept(1000, AbortingVisitors.batching(item -> { - Iterable> cells = item.getCells(); - Map.Entry entry = Iterables.getOnlyElement(cells); - assertThat(entry.getKey()).isEqualTo(theCell); - assertThat(entry.getValue()).isEqualTo(theValue); - count.increment(); - return true; - })); + bv.batchAccept( + 1000, AbortingVisitors.batching(new AbortingVisitor, RuntimeException>() { + @Override + public boolean visit(RowResult item) throws RuntimeException { + Iterable> cells = item.getCells(); + Map.Entry entry = Iterables.getOnlyElement(cells); + assertThat(entry.getKey()).isEqualTo(theCell); + assertThat(entry.getValue()).isEqualTo(theValue); + count.increment(); + return true; + } + })); return null; }); } diff --git a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SerializableTransactionManagerTest.java b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SerializableTransactionManagerTest.java index 9b29c07cbb1..686300e1140 100644 --- a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SerializableTransactionManagerTest.java +++ b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SerializableTransactionManagerTest.java @@ -39,13 +39,13 @@ import com.palantir.atlasdb.transaction.api.KeyValueServiceStatus; import com.palantir.atlasdb.transaction.api.TransactionManager; import com.palantir.atlasdb.transaction.impl.metrics.DefaultMetricsFilterEvaluationContext; -import com.palantir.atlasdb.transaction.knowledge.TransactionKnowledgeComponents; import com.palantir.atlasdb.transaction.service.TransactionService; -import com.palantir.atlasdb.util.MetricsManager; import com.palantir.atlasdb.util.MetricsManagers; import com.palantir.common.concurrent.PTExecutors; import com.palantir.exception.NotInitializedException; import com.palantir.lock.v2.TimelockService; +import com.palantir.lock.watch.LockWatchEventCache; +import com.palantir.lock.watch.NoOpLockWatchEventCache; import com.palantir.timestamp.TimestampManagementService; import java.time.Duration; import java.util.Optional; @@ -70,16 +70,12 @@ public class SerializableTransactionManagerTest { private DeterministicScheduler executorService; private TransactionManager manager; - private MetricsManager metricsManager = MetricsManagers.createForTests(); - private TransactionKnowledgeComponents knowledge; - @Before public void setUp() { nothingInitialized(); executorService = new DeterministicSchedulerWithShutdownFlag(); manager = getManagerWithCallback(true, mockCallback, executorService); when(mockKvs.getClusterAvailabilityStatus()).thenReturn(ClusterAvailabilityStatus.ALL_AVAILABLE); - knowledge = TransactionKnowledgeComponents.createForTests(mockKvs, metricsManager.getTaggedRegistry()); } @Test @@ -273,8 +269,9 @@ public void callbackCanCallTmMethodsEvenThoughTmStillThrows() { private TransactionManager getManagerWithCallback( boolean initializeAsync, Callback callBack, ScheduledExecutorService executor) { + LockWatchEventCache lockWatchEventCache = NoOpLockWatchEventCache.create(); return SerializableTransactionManager.create( - metricsManager, + MetricsManagers.createForTests(), mockKvs, mockTimelockService, NoOpLockWatchManager.create(), @@ -298,8 +295,7 @@ private TransactionManager getManagerWithCallback( () -> ImmutableTransactionConfig.builder().build(), ConflictTracer.NO_OP, DefaultMetricsFilterEvaluationContext.createDefault(), - Optional.empty(), - knowledge); + Optional.empty()); } private void nothingInitialized() { diff --git a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManagerTest.java b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManagerTest.java index ad34c7deafb..0bff38a529b 100644 --- a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManagerTest.java +++ b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManagerTest.java @@ -41,7 +41,6 @@ import com.palantir.atlasdb.transaction.api.AtlasDbConstraintCheckingMode; import com.palantir.atlasdb.transaction.api.OpenTransaction; import com.palantir.atlasdb.transaction.impl.metrics.DefaultMetricsFilterEvaluationContext; -import com.palantir.atlasdb.transaction.knowledge.TransactionKnowledgeComponents; import com.palantir.atlasdb.transaction.service.TransactionService; import com.palantir.atlasdb.util.MetricsManager; import com.palantir.atlasdb.util.MetricsManagers; @@ -82,9 +81,6 @@ public class SnapshotTransactionManagerTest { private ManagedTimestampService timestampService; private SnapshotTransactionManager snapshotTransactionManager; - private final TransactionKnowledgeComponents knowledge = - TransactionKnowledgeComponents.createForTests(keyValueService, metricsManager.getTaggedRegistry()); - @Before public void setUp() { timestampService = services.getManagedTimestampService(); @@ -110,8 +106,7 @@ public void setUp() { () -> ImmutableTransactionConfig.builder().build(), ConflictTracer.NO_OP, DefaultMetricsFilterEvaluationContext.createDefault(), - Optional.empty(), - knowledge); + Optional.empty()); } @Test @@ -167,8 +162,7 @@ public void canCloseTransactionManagerWithNonCloseableLockService() { () -> ImmutableTransactionConfig.builder().build(), ConflictTracer.NO_OP, DefaultMetricsFilterEvaluationContext.createDefault(), - Optional.empty(), - knowledge); + Optional.empty()); newTransactionManager.close(); // should not throw } @@ -298,7 +292,6 @@ private SnapshotTransactionManager createSnapshotTransactionManager( .build(), ConflictTracer.NO_OP, DefaultMetricsFilterEvaluationContext.createDefault(), - Optional.empty(), - knowledge); + Optional.empty()); } } diff --git a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionTest.java b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionTest.java index 799262e1d60..4cfddc48ac6 100644 --- a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionTest.java +++ b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionTest.java @@ -355,8 +355,7 @@ protected TestTransactionManager constructTestTransactionManager() { sweepQueue, MoreExecutors.newDirectExecutorService(), transactionWrapper, - keyValueServiceWrapper, - knowledge); + keyValueServiceWrapper); } @Test @@ -424,11 +423,10 @@ public void testLockAfterGet() throws Exception { }); PathTypeTracker pathTypeTracker = PathTypeTrackers.constructSynchronousTracker(); - KeyValueService kvs = keyValueServiceWrapper.apply(kvMock, pathTypeTracker); Transaction snapshot = transactionWrapper.apply( new SnapshotTransaction( metricsManager, - kvs, + keyValueServiceWrapper.apply(kvMock, pathTypeTracker), inMemoryTimeLockRule.getLegacyTimelockService(), NoOpLockWatchManager.create(), transactionService, @@ -451,8 +449,7 @@ public void testLockAfterGet() throws Exception { true, () -> transactionConfig, ConflictTracer.NO_OP, - tableLevelMetricsController, - knowledge), + tableLevelMetricsController), pathTypeTracker); assertThatThrownBy(() -> snapshot.get(TABLE, ImmutableSet.of(cell))).isInstanceOf(RuntimeException.class); @@ -483,8 +480,7 @@ public void testTransactionAtomicity() throws Exception { sweepQueue, MoreExecutors.newDirectExecutorService(), transactionWrapper, - keyValueServiceWrapper, - knowledge); + keyValueServiceWrapper); ScheduledExecutorService service = PTExecutors.newScheduledThreadPool(20); @@ -970,8 +966,7 @@ public void transactionDeletesAsyncOnRollback() { sweepQueue, executor, transactionWrapper, - keyValueServiceWrapper, - knowledge); + keyValueServiceWrapper); Supplier conditionSupplier = mock(Supplier.class); when(conditionSupplier.get()).thenReturn(ALWAYS_FAILS_CONDITION).thenReturn(PreCommitConditions.NO_OP); @@ -2282,8 +2277,7 @@ private Transaction getSnapshotTransactionWith( validateLocksOnReads, () -> transactionConfig, ConflictTracer.NO_OP, - tableLevelMetricsController, - knowledge); + tableLevelMetricsController); return transactionWrapper.apply(transaction, pathTypeTracker); } diff --git a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/TransactionManagerTest.java b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/TransactionManagerTest.java index 046b46475dc..a45040bd48d 100644 --- a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/TransactionManagerTest.java +++ b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/TransactionManagerTest.java @@ -133,8 +133,7 @@ public void shouldNotMakeRemoteCallsInAReadOnlyTransactionIfNoWorkIsDone() { NoOpCleaner.INSTANCE, AbstractTransactionTest.GET_RANGES_THREAD_POOL_SIZE, AbstractTransactionTest.DEFAULT_GET_RANGES_CONCURRENCY, - MultiTableSweepQueueWriter.NO_OP, - knowledge); + MultiTableSweepQueueWriter.NO_OP); // fetch an immutable timestamp once so it's cached when(mockTimeLockService.getImmutableTimestamp()).thenReturn(1L); @@ -266,8 +265,7 @@ private TransactionManager setupTransactionManager() { () -> ImmutableTransactionConfig.builder().build(), ConflictTracer.NO_OP, DefaultMetricsFilterEvaluationContext.createDefault(), - Optional.empty(), - knowledge); + Optional.empty()); when(timelock.getFreshTimestamp()).thenReturn(1L); when(timelock.lockImmutableTimestamp()) diff --git a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/service/TransactionServicesTest.java b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/service/TransactionServicesTest.java index d37cdc2669c..06e7dfc3a9e 100644 --- a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/service/TransactionServicesTest.java +++ b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/service/TransactionServicesTest.java @@ -39,8 +39,6 @@ import com.palantir.atlasdb.transaction.impl.TransactionConstants; import com.palantir.atlasdb.transaction.impl.TransactionStatusUtils; import com.palantir.atlasdb.transaction.impl.TransactionTables; -import com.palantir.atlasdb.transaction.knowledge.TransactionKnowledgeComponents; -import com.palantir.atlasdb.util.MetricsManager; import com.palantir.atlasdb.util.MetricsManagers; import com.palantir.timelock.paxos.InMemoryTimeLockRule; import com.palantir.timestamp.TimestampManagementService; @@ -60,8 +58,6 @@ public class TransactionServicesTest { private CoordinationService coordinationService; private TransactionService transactionService; - private TransactionKnowledgeComponents knowledge; - private long startTs; private long commitTs; @@ -71,14 +67,11 @@ public class TransactionServicesTest { @Before public void setUp() { TransactionTables.createTables(keyValueService); - MetricsManager metricsManager = MetricsManagers.createForTests(); - timestampService = services.getTimestampService(); - coordinationService = - CoordinationServices.createDefault(keyValueService, timestampService, metricsManager, false); - knowledge = TransactionKnowledgeComponents.createForTests(keyValueService, metricsManager.getTaggedRegistry()); + coordinationService = CoordinationServices.createDefault( + keyValueService, timestampService, MetricsManagers.createForTests(), false); transactionService = TransactionServices.createTransactionService( - keyValueService, new TransactionSchemaManager(coordinationService), knowledge); + keyValueService, new TransactionSchemaManager(coordinationService)); } @Test diff --git a/changelog/@unreleased/pr-6254.v2.yml b/changelog/@unreleased/pr-6254.v2.yml new file mode 100644 index 00000000000..db92e5c51be --- /dev/null +++ b/changelog/@unreleased/pr-6254.v2.yml @@ -0,0 +1,5 @@ +type: fix +fix: + description: Revert "[TTS] Readonly transactions on TTS (#6212)" + links: + - https://github.com/palantir/atlasdb/pull/6254 diff --git a/timelock-agent/src/main/java/com/palantir/atlasdb/timelock/paxos/PaxosResourcesFactory.java b/timelock-agent/src/main/java/com/palantir/atlasdb/timelock/paxos/PaxosResourcesFactory.java index 98897706c74..c3407ad4a02 100644 --- a/timelock-agent/src/main/java/com/palantir/atlasdb/timelock/paxos/PaxosResourcesFactory.java +++ b/timelock-agent/src/main/java/com/palantir/atlasdb/timelock/paxos/PaxosResourcesFactory.java @@ -154,7 +154,6 @@ private static PaxosResources configureLeaderForAllClients( LeaderAcceptorResource leaderAcceptorResource = new LeaderAcceptorResource(factory.components().acceptor(PaxosUseCase.PSEUDO_LEADERSHIP_CLIENT)); - return resourcesBuilder .leadershipContextFactory(factory) .putLeadershipBatchComponents(PaxosUseCase.LEADER_FOR_ALL_CLIENTS, factory.components())