Skip to content

Commit

Permalink
[LI-HOTFIX] LIKAFKA-21968: Add broker-side observer interface and NoO…
Browse files Browse the repository at this point in the history
…pObserver implementation (apache#6)

TICKET =
LI_DESCRIPTION =

Reviewers: Radai Rosenblatt

EXIT_CRITERIA = MANUAL ["describe exit criteria"]
  • Loading branch information
Lincong Li authored and xiowu0 committed Jul 10, 2019
1 parent 3efb9ee commit 24624e6
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 2 deletions.
19 changes: 19 additions & 0 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val metadataCache: MetadataCache,
val metrics: Metrics,
val authorizer: Option[Authorizer],
val observer: Observer,
val quotas: QuotaManagers,
val fetchManager: FetchManager,
brokerTopicStats: BrokerTopicStats,
Expand Down Expand Up @@ -531,6 +532,14 @@ class KafkaApis(val requestChannel: RequestChannel,
if (authorizedRequestInfo.isEmpty)
sendResponseCallback(Map.empty)
else {

try
observer.observeProduceRequest(request.context, request.body[ProduceRequest])
catch {
case e: Exception => error(s"Observer failed to observe the produce request " +
s"${Observer.describeRequestAndResponse(request, null)}", e)
}

val internalTopicsAllowed = request.header.clientId == AdminUtils.AdminClientId

// call the replica manager to append messages to the replicas
Expand Down Expand Up @@ -2597,8 +2606,11 @@ class KafkaApis(val requestChannel: RequestChannel,
val responseString =
if (RequestChannel.isRequestLoggingEnabled) Some(response.toString(request.context.apiVersion))
else None
observeRequestResponse(request, response)

new RequestChannel.SendResponse(request, responseSend, responseString, onComplete)
case None =>
observeRequestResponse(request, null)
new RequestChannel.NoOpResponse(request)
}
sendResponse(response)
Expand All @@ -2620,4 +2632,11 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}

private def observeRequestResponse(request: RequestChannel.Request, response: AbstractResponse): Unit = {
try {
observer.observe(request.context, request.body[AbstractRequest], response)
} catch {
case e: Exception => error(s"Observer failed to observe ${Observer.describeRequestAndResponse(request, response)}", e)
}
}
}
23 changes: 23 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ object Defaults {
/************* Authorizer Configuration ***********/
val AuthorizerClassName = ""

/** ********* Broker-side configuration ***********/
val ObserverClassName = "kafka.server.NoOpObserver"
val ObserverShutdownTimeoutMs = 2000

/** ********* Socket Server Configuration ***********/
val Port = 9092
val HostName: String = new String("")
Expand Down Expand Up @@ -291,6 +295,11 @@ object KafkaConfig {
val ProducerBatchDecompressionEnableProp = "producer.batch.decompression.enable"
/************* Authorizer Configuration ***********/
val AuthorizerClassNameProp = "authorizer.class.name"

/** ********* Broker-side observer Configuration ****************/
val ObserverClassNameProp = "observer.class.name"
val ObserverShutdownTimeoutMsProp = "observer.shutdown.timeout"

/** ********* Socket Server Configuration ***********/
val PortProp = "port"
val HostNameProp = "host.name"
Expand Down Expand Up @@ -866,6 +875,12 @@ object KafkaConfig {
val PasswordEncoderKeyLengthDoc = "The key length used for encoding dynamically configured passwords."
val PasswordEncoderIterationsDoc = "The iteration count used for encoding dynamically configured passwords."

/** ********* Broker-side Observer Configuration *********/
val ObserverClassNameDoc = "The name of the observer class that is used to observe requests and/or response on broker."
val ObserverShutdownTimeoutMsDoc = "The maximum time of closing/shutting down an observer. This property can not be less than or equal to " +
"zero. When closing/shutting down an observer, most time is spent on flushing the observed stats. The reasonable timeout should be close to " +
"the time it takes to flush the stats."

private val configDef = {
import ConfigDef.Importance._
import ConfigDef.Range._
Expand Down Expand Up @@ -901,6 +916,10 @@ object KafkaConfig {
/************* Authorizer Configuration ***********/
.define(AuthorizerClassNameProp, STRING, Defaults.AuthorizerClassName, LOW, AuthorizerClassNameDoc)

/************* Broker-side Observer Configuration ***********/
.define(ObserverClassNameProp, STRING, Defaults.ObserverClassName, MEDIUM, ObserverClassNameDoc)
.define(ObserverShutdownTimeoutMsProp, LONG, Defaults.ObserverShutdownTimeoutMs, atLeast(1), MEDIUM, ObserverShutdownTimeoutMsDoc)

/** ********* Socket Server Configuration ***********/
.define(PortProp, INT, Defaults.Port, HIGH, PortDoc)
.define(HostNameProp, STRING, Defaults.HostName, HIGH, HostNameDoc)
Expand Down Expand Up @@ -1203,6 +1222,10 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
/************* Authorizer Configuration ***********/
val authorizerClassName: String = getString(KafkaConfig.AuthorizerClassNameProp)

/************* Broker-side Observer Configuration ********/
val ObserverClassName: String = getString(KafkaConfig.ObserverClassNameProp)
val ObserverShutdownTimeoutMs: Long = getLong(KafkaConfig.ObserverShutdownTimeoutMsProp)

/** ********* Socket Server Configuration ***********/
val hostName = getString(KafkaConfig.HostNameProp)
val port = getInt(KafkaConfig.PortProp)
Expand Down
17 changes: 15 additions & 2 deletions core/src/main/scala/kafka/server/KafkaServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
var controlPlaneRequestProcessor: KafkaApis = null

var authorizer: Option[Authorizer] = None
var observer: Observer = null
var socketServer: SocketServer = null
var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
var controlPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
Expand Down Expand Up @@ -309,21 +310,30 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
authZ
}

observer = try {
CoreUtils.createObject[Observer](config.ObserverClassName)
} catch {
case e: Exception =>
error(s"Creating observer instance from the given class name ${config.ObserverClassName} failed.", e)
new NoOpObserver
}
observer.configure(config.originals())

val fetchManager = new FetchManager(Time.SYSTEM,
new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))

/* start processing requests */
dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, observer, quotaManagers,
fetchManager, brokerTopicStats, clusterId, time, tokenManager)

dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)

socketServer.controlPlaneRequestChannelOpt.foreach { controlPlaneRequestChannel =>
controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, observer, quotaManagers,
fetchManager, brokerTopicStats, clusterId, time, tokenManager)

controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time,
Expand Down Expand Up @@ -630,6 +640,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
if (controlPlaneRequestProcessor != null)
CoreUtils.swallow(controlPlaneRequestProcessor.close(), this)
CoreUtils.swallow(authorizer.foreach(_.close()), this)

CoreUtils.swallow(observer.close(config.ObserverShutdownTimeoutMs, TimeUnit.MILLISECONDS), this)

if (adminManager != null)
CoreUtils.swallow(adminManager.shutdown(), this)

Expand Down
46 changes: 46 additions & 0 deletions core/src/main/scala/kafka/server/NoOpObserver.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.server

import java.util.Map
import java.util.concurrent.TimeUnit
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ProduceRequest, RequestContext}

/**
* An observer implementation that has no operation and serves as a place holder.
*/
class NoOpObserver extends Observer {

def configure(configs: Map[String, _]): Unit = {}

/**
* Observe a request and its corresponding response.
*/
def observe(requestContext: RequestContext, request: AbstractRequest, response: AbstractResponse): Unit = {}

/**
* Observe a produce request
*/
def observeProduceRequest(requestContext: RequestContext, produceRequest: ProduceRequest): Unit = {}

/**
* Close the observer with timeout.
*/
def close(timeout: Long, unit: TimeUnit): Unit = {}

}
105 changes: 105 additions & 0 deletions core/src/main/scala/kafka/server/Observer.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.server

import java.util.concurrent.TimeUnit
import kafka.network.RequestChannel
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ProduceRequest, RequestContext}
import org.apache.kafka.common.Configurable

