Skip to content

Commit

Permalink
fixy flaky integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra committed Dec 18, 2024
1 parent 0f17ed1 commit 1859292
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,14 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.CassandraContainer;

public class TablePartitionerIntegrationTest {

private static final Logger LOG = LoggerFactory.getLogger(TablePartitionerIntegrationTest.class);

@RegisterExtension
static ResponsiveExtension EXTENSION = new ResponsiveExtension(StorageBackend.CASSANDRA);

Expand Down Expand Up @@ -149,30 +153,38 @@ public void shouldFlushToRemoteTableWithSubpartitions() throws Exception {
final KafkaProducer<Long, Long> producer = new KafkaProducer<>(properties);

try (
final var streams = new ResponsiveKafkaStreams(
keyValueStoreTopology(ResponsiveKeyValueParams.keyValue(storeName)),
properties);
final var serializer = new LongSerializer();
final var deserializer = new LongDeserializer()
) {
// When:
// this will send one key to each virtual partition using the LongBytesHasher
IntegrationTestUtils.startAppAndAwaitRunning(Duration.ofSeconds(20), streams);
IntegrationTestUtils.pipeInput(
inputTopic(), NUM_PARTITIONS_INPUT, producer, System::currentTimeMillis, 0, 100L,
LongStream.range(0, 32).toArray());
try (
final var streams = new ResponsiveKafkaStreams(
keyValueStoreTopology(ResponsiveKeyValueParams.keyValue(storeName)),
properties
)
) {
// When:
// this will send one key to each virtual partition using the LongBytesHasher
IntegrationTestUtils.startAppAndAwaitRunning(Duration.ofSeconds(20), streams);
IntegrationTestUtils.pipeInput(
inputTopic(), NUM_PARTITIONS_INPUT, producer, System::currentTimeMillis, 0, 100L,
LongStream.range(0, 32).toArray()
);

// Then
IntegrationTestUtils.awaitOutput(
outputTopic(),
0,
LongStream.range(0, 32)
.boxed()
.map(k -> new KeyValue<>(k, 100L))
.collect(Collectors.toSet()),
true,
properties
);
// Then
IntegrationTestUtils.awaitOutput(
outputTopic(),
0,
LongStream.range(0, 32)
.boxed()
.map(k -> new KeyValue<>(k, 100L))
.collect(Collectors.toSet()),
false,
properties
);
}

// have this outside the try block so that kafka streams is closed and fully
// flushed before we assert
final String cassandraName = new TableName(storeName).tableName();
final var partitioner = SubPartitioner.create(
OptionalInt.empty(),
Expand All @@ -181,13 +193,15 @@ public void shouldFlushToRemoteTableWithSubpartitions() throws Exception {
ResponsiveConfig.responsiveConfig(properties),
storeName + "-changelog"
);

final CassandraKeyValueTable table = CassandraKeyValueTable.create(
new DefaultTableSpec(cassandraName, partitioner, TtlResolver.NO_TTL), client);

assertThat(client.numPartitions(cassandraName), is(OptionalInt.of(32)));
assertThat(client.count(cassandraName, 0), is(2L));
assertThat(client.count(cassandraName, 16), is(2L));

LOG.info("Checking data in remote table");
for (long tp = 0; tp < 32; ++tp) {
final var kBytes = Bytes.wrap(serializer.serialize("", tp));
final byte[] valBytes = table.get((int) tp % NUM_PARTITIONS_INPUT, kBytes, MIN_VALID_TS);
Expand All @@ -204,31 +218,38 @@ public void shouldFlushToRemoteTableWithoutSubpartitions() throws Exception {
final KafkaProducer<Long, Long> producer = new KafkaProducer<>(properties);

try (
final var streams = new ResponsiveKafkaStreams(
keyValueStoreTopology(ResponsiveKeyValueParams.fact(storeName)),
properties
);
final var serializer = new LongSerializer();
final var deserializer = new LongDeserializer();
final var deserializer = new LongDeserializer()
) {
// When:
// this will send one key to each virtual partition using the LongBytesHasher
IntegrationTestUtils.startAppAndAwaitRunning(Duration.ofSeconds(10), streams);
IntegrationTestUtils.pipeInput(
inputTopic(), NUM_PARTITIONS_INPUT, producer, System::currentTimeMillis, 0, 100L,
LongStream.range(0, 32).toArray());
try (
final var streams = new ResponsiveKafkaStreams(
keyValueStoreTopology(ResponsiveKeyValueParams.fact(storeName)),
properties
)
) {
// When:
// this will send one key to each virtual partition using the LongBytesHasher
IntegrationTestUtils.startAppAndAwaitRunning(Duration.ofSeconds(10), streams);
IntegrationTestUtils.pipeInput(
inputTopic(), NUM_PARTITIONS_INPUT, producer, System::currentTimeMillis, 0, 100L,
LongStream.range(0, 32).toArray()
);

// Then
IntegrationTestUtils.awaitOutput(
outputTopic(),
0,
LongStream.range(0, 32)
.boxed()
.map(k -> new KeyValue<>(k, 100L))
.collect(Collectors.toSet()),
false,
properties
);
// Then
IntegrationTestUtils.awaitOutput(
outputTopic(),
0,
LongStream.range(0, 32)
.boxed()
.map(k -> new KeyValue<>(k, 100L))
.collect(Collectors.toSet()),
false,
properties
);
}

// have this outside the try block so that kafka streams is closed and fully
// flushed before we assert
final String cassandraName = new TableName(storeName).tableName();
final var partitioner = TablePartitioner.defaultPartitioner();
final CassandraFactTable table = CassandraFactTable.create(
Expand All @@ -242,6 +263,7 @@ public void shouldFlushToRemoteTableWithoutSubpartitions() throws Exception {

Assertions.assertEquals(table.fetchOffset(2), NO_COMMITTED_OFFSET);

LOG.info("Checking data in remote table");
// these store ValueAndTimestamp, so we need to just pluck the last 8 bytes
for (long k = 0; k < 32; k++) {
final var kBytes = Bytes.wrap(serializer.serialize("", k));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,13 @@
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.Topology;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class IntegrationTestUtils {

private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestUtils.class);

/**
* Simple override that allows plugging in a custom CassandraClientFactory
* to mock or verify this connection in tests
Expand Down Expand Up @@ -336,6 +340,7 @@ public static void awaitOutput(
final boolean readUncommitted,
final Map<String, Object> originals
) throws TimeoutException {
LOG.info("Awaiting {} output events on topic {}", expected.size(), topic);
final Map<String, Object> properties = new HashMap<>(originals);
properties.put(ISOLATION_LEVEL_CONFIG, readUncommitted
? IsolationLevel.READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT)
Expand Down Expand Up @@ -363,6 +368,7 @@ public static void awaitOutput(
}
}
}
LOG.info("Read {} output events on topic {} (finished awaitOutput)", expected.size(), topic);
}

public static <K, V> List<KeyValue<K, V>> readOutput(
Expand Down

0 comments on commit 1859292

Please sign in to comment.