Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#93 Change the kafkaKey from PID to all "dimensions" of the eventReco… #94

Merged
merged 2 commits into from
Apr 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import java.time.Instant;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
Expand All @@ -20,6 +21,7 @@

@Slf4j
public class EventLogger {
private static final String SEPARATOR = ";";
final ExecutorService pool;

private final EventLoggingConfig config;
Expand Down Expand Up @@ -85,13 +87,32 @@ public void log(EventRecord eventRecord) {
ProducerRecord<String, EventRecord> producerRecord =
new ProducerRecord<>(
config.getEventTopic(),
eventRecord.getPid().toString(),
createKafkaKey(enrichedRecord),
enrichedRecord);

Runnable task = createSendTask(producerRecord, producer);
pool.submit(task);
}

/**
* The kafka key is generated based upon all "dimensions" of the eventRecord.
* This ensures that all records that should be counted together (statistics-wise), will
* end up in the same partition and can be aggregated using kafka-streams.
*
* @param eventRecord the eventRecord that the key should be based upon.
* @return a composite kafkaKey based on all dimensions of the eventRecord.
*/
private String createKafkaKey(EventRecord eventRecord) {
Objects.requireNonNull(eventRecord, "EventRecord was null, cannot create a KafkaKey from a null-object.");
return eventRecord.getName() + SEPARATOR +
eventRecord.getApplication() + SEPARATOR +
eventRecord.getClient() + SEPARATOR +
eventRecord.getRepresenting() + SEPARATOR +
eventRecord.getEid() + SEPARATOR +
eventRecord.getAuthmethod() + SEPARATOR +
eventRecord.getEnvironment() + SEPARATOR;
}

@Override
protected void finalize() {
if (producer != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,10 @@ void log() throws ExecutionException, InterruptedException {
Future<Integer> sentEventsFuture = eventLogger.pool.submit(() -> mockProducer.history().size());

assertEquals(1, sentEventsFuture.get(), "Record should be published");
assertEquals(FNR, mockProducer.history().get(0).key(), "Record key should be the PID");
assertEquals(
"Innlogget;testApplication;McDuck IT;Andeby kommune;null;OTC;unitTest;",
mockProducer.history().get(0).key(),
"Record key should be a compositeKey based upon all dimensions of the record");
}

@SuppressWarnings("unchecked")
Expand Down