Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-3625: Add public test utils for Kafka Streams #4402

Merged
merged 12 commits into from
Jan 30, 2018

Conversation

mjsax
Copy link
Member

@mjsax mjsax commented Jan 8, 2018

  • add new artifact test-utils
  • add TopologyTestDriver
  • add MockTime, TestRecord, add TestRecordFactory

This PR requires a KIP and is WIP. DO NOT MERGE.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@mjsax mjsax force-pushed the kafka-3625-test branch 5 times, most recently from a386e97 to a91611f Compare January 9, 2018 04:28
@bbejeck
Copy link
Member

bbejeck commented Jan 9, 2018

@mjsax failures look related

*/
@InterfaceStability.Unstable
public class InternalTopologyBuilderAccessor {
public static InternalTopologyBuilder getInternalTopologyBuilder(Topology topology) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: final

}

@Override
public void sleep(final long ms) {
Copy link
Member

@bbejeck bbejeck Jan 9, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO sleep doesn't feel quite right, what about SMT along the lines of advance or process
EDIT: NM just saw that this implements Time.sleep I get how this is intended to work but I still find it somewhat confusing as it seems nothing is actually paused (I could be wrong).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the mock impl nothing is paused, but to the runtime it's just Time object that could build on SystemTime and thus really sleep here. We could add a second method advance() that internally just calls sleep() though to make tests more readable. But having two methods doing the same thing might be confusion, too. Thus, I prefer to leave as is. WDYT?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I'd have to agree it doesn't make sense to have two methods doing the same thing, I'd say leave as is.

}

public void setCurrentTimeMs(final long newMs) {
long oldMs = timeMs.getAndSet(newMs);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: final

}

