From ba24e6ec4d730838e49df0f6a02aa3210322f9c7 Mon Sep 17 00:00:00 2001 From: Kowshik Prakasam Date: Thu, 8 Oct 2020 10:05:29 -0700 Subject: [PATCH] KAFKA-10028: Minor fixes to describeFeatures and updateFeatures apis (#9393) In this PR, I have addressed the review comments from @chia7712 in #9001 which were provided after #9001 was merged. The changes are made mainly to KafkaAdminClient: Improve error message in updateFeatures api when feature name is empty. Propagate top-level error message in updateFeatures api. Add an empty-parameter variety for describeFeatures api. Minor documentation updates to @param and @return to make these resemble other apis. Reviewers: Chia-Ping Tsai chia7712@gmail.com, Jun Rao junrao@gmail.com --- .../org/apache/kafka/clients/admin/Admin.java | 22 ++++++++++++++----- .../kafka/clients/admin/KafkaAdminClient.java | 11 +++++----- .../clients/admin/KafkaAdminClientTest.java | 19 +++++----------- 3 files changed, 28 insertions(+), 24 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java index 96620df17fba3..503dfd7be050a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java @@ -1306,6 +1306,17 @@ default AlterUserScramCredentialsResult alterUserScramCredentials(List alterations, AlterUserScramCredentialsOptions options); + /** + * Describes finalized as well as supported features. + *

+ * This is a convenience method for {@link #describeFeatures(DescribeFeaturesOptions)} with default options. + * See the overload for more details. + * + * @return the {@link DescribeFeaturesResult} containing the result + */ + default DescribeFeaturesResult describeFeatures() { + return describeFeatures(new DescribeFeaturesOptions()); + } /** * Describes finalized as well as supported features. By default, the request is issued to any @@ -1320,9 +1331,9 @@ AlterUserScramCredentialsResult alterUserScramCredentials(List * *

- * @param options the options to use * - * @return the {@link DescribeFeaturesResult} containing the result + * @param options the options to use + * @return the {@link DescribeFeaturesResult} containing the result */ DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options); @@ -1367,10 +1378,9 @@ AlterUserScramCredentialsResult alterUserScramCredentials(List * This operation is supported by brokers with version 2.7.0 or higher. - * @param featureUpdates the map of finalized feature name to {@link FeatureUpdate} - * @param options the options to use - * - * @return the {@link UpdateFeaturesResult} containing the result + * @param featureUpdates the map of finalized feature name to {@link FeatureUpdate} + * @param options the options to use + * @return the {@link UpdateFeaturesResult} containing the result */ UpdateFeaturesResult updateFeatures(Map featureUpdates, UpdateFeaturesOptions options); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 43dc197157073..ba29b2024b3df 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -4410,6 +4410,10 @@ public UpdateFeaturesResult updateFeatures(final Map feat final Map> updateFutures = new HashMap<>(); for (final Map.Entry entry : featureUpdates.entrySet()) { + final String feature = entry.getKey(); + if (feature.trim().isEmpty()) { + throw new IllegalArgumentException("Provided feature can not be empty."); + } updateFutures.put(entry.getKey(), new KafkaFutureImpl<>()); } @@ -4424,10 +4428,6 @@ UpdateFeaturesRequest.Builder createRequest(int timeoutMs) { for (Map.Entry entry : featureUpdates.entrySet()) { final String feature = entry.getKey(); final FeatureUpdate update = entry.getValue(); - if (feature.trim().isEmpty()) { - throw new IllegalArgumentException("Provided feature can not be null or empty."); - } - final UpdateFeaturesRequestData.FeatureUpdateKey requestItem = new UpdateFeaturesRequestData.FeatureUpdateKey(); requestItem.setFeature(feature); @@ -4471,7 +4471,8 @@ void handleResponse(AbstractResponse abstractResponse) { break; default: for (final Map.Entry> entry : updateFutures.entrySet()) { - entry.getValue().completeExceptionally(topLevelError.exception()); + final String errorMsg = response.data().errorMessage(); + entry.getValue().completeExceptionally(topLevelError.exception(errorMsg)); } break; } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index e91e0d5e59b12..192692f31697a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -4062,19 +4062,12 @@ public void testUpdateFeaturesShouldFailRequestForEmptyUpdates() { @Test public void testUpdateFeaturesShouldFailRequestForInvalidFeatureName() { try (final AdminClientUnitTestEnv env = mockClientEnv()) { - final UpdateFeaturesResult result = env.adminClient().updateFeatures( - Utils.mkMap(Utils.mkEntry("", new FeatureUpdate((short) 2, false))), - new UpdateFeaturesOptions()); - - final Map> futures = result.values(); - for (Map.Entry> entry : futures.entrySet()) { - final Throwable cause = assertThrows(ExecutionException.class, () -> entry.getValue().get()); - assertEquals(KafkaException.class, cause.getCause().getClass()); - } - - final KafkaFuture future = result.all(); - final Throwable cause = assertThrows(ExecutionException.class, () -> future.get()); - assertEquals(KafkaException.class, cause.getCause().getClass()); + assertThrows( + IllegalArgumentException.class, + () -> env.adminClient().updateFeatures( + Utils.mkMap(Utils.mkEntry("feature", new FeatureUpdate((short) 2, false)), + Utils.mkEntry("", new FeatureUpdate((short) 2, false))), + new UpdateFeaturesOptions())); } }