Speed up Integration test (#1740)
* Speed up UdfIntTest by only starting / stopping Kafka & ZK once and by only waiting on the expected number of messages
* Fix intermittent issue in `WindowingIntTest` caused by constant consumer group rebalancing
big-andy-coates authored Aug 16, 2018
1 parent 13771d5 commit 251929e
Showing 5 changed files with 301 additions and 296 deletions.
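The first commit-message bullet covers two independent speed-ups. A minimal sketch of the "start / stop once" half, using JUnit 4's @ClassRule so the broker lifecycle is tied to the test class rather than to each test method — the class name and the startCluster()/stopCluster() helpers are illustrative placeholders, not code from this commit:

import org.junit.ClassRule;
import org.junit.rules.ExternalResource;

public class SomeIntTest {

  // Runs once for the whole class: the embedded Kafka broker and ZooKeeper
  // start before the first test method and stop after the last one, instead
  // of being recycled around every individual test.
  @ClassRule
  public static final ExternalResource CLUSTER = new ExternalResource() {
    @Override
    protected void before() {
      startCluster();   // hypothetical helper: boot embedded Kafka & ZK once
    }

    @Override
    protected void after() {
      stopCluster();    // hypothetical helper: tear down once, after all tests
    }
  };

  private static void startCluster() { /* placeholder */ }

  private static void stopCluster() { /* placeholder */ }
}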
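The second bullet's fix lives in WindowingIntTest itself, which is not part of the excerpt below. One common way to stop verification consumers from repeatedly triggering group rebalances is to give each one a group.id that nothing else shares, so no consumer ever joins (and re-balances) another's group; a hedged sketch of that approach, with a hypothetical helper name:

import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerConfig;

final class TestConsumerProps {

  // A consumer in a never-reused group joins alone, so starting or stopping
  // it cannot force a rebalance onto any other test consumer.
  static Properties uniqueGroup(final String bootstrapServers) {
    final Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group-" + UUID.randomUUID());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return props;
  }
}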
IntegrationTestHarness.java:
@@ -15,17 +15,20 @@
 import io.confluent.ksql.util.KsqlConfig;
 import io.confluent.ksql.util.KsqlException;
 import io.confluent.ksql.util.TestDataProvider;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerInterceptor;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -44,11 +47,15 @@
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 @SuppressWarnings("unchecked")
 public class IntegrationTestHarness {
 
+  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestHarness.class);
+
   public static final long TEST_RECORD_FUTURE_TIMEOUT_MS = 5000;
   public static final long RESULTS_POLL_MAX_TIME_MS = 60000;
   public static final long RESULTS_EXTRA_POLL_TIME_MS = 250;
@@ -97,34 +104,42 @@ public void createTopic(String topicName, int numPartitions, short replicatonFactor
   public Map<String, RecordMetadata> produceData(String topicName,
                                                  Map<String, GenericRow> recordsToPublish,
                                                  Serializer<GenericRow> serializer,
-                                                 Long timestamp)
-      throws InterruptedException, TimeoutException, ExecutionException {
+                                                 Long timestamp) {
 
     createTopic(topicName);
 
-    Properties producerConfig = properties();
-    KafkaProducer<String, GenericRow> producer =
-        new KafkaProducer<>(producerConfig, new StringSerializer(), serializer);
-
-    Map<String, RecordMetadata> result = new HashMap<>();
-    for (Map.Entry<String, GenericRow> recordEntry : recordsToPublish.entrySet()) {
-      String key = recordEntry.getKey();
-      Future<RecordMetadata> recordMetadataFuture
-          = producer.send(buildRecord(topicName, timestamp, recordEntry, key));
-      result.put(key,
-          recordMetadataFuture.get(TEST_RECORD_FUTURE_TIMEOUT_MS, TimeUnit.MILLISECONDS));
+    final Properties producerConfig = properties();
+    try (KafkaProducer<String, GenericRow> producer =
+        new KafkaProducer<>(producerConfig, new StringSerializer(), serializer)) {
+
+      final Map<String, Future<RecordMetadata>> futures = recordsToPublish.entrySet().stream()
+          .collect(Collectors.toMap(Entry::getKey, entry -> {
+            final String key = entry.getKey();
+            final GenericRow value = entry.getValue();
+
+            LOG.debug("Producing message. topic:{}, key:{}, value:{}, timestamp:{}",
+                topicName, key, value, timestamp);
+
+            return producer.send(buildRecord(topicName, timestamp, value, key));
+          }));
+
+      return futures.entrySet().stream()
+          .collect(Collectors.toMap(Entry::getKey, entry -> {
+            try {
+              return entry.getValue().get(TEST_RECORD_FUTURE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+            } catch (final Exception e) {
+              throw new RuntimeException(e);
+            }
+          }));
     }
-    producer.close();
-
-    return result;
   }
 
-  private ProducerRecord<String, GenericRow> buildRecord(String topicName,
-                                                         Long timestamp,
-                                                         Map.Entry<String,
-                                                             GenericRow> recordEntry,
-                                                         String key) {
-    return new ProducerRecord<>(topicName, null, timestamp, key, recordEntry.getValue());
+  private static ProducerRecord<String, GenericRow> buildRecord(
+      final String topicName,
+      final Long timestamp,
+      final GenericRow value,
+      final String key) {
+    return new ProducerRecord<>(topicName, null, timestamp, key, value);
   }
 
   private Properties properties() {
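The reworked produceData above sends every record before waiting on any acknowledgement, so the producer can keep many requests in flight and batch them; the old loop called Future.get inside the send loop, serialising every round trip. A condensed, generic sketch of that two-phase pattern — not the harness code itself:

import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

final class PipelinedSend {

  // Phase 1: fire off every send() without blocking, letting the producer
  // pipeline the requests. Phase 2: only then block on each future.
  static List<RecordMetadata> sendAll(
      final KafkaProducer<String, String> producer,
      final List<ProducerRecord<String, String>> records,
      final long timeoutMs) {
    final List<Future<RecordMetadata>> futures = records.stream()
        .map(producer::send)
        .collect(Collectors.toList());

    return futures.stream()
        .map(f -> {
          try {
            return f.get(timeoutMs, TimeUnit.MILLISECONDS);
          } catch (final Exception e) {
            throw new RuntimeException(e);
          }
        })
        .collect(Collectors.toList());
  }
}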
@@ -190,9 +205,11 @@ public <K> Map<K, GenericRow> consumeData(String topic,
     long pollEnd = pollStart + resultsPollMaxTimeMs;
     while (System.currentTimeMillis() < pollEnd &&
         continueConsuming(result.size(), expectedNumMessages)) {
-      for (ConsumerRecord<K, GenericRow> record :
-          consumer.poll(Math.max(1, pollEnd - System.currentTimeMillis()))) {
+      final Duration duration = Duration.ofMillis(Math.max(1, pollEnd - System.currentTimeMillis()));
+      for (ConsumerRecord<K, GenericRow> record : consumer.poll(duration)) {
         if (record.value() != null) {
+          LOG.trace("Consumed record. topic:{}, key:{}, value:{}",
+              topic, record.key(), record.value());
           result.put(record.key(), record.value());
         }
       }
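This hunk also switches to the Duration overload of poll that newer Kafka clients provide. The surrounding loop is what implements the "only waiting on the expected number of messages" speed-up: polling stops as soon as continueConsuming reports enough records, instead of always sleeping out the full window. A standalone sketch of the same early-exit idea, with illustrative names:

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

final class BoundedConsume {

  // Poll until either the expected number of records has arrived or the
  // deadline passes; the expected-count check lets fast runs return early
  // rather than always burning the full timeout.
  static <K, V> List<ConsumerRecord<K, V>> consume(
      final KafkaConsumer<K, V> consumer,
      final int expected,
      final long maxWaitMs) {
    final List<ConsumerRecord<K, V>> out = new ArrayList<>();
    final long deadline = System.currentTimeMillis() + maxWaitMs;
    while (out.size() < expected && System.currentTimeMillis() < deadline) {
      final long remaining = Math.max(1, deadline - System.currentTimeMillis());
      consumer.poll(Duration.ofMillis(remaining)).forEach(out::add);
    }
    return out;
  }
}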
@@ -317,17 +334,15 @@ public void stop() {
 
   public Map<String, RecordMetadata> publishTestData(String topicName,
                                                      TestDataProvider dataProvider,
-                                                     Long timestamp)
-      throws InterruptedException, ExecutionException, TimeoutException {
+                                                     Long timestamp) {
 
     return publishTestData(topicName, dataProvider, timestamp, DataSource.DataSourceSerDe.JSON);
   }
 
   public Map<String, RecordMetadata> publishTestData(String topicName,
                                                      TestDataProvider dataProvider,
                                                      Long timestamp,
-                                                     DataSource.DataSourceSerDe dataSourceSerDe)
-      throws InterruptedException, ExecutionException, TimeoutException {
+                                                     DataSource.DataSourceSerDe dataSourceSerDe) {
     createTopic(topicName);
     return produceData(topicName,
                        dataProvider.data(),