From e548d5d345603bf0663cc246df15dd836f46637a Mon Sep 17 00:00:00 2001
From: Erik Dahl <89081489+erkdahl@users.noreply.github.com>
Date: Mon, 17 Apr 2023 12:47:19 +0200
Subject: [PATCH] =?UTF-8?q?Bounded=20queue-size=20in=20front=20of=20thread?=
=?UTF-8?q?pool,=20with=20custom=20rejectPolicy=20t=E2=80=A6=20(#198)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Bounded queue-size in front of threadpool, with custom rejectPolicy that will log the event-record when it is discarded.
Fixes GH-190
---
.idea/encodings.xml | 2 +
.idea/jarRepositories.xml | 10 ++--
.../event/DiscardAndLogOldestPolicy.java | 40 +++++++++++++
.../no/digdir/logging/event/EventLogger.java | 35 ++++--------
.../logging/event/EventLoggingConfig.java | 11 +++-
.../no/digdir/logging/event/KafkaTask.java | 45 +++++++++++++++
.../main/resources/event-logger.properties | 3 +-
.../event/DiscardAndLogOldestPolicyTest.java | 57 +++++++++++++++++++
.../digdir/logging/event/EventLoggerTest.java | 26 +++++++--
.../logging/event/EventLoggingConfigTest.java | 27 +++++++++
10 files changed, 222 insertions(+), 34 deletions(-)
create mode 100644 log-event/src/main/java/no/digdir/logging/event/DiscardAndLogOldestPolicy.java
create mode 100644 log-event/src/main/java/no/digdir/logging/event/KafkaTask.java
create mode 100644 log-event/src/test/java/no/digdir/logging/event/DiscardAndLogOldestPolicyTest.java
diff --git a/.idea/encodings.xml b/.idea/encodings.xml
index ee46050..ec51e76 100644
--- a/.idea/encodings.xml
+++ b/.idea/encodings.xml
@@ -5,5 +5,7 @@
+
+
\ No newline at end of file
diff --git a/.idea/jarRepositories.xml b/.idea/jarRepositories.xml
index 70d723c..2a8a088 100644
--- a/.idea/jarRepositories.xml
+++ b/.idea/jarRepositories.xml
@@ -6,16 +6,16 @@
-
-
-
-
-
+
+
+
+
+
diff --git a/log-event/src/main/java/no/digdir/logging/event/DiscardAndLogOldestPolicy.java b/log-event/src/main/java/no/digdir/logging/event/DiscardAndLogOldestPolicy.java
new file mode 100644
index 0000000..b009582
--- /dev/null
+++ b/log-event/src/main/java/no/digdir/logging/event/DiscardAndLogOldestPolicy.java
@@ -0,0 +1,40 @@
+package no.digdir.logging.event;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+
+/**
+ * Based upon DiscardOldestPolicy, modified for our needs.
+ *
+ * @see java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy
+ */
+class DiscardAndLogOldestPolicy implements RejectedExecutionHandler {
+ private static final Logger log = LoggerFactory.getLogger(DiscardAndLogOldestPolicy.class);
+
+ /**
+ * Obtains, logs and ignores the next task that the executor
+ * would otherwise execute, if one is immediately available,
+ * and then retries execution of task r, unless the executor
+ * is shut down, in which case task r is instead discarded.
+ *
+ * @param r the runnable task requested to be executed
+ * @param e the executor attempting to execute this task
+ */
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
+ if (!e.isShutdown()) {
+ Runnable runnableToBeDiscarded = e.getQueue().poll();
+ if (runnableToBeDiscarded instanceof KafkaTask) {
+ log.warn("Queue is full, discarding event: {}", ((KafkaTask) runnableToBeDiscarded).getProducerRecord()
+ .value());
+ } else {
+ if (runnableToBeDiscarded != null) {
+ log.warn("Discarded runnable of unknown type. It was: " + runnableToBeDiscarded.getClass());
+ }
+ }
+ e.execute(r);
+ }
+ }
+}
\ No newline at end of file
diff --git a/log-event/src/main/java/no/digdir/logging/event/EventLogger.java b/log-event/src/main/java/no/digdir/logging/event/EventLogger.java
index 98dfd2a..3a9027e 100644
--- a/log-event/src/main/java/no/digdir/logging/event/EventLogger.java
+++ b/log-event/src/main/java/no/digdir/logging/event/EventLogger.java
@@ -5,15 +5,15 @@
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import java.util.Map;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import static no.digdir.logging.event.EventLoggingConfig.FEATURE_ENABLED_KEY;
@@ -27,7 +27,15 @@ public class EventLogger {
public EventLogger(EventLoggingConfig eventLoggingConfig) {
this.config = eventLoggingConfig;
this.kafkaProducer = resolveProducer(config);
- this.pool = Executors.newFixedThreadPool(config.getThreadPoolSize(), buildThreadFactory());
+ this.pool = generateFixedThreadPool(config);
+ }
+
+ private ExecutorService generateFixedThreadPool(EventLoggingConfig config) {
+ return new ThreadPoolExecutor(config.getThreadPoolSize(), config.getThreadPoolSize(),
+ 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(config.getThreadPoolQueueSize()),
+ buildThreadFactory(),
+ new DiscardAndLogOldestPolicy());
}
static Producer<String, SpecificRecordBase> resolveProducer(EventLoggingConfig config) {
@@ -50,27 +58,8 @@ public Thread newThread(@NonNull Runnable r) {
};
}
- static Runnable createSendTask(
- ProducerRecord producerRecord,
- Producer producer) {
-
- return () -> {
- try {
- producer.send(producerRecord, (recordMetadata, e) -> {
- if (e != null) {
- log.warn("Failed to publish event {}", producerRecord.value(), e);
- } else if (log.isTraceEnabled() && recordMetadata != null) {
- log.trace("Sent record {} with offset {}", producerRecord, recordMetadata.offset());
- }
- });
- } catch (Exception e) {
- log.warn("Failed to publish event {}", producerRecord.value(), e);
- }
- };
- }
-
public void log(EventRecordBase eventRecord) {
- pool.submit(createSendTask(eventRecord.toProducerRecord(config), kafkaProducer));
+ pool.submit(new KafkaTask(eventRecord.toProducerRecord(config), kafkaProducer));
}
@Override
diff --git a/log-event/src/main/java/no/digdir/logging/event/EventLoggingConfig.java b/log-event/src/main/java/no/digdir/logging/event/EventLoggingConfig.java
index db47c84..fb1f1b7 100644
--- a/log-event/src/main/java/no/digdir/logging/event/EventLoggingConfig.java
+++ b/log-event/src/main/java/no/digdir/logging/event/EventLoggingConfig.java
@@ -30,6 +30,7 @@ public class EventLoggingConfig {
static final String MP_AUTH_RECORD_TOPIC_KEY = "maskinporten-authenticated-record.topic";
static final String MP_TOKEN_RECORD_TOPIC_KEY = "maskinporten-token-record.topic";
static final String THREAD_POOL_SIZE_KEY = "thread.pool.size";
+ static final String THREAD_POOL_QUEUE_SIZE_KEY = "thread.pool.queue.size";
private static final String PRODUCER_PROPERTIES_FILE_PATH = "kafka-producer.properties";
private static final String EVENT_LOGGER_PROPERTIES_FILE_PATH = "event-logger.properties";
@@ -105,6 +106,11 @@ public class EventLoggingConfig {
*/
@Getter
private final int threadPoolSize;
+ /**
+ * The queue-capacity of the queue in front of the threadPool
+ */
+ @Getter
+ private final int threadPoolQueueSize;
@Getter
private final Map<String, Object> producerConfig;
@@ -122,7 +128,8 @@ public EventLoggingConfig(
String activityRecordTopic,
String maskinportenAuthenticationRecordTopic,
String maskinportenTokenRecordTopic,
- Integer threadPoolSize) {
+ Integer threadPoolSize,
+ Integer threadPoolQueueSize) {
this.kafkaPassword = kafkaPassword;
this.schemaRegistryUsername = schemaRegistryUsername;
this.schemaRegistryPassword = schemaRegistryPassword;
@@ -133,6 +140,8 @@ public EventLoggingConfig(
);
this.threadPoolSize = Optional.ofNullable(threadPoolSize).orElse(
Integer.valueOf(eventLoggerDefaultProperties.getProperty(THREAD_POOL_SIZE_KEY)));
+ this.threadPoolQueueSize = Optional.ofNullable(threadPoolQueueSize).orElse(
+ Integer.valueOf(eventLoggerDefaultProperties.getProperty(THREAD_POOL_QUEUE_SIZE_KEY)));
if (this.featureEnabled) {
this.applicationName = resolveProperty(APPLICATION_NAME, applicationName, eventLoggerDefaultProperties);
this.environmentName = resolveProperty(ENVIRONMENT_NAME, environmentName, eventLoggerDefaultProperties);
diff --git a/log-event/src/main/java/no/digdir/logging/event/KafkaTask.java b/log-event/src/main/java/no/digdir/logging/event/KafkaTask.java
new file mode 100644
index 0000000..ea933e2
--- /dev/null
+++ b/log-event/src/main/java/no/digdir/logging/event/KafkaTask.java
@@ -0,0 +1,45 @@
+package no.digdir.logging.event;
+
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class provides a convenient way of getting the ProducerRecord out of a submitted runnable task to the executorService.
+ * Used in combination with the DiscardAndLogOldestPolicy
+ *
+ * @see DiscardAndLogOldestPolicy
+ */
+class KafkaTask implements Runnable {
+ private static final Logger log = LoggerFactory.getLogger(KafkaTask.class);
+    private final ProducerRecord<String, SpecificRecordBase> producerRecord;
+    private final Producer<String, SpecificRecordBase> producer;
+
+    public KafkaTask(
+            ProducerRecord<String, SpecificRecordBase> producerRecord,
+            Producer<String, SpecificRecordBase> producer) {
+ this.producerRecord = producerRecord;
+ this.producer = producer;
+ }
+
+ @Override
+ public void run() {
+ try {
+ producer.send(producerRecord, (recordMetadata, e) -> {
+ if (e != null) {
+ log.warn("Failed to publish event {}", producerRecord.value(), e);
+ } else if (log.isTraceEnabled() && recordMetadata != null) {
+ log.trace("Sent record {} with offset {}", producerRecord, recordMetadata.offset());
+ }
+ });
+ } catch (Exception e) {
+ log.warn("Failed to publish event {}", producerRecord.value(), e);
+ }
+ }
+
+    ProducerRecord<String, SpecificRecordBase> getProducerRecord() {
+ return producerRecord;
+ }
+}
\ No newline at end of file
diff --git a/log-event/src/main/resources/event-logger.properties b/log-event/src/main/resources/event-logger.properties
index cfa8c7c..17b3e60 100644
--- a/log-event/src/main/resources/event-logger.properties
+++ b/log-event/src/main/resources/event-logger.properties
@@ -3,4 +3,5 @@ activity-record.topic=aktiviteter
maskinporten-authenticated-record.topic=mp-auth
maskinporten-token-record.topic=mp-token
digdir.event.logging.feature-enabled=true
-thread.pool.size=4
\ No newline at end of file
+thread.pool.size=4
+thread.pool.queue.size=30000
\ No newline at end of file
diff --git a/log-event/src/test/java/no/digdir/logging/event/DiscardAndLogOldestPolicyTest.java b/log-event/src/test/java/no/digdir/logging/event/DiscardAndLogOldestPolicyTest.java
new file mode 100644
index 0000000..8336d1c
--- /dev/null
+++ b/log-event/src/test/java/no/digdir/logging/event/DiscardAndLogOldestPolicyTest.java
@@ -0,0 +1,57 @@
+package no.digdir.logging.event;
+
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.junit.jupiter.api.Test;
+
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+class DiscardAndLogOldestPolicyTest {
+
+ private final ThreadPoolExecutor executor = mock(ThreadPoolExecutor.class);
+ private final BlockingQueue queue = mock(BlockingQueue.class);
+ private final DiscardAndLogOldestPolicy discardAndLogOldestPolicy = new DiscardAndLogOldestPolicy();
+ private final EventLoggingConfig config = EventLoggingConfig.builder()
+ .applicationName("app")
+ .environmentName("env")
+ .bootstrapServers("localhost:443")
+ .schemaRegistryUrl("localhost:433")
+ .kafkaUsername("user")
+ .threadPoolSize(1)
+ .build();
+    private final Producer<String, SpecificRecordBase> kafkaProducer = new KafkaProducer<>(config.getProducerConfig());
+
+ @Test
+ void testOldestIsDescheduled() {
+ KafkaTask oldKafkaTask = createKafkaTask("OldEvent");
+ when(executor.getQueue()).thenReturn(queue);
+ when(queue.poll()).thenReturn(oldKafkaTask);
+
+ KafkaTask newKafkaTask = createKafkaTask("NewEvent");
+
+ discardAndLogOldestPolicy.rejectedExecution(newKafkaTask, executor);
+ verify(executor).execute(eq(newKafkaTask));
+ verify(queue).poll();
+ }
+
+ private KafkaTask createKafkaTask(String eventName) {
+ return new KafkaTask(ActivityRecord.builder()
+ .eventName(eventName)
+ .eventDescription("Brukeren har logget inn")
+ .eventSubjectPid("123")
+ .correlationId(UUID.randomUUID().toString())
+ .serviceProviderId("McDuck IT")
+ .serviceOwnerId("Andeby kommune")
+ .authEid("MinID")
+ .authMethod("OTC")
+ .build().toProducerRecord(config), kafkaProducer);
+ }
+}
\ No newline at end of file
diff --git a/log-event/src/test/java/no/digdir/logging/event/EventLoggerTest.java b/log-event/src/test/java/no/digdir/logging/event/EventLoggerTest.java
index 921b55e..c58f1ab 100644
--- a/log-event/src/test/java/no/digdir/logging/event/EventLoggerTest.java
+++ b/log-event/src/test/java/no/digdir/logging/event/EventLoggerTest.java
@@ -177,7 +177,7 @@ void silentFailing() {
}
@Test
- void threadPoolSize() {
+ void threadPoolThreadSize() {
assertTrue(eventLogger.pool instanceof ThreadPoolExecutor, "The threadPool should be of type ThreadPoolExecutor");
assertEquals(POOL_SIZE, ((ThreadPoolExecutor) eventLogger.pool).getCorePoolSize(),
"PoolSize should have been initialized to " + POOL_SIZE);
@@ -192,9 +192,27 @@ void threadPoolSize() {
.threadPoolSize(20)
.build();
- eventLogger = new EventLogger(customPoolSizeConfig);
- assertTrue(eventLogger.pool instanceof ThreadPoolExecutor, "The threadPool should still be of type ThreadPoolExecutor");
- assertEquals(20, ((ThreadPoolExecutor) eventLogger.pool).getCorePoolSize(), "poolSize should be equal to the new custom set size");
+ EventLogger customEventLogger = new EventLogger(customPoolSizeConfig);
+ assertTrue(customEventLogger.pool instanceof ThreadPoolExecutor, "The threadPool should still be of type ThreadPoolExecutor");
+ assertEquals(20, ((ThreadPoolExecutor) customEventLogger.pool).getCorePoolSize(), "poolSize should be equal to the new custom set size");
+ }
+
+ @Test
+ void threadPoolQueueSize() {
+ EventLoggingConfig customPoolSizeConfig = EventLoggingConfig.builder()
+ .applicationName(APPLICATION_NAME)
+ .environmentName(ENVIRONMENT_NAME)
+ .bootstrapServers(DUMMY_URL)
+ .schemaRegistryUrl(DUMMY_URL)
+ .kafkaUsername(USERNAME)
+ .activityRecordTopic("any topic")
+ .featureEnabled(true)
+ .threadPoolQueueSize(200)
+ .build();
+
+ EventLogger customEventLogger = new EventLogger(customPoolSizeConfig);
+ assertTrue(customEventLogger.pool instanceof ThreadPoolExecutor, "The threadPool should be of type ThreadPoolExecutor");
+ assertEquals(200, ((ThreadPoolExecutor) customEventLogger.pool).getQueue().remainingCapacity(), "poolSize remaining capacity should be equal to max capacity since its not in use yet");
}
@Test
diff --git a/log-event/src/test/java/no/digdir/logging/event/EventLoggingConfigTest.java b/log-event/src/test/java/no/digdir/logging/event/EventLoggingConfigTest.java
index e73b9db..c55cfc9 100644
--- a/log-event/src/test/java/no/digdir/logging/event/EventLoggingConfigTest.java
+++ b/log-event/src/test/java/no/digdir/logging/event/EventLoggingConfigTest.java
@@ -230,6 +230,33 @@ void threadPoolSizeDefault() {
assertEquals(4, config.getThreadPoolSize(), "ThreadPoolSize default should be 4");
}
+ @Test
+ void threadPoolQueueSize() {
+ EventLoggingConfig config = EventLoggingConfig.builder()
+ .applicationName("testApplicationName")
+ .environmentName("unit")
+ .bootstrapServers("abc")
+ .schemaRegistryUrl("abc")
+ .kafkaUsername("abc")
+ .featureEnabled(true)
+ .threadPoolQueueSize(200)
+ .build();
+ assertEquals(200, config.getThreadPoolQueueSize(), "ThreadPoolQueueSize should be equal to builder input");
+ }
+
+ @Test
+ void threadPoolQueueSizeDefault() {
+ EventLoggingConfig config = EventLoggingConfig.builder()
+ .applicationName("testApplicationName")
+ .environmentName("unit")
+ .bootstrapServers("abc")
+ .schemaRegistryUrl("abc")
+ .kafkaUsername("abc")
+ .featureEnabled(true)
+ .build();
+        assertEquals(30000, config.getThreadPoolQueueSize(), "ThreadPoolQueueSize default should be 30000");
+ }
+
@Test
void noSchemaRegistryUsername() {
EventLoggingConfig eventLoggingConfig = EventLoggingConfig.builder()