AK to CCS merge 10-08-2020 (#425)
* Updating trunk versions after cutting branch for 2.7

* KAFKA-9929: Support backward iterator on SessionStore (apache#9139)

Implements KIP-617 for `SessionStore`

Reviewers: A. Sophie Blee-Goldman <[email protected]>, John Roesler <[email protected]>
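For context, the idea behind KIP-617's backward iteration can be sketched in a self-contained way. This is not the Kafka Streams `SessionStore` API — just an illustration of a range fetch that yields the latest session first, using a `TreeMap` keyed by session start time; all names here are hypothetical:

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of backward range iteration (the concept behind
// KIP-617), NOT the real Kafka Streams API. Sessions are keyed by
// their start timestamp.
public class BackwardFetchSketch {
    private final TreeMap<Long, String> sessionsByStartTime = new TreeMap<>();

    public void put(long sessionStartMs, String aggregate) {
        sessionsByStartTime.put(sessionStartMs, aggregate);
    }

    // Forward fetch: earliest session first (the pre-existing direction).
    public Iterable<Map.Entry<Long, String>> fetch(long fromMs, long toMs) {
        return sessionsByStartTime.subMap(fromMs, true, toMs, true).entrySet();
    }

    // Backward fetch: latest session first (the direction KIP-617 adds).
    public Iterable<Map.Entry<Long, String>> backwardFetch(long fromMs, long toMs) {
        return sessionsByStartTime.subMap(fromMs, true, toMs, true)
                                  .descendingMap().entrySet();
    }

    public static void main(String[] args) {
        BackwardFetchSketch store = new BackwardFetchSketch();
        store.put(10L, "a");
        store.put(20L, "b");
        store.put(30L, "c");
        StringBuilder order = new StringBuilder();
        for (Map.Entry<Long, String> e : store.backwardFetch(0L, 100L)) {
            order.append(e.getValue());
        }
        System.out.println(order); // latest session first: cba
    }
}
```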

* MINOR: remove unused scala files from core module (apache#9296)


Reviewers: Mickael Maison <[email protected]>, Lee Dongjin <[email protected]>

* MINOR: correct package of LinuxIoMetricsCollector (apache#9271)


Reviewers: Mickael Maison <[email protected]>, Lee Dongjin <[email protected]>

* KAFKA-10028: Minor fixes to describeFeatures and updateFeatures apis (apache#9393)

In this PR, I have addressed the review comments from @chia7712 on apache#9001, which were provided after apache#9001 was merged. The changes are made mainly to KafkaAdminClient:

Improve error message in updateFeatures api when feature name is empty.
Propagate top-level error message in updateFeatures api.
Add an empty-parameter variety for describeFeatures api.
Minor documentation updates to @param and @return to make these resemble other apis.

Reviewers: Chia-Ping Tsai <[email protected]>, Jun Rao <[email protected]>
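The "empty-parameter variety" added here is the common convenience-overload pattern: a no-argument default method that delegates to the options-taking variant. A minimal self-contained sketch of that pattern, with stand-in types (the field and its default value are assumptions for illustration, not the real `DescribeFeaturesOptions`):

```java
// Sketch of the convenience-overload pattern the Admin interface gains.
// Types here are hypothetical stand-ins so the example is runnable on
// its own; only the pattern mirrors the real API.
public class DescribeFeaturesSketch {
    static class DescribeFeaturesOptions {
        int timeoutMs = 30_000; // assumed default, for illustration only
    }

    interface Admin {
        String describeFeatures(DescribeFeaturesOptions options);

        // The new empty-parameter variety: callers no longer need to
        // construct an options object for the common case.
        default String describeFeatures() {
            return describeFeatures(new DescribeFeaturesOptions());
        }
    }

    public static void main(String[] args) {
        // A lambda suffices because the interface has one abstract method.
        Admin admin = options -> "features(timeoutMs=" + options.timeoutMs + ")";
        System.out.println(admin.describeFeatures()); // uses default options
    }
}
```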

* KAFKA-10271: Performance regression while fetching a key from a single partition (apache#9020)

StreamThreadStateStoreProvider looped excessively over calls to internalTopologyBuilder.topicGroups(), which is synchronized, causing significant performance degradation for the caller, especially when the store has many partitions.

Reviewers: John Roesler <[email protected]>, Guozhang Wang <[email protected]>
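The fix pattern here — hoisting an expensive synchronized call out of a per-partition loop — can be sketched in isolation. All names below are illustrative stand-ins, not the actual Kafka Streams code:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the KAFKA-10271 fix pattern: compute the
// result of a synchronized, expensive method once, instead of calling
// it on every loop iteration.
public class HoistSynchronizedCallSketch {
    private int topicGroupsCalls = 0;

    // Stand-in for an expensive synchronized lookup such as
    // internalTopologyBuilder.topicGroups().
    synchronized List<String> topicGroups() {
        topicGroupsCalls++;
        return List.of("group-1", "group-2");
    }

    // Before: one synchronized call per partition.
    int lookupPerPartition(int partitions) {
        for (int p = 0; p < partitions; p++) {
            topicGroups(); // contends on the lock every iteration
        }
        return topicGroupsCalls;
    }

    // After: the call is hoisted out of the loop and reused.
    int lookupHoisted(int partitions) {
        List<String> groups = topicGroups(); // computed once
        List<String> seen = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            seen.addAll(groups); // reuse the cached result
        }
        return topicGroupsCalls;
    }
}
```

With many partitions, the "before" variant acquires the lock once per partition while the "after" variant acquires it once in total, which is the essence of the regression fix.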

Co-authored-by: Jorge Esteban Quilcate Otoya <[email protected]>
Co-authored-by: Chia-Ping Tsai <[email protected]>
Co-authored-by: Kowshik Prakasam <[email protected]>
Co-authored-by: Dima Reznik <[email protected]>
5 people authored Oct 13, 2020
1 parent 83f1575 commit af35402
Showing 45 changed files with 2,218 additions and 339 deletions.
22 changes: 16 additions & 6 deletions clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -1306,6 +1306,17 @@ default AlterUserScramCredentialsResult alterUserScramCredentials(List<UserScram
*/
AlterUserScramCredentialsResult alterUserScramCredentials(List<UserScramCredentialAlteration> alterations,
AlterUserScramCredentialsOptions options);
/**
* Describes finalized as well as supported features.
* <p>
* This is a convenience method for {@link #describeFeatures(DescribeFeaturesOptions)} with default options.
* See the overload for more details.
*
* @return the {@link DescribeFeaturesResult} containing the result
*/
default DescribeFeaturesResult describeFeatures() {
return describeFeatures(new DescribeFeaturesOptions());
}

/**
* Describes finalized as well as supported features. By default, the request is issued to any
@@ -1320,9 +1331,9 @@ AlterUserScramCredentialsResult alterUserScramCredentials(List<UserScramCredenti
* If the request timed out before the describe operation could finish.</li>
* </ul>
* <p>
* @param options the options to use
*
* @return the {@link DescribeFeaturesResult} containing the result
* @param options the options to use
* @return the {@link DescribeFeaturesResult} containing the result
*/
DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);

@@ -1367,10 +1378,9 @@ AlterUserScramCredentialsResult alterUserScramCredentials(List<UserScramCredenti
* <p>
* This operation is supported by brokers with version 2.7.0 or higher.
* @param featureUpdates the map of finalized feature name to {@link FeatureUpdate}
* @param options the options to use
*
* @return the {@link UpdateFeaturesResult} containing the result
* @param featureUpdates the map of finalized feature name to {@link FeatureUpdate}
* @param options the options to use
* @return the {@link UpdateFeaturesResult} containing the result
*/
UpdateFeaturesResult updateFeatures(Map<String, FeatureUpdate> featureUpdates, UpdateFeaturesOptions options);

@@ -4410,6 +4410,10 @@ public UpdateFeaturesResult updateFeatures(final Map<String, FeatureUpdate> feat

final Map<String, KafkaFutureImpl<Void>> updateFutures = new HashMap<>();
for (final Map.Entry<String, FeatureUpdate> entry : featureUpdates.entrySet()) {
final String feature = entry.getKey();
if (feature.trim().isEmpty()) {
throw new IllegalArgumentException("Provided feature can not be empty.");
}
updateFutures.put(entry.getKey(), new KafkaFutureImpl<>());
}

@@ -4424,10 +4428,6 @@ UpdateFeaturesRequest.Builder createRequest(int timeoutMs) {
for (Map.Entry<String, FeatureUpdate> entry : featureUpdates.entrySet()) {
final String feature = entry.getKey();
final FeatureUpdate update = entry.getValue();
if (feature.trim().isEmpty()) {
throw new IllegalArgumentException("Provided feature can not be null or empty.");
}

final UpdateFeaturesRequestData.FeatureUpdateKey requestItem =
new UpdateFeaturesRequestData.FeatureUpdateKey();
requestItem.setFeature(feature);
@@ -4471,7 +4471,8 @@ void handleResponse(AbstractResponse abstractResponse) {
break;
default:
for (final Map.Entry<String, KafkaFutureImpl<Void>> entry : updateFutures.entrySet()) {
entry.getValue().completeExceptionally(topLevelError.exception());
final String errorMsg = response.data().errorMessage();
entry.getValue().completeExceptionally(topLevelError.exception(errorMsg));
}
break;
}
@@ -4062,19 +4062,12 @@ public void testUpdateFeaturesShouldFailRequestForEmptyUpdates() {
@Test
public void testUpdateFeaturesShouldFailRequestForInvalidFeatureName() {
try (final AdminClientUnitTestEnv env = mockClientEnv()) {
final UpdateFeaturesResult result = env.adminClient().updateFeatures(
Utils.mkMap(Utils.mkEntry("", new FeatureUpdate((short) 2, false))),
new UpdateFeaturesOptions());

final Map<String, KafkaFuture<Void>> futures = result.values();
for (Map.Entry<String, KafkaFuture<Void>> entry : futures.entrySet()) {
final Throwable cause = assertThrows(ExecutionException.class, () -> entry.getValue().get());
assertEquals(KafkaException.class, cause.getCause().getClass());
}

final KafkaFuture<Void> future = result.all();
final Throwable cause = assertThrows(ExecutionException.class, () -> future.get());
assertEquals(KafkaException.class, cause.getCause().getClass());
assertThrows(
IllegalArgumentException.class,
() -> env.adminClient().updateFeatures(
Utils.mkMap(Utils.mkEntry("feature", new FeatureUpdate((short) 2, false)),
Utils.mkEntry("", new FeatureUpdate((short) 2, false))),
new UpdateFeaturesOptions()));
}
}

35 changes: 0 additions & 35 deletions core/src/main/scala/kafka/common/ClientIdAndTopic.scala

This file was deleted.

This file was deleted.

@@ -15,7 +15,7 @@
* limitations under the License.
*/

package kafka.server
package kafka.metrics

import java.nio.file.{Files, Paths}

2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaServer.scala
@@ -30,7 +30,7 @@ import kafka.controller.KafkaController
import kafka.coordinator.group.GroupCoordinator
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.log.{LogConfig, LogManager}
import kafka.metrics.{KafkaMetricsGroup, KafkaMetricsReporter, KafkaYammerMetrics}
import kafka.metrics.{KafkaMetricsGroup, KafkaMetricsReporter, KafkaYammerMetrics, LinuxIoMetricsCollector}
import kafka.network.SocketServer
import kafka.security.CredentialProvider
import kafka.utils._
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package kafka.server
package kafka.metrics

import java.nio.charset.StandardCharsets
import java.nio.file.Files
6 changes: 3 additions & 3 deletions docs/js/templateData.js
@@ -17,8 +17,8 @@ limitations under the License.

// Define variables for doc templates
var context={
"version": "27",
"dotVersion": "2.7",
"fullDotVersion": "2.7.0",
"version": "28",
"dotVersion": "2.8",
"fullDotVersion": "2.8.0",
"scalaVersion": "2.13"
};
3 changes: 2 additions & 1 deletion gradle.properties
@@ -20,7 +20,8 @@ group=org.apache.kafka
# - tests/kafkatest/__init__.py
# - tests/kafkatest/version.py (variable DEV_VERSION)
# - kafka-merge-pr.py
version=6.1.0-0-ccs

version=6.2.0-0-ccs
scalaVersion=2.13.3
task=build
org.gradle.jvmargs=-Xmx2g -Xss4m -XX:+UseParallelGC
2 changes: 1 addition & 1 deletion kafka-merge-pr.py
@@ -70,7 +70,7 @@

DEV_BRANCH_NAME = "trunk"

DEFAULT_FIX_VERSION = os.environ.get("DEFAULT_FIX_VERSION", "2.7.0")
DEFAULT_FIX_VERSION = os.environ.get("DEFAULT_FIX_VERSION", "2.8.0")

def get_json(url):
try:
2 changes: 1 addition & 1 deletion streams/quickstart/java/pom.xml
@@ -26,7 +26,7 @@
<parent>
<groupId>org.apache.kafka</groupId>
<artifactId>streams-quickstart</artifactId>
<version>6.1.0-0-ccs</version>
<version>6.2.0-0-ccs</version>
<relativePath>..</relativePath>
</parent>

@@ -29,7 +29,7 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<kafka.version>6.1.0-0-ccs</kafka.version>
<kafka.version>6.2.0-0-ccs</kafka.version>
<slf4j.version>1.7.7</slf4j.version>
<log4j.version>1.2.17</log4j.version>
</properties>
2 changes: 1 addition & 1 deletion streams/quickstart/pom.xml
@@ -22,7 +22,7 @@
<groupId>org.apache.kafka</groupId>
<artifactId>streams-quickstart</artifactId>
<packaging>pom</packaging>
<version>6.1.0-0-ccs</version>
<version>6.2.0-0-ccs</version>

<name>Kafka Streams :: Quickstart</name>

@@ -782,7 +782,7 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
delegatingStateRestoreListener,
i + 1);
threadState.put(threads[i].getId(), threads[i].state());
storeProviders.add(new StreamThreadStateStoreProvider(threads[i], internalTopologyBuilder));
storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
}

ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) ->