Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
TransactionKeyValueServiceFactory and service loading. (#6984)
Browse files Browse the repository at this point in the history
TransactionKeyValueServiceFactory and service loading.
  • Loading branch information
jkozlowski authored Feb 26, 2024
1 parent e300b67 commit 53e716a
Show file tree
Hide file tree
Showing 11 changed files with 278 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* (c) Copyright 2024 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.spi;

import com.palantir.atlasdb.keyvalue.api.KeyValueService;
import com.palantir.refreshable.Refreshable;
import java.util.Optional;

public interface KeyValueServiceManager {
KeyValueService getKeyValueService(
KeyValueServiceConfig config,
Refreshable<Optional<KeyValueServiceRuntimeConfig>> runtimeConfig,
String namespace,
boolean initializeAsync);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* (c) Copyright 2024 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.spi;

import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonTypeInfo.As;

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = As.PROPERTY, property = "type", visible = false)
public interface TransactionKeyValueServiceConfig {
String type();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* (c) Copyright 2024 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.spi;

import com.palantir.atlasdb.coordination.CoordinationService;
import com.palantir.atlasdb.transaction.api.TransactionKeyValueServiceManager;
import com.palantir.refreshable.Refreshable;

/**
* Factory for creating {@link TransactionKeyValueServiceManager} instances.
*
* Implementations have access to the coordination service to be able query their internal state at a particular
* timestamp, as well as schedule changes to it at a future timestamp.
*
* AtlasDb will ensure that transactions that span state changes do not succeed and non-transactional workflows
* are also retried.
*
* @param <T> type used for the coordination state. Should be a jackson-compatible POJO.
*/
public interface TransactionKeyValueServiceManagerFactory<T> {

String getType();

Class<T> coordinationValueClass();

TransactionKeyValueServiceManager create(
CoordinationService<T> coordinationService,
KeyValueServiceManager keyValueServiceManager,
TransactionKeyValueServiceConfig install,
Refreshable<TransactionKeyValueServiceRuntimeConfig> runtime,
boolean initializeAsync);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* (c) Copyright 2024 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.spi;

import com.fasterxml.jackson.annotation.JsonTypeInfo;

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type", visible = false)
public interface TransactionKeyValueServiceRuntimeConfig {
String type();
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ private AtlasDbConstants() {

public static final byte[] DEFAULT_METADATA_COORDINATION_KEY = PtBytes.toBytes("m");

public static final byte[] DEFAULT_TRANSACTION_KEY_VALUE_SERVICE_COORDINATION_KEY = PtBytes.toBytes("x");

public static final long DEFAULT_TRANSACTION_LOCK_ACQUIRE_TIMEOUT_MS = 60_000;
public static final int THRESHOLD_FOR_LOGGING_LARGE_NUMBER_OF_TRANSACTION_LOOKUPS = 10_000_000;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.google.common.annotations.Beta;
import com.palantir.atlasdb.AtlasDbConstants;
import com.palantir.atlasdb.cache.TimestampCache;
import com.palantir.atlasdb.internalschema.ImmutableInternalSchemaInstallConfig;
import com.palantir.atlasdb.internalschema.InternalSchemaInstallConfig;
import com.palantir.atlasdb.keyvalue.api.LockWatchCachingConfig;
import com.palantir.atlasdb.memory.InMemoryAtlasDbConfig;
import com.palantir.atlasdb.spi.KeyValueServiceConfig;
import com.palantir.atlasdb.spi.TransactionKeyValueServiceConfig;
import com.palantir.atlasdb.sweep.queue.config.TargetedSweepInstallConfig;
import com.palantir.exception.NotInitializedException;
import com.palantir.logsafe.DoNotLog;
Expand All @@ -48,6 +50,9 @@ public abstract class AtlasDbConfig {

public abstract KeyValueServiceConfig keyValueService();

@Beta
public abstract Optional<TransactionKeyValueServiceConfig> transactionKeyValueService();

public abstract Optional<LeaderConfig> leader();

public abstract Optional<TimeLockClientConfig> timelock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.google.common.annotations.Beta;
import com.palantir.atlasdb.AtlasDbConstants;
import com.palantir.atlasdb.compact.CompactorConfig;
import com.palantir.atlasdb.internalschema.ImmutableInternalSchemaRuntimeConfig;
import com.palantir.atlasdb.internalschema.InternalSchemaRuntimeConfig;
import com.palantir.atlasdb.spi.KeyValueServiceRuntimeConfig;
import com.palantir.atlasdb.spi.TransactionKeyValueServiceRuntimeConfig;
import com.palantir.atlasdb.stream.StreamStorePersistenceConfiguration;
import com.palantir.atlasdb.stream.StreamStorePersistenceConfigurations;
import com.palantir.atlasdb.sweep.queue.config.TargetedSweepRuntimeConfig;
Expand Down Expand Up @@ -91,6 +93,9 @@ public long getTimestampCacheSize() {

public abstract Optional<KeyValueServiceRuntimeConfig> keyValueService();

@Beta
public abstract Optional<TransactionKeyValueServiceRuntimeConfig> transactionKeyValueService();

/**
* Runtime live-reloadable parameters for communicating with TimeLock.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.palantir.atlasdb.namespacedeleter.NamespaceDeleterFactory;
import com.palantir.atlasdb.spi.AtlasDbFactory;
import com.palantir.atlasdb.spi.KeyValueServiceConfig;
import com.palantir.atlasdb.spi.TransactionKeyValueServiceConfig;
import com.palantir.atlasdb.spi.TransactionKeyValueServiceManagerFactory;
import com.palantir.atlasdb.timestamp.DbTimeLockFactory;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.exceptions.SafeIllegalStateException;
Expand All @@ -34,6 +36,14 @@ public static AtlasDbFactory createAtlasFactoryOfCorrectType(KeyValueServiceConf
return createAtlasDbServiceOfCorrectType(config, AtlasDbFactory::getType, AtlasDbFactory.class);
}

public static TransactionKeyValueServiceManagerFactory<?>
createTransactionKeyValueServiceManagerFactoryOfCorrectType(TransactionKeyValueServiceConfig config) {
return createAtlasDbServiceOfCorrectType(
config.type(),
TransactionKeyValueServiceManagerFactory::getType,
TransactionKeyValueServiceManagerFactory.class);
}

public static DbTimeLockFactory createDbTimeLockFactoryOfCorrectType(KeyValueServiceConfig config) {
return createAtlasDbServiceOfCorrectType(config, DbTimeLockFactory::getType, DbTimeLockFactory.class);
}
Expand All @@ -49,8 +59,13 @@ public static NamespaceDeleterFactory createNamespaceDeleterFactoryOfCorrectType

private static <T> T createAtlasDbServiceOfCorrectType(
KeyValueServiceConfig config, Function<T, String> typeExtractor, Class<T> clazz) {
return createAtlasDbServiceOfCorrectType(config.type(), typeExtractor, clazz);
}

private static <T> T createAtlasDbServiceOfCorrectType(
String type, Function<T, String> typeExtractor, Class<T> clazz) {
for (T element : ServiceLoader.load(clazz)) {
if (config.type().equalsIgnoreCase(typeExtractor.apply(element))) {
if (type.equalsIgnoreCase(typeExtractor.apply(element))) {
return element;
}
}
Expand All @@ -59,6 +74,6 @@ private static <T> T createAtlasDbServiceOfCorrectType(
+ "Ensure that the implementation of the AtlasDbFactory is annotated "
+ "@AutoService with a suitable class as parameter and that it is on your classpath.",
SafeArg.of("class", clazz),
SafeArg.of("type", config.type()));
SafeArg.of("type", type));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* (c) Copyright 2024 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.factory;

import com.palantir.atlasdb.keyvalue.api.KeyValueService;
import com.palantir.atlasdb.spi.AtlasDbFactory;
import com.palantir.atlasdb.spi.KeyValueServiceConfig;
import com.palantir.atlasdb.spi.KeyValueServiceManager;
import com.palantir.atlasdb.spi.KeyValueServiceRuntimeConfig;
import com.palantir.atlasdb.util.MetricsManager;
import com.palantir.refreshable.Refreshable;
import java.util.Optional;
import java.util.function.LongSupplier;

public final class DefaultKeyValueServiceManager implements KeyValueServiceManager {

private final MetricsManager metricsManager;
private final LongSupplier timestampSupplier;

public DefaultKeyValueServiceManager(MetricsManager metricsManager, LongSupplier timestampSupplier) {
this.metricsManager = metricsManager;
this.timestampSupplier = timestampSupplier;
}

@Override
public KeyValueService getKeyValueService(
KeyValueServiceConfig config,
Refreshable<Optional<KeyValueServiceRuntimeConfig>> runtimeConfig,
String namespace,
boolean initializeAsync) {
// TODO(jakubk): In order to meaningfully memoize things we need to know cluster names.
AtlasDbFactory atlasFactory = AtlasDbServiceDiscovery.createAtlasFactoryOfCorrectType(config);
return atlasFactory.createRawKeyValueService(
metricsManager, config, runtimeConfig, Optional.of(namespace), timestampSupplier, initializeAsync);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,11 @@
import com.palantir.atlasdb.schema.generated.TargetedSweepTableFactory;
import com.palantir.atlasdb.spi.DerivedSnapshotConfig;
import com.palantir.atlasdb.spi.KeyValueServiceConfig;
import com.palantir.atlasdb.spi.KeyValueServiceManager;
import com.palantir.atlasdb.spi.SharedResourcesConfig;
import com.palantir.atlasdb.spi.TransactionKeyValueServiceConfig;
import com.palantir.atlasdb.spi.TransactionKeyValueServiceManagerFactory;
import com.palantir.atlasdb.spi.TransactionKeyValueServiceRuntimeConfig;
import com.palantir.atlasdb.sweep.AdjustableSweepBatchConfigSource;
import com.palantir.atlasdb.sweep.BackgroundSweeperImpl;
import com.palantir.atlasdb.sweep.BackgroundSweeperPerformanceLogger;
Expand Down Expand Up @@ -437,8 +441,32 @@ private TransactionManager serializableInternal(@Output List<AutoCloseable> clos
return ValidatingQueryRewritingKeyValueService.create(kvs); // ok
},
closeables);
TransactionKeyValueServiceManager transactionKeyValueServiceManager =
initializeCloseable(() -> new DefaultTransactionKeyValueServiceManager(keyValueService), closeables);
KeyValueServiceManager keyValueServiceManager = new DefaultKeyValueServiceManager(metricsManager, adapter);
TransactionKeyValueServiceManager transactionKeyValueServiceManager = initializeCloseable(
() -> {
TransactionKeyValueServiceManager tkvsm;
if (config().transactionKeyValueService().isPresent()) {
TransactionKeyValueServiceManagerFactory<?> tkvsmfFactory =
AtlasDbServiceDiscovery.createTransactionKeyValueServiceManagerFactoryOfCorrectType(
config().transactionKeyValueService().get());
tkvsm = createTransactionKeyValueServiceManager(
tkvsmfFactory,
metricsManager,
lockAndTimestampServices,
keyValueService,
keyValueServiceManager,
config().transactionKeyValueService().get(),
runtimeConfig().get().map(optionalConfig -> optionalConfig
.get()
.transactionKeyValueService()
.get()),
config().initializeAsync());
} else {
tkvsm = new DefaultTransactionKeyValueServiceManager(keyValueService);
}
return tkvsm;
},
closeables);

TransactionManagersInitializer initializer = TransactionManagersInitializer.createInitialTables(
keyValueService, schemas(), config().initializeAsync(), allSafeForLogging());
Expand Down Expand Up @@ -572,6 +600,26 @@ private TransactionManager serializableInternal(@Output List<AutoCloseable> clos
return transactionManager;
}

private <T> TransactionKeyValueServiceManager createTransactionKeyValueServiceManager(
TransactionKeyValueServiceManagerFactory<T> factory,
MetricsManager metricsManager,
LockAndTimestampServices lockAndTimestampServices,
KeyValueService internalTablesKeyValueService,
KeyValueServiceManager keyValueServiceManager,
TransactionKeyValueServiceConfig config,
Refreshable<TransactionKeyValueServiceRuntimeConfig> runtimeConfigRefreshable,
boolean initializeAsync) {
CoordinationService<T> coordinationService =
CoordinationServices.createTransactionKeyValueServiceManagerCoordinator(
factory.coordinationValueClass(),
internalTablesKeyValueService,
lockAndTimestampServices.managedTimestampService(),
metricsManager,
config().initializeAsync());
return factory.create(
coordinationService, keyValueServiceManager, config, runtimeConfigRefreshable, initializeAsync);
}

private MetricsManager setUpMetricsAndGetMetricsManager() {
MetricRegistry internalAtlasDbMetrics = new MetricRegistry();
TaggedMetricRegistry internalTaggedAtlasDbMetrics = new DefaultTaggedMetricRegistry();
Expand Down
Loading

0 comments on commit 53e716a

Please sign in to comment.