/**
* Top level interface that all pluggable observer must implement. Kafka will read the 'observer.class.name' config
* value at startup time, create an instance of the specificed class using the default constructor, and call its
* 'configure' method.
*
* From that point onwards, every pair of request and response will be routed to the 'record' method.
*
* If 'observer.class.name' has no value specified or the specified class does not exist, the <code>NoOpObserver</code>
* will be used as a place holder.
*/
trait Observer extends Configurable {

/**
* Observe a request and its corresponding response
*
* @param requestContext the context information about the request
* @param request the request being observed for a various purpose(s)
* @param response the response to the request
*/
def observe(requestContext: RequestContext, request: AbstractRequest, response: AbstractResponse): Unit

/**
* Observe a produce request. This method handles only the produce request since produce request is special in
* two ways. Firstly, if ACK is set to be 0, there is no produce response associated with the produce request.
* Secondly, the lifecycle of some inner fields in a ProduceRequest is shorter than the lifecycle of the produce
* request itself. That means in some situations, when <code>observe</code> is called on a produce request and
* response pair, some fields in the produce request has been null-ed already so that the produce request and
* response is not observable (or no useful information). Therefore this method exists for the purpose of allowing
* users to observe on the produce request before its corresponding response is created.
*
* @param requestContext the context information about the request
* @param produceRequest the produce request being observed for a various purpose(s)
*/
def observeProduceRequest(requestContext: RequestContext, produceRequest: ProduceRequest): Unit

/**
* Close the observer with timeout.
*
* @param timeout the maximum time to wait to close the observer.
* @param unit the time unit.
*/
def close(timeout: Long, unit: TimeUnit): Unit
}

