From 1859292c788a1caf81bf73e3dd5f5348f4d05640 Mon Sep 17 00:00:00 2001
From: Almog Gavra
Date: Tue, 17 Dec 2024 17:26:20 -0800
Subject: [PATCH] fix flaky integration test

The partitioner tests asserted against the remote Cassandra table
while the Kafka Streams app could still be flushing, so the count
assertions raced the final flush. Move the app into its own nested
try-with-resources block so it is closed, and therefore fully flushed,
before we assert; verify the output topic with read_committed instead
of read_uncommitted in the subpartitioner test; and add logging to
make future flakes easier to diagnose.
---
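Note (below the fold, not part of the commit message): the fix leans on
the ordering guarantee of try-with-resources. The inner block owns the
streams app, and close() blocks until shutdown completes, so everything
is flushed before the assertions run. A minimal sketch of that pattern
with a plain KafkaStreams app; the pass-through topology, broker
address, and assertRemoteState() helper are stand-ins, not this
project's real code:

  import java.util.Properties;
  import org.apache.kafka.streams.KafkaStreams;
  import org.apache.kafka.streams.StreamsBuilder;
  import org.apache.kafka.streams.StreamsConfig;

  public class CloseBeforeAssertSketch {

    public static void main(final String[] args) {
      final Properties props = new Properties();
      props.put(StreamsConfig.APPLICATION_ID_CONFIG, "close-before-assert-sketch");
      props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker

      final StreamsBuilder builder = new StreamsBuilder();
      builder.stream("input").to("output"); // trivial stand-in topology

      // The inner try-with-resources closes the app on exit; KafkaStreams is
      // AutoCloseable, and close() blocks until shutdown completes.
      try (final KafkaStreams streams = new KafkaStreams(builder.build(), props)) {
        streams.start();
        // ... drive input and await output while the app is RUNNING ...
      }

      // Only here, after close() has returned, is it safe to assert against
      // state that the app flushes on commit/shutdown.
      assertRemoteState();
    }

    private static void assertRemoteState() {
      // hypothetical placeholder for the remote-table assertions
    }
  }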
 .../TablePartitionerIntegrationTest.java      | 106 +++++++++++-------
 .../kafka/testutils/IntegrationTestUtils.java |   6 +
 2 files changed, 70 insertions(+), 42 deletions(-)

diff --git a/kafka-client/src/test/java/dev/responsive/kafka/integration/TablePartitionerIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/integration/TablePartitionerIntegrationTest.java
index 5a636ba41..96f0eb651 100644
--- a/kafka-client/src/test/java/dev/responsive/kafka/integration/TablePartitionerIntegrationTest.java
+++ b/kafka-client/src/test/java/dev/responsive/kafka/integration/TablePartitionerIntegrationTest.java
@@ -82,10 +82,14 @@
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.CassandraContainer;
 
 public class TablePartitionerIntegrationTest {
 
+  private static final Logger LOG = LoggerFactory.getLogger(TablePartitionerIntegrationTest.class);
+
   @RegisterExtension
   static ResponsiveExtension EXTENSION =
       new ResponsiveExtension(StorageBackend.CASSANDRA);
@@ -149,30 +153,38 @@ public void shouldFlushToRemoteTableWithSubpartitions() throws Exception {
     final KafkaProducer<Long, Long> producer = new KafkaProducer<>(properties);
 
     try (
-        final var streams = new ResponsiveKafkaStreams(
-            keyValueStoreTopology(ResponsiveKeyValueParams.keyValue(storeName)),
-            properties);
         final var serializer = new LongSerializer();
         final var deserializer = new LongDeserializer()
     ) {
-      // When:
-      // this will send one key to each virtual partition using the LongBytesHasher
-      IntegrationTestUtils.startAppAndAwaitRunning(Duration.ofSeconds(20), streams);
-      IntegrationTestUtils.pipeInput(
-          inputTopic(), NUM_PARTITIONS_INPUT, producer, System::currentTimeMillis, 0, 100L,
-          LongStream.range(0, 32).toArray());
+      try (
+          final var streams = new ResponsiveKafkaStreams(
+              keyValueStoreTopology(ResponsiveKeyValueParams.keyValue(storeName)),
+              properties
+          )
+      ) {
+        // When:
+        // this will send one key to each virtual partition using the LongBytesHasher
+        IntegrationTestUtils.startAppAndAwaitRunning(Duration.ofSeconds(20), streams);
+        IntegrationTestUtils.pipeInput(
+            inputTopic(), NUM_PARTITIONS_INPUT, producer, System::currentTimeMillis, 0, 100L,
+            LongStream.range(0, 32).toArray()
+        );
 
-      // Then
-      IntegrationTestUtils.awaitOutput(
-          outputTopic(),
-          0,
-          LongStream.range(0, 32)
-              .boxed()
-              .map(k -> new KeyValue<>(k, 100L))
-              .collect(Collectors.toSet()),
-          true,
-          properties
-      );
+        // Then
+        IntegrationTestUtils.awaitOutput(
+            outputTopic(),
+            0,
+            LongStream.range(0, 32)
+                .boxed()
+                .map(k -> new KeyValue<>(k, 100L))
+                .collect(Collectors.toSet()),
+            false,
+            properties
+        );
+      }
+
+      // have this outside the try block so that kafka streams is closed and fully
+      // flushed before we assert
       final String cassandraName = new TableName(storeName).tableName();
       final var partitioner = SubPartitioner.create(
           OptionalInt.empty(),
@@ -181,6 +193,7 @@ public void shouldFlushToRemoteTableWithSubpartitions() throws Exception {
           ResponsiveConfig.responsiveConfig(properties),
           storeName + "-changelog"
       );
+
       final CassandraKeyValueTable table = CassandraKeyValueTable.create(
           new DefaultTableSpec(cassandraName, partitioner, TtlResolver.NO_TTL), client);
 
@@ -188,6 +201,7 @@ public void shouldFlushToRemoteTableWithSubpartitions() throws Exception {
       assertThat(client.count(cassandraName, 0), is(2L));
       assertThat(client.count(cassandraName, 16), is(2L));
 
+      LOG.info("Checking data in remote table");
       for (long tp = 0; tp < 32; ++tp) {
         final var kBytes = Bytes.wrap(serializer.serialize("", tp));
         final byte[] valBytes = table.get((int) tp % NUM_PARTITIONS_INPUT, kBytes, MIN_VALID_TS);
@@ -204,31 +218,38 @@ public void shouldFlushToRemoteTableWithoutSubpartitions() throws Exception {
     final KafkaProducer<Long, Long> producer = new KafkaProducer<>(properties);
 
     try (
-        final var streams = new ResponsiveKafkaStreams(
-            keyValueStoreTopology(ResponsiveKeyValueParams.fact(storeName)),
-            properties
-        );
         final var serializer = new LongSerializer();
-        final var deserializer = new LongDeserializer();
+        final var deserializer = new LongDeserializer()
     ) {
-      // When:
-      // this will send one key to each virtual partition using the LongBytesHasher
-      IntegrationTestUtils.startAppAndAwaitRunning(Duration.ofSeconds(10), streams);
-      IntegrationTestUtils.pipeInput(
-          inputTopic(), NUM_PARTITIONS_INPUT, producer, System::currentTimeMillis, 0, 100L,
-          LongStream.range(0, 32).toArray());
+      try (
+          final var streams = new ResponsiveKafkaStreams(
+              keyValueStoreTopology(ResponsiveKeyValueParams.fact(storeName)),
+              properties
+          )
+      ) {
+        // When:
+        // this will send one key to each virtual partition using the LongBytesHasher
+        IntegrationTestUtils.startAppAndAwaitRunning(Duration.ofSeconds(10), streams);
+        IntegrationTestUtils.pipeInput(
+            inputTopic(), NUM_PARTITIONS_INPUT, producer, System::currentTimeMillis, 0, 100L,
+            LongStream.range(0, 32).toArray()
+        );
 
-      // Then
-      IntegrationTestUtils.awaitOutput(
-          outputTopic(),
-          0,
-          LongStream.range(0, 32)
-              .boxed()
-              .map(k -> new KeyValue<>(k, 100L))
-              .collect(Collectors.toSet()),
-          false,
-          properties
-      );
+        // Then
+        IntegrationTestUtils.awaitOutput(
+            outputTopic(),
+            0,
+            LongStream.range(0, 32)
+                .boxed()
+                .map(k -> new KeyValue<>(k, 100L))
+                .collect(Collectors.toSet()),
+            false,
+            properties
+        );
+      }
+
+      // have this outside the try block so that kafka streams is closed and fully
+      // flushed before we assert
       final String cassandraName = new TableName(storeName).tableName();
       final var partitioner = TablePartitioner.defaultPartitioner();
       final CassandraFactTable table = CassandraFactTable.create(
@@ -242,6 +263,7 @@ public void shouldFlushToRemoteTableWithoutSubpartitions() throws Exception {
 
       Assertions.assertEquals(table.fetchOffset(2), NO_COMMITTED_OFFSET);
 
+      LOG.info("Checking data in remote table");
       // these store ValueAndTimestamp, so we need to just pluck the last 8 bytes
       for (long k = 0; k < 32; k++) {
         final var kBytes = Bytes.wrap(serializer.serialize("", k));
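A note on the "pluck the last 8 bytes" comment in the hunk above:
timestamped stores wrap each value in a ValueAndTimestamp, serialized
as an 8-byte timestamp followed by the value bytes, so for a Long value
the payload sits in the last 8 bytes. A minimal, self-contained sketch
of that decoding (the layout here is assumed from the test's comment,
not taken from the serializer itself):

  import java.nio.ByteBuffer;

  public class LastEightBytesSketch {
    public static void main(final String[] args) {
      // pretend this is a stored value: 8-byte timestamp prefix + Long payload
      final ByteBuffer stored = ByteBuffer.allocate(2 * Long.BYTES);
      stored.putLong(1_734_000_000_000L); // record timestamp (example)
      stored.putLong(100L);               // the actual Long value
      final byte[] valBytes = stored.array();

      // "pluck the last 8 bytes" to recover the value, as the test does
      final long value =
          ByteBuffer.wrap(valBytes, valBytes.length - Long.BYTES, Long.BYTES).getLong();
      System.out.println(value); // prints 100
    }
  }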
diff --git a/kafka-client/src/test/java/dev/responsive/kafka/testutils/IntegrationTestUtils.java b/kafka-client/src/test/java/dev/responsive/kafka/testutils/IntegrationTestUtils.java
index 8476394df..c20d3c60b 100644
--- a/kafka-client/src/test/java/dev/responsive/kafka/testutils/IntegrationTestUtils.java
+++ b/kafka-client/src/test/java/dev/responsive/kafka/testutils/IntegrationTestUtils.java
@@ -76,9 +76,13 @@
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.Topology;
 import org.junit.jupiter.api.TestInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public final class IntegrationTestUtils {
 
+  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestUtils.class);
+
   /**
    * Simple override that allows plugging in a custom CassandraClientFactory
    * to mock or verify this connection in tests
@@ -336,6 +340,7 @@ public static void awaitOutput(
       final boolean readUncommitted,
       final Map<String, Object> originals
   ) throws TimeoutException {
+    LOG.info("Awaiting {} output events on topic {}", expected.size(), topic);
     final Map<String, Object> properties = new HashMap<>(originals);
     properties.put(ISOLATION_LEVEL_CONFIG, readUncommitted
         ? IsolationLevel.READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT)
@@ -363,6 +368,7 @@ public static void awaitOutput(
         }
       }
     }
+    LOG.info("Read {} output events on topic {} (finished awaitOutput)", expected.size(), topic);
   }
 
   public static <K, V> List<KeyValue<K, V>> readOutput(
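Finally, a note on the readUncommitted flag that the first test now
passes as false: awaitOutput maps the flag onto the verifying
consumer's isolation.level, and under read_committed the consumer only
sees records from committed transactions, so the await cannot pass
early on results that are not yet committed. A minimal sketch of just
that mapping, mirroring the properties.put(...) in the hunk above:

  import java.util.Locale;
  import java.util.Properties;
  import org.apache.kafka.clients.consumer.ConsumerConfig;
  import org.apache.kafka.common.IsolationLevel;

  public class IsolationLevelSketch {
    public static void main(final String[] args) {
      final boolean readUncommitted = false; // the value the first test now uses
      final Properties properties = new Properties();
      // same ternary as awaitOutput: false means read_committed
      properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, readUncommitted
          ? IsolationLevel.READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT)
          : IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
      System.out.println(properties); // {isolation.level=read_committed}
    }
  }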