Skip to content
This repository has been archived by the owner on May 30, 2024. It is now read-only.

Commit

Permalink
Merge pull request #223 from launchdarkly/eb/ch75800/store-status
Browse files Browse the repository at this point in the history
(#3) simplify store status implementation to not use optional interface
  • Loading branch information
eli-darkly authored May 8, 2020
2 parents f9ed725 + 4b747cb commit 88e7040
Show file tree
Hide file tree
Showing 20 changed files with 471 additions and 238 deletions.
7 changes: 5 additions & 2 deletions src/main/java/com/launchdarkly/sdk/server/Components.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.launchdarkly.sdk.server.interfaces.DataSourceFactory;
import com.launchdarkly.sdk.server.interfaces.DataStore;
import com.launchdarkly.sdk.server.interfaces.DataStoreFactory;
import com.launchdarkly.sdk.server.interfaces.DataStoreStatusProvider;
import com.launchdarkly.sdk.server.interfaces.DataStoreUpdates;
import com.launchdarkly.sdk.server.interfaces.DiagnosticDescription;
import com.launchdarkly.sdk.server.interfaces.Event;
Expand All @@ -31,6 +32,7 @@
import java.net.URI;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Consumer;

import okhttp3.Credentials;

Expand Down Expand Up @@ -292,7 +294,7 @@ public static FlagChangeListener flagValueMonitoringListener(LDClientInterface c
private static final class InMemoryDataStoreFactory implements DataStoreFactory, DiagnosticDescription {
static final DataStoreFactory INSTANCE = new InMemoryDataStoreFactory();
@Override
public DataStore createDataStore(ClientContext context) {
public DataStore createDataStore(ClientContext context, Consumer<DataStoreStatusProvider.Status> statusUpdater) {
return new InMemoryDataStore();
}

Expand Down Expand Up @@ -563,13 +565,14 @@ public LDValue describeConfiguration(LDConfig config) {
* Called by the SDK to create the data store instance.
*/
@Override
public DataStore createDataStore(ClientContext context) {
public DataStore createDataStore(ClientContext context, Consumer<DataStoreStatusProvider.Status> statusUpdater) {
PersistentDataStore core = persistentDataStoreFactory.createPersistentDataStore(context);
return new PersistentDataStoreWrapper(
core,
cacheTime,
staleValuesPolicy,
recordCacheStats,
statusUpdater,
ClientContextImpl.get(context).sharedExecutor
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,56 @@
import com.launchdarkly.sdk.server.interfaces.DataStore;
import com.launchdarkly.sdk.server.interfaces.DataStoreStatusProvider;

import java.util.concurrent.atomic.AtomicReference;

// Simple delegator to ensure that LDClient.getDataStoreStatusProvider() never returns null and that
// the application isn't given direct access to the store.
final class DataStoreStatusProviderImpl implements DataStoreStatusProvider {
private final DataStoreStatusProvider delegateTo;
private final DataStore store;
private final EventBroadcasterImpl<DataStoreStatusProvider.StatusListener, DataStoreStatusProvider.Status> statusBroadcaster;
private final AtomicReference<DataStoreStatusProvider.Status> lastStatus;

DataStoreStatusProviderImpl(DataStore store) {
delegateTo = store instanceof DataStoreStatusProvider ? (DataStoreStatusProvider)store : null;
DataStoreStatusProviderImpl(
DataStore store,
EventBroadcasterImpl<DataStoreStatusProvider.StatusListener, DataStoreStatusProvider.Status> statusBroadcaster
) {
this.store = store;
this.statusBroadcaster = statusBroadcaster;
this.lastStatus = new AtomicReference<>(new DataStoreStatusProvider.Status(true, false)); // initially "available"
}

// package-private
void updateStatus(DataStoreStatusProvider.Status newStatus) {
if (newStatus != null) {
DataStoreStatusProvider.Status oldStatus = lastStatus.getAndSet(newStatus);
if (!newStatus.equals(oldStatus)) {
statusBroadcaster.broadcast(newStatus);
}
}
}

@Override
public Status getStoreStatus() {
return delegateTo == null ? null : delegateTo.getStoreStatus();
return lastStatus.get();
}

@Override
public boolean addStatusListener(StatusListener listener) {
return delegateTo != null && delegateTo.addStatusListener(listener);
public void addStatusListener(StatusListener listener) {
statusBroadcaster.register(listener);
}

@Override
public void removeStatusListener(StatusListener listener) {
if (delegateTo != null) {
delegateTo.removeStatusListener(listener);
}
statusBroadcaster.unregister(listener);
}

@Override
public boolean isStatusMonitoringEnabled() {
return store.isStatusMonitoringEnabled();
}

@Override
public CacheStats getCacheStats() {
return delegateTo == null ? null : delegateTo.getCacheStats();
return store.getCacheStats();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,14 @@ final class DataStoreUpdatesImpl implements DataStoreUpdates {
private final DataModelDependencies.DependencyTracker dependencyTracker = new DataModelDependencies.DependencyTracker();
private final DataStoreStatusProvider dataStoreStatusProvider;

DataStoreUpdatesImpl(DataStore store, EventBroadcasterImpl<FlagChangeListener, FlagChangeEvent> flagChangeEventNotifier) {
DataStoreUpdatesImpl(
DataStore store,
EventBroadcasterImpl<FlagChangeListener, FlagChangeEvent> flagChangeEventNotifier,
DataStoreStatusProvider dataStoreStatusProvider
) {
this.store = store;
this.flagChangeEventNotifier = flagChangeEventNotifier;
this.dataStoreStatusProvider = new DataStoreStatusProviderImpl(store);
this.dataStoreStatusProvider = dataStoreStatusProvider;
}

@Override
Expand Down
13 changes: 12 additions & 1 deletion src/main/java/com/launchdarkly/sdk/server/InMemoryDataStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.google.common.collect.ImmutableMap;
import com.launchdarkly.sdk.LDValue;
import com.launchdarkly.sdk.server.interfaces.DataStore;
import com.launchdarkly.sdk.server.interfaces.DataStoreStatusProvider.CacheStats;
import com.launchdarkly.sdk.server.interfaces.DataStoreTypes.DataKind;
import com.launchdarkly.sdk.server.interfaces.DataStoreTypes.FullDataSet;
import com.launchdarkly.sdk.server.interfaces.DataStoreTypes.ItemDescriptor;
Expand Down Expand Up @@ -99,7 +100,17 @@ public boolean upsert(DataKind kind, String key, ItemDescriptor item) {
public boolean isInitialized() {
return initialized;
}


@Override
public boolean isStatusMonitoringEnabled() {
return false;
}

@Override
public CacheStats getCacheStats() {
return null;
}

/**
* Does nothing; this class does not have any resources to release
*
Expand Down
27 changes: 20 additions & 7 deletions src/main/java/com/launchdarkly/sdk/server/LDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public final class LDClient implements LDClientInterface {
final EventProcessor eventProcessor;
final DataSource dataSource;
final DataStore dataStore;
private final DataStoreStatusProvider dataStoreStatusProvider;
private final DataStoreStatusProviderImpl dataStoreStatusProvider;
private final EventBroadcasterImpl<FlagChangeListener, FlagChangeEvent> flagChangeEventNotifier;
private final ScheduledExecutorService sharedExecutor;

Expand Down Expand Up @@ -158,9 +158,8 @@ public LDClient(String sdkKey, LDConfig config) {

DataStoreFactory factory = config.dataStoreFactory == null ?
Components.inMemoryDataStore() : config.dataStoreFactory;
this.dataStore = factory.createDataStore(context);
this.dataStoreStatusProvider = new DataStoreStatusProviderImpl(this.dataStore);

this.dataStore = factory.createDataStore(context, this::updateDataStoreStatus);

this.evaluator = new Evaluator(new Evaluator.Getters() {
public DataModel.FeatureFlag getFlag(String key) {
return LDClient.getFlag(LDClient.this.dataStore, key);
Expand All @@ -170,12 +169,20 @@ public DataModel.Segment getSegment(String key) {
return LDClient.getSegment(LDClient.this.dataStore, key);
}
});

this.flagChangeEventNotifier = new EventBroadcasterImpl<>(FlagChangeListener::onFlagChange, sharedExecutor);

EventBroadcasterImpl<DataStoreStatusProvider.StatusListener, DataStoreStatusProvider.Status> dataStoreStatusNotifier =
new EventBroadcasterImpl<>(DataStoreStatusProvider.StatusListener::dataStoreStatusChanged, sharedExecutor);

this.dataStoreStatusProvider = new DataStoreStatusProviderImpl(this.dataStore, dataStoreStatusNotifier);

DataSourceFactory dataSourceFactory = config.dataSourceFactory == null ?
Components.streamingDataSource() : config.dataSourceFactory;
DataStoreUpdates dataStoreUpdates = new DataStoreUpdatesImpl(dataStore, flagChangeEventNotifier);
DataStoreUpdates dataStoreUpdates = new DataStoreUpdatesImpl(
dataStore,
flagChangeEventNotifier,
dataStoreStatusProvider
);
this.dataSource = dataSourceFactory.createDataSource(context, dataStoreUpdates);

Future<Void> startFuture = dataSource.start();
Expand Down Expand Up @@ -511,6 +518,12 @@ private ScheduledExecutorService createSharedExecutor() {
return Executors.newSingleThreadScheduledExecutor(threadFactory);
}

private void updateDataStoreStatus(DataStoreStatusProvider.Status newStatus) {
if (dataStoreStatusProvider != null) {
dataStoreStatusProvider.updateStatus(newStatus);
}
}

private static String getClientVersion() {
Class<?> clazz = LDConfig.class;
String className = clazz.getSimpleName() + ".class";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.launchdarkly.sdk.server.interfaces.DataStoreStatusProvider;
import com.launchdarkly.sdk.server.interfaces.DataStoreStatusProvider.Status;
import com.launchdarkly.sdk.server.interfaces.DataStoreStatusProvider.StatusListener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -11,6 +10,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
* Used internally to encapsulate the data store status broadcasting mechanism for PersistentDataStoreWrapper.
Expand All @@ -22,8 +22,8 @@ final class PersistentDataStoreStatusManager {
private static final Logger logger = LoggerFactory.getLogger(PersistentDataStoreStatusManager.class);
static final int POLL_INTERVAL_MS = 500; // visible for testing

private final Consumer<DataStoreStatusProvider.Status> statusUpdater;
private final ScheduledExecutorService scheduler;
private final EventBroadcasterImpl<DataStoreStatusProvider.StatusListener, DataStoreStatusProvider.Status> statusBroadcaster;
private final Callable<Boolean> statusPollFn;
private final boolean refreshOnRecovery;
private volatile boolean lastAvailable;
Expand All @@ -33,24 +33,14 @@ final class PersistentDataStoreStatusManager {
boolean refreshOnRecovery,
boolean availableNow,
Callable<Boolean> statusPollFn,
Consumer<DataStoreStatusProvider.Status> statusUpdater,
ScheduledExecutorService sharedExecutor
) {
this.refreshOnRecovery = refreshOnRecovery;
this.lastAvailable = availableNow;
this.statusPollFn = statusPollFn;
this.statusUpdater = statusUpdater;
this.scheduler = sharedExecutor;
this.statusBroadcaster = new EventBroadcasterImpl<>(
DataStoreStatusProvider.StatusListener::dataStoreStatusChanged,
sharedExecutor
);
}

void addStatusListener(StatusListener listener) {
statusBroadcaster.register(listener);
}

synchronized void removeStatusListener(StatusListener listener) {
statusBroadcaster.unregister(listener);
}

void updateAvailability(boolean available) {
Expand All @@ -67,7 +57,7 @@ void updateAvailability(boolean available) {
logger.warn("Persistent store is available again");
}

statusBroadcaster.broadcast(status);
statusUpdater.accept(status);

// If the store has just become unavailable, start a poller to detect when it comes back. If it has
// become available, stop any polling we are currently doing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.launchdarkly.sdk.server.integrations.PersistentDataStoreBuilder;
import com.launchdarkly.sdk.server.interfaces.DataStore;
import com.launchdarkly.sdk.server.interfaces.DataStoreStatusProvider;
import com.launchdarkly.sdk.server.interfaces.DataStoreStatusProvider.CacheStats;
import com.launchdarkly.sdk.server.interfaces.DataStoreTypes.DataKind;
import com.launchdarkly.sdk.server.interfaces.DataStoreTypes.FullDataSet;
import com.launchdarkly.sdk.server.interfaces.DataStoreTypes.ItemDescriptor;
Expand All @@ -30,6 +31,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import static com.google.common.collect.Iterables.concat;
import static com.google.common.collect.Iterables.filter;
Expand All @@ -43,7 +45,7 @@
* <p>
* This class is only constructed by {@link PersistentDataStoreBuilder}.
*/
final class PersistentDataStoreWrapper implements DataStore, DataStoreStatusProvider {
final class PersistentDataStoreWrapper implements DataStore {
private static final Logger logger = LoggerFactory.getLogger(PersistentDataStoreWrapper.class);

private final PersistentDataStore core;
Expand All @@ -61,6 +63,7 @@ final class PersistentDataStoreWrapper implements DataStore, DataStoreStatusProv
Duration cacheTtl,
PersistentDataStoreBuilder.StaleValuesPolicy staleValuesPolicy,
boolean recordCacheStats,
Consumer<DataStoreStatusProvider.Status> statusUpdater,
ScheduledExecutorService sharedExecutor
) {
this.core = core;
Expand Down Expand Up @@ -110,6 +113,7 @@ public Boolean load(String key) throws Exception {
!cacheIndefinitely,
true,
this::pollAvailabilityAfterOutage,
statusUpdater,
sharedExecutor
);
}
Expand Down Expand Up @@ -306,23 +310,12 @@ public boolean upsert(DataKind kind, String key, ItemDescriptor item) {
}
return updated;
}

@Override
public Status getStoreStatus() {
return new Status(statusManager.isAvailable(), false);
}


@Override
public boolean addStatusListener(StatusListener listener) {
statusManager.addStatusListener(listener);
public boolean isStatusMonitoringEnabled() {
return true;
}

@Override
public void removeStatusListener(StatusListener listener) {
statusManager.removeStatusListener(listener);
}

@Override
public CacheStats getCacheStats() {
if (itemCache == null || allCache == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,10 @@ static interface EventSourceCreator {
.build();

DataStoreStatusProvider.StatusListener statusListener = this::onStoreStatusChanged;
if (dataStoreUpdates.getStatusProvider().addStatusListener(statusListener)) {
this.statusListener = statusListener;
if (dataStoreUpdates.getStatusProvider() != null &&
dataStoreUpdates.getStatusProvider().isStatusMonitoringEnabled()) {
this.statusListener = this::onStoreStatusChanged;
dataStoreUpdates.getStatusProvider().addStatusListener(statusListener);
} else {
this.statusListener = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
/**
* Context information provided by the {@link com.launchdarkly.sdk.server.LDClient} when creating components.
* <p>
* This is passed as a parameter to {@link DataStoreFactory#createDataStore(ClientContext)}, etc. The
* actual implementation class may contain other properties that are only relevant to the built-in SDK
* components and are therefore not part of the public interface; this allows the SDK to add its own
* context information as needed without disturbing the public API.
* This is passed as a parameter to {@link DataStoreFactory#createDataStore(ClientContext, java.util.function.Consumer)},
* etc. The actual implementation class may contain other properties that are only relevant to the
* built-in SDK components and are therefore not part of the public interface; this allows the SDK
* to add its own context information as needed without disturbing the public API.
*
* @since 5.0.0
*/
Expand Down
Loading

0 comments on commit 88e7040

Please sign in to comment.