Skip to content

Commit

Permalink
Implement cache loading.
Browse files Browse the repository at this point in the history
  • Loading branch information
SamBarker committed Jul 19, 2023
1 parent 0b54d75 commit b391d55
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 12 deletions.
13 changes: 13 additions & 0 deletions kroxylicious-filter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
<properties>
<caffeine.version>3.1.6</caffeine.version>
<assertj.version>3.24.2</assertj.version>
<mockito.version>5.4.0</mockito.version>
</properties>
<modelVersion>4.0.0</modelVersion>
<artifactId>kroxylicious-filter</artifactId>
Expand Down Expand Up @@ -91,6 +92,18 @@
<version>${assertj.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,30 +1,30 @@
package io.strimzi.kafka.topicenc.kroxylicious;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Future;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.requests.MetadataRequest;

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;

import io.kroxylicious.proxy.filter.KrpcFilterContext;

public class TopicIdCache {
private final AsyncLoadingCache<Uuid, String> topicNamesById;
private final AsyncCache<Uuid, String> topicNamesById;

public TopicIdCache() {
this(Caffeine.newBuilder().expireAfterAccess(Duration.ofMinutes(10)).buildAsync((key, executor) -> {
//TODO something clever.
return null;
}));
this(Caffeine.newBuilder().expireAfterAccess(Duration.ofMinutes(10)).buildAsync());
}

TopicIdCache(AsyncLoadingCache<Uuid, String> topicNamesById) {
TopicIdCache(AsyncCache<Uuid, String> topicNamesById) {
this.topicNamesById = topicNamesById;
}

Expand All @@ -44,6 +44,21 @@ public boolean hasResolvedTopic(Uuid topicId) {
}

public void resolveTopicNames(KrpcFilterContext context, Set<Uuid> topicIdsToResolve) {

final MetadataRequest.Builder builder = new MetadataRequest.Builder(List.copyOf(topicIdsToResolve));
final MetadataRequest metadataRequest = builder.build(builder.latestAllowedVersion());
topicIdsToResolve.forEach(uuid -> topicNamesById.put(uuid, new CompletableFuture<>()));
context.<MetadataResponseData> sendRequest(metadataRequest.version(), metadataRequest.data())
.whenComplete((metadataResponseData, throwable) -> {
if (throwable != null) {
//TODO something sensible
}
else {
metadataResponseData.topics()
.forEach(metadataResponseTopic -> Objects.requireNonNull(topicNamesById.getIfPresent(metadataResponseTopic.topicId()))
.complete(metadataResponseTopic.name()));
//If we were to get null from getIfPresent it would imply we got a result for a topic we didn't expect
}
});
}

}
Original file line number Diff line number Diff line change
@@ -1,33 +1,54 @@
package io.strimzi.kafka.topicenc.kroxylicious;

import java.util.Set;
import java.util.concurrent.CompletableFuture;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.MetadataRequestData;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.protocol.ApiMessage;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;

import io.kroxylicious.proxy.filter.KrpcFilterContext;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyShort;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

@ExtendWith(MockitoExtension.class)
class TopicIdCacheTest {

private static final Uuid UNKNOWN_TOPIC_ID = Uuid.randomUuid();
private static final Uuid KNOWN_TOPIC_ID = Uuid.randomUuid();
private static final Uuid PENDING_TOPIC_ID = Uuid.randomUuid();
private static final String KNOWN_TOPIC_NAME = "TOPIC_WIBBLE";
private static final String RESOLVED_TOPIC_NAME = "TOPIC_RESOLVED";
private TopicIdCache topicIdCache;
private AsyncLoadingCache<Uuid, String> underlyingCache;
private KrpcFilterContext filterContext;
private CompletableFuture<ApiMessage> pendingFuture;

@BeforeEach
void setUp() {
underlyingCache = Caffeine.newBuilder().buildAsync((key, executor) -> null);
underlyingCache.put(KNOWN_TOPIC_ID, CompletableFuture.completedFuture(KNOWN_TOPIC_NAME));
underlyingCache.put(PENDING_TOPIC_ID, new CompletableFuture<>());
topicIdCache = new TopicIdCache(underlyingCache);
filterContext = mock(KrpcFilterContext.class);
pendingFuture = new CompletableFuture<>();

lenient().when(filterContext.sendRequest(anyShort(), any())).thenReturn(pendingFuture);
}

@Test
Expand Down Expand Up @@ -125,4 +146,87 @@ void shouldReturnNullFromGetForUnresolvedTopic() {
Assertions.assertThat(topicName).isNull();
}

@Test
void shouldSendMetadataRequestToResolveTopicNames() {
//Given

//When
topicIdCache.resolveTopicNames(filterContext, Set.of(UNKNOWN_TOPIC_ID));

//Then
verify(filterContext).sendRequest(anyShort(), any(MetadataRequestData.class));
}

@Test
void shouldIncludeTopicId() {
//Given

//When
topicIdCache.resolveTopicNames(filterContext, Set.of(UNKNOWN_TOPIC_ID));

//Then
verify(filterContext).sendRequest(anyShort(), Mockito.argThat(apiMessage -> {
assertThat(apiMessage).isInstanceOf(MetadataRequestData.class);
assertThat(((MetadataRequestData) apiMessage).topics()).anyMatch(metadataRequestTopic -> UNKNOWN_TOPIC_ID.equals(metadataRequestTopic.topicId()));
return true;
}));
}

@Test
void shouldIncludeMultipleTopicId() {
//Given

//When
topicIdCache.resolveTopicNames(filterContext, Set.of(UNKNOWN_TOPIC_ID, PENDING_TOPIC_ID));

//Then
verify(filterContext).sendRequest(anyShort(), Mockito.argThat(apiMessage -> {
assertThat(apiMessage).isInstanceOf(MetadataRequestData.class);
assertThat(((MetadataRequestData) apiMessage).topics())
.allMatch(
metadataRequestTopic -> UNKNOWN_TOPIC_ID.equals(metadataRequestTopic.topicId()) || PENDING_TOPIC_ID.equals(metadataRequestTopic.topicId()));
return true;
}));
}

@Test
void shouldReturnPendingFutures() {
//Given

//When
topicIdCache.resolveTopicNames(filterContext, Set.of(UNKNOWN_TOPIC_ID, PENDING_TOPIC_ID));

//Then
assertThat(topicIdCache.getTopicName(UNKNOWN_TOPIC_ID)).isNotNull().isNotDone();
assertThat(topicIdCache.getTopicName(PENDING_TOPIC_ID)).isNotNull().isNotDone();
}

@Test
void shouldCacheFutureForTopicId() {
//Given
topicIdCache.resolveTopicNames(filterContext, Set.of(UNKNOWN_TOPIC_ID));

//When
final CompletableFuture<String> actualFuture = topicIdCache.getTopicName(UNKNOWN_TOPIC_ID);

//Then
assertThat(actualFuture).isNotNull().isNotDone();
}

@Test
void shouldCompleteFutureWhenMetadataResponseDelivered() {
//Given
topicIdCache.resolveTopicNames(filterContext, Set.of(UNKNOWN_TOPIC_ID));
final MetadataResponseData.MetadataResponseTopic responseTopic = new MetadataResponseData.MetadataResponseTopic();
responseTopic.setTopicId(UNKNOWN_TOPIC_ID).setName(RESOLVED_TOPIC_NAME);
final MetadataResponseData.MetadataResponseTopicCollection metadataResponseTopics = new MetadataResponseData.MetadataResponseTopicCollection();
metadataResponseTopics.add(responseTopic);

//When
pendingFuture.complete(new MetadataResponseData().setTopics(metadataResponseTopics));

//Then
final CompletableFuture<String> actualFuture = topicIdCache.getTopicName(UNKNOWN_TOPIC_ID);
assertThat(actualFuture).isCompletedWithValue(RESOLVED_TOPIC_NAME);
}
}

0 comments on commit b391d55

Please sign in to comment.