Skip to content
This repository has been archived by the owner on May 30, 2024. It is now read-only.

Commit

Permalink
[ch56562] More consistency with .NET and fix flaky tests. (#149)
Browse files Browse the repository at this point in the history
  • Loading branch information
gwhelanLD authored Nov 22, 2019
1 parent 225fdf0 commit 72e91e7
Show file tree
Hide file tree
Showing 14 changed files with 264 additions and 146 deletions.
19 changes: 15 additions & 4 deletions src/main/java/com/launchdarkly/client/Components.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,18 @@ public FeatureStore createFeatureStore() {
}
}

private static final class DefaultEventProcessorFactory implements EventProcessorFactory {
private static final class DefaultEventProcessorFactory implements EventProcessorFactoryWithDiagnostics {
@Override
public EventProcessor createEventProcessor(String sdkKey, LDConfig config) {
return createEventProcessor(sdkKey, config, null);
}

public EventProcessor createEventProcessor(String sdkKey, LDConfig config,
DiagnosticAccumulator diagnosticAccumulator) {
if (config.offline || !config.sendEvents) {
return new EventProcessor.NullEventProcessor();
} else {
return new DefaultEventProcessor(sdkKey, config);
return new DefaultEventProcessor(sdkKey, config, diagnosticAccumulator);
}
}
}
Expand All @@ -105,12 +110,18 @@ public EventProcessor createEventProcessor(String sdkKey, LDConfig config) {
}
}

