Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
[PDS-88514 / _transactions2 Part 44] Instrument CoordinationStore in …
Browse files Browse the repository at this point in the history
…addition to CoordinationService (#3953)

* Instrument the coordination store (not just the service)

* Fix test breaks

* Changelog
  • Loading branch information
jeremyk-91 authored Apr 23, 2019
1 parent fbdfdd5 commit d829c5f
Show file tree
Hide file tree
Showing 10 changed files with 98 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -456,14 +456,11 @@ private CoordinationService<InternalSchemaMetadata> getSchemaMetadataCoordinatio
MetricsManager metricsManager,
LockAndTimestampServices lockAndTimestampServices,
KeyValueService keyValueService) {
@SuppressWarnings("unchecked") // Coordination service clearly has this type.
CoordinationService<InternalSchemaMetadata> metadataCoordinationService = AtlasDbMetrics.instrument(
metricsManager.getRegistry(),
CoordinationService.class,
CoordinationServices.createDefault(
keyValueService,
lockAndTimestampServices.timestamp(),
config().initializeAsync()));
CoordinationService<InternalSchemaMetadata> metadataCoordinationService = CoordinationServices.createDefault(
keyValueService,
lockAndTimestampServices.timestamp(),
metricsManager,
config().initializeAsync());
MetadataCoordinationServiceMetrics.registerMetrics(
metricsManager,
metadataCoordinationService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,9 @@ public SweepStrategyManager provideSweepStrategyManager(@Named("kvs") KeyValueSe
public CoordinationService<InternalSchemaMetadata> provideMetadataCoordinationService(
@Named("kvs") KeyValueService kvs,
TimestampService ts,
ServicesConfig config) {
return CoordinationServices.createDefault(kvs, ts, config.atlasDbConfig().initializeAsync());
ServicesConfig config,
MetricsManager metricsManager) {
return CoordinationServices.createDefault(kvs, ts, metricsManager, config.atlasDbConfig().initializeAsync());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.palantir.atlasdb.keyvalue.api.KeyValueService;
import com.palantir.atlasdb.keyvalue.api.TableReference;
import com.palantir.atlasdb.transaction.api.TransactionManager;
import com.palantir.atlasdb.util.MetricsManagers;
import com.palantir.timestamp.TimestampService;

public final class SimpleCoordinationResource implements CoordinationResource {
Expand All @@ -48,9 +49,10 @@ public static CoordinationResource create(TransactionManager transactionManager)
return new SimpleCoordinationResource(transactionManager,
new TransactionSchemaManager(
CoordinationServices.createDefault(
transactionManager.getKeyValueService(),
transactionManager.getTimestampService(),
false)));
transactionManager.getKeyValueService(),
transactionManager.getTimestampService(),
MetricsManagers.createForTests(),
false)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
import com.palantir.atlasdb.AtlasDbConstants;
import com.palantir.atlasdb.coordination.CoordinationService;
import com.palantir.atlasdb.coordination.CoordinationServiceImpl;
import com.palantir.atlasdb.coordination.CoordinationStore;
import com.palantir.atlasdb.coordination.TransformingCoordinationService;
import com.palantir.atlasdb.coordination.keyvalue.KeyValueServiceCoordinationStore;
import com.palantir.atlasdb.internalschema.InternalSchemaMetadata;
import com.palantir.atlasdb.keyvalue.api.KeyValueService;
import com.palantir.atlasdb.util.AtlasDbMetrics;
import com.palantir.atlasdb.util.MetricsManager;
import com.palantir.conjure.java.serialization.ObjectMappers;
import com.palantir.timestamp.TimestampService;

Expand All @@ -33,33 +36,62 @@ private CoordinationServices() {
// factory
}

public static CoordinationService<InternalSchemaMetadata> wrapHidingVersionSerialization(
CoordinationService<VersionedInternalSchemaMetadata> rawCoordinationService) {
return new TransformingCoordinationService<>(
rawCoordinationService,
InternalSchemaMetadataPayloadCodec::decode,
InternalSchemaMetadataPayloadCodec::encode);
}

public static CoordinationService<InternalSchemaMetadata> createDefault(
KeyValueService keyValueService,
TimestampService timestampService,
MetricsManager metricsManager,
boolean initializeAsync) {
return createDefault(keyValueService, timestampService::getFreshTimestamp, initializeAsync);
return createDefault(keyValueService, timestampService::getFreshTimestamp, metricsManager, initializeAsync);
}

public static CoordinationService<InternalSchemaMetadata> createDefault(
KeyValueService keyValueService,
LongSupplier timestampSupplier,
MetricsManager metricsManager,
boolean initializeAsync) {
CoordinationService<VersionedInternalSchemaMetadata> versionedService = new CoordinationServiceImpl<>(
KeyValueServiceCoordinationStore.create(
ObjectMappers.newServerObjectMapper(),
keyValueService,
AtlasDbConstants.DEFAULT_METADATA_COORDINATION_KEY,
timestampSupplier,
VersionedInternalSchemaMetadata.class,
initializeAsync));
return wrapHidingVersionSerialization(versionedService);
createCoordinationStore(keyValueService, timestampSupplier, metricsManager, initializeAsync));

@SuppressWarnings("unchecked") // The service has the same type as the version-hiding service.
CoordinationService<InternalSchemaMetadata> instrumentedService = AtlasDbMetrics.instrument(
metricsManager.getRegistry(),
CoordinationService.class,
wrapHidingVersionSerialization(versionedService));
return instrumentedService;
}

private static CoordinationStore<VersionedInternalSchemaMetadata> createCoordinationStore(
KeyValueService keyValueService,
LongSupplier timestampSupplier,
MetricsManager metricsManager,
boolean initializeAsync) {
CoordinationStore<VersionedInternalSchemaMetadata> rawCoordinationStore
= createRawCoordinationStore(keyValueService, timestampSupplier, initializeAsync);

@SuppressWarnings("unchecked") // The store has the same type as the rawCoordinationStore.
CoordinationStore<VersionedInternalSchemaMetadata> store = AtlasDbMetrics.instrument(
metricsManager.getRegistry(),
CoordinationStore.class,
rawCoordinationStore);
return store;
}

private static CoordinationStore<VersionedInternalSchemaMetadata> createRawCoordinationStore(
KeyValueService keyValueService, LongSupplier timestampSupplier, boolean initializeAsync) {
return KeyValueServiceCoordinationStore.create(
ObjectMappers.newServerObjectMapper(),
keyValueService,
AtlasDbConstants.DEFAULT_METADATA_COORDINATION_KEY,
timestampSupplier,
VersionedInternalSchemaMetadata.class,
initializeAsync);
}

private static CoordinationService<InternalSchemaMetadata> wrapHidingVersionSerialization(
CoordinationService<VersionedInternalSchemaMetadata> rawCoordinationService) {
return new TransformingCoordinationService<>(
rawCoordinationService,
InternalSchemaMetadataPayloadCodec::decode,
InternalSchemaMetadataPayloadCodec::encode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.palantir.atlasdb.keyvalue.api.CheckAndSetCompatibility;
import com.palantir.atlasdb.keyvalue.api.KeyValueService;
import com.palantir.atlasdb.transaction.impl.TransactionConstants;
import com.palantir.atlasdb.util.MetricsManager;
import com.palantir.atlasdb.util.MetricsManagers;
import com.palantir.logsafe.exceptions.SafeIllegalStateException;
import com.palantir.timestamp.TimestampService;

Expand Down Expand Up @@ -71,12 +73,14 @@ private static TransactionService createV2TransactionService(KeyValueService key
public static TransactionService createRaw(
KeyValueService keyValueService, TimestampService timestampService, boolean initializeAsync) {
CoordinationService<InternalSchemaMetadata> coordinationService
= CoordinationServices.createDefault(keyValueService, timestampService, initializeAsync);
= CoordinationServices.createDefault(
keyValueService, timestampService, MetricsManagers.createForTests(), initializeAsync);
return createTransactionService(keyValueService, new TransactionSchemaManager(coordinationService));
}

public static TransactionService createReadOnlyTransactionServiceIgnoresUncommittedTransactionsDoesNotRollBack(
KeyValueService keyValueService) {
KeyValueService keyValueService,
MetricsManager metricsManager) {
if (keyValueService.supportsCheckAndSet()) {
CoordinationService<InternalSchemaMetadata> coordinationService = CoordinationServices.createDefault(
keyValueService,
Expand All @@ -85,6 +89,7 @@ public static TransactionService createReadOnlyTransactionServiceIgnoresUncommit
+ " transaction service! This is probably a product bug. Please contact"
+ " support.");
},
metricsManager,
false);
ReadOnlyTransactionSchemaManager readOnlyTransactionSchemaManager
= new ReadOnlyTransactionSchemaManager(coordinationService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import com.palantir.atlasdb.internalschema.persistence.CoordinationServices;
import com.palantir.atlasdb.keyvalue.api.KeyValueService;
import com.palantir.atlasdb.keyvalue.impl.InMemoryKeyValueService;
import com.palantir.atlasdb.util.MetricsManagers;
import com.palantir.timestamp.InMemoryTimestampService;
import com.palantir.timestamp.TimestampService;

Expand All @@ -64,7 +65,7 @@ public void multipleManagersInAgreementAfterAggressiveConcurrentUpdates() {

private void scheduleTasksAndValidateSnapshots(int numRequests, int numManagers) {
List<TransactionSchemaManager> managers = IntStream.range(0, numManagers)
.mapToObj(this::createTransactionSchemaManager)
.mapToObj(unused -> createTransactionSchemaManager())
.collect(Collectors.toList());

List<Future> futures = Lists.newArrayList();
Expand All @@ -77,8 +78,13 @@ private void scheduleTasksAndValidateSnapshots(int numRequests, int numManagers)
validateSnapshots(snapshots);
}

private TransactionSchemaManager createTransactionSchemaManager(int unused) {
return new TransactionSchemaManager(CoordinationServices.createDefault(kvs, timestampService, false));
private TransactionSchemaManager createTransactionSchemaManager() {
return new TransactionSchemaManager(
CoordinationServices.createDefault(
kvs,
timestampService,
MetricsManagers.createForTests(),
false));
}

private static TransactionSchemaManager getRandomManager(List<TransactionSchemaManager> managers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,9 @@
import org.junit.Test;

import com.palantir.atlasdb.AtlasDbConstants;
import com.palantir.atlasdb.coordination.CoordinationServiceImpl;
import com.palantir.atlasdb.coordination.keyvalue.KeyValueServiceCoordinationStore;
import com.palantir.atlasdb.encoding.PtBytes;
import com.palantir.atlasdb.internalschema.persistence.CoordinationServices;
import com.palantir.atlasdb.internalschema.persistence.VersionedInternalSchemaMetadata;
import com.palantir.atlasdb.keyvalue.impl.InMemoryKeyValueService;
import com.palantir.conjure.java.serialization.ObjectMappers;
import com.palantir.atlasdb.util.MetricsManagers;
import com.palantir.timestamp.InMemoryTimestampService;
import com.palantir.timestamp.TimestampService;

Expand Down Expand Up @@ -81,16 +77,13 @@ public void throwsIfTryingToGetAnImpossibleTimestamp() {
.hasMessageContaining("was never given out by the timestamp service");
}

private static TransactionSchemaManager createTransactionSchemaManager(TimestampService ts) {
CoordinationServiceImpl<VersionedInternalSchemaMetadata> rawService = new CoordinationServiceImpl<>(
KeyValueServiceCoordinationStore.create(
ObjectMappers.newServerObjectMapper(),
private TransactionSchemaManager createTransactionSchemaManager(TimestampService ts) {
return new TransactionSchemaManager(
CoordinationServices.createDefault(
new InMemoryKeyValueService(true),
PtBytes.toBytes("aaa"),
ts::getFreshTimestamp,
VersionedInternalSchemaMetadata.class,
timestamps,
MetricsManagers.createForTests(),
false));
return new TransactionSchemaManager(CoordinationServices.wrapHidingVersionSerialization(rawService));
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.palantir.atlasdb.keyvalue.impl.InMemoryKeyValueService;
import com.palantir.atlasdb.util.MetricsManagers;
import com.palantir.timestamp.InMemoryTimestampService;

public class ReadOnlyTransactionServiceIntegrationTest {
Expand All @@ -34,7 +35,7 @@ public class ReadOnlyTransactionServiceIntegrationTest {
= TransactionServices.createRaw(keyValueService, timestampService, false);
private final TransactionService readOnlyTransactionService
= TransactionServices.createReadOnlyTransactionServiceIgnoresUncommittedTransactionsDoesNotRollBack(
keyValueService);
keyValueService, MetricsManagers.createForTests());

@Test
public void canReadAlreadyAgreedValuesEvenAfterAdditionalCoordinations() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,19 @@
import com.palantir.atlasdb.transaction.encoding.V1EncodingStrategy;
import com.palantir.atlasdb.transaction.impl.TransactionConstants;
import com.palantir.atlasdb.transaction.impl.TransactionTables;
import com.palantir.atlasdb.util.MetricsManagers;
import com.palantir.timestamp.InMemoryTimestampService;
import com.palantir.timestamp.TimestampManagementService;
import com.palantir.timestamp.TimestampService;

public class TransactionServicesTest {
private final KeyValueService keyValueService = spy(new InMemoryKeyValueService(false));
private final TimestampService timestampService = new InMemoryTimestampService();
private final CoordinationService<InternalSchemaMetadata> coordinationService
= CoordinationServices.createDefault(keyValueService, timestampService, false);
private final CoordinationService<InternalSchemaMetadata> coordinationService = CoordinationServices.createDefault(
keyValueService,
timestampService,
MetricsManagers.createForTests(),
false);
private final TransactionService transactionService = TransactionServices.createTransactionService(
keyValueService, new TransactionSchemaManager(coordinationService));

Expand Down
6 changes: 6 additions & 0 deletions docs/source/release_notes/release-notes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ develop
- lock-api now declares a minimum dependency on timelock-server 0.59.0.
(`Pull Request <https://github.com/palantir/atlasdb/pull/3894>`__)

* - |improved| |devbreak|
- Usage metrics for the coordination store have been added.
Users should provide a MetricsRegistry when creating their coordination services.
Also, ``CoordinationService.createDefault()`` now handles instrumentation of both the coordination service and store.
(`Pull Request <https://github.com/palantir/atlasdb/pull/3894>`__)

========
v0.133.0
========
Expand Down

0 comments on commit d829c5f

Please sign in to comment.