Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-6054: Update Kafka Streams metadata to version 3 #4880

Merged
merged 3 commits into from
Apr 18, 2018

Conversation

mjsax
Copy link
Member

@mjsax mjsax commented Apr 16, 2018

  • adds Streams upgrade tests for 1.1 release

 - adds Streams upgrade tests for 1.1 release
 - introduces metadata version 3 and 'version probing'
@mjsax mjsax added the streams label Apr 16, 2018
@mjsax mjsax requested review from dguy and guozhangwang April 16, 2018 14:23
@mjsax
Copy link
Member Author

mjsax commented Apr 16, 2018

\cc @bbejeck @vvcephei

This PR does not yet add "version probing" -- it bumps rebalance metadata version to 3 and introduces a new field "supported version" into the metadata. Additionally, we add corresponding flag to downgrade to metadata 2 (for versions 0.10.1 to 1.1). System tests are changed accordingly.

@mjsax mjsax force-pushed the kafka-6054-fix-upgrade-trunk branch from ea73ba1 to 12becef Compare April 16, 2018 17:20
@mjsax
Copy link
Member Author

mjsax commented Apr 17, 2018

Copy link
Contributor

@vvcephei vvcephei left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! Just a couple of small questions...

@@ -512,7 +526,7 @@ public Subscription subscription(final Set<String> topics) {

// construct the global partition assignment per host map
final Map<HostInfo, Set<TopicPartition>> partitionsByHostState = new HashMap<>();
if (minUserMetadataVersion == 2) {
if (minUserMetadataVersion == 2 || minUserMetadataVersion == 3) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you think it's safe to say minUserMetadata >= 2 here so we don't have to add a new clause with every version? Or do you think the logic that follows may not apply to later metadata versions?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe --- for know I think it's fine, but we should consider to do a "closed range" if we add version 4. I don't like open-ended. Not sure what others think.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, closed range seems less risky.

@@ -161,6 +196,13 @@ private void writeTopicPartitions(final DataOutputStream out,
}
}

private void encodeVersionThree(final DataOutputStream out) throws IOException {
out.writeInt(usedVersion);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the other two variants, we used a literal 1 and 2 here. Not sure if it matters at all, since the way we get here is that usedVersion == 3.

Copy link
Member Author

@mjsax mjsax Apr 17, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is "left over" from "reverting" version probing -- we will need it there.

Copy link
Contributor

@guozhangwang guozhangwang Apr 17, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not understand this reply completely: could you elaborate a bit more?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just double checked the code. We can change it to hard coded 3.

Copy link
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made a pass over it.

@@ -161,6 +196,13 @@ private void writeTopicPartitions(final DataOutputStream out,
}
}

private void encodeVersionThree(final DataOutputStream out) throws IOException {
out.writeInt(usedVersion);
Copy link
Contributor

@guozhangwang guozhangwang Apr 17, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not understand this reply completely: could you elaborate a bit more?

@@ -161,6 +196,13 @@ private void writeTopicPartitions(final DataOutputStream out,
}
}

private void encodeVersionThree(final DataOutputStream out) throws IOException {
out.writeInt(usedVersion);
out.writeInt(LATEST_SUPPORTED_VERSION);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we encode latestSupportedVersion here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this and following usages around latestSupportedVersion are related to the upcoming version probing code.

It's a little mysterious to have a "latest supported version" always equal to the "current version" in this PR in isolation, but I don' think it's actually problematic.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly. For version probing, the assignment must sent back the leaders supported version to tell already upgraded instances to which version they need to downgrade.

I though it's better to do the metadata change in this PR already.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool.

@@ -233,27 +293,39 @@ private static void decodeVersionTwoData(final AssignmentInfo assignmentInfo,
return partitions;
}

private static void decodeVersionThreeData(final AssignmentInfo assignmentInfo,
final DataInputStream in) throws IOException {
decodeActiveTasks(assignmentInfo, in);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we just call decodeVersionTwoData as we did for processVersionThreeAssignment?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like these two approaches are fighting for dominance in this code.

Personally, I favor the "flat" design, where these methods don't call each other. It's just easier to read when each one in isolation lists all the operations it needs to do.

It's also more general, since a hypothetical future version N might add some new metadata in the middle, or even drop some previous metadata and so wouldn't be able to call decodeVersion{N-1} in its implementation.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Version three will change and be different to version two. Thus, I prefer to not just call encodeVersionTwo. Guess it's personal taste.

@@ -66,6 +88,10 @@ public int version() {
return usedVersion;
}

public int latestSupportedVersion() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is this field going to be used?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Version Probing. cf : #4636 (full PR -- this PR is just a "reduced" version with version probing reverted to simplify reviewing)

final ByteBuffer buf = ByteBuffer.allocate(getVersionThreeByteLength(endPointBytes));

buf.putInt(3); // used version
buf.putInt(3); // supported version
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If latestSupportedVersion is ever going be different, we should use that field than hardcoding it here. But personally I am not sure where is latestSupportedVersion ever going to be used. Although the KIP did include this in the proposed changes, it only talks about how the SupportedVersionNumber of AssignmentInfo will be used, but not SubscriptionInfo..

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack. Will make it dynamic.

The field is unused atm, but we wanted to include it to be "future prove".

decodeVersionOneData(subscriptionInfo, data);
decodeClientUUID(subscriptionInfo, data);

subscriptionInfo.prevTasks = new HashSet<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could we just pass in subscriptionInfo into decodeTasks, and initialize its prevTasks and standbyTasks internally?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not possible if we want to share code. We call decodeTasks that does not know if it's decoding prevTasks or standbyTasks -- alternative, we can do two methods. Let me know what you think

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally I'd prefer to have decodeTasksData in which we hard-code the logic of doing both prevTasks and standbyTasks, we do not code-share for these two task sets but we share code of constructing the set for version two and version three. I guess we cannot get both code sharing, and since it is really a nit I'm fine either way :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll clean this up in the next PR. Merging this now.

SubscriptionInfo decoded = SubscriptionInfo.decode(original.encode());
assertEquals(original, decoded);
public void shouldEncodeAndDecodeVersion1() {
final SubscriptionInfo info = new SubscriptionInfo(1, processId, activeTasks, standbyTasks, IGNORED_USER_ENDPOINT);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To make it more rigid: we can pass a valid end point string, and then check that the field is still null below.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack

@@ -64,7 +64,7 @@
"""


class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService):
class oConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm embarrassed that I missed this one.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack


def set_version(self, kafka_streams_version):
self.KAFKA_STREAMS_VERSION = kafka_streams_version

def set_upgrade_from(self, upgrade_from):
self.UPGRADE_FROM = upgrade_from

def set_upgrade_to(self, upgrade_to):
self.UPGRADE_TO = upgrade_to
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is UPGRADE_TO used?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forgot to revert this -- needed for version probing system test

@mjsax
Copy link
Member Author

mjsax commented Apr 17, 2018

Updated this. New system test run: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/1712/

@guozhangwang
Copy link
Contributor

LGTM assuming unit test and system test passed. Please feel free to merge afterwards.

@mjsax mjsax merged commit cae4221 into apache:trunk Apr 18, 2018
jeqo pushed a commit to jeqo/kafka that referenced this pull request May 2, 2018
 - adds Streams upgrade tests for 1.1 release
 - introduces metadata version 3

Reviewers: John Roesler <[email protected]>, Guozhang Wang <[email protected]>
@mjsax mjsax deleted the kafka-6054-fix-upgrade-trunk branch June 5, 2018 23:47
ying-zheng pushed a commit to ying-zheng/kafka that referenced this pull request Jul 6, 2018
 - adds Streams upgrade tests for 1.1 release
 - introduces metadata version 3

Reviewers: John Roesler <[email protected]>, Guozhang Wang <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants