Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-10028: Implement write path for feature versioning system (KIP-584) #9001

Merged
merged 41 commits into from
Oct 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
824e2f7
Implement KIP-584 write path
kowshik Jul 11, 2020
cc23765
Fix checkstyle issues
kowshik Jul 13, 2020
114e24d
Minor: Improved comment
kowshik Jul 13, 2020
9af2a01
Minor: cosmetics
kowshik Jul 14, 2020
afa3ab9
Fix small bug
kowshik Jul 14, 2020
59d8038
Minor: improve one of the tests slightly to handle +1 case
kowshik Jul 17, 2020
9cdfc31
Address comments from Boyang
kowshik Jul 21, 2020
08f064b
Minor: add missing header in UpdateFeaturesTest.scala
kowshik Jul 21, 2020
6b9e237
Minor cosmetic changes
kowshik Jul 22, 2020
c2772a1
Address comments from Boyang
kowshik Jul 28, 2020
7620c88
Minor: Remove newline
kowshik Jul 28, 2020
89372ba
Minor: cosmetics
kowshik Jul 28, 2020
78bfc4c
Minor: Remove unused imports to fix checkstyle issues
kowshik Jul 28, 2020
c3201a1
Minor cosmetics to fix checkstyle issue
kowshik Jul 28, 2020
7a7f716
Minor cosmetics
kowshik Jul 29, 2020
7afd81b
Minor: improve code slightly in KafkaController
kowshik Jul 29, 2020
cec2505
Address review comments from Boyang
kowshik Aug 3, 2020
3b4b370
Minor cosmetics
kowshik Aug 5, 2020
2a1dee2
Minor cosmetics
kowshik Aug 6, 2020
a7c32a0
Address latest review comments
kowshik Sep 23, 2020
4729737
Fix checkstyle issues for CI
kowshik Sep 24, 2020
deaad42
Small improvements
kowshik Sep 24, 2020
21491b2
Rebase on latest AK trunk
kowshik Sep 25, 2020
8ec01e7
Fix checkstyle issue
kowshik Sep 25, 2020
436d816
Remove unused code
kowshik Sep 25, 2020
1150a1f
Reinstante timeoutMs & change FinalizedFeaturesEpoch to long data type
kowshik Sep 26, 2020
45372f3
Minor improvements
kowshik Sep 27, 2020
389b7aa
Cosmetics
kowshik Sep 27, 2020
5f3af18
Small improvement
kowshik Sep 27, 2020
4d067f9
Implement firstActiveVersion
kowshik Sep 27, 2020
3342f14
Small doc change
kowshik Sep 28, 2020
3c59a17
Address comments from Jun
kowshik Sep 29, 2020
cc378c6
Address comments from Boyang
kowshik Sep 30, 2020
116352b
Address comments from Jun
kowshik Oct 1, 2020
50f53dd
Address comment from Jun: Revert firstActiveVersion change
kowshik Oct 2, 2020
0ba831d
Minor change to code format
kowshik Oct 2, 2020
c821e85
Fix ControllerIntegrationTest
kowshik Oct 2, 2020
5e3fc96
Minor cosmetic changes
kowshik Oct 2, 2020
3c5c04f
Address comments from Jun
kowshik Oct 3, 2020
b69f7fe
Minor formatting change
kowshik Oct 3, 2020
e1c79ce
Address comments from Jun
kowshik Oct 5, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.FeatureUpdateFailedException;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaFilter;
import org.apache.kafka.common.requests.LeaveGroupResponse;
Expand Down Expand Up @@ -1306,6 +1307,73 @@ default AlterUserScramCredentialsResult alterUserScramCredentials(List<UserScram
AlterUserScramCredentialsResult alterUserScramCredentials(List<UserScramCredentialAlteration> alterations,
AlterUserScramCredentialsOptions options);

/**
* Describes finalized as well as supported features. By default, the request is issued to any
abbccdda marked this conversation as resolved.
Show resolved Hide resolved
* broker. It can be optionally directed only to the controller via DescribeFeaturesOptions
* parameter. This is particularly useful if the user requires strongly consistent reads of
* finalized features.
* <p>
* The following exceptions can be anticipated when calling {@code get()} on the future from the
* returned {@link DescribeFeaturesResult}:
* <ul>
* <li>{@link org.apache.kafka.common.errors.TimeoutException}
* If the request timed out before the describe operation could finish.</li>
* </ul>
* <p>
* @param options the options to use
*
* @return the {@link DescribeFeaturesResult} containing the result
*/
DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);
kowshik marked this conversation as resolved.
Show resolved Hide resolved
abbccdda marked this conversation as resolved.
Show resolved Hide resolved
kowshik marked this conversation as resolved.
Show resolved Hide resolved
kowshik marked this conversation as resolved.
Show resolved Hide resolved

