
KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter #9099

Merged

Conversation

badaiaqrandista
Contributor

Implementation of KIP-431 - Support of printing additional ConsumerRecord fields in DefaultMessageFormatter

https://cwiki.apache.org/confluence/display/KAFKA/KIP-431%3A+Support+of+printing+additional+ConsumerRecord+fields+in+DefaultMessageFormatter
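
As a rough illustration of what the formatter output could look like with the new fields enabled (the flag names mirror properties mentioned later in this thread, but the layout and defaults are assumptions, not the actual implementation):

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord

// Minimal sketch, not the real DefaultMessageFormatter: assembles one output line
// from a ConsumerRecord under KIP-431 style flags. Layout and defaults are assumed.
object FormatterSketch {
  def formatLine(record: ConsumerRecord[String, String],
                 printTimestamp: Boolean = false,
                 printPartition: Boolean = false,
                 printOffset: Boolean = false,
                 printKey: Boolean = false,
                 separator: String = "\t"): String = {
    val fields = Seq(
      if (printTimestamp) Some(s"${record.timestampType()}:${record.timestamp()}") else None,
      if (printPartition) Some(s"Partition:${record.partition()}") else None,
      if (printOffset) Some(s"Offset:${record.offset()}") else None,
      if (printKey) Some(record.key()) else None,
      Some(record.value()) // the value is always printed last
    ).flatten
    fields.mkString(separator)
  }
}
```

On the console consumer, toggles like these correspond to properties such as print.partition=true, which come up later in this thread.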

Member

@dajac dajac left a comment

@badaiaqrandista Thanks for the PR. It looks good overall. I left a few comments.

newProps
var headersDeserializer: Option[Deserializer[_]] = None

override def init(props: Properties): Unit = {
Member

init(props: Properties) has been deprecated. It would be great if we could keep using configure(configs: Map[String, _]) as before. I think that we should also try to directly extract the values from the Map instead of using a Properties.
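
For illustration, a minimal sketch of the pattern being suggested: read values straight from the map passed to configure() rather than going through a Properties. The helper method and the "headers.deserializer" key below are assumptions, not the real class.

```scala
import java.util.{Map => JMap}
import org.apache.kafka.common.serialization.{Deserializer, StringDeserializer}

// Sketch only: extract settings directly from the configure() map.
class FormatterConfigSketch {
  private var printKey = false
  private var printPartition = false
  private var headersDeserializer: Option[Deserializer[_]] = None

  def configure(configs: JMap[String, _]): Unit = {
    printKey = boolConfig(configs, "print.key")
    printPartition = boolConfig(configs, "print.partition")
    if (configs.containsKey("headers.deserializer"))
      headersDeserializer = Some(new StringDeserializer) // real code would instantiate the configured class
  }

  private def boolConfig(configs: JMap[String, _], key: String): Boolean =
    Option(configs.get(key)).exists(_.toString.trim.equalsIgnoreCase("true"))
}
```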

Contributor Author

@dajac

I have replaced init with configure and changed the code to extract the values directly from the Map. Please review again.

Array(
"print all possible fields with default delimiters",
consumerRecord(),
Map("print.key" -> "true",
Member

nit: Move print.key to the next line to remain consistent with the formatting of the other Maps.

Contributor Author

@dajac

I've re-indented these as well.

@badaiaqrandista
Contributor Author

@dajac Can you please re-review the changes I've made?

@bbejeck
Member

bbejeck commented Aug 20, 2020

Ok to test

@badaiaqrandista
Contributor Author

@bbejeck Thank you for including the changes in the test run. I've fixed the errors found by the tests. Please test again.

@bbejeck
Member

bbejeck commented Aug 22, 2020

Ok to test

@bbejeck
Member

bbejeck commented Aug 22, 2020

ok to test

@bbejeck
Member

bbejeck commented Aug 22, 2020

retest this please.

@badaiaqrandista
Contributor Author

@bbejeck

One question: As part of my PR, I added a new test for the DefaultMessageFormatter class: core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala

But I found that there is another unit test that includes a test for the DefaultMessageFormatter class:
https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala#L501

This was the cause of the system test failure yesterday. I changed the latter test to work with the updated DefaultMessageFormatter. But after thinking about it further, I should just delete the test for DefaultMessageFormatter in ConsoleConsumerTest.scala.

What do you think?

@bbejeck
Member

bbejeck commented Aug 24, 2020

I should just delete the test for DefaultMessageFormatter in ConsoleConsumerTest.scala.

@badaiaqrandista

I'm not sure. My vote would be to keep the test but move it over to the DefaultMessageFormatterTest.scala class.

But I'm not familiar enough with this code to say for sure. From a quick look at the old test, it's not clear to me why it failed. I guess the Partition number gets printed by default now?

\cc @dajac

@badaiaqrandista
Contributor Author

@bbejeck

It failed because it printed the partition number as a single integer after the value. I moved the partition to be printed before the key (if printed) and the value, and also added the prefix "Partition:" to differentiate it from "Offset:".

It's only printed if "print.partition=true".

I will keep the tests as they are, then. ConsoleConsumerTest is a unit test for the class, while DefaultMessageFormatterTest is more of an integration test.
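
For illustration, the before/after output described above, with assumed example values (timestamp and offset columns omitted):

```scala
// Old format: the value followed by the bare partition number.
// New format: the partition, with an explicit "Partition:" prefix, printed before
// the key (when print.key=true) and the value. Example values are assumptions.
object OutputChangeExample extends App {
  val (partition, key, value) = (0, "some-key", "some-value")
  println(s"$value\t$partition")                 // old: some-value<TAB>0
  println(s"Partition:$partition\t$key\t$value") // new: Partition:0<TAB>some-key<TAB>some-value
}
```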

@bbejeck
Member

bbejeck commented Sep 30, 2020

@badaiaqrandista can you rebase this PR? We have a little bit of time to get this into 2.7 before the code freeze on 10/21

@dajac
Member

dajac commented Oct 1, 2020

Sorry guys. I had completely forgotten about this one. I think that it is fine to keep both tests, as suggested by @badaiaqrandista.

Member

@dajac dajac left a comment

@badaiaqrandista Thanks for the PR. The code LGTM pending Jenkins and the rebase.

Member

@bbejeck bbejeck left a comment

Thanks for the PR @badaiaqrandista this LGTM modulo rebasing and a clean build.

@badaiaqrandista badaiaqrandista force-pushed the KIP-431_print_header_console_consumer_2 branch from 3f65b14 to 2aa2580 on October 2, 2020 12:31
@badaiaqrandista badaiaqrandista force-pushed the KIP-431_print_header_console_consumer_2 branch from 2aa2580 to 4f1c2f8 on October 2, 2020 13:09
@bbejeck
Member

bbejeck commented Oct 5, 2020

retest this please

@bbejeck
Member

bbejeck commented Oct 5, 2020

ok to test

@bbejeck
Member

bbejeck commented Oct 5, 2020

Java 11 and Java 15 passed.

Java 8 failed with org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState, a known flaky test.

@badaiaqrandista
Contributor Author

@bbejeck Do I need to do anything on this?

@bbejeck
Member

bbejeck commented Oct 5, 2020

Do I need to do anything on this?

@badaiaqrandista, nope, as soon as we can get a green build, I'll merge it.

@bbejeck bbejeck merged commit c77183d into apache:trunk Oct 6, 2020
@bbejeck
Member

bbejeck commented Oct 6, 2020

Merged #9099 into trunk

@bbejeck
Member

bbejeck commented Oct 6, 2020

Thanks for the contribution @badaiaqrandista!

@badaiaqrandista badaiaqrandista deleted the KIP-431_print_header_console_consumer_2 branch October 7, 2020 12:24
javierfreire pushed a commit to javierfreire/kafka that referenced this pull request Oct 8, 2020
ijuma added a commit to confluentinc/kafka that referenced this pull request Oct 8, 2020
* commit '2804257fe221f37e5098bd': (67 commits)
  KAFKA-10562: Properly invoke new StateStoreContext init (apache#9388)
  MINOR: trivial cleanups, javadoc errors, omitted StateStore tests, etc. (apache#8130)
  KAFKA-10564: only process non-empty task directories when internally cleaning obsolete state stores (apache#9373)
  KAFKA-9274: fix incorrect default value for `task.timeout.ms` config (apache#9385)
  KAFKA-10362: When resuming Streams active task with EOS, the checkpoint file is deleted (apache#9247)
  KAFKA-10028: Implement write path for feature versioning system (KIP-584) (apache#9001)
  KAFKA-10402: Upgrade system tests to python3 (apache#9196)
  KAFKA-10186; Abort transaction with pending data with TransactionAbortedException (apache#9280)
  MINOR: Remove `TargetVoters` from `DescribeQuorum` (apache#9376)
  Revert "KAFKA-10469: Resolve logger levels hierarchically (apache#9266)"
  MINOR: Don't publish javadocs for raft module (apache#9336)
  KAFKA-9929: fix: add missing default implementations (apache#9321)
  KAFKA-10188: Prevent SinkTask::preCommit from being called after SinkTask::stop (apache#8910)
  KAFKA-10338; Support PEM format for SSL key and trust stores (KIP-651) (apache#9345)
  KAFKA-10527; Voters should not reinitialize as leader in same epoch (apache#9348)
  MINOR: Refactor unit tests around RocksDBConfigSetter (apache#9358)
  KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter (apache#9099)
  MINOR: Annotate test BlockingConnectorTest as integration test (apache#9379)
  MINOR: Fix failing test due to KAFKA-10556 PR (apache#9372)
  KAFKA-10439: Connect's Values to parse BigInteger as Decimal with zero scale. (apache#9320)
  ...
rgo pushed a commit to rgo/kafka that referenced this pull request Oct 20, 2020
dajac pushed a commit that referenced this pull request Oct 31, 2020
KIP-431 (#9099) changed the format of console consumer output to `Partition:$PARTITION\t$VALUE` whereas previously the output format was `$VALUE\t$PARTITION`. This PR updates the message verifier to accommodate the updated console consumer output format.
dajac pushed a commit that referenced this pull request Oct 31, 2020
KIP-431 (#9099) changed the format of console consumer output to `Partition:$PARTITION\t$VALUE` whereas previously the output format was `$VALUE\t$PARTITION`. This PR updates the message verifier to accommodate the updated console consumer output format.
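
For illustration, a sketch of the parsing change the verifier needed (the actual verifier lives in the system tests and is not written in Scala; the names here are illustrative only):

```scala
// Sketch of handling the two console-consumer output layouts.
object VerifierParseSketch {
  // Old layout: "$VALUE\t$PARTITION"
  def parseOld(line: String): (String, Int) = {
    val Array(value, partition) = line.split("\t", 2)
    (value, partition.toInt)
  }

  // New layout: "Partition:$PARTITION\t$VALUE"
  def parseNew(line: String): (String, Int) = {
    val Array(partitionField, value) = line.split("\t", 2)
    (value, partitionField.stripPrefix("Partition:").toInt)
  }
}
```
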
anujg added a commit to anujg/kafka that referenced this pull request Dec 15, 2020
* KAFKA-10437: Implement new PAPI support for test-utils (#9396)

Implements KIP-478 for the test-utils module:
* adds mocks of the new ProcessorContext and StateStoreContext
* adds tests that all stores and store builders are usable with the new mock
* adds tests that the new Processor api is usable with the new mock
* updates the demonstration Processor to the new api

Reviewers: Guozhang Wang <[email protected]>

* KAFKA-10494: Eager handling of sending old values (#9415)

Nodes that are materialized should not forward requests to `enableSendingOldValues` to parent nodes, as they themselves can handle fulfilling this request. However, some instances of `KTableProcessorSupplier` were still forwarding requests to parent nodes, which was causing unnecessary materialization of table sources.

The following instances of `KTableProcessorSupplier` have been updated to not forward `enableSendingOldValues` to parent nodes if they themselves are materialized and can handle sending old values downstream:

 * `KTableFilter`
 * `KTableMapValues`
 * `KTableTransformValues`

Other instances of `KTableProcessorSupplier` have not be modified for reasons given below:
 * `KTableSuppressProcessorSupplier`: though it has a `storeName` field, it didn't seem right for this to handle sending old values itself. Its only job is to suppress output.
 * `KTableKTableAbstractJoin`: doesn't have a store name, i.e. it is never materialized, so can't handle the call itself.
 * `KTableKTableJoinMerger`: table-table joins already have materialized sources, which are sending old values. It would be an unnecessary performance hit to have this class do a lookup to retrieve the old value from its store.
 * `KTableReduce`: is always materialized and already handling the call without forwarding
 * `KTableAggregate`: is always materialized and already handling the call without forwarding

Reviewer: Matthias J. Sax <[email protected]>
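
As a rough sketch of the pattern described above (the real classes are Java in Kafka Streams; the names and signatures here are simplified assumptions): a materialized node can serve old values from its own store and should not forward the request to its parent.

```scala
// Simplified sketch of the enableSendingOldValues handling described above.
trait TableNodeSketch {
  def isMaterialized: Boolean
  def parent: Option[TableNodeSketch]
  var sendOldValues: Boolean = false

  def enableSendingOldValues(): Unit = {
    // Only forward upstream when this node cannot serve old values itself.
    if (!isMaterialized)
      parent.foreach(_.enableSendingOldValues())
    sendOldValues = true
  }
}
```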

* KAFKA-10570; Rename JMXReporter configs for KIP-629

* rename whitelist/blacklist to include/exclude
* add utility methods to translate deprecated configs

Author: Xavier Léauté <[email protected]>

Reviewers: Gwen Shapira

Closes #9367 from xvrl/kafka-10570

* MINOR rename kafka.utils.Whitelist to IncludeList

rename internal classes, methods, and related constants for KIP-629

Author: Xavier Léauté <[email protected]>

Reviewers: Gwen Shapira

Closes #9400 from xvrl/rename-topic-includelist

* MINOR internal KIP-629 changes to methods and variables

cc gwenshap

Author: Xavier Léauté <[email protected]>

Reviewers: Gwen Shapira

Closes #9405 from xvrl/minor-kip-629-vars

* KAFKA-10573 Update connect transforms configs for KIP-629 (#9403)

Changes the Connect `ReplaceField` SMT's configuration properties, deprecating and replacing `blacklist` with `exclude`, and `whitelist` with `include`. The old configurations are still allowed (ensuring backward compatibility), but warning messages are written to the log to suggest users change to `include` and `exclude`.

This is part of KIP-629.

Author: Xavier Léauté <[email protected]>
Reviewer: Randall Hauch <[email protected]>

* KAFKA-10521; Skip partition watch registration when `AlterIsr` is expected (#9353)

Before `AlterIsr` which was introduced in KIP-497, the controller would register watches in Zookeeper for each reassigning partition so that it could be notified immediately when the ISR was expanded and the reassignment could be completed. This notification is not needed with the latest IBP when `AlterIsr` is enabled because the controller will execute all ISR changes itself.

There is one subtle detail. If we are in the middle of a roll in order to bump the IBP, then it is possible for the controller to be on the latest IBP while some of the brokers are still on the older one. In this case, the brokers on the older IBP will not send `AlterIsr`, but we can still rely on the delayed notification through the `isr_notifications` path to complete reassignments. This seems like a reasonable tradeoff since it should be a short window before the roll is completed.

Reviewers: David Jacot <[email protected]>, Jun Rao <[email protected]>

* KAFKA-9587: Add omitted configs in KafkaProducer javadoc (#8150)

Simple javadoc fix that aligns the properties with the text. 

Reviewers: Konstantine Karantasis <[email protected]>

* MINOR: fix a bug in removing elements from an ImplicitLinkedHashColle… (#9428)

Fix a bug that was introduced by change 86013dc that resulted in incorrect behavior when
deleting through an iterator.

The bug is that the hash table relies on a denseness invariant... if you remove something,
you might have to move some other things. Calling removeElementAtSlot will do this.
Calling removeFromList is not enough.

Reviewers: Jason Gustafson <[email protected]>

* KAFKA-10611: Merge log error to avoid double error (#9407)

When using an error tracking system, two error log messages result into two different alerts.
It's best to group the logs and have one error with all the information.

For example when using with Sentry, this double line of log.error will create 2 different Issues. One can merge the issues but it will be simpler to have a single error log line.

Signed-off-by: Benoit Maggi <[email protected]>

Reviewers: Ewen Cheslack-Postava <[email protected]>, Konstantine Karantasis <[email protected]>

* MINOR: Fix typos in DefaultSslEngineFactory javadoc (#9413)

Fix comment typos.

Reviewers: Boyang Chen <[email protected]>, Lee Dongjin <[email protected]>

* MINOR: Upgrade to gradle 6.7 (#9440)

This release includes a key fix:
* Zinc leaks its dependencies to user classpath (https://github.com/gradle/gradle/issues/14168)

Release notes:
https://docs.gradle.org/6.7/release-notes.html

Reviewers: Manikumar Reddy <[email protected]>

* MINOR: fix error in quota_test.py system tests (#9443)

* KAFKA-10613: Only set leader epoch when list-offset version >= 4 (#9438)

The leader epoch field is added in version 4, and the auto-generated protocol code would throw unsupported version exception if the field is set to any non-default values for version < 4. This would cause older versioned clients to never receive list-offset results.

Reviewers: Boyang Chen <[email protected]>

* MINOR: more log4j entry on elect / resignation of coordinators (#9416)

When a coordinator module is elected or resigns, our log entries are usually associated with a background scheduler loading / unloading entries, so it is unclear exactly when the election or resignation happens, and we then have to compare against the KafkaApis log entries for LeaderAndIsr / StopReplica to infer the actual time. Adding a couple of new log entries indicating the exact time when it happens is helpful.

Reviewers: Boyang Chen <[email protected]>, Lee Dongjin <[email protected]>, Bruno Cadonna <[email protected]>

* MINOR: Check for active controller in UpdateFeatures request processing logic (#9436)

Reviewers: Jun Rao <[email protected]>

* KAFKA-10340: Improve trace logging under connector based topic creation (#9149)

Reviewers: Konstantine Karantasis <[email protected]>

* KAFKA-10602: Make RetryWithToleranceOperator thread safe (#9422)

ErrantRecordReporter uses a RetryWithToleranceOperator instance, which is necessarily stateful, having a ProcessingContext of which there's supposed to be one per task. That ProcessingContext is used by both RetryWithToleranceOperator.executeFailed() and execute(), so it's not enough to just synchronize executeFailed().

So make all public methods of RetryWithToleranceOperator synchronized so that RetryWithToleranceOperator is now threadsafe.

Tested with the addition of a multithreaded test case that fails consistently if the methods are not properly synchronized. 

Reviewers: Konstantine Karantasis <[email protected]>
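
A minimal sketch of the approach described above (the real class is Java in the Connect runtime; the stand-in context and method shapes are assumptions): the shared per-task state is guarded by synchronizing the public entry points.

```scala
// Sketch: synchronize the public methods that touch the shared per-task context.
class RetryOperatorSketch {
  private val processingContext = new StringBuilder // stand-in for the shared ProcessingContext

  def execute(op: () => Unit): Unit = synchronized {
    processingContext.append("execute;")
    op()
  }

  def executeFailed(error: Throwable): Unit = synchronized {
    processingContext.append(s"failed:${error.getMessage};")
  }
}
```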

* KAFKA-10559: Not letting TimeoutException shutdown the app during internal topic validation (#9432)

Reviewers: A. Sophie Blee-Goldman <[email protected]>, Matthias J. Sax <[email protected]>

* MINOR: remove explicit passing of AdminClient into StreamsPartitionAssignor (#9384)

Currently, we pass multiple object reference (AdminClient,TaskManager, and a few more) into StreamsPartitionAssignor. Furthermore, we (miss)use TaskManager#mainConsumer() to get access to the main consumer (we need to do this, to avoid a cyclic dependency).

This PR unifies how object references are passed into a single ReferenceContainer class to
 - not "miss use" the TaskManager as reference container
 - unify how object references are passes

Note: we need to use a reference container to avoid cyclic dependencies, instead of using a config for each passed reference individually.

Reviewers: John Roesler <[email protected]>

* MINOR: Handle lastFetchedEpoch/divergingEpoch in FetchSession and DelayedFetch (#9434)

In 2.7, we added lastFetchedEpoch to fetch requests and divergingEpoch to fetch responses. We are not using these for truncation yet, but in order to use these for truncation with IBP 2.7 onwards in the next release, we should make sure that we handle these in all the supporting classes even in 2.7.

Reviewers: Jason Gustafson <[email protected]>

* MINOR: Use debug level logging for noisy log messages in Connect (#8918)

Author: Cyrus Vafadari <[email protected]>
Reviewers: Chris Egerton <[email protected]>, Arjun Satish <[email protected]>, Randall Hauch <[email protected]>

* KAFKA-10600: Connect should not add error to connector validation values for properties not in connector’s ConfigDef (#9425)

Connect should not always add an error to configuration values in validation results that don't have a `ConfigKey` defined in the connector's `ConfigDef`, and any errors on such configuration values included by the connector should be counted in the total number of errors. Added more unit tests for `AbstractHerder.generateResult(...)`.

Author: Randall Hauch <[email protected]>
Reviewer: Konstantine Karantasis <[email protected]>

* MINOR: Fix consumer/producer properties override (#9313)


Reviewers: Mickael Maison <[email protected]>, Ryanne Dolan <[email protected]>

* MINOR: fix potential NPE in PartitionData.equals (#9391)

the field metadata is nullable (see https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/OffsetFetchResponse.json#L50)

Reviewers: David Jacot <[email protected]>

* MINOR: Fix comment about AbstractFetcherThread.handlePartitionsWithError (#7205)

Reviewers: Stanislav Kozlovski<[email protected]>, William Hammond<[email protected]>, Chia-Ping Tsai<[email protected]>

* MINOR: inconsistent naming for the output topic in the stream documentation (#9265)

Reviewers: Chia-Ping Tsai <[email protected]>

* KAFKA-10583: Add documentation on the thread-safety of KafkaAdminClient (#9397)

Other than a Stack Overflow comment (see https://stackoverflow.com/a/61738065) by Colin Patrick McCabe and a proposed design note on KIP-117 wiki, there is no source that verifies the thread-safety of KafkaAdminClient.

This patch updates JavaDoc of KafkaAdminClient to clarify its thread-safety.

Reviewers: Tom Bentley <[email protected]>, Chia-Ping Tsai <[email protected]>

* KAFKA-10332: Update MM2 refreshTopicPartitions() logic (#9343)

Trigger task reconfiguration when:
- topic-partitions are created or deleted on source cluster
- topic-partitions are missing on target cluster

Authors: Mickael Maison <[email protected]>, Edoardo Comar <[email protected]>
Reviewer: Randall Hauch <[email protected]>

* KAFKA-10599: Implement basic CLI tool for feature versioning system (#9409)

This PR implements a basic CLI tool for feature versioning system. The KIP-584 write up has been updated to suit this PR. The following is implemented in this PR:

--describe:
Describe supported and finalized features.
Usage: $> ./bin/kafka-features.sh --bootstrap-server host1:port1, host2:port2 --describe [--from-controller] [--command-config <path_to_java_properties_file>]
Optionally, use the --from-controller option to get features from the controller.
--upgrade-all:
Upgrades all features known to the tool to their highest max version levels.
Usage: $> ./bin/kafka-features.sh --bootstrap-server host1:port1, host2:port2 --upgrade-all [--dry-run] [--command-config <path_to_java_properties_file>]
Optionally, use the --dry-run CLI option to preview the feature updates without actually applying them.
--downgrade-all:
Downgrades existing finalized features to the highest max version levels known to this tool.
Usage: $> ./bin/kafka-features.sh --bootstrap-server host1:port1, host2:port2 --downgrade-all [--dry-run] [--command-config <path_to_java_properties_file>].
Optionally, use the --dry-run CLI option to preview the feature updates without actually applying them.

Reviewers: Boyang Chen <[email protected]>, Jun Rao <[email protected]>

* MINOR: Update jdk and maven names in Jenkinsfile (#9453)

* KAFKA-9274: Add timeout handling for state restore and StandbyTasks (#9368)

* Part of KIP-572
* If a TimeoutException happens during restore of active tasks, or updating standby tasks, we need to trigger task.timeout.ms timeout.

Reviewers: John Roesler <[email protected]>

* KAFKA-10455: Ensure that probing rebalances always occur (#9383)

Add dummy data to subscriptionUserData to make sure that
it is different each time a member rejoins.

Reviewers: A. Sophie Blee-Goldman <[email protected]>, John Roesler <[email protected]>

* MINOR: Fixed comment to refer to UpdateMetadataPartitionState rather than UpdateMetadataTopicState. (#9447)

Reviewers: David Jacot <[email protected]>

* KAFKA-10605: Deprecate old PAPI registration methods (#9448)

Add deprecation annotations to the methods replaced in KIP-478.

Reviewers: Bill Bejeck <[email protected]>

* MINOR: the top-level error message of AlterPartitionReassignmentsResponseData does not get propagated correctly (#9392)

Reviewers: David Jacot <[email protected]>

* MINOR: Remove unnecessary assertion from ConnectHeader (#9452)

Reviewers: Chia-Ping Tsai <[email protected]>

* MINOR; Return timed out connections as a List instead of a Set (#8999)

Using a Set is not necessary as the caller only cares about having the list of timed out connections/nodes.

Reviewers: Chia-Ping Tsai <[email protected]>

* MINOR: Use `PartitionResponse.errorMessage` in exceptions in KafkaProducer (#9450)

Reviewers: Manikumar Reddy <[email protected]>, Chia-Ping Tsai <[email protected]>

* MINOR: fix system tests sending ACLs through ZooKeeper (#9458)

Reviewers: Rajini Sivaram <[email protected]>

* KAFKA-10572 mirror-maker config changes for KIP-629 (#9429)

Author: Xavier Léauté <[email protected]>
Reviewer: Randall Hauch <[email protected]>

* MINOR: Fix upgrade.mode references (#5645)

Reviewers: John Roesler <[email protected]>, Andrew Choi <[email protected]>, Chia-Ping Tsai <[email protected]>

* KAFKA-10426: Deadlock in DistributedHerder during session key update. (#9431)

DistributedHerder enters the synchronized method updateConfigsWithIncrementalCooperative() and calls configBackingStore.snapshot(), which takes a lock on an internal object in the KafkaConfigBackingStore class.

Meanwhile, KafkaConfigBackingStore, in its ConsumeCallback inside a synchronized block on that same internal object, gets a SESSION_KEY record and calls updateListener.onSessionKeyUpdate(), which takes a lock on the DistributedHerder.

This results in a deadlock.

To avoid this, the updateListener with the new session key should be called outside the synchronized block, as is done, for example, for updateListener.onTaskConfigUpdate(updatedTasks).

Co-authored-by: Taisiia Goltseva <[email protected]>

Reviewers: Konstantine Karantasis <[email protected]>

* MINOR: Fix flaky ControllerMutationQuotaTest.testQuotaMetric (#9417)

`ClientQuotaManager.updateQuota` updates the in-memory quota before updating the configuration of the metric. Therefore, `quotaLimit` can return the updated value while the metric's configuration still returns the previous one. This patch updates the test to be resilient to this case.

Reviewers: David Jacot <[email protected]>

* MINOR; Fix UpdateMetadataRequestTest.testVersionLogic's assertions (#9462)

UpdateMetadataRequestTest.testVersionLogic's assertions must verify the deserialized request instead of the original one.

Reviewers: Chia-Ping Tsai <[email protected]>

* MINOR: Clean-up client javadoc warnings (#9463)

Reviewers: Boyang Chen <[email protected]>, Chia-Ping Tsai <[email protected]>

* KAFKA-10515: Properly initialize nullable Serdes with default values (#9338)

Also introduced the notion of WrappingNullableSerdes (aligned to the concept
of WrappingNullableSerializer and WrappingNullableDeserializer) and centralized
initialization in WrappingNullables.

The added integration test KTableKTableForeignKeyJoinDistributedTest tests
whether all serdes are now correctly set on all stream clients.

Reviewers: John Roesler <[email protected]>

* KAFKA-10454 / Update copartitionSourceGroups when optimization algorithm is triggered (#9237)

Fix KAFKA-10454 bug

The main issue was that when the optimization algorithm removed repartition nodes, the corresponding copartitionSourceGroups was never updated. As a result, the copartition enforcer wasn't able to do the checks and set the proper number of partitions.

The test ensures that whenever optimization is enabled, the changelog topic for the table is not created, and whenever optimization is turned off, the appropriate changelog topic for the table is created.

Reviewers: John Roesler <[email protected]>, Bill Bejeck <[email protected]>

* MINOR: Clean-up streams javadoc warnings (#9461)

Reviewers: Matthias J. Sax <[email protected]>,  John Roesler <[email protected]>

* KAFKA-10564: fix flaky test (#9466)

Minor update to fix flaky state directory test by advancing the MockTime.

Reviewers: Chia-Ping Tsai <[email protected]>, A. Sophie Blee-Goldman <[email protected]>

* MINOR: Remove unused TopicCommand.askToProceed() method (#9465)

Reviewers: Gwen (Chen) Shapira <[email protected]>, Chia-Ping Tsai <[email protected]>

* MINOR: Fix JDK8 compatibility issue in Snappy (#9460)

See https://github.com/xerial/snappy-java/releases/tag/1.1.7.7 for more details.

Reviewers: Manikumar Reddy <[email protected]>

* KAFKA-10618: Add UUID class, use in protocols (part of KIP-516) (#9454)

In order to support topic IDs, we need to create a public UUID class. This class will be used in protocols. This PR creates the class, modifies code to use the class in the message protocol and changes the code surrounding the existing messages/json that used the old UUID class.

SimpleExampleMessage was used only for testing, so all usages of UUID have been switched to the new class.

SubscriptionInfoData uses UUID for processId extensively. It also relies on java.util.UUID's implementation of Comparable so that UUIDs can be ordered. This functionality was not necessary for the UUIDs used for topic IDs, so the new class is converted to java.util.UUID on the boundary of SubscriptionInfoData. Sorting was used only for testing, though, so this may still be changed.

Also added tests for the methods of the new UUID class. The existing SimpleExampleMessage tests should be sufficient for testing the new UUID class in message protocols.

Reviewers: Rajini Sivaram <[email protected]>

* MINOR: distinguish between missing source topics and internal assignment errors (#9446)

Introduce an ASSIGNMENT_ERROR code to distinguish from INCOMPLETE_SOURCE_TOPIC_METADATA and shut down all members in case of an unexpected exception during task assignment.

Reviewers: Matthias J. Sax <[email protected]>,  John Roesler <[email protected]>

* KAFKA-10520; Ensure transactional producers poll if leastLoadedNode not available with max.in.flight=1 (#9406)

We currently stop polling in `Sender` in a transactional producer if there is only one broker in the bootstrap server list and `max.in.flight.requests.per.connection=1` and Metadata response is pending when InitProducerId request is ready to be sent. In this scenario, we attempt to send FindCoordinator to `leastLoadedNode`, but since that is blocked due to `max.in.flight=1` as a result of the pending metadata response, we never unblock unless we poll. This PR ensures we poll in this case.

Reviewers: Chia-Ping Tsai <[email protected]>, Jason Gustafson <[email protected]>, David Jacot <[email protected]>

* MINOR: refactor CandidateState.unrecordedVoters (#9442)

Reviewers: Chia-Ping Tsai <[email protected]>

* MINOR: Fix timestampDelta type in doc (#8870)

Reviewers: Chia-Ping Tsai <[email protected]>

* MINOR: simplify implementation of ConsumerGroupOperationContext.hasCo… (#9449)

Reviewers: David Jacot <[email protected]>

* MINOR: Add some class javadoc to Admin client (#9459)

Reviewers: Lee Dongjin <[email protected]>

* MINOR: TopologyTestDriver should not require dummy parameters (#9477)

TopologyTestDriver comes with a paper cut that it passes through a
config requirement that application.id and bootstrap.servers must be
configured. But these configs are not required in the context of
TopologyTestDriver specifically. This change relaxes the requirement.

Reviewers: Boyang Chen <[email protected]>, Matthias J. Sax <[email protected]>

* KAFKA-10284: Group membership update due to static member rejoin should be persisted (#9270)

Reviewers: Boyang Chen <[email protected]>, John Roesler <[email protected]>

* Handle ProducerFencedException on offset commit (#9479)

The transaction manager currently does not handle producer-fenced errors returned from an offset commit request.

Adds the handling of the producer fenced errors.

Reviewers: Boyang Chen <[email protected]>, John Roesler <[email protected]>

* MINOR: Update docs to point to next release add notable features for 2.7 (#9483)

Reviewers: Matthias J. Sax <[email protected]>

* MINOR: Update raft/README.md and minor RaftConfig tweaks (#9484)

* Replace quorum.bootstrap.servers and quorum.bootstrap.voters with
quorum.voters.
* Remove seemingly unused `verbose` config.
* Use constant to avoid unnecessary repeated concatenation.

Reviewers: Jason Gustafson <[email protected]>

* MINOR: Refactor RaftClientTest to be used by other tests (#9476)

There is a lot of functionality in KafkaRaftClientTest that is useful for writing other tests. Refactor that functionality into another class that can be reused in other tests.

Reviewers: Jason Gustafson <[email protected]>

* KAFKA-10592: Fix vagrant for a system tests with python3

Fix vagrant for a system tests with a python3.

Author: Nikolay Izhikov <[email protected]>

Reviewers: Manikumar Reddy <[email protected]>

Closes #9480 from nizhikov/KAFKA-10592

* MINOR: fix error in quota_test.py system tests

quota_test.py tests are failing with below error.

```
23:24:42 [INFO:2020-10-24 17:54:42,366]: RunnerClient: kafkatest.tests.client.quota_test.QuotaTest.test_quota.quota_type=user.override_quota=False: FAIL: not enough arguments for format string
23:24:42 Traceback (most recent call last):
23:24:42   File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.6/site-packages/ducktape-0.8.0-py3.6.egg/ducktape/tests/runner_client.py", line 134, in run
23:24:42     data = self.run_test()
23:24:42   File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.6/site-packages/ducktape-0.8.0-py3.6.egg/ducktape/tests/runner_client.py", line 192, in run_test
23:24:42     return self.test_context.function(self.test)
23:24:42   File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.6/site-packages/ducktape-0.8.0-py3.6.egg/ducktape/mark/_mark.py", line 429, in wrapper
23:24:42     return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
23:24:42   File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/quota_test.py", line 141, in test_quota
23:24:42     self.quota_config = QuotaConfig(quota_type, override_quota, self.kafka)
23:24:42   File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/quota_test.py", line 60, in __init__
23:24:42     self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users', None])
23:24:42   File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/quota_test.py", line 83, in configure_quota
23:24:42     (kafka.kafka_configs_cmd_with_optional_security_settings(node, force_use_zk_conection), producer_byte_rate, consumer_byte_rate)
23:24:42 TypeError: not enough arguments for format string
23:24:42
```

Ran the tests locally.

Author: Manikumar Reddy <[email protected]>

Reviewers: David Jacot <[email protected]>, Ron Dagostino <[email protected]>

Closes #9496 from omkreddy/quota-tests

* KAFKA-10092: Remove unnecessary contructor and exception in NioEchoServer (#8794)

Reviewers: notifygd, Andrew Choi <[email protected]>, Jakob Homan, Chia-Ping Tsai <[email protected]>

* MINOR: Fix NPE in KafkaAdminClient.describeUserScramCredentials (#9374)

`KafkaAdminClient.describeUserScramCredentials` should not fail with a NPE when `users` is `null` as `null` means that all the users must be returned.

Reviewers: Ron Dagostino <[email protected]>, Chia-Ping Tsai <[email protected]>, David Jacot <[email protected]>

* MINOR; DescribeUserScramCredentialsRequest API should handle request with users equals to `null` (#9504)

DescribeUserScramCredentialsRequest states that all users are described when Users is empty or null. null is not handled at the moment and throws an NPE.

Reviewers: Ron Dagostino <[email protected]>, Colin P. McCabe <[email protected]>

* KAFKA-10616: Always call prepare-commit before suspending for active tasks (#9464)

Today, for active tasks, we have the following active task suspension paths:

1. closeAndRevive in handleTaskCorruption.
2. closeClean in assignor#onAssignment.
3. closeClean in shutdown.
4. closeDirty in assignor#onAssignment.
5. closeDirty in listener#onPartitionsLost.
6. closeDirty in shutdown.
7. suspend in listener#onPartitionsRevoked.

Among those, 1/4/5/6 do not call prepareCommit, which would call stateManager#flushCache, and this may leave the state manager in an illegal state. This PR requires a prepareCommit to be triggered before suspending.

Reviewers: A. Sophie Blee-Goldman <[email protected]>

* KAFKA-9381: Fix publishing valid scaladoc for streams-scala (#9486)

Reviewers: Ismael Juma <[email protected]>, Matthias J. Sax <[email protected]>

* MINOR: refactor Log to get rid of "return" in nested anonymous function (#9162)

Scala uses NonLocalReturnException to implement the control flow of returning from a nested anonymous function. That is an anti-pattern, so we should avoid using it in hot methods.

Reviewers: Ismael Juma <[email protected]>
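
For illustration, the anti-pattern in question and a combinator-based alternative (the example method names are illustrative, not taken from the Kafka code):

```scala
// `return` inside a nested anonymous function is implemented with a
// NonLocalReturnException, which is costly on hot paths.
object NonLocalReturnExample {
  // Anti-pattern: the `return` inside foreach is exception-based control flow.
  def containsNegativeWithReturn(xs: Seq[Int]): Boolean = {
    xs.foreach { x => if (x < 0) return true }
    false
  }

  // Preferred: a short-circuiting combinator, no exception involved.
  def containsNegative(xs: Seq[Int]): Boolean = xs.exists(_ < 0)
}
```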

* KAFKA-10647; Only serialize owned partitions when consumer protocol version >= 1 (#9506)

A regression got introduced by https://github.com/apache/kafka/commit/466f8fd21c6651ea5daa50154239e85fa629dbb4. The owned partition field must be ignored for version < 1 otherwise the serialization fails with an unsupported version exception.

Reviewers: Jason Gustafson <[email protected]>, Chia-Ping Tsai <[email protected]>

* MINOR: Add KIP-584 to upgrade.html file (#9511)

Reviewers: Kowshik Prakasam <[email protected]>, Manikumar Reddy <[email protected]>, Boyang Chen <[email protected]>

* KAFKA-10601; Add support for append linger to Raft implementation (#9418)

The patch adds `quorum.append.linger.ms` behavior to the raft implementation. This gives users a powerful knob to tune the impact of fsync.  When an append is accepted from the state machine, it is held in an accumulator (similar to the producer) until the configured linger time is exceeded. This allows the implementation to amortize fsync overhead at the expense of some write latency.

The patch also improves our methodology for testing performance. Up to now, we have relied on the producer performance test, but it is difficult to simulate expected controller loads because producer performance is limited by other factors such as the number of producer clients and head-of-line blocking. Instead, this patch adds a workload generator which runs on the leader after election.

Finally, this patch brings us nearer to the write semantics expected by the KIP-500 controller. It makes the following changes:

- Introduce `RecordSerde<T>` interface which abstracts the underlying log implementation from `RaftClient`. The generic type is carried over to `RaftClient<T>` and is exposed through the read/write APIs.
- `RaftClient.append` is changed to `RaftClient.scheduleAppend` and returns the last offset of the expected log append.
- `RaftClient.scheduleAppend` accepts a list of records and ensures that the full set are included in a single batch.
- Introduce `RaftClient.Listener` with a single `handleCommit` API which will eventually replace `RaftClient.read` in order to surface committed data to the controller state machine. Currently `handleCommit` is only used for records appended by the leader.

Reviewers: José Armando García Sancio <[email protected]>, Guozhang Wang <[email protected]>

* MINOR: Add KIP-431 to upgrade.html file (#9514)

Reviewers: Matthias J. Sax <[email protected]>

* KAFKA-10644; Fix VotedToUnattached test error (#9503)

This patch fixes a test case in `QuorumStateTest`. The method name is "testVotedToUnattachedHigherEpoch," but the code initialized the state in the unattached state instead of the voted state.

Reviewers: Jason Gustafson <[email protected]>

* MINOR: call super.close() when closing RocksDB options (#9498)

Reviewers: Matthias J. Sax <[email protected]>, Guozhang Wang <[email protected]>

* MINOR: Fix documentation for KIP-585 (#9524)

Reviewers: Chia-Ping Tsai <[email protected]>

* Revert "KAFKA-9705 part 1: add KIP-590 request header fields (#9144)" (#9523)

This reverts commit 21dc5231ce9c7398c7ede4dbefa2f2202e06b2d4, as we decided to use Envelope for redirection instead of the initial principal.

Reviewers: Jason Gustafson <[email protected]>

* KAFKA-10638: Fix QueryableStateIntegrationTest (#9521)

This test has been observed to have flaky failures.
Apparently, in the failed runs, Streams had entered a rebalance
before some of the assertions were made. We recently made
IQ a little stricter on whether it would return errors instead of
null responses in such cases:
KAFKA-10598: Improve IQ name and type checks (#9408)

As a result, we have started seeing failures now instead of
silently executing an invalid test (I.e., it was asserting the
return to be null, but the result was null for the wrong
reason).

Now, if the test discovers that Streams is no longer running,
it will repeat the verification until it actually gets a valid
positive or negative result.

Reviewers: Chia-Ping Tsai <[email protected]>

* MINOR: Fix flaky shouldRejectNonExistentStoreName (#9426)

Fix flaky test by making sure Streams is
running before making assertions about IQ.

Reviewers: Lee Dongjin <[email protected]>, Guozhang Wang <[email protected]>, Chia-Ping Tsai <[email protected]>

* MINOR: Fix verification in StreamsUpgradeTest.test_version_probing_upgrade (#9530)

The system test StreamsUpgradeTest.test_version_probing_upgrade tries to verify the wrong version for version probing.

Reviewers: Anna Sophie Blee-Goldman <[email protected]>

* MINOR: improve `null` checks for headers (#9513)

Reviewers: Chia-Ping Tsai <[email protected]>, Luke Chen @showuon

* MINOR: Add missing DESCRIBE_QUORUM ApiKey in AbstractRequest.parseRequest (#9537)

Reviewers: David Jacot <[email protected]>

* KAFKA-10664: Delete existing checkpoint when writing empty offsets (#9534)

Delete the existing checkpoint file if told to write empty offsets map to ensure that corrupted offsets are not re-initialized from

Reviewers: Bruno Cadonna <[email protected]>, Guozhang Wang <[email protected]>

* KAFKA-10651: read  offsets directly from checkpoint for uninitialized tasks (#9515)

Read offsets directly from the checkpoint file if a task is uninitialized or closed

Reviewers: Bruno Cadonna <[email protected]>, John Roesler <[email protected]>

* MINOR: Add releaseTarGz to args for building docs (#9528)

Reviewers: Ismael Juma <[email protected]>

* MINOR: Move upgraded docs from site to kafak docs (#9532)

Reviewers: Matthias J. Sax <[email protected]>

* MINOR: Fix group_mode_transactions_test (#9538)

KIP-431 (#9099) changed the format of console consumer output to `Partition:$PARTITION\t$VALUE` whereas previously the output format was `$VALUE\t$PARTITION`. This PR updates the message verifier to accommodate the updated console consumer output format.

* MINOR: remove redudant return statement (#9535)

Reviewers: Chia-Ping Tsai <[email protected]>

* KAFKA-10669: Make CurrentLeaderEpoch field ignorable and set MaxNumOffsets field default to 1

Couple of failures observed after KAFKA-9627: Replace ListOffset request/response with automated protocol (https://github.com/apache/kafka/pull/8295)

1. Latest consumer fails to consume from 0.10.0.1 brokers. Below system tests are failing
kafkatest.tests.client.client_compatibility_features_test.ClientCompatibilityFeaturesTest
kafkatest.tests.client.client_compatibility_produce_consume_test.ClientCompatibilityProduceConsumeTest

Solution: The current default value for MaxNumOffsets is 0; because of this, brokers are not returning offsets for v0 requests. Set the default value of the MaxNumOffsets field to 1. This is similar to the previous approach (https://github.com/apache/kafka/blob/2.6/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java#L204)

2. In some scenarios, latest consumer fails with below error when connecting to a Kafka cluster which consists of newer and older (<=2.0) Kafka brokers
`org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default currentLeaderEpoch at version 3`

Solution: After #8295, consumer can set non-default CurrentLeaderEpoch value for v3 and below requests. One solution is to make CurrentLeaderEpoch ignorable.

Author: Manikumar Reddy <[email protected]>

Reviewers: David Jacot <[email protected]>

Closes #9540 from omkreddy/fix-listoffsets

* KAFKA-10471 Mark broker crash during log loading as unclean shutdown (#9364)

LogManager writes a clean shutdown file when the broker shuts down. The
presence of this file indicates that the broker had a clean shutdown and
log recovery is not needed upon the next boot up.

Earlier, LogManager would check for this file at the start of log loading workflow,
and delete it after the log has been loaded. If the broker were to crash
while loading logs, the file would not be deleted and would mislead LogManager when
it tries to load logs upon the next boot-up. Hence, a crash during log loading
would not be considered a hard reset of the broker.

As part of this fix, we delete the clean shutdown file as soon as we
look it up, at the start of log loading workflow. Thereafter, we maintain a boolean
flag to indicate if broker underwent clean shutdown or not. So, if the
broker were to crash while logs are being loaded, LogManager will be
able to detect this as a hard reset.

Reviewers: Jun Rao <[email protected]>

* KAFKA-10632; Raft client should push all committed data to state machines (#9482)

In #9418, we add a listener to the `RaftClient` interface. In that patch, we used it only to send commit notifications for writes from the leader. In this PR, we extend the `handleCommit` API to accept all committed data and we remove the pull-based `read` API. Additionally, we add two new callbacks to the listener interface in order to notify the state machine when the raft client has claimed or resigned leadership.

Finally, this patch allows the `RaftClient` to support multiple listeners. This is necessary for KIP-500 because we will have one listener for the controller role and one for the broker role.

Reviewers: José Armando García Sancio <[email protected]>, Boyang Chen <[email protected]>

* MINOR: KIP-584: Remove admin client facility to read features from controller (#9536)

In this PR, I have eliminated the facility in the Admin#describeFeatures API and its implementation to optionally send a describeFeatures request to the controller. This feature was not seen to be particularly useful, and besides, it also poses a hindrance in the post-KIP-500 world, where no client would be able to access the controller directly.

Reviewers: Chia-Ping Tsai <[email protected]>, Jun Rao <[email protected]>

* MINOR: Fix an example in the Kafka Streams tutorial to be compilable (#6647)

Reviewers: Chia-Ping Tsai <[email protected]>

* MINOR: revise assertions in AbstractConfigTest (#9180)

Reviewers: Chia-Ping Tsai <[email protected]>

* KAFKA-10679: Migrate upgrade changes from site to kafka/docs (#9551)

During the AK website upgrade, changes made to kafka-site weren't migrated back to kafka-docs.

This PR is an initial attempt at porting the changes to kafka/docs, but it does not include the streams changes. Those will come in a separate PR.

For the most part, the bulk of the changes in the PR are cosmetic. Only the introduction.html has substantial changes, but it's a direct port from the live documentation.

For testing:

I reviewed the PR diffs
Rendered the changes locally

Reviewers: Matthias J. Sax <[email protected]>

* KAFKA-10036: Improve handling and documentation of Suppliers (#9000)

Reviewer: Matthias J. Sax <[email protected]>

* MINOR: remove duplicate code in PartitionStateMachine.doHandleStateChanges (#9546)

Reviewers: Chia-Ping Tsai <[email protected]>

* MINOR: move connectorConfig to AbstractHerder (#6820)

StandaloneHerder and DistributedHerder have identical implementations of connectorConfig (apart from one line of logging). This commit moves the common implementation of connectorConfig to AbstractHerder.

Reviewers: Konstantine Karantasis <[email protected]>, Chia-Ping Tsai <[email protected]>

* KAFKA-10679: [Streams] migrate kafka-site updated docs to kafka/docs (#9554)

During the AK website upgrade, changes made to kafka-site weren't migrated back to kafka-docs.

This PR is an attempt at porting the streams changes to kafka/docs

For the most part, the bulk of the changes in the PR are cosmetic.

For testing:

I reviewed the PR diffs
Rendered the changes locally

Reviewers: John Roesler <[email protected]>

* update version in quickstart to current

* MINOR: Add back section taken out by mistake (#9544)

Reviewers: Matthias J. Sax <[email protected]>

* KAFKA-7987: Reinitialize ZookeeperClient after auth failures (#7751)

Schedules client reinitialization if Zookeeper auth failure is encountered. This allows for reconnections when transient network errors are encountered during connection establishment. The Zookeeper client doesn't expose details of the auth failure so we can't determine whether an error is retriable or not, so all auth failures are retried.

Co-authored-by: Rajini Sivaram <[email protected]>

Reviewers: Jun Rao <[email protected]>

* KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics (#9103)

This PR adds support for forwarding of the following RPCs:

AlterConfigs
IncrementalAlterConfigs
AlterClientQuotas
CreateTopics

Co-authored-by: Jason Gustafson <[email protected]>
Reviewers: Jason Gustafson <[email protected]>

* HOTFIX: RequestContext constructor change (#9559)

Reviewers: Kowshik Prakasam <[email protected]>, Jason Gustafson <[email protected]>, Chia-Ping Tsai <[email protected]>

* MINOR: Fix comment in FinalizedFeatureChangeListener.initOrThrow (#9562)

Fixed the param doc in FinalizedFeatureChangeListener.initOrThrow method. The parameter waitOnceForCacheUpdateMs is expected to be > 0, but the doc was incorrect.

Reviewers: Chia-Ping Tsai <[email protected]>

* KAFKA-10500: Makes the Stream thread list resizable (#9543)

Change the StreamThreads to be held in a List so that we
can dynamically change the number of threads easily.

Reviewers: Bruno Cadonna <[email protected]>, John Roesler <[email protected]>

* KAFKA-10624: For FeatureZNodeStatus, use sealed trait instead of Enumeration (#9561)

This is a follow-up to initial KIP-584 development. In this PR, I've switched the FeatureZNodeStatus enum to be a sealed trait. In Scala, we prefer sealed traits over Enumeration since the former gives you exhaustiveness checking. With Scala enumeration, you don't get a warning if you add a new value that is not handled in a given pattern match.

Reviewers: Jun Rao <[email protected]>
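
As a small illustration of the idiom (the value names below are placeholders, not necessarily the actual FeatureZNodeStatus values):

```scala
// A sealed trait with case objects gives exhaustiveness checking in pattern
// matches, which Enumeration does not.
sealed trait FeatureZNodeStatusSketch
object FeatureZNodeStatusSketch {
  case object Enabled extends FeatureZNodeStatusSketch
  case object Disabled extends FeatureZNodeStatusSketch

  def describe(status: FeatureZNodeStatusSketch): String = status match {
    case Enabled  => "enabled"  // compiler warns if a new case object is added but not handled here
    case Disabled => "disabled"
  }
}
```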

* KAFKA-10673: Cache inter broker listener name used in connection quotas (#9555)

`config.interBrokerListenerName` and `config.listeners` were called several times per connection accept. As these calls are expensive, it is preferable to rely on cached values.

Reviewers: Anna Povzner <[email protected]>, David Jacot <[email protected]>

* MINOR: Always return partitions with diverging epochs in fetch response (#9567)

Reviewers: Jason Gustafson <[email protected]>, Chia-Ping Tsai <[email protected]>

* MINOR: Update jetty to 9.4.33

Jetty 9.4.32 and before are affected by CVE-2020-27216. This vulnerability is fixed in Jetty 9.4.33, please see the jetty project security advisory for details: https://github.com/eclipse/jetty.project/security/advisories/GHSA-g3wg-6mcf-8jj6#advisory-comment-63053

Unit tests and integration tests pass locally after the upgrade.

Author: Nitesh Mor <[email protected]>

Reviewers: Manikumar Reddy <[email protected]>

Closes #9556 from niteshmor/trunk

* KAFKA-10661; Add new resigned state for graceful shutdown/initialization (#9531)

When initializing the raft state machine after shutting down as a leader, we were previously entering the "unattached" state, which means we have no leader and no voted candidate. This was a bug because it allowed a reinitialized leader to cast a vote for a candidate in the same epoch that it was already the leader of. This patch fixes the problem by introducing a new "resigned" state which allows us to retain the leader state so that we cannot change our vote and we will not accept additional appends.

This patch also revamps the shutdown logic to make use of the new "resigned" state. Previously we had a separate path in `KafkaRaftClient.poll` for the shutdown logic which resulted in some duplication. Instead now we incorporate shutdown behavior into each state's respective logic.

Finally, this patch changes the shutdown logic so that `EndQuorumEpoch` is only sent by resigning leaders. Previously we allowed this request to be sent by candidates as well.

Reviewers: dengziming <[email protected]>, Guozhang Wang <[email protected]>

* MINOR: Log resource pattern of ACL updates at INFO level (#9578)

Reviewers: Manikumar Reddy <[email protected]>

* KAFKA-10469: Respect logging hierarchy (KIP-676) (#9266)

Previously the root logger level was used, ignoring intervening loggers with
different levels.

This was initially applied via fda67018375 and reverted via 75e53585255a0
due to the fact that the previous behavior was specified in KIP-412. It
has been reapplied since KIP-676 has been approved.

Reviewers: Stanislav Kozlovski <[email protected]>, Lee Dongjin <[email protected]>, Ismael Juma <[email protected]>

* migrate remaining RPCs (#9558)

This PR follows up 0814e4f to migrate the remaining RPCs which need forwarding, including:
CreateAcls/DeleteAcls/CreateDelegationToken/RenewDelegationToken/ExpireDelegationToken/AlterPartitionReassignment/CreatePartition/DeleteTopics/UpdateFeatures/Scram

Reviewers: David Arthur <[email protected]>

* KAFKA-10470: Zstd upgrade and buffering (#9499)

Zstd-jni 1.4.5-6 allocates large internal buffers inside of ZstdInputStream and ZstdOutputStream. This caused a lot of allocation and GC activity when creating and closing the streams. It also does not buffer the reads or writes. This causes inefficiency when DefaultRecord.writeTo() does a series of small single-byte reads using various ByteUtils methods. The JNI is more efficient if the writes of uncompressed data are flushed in large pieces rather than for each byte, due to the expense of context switching between the Java code and the native code. The same applies when reading. Per luben/zstd-jni#141 the maintainer of zstd-jni and I agreed to not buffer reads and writes in favor of having the caller do that, so here we are updating the caller.

In this patch, I upgraded to the most recent zstd-jni version with the buffer reuse built-in. This was done in luben/zstd-jni#143 and luben/zstd-jni#146 Since we decided not to add additional buffering of input/output with zstd-jni, I added the BufferedInputStream and BufferedOutputStream to CompressionType.ZSTD just like we currently do for CompressionType.GZIP which also is inefficient for single byte reads and writes. I used the same buffer sizes as that existing implementation.

NOTE: if so desired, we could pass a wrapped BufferSupplier into the Zstd*Stream classes to have Kafka decide how the buffer recycling occurs. This functionality was added in the latter PR linked above. I am holding off on this since, based on jmh benchmarking, the performance gains were not clear, and personally I don't know if it is worth the complexity of trying to hack around the reflection at this point in time. zstd-jni uses a very similar default recycler to the one snappy currently uses, which seems to provide decent efficiency. While this PR fixes the defect, I feel that using BufferSupplier in both zstd-jni and snappy is outside of the scope of this bugfix and should be considered a separate improvement. I would prefer this change get merged in on its own since the performance gains here are very significant relative to the more incremental and minor optimizations which could be achieved by doing that separate work.
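
A minimal sketch of the wrapping described above (the buffer size and object/method names are assumptions, not the values or structure used by Kafka's CompressionType):

```scala
import java.io.{BufferedInputStream, BufferedOutputStream, InputStream, OutputStream}
import com.github.luben.zstd.{ZstdInputStream, ZstdOutputStream}

// Wrap the zstd-jni streams in plain Buffered* streams so single-byte
// reads/writes do not cross the JNI boundary one byte at a time.
object ZstdWrappingSketch {
  private val BufferSize = 16 * 1024 // assumed size, for illustration only

  def wrapForInput(in: InputStream): InputStream =
    new BufferedInputStream(new ZstdInputStream(in), BufferSize)

  def wrapForOutput(out: OutputStream): OutputStream =
    new BufferedOutputStream(new ZstdOutputStream(out), BufferSize)
}
```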

There are some noticeable improvements in the JMH benchmarks (excerpt):

BEFORE:
Benchmark                                                                                                                    (bufferSupplierStr)  (bytes)  (compressionType)  (maxBatchSize)  (messageSize)  (messageVersion)   Mode  Cnt       Score     Error   Units
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed                                                CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15   27743.260 ± 673.869   ops/s
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.alloc.rate                                 CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15    3399.966 ±  82.608  MB/sec
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.alloc.rate.norm                            CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15  134968.010 ±   0.012    B/op
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.churn.G1_Eden_Space                        CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15    3850.985 ±  84.476  MB/sec
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.churn.G1_Eden_Space.norm                   CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15  152881.128 ± 942.189    B/op
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.churn.G1_Survivor_Space                    CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15     174.241 ±   3.486  MB/sec
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.churn.G1_Survivor_Space.norm               CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15    6917.758 ±  82.522    B/op
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.count                                      CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15    1689.000            counts
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.time                                       CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15   82621.000                ms
JMH benchmarks done

Benchmark                                                                                                    (bufferSupplierStr)  (bytes)  (compressionType)  (maxBatchSize)  (messageSize)  (messageVersion)   Mode  Cnt       Score       Error   Units
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage                                                    CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15   24095.711 ±   895.866   ops/s
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.alloc.rate                                     CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15    2932.289 ±   109.465  MB/sec
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.alloc.rate.norm                                CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15  134032.012 ±     0.013    B/op
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.churn.G1_Eden_Space                            CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15    3282.912 ±   115.042  MB/sec
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.churn.G1_Eden_Space.norm                       CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15  150073.914 ±  1342.235    B/op
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.churn.G1_Survivor_Space                        CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15     149.697 ±     5.786  MB/sec
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.churn.G1_Survivor_Space.norm                   CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15    6842.462 ±    64.515    B/op
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.count                                          CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15    1449.000              counts
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.time                                           CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15   82518.000                  ms
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize                                                     CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15    1449.060 ±   230.498   ops/s
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.alloc.rate                                      CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15     198.051 ±    31.532  MB/sec
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.alloc.rate.norm                                 CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15  150502.519 ±     0.186    B/op
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.churn.G1_Eden_Space                             CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15     200.064 ±    31.879  MB/sec
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.churn.G1_Eden_Space.norm                        CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15  152569.341 ± 13826.686    B/op
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.count                                           CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15      91.000              counts
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.time                                            CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15   75869.000                  ms
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize                                                CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15    2609.660 ±  1145.160   ops/s
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.alloc.rate                                 CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15     815.441 ±   357.818  MB/sec
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.alloc.rate.norm                            CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15  344309.097 ±     0.238    B/op
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.churn.G1_Eden_Space                        CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15     808.952 ±   354.975  MB/sec
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.churn.G1_Eden_Space.norm                   CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15  345712.061 ± 51434.034    B/op
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.churn.G1_Old_Gen                           CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15       0.019 ±     0.042  MB/sec
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.churn.G1_Old_Gen.norm                      CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15      18.615 ±    42.045    B/op
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.churn.G1_Survivor_Space                    CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15      24.132 ±    12.254  MB/sec
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.churn.G1_Survivor_Space.norm               CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15   13540.960 ± 14649.192    B/op
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.count                                      CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15     148.000              counts
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.time                                       CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15   23848.000                  ms
JMH benchmarks done

AFTER:
Benchmark                                                                                                                (bufferSupplierStr)  (bytes)  (compressionType)  (maxBatchSize)  (messageSize)  (messageVersion)   Mode  Cnt       Score      Error   Units
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed                                            CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15  147792.454 ± 2721.318   ops/s
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.alloc.rate                             CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15    2708.481 ±   50.012  MB/sec
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.alloc.rate.norm                        CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15   20184.002 ±    0.002    B/op
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.churn.G1_Eden_Space                    CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15    2732.667 ±   59.258  MB/sec
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.churn.G1_Eden_Space.norm               CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15   20363.460 ±  120.585    B/op
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.churn.G1_Old_Gen                       CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15       0.042 ±    0.033  MB/sec
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.churn.G1_Old_Gen.norm                  CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15       0.316 ±    0.249    B/op
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.count                                  CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15     833.000             counts
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.time                                   CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15    8390.000                 ms
JMH benchmarks done

Benchmark                                                                                                (bufferSupplierStr)  (bytes)  (compressionType)  (maxBatchSize)  (messageSize)  (messageVersion)   Mode  Cnt       Score      Error   Units
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage                                                CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15  166786.092 ± 3285.702   ops/s
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.alloc.rate                                 CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15    2926.914 ±   57.464  MB/sec
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.alloc.rate.norm                            CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15   19328.002 ±    0.002    B/op
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.churn.G1_Eden_Space                        CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15    2938.541 ±   66.850  MB/sec
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.churn.G1_Eden_Space.norm                   CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15   19404.357 ±  177.485    B/op
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.churn.G1_Old_Gen                           CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15       0.516 ±    0.100  MB/sec
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.churn.G1_Old_Gen.norm                      CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15       3.409 ±    0.657    B/op
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.churn.G1_Survivor_Space                    CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15       0.032 ±    0.131  MB/sec
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.churn.G1_Survivor_Space.norm               CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15       0.207 ±    0.858    B/op
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.count                                      CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15     834.000             counts
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.time                                       CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15    9370.000                 ms
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize                                                 CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15   15988.116 ±  137.427   ops/s
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.alloc.rate                                  CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15     448.636 ±    3.851  MB/sec
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.alloc.rate.norm                             CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15   30907.698 ±    0.020    B/op
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.churn.G1_Eden_Space                         CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15     450.905 ±    5.587  MB/sec
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.churn.G1_Eden_Space.norm                    CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15   31064.113 ±  291.190    B/op
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.churn.G1_Old_Gen                            CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15       0.043 ±    0.007  MB/sec
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.churn.G1_Old_Gen.norm                       CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15       2.931 ±    0.493    B/op
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.count                                       CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15     790.000             counts
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.time                                        CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15     999.000                 ms
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize                                            CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15   11345.169 ±  206.528   ops/s
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.alloc.rate                             CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15    2314.800 ±   42.094  MB/sec
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.alloc.rate.norm                        CREATE   RANDOM               ZSTD             200           1000                 2  thrp…