Skip to content

Commit

Permalink
MINOR: Add unit test for SerDe auto-configuration (#6610)
Browse files Browse the repository at this point in the history
Reviewers: Guozhang Wang <[email protected]>, Ted Yu <[email protected]>
  • Loading branch information
mjsax authored Apr 22, 2019
1 parent 409fabc commit 172fbb2
Show file tree
Hide file tree
Showing 8 changed files with 439 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -647,11 +647,11 @@ private void configureSerDes(final Set<SinkNode> sinks, final Set<SourceNode> so
}
}
for (final SourceNode sn : sources) {
if (sn.getKeyDeSerializer() != null) {
sn.getKeyDeSerializer().configure(config.originals(), true);
if (sn.getKeyDeserializer() != null) {
sn.getKeyDeserializer().configure(config.originals(), true);
}
if (sn.getValueDeSerializer() != null) {
sn.getValueDeSerializer().configure(config.originals(), false);
if (sn.getValueDeserializer() != null) {
sn.getValueDeserializer().configure(config.originals(), false);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ public SourceNode(final String name,
this(name, topics, null, keyDeserializer, valDeserializer);
}

public Deserializer getKeyDeSerializer() {
public Deserializer getKeyDeserializer() {
return keyDeserializer;
}

public Deserializer getValueDeSerializer() {
public Deserializer getValueDeserializer() {
return valDeserializer;
}

Expand Down
137 changes: 137 additions & 0 deletions streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,20 @@
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
Expand Down Expand Up @@ -74,6 +79,13 @@
import java.util.stream.Collectors;

import static java.util.Arrays.asList;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.mock;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
Expand Down Expand Up @@ -725,6 +737,131 @@ public void statefulTopologyShouldCreateStateDirectory() throws Exception {
startStreamsAndCheckDirExists(topology, asList(inputTopic, globalTopicName), outputTopic, true);
}

@SuppressWarnings("unchecked")
@Test
public void shouldInitializeUserSerdes() {
final Deserializer mockSourceKeyDeserialzer = mock(Deserializer.class);
mockSourceKeyDeserialzer.configure(anyObject(), eq(true));
expectLastCall();
final Deserializer mockSourceValueDeserialzer = mock(Deserializer.class);
mockSourceValueDeserialzer.configure(anyObject(), eq(false));
expectLastCall();

final Serde mockSourceKeySerde = mock(Serde.class);
final Serde mockSourceValueSerde = mock(Serde.class);
expect(mockSourceKeySerde.deserializer()).andReturn(mockSourceKeyDeserialzer).anyTimes();
expect(mockSourceValueSerde.deserializer()).andReturn(mockSourceValueDeserialzer).anyTimes();



final Serializer mockThroughKeySerializer = mock(Serializer.class);
mockThroughKeySerializer.configure(anyObject(), eq(true));
expectLastCall();
final Serializer mockThroughValueSerializer = mock(Serializer.class);
mockThroughValueSerializer.configure(anyObject(), eq(false));
expectLastCall();
final Deserializer mockThroughKeyDeserializer = mock(Deserializer.class);
mockThroughKeyDeserializer.configure(anyObject(), eq(true));
expectLastCall();
final Deserializer mockThroughValueDeserializer = mock(Deserializer.class);
mockThroughValueDeserializer.configure(anyObject(), eq(false));
expectLastCall();

final Serde mockThroughKeySerde = mock(Serde.class);
final Serde mockThroughValueSerde = mock(Serde.class);
expect(mockThroughKeySerde.serializer()).andReturn(mockThroughKeySerializer).anyTimes();
expect(mockThroughValueSerde.serializer()).andReturn(mockThroughValueSerializer).anyTimes();
expect(mockThroughKeySerde.deserializer()).andReturn(mockThroughKeyDeserializer).anyTimes();
expect(mockThroughValueSerde.deserializer()).andReturn(mockThroughValueDeserializer).anyTimes();



final Serializer mockGroupedKeySerializer = mock(Serializer.class);
mockGroupedKeySerializer.configure(anyObject(), eq(true));
expectLastCall();
final Serializer mockGroupedValueSerializer = mock(Serializer.class);
mockGroupedValueSerializer.configure(anyObject(), eq(false));
expectLastCall();
final Deserializer mockGroupedKeyDeserializer = mock(Deserializer.class);
mockGroupedKeyDeserializer.configure(anyObject(), eq(true));
expectLastCall();
final Deserializer mockGroupedValueDeserializer = mock(Deserializer.class);
mockGroupedValueDeserializer.configure(anyObject(), eq(false));
expectLastCall();

final Serde mockGroupedKeySerde = mock(Serde.class);
final Serde mockGroupedValueSerde = mock(Serde.class);
expect(mockGroupedKeySerde.serializer()).andReturn(mockGroupedKeySerializer).anyTimes();
expect(mockGroupedValueSerde.serializer()).andReturn(mockGroupedValueSerializer).anyTimes();
expect(mockGroupedKeySerde.deserializer()).andReturn(mockGroupedKeyDeserializer).anyTimes();
expect(mockGroupedValueSerde.deserializer()).andReturn(mockGroupedValueDeserializer).anyTimes();



final Serializer mockOutputKeySerializer = mock(Serializer.class);
mockOutputKeySerializer.configure(anyObject(), eq(true));
expectLastCall();
final Serializer mockOutputValueSerializer = mock(Serializer.class);
mockOutputValueSerializer.configure(anyObject(), eq(false));
expectLastCall();

final Serde mockOutputKeySerde = mock(Serde.class);
final Serde mockOutputValueSerde = mock(Serde.class);
expect(mockOutputKeySerde.serializer()).andReturn(mockOutputKeySerializer).anyTimes();
expect(mockOutputValueSerde.serializer()).andReturn(mockOutputValueSerializer).anyTimes();



final Deserializer mockGlobalKeyDeserializer = mock(Deserializer.class);
mockGlobalKeyDeserializer.configure(anyObject(), eq(true));
expectLastCall();
final Deserializer mockGlobalValueDeserializer = mock(Deserializer.class);
mockGlobalValueDeserializer.configure(anyObject(), eq(false));
expectLastCall();

final Serde mockGlobalKeySerde = mock(Serde.class);
final Serde mockGlobalValueSerde = mock(Serde.class);
expect(mockGlobalKeySerde.deserializer()).andReturn(mockGlobalKeyDeserializer).anyTimes();
expect(mockGlobalValueSerde.deserializer()).andReturn(mockGlobalValueDeserializer).anyTimes();



builder
.stream("anyTopic", Consumed.with(mockSourceKeySerde, mockSourceValueSerde))
.through("anyOtherTopic", Produced.with(mockThroughKeySerde, mockThroughValueSerde))
.selectKey(KeyValue::pair)
.groupByKey(Grouped.with(mockGroupedKeySerde, mockGroupedValueSerde))
.count()
.toStream()
.to("anyOutput", Produced.with(mockOutputKeySerde, mockOutputValueSerde));
builder.globalTable("anyGlobal", Consumed.with(mockGlobalKeySerde, mockGlobalValueSerde));

replay(
mockSourceKeyDeserialzer, mockSourceValueDeserialzer, mockSourceKeySerde, mockSourceValueSerde,
mockThroughKeySerializer, mockThroughKeyDeserializer, mockThroughKeySerde,
mockThroughValueSerializer, mockThroughValueDeserializer, mockThroughValueSerde,
mockGroupedKeySerializer, mockGroupedKeyDeserializer, mockGroupedKeySerde,
mockGroupedValueSerializer, mockGroupedValueDeserializer, mockGroupedValueSerde,
mockOutputKeySerializer, mockOutputValueSerializer, mockOutputKeySerde, mockOutputValueSerde,
mockGlobalKeyDeserializer, mockGlobalValueDeserializer, mockGlobalKeySerde, mockGlobalValueSerde);

KafkaStreams kafkaStreams = null;
try {
kafkaStreams = new KafkaStreams(builder.build(), props);
} finally {
if (kafkaStreams != null) {
kafkaStreams.close();
}
}

verify(
mockSourceKeyDeserialzer, mockSourceValueDeserialzer,
mockThroughKeySerializer, mockThroughValueSerializer, mockThroughKeyDeserializer, mockThroughValueDeserializer,
mockGroupedKeySerializer, mockGroupedValueSerializer, mockGroupedKeyDeserializer, mockGroupedValueDeserializer,
mockOutputKeySerializer, mockOutputValueSerializer,
mockGlobalKeyDeserializer, mockGlobalValueDeserializer);
}

@SuppressWarnings("unchecked")
private Topology getStatefulTopology(final String inputTopic,
final String outputTopic,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
Expand All @@ -39,8 +40,8 @@
import org.junit.runner.RunWith;

import java.util.Collections;
import java.util.List;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.kafka.common.utils.Utils.mkEntry;
Expand All @@ -52,6 +53,7 @@
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.mock;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.reset;
import static org.easymock.EasyMock.verify;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -102,6 +104,53 @@ private void init() {
metered.init(context, metered);
}

@SuppressWarnings("unchecked")
@Test
public void shouldGetSerdesFromConfigWithoutUserSerdes() {
metered = new MeteredKeyValueStore<>(
inner,
"scope",
new MockTime(),
null,
null
);
final Serde mockSerde = mock(Serde.class);
replay(mockSerde);
expect(context.keySerde()).andReturn(mockSerde);
expect(context.valueSerde()).andReturn(mockSerde);

init();
verify(context, mockSerde);
}

@Test
public void shouldConfigureUserSerdes() {
final Serde<String> mockKeySerde = mock(Serde.class);
mockKeySerde.configure(anyObject(), eq(true));
expectLastCall();

final Serde<String> mockValueSerde = mock(Serde.class);
mockValueSerde.configure(anyObject(), eq(false));
expectLastCall();

replay(mockKeySerde, mockValueSerde);

metered = new MeteredKeyValueStore<>(
inner,
"scope",
new MockTime(),
mockKeySerde,
mockValueSerde
);

reset(context);
expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics)).anyTimes();
expect(context.taskId()).andReturn(taskId).anyTimes();

init();
verify(context, mockKeySerde, mockValueSerde);
}

@Test
public void testMetrics() {
init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
Expand Down Expand Up @@ -54,6 +55,7 @@
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.mock;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.reset;
import static org.easymock.EasyMock.verify;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -101,6 +103,53 @@ private void init() {
metered.init(context, metered);
}

@SuppressWarnings("unchecked")
@Test
public void shouldGetSerdesFromConfigWithoutUserSerdes() {
metered = new MeteredSessionStore<>(
inner,
"scope",
null,
null,
new MockTime()
);
final Serde mockSerde = mock(Serde.class);
replay(mockSerde);
expect(context.keySerde()).andReturn(mockSerde);
expect(context.valueSerde()).andReturn(mockSerde);

init();
verify(context, mockSerde);
}

@Test
public void shouldConfigureUserSerdes() {
final Serde<String> mockKeySerde = mock(Serde.class);
mockKeySerde.configure(anyObject(), eq(true));
expectLastCall();

final Serde<String> mockValueSerde = mock(Serde.class);
mockValueSerde.configure(anyObject(), eq(false));
expectLastCall();

replay(mockKeySerde, mockValueSerde);

metered = new MeteredSessionStore<>(
inner,
"scope",
mockKeySerde,
mockValueSerde,
new MockTime()
);

reset(context);
expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics)).anyTimes();
expect(context.taskId()).andReturn(taskId).anyTimes();

init();
verify(context, mockKeySerde, mockValueSerde);
}

@Test
public void testMetrics() {
init();
Expand Down
Loading

0 comments on commit 172fbb2

Please sign in to comment.