diff --git a/app/src/main/java/org/astraea/app/consumer/Consumer.java b/app/src/main/java/org/astraea/app/consumer/Consumer.java index dcb4ea6ec2..a89bea8bc0 100644 --- a/app/src/main/java/org/astraea/app/consumer/Consumer.java +++ b/app/src/main/java/org/astraea/app/consumer/Consumer.java @@ -55,6 +55,9 @@ default Collection> poll(Duration timeout) { /** unsubscribe all partitions. */ void unsubscribe(); + /** @return current partitions assigned to this consumer */ + Set assignments(); + /** * Create a consumer builder by setting specific topics * diff --git a/app/src/main/java/org/astraea/app/consumer/ConsumerRebalanceListener.java b/app/src/main/java/org/astraea/app/consumer/ConsumerRebalanceListener.java index 161265a86e..0b245a69d1 100644 --- a/app/src/main/java/org/astraea/app/consumer/ConsumerRebalanceListener.java +++ b/app/src/main/java/org/astraea/app/consumer/ConsumerRebalanceListener.java @@ -33,11 +33,24 @@ public interface ConsumerRebalanceListener { */ void onPartitionAssigned(Set partitions); + /** + * It is called when this consumer has to give up some partitions when running re-balance. + * + * @param partitions to give up + */ + default void onPartitionsRevoked(Set partitions) {} + static org.apache.kafka.clients.consumer.ConsumerRebalanceListener of( List listeners) { return new org.apache.kafka.clients.consumer.ConsumerRebalanceListener() { @Override - public void onPartitionsRevoked(Collection ignore) {} + public void onPartitionsRevoked( + Collection partitions) { + listeners.forEach( + l -> + l.onPartitionsRevoked( + partitions.stream().map(TopicPartition::from).collect(Collectors.toSet()))); + } @Override public void onPartitionsAssigned( diff --git a/app/src/main/java/org/astraea/app/consumer/PartitionsBuilder.java b/app/src/main/java/org/astraea/app/consumer/PartitionsBuilder.java index 916f2df681..98821c0710 100644 --- a/app/src/main/java/org/astraea/app/consumer/PartitionsBuilder.java +++ b/app/src/main/java/org/astraea/app/consumer/PartitionsBuilder.java @@ -21,6 +21,7 @@ import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.astraea.app.admin.TopicPartition; @@ -130,5 +131,12 @@ public AssignedConsumerImpl( protected void doResubscribe() { kafkaConsumer.assign(partitions.stream().map(TopicPartition::to).collect(toList())); } + + @Override + public Set assignments() { + return kafkaConsumer.assignment().stream() + .map(TopicPartition::from) + .collect(Collectors.toUnmodifiableSet()); + } } } diff --git a/app/src/main/java/org/astraea/app/consumer/SubscribedConsumer.java b/app/src/main/java/org/astraea/app/consumer/SubscribedConsumer.java index 369d190d36..3690f074f9 100644 --- a/app/src/main/java/org/astraea/app/consumer/SubscribedConsumer.java +++ b/app/src/main/java/org/astraea/app/consumer/SubscribedConsumer.java @@ -17,10 +17,7 @@ package org.astraea.app.consumer; import java.time.Duration; -import java.util.Map; import java.util.Optional; -import java.util.Set; -import org.astraea.app.admin.TopicPartition; /** * This inherited consumer offers function related to consumer group. @@ -45,10 +42,4 @@ public interface SubscribedConsumer extends Consumer { /** @return group instance id (static member) */ Optional groupInstanceId(); - - /** - * @return the historical subscription. key is the time of getting assignments. value is the - * assignments. - */ - Map> historicalSubscription(); } diff --git a/app/src/main/java/org/astraea/app/consumer/TopicsBuilder.java b/app/src/main/java/org/astraea/app/consumer/TopicsBuilder.java index 16f19949c7..4da24ad6e3 100644 --- a/app/src/main/java/org/astraea/app/consumer/TopicsBuilder.java +++ b/app/src/main/java/org/astraea/app/consumer/TopicsBuilder.java @@ -19,14 +19,13 @@ import static java.util.Objects.requireNonNull; import java.time.Duration; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.stream.Collectors; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.astraea.app.admin.TopicPartition; @@ -36,8 +35,6 @@ public class TopicsBuilder extends Builder { private final Set topics; private ConsumerRebalanceListener listener = ignore -> {}; - private boolean enableTrace = false; - TopicsBuilder(Set topics) { this.topics = requireNonNull(topics); } @@ -123,17 +120,6 @@ public TopicsBuilder isolation(Isolation isolation) { return this; } - /** - * enable to trace the historical subscription. see {@link - * SubscribedConsumer#historicalSubscription()} - * - * @return this builder - */ - public TopicsBuilder enableTrace() { - this.enableTrace = true; - return this; - } - public TopicsBuilder disableAutoCommitOffsets() { return config(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); } @@ -150,22 +136,11 @@ public SubscribedConsumer build() { Deserializer.of((Deserializer) keyDeserializer), Deserializer.of((Deserializer) valueDeserializer)); - var tracker = - new ConsumerRebalanceListener() { - private final Map> history = new ConcurrentHashMap<>(); - - @Override - public void onPartitionAssigned(Set partitions) { - if (enableTrace) history.put(System.currentTimeMillis(), Set.copyOf(partitions)); - } - }; - if (seekStrategy != SeekStrategy.NONE) { // make sure this consumer is assigned before seeking var latch = new CountDownLatch(1); kafkaConsumer.subscribe( - topics, - ConsumerRebalanceListener.of(List.of(listener, ignored -> latch.countDown(), tracker))); + topics, ConsumerRebalanceListener.of(List.of(listener, ignored -> latch.countDown()))); while (latch.getCount() != 0) { // the offset will be reset, so it is fine to poll data // TODO: should we disable auto-commit here? @@ -174,13 +149,12 @@ public void onPartitionAssigned(Set partitions) { } } else { // nothing to seek so we just subscribe topics - kafkaConsumer.subscribe(topics, ConsumerRebalanceListener.of(List.of(listener, tracker))); + kafkaConsumer.subscribe(topics, ConsumerRebalanceListener.of(List.of(listener))); } seekStrategy.apply(kafkaConsumer, seekValue); - return new SubscribedConsumerImpl<>( - kafkaConsumer, topics, listener, Collections.unmodifiableMap(tracker.history)); + return new SubscribedConsumerImpl<>(kafkaConsumer, topics, listener); } private static class SubscribedConsumerImpl extends Builder.BaseConsumer @@ -188,17 +162,13 @@ private static class SubscribedConsumerImpl extends Builder.BaseCons private final Set topics; private final ConsumerRebalanceListener listener; - private final Map> history; - public SubscribedConsumerImpl( org.apache.kafka.clients.consumer.Consumer kafkaConsumer, Set topics, - ConsumerRebalanceListener listener, - Map> history) { + ConsumerRebalanceListener listener) { super(kafkaConsumer); this.topics = topics; this.listener = listener; - this.history = history; } @Override @@ -221,13 +191,15 @@ public Optional groupInstanceId() { } @Override - public Map> historicalSubscription() { - return history; + protected void doResubscribe() { + kafkaConsumer.subscribe(topics, ConsumerRebalanceListener.of(List.of(listener))); } @Override - protected void doResubscribe() { - kafkaConsumer.subscribe(topics, ConsumerRebalanceListener.of(List.of(listener))); + public Set assignments() { + return kafkaConsumer.assignment().stream() + .map(TopicPartition::from) + .collect(Collectors.toUnmodifiableSet()); } } } diff --git a/app/src/main/java/org/astraea/app/performance/ConsumerThread.java b/app/src/main/java/org/astraea/app/performance/ConsumerThread.java index c0d9ed6a77..f061b1b989 100644 --- a/app/src/main/java/org/astraea/app/performance/ConsumerThread.java +++ b/app/src/main/java/org/astraea/app/performance/ConsumerThread.java @@ -18,22 +18,26 @@ import java.time.Duration; import java.util.List; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Supplier; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.kafka.common.errors.WakeupException; +import org.astraea.app.admin.TopicPartition; import org.astraea.app.common.Utils; +import org.astraea.app.consumer.ConsumerRebalanceListener; import org.astraea.app.consumer.SubscribedConsumer; public interface ConsumerThread extends AbstractThread { static List create( - int consumers, Supplier> consumerSupplier) { + int consumers, + Function> consumerSupplier) { if (consumers == 0) return List.of(); var closeLatches = IntStream.range(0, consumers) @@ -53,18 +57,19 @@ static List create( return IntStream.range(0, consumers) .mapToObj( index -> { - var consumer = consumerSupplier.get(); var report = new Report(); + var listener = new Listener(report); var closeLatch = closeLatches.get(index); var closed = new AtomicBoolean(false); var subscribed = new AtomicBoolean(true); executors.execute( () -> { - try { + try (var consumer = consumerSupplier.apply(listener)) { while (!closed.get()) { if (subscribed.get()) consumer.resubscribe(); else { consumer.unsubscribe(); + report.assignments(Set.of()); Utils.sleep(Duration.ofSeconds(1)); continue; } @@ -80,15 +85,12 @@ record -> record.offset(), System.currentTimeMillis() - record.timestamp(), record.serializedKeySize() + record.serializedValueSize())); + report.assignments(consumer.assignments()); } } catch (WakeupException ignore) { // Stop polling and being ready to clean up } finally { - try { - consumer.close(); - } finally { - closeLatch.countDown(); - } + closeLatch.countDown(); } }); return new ConsumerThread() { @@ -121,7 +123,6 @@ public Report report() { @Override public void close() { closed.set(true); - consumer.wakeup(); Utils.swallowException(closeLatch::await); } }; @@ -135,4 +136,68 @@ public void close() { /** @return report of this thread */ Report report(); + + class Listener implements ConsumerRebalanceListener { + private final Report report; + private long previousCall = System.currentTimeMillis(); + private long maxLatency = 0; + private long sumLatency = 0; + private long count = 0; + + public Listener(Report report) { + this.report = report; + } + + @Override + public void onPartitionAssigned(Set partitions) { + record(); + } + + @Override + public void onPartitionsRevoked(Set partitions) { + record(); + } + + private void record() { + count += 1; + var current = System.currentTimeMillis(); + var diff = current - previousCall; + maxLatency = Math.max(maxLatency, diff); + sumLatency += diff; + previousCall = current; + report.maxSubscriptionLatency(maxLatency); + report.avgSubscriptionLatency((double) sumLatency / count); + } + } + + class Report extends org.astraea.app.performance.Report.Impl { + private volatile long maxSubscriptionLatency = 0; + private volatile double avgSubscriptionLatency = 0; + + private volatile Set assignments; + + public long maxSubscriptionLatency() { + return maxSubscriptionLatency; + } + + public void maxSubscriptionLatency(long maxSubscriptionLatency) { + this.maxSubscriptionLatency = maxSubscriptionLatency; + } + + public double avgSubscriptionLatency() { + return avgSubscriptionLatency; + } + + public void avgSubscriptionLatency(double avgSubscriptionLatency) { + this.avgSubscriptionLatency = avgSubscriptionLatency; + } + + public void assignments(Set assignments) { + this.assignments = assignments; + } + + public Set assignments() { + return assignments; + } + } } diff --git a/app/src/main/java/org/astraea/app/performance/Performance.java b/app/src/main/java/org/astraea/app/performance/Performance.java index 011dbc6ba2..346dfd417f 100644 --- a/app/src/main/java/org/astraea/app/performance/Performance.java +++ b/app/src/main/java/org/astraea/app/performance/Performance.java @@ -127,13 +127,14 @@ public static String execute(final Argument param) throws InterruptedException, var consumerThreads = ConsumerThread.create( param.consumers, - () -> + listener -> Consumer.forTopics(Set.of(param.topic)) .bootstrapServers(param.bootstrapServers()) .groupId(groupId) .configs(param.configs()) .isolation(param.isolation()) .seek(latestOffsets) + .consumerRebalanceListener(listener) .build()); var producerReports = diff --git a/app/src/main/java/org/astraea/app/performance/ProducerThread.java b/app/src/main/java/org/astraea/app/performance/ProducerThread.java index dfbafc0203..56d278cc0c 100644 --- a/app/src/main/java/org/astraea/app/performance/ProducerThread.java +++ b/app/src/main/java/org/astraea/app/performance/ProducerThread.java @@ -57,13 +57,12 @@ static List create( return IntStream.range(0, producers) .mapToObj( index -> { - var producer = producerSupplier.get(); var report = new Report(); var closeLatch = closeLatches.get(index); var closed = new AtomicBoolean(false); executors.execute( () -> { - try { + try (var producer = producerSupplier.get()) { while (!closed.get()) { var data = IntStream.range(0, batchSize) @@ -105,11 +104,7 @@ static List create( m.serializedValueSize() + m.serializedKeySize()))); } } finally { - try { - producer.close(); - } finally { - closeLatch.countDown(); - } + closeLatch.countDown(); } }); return new ProducerThread() { @@ -141,4 +136,6 @@ public void close() { /** @return report of this thread */ Report report(); + + class Report extends org.astraea.app.performance.Report.Impl {} } diff --git a/app/src/main/java/org/astraea/app/performance/Report.java b/app/src/main/java/org/astraea/app/performance/Report.java index 1c92ba25b4..a9d6849370 100644 --- a/app/src/main/java/org/astraea/app/performance/Report.java +++ b/app/src/main/java/org/astraea/app/performance/Report.java @@ -22,66 +22,94 @@ import java.util.stream.Collectors; import org.astraea.app.admin.TopicPartition; -/** Used to record statistics. This is thread safe. */ -public class Report { +public interface Report { + /** * find the max offset from reports * * @param reports to search offsets * @return topic partition and its max offset */ - public static Map maxOffsets(List reports) { + static Map maxOffsets(List reports) { return reports.stream() .flatMap(r -> r.maxOffsets().entrySet().stream()) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Math::max, HashMap::new)); } - private double avgLatency = 0; - private long records = 0; - private long max = 0; - private long min = Long.MAX_VALUE; - private long totalBytes = 0; - private final Map currentOffsets = new HashMap<>(); - - /** Simultaneously add latency and bytes. */ - public synchronized void record( - String topic, int partition, long offset, long latency, int bytes) { - ++records; - min = Math.min(min, latency); - max = Math.max(max, latency); - avgLatency += (((double) latency) - avgLatency) / (double) records; - totalBytes += bytes; - var tp = TopicPartition.of(topic, partition); - currentOffsets.put(tp, Math.max(offset, offset(tp))); - } - /** @return Get the number of records. */ - public synchronized long records() { - return records; - } + long records(); /** @return Get the maximum of latency put. */ - public synchronized long max() { - return max; - } + long max(); /** @return Get the minimum of latency put. */ - public synchronized long min() { - return min; - } + long min(); /** @return Get the average latency. */ - public synchronized double avgLatency() { - return avgLatency; - } + double avgLatency(); /** @return total send/received bytes */ - public synchronized long totalBytes() { - return totalBytes; - } + long totalBytes(); - public synchronized long offset(TopicPartition tp) { - return currentOffsets.getOrDefault(tp, 0L); - } + long offset(TopicPartition tp); + + Map maxOffsets(); + + class Impl implements Report { + + private double avgLatency = 0; + private long records = 0; + private long max = 0; + private long min = Long.MAX_VALUE; + private long totalBytes = 0; + + private final Map currentOffsets = new HashMap<>(); + + /** Simultaneously add latency and bytes. */ + public synchronized void record( + String topic, int partition, long offset, long latency, int bytes) { + ++records; + min = Math.min(min, latency); + max = Math.max(max, latency); + avgLatency += (((double) latency) - avgLatency) / (double) records; + totalBytes += bytes; + var tp = TopicPartition.of(topic, partition); + currentOffsets.put(tp, Math.max(offset, offset(tp))); + } + + /** @return Get the number of records. */ + @Override + public synchronized long records() { + return records; + } + /** @return Get the maximum of latency put. */ + @Override + public synchronized long max() { + return max; + } + /** @return Get the minimum of latency put. */ + @Override + public synchronized long min() { + return min; + } + + /** @return Get the average latency. */ + @Override + public synchronized double avgLatency() { + return avgLatency; + } + + /** @return total send/received bytes */ + @Override + public synchronized long totalBytes() { + return totalBytes; + } + + @Override + public synchronized long offset(TopicPartition tp) { + return currentOffsets.getOrDefault(tp, 0L); + } - public synchronized Map maxOffsets() { - return Map.copyOf(currentOffsets); + @Override + public synchronized Map maxOffsets() { + return Map.copyOf(currentOffsets); + } } } diff --git a/app/src/main/java/org/astraea/app/performance/ReportFormat.java b/app/src/main/java/org/astraea/app/performance/ReportFormat.java index bbe880be1a..6520c00f65 100644 --- a/app/src/main/java/org/astraea/app/performance/ReportFormat.java +++ b/app/src/main/java/org/astraea/app/performance/ReportFormat.java @@ -64,8 +64,8 @@ public static Runnable createFileWriter( Supplier consumerDone, Supplier producerDone, Supplier producedRecords, - List producerReports, - List consumerReports) + List producerReports, + List consumerReports) throws IOException { var filePath = FileSystems.getDefault() @@ -161,8 +161,8 @@ private static boolean logToCSV( Supplier consumerDone, Supplier producerDone, Supplier producedRecords, - List producerReports, - List consumerReports) { + List producerReports, + List consumerReports) { var result = processResult(exeTime, producerReports, consumerReports, producedRecords); if (producerReports.stream().mapToLong(Report::records).sum() == 0) return false; try { @@ -207,8 +207,8 @@ private static boolean logToJSON( Supplier consumerDone, Supplier producerDone, Supplier producedRecords, - List producerReports, - List consumerReports) { + List producerReports, + List consumerReports) { var result = processResult(exeTime, producerReports, consumerReports, producedRecords); if (producerReports.stream().mapToLong(Report::records).sum() == 0) return false; try { @@ -264,8 +264,8 @@ private static boolean logToJSON( private static ProcessedResult processResult( ExeTime exeTime, - List producerReports, - List consumerReports, + List producerReports, + List consumerReports, Supplier producedRecords) { var duration = Duration.ofMillis(System.currentTimeMillis() - System.currentTimeMillis()); return new ProcessedResult( diff --git a/app/src/main/java/org/astraea/app/performance/TrackerThread.java b/app/src/main/java/org/astraea/app/performance/TrackerThread.java index 6ba5e11076..4dc4e3b8fe 100644 --- a/app/src/main/java/org/astraea/app/performance/TrackerThread.java +++ b/app/src/main/java/org/astraea/app/performance/TrackerThread.java @@ -28,7 +28,9 @@ public interface TrackerThread extends AbstractThread { static TrackerThread create( - List producerReports, List consumerReports, ExeTime exeTime) { + List producerReports, + List consumerReports, + ExeTime exeTime) { var start = System.currentTimeMillis() - Duration.ofSeconds(1).toMillis(); Supplier logProducers = @@ -101,13 +103,22 @@ static TrackerThread create( .mapToLong(Report::min) .min() .ifPresent(i -> System.out.println(" end-to-end mim latency: " + i + " ms")); + consumerReports.stream() + .mapToLong(ConsumerThread.Report::maxSubscriptionLatency) + .max() + .ifPresent(i -> System.out.println(" subscription max latency: " + i + " ms")); + consumerReports.stream() + .mapToDouble(ConsumerThread.Report::avgSubscriptionLatency) + .average() + .ifPresent(i -> System.out.println(" subscription average latency: " + i + " ms")); for (int i = 0; i < consumerReports.size(); ++i) { + var report = consumerReports.get(i); + System.out.printf(" consumer[%d] has %d partitions%n", i, report.assignments().size()); System.out.printf( " consumer[%d] average throughput: %.3f MB%n", - i, Utils.averageMB(duration, consumerReports.get(i).totalBytes())); + i, Utils.averageMB(duration, report.totalBytes())); System.out.printf( - " consumer[%d] average ene-to-end latency: %.3f ms%n", - i, consumerReports.get(i).avgLatency()); + " consumer[%d] average ene-to-end latency: %.3f ms%n", i, report.avgLatency()); } System.out.println("\n"); // Target number of records consumed OR consumed all that produced diff --git a/app/src/test/java/org/astraea/app/consumer/ConsumerTest.java b/app/src/test/java/org/astraea/app/consumer/ConsumerTest.java index ee198e96dd..9b0f9721eb 100644 --- a/app/src/test/java/org/astraea/app/consumer/ConsumerTest.java +++ b/app/src/test/java/org/astraea/app/consumer/ConsumerTest.java @@ -447,72 +447,4 @@ void testCreateConsumersConcurrent() throws ExecutionException, InterruptedExcep closed.set(true); fs.get(); } - - @Test - void testHistoricalSubscription() { - var partitions = 3; - var topic = Utils.randomString(10); - try (var admin = Admin.of(bootstrapServers())) { - admin.creator().topic(topic).numberOfPartitions(partitions).create(); - Utils.sleep(Duration.ofSeconds(3)); - } - - var groupId = Utils.randomString(10); - var closed = new AtomicBoolean(false); - - var consumers = - IntStream.range(0, 2) - .mapToObj( - ignored -> - Consumer.forTopics(Set.of(topic)) - .bootstrapServers(bootstrapServers()) - .groupId(groupId) - .enableTrace() - .build()) - .collect(Collectors.toUnmodifiableList()); - - var fs = - Utils.sequence( - consumers.stream() - .map( - c -> - CompletableFuture.runAsync( - () -> { - try (c) { - while (!closed.get()) c.poll(Duration.ofSeconds(1)); - } - })) - .collect(Collectors.toUnmodifiableList())); - - Utils.waitFor(() -> consumers.stream().allMatch(c -> c.historicalSubscription().size() >= 1)); - - // create another consumer to trigger balance - try (var consumer = - Consumer.forTopics(Set.of(topic)) - .bootstrapServers(bootstrapServers()) - .groupId(groupId) - .enableTrace() - .build()) { - - Utils.waitFor( - () -> { - consumer.poll(Duration.ofSeconds(1)); - return consumers.stream().allMatch(c -> c.historicalSubscription().size() > 1); - }); - } - - // produce data to make sure the balance get done. - produceData(topic, 100); - - try (var consumer = - Consumer.forTopics(Set.of(topic)) - .bootstrapServers(bootstrapServers()) - .groupId(groupId) - .fromBeginning() - .build()) { - Utils.waitFor(() -> !consumer.poll(Duration.ofSeconds(1)).isEmpty()); - Assertions.assertTrue( - consumers.stream().anyMatch(c -> c.historicalSubscription().size() > 1)); - } - } } diff --git a/app/src/test/java/org/astraea/app/performance/MetricsTest.java b/app/src/test/java/org/astraea/app/performance/MetricsTest.java index c7d2f9c3cb..276af7bae6 100644 --- a/app/src/test/java/org/astraea/app/performance/MetricsTest.java +++ b/app/src/test/java/org/astraea/app/performance/MetricsTest.java @@ -27,7 +27,7 @@ void testAverage() { Random rand = new Random(); final int num = 1000; double avg = 0.0; - Report metrics = new Report(); + var metrics = new Report.Impl(); Assertions.assertEquals(0, metrics.avgLatency()); @@ -42,7 +42,7 @@ void testAverage() { @Test void testBytes() { - var metrics = new Report(); + var metrics = new Report.Impl(); Assertions.assertEquals(0, metrics.totalBytes()); metrics.record("topic", 0, 100, 0L, 1000); diff --git a/app/src/test/java/org/astraea/app/performance/TrackerTest.java b/app/src/test/java/org/astraea/app/performance/TrackerTest.java index 4f087da82e..1cb4ae8c0f 100644 --- a/app/src/test/java/org/astraea/app/performance/TrackerTest.java +++ b/app/src/test/java/org/astraea/app/performance/TrackerTest.java @@ -34,7 +34,7 @@ void testClose() { @Test void testZeroConsumer() { - var producerReport = new Report(); + var producerReport = new ProducerThread.Report(); var tracker = TrackerThread.create(List.of(producerReport), List.of(), ExeTime.of("1records")); Assertions.assertFalse(tracker.closed()); producerReport.record("topic", 1, 100, 1L, 1); @@ -45,8 +45,8 @@ void testZeroConsumer() { @Test void testExeTime() { - var producerReport = new Report(); - var consumerReport = new Report(); + var producerReport = new ProducerThread.Report(); + var consumerReport = new ConsumerThread.Report(); var tracker = TrackerThread.create(List.of(producerReport), List.of(consumerReport), ExeTime.of("2s")); Assertions.assertFalse(tracker.closed()); @@ -58,8 +58,8 @@ void testExeTime() { @Test void testConsumerAndProducer() { - var producerReport = new Report(); - var consumerReport = new Report(); + var producerReport = new ProducerThread.Report(); + var consumerReport = new ConsumerThread.Report(); var tracker = TrackerThread.create( List.of(producerReport), List.of(consumerReport), ExeTime.of("1records"));