KAFKA-9615: Clean up task/producer create and close #8213
@guozhangwang, do you mind taking a look at this refactor?
import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;

class ActiveTaskCreator {
Pulled out of StreamThread (could have been done a long time ago, since it was a static class anyway). I didn't embed it in TaskManager, just to keep the file size lower.
I also dropped AbstractTaskCreator, since the creation of Active and Standby tasks is only similar, not exactly the same. We weren't really using the abstraction for much except de-duplicating a few field declarations. On the con side, the abstraction made it hard to see that we were requiring several arguments for Standby task creation that were never actually used. The indirection also made it harder to read the task creation logic.
Sounds good to me
@@ -25,7 +25,7 @@
import java.util.Map;

public interface RecordCollector extends AutoCloseable {
We never used it as an AutoCloseable, and having it makes it hard to trace the callers.
@@ -262,7 +262,7 @@ public void close() {
    if (eosEnabled) {
        streamsProducer.abortTransaction();
    }
    streamsProducer.close();
Only the creator may close the producer, but we can go ahead and call `flush()` here.
@@ -544,11 +306,7 @@ public static StreamThread create(final InternalTopologyBuilder builder,

  final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics);

- final Map<TaskId, Producer<byte[], byte[]>> taskProducers = new HashMap<>();
Managed fully inside the task factory now.
You'll also notice a bunch of references to the producers are similarly gone in the following lines.
threadProducer == null ?
    Collections.emptySet() :
    Collections.singleton(getThreadProducerClientId(this.getName())),
Sooo many switch statements are now gone.
Nice!
      }
  }
- return result;
+ return taskManager.producerMetrics();
We don't manage the producers anymore, so we can defer to the taskManager (who will defer to the active task creator, but that's none of the thread's business)
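The delegation described in this comment can be sketched roughly as follows. This is a minimal illustration, not the code in the PR: the `Sketch*` classes are hypothetical stand-ins, and metrics are modelled as plain doubles keyed by name instead of Kafka `MetricName`/`Metric` objects.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the component that owns the producers
// and therefore is the only one that can report their metrics.
final class SketchActiveTaskCreator {
    private final Map<String, Double> metrics = new HashMap<>();

    void record(final String name, final double value) {
        metrics.put(name, value);
    }

    Map<String, Double> producerMetrics() {
        return Collections.unmodifiableMap(metrics);
    }
}

// Hypothetical task manager: it simply forwards to the creator.
final class SketchTaskManager {
    private final SketchActiveTaskCreator activeTaskCreator;

    SketchTaskManager(final SketchActiveTaskCreator activeTaskCreator) {
        this.activeTaskCreator = activeTaskCreator;
    }

    Map<String, Double> producerMetrics() {
        return activeTaskCreator.producerMetrics();
    }
}

// Hypothetical thread: it only knows about the task manager,
// not about who actually holds the producers.
final class SketchStreamThread {
    private final SketchTaskManager taskManager;

    SketchStreamThread(final SketchTaskManager taskManager) {
        this.taskManager = taskManager;
    }

    Map<String, Double> producerMetrics() {
        return taskManager.producerMetrics();
    }
}
```

The point of the layering is that the thread never touches a producer reference, so later changes to how producers are created or shared stay inside the creator.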
public StreamsProducer(final Producer<byte[], byte[]> producer,
                       final boolean eosEnabled,
                       final LogContext logContext,
                       final String applicationId) {
This is a bit on the side, but there was some wacky stuff going on in here, and afaict, the only purpose of all the nullable fields was to allow including the task id in exception messages. Do we really need to do that? If so, I'll just add the logContext to the exception message instead, since it already has the task id in it.
Actually the `logContext.logPrefix()` should already have the format `stream-thread [%s] task [%s]`, so all the log4j entries are good. For exception messages, we can just get the prefix and then encode that into the exception message.
Yep, that's what I was thinking. I'll go ahead and do it.
"Error encountered sending record to topic %s%s due to:%n%s",
record.topic(),
taskId == null ? "" : " " + logMessage,
uncaughtException.toString());
no need to add the exception to the message, since we also pass it as the cause below.
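The two suggestions in this thread (take the task identification from the log prefix, and pass the underlying error as the cause instead of pasting it into the message) could look roughly like this. It is a sketch under assumptions: `PrefixedContext`, `SendFailedException`, and `PrefixedErrors` are hypothetical names, and the prefix format is the one quoted in the review, not verified against the actual `LogContext` output.

```java
// Hypothetical stand-in for a log context that carries a thread/task prefix.
final class PrefixedContext {
    private final String prefix;

    PrefixedContext(final String threadName, final String taskId) {
        // Format quoted in the review discussion (assumed, not checked against Kafka).
        this.prefix = String.format("stream-thread [%s] task [%s]", threadName, taskId);
    }

    String logPrefix() {
        return prefix;
    }
}

// Hypothetical exception type used only for this illustration.
final class SendFailedException extends RuntimeException {
    SendFailedException(final String message, final Throwable cause) {
        super(message, cause);
    }
}

final class PrefixedErrors {
    // The message carries the prefix and the topic; the original error is
    // attached as the cause, so its text is not duplicated in the message.
    static SendFailedException sendFailure(final PrefixedContext context,
                                           final String topic,
                                           final Throwable cause) {
        final String message = String.format(
            "%s Error encountered sending record to topic %s", context.logPrefix(), topic);
        return new SendFailedException(message, cause);
    }
}
```

With this shape, no nullable task id field is needed just to build the message, which was the concern raised about the constructor.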
@@ -212,17 +190,6 @@ public void flush() {
      producer.flush();
  }

- public void close() {
No one who has a reference to the StreamsProducer has any business calling close, so the method is gone now.
@@ -227,7 +227,7 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
  }

  if (!standbyTasksToCreate.isEmpty()) {
-     standbyTaskCreator.createTasks(mainConsumer, standbyTasksToCreate).forEach(this::addNewTask);
+     standbyTaskCreator.createTasks(standbyTasksToCreate).forEach(this::addNewTask);
Standby tasks don't need the mainConsumer (obvious, in retrospect).
I made a pass over the PR.
// Instead, we should register and record the metrics properly inside of the record collector.
Map<TaskId, StreamTask> fixmeStreamTasks() {
    return tasks.values().stream().filter(t -> t instanceof StreamTask).map(t -> (StreamTask) t).collect(Collectors.toMap(Task::id, t -> t));
Map<MetricName, Metric> producerMetrics() {
nit: These two functions are not for testing only.
Thanks for the feedback; I didn't understand this particular comment, though.
The comment line above these two method declarations says the following functions are for testing only, but these two functions are not.
Ah, I just found what you were talking about: `// below are for testing only`. I didn't notice that up there.
- final StreamThread.AbstractTaskCreator<? extends Task> activeTaskCreator,
- final StreamThread.AbstractTaskCreator<? extends Task> standbyTaskCreator,
- final Map<TaskId, Producer<byte[], byte[]>> taskProducers,
+ final ActiveTaskCreator activeTaskCreator,
In `shutdown(final boolean clean)` we should also release the task producers, right?
Yep. Good catch.
@@ -262,7 +262,7 @@ public void close() {
      if (eosEnabled) {
          streamsProducer.abortTransaction();
      }
-     streamsProducer.close();
+     streamsProducer.flush();
I'm wondering if we are introducing a latency regression without EOS here: in the old code, closing without EOS did nothing, and now we would block on flushing.
On the other hand, flushing may be needed when we close a task to make sure all the task's records have already been acked.
If the task is in RUNNING before shutting down, we always commit before closing, so `flush` has already been called; if the task is in RESTORING / SUSPENDED, nothing has been written from this task, so a `flush` is not needed. So I think it is safe not to call `flush` after all.
Sounds reasonable to me
Ok, I'll swap this out.
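The argument for dropping the flush on close can be made concrete with a small model. This is only a sketch of the reasoning in the thread, with hypothetical `Sketch*` types; it assumes, as the reviewer states, that a RUNNING task always commits (and therefore flushes) before it is closed, and that RESTORING or SUSPENDED tasks have written nothing.

```java
// Hypothetical task states, named after the ones mentioned in the review.
enum SketchTaskState { RESTORING, RUNNING, SUSPENDED }

// Hypothetical collector that only counts calls instead of talking to a producer.
final class SketchRecordCollector {
    private final boolean eosEnabled;
    int flushCalls = 0;
    int abortCalls = 0;

    SketchRecordCollector(final boolean eosEnabled) {
        this.eosEnabled = eosEnabled;
    }

    void flush() {
        flushCalls++;
    }

    // close() does not flush; under EOS it aborts, mirroring the snippet above.
    void close() {
        if (eosEnabled) {
            abortCalls++;
        }
    }
}

final class SketchTask {
    private final SketchTaskState state;
    private final SketchRecordCollector collector;

    SketchTask(final SketchTaskState state, final SketchRecordCollector collector) {
        this.state = state;
        this.collector = collector;
    }

    // Committing is what flushes outstanding records in this model.
    void commit() {
        collector.flush();
    }

    // A RUNNING task commits before closing (assumption taken from the review);
    // other states have nothing to flush, so close() alone is enough.
    void shutdown() {
        if (state == SketchTaskState.RUNNING) {
            commit();
        }
        collector.close();
    }
}
```

In this model a non-EOS task in RESTORING or SUSPENDED shuts down without any blocking call, which is the latency behaviour the old code had.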
@@ -612,25 +582,4 @@ public void shouldFailOnEosAbortTxFatal() {

      assertThat(thrown.getMessage(), equalTo("KABOOM!"));
  }

- @Test
- public void shouldFailOnCloseFatal() {
We did not create new test classes for ActiveTaskCreator / StandbyTaskCreator, but we should still have that coverage to make sure the exceptions thrown from producers are wrapped correctly.
Also, it seems that in the new code we no longer rethrow; is that intentional? I left a comment above.
Good catch. Actually, there's a bunch of coverage missing from TaskManager. I'll add several more tests.
try {
    threadProducer.close();
} catch (final RuntimeException e) {
    log.error("Failed to close producer due to the following error:", e);
In the old code we rethrew exceptions, whereas here we just swallow the error. Is that intentional?
Ah, thanks for noticing this. I meant to ask about it in here.
I thought it was strange that we would only re-throw when closing the task producers, not the thread producer. It seems like we should do the same thing in both cases, but which thing should we do?
I went with an error log in both cases, but it sounds like you wanted to throw the exception instead. Should we also rethrow for the thread producer?
Yeah, I think we should make them consistent: previously, closing the producer happened within `task#close`, and if it was called via `closeDirty` we had to make sure it never throws. Now that it is extracted out of the `close` call, we should just rethrow in both cases.
@@ -201,7 +201,7 @@ private KeyValueStoreTestDriver(final StateSerdes<K, V> serdes) {
      logContext,
      new TaskId(0, 0),
      consumer,
-     new StreamsProducer(logContext, producer),
+     new StreamsProducer(producer, null != null, logContext, null),
`null != null`?
lol
Oops! Auto-refactoring. I already made two passes to clean these up, looks like I missed one.
Nice cleanup. There is quite some overlap with #8215, even though the two PRs address different issues.
    return createdTasks;
}

public void releaseProducer() {
`closeThreadProducer()`
The method below is not public -- does this one need to be public?
even better maybeCloseThreadProducer
    }
}

void releaseProducer(final TaskId id) {
`closeProducerForTask(TaskId)`?
    }
}

public InternalTopologyBuilder builder() {
Does this need to be public?
Hah, doesn't need to be there at all, actually.
    return builder;
}

public StateDirectory stateDirectory() {
Does this need to be public?
if (threadProducer == null) {
    // create one producer per task for EOS
    // TODO: after KIP-447 this would be removed
I don't think we need to keep this TODO, since we will only remove support for the task producer once Streams 3.0 is out.
        log.error("Failed to close producer due to the following error:", e);
    }
}
if (!taskProducers.isEmpty()) {
Should we throw illegal state first, since we are already in an error state?
If we're going to rename the method to specify that it should do exactly "close thread producer", then this check is no longer appropriate.
Hey @guozhangwang, @mjsax, and @abbccdda, I've addressed all the feedback. In particular I added about 500 lines of missing tests to TaskManagerTest. It looks like we've done a fair amount of un-unit-tested refactoring in TaskManagerTest, so I figured I'd go ahead and add the missing coverage while I was adding coverage for this refactor itself. Be forewarned, the tests are... extensive.
@vvcephei I made a pass over the testing code, and only have a couple minor comments.
After addressing them please feel free to merge.
try {
    activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
} catch (final RuntimeException e) {
    log.debug("Error handling lostAll", e);
- Let's make it a warn instead of a debug.
- The error message can be more specific here: `Error closing task producer for task {} while handling lostAll`.
+1
if (clean) {
    firstException.compareAndSet(null, e);
} else {
    log.warn("Ignoring an exception while closing task producer.", e);
Ditto here about error message
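The clean/dirty branching in the snippet above follows a common "remember the first failure, keep closing the rest" pattern. A minimal, self-contained sketch of that pattern is shown below; `SketchProducer` and `ProducerCloser` are hypothetical names, and the ignored failures are collected in a list here only so the behaviour is observable without a logger.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical minimal producer interface; only close() matters for this sketch.
interface SketchProducer {
    void close();
}

final class ProducerCloser {
    // Messages of failures that were deliberately ignored during a dirty close.
    final List<String> ignoredMessages = new ArrayList<>();

    // Attempts to close every producer. On a clean close the first failure is
    // rethrown after all producers were attempted; on a dirty close failures
    // are only recorded and nothing is thrown.
    void closeAll(final List<SketchProducer> producers, final boolean clean) {
        final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
        for (final SketchProducer producer : producers) {
            try {
                producer.close();
            } catch (final RuntimeException e) {
                if (clean) {
                    // Keeps only the first exception; later ones are dropped.
                    firstException.compareAndSet(null, e);
                } else {
                    ignoredMessages.add(e.getMessage());
                }
            }
        }
        final RuntimeException first = firstException.get();
        if (first != null) {
            throw first;
        }
    }
}
```

One consequence of this shape is that a failing producer never prevents the remaining producers from being closed, regardless of the clean flag.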
@@ -440,6 +872,41 @@ public void shouldCommitActiveAndStandbyTasks() {
      assertThat(taskManager.commitAll(), equalTo(2));
  }

+ @Test
+ public void shouldNotCommitActiveAndStandbyTasks() {
nit: ...WhileRebalanceInProgress
assertTrue(eosMockProducer.history().isEmpty());
assertThat(eosMockProducer.uncommittedRecords().size(), equalTo(1));
assertThat(eosMockProducer.uncommittedRecords().get(0), equalTo(record));
assertThat(eosMockProducer.transactionInFlight(), is(true));
I was wondering what the standard is for using `assertThat` vs `assertTrue`. Do we have a convention to follow?
`assertThat` is nicer in general, but it doesn't really matter. In this case, IDEA offered to translate, and I was already changing a lot of assertions, so I just accepted the translation.
@@ -24,14 +24,20 @@
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
Are the tests added in TaskManager only trying for more coverage? @vvcephei
yep, that's right.
LGTM, only have some minor comments about tests. After addressing them, feel free to merge.
The last commit was trivial, and the Streams tests passed locally for me, so I'm going to go ahead and merge.
Consolidates task/producer management.
Now, exactly one component manages the creation and destruction of Producers,
whether they are per-thread or per-task.
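As a rough illustration of the "exactly one component" idea in this summary, the sketch below shows a single owner that creates producers (one shared per thread, or one per task) and is the only place that closes them. All names (`OwnedProducer`, `ProducerOwner`, the string task ids) are hypothetical and chosen for the example; the method names only loosely echo the ones suggested during review.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical minimal producer interface for this sketch.
interface OwnedProducer {
    void close();
}

final class ProducerOwner {
    // Non-null only when a single producer is shared by the whole thread.
    private final OwnedProducer threadProducer;
    // Populated only in per-task mode, keyed by a task id string.
    private final Map<String, OwnedProducer> taskProducers = new HashMap<>();
    private final Supplier<OwnedProducer> factory;

    ProducerOwner(final boolean perTask, final Supplier<OwnedProducer> factory) {
        this.factory = factory;
        this.threadProducer = perTask ? null : factory.get();
    }

    // Callers receive a producer to use, but never close it themselves.
    OwnedProducer producerFor(final String taskId) {
        if (threadProducer != null) {
            return threadProducer;
        }
        return taskProducers.computeIfAbsent(taskId, id -> factory.get());
    }

    // Closes and forgets the per-task producer, if one exists for this task.
    void closeProducerForTask(final String taskId) {
        final OwnedProducer producer = taskProducers.remove(taskId);
        if (producer != null) {
            producer.close();
        }
    }

    // Closes the shared producer; a no-op in per-task mode.
    void closeThreadProducerIfNeeded() {
        if (threadProducer != null) {
            threadProducer.close();
        }
    }
}
```

Keeping creation and destruction in one class makes it straightforward to audit that every producer that gets created is eventually closed exactly once.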