/**
* Applies specified updates to finalized features. This operation is not transactional so some
* updates may succeed while the rest may fail.
* <p>
* The API takes in a map of finalized feature names to {@link FeatureUpdate} that needs to be
* applied. Each entry in the map specifies the finalized feature to be added or updated or
* deleted, along with the new max feature version level value. This request is issued only to
* the controller since the API is only served by the controller. The return value contains an
* error code for each supplied {@link FeatureUpdate}, and the code indicates if the update
* succeeded or failed in the controller.
* <ul>
* <li>Downgrade of feature version level is not a regular operation/intent. It is only allowed
* in the controller if the {@link FeatureUpdate} has the allowDowngrade flag set. Setting this
* flag conveys user intent to attempt downgrade of a feature max version level. Note that
* despite the allowDowngrade flag being set, certain downgrades may be rejected by the
* controller if it is deemed impossible.</li>
* <li>Deletion of a finalized feature version is not a regular operation/intent. It could be
* done by setting the allowDowngrade flag to true in the {@link FeatureUpdate}, and, setting
* the max version level to a value less than 1.</li>
* </ul>
* <p>
* The following exceptions can be anticipated when calling {@code get()} on the futures
* obtained from the returned {@link UpdateFeaturesResult}:
* <ul>
* <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
* If the authenticated user didn't have alter access to the cluster.</li>
* <li>{@link org.apache.kafka.common.errors.InvalidRequestException}
kowshik marked this conversation as resolved.
Show resolved Hide resolved
* If the request details are invalid. e.g., a non-existing finalized feature is attempted
abbccdda marked this conversation as resolved.
Show resolved Hide resolved
* to be deleted or downgraded.</li>
* <li>{@link org.apache.kafka.common.errors.TimeoutException}
* If the request timed out before the updates could finish. It cannot be guaranteed whether
* the updates succeeded or not.</li>
* <li>{@link FeatureUpdateFailedException}
* This means there was an unexpected error encountered when the update was applied on
* the controller. There is no guarantee on whether the update succeeded or failed. The best
* way to find out is to issue a {@link Admin#describeFeatures(DescribeFeaturesOptions)}
* request to the controller to get the latest features.</li>
* </ul>
* <p>
* This operation is supported by brokers with version 2.7.0 or higher.

* @param featureUpdates the map of finalized feature name to {@link FeatureUpdate}
* @param options the options to use
*
* @return the {@link UpdateFeaturesResult} containing the result
*/
UpdateFeaturesResult updateFeatures(Map<String, FeatureUpdate> featureUpdates, UpdateFeaturesOptions options);
kowshik marked this conversation as resolved.
Show resolved Hide resolved

/**
* Get the metrics kept by the adminClient
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.annotation.InterfaceStability;

/**
* Options for {@link AdminClient#describeFeatures(DescribeFeaturesOptions)}.
*
* The API of this class is evolving. See {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class DescribeFeaturesOptions extends AbstractOptions<DescribeFeaturesOptions> {
kowshik marked this conversation as resolved.
Show resolved Hide resolved
kowshik marked this conversation as resolved.
Show resolved Hide resolved

/**
* - True means the {@link Admin#describeFeatures(DescribeFeaturesOptions)} request must be
* issued only to the controller.
* - False means the {@link Admin#describeFeatures(DescribeFeaturesOptions)} request can be
kowshik marked this conversation as resolved.
Show resolved Hide resolved
* issued to any random broker.
*/
private boolean sendRequestToController = false;

/**
* Sets a flag indicating that the describe features request must be issued only to the controller.
*/
public DescribeFeaturesOptions sendRequestToController(boolean sendRequestToController) {
this.sendRequestToController = sendRequestToController;
return this;
}

