Skip to content

Commit

Permalink
KAFKA-10028: Minor fixes to describeFeatures and updateFeatures apis (#…
Browse files Browse the repository at this point in the history
…9393)

In this PR, I have addressed the review comments from @chia7712 in #9001 which were provided after #9001 was merged. The changes are made mainly to KafkaAdminClient:

Improve error message in updateFeatures api when feature name is empty.
Propagate top-level error message in updateFeatures api.
Add an empty-parameter variety for describeFeatures api.
Minor documentation updates to @param and @return to make these resemble other apis.

Reviewers: Chia-Ping Tsai [email protected], Jun Rao [email protected]
  • Loading branch information
kowshik authored Oct 8, 2020
1 parent de546ba commit de41834
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 24 deletions.
22 changes: 16 additions & 6 deletions clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Original file line number Diff line number Diff line change
Expand Up @@ -1306,6 +1306,17 @@ default AlterUserScramCredentialsResult alterUserScramCredentials(List<UserScram
*/
AlterUserScramCredentialsResult alterUserScramCredentials(List<UserScramCredentialAlteration> alterations,
AlterUserScramCredentialsOptions options);
/**
* Describes finalized as well as supported features.
* <p>
* This is a convenience method for {@link #describeFeatures(DescribeFeaturesOptions)} with default options.
* See the overload for more details.
*
* @return the {@link DescribeFeaturesResult} containing the result
*/
default DescribeFeaturesResult describeFeatures() {
return describeFeatures(new DescribeFeaturesOptions());
}

/**
* Describes finalized as well as supported features. By default, the request is issued to any
Expand All @@ -1320,9 +1331,9 @@ AlterUserScramCredentialsResult alterUserScramCredentials(List<UserScramCredenti
* If the request timed out before the describe operation could finish.</li>
* </ul>
* <p>
* @param options the options to use
*
* @return the {@link DescribeFeaturesResult} containing the result
* @param options the options to use
* @return the {@link DescribeFeaturesResult} containing the result
*/
DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);

Expand Down Expand Up @@ -1367,10 +1378,9 @@ AlterUserScramCredentialsResult alterUserScramCredentials(List<UserScramCredenti
* <p>
* This operation is supported by brokers with version 2.7.0 or higher.
* @param featureUpdates the map of finalized feature name to {@link FeatureUpdate}
* @param options the options to use
*
* @return the {@link UpdateFeaturesResult} containing the result
* @param featureUpdates the map of finalized feature name to {@link FeatureUpdate}
* @param options the options to use
* @return the {@link UpdateFeaturesResult} containing the result
*/
UpdateFeaturesResult updateFeatures(Map<String, FeatureUpdate> featureUpdates, UpdateFeaturesOptions options);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4410,6 +4410,10 @@ public UpdateFeaturesResult updateFeatures(final Map<String, FeatureUpdate> feat

final Map<String, KafkaFutureImpl<Void>> updateFutures = new HashMap<>();
for (final Map.Entry<String, FeatureUpdate> entry : featureUpdates.entrySet()) {
final String feature = entry.getKey();
if (feature.trim().isEmpty()) {
throw new IllegalArgumentException("Provided feature can not be empty.");
}
updateFutures.put(entry.getKey(), new KafkaFutureImpl<>());
}

Expand All @@ -4424,10 +4428,6 @@ UpdateFeaturesRequest.Builder createRequest(int timeoutMs) {
for (Map.Entry<String, FeatureUpdate> entry : featureUpdates.entrySet()) {
final String feature = entry.getKey();
final FeatureUpdate update = entry.getValue();
if (feature.trim().isEmpty()) {
throw new IllegalArgumentException("Provided feature can not be null or empty.");
}

final UpdateFeaturesRequestData.FeatureUpdateKey requestItem =
new UpdateFeaturesRequestData.FeatureUpdateKey();
requestItem.setFeature(feature);
Expand Down Expand Up @@ -4471,7 +4471,8 @@ void handleResponse(AbstractResponse abstractResponse) {
break;
default:
for (final Map.Entry<String, KafkaFutureImpl<Void>> entry : updateFutures.entrySet()) {
entry.getValue().completeExceptionally(topLevelError.exception());
final String errorMsg = response.data().errorMessage();
entry.getValue().completeExceptionally(topLevelError.exception(errorMsg));
}
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4062,19 +4062,12 @@ public void testUpdateFeaturesShouldFailRequestForEmptyUpdates() {
@Test
public void testUpdateFeaturesShouldFailRequestForInvalidFeatureName() {
try (final AdminClientUnitTestEnv env = mockClientEnv()) {
final UpdateFeaturesResult result = env.adminClient().updateFeatures(
Utils.mkMap(Utils.mkEntry("", new FeatureUpdate((short) 2, false))),
new UpdateFeaturesOptions());

final Map<String, KafkaFuture<Void>> futures = result.values();
for (Map.Entry<String, KafkaFuture<Void>> entry : futures.entrySet()) {
final Throwable cause = assertThrows(ExecutionException.class, () -> entry.getValue().get());
assertEquals(KafkaException.class, cause.getCause().getClass());
}

final KafkaFuture<Void> future = result.all();
final Throwable cause = assertThrows(ExecutionException.class, () -> future.get());
assertEquals(KafkaException.class, cause.getCause().getClass());
assertThrows(
IllegalArgumentException.class,
() -> env.adminClient().updateFeatures(
Utils.mkMap(Utils.mkEntry("feature", new FeatureUpdate((short) 2, false)),
Utils.mkEntry("", new FeatureUpdate((short) 2, false))),
new UpdateFeaturesOptions()));
}
}

Expand Down

0 comments on commit de41834

Please sign in to comment.