-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ORG] RU-AXAV: KAFKA-13149; Fix NPE when handling malformed record data in produce r… #19
Conversation
…equests (#11080) Raise `InvalidRecordException` from `DefaultRecordBatch.readFrom` instead of returning null if there are not enough bytes remaining to read the record. This ensures that the broker can raise a useful exception for malformed record batches. Reviewers: Ismael Juma <[email protected]>, Jason Gustafson <[email protected]>
This patch fixes several problems with the `ElectLeaders` API in KRaft: - `KafkaApis` did not properly forward this request type to the controller. - `ControllerApis` did not handle the request type. - `ElectLeadersRequest.getErrorResponse` may raise NPE when `TopicPartitions` is null. - Controller should not do preferred election if `ElectLeaders` specifies `UNCLEAN` election. - Controller should not do unclean election if `ElectLeaders` specifies `PREFERRED` election. - Controller should use proper error codes to handle cases when desired leader is unavailable or when no election is needed because a desired leader is already elected. - When election for all partitions is requested (indicated with null `TopicPartitions` field), the response should not return partitions for which no election was necessary. In addition to extending the unit test coverage in `ReplicationControlManagerTest`, I have also converted `LeaderElectionCommandTest` to use KRaft. Reviewers: dengziming <[email protected]>, José Armando García Sancio <[email protected]>, David Arthur <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good work
@@ -84,7 +84,7 @@ | |||
EXPIRE_DELEGATION_TOKEN(ApiMessageType.EXPIRE_DELEGATION_TOKEN, false, true), | |||
DESCRIBE_DELEGATION_TOKEN(ApiMessageType.DESCRIBE_DELEGATION_TOKEN), | |||
DELETE_GROUPS(ApiMessageType.DELETE_GROUPS), | |||
ELECT_LEADERS(ApiMessageType.ELECT_LEADERS), | |||
ELECT_LEADERS(ApiMessageType.ELECT_LEADERS, false, true), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
|
||
electionResults.add(electionResult); | ||
electionResults.add(electionResult); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
okay
@@ -208,7 +208,7 @@ class BrokerLifecycleManager(val config: KafkaConfig, | |||
_state = BrokerState.PENDING_CONTROLLED_SHUTDOWN | |||
// Send the next heartbeat immediately in order to let the controller | |||
// begin processing the controlled shutdown as soon as possible. | |||
scheduleNextCommunication(0) | |||
scheduleNextCommunicationImmediately() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
if (isTraceEnabled) { | ||
trace(s"Sending broker registration ${data}") | ||
if (isDebugEnabled) { | ||
debug(s"Sending broker registration ${data}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
@@ -406,7 +406,7 @@ class BrokerLifecycleManager(val config: KafkaConfig, | |||
scheduleNextCommunicationAfterSuccess() | |||
} | |||
} else { | |||
info(s"The controlled has asked us to exit controlled shutdown.") | |||
info(s"The controller has asked us to exit controlled shutdown.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
val createTopicResult = admin.createTopics(newTopics) | ||
createTopicResult.all().get() | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
very good
)) | ||
assertTrue(e.getCause.isInstanceOf[TimeoutException]) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good job!
s"but found ${response.getClass}") | ||
} | ||
} | ||
|
||
@AfterEach |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
very good work
val requestBuilder = new ElectLeadersRequest.Builder(ElectionType.PREFERRED, null, 30000) | ||
testKraftForwarding(ApiKeys.ELECT_LEADERS, requestBuilder) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
okay
requestBuilder | ||
) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
okay
…equests (apache#11080)
Raise
InvalidRecordException
fromDefaultRecordBatch.readFrom
instead of returning null if there are not enough bytes remaining to read the record. This ensures that the broker can raise a useful exception for malformed record batches.Reviewers: Ismael Juma [email protected], Jason Gustafson [email protected]
More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.
Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.
Committer Checklist (excluded from commit message)