object Observer {

/**
* Generates a description of the given request and response. It could be used mostly for debugging purpose.
*
* @param request the request being described
* @param response the response to the request
*/
def describeRequestAndResponse(request: RequestChannel.Request, response: AbstractResponse): String = {
var requestDesc = "Request"
var responseDesc = "Response"
try {
if (request == null) {
requestDesc += " null"
} else {
requestDesc += (" header: " + request.header)
requestDesc += (" from service with principal: " +
request.session.sanitizedUser +
" IP address: " + request.session.clientAddress)
}
requestDesc += " | " // Separate the response description from the request description

if (response == null) {
responseDesc += " null"
} else {
responseDesc += (if (response.errorCounts == null || response.errorCounts.size == 0) {
" with no error"
} else {
" with errors: " + response.errorCounts
})
}
} catch {
case e: Exception => return e.toString // If describing fails, return the exception message directly
}
requestDesc + responseDesc
}
}
2 changes: 2 additions & 0 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class KafkaApisTest {
private val brokerId = 1
private val metadataCache = new MetadataCache(brokerId)
private val authorizer: Option[Authorizer] = None
private val observer: Observer = EasyMock.createNiceMock(classOf[Observer])
private val clientQuotaManager: ClientQuotaManager = EasyMock.createNiceMock(classOf[ClientQuotaManager])
private val clientRequestQuotaManager: ClientRequestQuotaManager = EasyMock.createNiceMock(classOf[ClientRequestQuotaManager])
private val replicaQuotaManager: ReplicationQuotaManager = EasyMock.createNiceMock(classOf[ReplicationQuotaManager])
Expand Down Expand Up @@ -102,6 +103,7 @@ class KafkaApisTest {
metadataCache,
metrics,
authorizer,
observer,
quotas,
fetchManager,
brokerTopicStats,
Expand Down
4 changes: 4 additions & 0 deletions core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,10 @@ class KafkaConfigTest {
case KafkaConfig.KafkaMetricsReporterClassesProp => // ignore
case KafkaConfig.KafkaMetricsPollingIntervalSecondsProp => //ignore

// Broker-side observer configs
case KafkaConfig.ObserverClassNameProp => // ignore since even if the class name is invalid, a NoOpObserver class is used instead
case KafkaConfig.ObserverShutdownTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1", "0")

case _ => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1")
}
})
Expand Down

0 comments on commit 24624e6

Please sign in to comment.