fix(dataflow): move constants used in topic retries to config vars
lc525 committed Feb 26, 2024
1 parent 5edd4ac commit 8b70780
Showing 8 changed files with 53 additions and 14 deletions.
10 changes: 10 additions & 0 deletions scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/Cli.kt
@@ -47,6 +47,12 @@ object Cli {
val brokerSecret = Key("kafka.tls.broker.secret", stringType)
val endpointIdentificationAlgorithm = Key("kafka.tls.endpoint.identification.algorithm", stringType)

// Kafka waiting for topic creation
val topicCreateTimeoutMillis = Key("topic.create.timeout.millis", intType)
val topicDescribeTimeoutMillis = Key("topic.describe.timeout.millis", longType)
val topicDescribeRetries = Key("topic.describe.retry.attempts", intType)
val topicDescribeRetryDelayMillis = Key("topic.describe.retry.delay.millis", longType)

// Kafka SASL
val saslUsername = Key("kafka.sasl.username", stringType)
val saslSecret = Key("kafka.sasl.secret", stringType)
@@ -75,6 +81,10 @@ object Cli {
clientSecret,
brokerSecret,
endpointIdentificationAlgorithm,
topicCreateTimeoutMillis,
topicDescribeTimeoutMillis,
topicDescribeRetries,
topicDescribeRetryDelayMillis,
saslUsername,
saslSecret,
saslPasswordPath,
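For context, a minimal sketch of how the new keys could be resolved, assuming the natpryce konfig library that these `Key`/`intType` helpers come from and a `local.properties` on the classpath; the resolution chain and `main` function here are illustrative, not part of this commit:

```kotlin
import com.natpryce.konfig.ConfigurationProperties
import com.natpryce.konfig.EnvironmentVariables
import com.natpryce.konfig.Key
import com.natpryce.konfig.intType
import com.natpryce.konfig.longType
import com.natpryce.konfig.overriding

// The same key definitions as in Cli.kt above.
val topicDescribeRetries = Key("topic.describe.retry.attempts", intType)
val topicDescribeRetryDelayMillis = Key("topic.describe.retry.delay.millis", longType)

fun main() {
    // Environment variables (e.g. TOPIC_DESCRIBE_RETRY_ATTEMPTS) typically take
    // precedence over the bundled property file when composed like this.
    val config = EnvironmentVariables() overriding
        ConfigurationProperties.fromResource("local.properties")

    println("describe retries: ${config[topicDescribeRetries]}")
    println("describe retry delay: ${config[topicDescribeRetryDelayMillis]} ms")
}
```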
@@ -81,12 +81,19 @@ object Main {
useCleanState = config[Cli.kafkaUseCleanState],
joinWindowMillis = config[Cli.kafkaJoinWindowMillis],
)
val topicWaitRetryParams = TopicWaitRetryParams(
createTimeoutMillis = config[Cli.topicCreateTimeoutMillis],
describeTimeoutMillis = config[Cli.topicDescribeTimeoutMillis],
describeRetries = config[Cli.topicDescribeRetries],
describeRetryDelayMillis = config[Cli.topicDescribeRetryDelayMillis]
)
val subscriber = PipelineSubscriber(
"seldon-dataflow-engine",
kafkaProperties,
kafkaAdminProperties,
kafkaStreamsParams,
kafkaDomainParams,
topicWaitRetryParams,
config[Cli.upstreamHost],
config[Cli.upstreamPort],
GrpcServiceConfigProvider.config,
@@ -34,13 +34,14 @@ class PipelineSubscriber(
kafkaAdminProperties: KafkaAdminProperties,
kafkaStreamsParams: KafkaStreamsParams,
private val kafkaDomainParams: KafkaDomainParams,
private val topicWaitRetryParams: TopicWaitRetryParams,
private val upstreamHost: String,
private val upstreamPort: Int,
grpcServiceConfig: Map<String, Any>,
private val kafkaConsumerGroupIdPrefix: String,
private val namespace: String,
) {
private val kafkaAdmin = KafkaAdmin(kafkaAdminProperties, kafkaStreamsParams)
private val kafkaAdmin = KafkaAdmin(kafkaAdminProperties, kafkaStreamsParams, topicWaitRetryParams)
private val channel = ManagedChannelBuilder
.forAddress(upstreamHost, upstreamPort)
.defaultServiceConfig(grpcServiceConfig)
@@ -144,7 +145,6 @@ class PipelineSubscriber(
reason = err.getDescription() ?: "failed to initialize dataflow engine"
)
)

return
}

@@ -49,6 +49,13 @@ data class KafkaDomainParams(
val joinWindowMillis: Long,
)

data class TopicWaitRetryParams(
val createTimeoutMillis: Int, // Int required by CreateTopicsOptions.timeoutMs in the Kafka admin client API
val describeTimeoutMillis: Long,
val describeRetries: Int,
val describeRetryDelayMillis: Long
)

val kafkaTopicConfig = { maxMessageSizeBytes: Int ->
mapOf(
TopicConfig.MAX_MESSAGE_BYTES_CONFIG to maxMessageSizeBytes.toString(),
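As a quick illustration (values taken from the defaults this commit adds to `local.properties`; the wiring itself is hypothetical), the new data class would be populated like this, with `createTimeoutMillis` kept as an `Int` while the remaining fields are `Long`:

```kotlin
// Mirrors the defaults added to local.properties in this commit.
val topicWaitRetryParams = TopicWaitRetryParams(
    createTimeoutMillis = 60_000,       // topic.create.timeout.millis
    describeTimeoutMillis = 1_000L,     // topic.describe.timeout.millis
    describeRetries = 60,               // topic.describe.retry.attempts
    describeRetryDelayMillis = 1_000L,  // topic.describe.retry.delay.millis
)
```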
@@ -30,14 +30,15 @@ import io.klogging.logger as coLogger
class KafkaAdmin(
adminConfig: KafkaAdminProperties,
private val streamsConfig: KafkaStreamsParams,
private val topicWaitRetryParams: TopicWaitRetryParams,
) {
private val adminClient = Admin.create(adminConfig)

suspend fun ensureTopicsExist(
steps: List<PipelineStepUpdate>,
) : Exception? {
val missingTopicRetryPolicy: RetryPolicy<Throwable> = {
when (reason) {
when (reason.cause) {
is TimeoutException,
is UnknownTopicOrPartitionException -> ContinueRetrying
else -> {
@@ -67,17 +68,27 @@ class KafkaAdmin(
)
}
.run {
adminClient.createTopics(this, CreateTopicsOptions().timeoutMs(60_000))
adminClient.createTopics(
this,
CreateTopicsOptions().timeoutMs(topicWaitRetryParams.createTimeoutMillis)
)
}
.values()
.also { topicCreations ->
logger.info("Waiting for kafka topic creation")
// We repeatedly attempt to describe all topics as a way of blocking until they exist at least on
// one broker. This is because the call to createTopics above returns before topics can actually
// be subscribed to.
retry(missingTopicRetryPolicy + limitAttempts(60) + constantDelay(delayMillis = 1000L)) {
retry(
missingTopicRetryPolicy + limitAttempts(topicWaitRetryParams.describeRetries) + constantDelay(
topicWaitRetryParams.describeRetryDelayMillis
)
) {
logger.debug("Still waiting for all topics to be created...")
adminClient.describeTopics(topicCreations.keys).allTopicNames().get(500, TimeUnit.MILLISECONDS)
// the KafkaFuture retrieved via .allTopicNames() only succeeds if all the topic
// descriptions succeed, so there is no need to check topic descriptions individually
adminClient.describeTopics(topicCreations.keys).allTopicNames()
.get(topicWaitRetryParams.describeTimeoutMillis, TimeUnit.MILLISECONDS)
}
}
} catch (e: Exception) {
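The retry loop above composes a failure-classifying policy with attempt and delay limits, and the switch from `reason` to `reason.cause` matters because the exceptions surface wrapped (for example in an `ExecutionException` from the blocking future `get`). Below is a standalone, hedged sketch of that pattern, assuming kotlin-retry 1.x (import paths may differ by version) and using a stand-in exception instead of the Kafka ones:

```kotlin
import com.github.michaelbull.retry.ContinueRetrying
import com.github.michaelbull.retry.StopRetrying
import com.github.michaelbull.retry.policy.RetryPolicy
import com.github.michaelbull.retry.policy.constantDelay
import com.github.michaelbull.retry.policy.limitAttempts
import com.github.michaelbull.retry.policy.plus
import com.github.michaelbull.retry.retry
import java.util.concurrent.ExecutionException
import kotlinx.coroutines.runBlocking

// Keep retrying only while the wrapped cause looks transient; anything else stops.
// IllegalStateException stands in for the Kafka "topic not visible yet" errors.
val transientOnly: RetryPolicy<Throwable> = {
    when (reason.cause) {
        is IllegalStateException -> ContinueRetrying
        else -> StopRetrying
    }
}

fun main() = runBlocking {
    val attempts = 60        // would come from topic.describe.retry.attempts
    val delayMillis = 1000L  // would come from topic.describe.retry.delay.millis
    var calls = 0

    retry(transientOnly + limitAttempts(attempts) + constantDelay(delayMillis)) {
        calls++
        if (calls < 3) {
            // Simulate the wrapped failure a blocking future get() would surface.
            throw ExecutionException(IllegalStateException("not ready yet"))
        }
        println("succeeded after $calls attempts")
    }
}
```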
@@ -139,8 +139,8 @@ class Pipeline(
): Pair<Pipeline?, PipelineStatus.Error?> {
val (topology, numSteps) = buildTopology(metadata, steps, kafkaDomainParams)
val pipelineProperties = localiseKafkaProperties(kafkaProperties, metadata, numSteps, kafkaConsumerGroupIdPrefix, namespace)
var streamsApp : KafkaStreams? = null
var pipelineError: PipelineStatus.Error? = null
var streamsApp : KafkaStreams?
var pipelineError: PipelineStatus.Error?
try {
streamsApp = KafkaStreams(topology, pipelineProperties)
} catch (e: StreamsException) {
@@ -37,7 +37,7 @@ open class PipelineStatus(val state: KafkaStreams.State?, var isError: Boolean)
}

// log status when logger is in a coroutine
override fun log(logger: Klogger, level: Level) {
override fun log(logger: Klogger, levelIfNoException: Level) {
var exceptionMsg = this.exception?.message
var exceptionCause = this.exception?.cause ?: Exception("")
var statusMsg = this.message
@@ -47,17 +47,17 @@ open class PipelineStatus(val state: KafkaStreams.State?, var isError: Boolean)
}
if (exceptionMsg != null) {
runBlocking {
logger.log(level, exceptionCause, "$statusMsg, Exception: {exception}", exceptionMsg)
logger.log(levelIfNoException, exceptionCause, "$statusMsg, Exception: {exception}", exceptionMsg)
}
} else {
runBlocking {
logger.log(level, "$statusMsg")
logger.log(levelIfNoException, "$statusMsg")
}
}
}

// log status when logger is outside coroutines
override fun log(logger: NoCoLogger, level: Level) {
override fun log(logger: NoCoLogger, levelIfNoException: Level) {
val exceptionMsg = this.exception?.message
val exceptionCause = this.exception?.cause ?: Exception("")
var statusMsg = this.message
@@ -66,9 +66,9 @@ open class PipelineStatus(val state: KafkaStreams.State?, var isError: Boolean)
statusMsg += ", stop cause: $prevStateDescription"
}
if (exceptionMsg != null) {
logger.log(level, exceptionCause, "$statusMsg, Exception: {exception}", exceptionMsg)
logger.log(levelIfNoException, exceptionCause, "$statusMsg, Exception: {exception}", exceptionMsg)
} else {
logger.log(level, "$statusMsg")
logger.log(levelIfNoException, "$statusMsg")
}
}
}
4 changes: 4 additions & 0 deletions scheduler/data-flow/src/main/resources/local.properties
@@ -21,3 +21,7 @@ kafka.sasl.username=seldon
kafka.sasl.secret=
kafka.sasl.password.path=
kafka.sasl.mechanism=PLAIN
topic.create.timeout.millis=60000
topic.describe.timeout.millis=1000
topic.describe.retry.attempts=60
topic.describe.retry.delay.millis=1000
