Skip to content

Commit

Permalink
Github comments / added 2 new tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mjsax committed Jan 26, 2018
1 parent cebe815 commit fab87fa
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 112 deletions.
2 changes: 2 additions & 0 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@
<!-- Streams Test-Utils -->
<suppress checks="ClassFanOutComplexity"
files="TopologyTestDriver.java"/>
<suppress checks="ClassDataAbstractionCoupling"
files="TopologyTestDriver.java"/>

<!-- Tools -->
<suppress checks="ClassDataAbstractionCoupling"
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.test;
package org.apache.kafka.streams;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand All @@ -33,12 +33,6 @@
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.InternalTopologyBuilderAccessor;
import org.apache.kafka.streams.MockTime;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
Expand All @@ -62,6 +56,8 @@
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.streams.test.OutputVerifier;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -75,6 +71,7 @@
import java.util.Properties;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
Expand Down Expand Up @@ -209,7 +206,7 @@ public TopologyTestDriver(final Topology topology,
final StreamsConfig streamsConfig = new StreamsConfig(config);
mockTime = new MockTime(initialWallClockTimeMs);

internalTopologyBuilder = InternalTopologyBuilderAccessor.getInternalTopologyBuilder(topology);
internalTopologyBuilder = topology.internalTopologyBuilder;
internalTopologyBuilder.setApplicationId(streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG));

processorTopology = internalTopologyBuilder.build(null);
Expand Down Expand Up @@ -503,7 +500,7 @@ public StateStore getStateStore(final String name) {
@SuppressWarnings("unchecked")
public <K, V> KeyValueStore<K, V> getKeyValueStore(final String name) {
final StateStore store = getStateStore(name);
return store instanceof KeyValueStore ? (KeyValueStore<K, V>) getStateStore(name) : null;
return store instanceof KeyValueStore ? (KeyValueStore<K, V>) store : null;
}

/**
Expand All @@ -523,7 +520,7 @@ public <K, V> KeyValueStore<K, V> getKeyValueStore(final String name) {
@SuppressWarnings("unchecked")
public <K, V> WindowStore<K, V> getWindowStore(final String name) {
final StateStore store = getStateStore(name);
return store instanceof WindowStore ? (WindowStore<K, V>) getStateStore(name) : null;
return store instanceof WindowStore ? (WindowStore<K, V>) store : null;
}

/**
Expand All @@ -543,7 +540,7 @@ public <K, V> WindowStore<K, V> getWindowStore(final String name) {
@SuppressWarnings("unchecked")
public <K, V> SessionStore<K, V> getSessionStore(final String name) {
final StateStore store = getStateStore(name);
return store instanceof SessionStore ? (SessionStore<K, V>) getStateStore(name) : null;
return store instanceof SessionStore ? (SessionStore<K, V>) store : null;
}

/**
Expand All @@ -562,6 +559,40 @@ public void close() {
}
}

static class MockTime implements Time {
private final AtomicLong timeMs;
private final AtomicLong highResTimeNs;

MockTime(final long startTimestampMs) {
this.timeMs = new AtomicLong(startTimestampMs);
this.highResTimeNs = new AtomicLong(startTimestampMs * 1000L * 1000L);
}

@Override
public long milliseconds() {
return timeMs.get();
}

@Override
public long nanoseconds() {
return highResTimeNs.get();
}

@Override
public long hiResClockMs() {
return TimeUnit.NANOSECONDS.toMillis(nanoseconds());
}

@Override
public void sleep(final long ms) {
if (ms < 0) {
throw new IllegalArgumentException("Sleep ms cannot be negative.");
}
timeMs.addAndGet(ms);
highResTimeNs.addAndGet(TimeUnit.MILLISECONDS.toNanos(ms));
}
}

private MockConsumer<byte[], byte[]> createRestoreConsumer(final Map<String, String> storeToChangelogTopic) {
final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.LATEST) {
@Override
Expand Down Expand Up @@ -589,5 +620,4 @@ public synchronized long position(final TopicPartition partition) {
}
return consumer;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.TopologyTestDriver;

import java.util.ArrayList;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.streams.test;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.TopologyTestDriver;

import java.util.Objects;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,25 @@ public class MockTimeTest {

@Test
public void shouldSetStartTime() {
final MockTime time = new MockTime(42L);
final TopologyTestDriver.MockTime time = new TopologyTestDriver.MockTime(42L);
assertEquals(42L, time.milliseconds());
assertEquals(42L * 1000L * 1000L, time.nanoseconds());
}

@Test
public void shouldGetNanosAsMillis() {
final MockTime time = new MockTime(42L);
final TopologyTestDriver.MockTime time = new TopologyTestDriver.MockTime(42L);
assertEquals(42L, time.hiResClockMs());
}

@Test(expected = IllegalArgumentException.class)
public void shouldNotAllowNegativeSleep() {
new MockTime(42).sleep(-1L);
new TopologyTestDriver.MockTime(42).sleep(-1L);
}

@Test
public void shouldAdvanceTimeOnSleep() {
final MockTime time = new MockTime(42L);
final TopologyTestDriver.MockTime time = new TopologyTestDriver.MockTime(42L);

assertEquals(42L, time.milliseconds());
time.sleep(1L);
Expand Down
Loading

0 comments on commit fab87fa

Please sign in to comment.