KAFKA-6054: Add 'version probing' to Kafka Streams rebalance #4636
Conversation
This is a WIP PR that also includes all changes from #4630 -- thus, it must eventually be rebased. I added a new config just for now to test the approach -- this might change after the KIP is done, but I wanted to get an initial passing system test set up (the system test also contains some commented-out code that allows the test to fail -- just FYI -- this part needs cleanup as well). I also needed to update the Docker setup to get the older jar files we need to run the test locally using Docker. We could backport this fix to 0.10.1, 0.10.2, 0.11.0, 1.0, and 1.1, too. But cherry-picking won't work, as too much code changed in between. Basically, the fix consists of setting the correct min-version in the assignment and a config that tells bouncing instances to stay on protocol version 1. So it's quite a change -- not sure if it's worth it, considering that not many people will use 0.10.0 anymore anyway. (Having said this, it might not even be required to fix the JIRA at all -- nevertheless, this PR gives an idea for the upcoming version change and how to write a system test for it.) Please share your thoughts.
becd580 to af39ef9
Rebased this and cleaned up the code. This is a proper fix for the issue, including a system test. It still depends on a KIP that is WIP. Triggered 10 runs of the new system test: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/1387/
@@ -196,6 +197,12 @@ public void configure(final Map<String, ?> configs) {
        final LogContext logContext = new LogContext(logPrefix);
        log = logContext.logger(getClass());

        final String upgradeMode = streamsConfig.getString(StreamsConfig.UPGRADE_MODE_CONFIG);
        if (StreamsConfig.UPGRADE_FROM_0100X.equals(upgradeMode)) {
            log.debug("Downgrading to metadata version 1 for upgrade from 0.10.0.x.");
Since upgrading won't be a common occurrence, should this be log.info? Just a thought, I don't have a strong opinion either way.
EDIT: Thinking some more, it's fine as debug, ignore my previous comment.
I am fine either way -- I put it as debug because it's usually not helpful information for the user.
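For context, the intended use of this config during a rolling upgrade can be sketched as follows. This is a minimal, hypothetical example (the key `"upgrade.from"` appears later in this PR; the literal value `"0.10.0"` and the property names used here are assumptions, not confirmed by this diff):

```java
import java.util.Properties;

public class UpgradeFromExample {
    // Sketch: Streams config for the FIRST rolling bounce when upgrading
    // from 0.10.0.x. Setting "upgrade.from" keeps the new code on the old
    // (version 1) rebalance metadata so old and new instances stay compatible.
    static Properties firstBounceConfig() {
        final Properties props = new Properties();
        props.put("application.id", "smoke-test");       // placeholder values
        props.put("bootstrap.servers", "broker:9092");
        props.put("upgrade.from", "0.10.0");             // assumed literal
        return props;
    }

    public static void main(final String[] args) {
        // For the SECOND bounce, the config would be removed again so the
        // instances switch to the latest metadata version.
        System.out.println(firstBounceConfig().getProperty("upgrade.from"));
    }
}
```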
tests/kafkatest/services/streams.py
            "collect_default": True},
        "streams_stderr.1-6": {
            "path": STDERR_FILE + ".1-6",
            "collect_default": True},
Why all the log configs? I'm guessing separate ones for each version upgrade?
EDIT: Should have looked further down, each one is for each roll then?
Yes. In line https://github.com/apache/kafka/pull/4636/files#diff-21e906e43e313a665d07a8e1cd61a9c3R330 we move files around to roll them over -- it's super helpful for debugging to know which file belongs to what phase in the test.
'SmokeTest-sum-STATE-STORE-0000000026-changelog' : { 'partitions': self.partitions,
                                                     'replication-factor': self.replication,
                                                     'configs': {"min.insync.replicas": self.isr} },
'SmokeTest-cnt-STATE-STORE-0000000035-changelog' : { 'partitions': self.partitions,
Just playing devil's advocate here: do we want to create the internal topics ahead of time? If we change the SmokeTest streams application, we'll need to update these again.
We need to. We use the StreamsEosTestDriverService that expects a replication factor of 3; however, when we start up the 0.10.0.x version, it would by default create those topics with replication factor 1 and thus the test crashes later on.
Note that we pull in the 0.10.0.1 code as-is and cannot update the Streams app there to change the replication factor config to 3.
@mjsax thanks for the work on this, just one minor comment. Like you said above, we can't merge the changes to the pre-KIP-182 versions, but maybe it's worthwhile to cherry-pick the code changes to the
@bbejeck -- for old versions, we only pull in officially released artifacts. Thus, as long as there is no bug fix release for older versions, backporting does not help :(
The code change generally makes sense to me. Per the config names, there are some comments on the KIP itself which we can continue discussing there.
@@ -317,6 +325,10 @@ public Subscription subscription(final Set<String> topics) {
            clientMetadata.addConsumer(consumerId, info);
        }

        if (minUserMetadataVersion < SubscriptionInfo.LATEST_SUPPORTED_VERSION) {
            log.debug("Downgrading metadata to version " + minUserMetadataVersion);
nit: better log the latest supported version as well.
af39ef9 to 8f00806
Updated this. Also had a look into the system tests. One run failed. The issue is that it can happen that one instance only gets repartition topics assigned. For this case, we never get the expected output. I think there are the following options:
The last option would give us the most control. It might be the most difficult to implement, though. However, if we want more tests like this, it might be worth doing. Atm, being stuck with old client code if we want to run an older Streams version that we cannot change seems to be quite a limitation. Decoupling the test code from the library jars would be valuable. WDYT?
@mjsax I'm very much in favor of the last proposal and decoupling the library jars from the test code. I recently ran into the same difficulty with writing a rolling upgrade test against all versions and had to make some trade-offs because we can't change any test code.
I'm also in favor of the last approach. But for this specific failure case, even after we have implemented that approach, we still need to figure out how to modify the client code used in the system test suite. If we want to go further in this direction, such as (re-)compiling some modules from non-trunk code before the test, we likely need to modify ducktape as well. These should theoretically all be doable, but I'd like us to figure out all the details before digging into it.
tests/kafkatest/services/streams.py
@@ -42,6 +44,132 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
        "streams_stderr": {
            "path": STDERR_FILE,
            "collect_default": True},
        "streams_log.0": {
Are these more log files intentional or for debugging only?
Both. Originally, I added this for debugging. But I think it's super useful to keep them. Otherwise, it's hard to inspect log/stdout/stderr files if there are multiple processors that get bounced (it's hard to tell from a single file which statements belong to which "group generation" -- rolling over the files makes debugging much easier). Thus, I would suggest keeping this change and also applying it to other system tests with a similar pattern.
Sorry for the radio silence. For the record, I also think it's a great idea to keep a copy of SmokeTest for each version that we want to test. It will free us up to alter the test scenario and simplify the test orchestration. I think we also wouldn't need to download the test artifacts anymore, which saves a little testing time.
@@ -340,6 +345,10 @@
    public static final String TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "timestamp.extractor";
    private static final String TIMESTAMP_EXTRACTOR_CLASS_DOC = "Timestamp extractor class that implements the <code>org.apache.kafka.streams.processor.TimestampExtractor</code> interface. This config is deprecated, use <code>" + DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG + "</code> instead";

    /** {@code upgrade.from} */
    public static final String UPGRADE_FROM_CONFIG = "upgrade.from";
    public static final String UPGRADE_FROM_DOC = "Enables a backward compatible rebalance mode for the first round of rolling bounces. Default is null. Accepted values are \"" + UPGRADE_FROM_0100X + "\" (for upgrading from 0.10.0.x).";
Let's be specific on the scope of this config. For example:
"Allows versions equal to or older than 1.1.0 to upgrade to newer versions, including 1.2.0, in a backward compatible way. When upgrading from other versions this config would never need to be specified. Default is null ..."
8f00806 to f665293
Are you ready to start collecting "final" reviews? If so, it looks good to me.
This PR is a little outdated -- I am going to do a new PR for
69a851d to c7bedb3
Updated this to add a system test for the 1.1 release, increase metadata to version 3, add version probing, and add a version probing system test. System test passed: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/1671/ This should be the last PR to complete KIP-268. Call for review @guozhangwang @bbejeck @vvcephei @dguy
- don't reassign tasks of new instance but wait for second rebalance
d44014d to 6a077ec
Updated this. A couple of notes on what has changed:
Also did some additional code cleanup.
Triggered system test (50 runs): https://jenkins.confluent.io/job/system-test-kafka-branch-builder/1758/
I took a pass, overall looks good, just have a couple of questions. The system test looks good, but I might need another look later.
@@ -145,13 +145,15 @@
 */
// TODO: currently we cannot get the full topic configurations and hence cannot allow topic configs without the prefix,
// this can be lifted once kafka.log.LogConfig is completely deprecated by org.apache.kafka.common.config.TopicConfig
@SuppressWarnings("WeakerAccess")
A question more for my information: why do we need this SuppressWarnings here?
Not strictly. However, IntelliJ shows a warning that those could be private because they're not used elsewhere. It's just to get rid of those warnings. Can also revert if you insist.
        if (streamThread.versionProbingFlag.get()) {
            streamThread.versionProbingFlag.set(false);
        } else {
            taskManager.suspendTasksAndState();
I'm probably missing something and brought this up before, but above in onPartitionsAssigned we create tasks with the assignment when not version probing. But in onPartitionsRevoked, if we are version probing, we flip the version probing flag, hence on assignment we create tasks. Why don't we flip the version probing flag in onPartitionsAssigned as an else statement on line 270, so we are only ever suspending and creating tasks during non-version-probing rebalances?
onPartitionsAssigned: if the version probing flag is set, it means the assignment is empty and we want to trigger a new rebalance. If we called taskManager.createTasks(assignment), we would close suspended tasks, and that is what we do not want to do at this point, because we hope to get those tasks assigned after the second rebalance.
onPartitionsRevoked: if the version probing flag is set, we don't want to suspend tasks either. Tasks are already suspended, but if we called taskManager.suspendTasksAndState() again, we would lose the information about currently suspended tasks (and we need to keep this information; i.e., we avoid an incorrect internal metadata update here).
The flow is the following:
- trigger first rebalance
- onPartitionsRevoked -> version probing flag not set: suspend tasks regularly
- onPartitionsAssigned -> version probing flag set by StreamsPartitionAssignor: we skip task creation as we will rebalance again (we cannot reset the flag here, because we need it in the next step)
- trigger second rebalance
- onPartitionsRevoked -> version probing flag is still set; we can reset the flag and skip suspending tasks to preserve metadata
- onPartitionsAssigned -> version probing flag not set: we do regular assignment and start processing
Does this make sense?
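The two-rebalance flow described above can be sketched as a small, self-contained simulation. This is a hypothetical model of the callback logic (class, method, and trace names are invented for illustration; only versionProbingFlag mirrors the PR):

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the version-probing rebalance flow: during the first
// (probing) rebalance, task creation is skipped; during the second,
// task suspension is skipped so suspended tasks survive intact.
public class VersionProbingFlow {
    static final AtomicBoolean versionProbingFlag = new AtomicBoolean(false);
    static final StringBuilder trace = new StringBuilder();

    static void onPartitionsRevoked() {
        if (versionProbingFlag.get()) {
            versionProbingFlag.set(false);   // 2nd rebalance: reset flag, keep suspended tasks
            trace.append("skip-suspend;");
        } else {
            trace.append("suspend;");        // regular path: suspend tasks and state
        }
    }

    static void onPartitionsAssigned(final boolean assignorSetFlag) {
        if (assignorSetFlag) {
            versionProbingFlag.set(true);    // set by the assignor: assignment is empty
            trace.append("skip-create;");    // wait for the 2nd rebalance
        } else {
            trace.append("create;");         // regular path: create tasks from assignment
        }
    }

    public static void main(final String[] args) {
        onPartitionsRevoked();               // first rebalance
        onPartitionsAssigned(true);
        onPartitionsRevoked();               // second rebalance
        onPartitionsAssigned(false);
        System.out.println(trace);           // suspend;skip-create;skip-suspend;create;
    }
}
```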
yep - thanks for the clarification
@@ -204,15 +220,15 @@ public void configure(final Map<String, ?> configs) {
        switch (upgradeFrom) {
            case StreamsConfig.UPGRADE_FROM_0100:
                log.info("Downgrading metadata version from {} to 1 for upgrade from 0.10.0.x.", SubscriptionInfo.LATEST_SUPPORTED_VERSION);
                userMetadataVersion = 1;
                usedSubscriptionMetadataVersion = VERSION_ONE;
                break;
            case StreamsConfig.UPGRADE_FROM_0101:
            case StreamsConfig.UPGRADE_FROM_0102:
            case StreamsConfig.UPGRADE_FROM_0110:
            case StreamsConfig.UPGRADE_FROM_10:
            case StreamsConfig.UPGRADE_FROM_11:
                log.info("Downgrading metadata version from {} to 2 for upgrade from " + upgradeFrom + ".x.", SubscriptionInfo.LATEST_SUPPORTED_VERSION);
nit: use "{}.x." vs. string concatenation
@@ -52,6 +52,33 @@ class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service):
        "streams_stderr": {
            "path": STDERR_FILE,
            "collect_default": True},
        "streams_log.1": {
            "path": LOG_FILE + ".1",
will the system test results still get errors when these files aren't found?
Yes, we need to specify all used files here -- otherwise they won't be collected after the test finishes.
found = list(p.node.account.ssh_capture("grep \"Finished assignment for group\" %s" % p.LOG_FILE, allow_fail=True))
if len(found) == self.leader_counter[p] + 1:
    self.leader = p
    self.leader_counter[p] = self.leader_counter[p] + 1
Why the + 1? Is that for the leader to kick off version probing with a future version?
If a processor is the leader, it will print "Finished assignment for group" in the log. A processor can be the leader multiple times, and thus we count how many of those lines we have seen in leader_counter[p]. To identify the new leader, its count must increase by exactly one. We increase by one as we have found one occurrence, as expected.
LGTM
Made another pass on non-testing code as I've reviewed the testing code before. Left some comments.
    private final static int VERSION_THREE = 3;
    private final static int EARLIEST_PROBEABLE_VERSION = VERSION_THREE;
    protected int minUserMetadataVersion = UNKNOWN;
    protected Set<Integer> supportedVersions = new HashSet<>();
This field is only used for testing purposes?
For now, yes. However, after we bump the metadata version to 4, parts of the logic of FutureStreamsPartitionAssignor will move into StreamsPartitionAssignor, and then StreamsPartitionAssignor will also need this field.
Ack.
@@ -602,6 +674,51 @@ public Subscription subscription(final Set<String> topics) {

        return assignment;
    }
    private Map<String, Assignment> versionProbingAssignment(final Map<UUID, ClientMetadata> clientsMetadata,
nit: empty line space.
        for (final ClientMetadata clientMetadata : clientsMetadata.values()) {
            for (final String consumerId : clientMetadata.consumers) {

                final List<TaskId> activeTasks = new ArrayList<>(clientMetadata.state.prevActiveTasks());
We could skip if futureConsumers.contains(consumerId)?
        if (usedSubscriptionMetadataVersion > receivedAssignmentMetadataVersion
            && receivedAssignmentMetadataVersion >= EARLIEST_PROBEABLE_VERSION) {

        if (info.version() == supportedVersion) {
info.version() could be replaced with receivedAssignmentMetadataVersion
Here's my reasoning of the cases:
1. receivedAssignmentMetadataVersion > supportedVersion: this should never happen.
2. receivedAssignmentMetadataVersion == supportedVersion: normal case, the leader only knows up to supportedVersion, and hence sends this version back.
3. receivedAssignmentMetadataVersion < supportedVersion: some other consumer used an even older-than-supportedVersion; in this case this consumer will again send the subscription with supportedVersion.

So it seems we do not need to distinguish 2) and 3), since for either case line 763 and line 770 will actually assign usedSubscriptionMetadataVersion = supportedVersion, right?
Or do you just want to distinguish the log entry? If that's the case, I think we could simplify this to:
if (receivedAssignmentMetadataVersion > supportedVersion) {
// throw runtime exception
} else {
if (receivedAssignmentMetadataVersion == supportedVersion)
// log normally
else
// log differently
usedSubscriptionMetadataVersion = supportedVersion;
}
I think unifying the assignment to usedSubscriptionMetadataVersion into a single line is harder to read for humans. Using usedSubscriptionMetadataVersion = receivedAssignmentMetadataVersion makes clear that it's a downgrade, while usedSubscriptionMetadataVersion = supportedVersion makes clear it is an upgrade. I applied the other suggestions, though.
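The cases debated above can be condensed into a small sketch. This is a hypothetical model, not the PR's actual method: the class and helper names are invented, and the control flow follows the downgrade/upgrade distinction argued for in the comment above (downgrade keeps the received version, otherwise we settle on our supported version):

```java
// Sketch of onAssignment version resolution: decide which subscription
// metadata version this member should use for its next subscription.
public class VersionResolution {
    static final int EARLIEST_PROBEABLE_VERSION = 3;

    static int resolve(final int used, final int received, final int supported) {
        if (received > supported) {
            // case 1 above: the leader sent a version we don't support -- a bug
            throw new IllegalStateException(
                "received version " + received + " > supported version " + supported);
        }
        if (used > received && received >= EARLIEST_PROBEABLE_VERSION) {
            // downgrade: some group member is on an older (but probeable) version
            return received;
        }
        // normal/upgrade case: everyone supports at least this version
        return supported;
    }

    public static void main(final String[] args) {
        System.out.println(resolve(4, 3, 4)); // downgrade to the received version
        System.out.println(resolve(3, 3, 3)); // normal case
    }
}
```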
        final int supportedVersion = info.latestSupportedVersion();

        if (usedSubscriptionMetadataVersion > receivedAssignmentMetadataVersion
            && receivedAssignmentMetadataVersion >= EARLIEST_PROBEABLE_VERSION) {
receivedAssignmentMetadataVersion >= EARLIEST_PROBEABLE_VERSION should be guaranteed at the server side as always, right? If that is true, I'd suggest we refactor it as:
if (usedSubscriptionMetadataVersion > receivedAssignmentMetadataVersion) {
if (receivedAssignmentMetadataVersion < EARLIEST_PROBEABLE_VERSION) {
// throw illegal state exception.
}
// .. below logic
}
So that we can detect potential bugs.
This code is actually correct. If I changed it as suggested, it would break the manual upgrade path from version 1/2 to version 3. For this case, a received version can be smaller than the used subscription version, and smaller than EARLIEST_PROBEABLE_VERSION.
Ah, you're right. My understanding is that to manually upgrade from version 1/2 to version 3, we set the upgrade.from config accordingly, so in the first rebalance everyone uses version 1/2 in SubscriptionInfo and AssignmentInfo; then in the second rebalance, some members send SubscriptionInfo with version 3 and some send version 1/2 (they have not been bounced yet), so AssignmentInfo with version 1/2 is sent back again.
                processVersionTwoAssignment(info, partitions, activeTasks, topicToPartitionInfo);
                partitionsByHost = info.partitionsByHost();
                break;
            case 3:
            case VERSION_THREE:
                final int latestSupportedVersionGroupLeader = info.latestSupportedVersion();
We can reuse supportedVersion in line 753 above.
Actually, how about renaming that field to latestLeaderSupportedVersion?
    }

    private ClientState(Set<TaskId> activeTasks, Set<TaskId> standbyTasks, Set<TaskId> assignedTasks, Set<TaskId> prevActiveTasks, Set<TaskId> prevAssignedTasks, int capacity) {
    private ClientState(final Set<TaskId> activeTasks,
The added prevStandbyTasks seems not to be set anywhere? I.e., it will always be an empty hash set?
It is set in
public void addPreviousStandbyTasks(final Set<TaskId> standbyTasks) {
prevStandbyTasks.addAll(standbyTasks);
prevAssignedTasks.addAll(standbyTasks);
}
@mjsax Please feel free to merge after those comments are addressed. I have no more feedback.
Retest this please.
Upgrade system test passed: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/1762/
This PR fixes some regressions introduced into streams system tests and sets the upgrade tests to ignore until PR apache#4636 is merged as it has the fixes for the upgrade tests. Reviewers: Guozhang Wang <[email protected]>
…4636) implements KIP-268 Reviewers: Bill Bejeck <[email protected]>, John Roesler <[email protected]>, Guozhang Wang <[email protected]>