CONFLUENT: Sync from apache/kafka (8 October 2020)
* commit '2804257fe221f37e5098bd': (67 commits)
  KAFKA-10562: Properly invoke new StateStoreContext init (apache#9388)
  MINOR: trivial cleanups, javadoc errors, omitted StateStore tests, etc. (apache#8130)
  KAFKA-10564: only process non-empty task directories when internally cleaning obsolete state stores (apache#9373)
  KAFKA-9274: fix incorrect default value for `task.timeout.ms` config (apache#9385)
  KAFKA-10362: When resuming Streams active task with EOS, the checkpoint file is deleted (apache#9247)
  KAFKA-10028: Implement write path for feature versioning system (KIP-584) (apache#9001)
  KAFKA-10402: Upgrade system tests to python3 (apache#9196)
  KAFKA-10186; Abort transaction with pending data with TransactionAbortedException (apache#9280)
  MINOR: Remove `TargetVoters` from `DescribeQuorum` (apache#9376)
  Revert "KAFKA-10469: Resolve logger levels hierarchically (apache#9266)"
  MINOR: Don't publish javadocs for raft module (apache#9336)
  KAFKA-9929: fix: add missing default implementations (apache#9321)
  KAFKA-10188: Prevent SinkTask::preCommit from being called after SinkTask::stop (apache#8910)
  KAFKA-10338; Support PEM format for SSL key and trust stores (KIP-651) (apache#9345)
  KAFKA-10527; Voters should not reinitialize as leader in same epoch (apache#9348)
  MINOR: Refactor unit tests around RocksDBConfigSetter (apache#9358)
  KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter (apache#9099)
  MINOR: Annotate test BlockingConnectorTest as integration test (apache#9379)
  MINOR: Fix failing test due to KAFKA-10556 PR (apache#9372)
  KAFKA-10439: Connect's Values to parse BigInteger as Decimal with zero scale. (apache#9320)
  ...
ijuma committed Oct 8, 2020
2 parents 38f4ea1 + 2804257 commit 83f1575
Showing 456 changed files with 12,399 additions and 4,426 deletions.
2 changes: 1 addition & 1 deletion bin/kafka-server-stop.sh
@@ -24,7 +24,7 @@ if [[ "$OSNAME" == "OS/390" ]]; then
elif [[ "$OSNAME" == "OS400" ]]; then
PIDS=$(ps -af | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $2}')
else
PIDS=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}')
PIDS=$(ps ax | grep ' kafka\.Kafka ' | grep java | grep -v grep | awk '{print $1}')
PIDS_SUPPORT=$(ps ax | grep -i 'io\.confluent\.support\.metrics\.SupportedKafka' | grep java | grep -v grep | awk '{print $1}')
fi

7 changes: 3 additions & 4 deletions build.gradle
@@ -265,7 +265,7 @@ subprojects {
}
}

def shouldUseJUnit5 = it.project.name == 'tools'
def shouldUseJUnit5 = ["tools", "raft"].contains(it.project.name)
def testLoggingEvents = ["passed", "skipped", "failed"]
def testShowStandardStreams = false
def testExceptionFormat = 'full'
@@ -1174,8 +1174,7 @@ project(':raft') {

testCompile project(':clients')
testCompile project(':clients').sourceSets.test.output
testCompile libs.junitJupiterApi
testCompile libs.junitVintageEngine
testCompile libs.junitJupiter
testCompile libs.mockitoCore

testRuntime libs.slf4jlog4j
@@ -1235,7 +1234,7 @@ project(':raft') {
}

javadoc {
include "**/org/apache/kafka/raft/*"
enabled = false
}
}

5 changes: 4 additions & 1 deletion checkstyle/import-control.xml
@@ -109,6 +109,9 @@
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.clients" />
</subpackage>
<subpackage name="ssl">
<allow pkg="javax.crypto" />
</subpackage>
<subpackage name="scram">
<allow pkg="javax.crypto" />
</subpackage>
@@ -288,7 +291,6 @@
<allow pkg="com.fasterxml.jackson" />
<allow pkg="kafka.utils" />
<allow pkg="org.apache.zookeeper" />
<allow pkg="org.apache.zookeeper" />
<allow pkg="org.apache.log4j" />
<subpackage name="testutil">
<allow pkg="org.apache.log4j" />
@@ -307,6 +309,7 @@
<subpackage name="test">
<allow pkg="org.apache.kafka" />
<allow pkg="org.bouncycastle" />
<allow pkg="org.rocksdb" />
</subpackage>

<subpackage name="raft">
9 changes: 6 additions & 3 deletions checkstyle/suppressions.xml
@@ -63,7 +63,7 @@
files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/>

<suppress checks="CyclomaticComplexity"
files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor).java"/>
files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory).java"/>

<suppress checks="JavaNCSS"
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest).java"/>
@@ -96,7 +96,7 @@
files="RequestResponseTest.java|FetcherTest.java|KafkaAdminClientTest.java"/>

<suppress checks="NPathComplexity"
files="MemoryRecordsTest|MetricsTest"/>
files="MemoryRecordsTest|MetricsTest|TestSslUtils"/>

<suppress checks="(WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)"
files="Murmur3Test.java"/>
@@ -185,7 +185,7 @@

<!-- Streams tests -->
<suppress checks="ClassFanOutComplexity"
files="StreamThreadTest.java"/>
files="(StreamThreadTest|StreamTaskTest|TopologyTestDriverTest).java"/>

<suppress checks="MethodLength"
files="(EosBetaUpgradeIntegrationTest|KStreamKStreamJoinTest|RocksDBWindowStoreTest).java"/>
@@ -208,6 +208,9 @@
<suppress checks="MethodLength"
files="KStreamSlidingWindowAggregateTest.java"/>

<suppress checks="ClassFanOutComplexity"
files="StreamTaskTest.java"/>

<!-- Streams test-utils -->
<suppress checks="ClassFanOutComplexity"
files="TopologyTestDriver.java"/>
clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -78,9 +78,9 @@ public class CommonClientConfigs {
public static final String RECONNECT_BACKOFF_MAX_MS_CONFIG = "reconnect.backoff.max.ms";
public static final String RECONNECT_BACKOFF_MAX_MS_DOC = "The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms.";

@Deprecated
public static final String RETRIES_CONFIG = "retries";
public static final String RETRIES_DOC = "(Deprecated) Setting a value greater than zero will cause the client to resend any request that fails with a potentially transient error.";
public static final String RETRIES_DOC = "Setting a value greater than zero will cause the client to resend any request that fails with a potentially transient error." +
" It is recommended to set the value to either zero or `MAX_VALUE` and use corresponding timeout parameters to control how long a client should retry a request.";

public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
public static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.";
@@ -194,10 +194,4 @@ public static void warnIfDeprecatedDnsLookupValue(AbstractConfig config) {
CLIENT_DNS_LOOKUP_CONFIG, ClientDnsLookup.DEFAULT,
ClientDnsLookup.USE_ALL_DNS_IPS);
}

public static void warnIfDeprecatedRetriesValue(AbstractConfig config) {
if (config.originals().containsKey(RETRIES_CONFIG)) {
log.warn("Configuration '{}' is deprecated and will be removed in future version.", RETRIES_CONFIG);
}
}
}
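The reworked RETRIES_DOC above recommends leaving retries at zero or MAX_VALUE and bounding the retry window with a timeout instead. Below is a minimal, illustrative producer sketch of that pattern; the bootstrap address and serializers are placeholders, while the config keys are the standard producer configs.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class RetriesConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        // Keep retries effectively unbounded and let delivery.timeout.ms cap the
        // total time spent retrying, as the updated RETRIES_DOC suggests.
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120_000);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // producer.send(...) calls would go here
        }
    }
}
```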
68 changes: 68 additions & 0 deletions clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -37,6 +37,7 @@
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.FeatureUpdateFailedException;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaFilter;
import org.apache.kafka.common.requests.LeaveGroupResponse;
@@ -1306,6 +1307,73 @@ default AlterUserScramCredentialsResult alterUserScramCredentials(List<UserScram
AlterUserScramCredentialsResult alterUserScramCredentials(List<UserScramCredentialAlteration> alterations,
AlterUserScramCredentialsOptions options);

/**
* Describes finalized as well as supported features. By default, the request is issued to any
* broker. It can be optionally directed only to the controller via DescribeFeaturesOptions
* parameter. This is particularly useful if the user requires strongly consistent reads of
* finalized features.
* <p>
* The following exceptions can be anticipated when calling {@code get()} on the future from the
* returned {@link DescribeFeaturesResult}:
* <ul>
* <li>{@link org.apache.kafka.common.errors.TimeoutException}
* If the request timed out before the describe operation could finish.</li>
* </ul>
* <p>
* @param options the options to use
*
* @return the {@link DescribeFeaturesResult} containing the result
*/
DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);
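For illustration, here is a minimal sketch of calling the new describeFeatures API documented above, assuming a reachable cluster; the bootstrap address is a placeholder, and FeatureMetadata is only printed via toString() since its accessors are not shown in this diff.

```java
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeFeaturesOptions;
import org.apache.kafka.clients.admin.FeatureMetadata;

public class DescribeFeaturesSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        try (Admin admin = Admin.create(props)) {
            // Direct the request to the controller for a strongly consistent read;
            // omit the option to let the request go to any broker.
            DescribeFeaturesOptions options = new DescribeFeaturesOptions().sendRequestToController(true);
            FeatureMetadata metadata = admin.describeFeatures(options).featureMetadata().get();
            System.out.println("Finalized and supported features: " + metadata);
        }
    }
}
```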

/**
* Applies specified updates to finalized features. This operation is not transactional so some
* updates may succeed while the rest may fail.
* <p>
* The API takes in a map of finalized feature names to {@link FeatureUpdate} that needs to be
* applied. Each entry in the map specifies the finalized feature to be added or updated or
* deleted, along with the new max feature version level value. This request is issued only to
* the controller since the API is only served by the controller. The return value contains an
* error code for each supplied {@link FeatureUpdate}, and the code indicates if the update
* succeeded or failed in the controller.
* <ul>
* <li>Downgrade of feature version level is not a regular operation/intent. It is only allowed
* in the controller if the {@link FeatureUpdate} has the allowDowngrade flag set. Setting this
* flag conveys user intent to attempt downgrade of a feature max version level. Note that
* despite the allowDowngrade flag being set, certain downgrades may be rejected by the
* controller if it is deemed impossible.</li>
* <li>Deletion of a finalized feature version is not a regular operation/intent. It could be
* done by setting the allowDowngrade flag to true in the {@link FeatureUpdate}, and, setting
* the max version level to a value less than 1.</li>
* </ul>
* <p>
* The following exceptions can be anticipated when calling {@code get()} on the futures
* obtained from the returned {@link UpdateFeaturesResult}:
* <ul>
* <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
* If the authenticated user didn't have alter access to the cluster.</li>
* <li>{@link org.apache.kafka.common.errors.InvalidRequestException}
* If the request details are invalid. e.g., a non-existing finalized feature is attempted
* to be deleted or downgraded.</li>
* <li>{@link org.apache.kafka.common.errors.TimeoutException}
* If the request timed out before the updates could finish. It cannot be guaranteed whether
* the updates succeeded or not.</li>
* <li>{@link FeatureUpdateFailedException}
* This means there was an unexpected error encountered when the update was applied on
* the controller. There is no guarantee on whether the update succeeded or failed. The best
* way to find out is to issue a {@link Admin#describeFeatures(DescribeFeaturesOptions)}
* request to the controller to get the latest features.</li>
* </ul>
* <p>
* This operation is supported by brokers with version 2.7.0 or higher.
* @param featureUpdates the map of finalized feature name to {@link FeatureUpdate}
* @param options the options to use
*
* @return the {@link UpdateFeaturesResult} containing the result
*/
UpdateFeaturesResult updateFeatures(Map<String, FeatureUpdate> featureUpdates, UpdateFeaturesOptions options);
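And a hedged sketch of updateFeatures following the contract above. The feature name and version level are hypothetical, and the FeatureUpdate constructor (max version level plus an allowDowngrade flag) and the aggregate all() future on UpdateFeaturesResult are assumptions inferred from this commit rather than guarantees.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
import org.apache.kafka.clients.admin.UpdateFeaturesResult;

public class UpdateFeaturesSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        try (Admin admin = Admin.create(props)) {
            // Hypothetical feature; raise its max version level to 2 with no downgrade allowed.
            // Constructor shape (maxVersionLevel, allowDowngrade) is assumed from the javadoc above.
            Map<String, FeatureUpdate> updates =
                    Collections.singletonMap("example_feature", new FeatureUpdate((short) 2, false));

            UpdateFeaturesResult result = admin.updateFeatures(updates, new UpdateFeaturesOptions());
            result.all().get(); // assumed aggregate future; throws if any per-feature update failed
        }
    }
}
```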

/**
* Get the metrics kept by the adminClient
*/
clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
@@ -113,11 +113,6 @@ public class AdminClientConfig extends AbstractConfig {
private static final String SECURITY_PROTOCOL_DOC = CommonClientConfigs.SECURITY_PROTOCOL_DOC;
private static final String METRICS_RECORDING_LEVEL_DOC = CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC;

/**
* <code>retries</code>
* @deprecated since 2.7
*/
@Deprecated
public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG;
public static final String DEFAULT_API_TIMEOUT_MS_CONFIG = CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_CONFIG;

@@ -227,7 +222,6 @@ public class AdminClientConfig extends AbstractConfig {
@Override
protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
CommonClientConfigs.warnIfDeprecatedDnsLookupValue(this);
CommonClientConfigs.warnIfDeprecatedRetriesValue(this);
return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
}

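With the deprecation removed here, retries stays a regular admin-client setting. A short, illustrative sketch pairing it with default.api.timeout.ms, both of which appear in this file; the values and address are placeholders.

```java
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class AdminRetriesSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // retries is no longer deprecated; bound total retry time with the API timeout instead.
        props.put(AdminClientConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 60_000);

        try (Admin admin = Admin.create(props)) {
            // admin operations would go here
        }
    }
}
```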
48 changes: 48 additions & 0 deletions clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesOptions.java
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.annotation.InterfaceStability;

/**
* Options for {@link AdminClient#describeFeatures(DescribeFeaturesOptions)}.
*
* The API of this class is evolving. See {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class DescribeFeaturesOptions extends AbstractOptions<DescribeFeaturesOptions> {

/**
* - True means the {@link Admin#describeFeatures(DescribeFeaturesOptions)} request must be
* issued only to the controller.
* - False means the {@link Admin#describeFeatures(DescribeFeaturesOptions)} request can be
* issued to any random broker.
*/
private boolean sendRequestToController = false;

/**
* Sets a flag indicating that the describe features request must be issued only to the controller.
*/
public DescribeFeaturesOptions sendRequestToController(boolean sendRequestToController) {
this.sendRequestToController = sendRequestToController;
return this;
}

public boolean sendRequestToController() {
return sendRequestToController;
}
}
37 changes: 37 additions & 0 deletions clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesResult.java
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.KafkaFuture;

/**
* The result of the {@link Admin#describeFeatures(DescribeFeaturesOptions)} call.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
public class DescribeFeaturesResult {

private final KafkaFuture<FeatureMetadata> future;

DescribeFeaturesResult(KafkaFuture<FeatureMetadata> future) {
this.future = future;
}

public KafkaFuture<FeatureMetadata> featureMetadata() {
return future;
}
}