diff --git a/src/main/java/com/launchdarkly/client/DefaultEventProcessor.java b/src/main/java/com/launchdarkly/client/DefaultEventProcessor.java
index 1c33a51a8..5afb3fdec 100644
--- a/src/main/java/com/launchdarkly/client/DefaultEventProcessor.java
+++ b/src/main/java/com/launchdarkly/client/DefaultEventProcessor.java
@@ -35,25 +35,25 @@ final class DefaultEventProcessor implements EventProcessor {
   private static final Logger logger = LoggerFactory.getLogger(DefaultEventProcessor.class);
 
-  private static final int CHANNEL_BLOCK_MILLIS = 1000;
   private static final String EVENT_SCHEMA_HEADER = "X-LaunchDarkly-Event-Schema";
   private static final String EVENT_SCHEMA_VERSION = "3";
 
-  private final BlockingQueue<EventProcessorMessage> inputChannel;
+  private final BlockingQueue<EventProcessorMessage> inbox;
   private final ScheduledExecutorService scheduler;
   private final AtomicBoolean closed = new AtomicBoolean(false);
-  private final AtomicBoolean inputCapacityExceeded = new AtomicBoolean(false);
+  private volatile boolean inputCapacityExceeded = false;
 
   DefaultEventProcessor(String sdkKey, LDConfig config) {
-    inputChannel = new ArrayBlockingQueue<>(config.capacity);
+    inbox = new ArrayBlockingQueue<>(config.capacity);
 
     ThreadFactory threadFactory = new ThreadFactoryBuilder()
         .setDaemon(true)
         .setNameFormat("LaunchDarkly-EventProcessor-%d")
+        .setPriority(Thread.MIN_PRIORITY)
         .build();
 
     scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory);
 
-    new EventDispatcher(sdkKey, config, inputChannel, threadFactory, closed);
+    new EventDispatcher(sdkKey, config, inbox, threadFactory, closed);
 
     Runnable flusher = new Runnable() {
       public void run() {
@@ -104,31 +104,25 @@ private void postMessageAsync(MessageType type, Event event) {
 
   private void postMessageAndWait(MessageType type, Event event) {
     EventProcessorMessage message = new EventProcessorMessage(type, event, true);
-    postToChannel(message);
-    message.waitForCompletion();
+    if (postToChannel(message)) {
+      message.waitForCompletion();
+    }
   }
 
-  private void postToChannel(EventProcessorMessage message) {
-    while (true) {
-      try {
-        if (inputChannel.offer(message, CHANNEL_BLOCK_MILLIS, TimeUnit.MILLISECONDS)) {
-          inputCapacityExceeded.set(false);
-          break;
-        } else {
-          // This doesn't mean that the output event buffer is full, but rather that the main thread is
-          // seriously backed up with not-yet-processed events. We shouldn't see this.
-          if (inputCapacityExceeded.compareAndSet(false, true)) {
-            logger.warn("Events are being produced faster than they can be processed");
-          }
-          if (closed.get()) {
-            // Whoops, the event processor has been shut down
-            message.completed();
-            return;
-          }
-        }
-      } catch (InterruptedException ex) {
-      }
+  private boolean postToChannel(EventProcessorMessage message) {
+    if (inbox.offer(message)) {
+      return true;
+    }
+    // If the inbox is full, it means the EventDispatcher thread is seriously backed up with not-yet-processed
+    // events. This is unlikely, but if it happens, it means the application is probably doing a ton of flag
+    // evaluations across many threads, so if we wait for a space in the inbox, we risk a very serious slowdown
+    // of the app. To avoid that, we'll just drop the event. The log warning about this will only be shown once.
+    boolean alreadyLogged = inputCapacityExceeded; // possible race between this and the next line, but it's of no real consequence - we'd just get an extra log line
+    inputCapacityExceeded = true;
+    if (!alreadyLogged) {
+      logger.warn("Events are being produced faster than they can be processed; some events will be dropped");
     }
+    return false;
   }
 
   private static enum MessageType {
@@ -194,7 +188,7 @@ static final class EventDispatcher {
     private final AtomicBoolean disabled = new AtomicBoolean(false);
 
     private EventDispatcher(String sdkKey, LDConfig config,
-                            final BlockingQueue<EventProcessorMessage> inputChannel,
+                            final BlockingQueue<EventProcessorMessage> inbox,
                             ThreadFactory threadFactory,
                             final AtomicBoolean closed) {
       this.config = config;
@@ -205,12 +199,12 @@ private EventDispatcher(String sdkKey, LDConfig config,
       // all the workers are busy.
      final BlockingQueue<FlushPayload> payloadQueue = new ArrayBlockingQueue<>(1);
 
-      final EventBuffer buffer = new EventBuffer(config.capacity);
+      final EventBuffer outbox = new EventBuffer(config.capacity);
       final SimpleLRUCache<String, String> userKeys = new SimpleLRUCache<String, String>(config.userKeysCapacity);
 
       Thread mainThread = threadFactory.newThread(new Runnable() {
         public void run() {
-          runMainLoop(inputChannel, buffer, userKeys, payloadQueue);
+          runMainLoop(inbox, outbox, userKeys, payloadQueue);
         }
       });
       mainThread.setDaemon(true);
@@ -226,7 +220,7 @@ public void uncaughtException(Thread t, Throwable e) {
           closed.set(true);
           // Now discard everything that was on the queue, but also make sure no one was blocking on a message
           List<EventProcessorMessage> messages = new ArrayList<EventProcessorMessage>();
-          inputChannel.drainTo(messages);
+          inbox.drainTo(messages);
           for (EventProcessorMessage m: messages) {
             m.completed();
           }
@@ -253,22 +247,22 @@ public void handleResponse(Response response) {
      * thread so we don't have to synchronize on our internal structures; when it's time to flush,
      * triggerFlush will hand the events off to another task.
      */
-    private void runMainLoop(BlockingQueue<EventProcessorMessage> inputChannel,
-        EventBuffer buffer, SimpleLRUCache<String, String> userKeys,
+    private void runMainLoop(BlockingQueue<EventProcessorMessage> inbox,
+        EventBuffer outbox, SimpleLRUCache<String, String> userKeys,
         BlockingQueue<FlushPayload> payloadQueue) {
       List<EventProcessorMessage> batch = new ArrayList<EventProcessorMessage>(MESSAGE_BATCH_SIZE);
       while (true) {
         try {
           batch.clear();
-          batch.add(inputChannel.take()); // take() blocks until a message is available
-          inputChannel.drainTo(batch, MESSAGE_BATCH_SIZE - 1); // this nonblocking call allows us to pick up more messages if available
+          batch.add(inbox.take()); // take() blocks until a message is available
+          inbox.drainTo(batch, MESSAGE_BATCH_SIZE - 1); // this nonblocking call allows us to pick up more messages if available
           for (EventProcessorMessage message: batch) {
             switch(message.type) {
             case EVENT:
-              processEvent(message.event, userKeys, buffer);
+              processEvent(message.event, userKeys, outbox);
              break;
             case FLUSH:
-              triggerFlush(buffer, payloadQueue);
+              triggerFlush(outbox, payloadQueue);
              break;
             case FLUSH_USERS:
               userKeys.clear();
@@ -315,13 +309,13 @@ private void waitUntilAllFlushWorkersInactive() {
       }
     }
 
-    private void processEvent(Event e, SimpleLRUCache<String, String> userKeys, EventBuffer buffer) {
+    private void processEvent(Event e, SimpleLRUCache<String, String> userKeys, EventBuffer outbox) {
       if (disabled.get()) {
         return;
       }
 
       // Always record the event in the summarizer.
-      buffer.addToSummary(e);
+      outbox.addToSummary(e);
 
       // Decide whether to add the event to the payload. Feature events may be added twice, once for
       // the event (if tracked) and once for debugging.
@@ -353,13 +347,13 @@ private void processEvent(Event e, SimpleLRUCache<String, String> userKeys, Even
 
       if (addIndexEvent) {
         Event.Index ie = new Event.Index(e.creationDate, e.user);
-        buffer.add(ie);
+        outbox.add(ie);
       }
       if (addFullEvent) {
-        buffer.add(e);
+        outbox.add(e);
       }
       if (debugEvent != null) {
-        buffer.add(debugEvent);
+        outbox.add(debugEvent);
       }
     }
 
@@ -391,15 +385,15 @@ private boolean shouldDebugEvent(Event.FeatureRequest fe) {
       return false;
     }
 
-    private void triggerFlush(EventBuffer buffer, BlockingQueue<FlushPayload> payloadQueue) {
-      if (disabled.get() || buffer.isEmpty()) {
+    private void triggerFlush(EventBuffer outbox, BlockingQueue<FlushPayload> payloadQueue) {
+      if (disabled.get() || outbox.isEmpty()) {
         return;
       }
-      FlushPayload payload = buffer.getPayload();
+      FlushPayload payload = outbox.getPayload();
       busyFlushWorkersCount.incrementAndGet();
       if (payloadQueue.offer(payload)) {
         // These events now belong to the next available flush worker, so drop them from our state
-        buffer.clear();
+        outbox.clear();
       } else {
         logger.debug("Skipped flushing because all workers are busy");
         // All the workers are busy so we can't flush now; keep the events in our state
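
The key behavioral change above is in `postToChannel`: a full inbox no longer blocks the calling thread, and `postMessageAndWait` now waits for completion only when the message was actually accepted, so a caller can no longer hang on a dropped message. Here is a minimal standalone sketch of that pattern, assuming a placeholder `String` message type and `System.err` in place of the SDK's SLF4J logger:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Illustrative only: the type and capacity are placeholders, not the SDK's own.
final class NonBlockingInboxSketch {
  private final BlockingQueue<String> inbox = new ArrayBlockingQueue<>(10000);
  // A plain volatile flag is enough: if two threads race past the check below,
  // the worst outcome is one duplicate warning.
  private volatile boolean inputCapacityExceeded = false;

  // Returns true if the message was accepted. Callers that would normally
  // block until the message is processed must skip that wait on false,
  // since no one will ever mark a dropped message as completed.
  boolean post(String message) {
    if (inbox.offer(message)) { // non-blocking; false means the queue is full
      return true;
    }
    boolean alreadyLogged = inputCapacityExceeded;
    inputCapacityExceeded = true;
    if (!alreadyLogged) {
      System.err.println("warn: events are being produced faster than they can be processed");
    }
    return false; // drop the event rather than stall the application
  }
}
```

Replacing the old `AtomicBoolean` with a `volatile boolean` fits this relaxed requirement: the flag is never reset or compare-and-set anymore, it only gates a one-time log line.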
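On the consuming side, `runMainLoop` keeps its batching strategy, now reading from the renamed `inbox`: block for the first message, then drain whatever else is already queued without blocking, so one blocking wait is amortized over up to a full batch. A sketch with the same placeholder message type (the batch size of 50 is illustrative, not necessarily the SDK's `MESSAGE_BATCH_SIZE`):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

final class DispatchLoopSketch {
  private static final int MESSAGE_BATCH_SIZE = 50; // illustrative value

  static void runMainLoop(BlockingQueue<String> inbox) {
    List<String> batch = new ArrayList<>(MESSAGE_BATCH_SIZE);
    while (true) {
      try {
        batch.clear();
        batch.add(inbox.take()); // blocks until at least one message arrives
        inbox.drainTo(batch, MESSAGE_BATCH_SIZE - 1); // non-blocking top-up
        for (String message : batch) {
          // dispatch on the message type here (EVENT, FLUSH, ...)
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the flag and exit
        return;
      }
    }
  }
}
```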
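The `buffer` to `outbox` rename also makes the flush handoff at the end of the diff easier to follow: `triggerFlush` offers the pending payload to a single-slot `payloadQueue`, and when every flush worker is busy the events simply stay in the outbox for the next attempt. A sketch of that handoff with placeholder types (the SDK's `busyFlushWorkersCount` bookkeeping is omitted); like the real outbox, this one is only touched from the single dispatcher thread, so it needs no synchronization:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class FlushHandoffSketch {
  // Capacity 1: at most one payload can be waiting for a worker at a time.
  private final BlockingQueue<List<String>> payloadQueue = new ArrayBlockingQueue<>(1);
  private final List<String> outbox = new ArrayList<>();

  void triggerFlush() {
    if (outbox.isEmpty()) {
      return;
    }
    List<String> payload = new ArrayList<>(outbox);
    if (payloadQueue.offer(payload)) { // non-blocking handoff to a flush worker
      outbox.clear(); // these events now belong to that worker
    }
    // else: all workers are busy; keep the events for the next flush
  }
}
```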