public boolean sendRequestToController() {
return sendRequestToController;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.KafkaFuture;

/**
* The result of the {@link Admin#describeFeatures(DescribeFeaturesOptions)} call.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
public class DescribeFeaturesResult {

private final KafkaFuture<FeatureMetadata> future;

DescribeFeaturesResult(KafkaFuture<FeatureMetadata> future) {
this.future = future;
}

public KafkaFuture<FeatureMetadata> featureMetadata() {
return future;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;

import static java.util.stream.Collectors.joining;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/**
* Encapsulates details about finalized as well as supported features. This is particularly useful
* to hold the result returned by the {@link Admin#describeFeatures(DescribeFeaturesOptions)} API.
kowshik marked this conversation as resolved.
Show resolved Hide resolved
*/
public class FeatureMetadata {
kowshik marked this conversation as resolved.
Show resolved Hide resolved

private final Map<String, FinalizedVersionRange> finalizedFeatures;

private final Optional<Long> finalizedFeaturesEpoch;

private final Map<String, SupportedVersionRange> supportedFeatures;

FeatureMetadata(final Map<String, FinalizedVersionRange> finalizedFeatures,
final Optional<Long> finalizedFeaturesEpoch,
final Map<String, SupportedVersionRange> supportedFeatures) {
this.finalizedFeatures = new HashMap<>(finalizedFeatures);
this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
this.supportedFeatures = new HashMap<>(supportedFeatures);
}

/**
* Returns a map of finalized feature versions. Each entry in the map contains a key being a
* feature name and the value being a range of version levels supported by every broker in the
* cluster.
*/
public Map<String, FinalizedVersionRange> finalizedFeatures() {
return new HashMap<>(finalizedFeatures);
}

/**
* The epoch for the finalized features.
* If the returned value is empty, it means the finalized features are absent/unavailable.
*/
public Optional<Long> finalizedFeaturesEpoch() {
return finalizedFeaturesEpoch;
}

/**
* Returns a map of supported feature versions. Each entry in the map contains a key being a
* feature name and the value being a range of versions supported by a particular broker in the
* cluster.
*/
public Map<String, SupportedVersionRange> supportedFeatures() {
return new HashMap<>(supportedFeatures);
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (!(other instanceof FeatureMetadata)) {
return false;
}

final FeatureMetadata that = (FeatureMetadata) other;
return Objects.equals(this.finalizedFeatures, that.finalizedFeatures) &&
Objects.equals(this.finalizedFeaturesEpoch, that.finalizedFeaturesEpoch) &&
Objects.equals(this.supportedFeatures, that.supportedFeatures);
}

@Override
public int hashCode() {
return Objects.hash(finalizedFeatures, finalizedFeaturesEpoch, supportedFeatures);
}

private static <ValueType> String mapToString(final Map<String, ValueType> featureVersionsMap) {
return String.format(
"{%s}",
featureVersionsMap
.entrySet()
.stream()
.map(entry -> String.format("(%s -> %s)", entry.getKey(), entry.getValue()))
.collect(joining(", "))
);
}

@Override
public String toString() {
return String.format(
"FeatureMetadata{finalizedFeatures:%s, finalizedFeaturesEpoch:%s, supportedFeatures:%s}",
mapToString(finalizedFeatures),
finalizedFeaturesEpoch.map(Object::toString).orElse("<none>"),
mapToString(supportedFeatures));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;

import java.util.Objects;

/**
* Encapsulates details about an update to a finalized feature.
*/
public class FeatureUpdate {
kowshik marked this conversation as resolved.
Show resolved Hide resolved
private final short maxVersionLevel;
private final boolean allowDowngrade;

/**
* @param maxVersionLevel the new maximum version level for the finalized feature.
* a value < 1 is special and indicates that the update is intended to
* delete the finalized feature, and should be accompanied by setting
* the allowDowngrade flag to true.
* @param allowDowngrade - true, if this feature update was meant to downgrade the existing
* maximum version level of the finalized feature.
* - false, otherwise.
*/
public FeatureUpdate(final short maxVersionLevel, final boolean allowDowngrade) {
if (maxVersionLevel < 1 && !allowDowngrade) {
throw new IllegalArgumentException(String.format(
"The allowDowngrade flag should be set when the provided maxVersionLevel:%d is < 1.",
maxVersionLevel));
}
this.maxVersionLevel = maxVersionLevel;
this.allowDowngrade = allowDowngrade;
}

public short maxVersionLevel() {
return maxVersionLevel;
}

public boolean allowDowngrade() {
return allowDowngrade;
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}

if (!(other instanceof FeatureUpdate)) {
return false;
}

final FeatureUpdate that = (FeatureUpdate) other;
return this.maxVersionLevel == that.maxVersionLevel && this.allowDowngrade == that.allowDowngrade;
}

@Override
public int hashCode() {
return Objects.hash(maxVersionLevel, allowDowngrade);
}

@Override
public String toString() {
return String.format("FeatureUpdate{maxVersionLevel:%d, allowDowngrade:%s}", maxVersionLevel, allowDowngrade);
}
}
Loading