Skip to content

Commit

Permalink
Bounded queue-size in front of threadpool, with custom rejectPolicy t… (
Browse files Browse the repository at this point in the history
#198)

Bounded queue-size in front of threadpool, with custom rejectPolicy that will log the event-record when it is discarded.

Fixes GH-190
  • Loading branch information
erkdahl authored Apr 17, 2023
1 parent 155beea commit e548d5d
Show file tree
Hide file tree
Showing 10 changed files with 222 additions and 34 deletions.
2 changes: 2 additions & 0 deletions .idea/encodings.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions .idea/jarRepositories.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package no.digdir.logging.event;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

/**
* Based upon DiscardOldestPolicy, modified for our needs.
*
* @see java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy
*/
class DiscardAndLogOldestPolicy implements RejectedExecutionHandler {
private static final Logger log = LoggerFactory.getLogger(DiscardAndLogOldestPolicy.class);

/**
* Obtains, logs and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
Runnable runnableToBeDiscarded = e.getQueue().poll();
if (runnableToBeDiscarded instanceof KafkaTask) {
log.warn("Queue is full, discarding event: {}", ((KafkaTask) runnableToBeDiscarded).getProducerRecord()
.value());
} else {
if (runnableToBeDiscarded != null) {
log.warn("Discarded runnable of unknown type. It was: " + runnableToBeDiscarded.getClass());
}
}
e.execute(r);
}
}
}
35 changes: 12 additions & 23 deletions log-event/src/main/java/no/digdir/logging/event/EventLogger.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static no.digdir.logging.event.EventLoggingConfig.FEATURE_ENABLED_KEY;

Expand All @@ -27,7 +27,15 @@ public class EventLogger {
public EventLogger(EventLoggingConfig eventLoggingConfig) {
this.config = eventLoggingConfig;
this.kafkaProducer = resolveProducer(config);
this.pool = Executors.newFixedThreadPool(config.getThreadPoolSize(), buildThreadFactory());
this.pool = generateFixedThreadPool(config);
}

private ExecutorService generateFixedThreadPool(EventLoggingConfig config) {
return new ThreadPoolExecutor(config.getThreadPoolSize(), config.getThreadPoolSize(),
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(config.getThreadPoolQueueSize()),
buildThreadFactory(),
new DiscardAndLogOldestPolicy());
}

static Producer<String, SpecificRecordBase> resolveProducer(EventLoggingConfig config) {
Expand All @@ -50,27 +58,8 @@ public Thread newThread(@NonNull Runnable r) {
};
}

static Runnable createSendTask(
ProducerRecord<String, SpecificRecordBase> producerRecord,
Producer<String, SpecificRecordBase> producer) {

return () -> {
try {
producer.send(producerRecord, (recordMetadata, e) -> {
if (e != null) {
log.warn("Failed to publish event {}", producerRecord.value(), e);
} else if (log.isTraceEnabled() && recordMetadata != null) {
log.trace("Sent record {} with offset {}", producerRecord, recordMetadata.offset());
}
});
} catch (Exception e) {
log.warn("Failed to publish event {}", producerRecord.value(), e);
}
};
}

public void log(EventRecordBase eventRecord) {
pool.submit(createSendTask(eventRecord.toProducerRecord(config), kafkaProducer));
pool.submit(new KafkaTask(eventRecord.toProducerRecord(config), kafkaProducer));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class EventLoggingConfig {
static final String MP_AUTH_RECORD_TOPIC_KEY = "maskinporten-authenticated-record.topic";
static final String MP_TOKEN_RECORD_TOPIC_KEY = "maskinporten-token-record.topic";
static final String THREAD_POOL_SIZE_KEY = "thread.pool.size";
static final String THREAD_POOL_QUEUE_SIZE_KEY = "thread.pool.queue.size";

private static final String PRODUCER_PROPERTIES_FILE_PATH = "kafka-producer.properties";
private static final String EVENT_LOGGER_PROPERTIES_FILE_PATH = "event-logger.properties";
Expand Down Expand Up @@ -105,6 +106,11 @@ public class EventLoggingConfig {
*/
@Getter
private final int threadPoolSize;
/**
* The queue-capacity of the queue in front of the threadPool
*/
@Getter
private final int threadPoolQueueSize;
@Getter
private final Map<String, Object> producerConfig;

Expand All @@ -122,7 +128,8 @@ public EventLoggingConfig(
String activityRecordTopic,
String maskinportenAuthenticationRecordTopic,
String maskinportenTokenRecordTopic,
Integer threadPoolSize) {
Integer threadPoolSize,
Integer threadPoolQueueSize) {
this.kafkaPassword = kafkaPassword;
this.schemaRegistryUsername = schemaRegistryUsername;
this.schemaRegistryPassword = schemaRegistryPassword;
Expand All @@ -133,6 +140,8 @@ public EventLoggingConfig(
);
this.threadPoolSize = Optional.ofNullable(threadPoolSize).orElse(
Integer.valueOf(eventLoggerDefaultProperties.getProperty(THREAD_POOL_SIZE_KEY)));
this.threadPoolQueueSize = Optional.ofNullable(threadPoolQueueSize).orElse(
Integer.valueOf(eventLoggerDefaultProperties.getProperty(THREAD_POOL_QUEUE_SIZE_KEY)));
if (this.featureEnabled) {
this.applicationName = resolveProperty(APPLICATION_NAME, applicationName, eventLoggerDefaultProperties);
this.environmentName = resolveProperty(ENVIRONMENT_NAME, environmentName, eventLoggerDefaultProperties);
Expand Down
45 changes: 45 additions & 0 deletions log-event/src/main/java/no/digdir/logging/event/KafkaTask.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package no.digdir.logging.event;

import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class provides a convenient way of getting the ProducerRecord out of a submitted runnable task to the executorService.
* Used in combination with the DiscardAndLogOldestPolicy
*
* @see DiscardAndLogOldestPolicy
*/
class KafkaTask implements Runnable {
private static final Logger log = LoggerFactory.getLogger(KafkaTask.class);
private final ProducerRecord<String, SpecificRecordBase> producerRecord;
private final Producer<String, SpecificRecordBase> producer;

public KafkaTask(
ProducerRecord<String, SpecificRecordBase> producerRecord,
Producer<String, SpecificRecordBase> producer) {
this.producerRecord = producerRecord;
this.producer = producer;
}

@Override
public void run() {
try {
producer.send(producerRecord, (recordMetadata, e) -> {
if (e != null) {
log.warn("Failed to publish event {}", producerRecord.value(), e);
} else if (log.isTraceEnabled() && recordMetadata != null) {
log.trace("Sent record {} with offset {}", producerRecord, recordMetadata.offset());
}
});
} catch (Exception e) {
log.warn("Failed to publish event {}", producerRecord.value(), e);
}
}

ProducerRecord<String, SpecificRecordBase> getProducerRecord() {
return producerRecord;
}
}
3 changes: 2 additions & 1 deletion log-event/src/main/resources/event-logger.properties
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ activity-record.topic=aktiviteter
maskinporten-authenticated-record.topic=mp-auth
maskinporten-token-record.topic=mp-token
digdir.event.logging.feature-enabled=true
thread.pool.size=4
thread.pool.size=4
thread.pool.queue.size=30000
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package no.digdir.logging.event;

import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.junit.jupiter.api.Test;

import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

class DiscardAndLogOldestPolicyTest {

private final ThreadPoolExecutor executor = mock(ThreadPoolExecutor.class);
private final BlockingQueue<Runnable> queue = mock(BlockingQueue.class);
private final DiscardAndLogOldestPolicy discardAndLogOldestPolicy = new DiscardAndLogOldestPolicy();
private final EventLoggingConfig config = EventLoggingConfig.builder()
.applicationName("app")
.environmentName("env")
.bootstrapServers("localhost:443")
.schemaRegistryUrl("localhost:433")
.kafkaUsername("user")
.threadPoolSize(1)
.build();
private final Producer<String, SpecificRecordBase> kafkaProducer = new KafkaProducer<>(config.getProducerConfig());

@Test
void testOldestIsDescheduled() {
KafkaTask oldKafkaTask = createKafkaTask("OldEvent");
when(executor.getQueue()).thenReturn(queue);
when(queue.poll()).thenReturn(oldKafkaTask);

KafkaTask newKafkaTask = createKafkaTask("NewEvent");

discardAndLogOldestPolicy.rejectedExecution(newKafkaTask, executor);
verify(executor).execute(eq(newKafkaTask));
verify(queue).poll();
}

private KafkaTask createKafkaTask(String eventName) {
return new KafkaTask(ActivityRecord.builder()
.eventName(eventName)
.eventDescription("Brukeren har logget inn")
.eventSubjectPid("123")
.correlationId(UUID.randomUUID().toString())
.serviceProviderId("McDuck IT")
.serviceOwnerId("Andeby kommune")
.authEid("MinID")
.authMethod("OTC")
.build().toProducerRecord(config), kafkaProducer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ void silentFailing() {
}

@Test
void threadPoolSize() {
void threadPoolThreadSize() {
assertTrue(eventLogger.pool instanceof ThreadPoolExecutor, "The threadPool should be of type ThreadPoolExecutor");
assertEquals(POOL_SIZE, ((ThreadPoolExecutor) eventLogger.pool).getCorePoolSize(),
"PoolSize should have been initialized to " + POOL_SIZE);
Expand All @@ -192,9 +192,27 @@ void threadPoolSize() {
.threadPoolSize(20)
.build();

eventLogger = new EventLogger(customPoolSizeConfig);
assertTrue(eventLogger.pool instanceof ThreadPoolExecutor, "The threadPool should still be of type ThreadPoolExecutor");
assertEquals(20, ((ThreadPoolExecutor) eventLogger.pool).getCorePoolSize(), "poolSize should be equal to the new custom set size");
EventLogger customEventLogger = new EventLogger(customPoolSizeConfig);
assertTrue(customEventLogger.pool instanceof ThreadPoolExecutor, "The threadPool should still be of type ThreadPoolExecutor");
assertEquals(20, ((ThreadPoolExecutor) customEventLogger.pool).getCorePoolSize(), "poolSize should be equal to the new custom set size");
}

@Test
void threadPoolQueueSize() {
EventLoggingConfig customPoolSizeConfig = EventLoggingConfig.builder()
.applicationName(APPLICATION_NAME)
.environmentName(ENVIRONMENT_NAME)
.bootstrapServers(DUMMY_URL)
.schemaRegistryUrl(DUMMY_URL)
.kafkaUsername(USERNAME)
.activityRecordTopic("any topic")
.featureEnabled(true)
.threadPoolQueueSize(200)
.build();

EventLogger customEventLogger = new EventLogger(customPoolSizeConfig);
assertTrue(customEventLogger.pool instanceof ThreadPoolExecutor, "The threadPool should be of type ThreadPoolExecutor");
assertEquals(200, ((ThreadPoolExecutor) customEventLogger.pool).getQueue().remainingCapacity(), "poolSize remaining capacity should be equal to max capacity since its not in use yet");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,33 @@ void threadPoolSizeDefault() {
assertEquals(4, config.getThreadPoolSize(), "ThreadPoolSize default should be 4");
}

@Test
void threadPoolQueueSize() {
EventLoggingConfig config = EventLoggingConfig.builder()
.applicationName("testApplicationName")
.environmentName("unit")
.bootstrapServers("abc")
.schemaRegistryUrl("abc")
.kafkaUsername("abc")
.featureEnabled(true)
.threadPoolQueueSize(200)
.build();
assertEquals(200, config.getThreadPoolQueueSize(), "ThreadPoolQueueSize should be equal to builder input");
}

@Test
void threadPoolQueueSizeDefault() {
EventLoggingConfig config = EventLoggingConfig.builder()
.applicationName("testApplicationName")
.environmentName("unit")
.bootstrapServers("abc")
.schemaRegistryUrl("abc")
.kafkaUsername("abc")
.featureEnabled(true)
.build();
assertEquals(30000, config.getThreadPoolQueueSize(), "ThreadPoolQueueSize default should be 100000");
}

@Test
void noSchemaRegistryUsername() {
EventLoggingConfig eventLoggingConfig = EventLoggingConfig.builder()
Expand Down

0 comments on commit e548d5d

Please sign in to comment.