private static final class DefaultUpdateProcessorFactory implements UpdateProcessorFactory {
private static final class DefaultUpdateProcessorFactory implements UpdateProcessorFactoryWithDiagnostics {
// Note, logger uses LDClient class name for backward compatibility
private static final Logger logger = LoggerFactory.getLogger(LDClient.class);

@Override
public UpdateProcessor createUpdateProcessor(String sdkKey, LDConfig config, FeatureStore featureStore) {
return createUpdateProcessor(sdkKey, config, featureStore, null);
}

@Override
public UpdateProcessor createUpdateProcessor(String sdkKey, LDConfig config, FeatureStore featureStore,
DiagnosticAccumulator diagnosticAccumulator) {
if (config.offline) {
logger.info("Starting LaunchDarkly client in offline mode");
return new UpdateProcessor.NullUpdateProcessor();
Expand All @@ -121,7 +132,7 @@ public UpdateProcessor createUpdateProcessor(String sdkKey, LDConfig config, Fea
DefaultFeatureRequestor requestor = new DefaultFeatureRequestor(sdkKey, config);
if (config.stream) {
logger.info("Enabling streaming API");
return new StreamProcessor(sdkKey, config, requestor, featureStore, null);
return new StreamProcessor(sdkKey, config, requestor, featureStore, null, diagnosticAccumulator);
} else {
logger.info("Disabling streaming API");
logger.warn("You should only disable the streaming API if instructed to do so by LaunchDarkly support");
Expand Down
20 changes: 10 additions & 10 deletions src/main/java/com/launchdarkly/client/DefaultEventProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ final class DefaultEventProcessor implements EventProcessor {
private final AtomicBoolean closed = new AtomicBoolean(false);
private volatile boolean inputCapacityExceeded = false;

DefaultEventProcessor(String sdkKey, LDConfig config) {
DefaultEventProcessor(String sdkKey, LDConfig config, DiagnosticAccumulator diagnosticAccumulator) {
inbox = new ArrayBlockingQueue<>(config.capacity);

ThreadFactory threadFactory = new ThreadFactoryBuilder()
Expand All @@ -60,7 +60,7 @@ final class DefaultEventProcessor implements EventProcessor {
.build();
scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory);

new EventDispatcher(sdkKey, config, inbox, threadFactory, closed);
new EventDispatcher(sdkKey, config, inbox, threadFactory, closed, diagnosticAccumulator);

Runnable flusher = new Runnable() {
public void run() {
Expand All @@ -75,7 +75,7 @@ public void run() {
};
this.scheduler.scheduleAtFixedRate(userKeysFlusher, config.userKeysFlushInterval, config.userKeysFlushInterval,
TimeUnit.SECONDS);
if (!config.diagnosticOptOut) {
if (!config.diagnosticOptOut && diagnosticAccumulator != null) {
Runnable diagnosticsTrigger = new Runnable() {
public void run() {
postMessageAsync(MessageType.DIAGNOSTIC, null);
Expand Down Expand Up @@ -207,6 +207,7 @@ static final class EventDispatcher {
private final Random random = new Random();
private final AtomicLong lastKnownPastTime = new AtomicLong(0);
private final AtomicBoolean disabled = new AtomicBoolean(false);
private final DiagnosticAccumulator diagnosticAccumulator;
private final ExecutorService diagnosticExecutor;
private final SendDiagnosticTaskFactory sendDiagnosticTaskFactory;

Expand All @@ -215,8 +216,10 @@ static final class EventDispatcher {
private EventDispatcher(String sdkKey, LDConfig config,
final BlockingQueue<EventProcessorMessage> inbox,
ThreadFactory threadFactory,
final AtomicBoolean closed) {
final AtomicBoolean closed,
DiagnosticAccumulator diagnosticAccumulator) {
this.config = config;
this.diagnosticAccumulator = diagnosticAccumulator;
this.busyFlushWorkersCount = new AtomicInteger(0);

OkHttpClient.Builder httpBuilder = new OkHttpClient.Builder();
Expand Down Expand Up @@ -270,14 +273,11 @@ public void handleResponse(Response response, Date responseDate) {
flushWorkers.add(task);
}

if (!config.diagnosticOptOut) {
if (!config.diagnosticOptOut && diagnosticAccumulator != null) {
// Set up diagnostics
long currentTime = System.currentTimeMillis();
DiagnosticId diagnosticId = new DiagnosticId(sdkKey);
config.diagnosticAccumulator.start(diagnosticId, currentTime);
this.sendDiagnosticTaskFactory = new SendDiagnosticTaskFactory(sdkKey, config, httpClient);
diagnosticExecutor = Executors.newSingleThreadExecutor(threadFactory);
DiagnosticEvent.Init diagnosticInitEvent = new DiagnosticEvent.Init(currentTime, diagnosticId, config);
DiagnosticEvent.Init diagnosticInitEvent = new DiagnosticEvent.Init(diagnosticAccumulator.dataSinceDate, diagnosticAccumulator.diagnosticId, config);
diagnosticExecutor.submit(sendDiagnosticTaskFactory.createSendDiagnosticTask(diagnosticInitEvent));
} else {
diagnosticExecutor = null;
Expand Down Expand Up @@ -334,7 +334,7 @@ private void runMainLoop(BlockingQueue<EventProcessorMessage> inbox,
private void sendAndResetDiagnostics(EventBuffer outbox) {
long droppedEvents = outbox.getAndClearDroppedCount();
long eventsInQueue = outbox.getEventsInQueueCount();
DiagnosticEvent diagnosticEvent = config.diagnosticAccumulator.createEventAndReset(droppedEvents, deduplicatedUsers, eventsInQueue);
DiagnosticEvent diagnosticEvent = diagnosticAccumulator.createEventAndReset(droppedEvents, deduplicatedUsers, eventsInQueue);
deduplicatedUsers = 0;
diagnosticExecutor.submit(sendDiagnosticTaskFactory.createSendDiagnosticTask(diagnosticEvent));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

class DiagnosticAccumulator {

final DiagnosticId diagnosticId;
volatile long dataSinceDate;
volatile DiagnosticId diagnosticId;

void start(DiagnosticId diagnosticId, long dataSinceDate) {
DiagnosticAccumulator(DiagnosticId diagnosticId) {
this.diagnosticId = diagnosticId;
this.dataSinceDate = dataSinceDate;
this.dataSinceDate = System.currentTimeMillis();
}

DiagnosticEvent.Statistics createEventAndReset(long droppedEvents, long deduplicatedUsers, long eventsInQueue) {
Expand Down
2 changes: 0 additions & 2 deletions src/main/java/com/launchdarkly/client/DiagnosticEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ static class DiagnosticConfiguration {
private final boolean usingRelayDaemon;
private final boolean offline;
private final boolean allAttributesPrivate;
private final boolean eventReportingDisabled;
private final long pollingIntervalMillis;
private final long startWaitMillis;
private final int samplingInterval;
Expand All @@ -80,7 +79,6 @@ static class DiagnosticConfiguration {
this.usingRelayDaemon = config.useLdd;
this.offline = config.offline;
this.allAttributesPrivate = config.allAttributesPrivate;
this.eventReportingDisabled = !config.sendEvents;
this.pollingIntervalMillis = config.pollingIntervalMillis;
this.startWaitMillis = config.startWaitMillis;
this.samplingInterval = config.samplingInterval;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.launchdarkly.client;

interface EventProcessorFactoryWithDiagnostics extends EventProcessorFactory {
EventProcessor createEventProcessor(String sdkKey, LDConfig config,
DiagnosticAccumulator diagnosticAccumulator);
}
28 changes: 23 additions & 5 deletions src/main/java/com/launchdarkly/client/LDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,32 @@ public LDClient(String sdkKey, LDConfig config) {
this.shouldCloseFeatureStore = true;
}
this.featureStore = new FeatureStoreClientWrapper(store);

EventProcessorFactory epFactory = this.config.eventProcessorFactory == null ?
Components.defaultEventProcessor() : this.config.eventProcessorFactory;
this.eventProcessor = epFactory.createEventProcessor(sdkKey, this.config);

UpdateProcessorFactory upFactory = this.config.updateProcessorFactory == null ?
Components.defaultUpdateProcessor() : this.config.updateProcessorFactory;
this.updateProcessor = upFactory.createUpdateProcessor(sdkKey, this.config, featureStore);
Components.defaultUpdateProcessor() : this.config.updateProcessorFactory;

DiagnosticAccumulator diagnosticAccumulator = null;
// Do not create accumulator if config has specified is opted out, or if epFactory doesn't support diagnostics
if (!this.config.diagnosticOptOut && epFactory instanceof EventProcessorFactoryWithDiagnostics) {
diagnosticAccumulator = new DiagnosticAccumulator(new DiagnosticId(sdkKey));
}

if (epFactory instanceof EventProcessorFactoryWithDiagnostics) {
EventProcessorFactoryWithDiagnostics epwdFactory = ((EventProcessorFactoryWithDiagnostics) epFactory);
this.eventProcessor = epwdFactory.createEventProcessor(sdkKey, this.config, diagnosticAccumulator);
} else {
this.eventProcessor = epFactory.createEventProcessor(sdkKey, this.config);
}

if (upFactory instanceof UpdateProcessorFactoryWithDiagnostics) {
UpdateProcessorFactoryWithDiagnostics upwdFactory = ((UpdateProcessorFactoryWithDiagnostics) upFactory);
this.updateProcessor = upwdFactory.createUpdateProcessor(sdkKey, this.config, featureStore, diagnosticAccumulator);
} else {
this.updateProcessor = upFactory.createUpdateProcessor(sdkKey, this.config, featureStore);
}

Future<Void> startFuture = updateProcessor.start();
if (this.config.startWaitMillis > 0L) {
if (!this.config.offline && !this.config.useLdd) {
Expand Down
10 changes: 4 additions & 6 deletions src/main/java/com/launchdarkly/client/LDConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,6 @@ public final class LDConfig {
final TimeUnit connectTimeoutUnit;
final int socketTimeout;
final TimeUnit socketTimeoutUnit;

DiagnosticAccumulator diagnosticAccumulator = new DiagnosticAccumulator();

protected LDConfig(Builder builder) {
this.baseURI = builder.baseURI;
Expand Down Expand Up @@ -628,7 +626,7 @@ public Builder inlineUsersInEvents(boolean inlineUsersInEvents) {

/**
* Sets the interval at which periodic diagnostic data is sent. The default is every 15 minutes (900,000
* milliseconds) and the minimum value is 6000.
* milliseconds) and the minimum value is 60,000.
*
* @see #diagnosticOptOut(boolean)
*
Expand Down Expand Up @@ -658,8 +656,8 @@ public Builder diagnosticOptOut(boolean diagnosticOptOut) {
}

/**
* For use by wrapper libraries to set an identifying name for the wrapper being used. This will be sent in
* User-Agent headers during requests to the LaunchDarkly servers to allow recording metrics on the usage of
* For use by wrapper libraries to set an identifying name for the wrapper being used. This will be included in a
* header during requests to the LaunchDarkly servers to allow recording metrics on the usage of
* these wrapper libraries.
*
* @param wrapperName an identifying name for the wrapper library
Expand All @@ -672,7 +670,7 @@ public Builder wrapperName(String wrapperName) {

/**
* For use by wrapper libraries to report the version of the library in use. If {@link #wrapperName(String)} is not
* set, this field will be ignored. Otherwise the version string will be included in the User-Agent headers along
* set, this field will be ignored. Otherwise the version string will be included in a header along
* with the wrapperName during requests to the LaunchDarkly servers.
*
* @param wrapperVersion Version string for the wrapper library
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/com/launchdarkly/client/StreamProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ final class StreamProcessor implements UpdateProcessor {
private final LDConfig config;
private final String sdkKey;
private final FeatureRequestor requestor;
private final DiagnosticAccumulator diagnosticAccumulator;
private final EventSourceCreator eventSourceCreator;
private volatile EventSource es;
private final AtomicBoolean initialized = new AtomicBoolean(false);
Expand All @@ -51,11 +52,12 @@ public static interface EventSourceCreator {
}

StreamProcessor(String sdkKey, LDConfig config, FeatureRequestor requestor, FeatureStore featureStore,
EventSourceCreator eventSourceCreator) {
EventSourceCreator eventSourceCreator, DiagnosticAccumulator diagnosticAccumulator) {
this.store = featureStore;
this.config = config;
this.sdkKey = sdkKey;
this.requestor = requestor;
this.diagnosticAccumulator = diagnosticAccumulator;
this.eventSourceCreator = eventSourceCreator != null ? eventSourceCreator : new DefaultEventSourceCreator();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.launchdarkly.client;

interface UpdateProcessorFactoryWithDiagnostics extends UpdateProcessorFactory {
UpdateProcessor createUpdateProcessor(String sdkKey, LDConfig config, FeatureStore featureStore,
DiagnosticAccumulator diagnosticAccumulator);
}
Loading

0 comments on commit 72e91e7

Please sign in to comment.