KAFKA-9113: Unit test for RecordCollector #5
Conversation
@abbccdda @ableegoldman this should be reviewed after #4
…to K9113-unit-tests-record-collector
Rebased.
Thanks, left a couple of comments
@@ -301,7 +306,7 @@ private void recordSendError(final String topic, final Exception exception, fina
     if (isRecoverable(uncaughtException)) {
         // producer.send() call may throw a KafkaException which wraps a FencedException,
         // in this case we should throw its wrapped inner cause so that it can be captured and re-wrapped as TaskMigratedException
-        throw new TaskMigratedException(taskId, "Producer cannot send records anymore since it got fenced", uncaughtException);
+        throw new TaskMigratedException(taskId, "Producer cannot send records anymore since it got fenced", uncaughtException.getCause());
q: this assumes the uncaught exception is always wrapped, is that the case?
The isRecoverable check will only return true if 1) the exception itself is a KafkaException, and 2) it wraps a ProducerFencedException or UnknownProducerIdException. So here we are certain that it has a wrapped cause, and that cause is the only exception we are interested in.
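For readers following along, a minimal sketch of the isRecoverable check as described above (the actual method in RecordCollectorImpl may differ in details):

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.UnknownProducerIdException;

// Sketch per the explanation above: recoverable only when the exception is a
// KafkaException wrapping a ProducerFencedException or UnknownProducerIdException,
// so callers can safely unwrap getCause().
private static boolean isRecoverable(final Exception uncaughtException) {
    return uncaughtException instanceof KafkaException &&
        (uncaughtException.getCause() instanceof ProducerFencedException ||
         uncaughtException.getCause() instanceof UnknownProducerIdException);
}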
@Test
public void shouldNotAllowOffsetsToBeUpdatedExternally() {
    final String topic = "topic1";
    // should not all offsets to be modified
nit: Not all offsets should be modified.
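For reference, a test asserting that offsets cannot be updated externally would typically look like this (a sketch only, assuming JUnit's assertThrows and the collector/topic names from the snippet above; not the exact test body):

final Map<TopicPartition, Long> offsets = collector.offsets();
// An unmodifiable map throws on any external mutation attempt.
assertThrows(
    UnsupportedOperationException.class,
    () -> offsets.put(new TopicPartition(topic, 0), 100L));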
@@ -32,7 +32,7 @@
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.AuthenticationException;
req: for L264, we didn't test the case where partition = null
Ack.
);

final Map<TopicPartition, Long> offsets = collector.offsets();
q: so the offsets map is ever-changing with the underlying map inside collector? And this is the correct behavior as we don't need to create a deep-copy every time?
This is the old behavior, but I think it is more appropriate to return a deep copy rather than a view; will change.
}

@Test
-public void shouldThrowRecoverableExceptionWhenProducerForgottenInCallback() {
+public void shouldThrowTaskMigratedExceptionOnSubsequentCallWhenProducerForgottenInCallback() {
prop: forgotten -> Unknown
-@Test(expected = StreamsException.class)
-public void shouldThrowIfTopicIsUnknownWithDefaultExceptionHandler() {
+@Test
+public void shouldSwallowOnEOSAbortTxnFatalException() {
q: what does this test cover? The abortTxn call is not triggered in this case, right?
Good catch! I will change the test code.
@@ -742,7 +825,7 @@ public void shouldThrowIfTopicIsUnknownWithContinueExceptionHandler() {
     }
 );

-collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
+assertThrows(StreamsException.class, () -> collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner));
prop: be more specific about the thrown cause
Ack
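A sketch of how the assertion could be tightened per the suggestion above; the asserted cause type here is illustrative only, and the real test should check whatever RecordCollectorImpl actually wraps:

final StreamsException thrown = assertThrows(
    StreamsException.class,
    () -> collector.send(topic, "3", "0", null, null,
        stringSerializer, stringSerializer, streamPartitioner));
// Illustrative only: assert on the specific cause rather than just the type.
assertTrue(thrown.getCause() instanceof KafkaException);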
@abbccdda Addressed comments.
assertEquals((Long) 4L, offsets.get(new TopicPartition("topic1", 0)));
assertEquals((Long) 2L, offsets.get(new TopicPartition("topic1", 1)));
assertEquals((Long) 0L, offsets.get(new TopicPartition("topic1", 2)));
// with the mock producer and no specific partition, we use the default producer partitioner with murmur hash
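For context on the murmur-hash remark in the comment above: when no partition is given, Kafka's default partitioner picks one by hashing the serialized key with murmur2. A sketch of the idea (not the exact DefaultPartitioner source):

import org.apache.kafka.common.utils.Utils;

// Sketch: murmur2-hash the serialized key, make the hash non-negative,
// then take it modulo the partition count.
static int partitionForKey(final byte[] keyBytes, final int numPartitions) {
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}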
@@ -354,7 +359,7 @@ public void close() {

 @Override
 public Map<TopicPartition, Long> offsets() {
-    return Collections.unmodifiableMap(offsets);
+    return Collections.unmodifiableMap(new HashMap<>(offsets));
Making a deep copy and making it read-only seems to be redundant to me -- why do we need both?
Consider the case where you call offsets() and read from it once, then try to do something with it later: if it is a view, its values may have changed underneath without the caller noticing. Given how frequently it is called, I think it is safer to make the returned map a stable, final snapshot.
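A small standalone illustration of the trade-off being discussed (a hypothetical example, not the Streams code): Collections.unmodifiableMap alone returns a read-only view that still reflects later mutations of the backing map, while copying first yields a stable snapshot.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class SnapshotVsView {
    public static void main(final String[] args) {
        final Map<String, Long> backing = new HashMap<>();
        backing.put("topic1-0", 4L);

        // Read-only *view*: callers cannot mutate it, but they still
        // observe later changes to the backing map.
        final Map<String, Long> view = Collections.unmodifiableMap(backing);

        // Read-only *snapshot*: copy first, so later changes are invisible.
        final Map<String, Long> snapshot =
            Collections.unmodifiableMap(new HashMap<>(backing));

        backing.put("topic1-0", 5L);

        System.out.println(view.get("topic1-0"));     // 5 -- the view tracks the update
        System.out.println(snapshot.get("topic1-0")); // 4 -- the snapshot is stable
    }
}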
collector.send("topic1", "245", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); | ||
@Test | ||
public void shouldSendWithNoPartition() { | ||
final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())}); |
Why do we need to pass headers in this test?
No, we do not necessarily need to; do you have any concerns about this?
This PR is a collaboration between Guozhang Wang and John Roesler. It is a significant tech-debt cleanup of task management and state management, broken down into the sub-tasks listed below:
1. Extract the embedded clients (producer and consumer) out of StreamTask into RecordCollector. guozhangwang#2 guozhangwang#5
2. Consolidate the standby updating and active restoring logic into ChangelogReader and extract it out of StreamThread. guozhangwang#3 guozhangwang#4
3. Introduce a Task state life cycle (created, restoring, running, suspended, closing) and refactor the task operations based on the current state. guozhangwang#6 guozhangwang#7
4. Consolidate AssignedTasks into TaskManager and simplify the changelog-management and task-management logic (since they were already moved in steps 2 and 3). guozhangwang#8 guozhangwang#9
5. Also simplify the StreamThread logic a bit, as the embedded clients and changelog restoration logic were moved out in steps 1 and 2. guozhangwang#10
Reviewers: A. Sophie Blee-Goldman <[email protected]>, Bruno Cadonna <[email protected]>, Boyang Chen <[email protected]>
While adding the coverage I also found and fixed a couple of minor issues inside RecordCollectorImpl.
Committer Checklist (excluded from commit message)