private void tick() {
for (MockTimeListener listener : listeners) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: final

@bbejeck
Copy link
Member

bbejeck commented Jan 9, 2018

left some initial comments.

I have two meta-comments regarding TestRecord and TestRecordFactory

  1. IMHO if we add a builder to TestRecord (additionally the builder will have a copy constructor) it could help with using the TestRecordFactory I know this isn't entirely clear so I'll need to come up with an example.

  2. What do you think of providing methods auto-generating simple key-value pairs of basic types such as KeyValue<Integer, String> etc

@mjsax mjsax force-pushed the kafka-3625-test branch 2 times, most recently from 435afb9 to a3fb054 Compare January 10, 2018 00:35
@mjsax
Copy link
Member Author

mjsax commented Jan 12, 2018

@bbejeck @guozhangwang @dguy Updated PR according to KIP. Feedback is welcome.

Copy link
Contributor

@dguy dguy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @mjsax, made a pass and left some initial comments

* This is an internal class and should not be used.
*/
@InterfaceStability.Unstable
public class MockTime implements Time {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need another MockTime? We have one already in o.a.k.common.utils

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MockTime in utils is part of the test package, and thus not public API. I don't think we should depend on this.

final long autoAdvanceMs) {
Objects.requireNonNull(keySerializer, "keySerializer cannot be null");
Objects.requireNonNull(valueSerializer, "valueSerializer cannot be null");
this.topicName = defaultTopicName;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we also be checking that topicName is non-null?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to allow topicName=null for the case that somebody wants a factory without default topic name. We have a constructor without taking a defaultTopicName.

final K key,
final V value,
final long timestampMs) {
if (topicName == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Objects.requireNonNull(..) ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could. No strong opinion -- IllegalArgumentException seems more appropriate than NPE.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be consistent with what we have done elsewhere in the API

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

public class ConsumerRecordFactoryConsumer {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConsumerRecordFactoryTest ?

}

@Test
public void shouldNotAllowToCreateTopicWithNullTopicName() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'd probably do this as separate tests for each method

}

@Test
public void shouldRequireCustomTopicNameIfNotDefaultFactoryTopicName() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto


@Test
public void shouldCreateConsumerRecord() {
record = factory.create(rawKey, value);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why save record to a field? Can just inline it in call to verifyRecord - elsewhere too

assertEquals(1, processedRecords1.size());
assertEquals(0, processedRecords2.size());

Record record = processedRecords1.get(0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you added an equals(...) to Record you could use assertThat(expected, equalTo(..))

@mjsax
Copy link
Member Author

mjsax commented Jan 16, 2018

Updates this.

@mjsax mjsax changed the title KAFKA-3625: Add public test utils for Kafka Streams [WIP] KAFKA-3625: Add public test utils for Kafka Streams Jan 18, 2018
@mjsax
Copy link
Member Author

mjsax commented Jan 18, 2018

Add OutputVerfier as proposed in KIP. Added more tests for TopologyTestDriver. Remove "WIP" annotation. Please review and comment on KIP.

topology.addStateStore(
new KeyValueStoreBuilder<>(
Stores.inMemoryKeyValueStore("store"),
new Serdes.ByteArraySerde(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Serdes.ByteArray()? other places too

@mjsax
Copy link
Member Author

mjsax commented Jan 20, 2018

Updates this. (reworked JavaDocs and added update notes).

@mjsax
Copy link
Member Author

mjsax commented Jan 20, 2018

Retest this please.

Copy link
Contributor

@dguy dguy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just one minor comment otherwise LGTM

@SuppressWarnings("unchecked")
public <K, V> SessionStore<K, V> getSessionStore(final String name) {
final StateStore store = getStateStore(name);
return store instanceof SessionStore ? (SessionStore<K, V>) getStateStore(name) : null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you could just return the store you already have rather than getting it again

Copy link
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left some comments. As pointed out that we should consider replacing the internal test drivers with this one as a way to "further unit testing" it and verify if there's any functionality gaps: this can be done in a follow-up PR, but we should consider doing it sooner.

* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a meta comment: should we consider using a sub-package name than o.a.k.streams? Perhaps o.a.k.streams.testutils?

Copy link
Member Author

@mjsax mjsax Jan 22, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this specific class, we need it to put it in this package, because the InternalTopologyBuilder is accessible package private only and I don't think we should make it public.

The package testutils you suggest is the package that is already there but called test -- I can rename test to testutils and update the KIP of course. Thus, the classes in test/testutils are the actual public part. The two classed in this packaged (InternalTopologyBuilderAccessor and MockTime are actually internal -- as we need InternalTopologyBuilderAccessor to be in this package, the idea is to not create an additional internal package, but use this package for this purpose)

* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.test;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, the package name is different with other classes under the test-utils artifact, is this intentional? (see my other comment above)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See answer above.

* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.test;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto here.

* @see OutputVerifier
*/
@InterfaceStability.Evolving
public class TopologyTestDriver {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a follow-up PR: we should remove the KStreamTestDriver and ProcessorTopologyTestDriver and replace them with this public class in our internal unit tests as well. It also helps us to identify if there are any gaps that worth closing (i.e. enhancing the TopologyTestDriver in next release).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Completely agree.

};

final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
final StateDirectory stateDirectory = new StateDirectory(streamsConfig, mockTime);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If user do not modify the state directory path in their unit tests, does that mean two unit tests would conflict on the state directory path then? Should we just use a random directory than enforcing themselves to remember setting to a different path each time?

}
};

private final static class Record {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just using the ConsumerRecord directly?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConsumerRecord does not implement equals or hashCode and we thus cannot use assertThat if we use ConsumerRecord.

We are also just interested in some fields, for example we don't need partition, headers, serializedKeySize, etc.

}
}

private final class MockPunctuator implements Punctuator {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd recommend making MockPunctuator a separate class in o.a.k.test package under streams/tests folder, to be shared with others.

}
}

private final class MockProcessor implements Processor {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have a MockProcessor in o.a.k.test, could we refactor that class and use it here? Similar for MockProcessorSupplier.

}
}

private final static class Punctuation {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please see my other comment for MockProcessor: it's better to just reuse the existing one with improved punctuation configuration.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you referring to the classes in test package? I think we should not depend on the test package, as test package is not public API and the new artifact should not depend on the test artifact from my point of view. We can refactor this later though, adding Mocks to this new test artifact and remove the mocks from our unit test to use the mocks from the public test artifact.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense.

@mjsax
Copy link
Member Author

mjsax commented Jan 23, 2018

Retest this please.

@guozhangwang
Copy link
Contributor

LGTM. Asking @norwood @xvrl for another round of reviews.

@guozhangwang
Copy link
Contributor

retest this please

@norwood
Copy link
Contributor

norwood commented Jan 23, 2018

💝

@guozhangwang
Copy link
Contributor

retest this please

@guozhangwang
Copy link
Contributor

@mjsax could you rebase this PR (there is no conflicts but some PR that just committed in caused jenkins to fail)

@mjsax
Copy link
Member Author

mjsax commented Jan 26, 2018

Rebased.

@guozhangwang
Copy link
Contributor

This seems relevant:

/home/jenkins/jenkins-slave/workspace/kafka-pr-jdk8-scala2.12/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java:303: error: cannot find symbol
00:30:11             task.initialize();

@mjsax
Copy link
Member Author

mjsax commented Jan 29, 2018

Updated.

@guozhangwang
Copy link
Contributor

Unit test passed locally on my machine.

@guozhangwang guozhangwang merged commit a78f66a into apache:trunk Jan 30, 2018
@mjsax mjsax deleted the kafka-3625-test branch January 30, 2018 02:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants