-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ORG] RU-AXAV: KAFKA-13149; Fix NPE when handling malformed record data in produce requests #19
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -103,20 +103,22 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { | |
ApiError apiError = ApiError.fromThrowable(e); | ||
List<ReplicaElectionResult> electionResults = new ArrayList<>(); | ||
|
||
for (TopicPartitions topic : data.topicPartitions()) { | ||
ReplicaElectionResult electionResult = new ReplicaElectionResult(); | ||
if (data.topicPartitions() != null) { | ||
for (TopicPartitions topic : data.topicPartitions()) { | ||
ReplicaElectionResult electionResult = new ReplicaElectionResult(); | ||
|
||
electionResult.setTopic(topic.topic()); | ||
for (Integer partitionId : topic.partitions()) { | ||
PartitionResult partitionResult = new PartitionResult(); | ||
partitionResult.setPartitionId(partitionId); | ||
partitionResult.setErrorCode(apiError.error().code()); | ||
partitionResult.setErrorMessage(apiError.message()); | ||
electionResult.setTopic(topic.topic()); | ||
for (Integer partitionId : topic.partitions()) { | ||
PartitionResult partitionResult = new PartitionResult(); | ||
partitionResult.setPartitionId(partitionId); | ||
partitionResult.setErrorCode(apiError.error().code()); | ||
partitionResult.setErrorMessage(apiError.message()); | ||
|
||
electionResult.partitionResult().add(partitionResult); | ||
} | ||
electionResult.partitionResult().add(partitionResult); | ||
} | ||
|
||
electionResults.add(electionResult); | ||
electionResults.add(electionResult); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. okay |
||
} | ||
|
||
return new ElectLeadersResponse(throttleTimeMs, apiError.error().code(), electionResults, version()); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -208,7 +208,7 @@ class BrokerLifecycleManager(val config: KafkaConfig, | |
_state = BrokerState.PENDING_CONTROLLED_SHUTDOWN | ||
// Send the next heartbeat immediately in order to let the controller | ||
// begin processing the controlled shutdown as soon as possible. | ||
scheduleNextCommunication(0) | ||
scheduleNextCommunicationImmediately() | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. ok |
||
|
||
case _ => | ||
info(s"Skipping controlled shutdown because we are in state ${_state}.") | ||
|
@@ -284,8 +284,8 @@ class BrokerLifecycleManager(val config: KafkaConfig, | |
setIncarnationId(incarnationId). | ||
setListeners(_advertisedListeners). | ||
setRack(rack.orNull) | ||
if (isTraceEnabled) { | ||
trace(s"Sending broker registration ${data}") | ||
if (isDebugEnabled) { | ||
debug(s"Sending broker registration ${data}") | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. ok |
||
} | ||
_channelManager.sendRequest(new BrokerRegistrationRequest.Builder(data), | ||
new BrokerRegistrationResponseHandler()) | ||
|
@@ -406,7 +406,7 @@ class BrokerLifecycleManager(val config: KafkaConfig, | |
scheduleNextCommunicationAfterSuccess() | ||
} | ||
} else { | ||
info(s"The controlled has asked us to exit controlled shutdown.") | ||
info(s"The controller has asked us to exit controlled shutdown.") | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. ok |
||
beginShutdown() | ||
} | ||
gotControlledShutdownResponse = true | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,11 +17,11 @@ | |
|
||
package kafka.server | ||
|
||
import java.net.InetAddress | ||
import java.util | ||
import java.util.concurrent.atomic.AtomicBoolean | ||
import java.util.concurrent.locks.ReentrantLock | ||
import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException} | ||
import java.net.InetAddress | ||
|
||
import kafka.cluster.Broker.ServerInfo | ||
import kafka.coordinator.group.GroupCoordinator | ||
|
@@ -34,7 +34,6 @@ import kafka.security.CredentialProvider | |
import kafka.server.KafkaRaftServer.ControllerRole | ||
import kafka.server.metadata.{BrokerMetadataListener, BrokerMetadataPublisher, BrokerMetadataSnapshotter, ClientQuotaMetadataManager, KRaftMetadataCache, SnapshotWriterBuilder} | ||
import kafka.utils.{CoreUtils, KafkaScheduler} | ||
import org.apache.kafka.snapshot.SnapshotWriter | ||
import org.apache.kafka.common.message.ApiMessageType.ListenerType | ||
import org.apache.kafka.common.message.BrokerRegistrationRequestData.{Listener, ListenerCollection} | ||
import org.apache.kafka.common.metrics.Metrics | ||
|
@@ -45,14 +44,15 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok | |
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils} | ||
import org.apache.kafka.common.{ClusterResource, Endpoint} | ||
import org.apache.kafka.metadata.{BrokerState, VersionRange} | ||
import org.apache.kafka.raft.{RaftClient, RaftConfig} | ||
import org.apache.kafka.raft.RaftConfig.AddressSpec | ||
import org.apache.kafka.raft.{RaftClient, RaftConfig} | ||
import org.apache.kafka.server.authorizer.Authorizer | ||
import org.apache.kafka.server.common.ApiMessageAndVersion | ||
import org.apache.kafka.snapshot.SnapshotWriter | ||
|
||
import scala.collection.{Map, Seq} | ||
import scala.jdk.CollectionConverters._ | ||
import scala.compat.java8.OptionConverters._ | ||
import scala.jdk.CollectionConverters._ | ||
|
||
|
||
class BrokerSnapshotWriterBuilder(raftClient: RaftClient[ApiMessageAndVersion]) | ||
|
@@ -92,8 +92,7 @@ class BrokerServer( | |
|
||
this.logIdent = logContext.logPrefix | ||
|
||
val lifecycleManager: BrokerLifecycleManager = | ||
new BrokerLifecycleManager(config, time, threadNamePrefix) | ||
@volatile private var lifecycleManager: BrokerLifecycleManager = null | ||
|
||
private val isShuttingDown = new AtomicBoolean(false) | ||
|
||
|
@@ -105,7 +104,7 @@ class BrokerServer( | |
var controlPlaneRequestProcessor: KafkaApis = null | ||
|
||
var authorizer: Option[Authorizer] = None | ||
var socketServer: SocketServer = null | ||
@volatile var socketServer: SocketServer = null | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. ok |
||
var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = null | ||
|
||
var logDirFailureChannel: LogDirFailureChannel = null | ||
|
@@ -162,6 +161,8 @@ class BrokerServer( | |
lock.lock() | ||
try { | ||
if (status != from) return false | ||
info(s"Transition from $status to $to") | ||
|
||
status = to | ||
if (to == SHUTTING_DOWN) { | ||
isShuttingDown.set(true) | ||
|
@@ -182,6 +183,8 @@ class BrokerServer( | |
try { | ||
info("Starting broker") | ||
|
||
lifecycleManager = new BrokerLifecycleManager(config, time, threadNamePrefix) | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. ok |
||
/* start scheduler */ | ||
kafkaScheduler = new KafkaScheduler(config.backgroundThreads) | ||
kafkaScheduler.startup() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,8 +20,8 @@ package kafka.server | |
import java.util | ||
import java.util.Collections | ||
import java.util.Map.Entry | ||
import java.util.concurrent.{CompletableFuture, ExecutionException} | ||
import java.util.concurrent.TimeUnit.{MILLISECONDS, NANOSECONDS} | ||
import java.util.concurrent.{CompletableFuture, ExecutionException} | ||
|
||
import kafka.network.RequestChannel | ||
import kafka.raft.RaftManager | ||
|
@@ -36,11 +36,10 @@ import org.apache.kafka.common.internals.FatalExitError | |
import org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse => OldAlterConfigsResourceResponse} | ||
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic | ||
import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult | ||
import org.apache.kafka.common.message.CreateTopicsRequestData | ||
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult | ||
import org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, DeletableTopicResultCollection} | ||
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse | ||
import org.apache.kafka.common.message._ | ||
import org.apache.kafka.common.message.{CreateTopicsRequestData, _} | ||
import org.apache.kafka.common.protocol.Errors._ | ||
import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors} | ||
import org.apache.kafka.common.requests._ | ||
|
@@ -108,6 +107,7 @@ class ControllerApis(val requestChannel: RequestChannel, | |
case ApiKeys.DESCRIBE_ACLS => aclApis.handleDescribeAcls(request) | ||
case ApiKeys.CREATE_ACLS => aclApis.handleCreateAcls(request) | ||
case ApiKeys.DELETE_ACLS => aclApis.handleDeleteAcls(request) | ||
case ApiKeys.ELECT_LEADERS => handleElectLeaders(request) | ||
case _ => throw new ApiException(s"Unsupported ApiKey ${request.context.header.apiKey}") | ||
} | ||
} catch { | ||
|
@@ -488,6 +488,24 @@ class ControllerApis(val requestChannel: RequestChannel, | |
handleRaftRequest(request, response => new DescribeQuorumResponse(response.asInstanceOf[DescribeQuorumResponseData])) | ||
} | ||
|
||
def handleElectLeaders(request: RequestChannel.Request): Unit = { | ||
authHelper.authorizeClusterOperation(request, ALTER) | ||
|
||
val electLeadersRequest = request.body[ElectLeadersRequest] | ||
val future = controller.electLeaders(electLeadersRequest.data) | ||
future.whenComplete { (responseData, exception) => | ||
if (exception != null) { | ||
requestHelper.sendResponseMaybeThrottle(request, throttleMs => { | ||
electLeadersRequest.getErrorResponse(throttleMs, exception) | ||
}) | ||
} else { | ||
requestHelper.sendResponseMaybeThrottle(request, throttleMs => { | ||
new ElectLeadersResponse(responseData.setThrottleTimeMs(throttleMs)) | ||
}) | ||
} | ||
} | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. good |
||
def handleAlterIsrRequest(request: RequestChannel.Request): Unit = { | ||
val alterIsrRequest = request.body[AlterIsrRequest] | ||
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -209,7 +209,7 @@ class KafkaApis(val requestChannel: RequestChannel, | |
case ApiKeys.EXPIRE_DELEGATION_TOKEN => maybeForwardToController(request, handleExpireTokenRequest) | ||
case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request) | ||
case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request, requestLocal) | ||
case ApiKeys.ELECT_LEADERS => handleElectReplicaLeader(request) | ||
case ApiKeys.ELECT_LEADERS => maybeForwardToController(request, handleElectLeaders) | ||
case ApiKeys.INCREMENTAL_ALTER_CONFIGS => maybeForwardToController(request, handleIncrementalAlterConfigsRequest) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. ok |
||
case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleAlterPartitionReassignmentsRequest) | ||
case ApiKeys.LIST_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleListPartitionReassignmentsRequest) | ||
|
@@ -2993,9 +2993,8 @@ class KafkaApis(val requestChannel: RequestChannel, | |
true | ||
} | ||
|
||
def handleElectReplicaLeader(request: RequestChannel.Request): Unit = { | ||
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.notYetSupported(request)) | ||
|
||
def handleElectLeaders(request: RequestChannel.Request): Unit = { | ||
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request)) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. ok |
||
val electionRequest = request.body[ElectLeadersRequest] | ||
|
||
def sendResponseCallback( | ||
|
@@ -3006,7 +3005,7 @@ class KafkaApis(val requestChannel: RequestChannel, | |
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => { | ||
val adjustedResults = if (electionRequest.data.topicPartitions == null) { | ||
/* When performing elections across all of the partitions we should only return | ||
* partitions for which there was an eleciton or resulted in an error. In other | ||
* partitions for which there was an election or resulted in an error. In other | ||
* words, partitions that didn't need election because they already have the correct | ||
* leader are not returned to the client. | ||
*/ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -117,6 +117,8 @@ class BrokerMetadataPublisher(conf: KafkaConfig, | |
delta: MetadataDelta, | ||
newImage: MetadataImage): Unit = { | ||
try { | ||
trace(s"Publishing delta $delta with highest offset $newHighestMetadataOffset") | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. ok |
||
// Publish the new metadata image to the metadata cache. | ||
metadataCache.setImage(newImage) | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -95,5 +95,11 @@ default Admin createAdminClient() { | |
|
||
void stop(); | ||
|
||
void shutdownBroker(int brokerId); | ||
|
||
void startBroker(int brokerId); | ||
|
||
void rollingBrokerRestart(); | ||
|
||
void waitForReadyBrokers() throws InterruptedException; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. ok |
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment.
The reason will be displayed to describe this comment to others. Learn more.
ok