Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(dataflow): wait for kafka topic creation #47

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/Cli.kt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ object Cli {
val brokerSecret = Key("kafka.tls.broker.secret", stringType)
val endpointIdentificationAlgorithm = Key("kafka.tls.endpoint.identification.algorithm", stringType)

// Kafka waiting for topic creation
val topicCreateTimeoutMillis = Key("topic.create.timeout.millis", intType)
val topicDescribeTimeoutMillis = Key("topic.describe.timeout.millis", longType)
val topicDescribeRetries = Key("topic.describe.retry.attempts", intType)
val topicDescribeRetryDelayMillis = Key("topic.describe.retry.delay.millis", longType)

// Kafka SASL
val saslUsername = Key("kafka.sasl.username", stringType)
val saslSecret = Key("kafka.sasl.secret", stringType)
Expand Down Expand Up @@ -75,6 +81,10 @@ object Cli {
clientSecret,
brokerSecret,
endpointIdentificationAlgorithm,
topicCreateTimeoutMillis,
topicDescribeTimeoutMillis,
topicDescribeRetries,
topicDescribeRetryDelayMillis,
saslUsername,
saslSecret,
saslPasswordPath,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,19 @@ object Main {
useCleanState = config[Cli.kafkaUseCleanState],
joinWindowMillis = config[Cli.kafkaJoinWindowMillis],
)
val topicWaitRetryParams = TopicWaitRetryParams(
createTimeoutMillis = config[Cli.topicCreateTimeoutMillis],
describeTimeoutMillis = config[Cli.topicDescribeTimeoutMillis],
describeRetries = config[Cli.topicDescribeRetries],
describeRetryDelayMillis = config[Cli.topicDescribeRetryDelayMillis]
)
val subscriber = PipelineSubscriber(
"seldon-dataflow-engine",
kafkaProperties,
kafkaAdminProperties,
kafkaStreamsParams,
kafkaDomainParams,
topicWaitRetryParams,
config[Cli.upstreamHost],
config[Cli.upstreamPort],
GrpcServiceConfigProvider.config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,14 @@ class PipelineSubscriber(
kafkaAdminProperties: KafkaAdminProperties,
kafkaStreamsParams: KafkaStreamsParams,
private val kafkaDomainParams: KafkaDomainParams,
private val topicWaitRetryParams: TopicWaitRetryParams,
private val upstreamHost: String,
private val upstreamPort: Int,
grpcServiceConfig: Map<String, Any>,
private val kafkaConsumerGroupIdPrefix: String,
private val namespace: String,
) {
private val kafkaAdmin = KafkaAdmin(kafkaAdminProperties, kafkaStreamsParams)
private val kafkaAdmin = KafkaAdmin(kafkaAdminProperties, kafkaStreamsParams, topicWaitRetryParams)
private val channel = ManagedChannelBuilder
.forAddress(upstreamHost, upstreamPort)
.defaultServiceConfig(grpcServiceConfig)
Expand Down Expand Up @@ -105,7 +106,6 @@ class PipelineSubscriber(
}
}
.collect()
// TODO - error handling?
// TODO - use supervisor job(s) for spawning coroutines?
}

Expand All @@ -121,8 +121,20 @@ class PipelineSubscriber(
kafkaConsumerGroupIdPrefix: String,
namespace: String,
) {
logger.info("Create pipeline {pipelineName} version: {pipelineVersion} id: {pipelineId}", metadata.name, metadata.version, metadata.id)
val (pipeline, err) = Pipeline.forSteps(metadata, steps, kafkaProperties, kafkaDomainParams, kafkaConsumerGroupIdPrefix, namespace)
logger.info(
"Create pipeline {pipelineName} version: {pipelineVersion} id: {pipelineId}",
metadata.name,
metadata.version,
metadata.id
)
val (pipeline, err) = Pipeline.forSteps(
metadata,
steps,
kafkaProperties,
kafkaDomainParams,
kafkaConsumerGroupIdPrefix,
namespace
)
if (err != null) {
err.log(logger, Level.ERROR)
client.pipelineUpdateEvent(
Expand All @@ -133,7 +145,6 @@ class PipelineSubscriber(
reason = err.getDescription() ?: "failed to initialize dataflow engine"
)
)

return
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ data class KafkaDomainParams(
val joinWindowMillis: Long,
)

/**
 * Timeout/retry configuration used while waiting for Kafka topics to be created.
 *
 * The admin client's createTopics call returns before the topics can actually be
 * subscribed to, so the caller repeatedly describes the topics — bounded by the
 * retry count and delays below — to block until they exist.
 *
 * @property createTimeoutMillis timeout passed to CreateTopicsOptions.timeoutMs
 *   for the initial createTopics request.
 * @property describeTimeoutMillis per-attempt timeout on the describeTopics()
 *   future used to check whether all topics exist yet.
 * @property describeRetries maximum number of describe attempts before giving up.
 * @property describeRetryDelayMillis constant delay between describe attempts.
 */
data class TopicWaitRetryParams(
    val createTimeoutMillis: Int, // int required by the underlying kafka-streams library
    val describeTimeoutMillis: Long,
    val describeRetries: Int,
    val describeRetryDelayMillis: Long
)

val kafkaTopicConfig = { maxMessageSizeBytes: Int ->
mapOf(
TopicConfig.MAX_MESSAGE_BYTES_CONFIG to maxMessageSizeBytes.toString(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,45 @@ the Change License after the Change Date as each is defined in accordance with t

package io.seldon.dataflow.kafka

import com.github.michaelbull.retry.ContinueRetrying
import com.github.michaelbull.retry.policy.RetryPolicy
import com.github.michaelbull.retry.policy.constantDelay
import com.github.michaelbull.retry.policy.limitAttempts
import com.github.michaelbull.retry.policy.plus
import com.github.michaelbull.retry.retry
import io.seldon.mlops.chainer.ChainerOuterClass.PipelineStepUpdate
import org.apache.kafka.clients.admin.Admin
import org.apache.kafka.clients.admin.CreateTopicsOptions
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.common.KafkaFuture
import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.errors.TopicExistsException
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeUnit
import io.klogging.logger as coLogger

class KafkaAdmin(
adminConfig: KafkaAdminProperties,
private val streamsConfig: KafkaStreamsParams,
private val topicWaitRetryParams: TopicWaitRetryParams,
) {
private val adminClient = Admin.create(adminConfig)

suspend fun ensureTopicsExist(
steps: List<PipelineStepUpdate>,
) : Exception? {
val missingTopicRetryPolicy: RetryPolicy<Throwable> = {
lc525 marked this conversation as resolved.
Show resolved Hide resolved
when (reason.cause) {
is TimeoutException,
is UnknownTopicOrPartitionException -> ContinueRetrying
else -> {
logger.warn("ignoring exception while waiting for topic creation: ${reason.message}")
ContinueRetrying
}
}
}

try {
steps
.flatMap { step -> step.sourcesList + step.sink + step.triggersList }
Expand All @@ -46,12 +68,27 @@ class KafkaAdmin(
)
}
.run {
adminClient.createTopics(this)
adminClient.createTopics(
this,
CreateTopicsOptions().timeoutMs(topicWaitRetryParams.createTimeoutMillis)
)
}
.values()
.also { topicCreations ->
topicCreations.entries.forEach { creationResult ->
awaitKafkaResult(creationResult)
logger.info("Waiting for kafka topic creation")
// We repeatedly attempt to describe all topics as a way of blocking until they exist at least on
// one broker. This is because the call to createTopics above returns before topics can actually
// be subscribed to.
retry(
missingTopicRetryPolicy + limitAttempts(topicWaitRetryParams.describeRetries) + constantDelay(
topicWaitRetryParams.describeRetryDelayMillis
)
) {
logger.debug("Still waiting for all topics to be created...")
// the KafkaFuture retrieved via .allTopicNames() only succeeds if all the topic
// descriptions succeed, so there is no need to check topic descriptions individually
adminClient.describeTopics(topicCreations.keys).allTopicNames()
lc525 marked this conversation as resolved.
Show resolved Hide resolved
.get(topicWaitRetryParams.describeTimeoutMillis, TimeUnit.MILLISECONDS)
}
}
} catch (e: Exception) {
Expand All @@ -62,22 +99,10 @@ class KafkaAdmin(
return e
}

logger.info("All topics created")
return null
}

private suspend fun awaitKafkaResult(result: Map.Entry<String, KafkaFuture<Void>>) {
try {
result.value.get()
logger.info("Topic created ${result.key}")
} catch (e: ExecutionException) {
if (e.cause is TopicExistsException) {
logger.info("Topic already exists ${result.key}")
} else {
throw e
}
}
}

companion object {
private val logger = coLogger(KafkaAdmin::class)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ data class PipelineMetadata(
val version: Int,
)


class Pipeline(
private val metadata: PipelineMetadata,
private val topology: Topology,
Expand Down Expand Up @@ -140,8 +139,8 @@ class Pipeline(
): Pair<Pipeline?, PipelineStatus.Error?> {
val (topology, numSteps) = buildTopology(metadata, steps, kafkaDomainParams)
val pipelineProperties = localiseKafkaProperties(kafkaProperties, metadata, numSteps, kafkaConsumerGroupIdPrefix, namespace)
var streamsApp : KafkaStreams? = null
var pipelineError: PipelineStatus.Error? = null
var streamsApp : KafkaStreams?
var pipelineError: PipelineStatus.Error?
try {
streamsApp = KafkaStreams(topology, pipelineProperties)
} catch (e: StreamsException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ open class PipelineStatus(val state: KafkaStreams.State?, var isError: Boolean)
}

// log status when logger is in a coroutine
override fun log(logger: Klogger, level: Level) {
override fun log(logger: Klogger, levelIfNoException: Level) {
var exceptionMsg = this.exception?.message
var exceptionCause = this.exception?.cause ?: Exception("")
var statusMsg = this.message
Expand All @@ -47,17 +47,17 @@ open class PipelineStatus(val state: KafkaStreams.State?, var isError: Boolean)
}
if (exceptionMsg != null) {
runBlocking {
logger.log(level, exceptionCause, "$statusMsg, Exception: {exception}", exceptionMsg)
logger.log(levelIfNoException, exceptionCause, "$statusMsg, Exception: {exception}", exceptionMsg)
}
} else {
runBlocking {
logger.log(level, "$statusMsg")
logger.log(levelIfNoException, "$statusMsg")
}
}
}

// log status when logger is outside coroutines
override fun log(logger: NoCoLogger, level: Level) {
override fun log(logger: NoCoLogger, levelIfNoException: Level) {
val exceptionMsg = this.exception?.message
val exceptionCause = this.exception?.cause ?: Exception("")
var statusMsg = this.message
Expand All @@ -66,9 +66,9 @@ open class PipelineStatus(val state: KafkaStreams.State?, var isError: Boolean)
statusMsg += ", stop cause: $prevStateDescription"
}
if (exceptionMsg != null) {
logger.log(level, exceptionCause, "$statusMsg, Exception: {exception}", exceptionMsg)
logger.log(levelIfNoException, exceptionCause, "$statusMsg, Exception: {exception}", exceptionMsg)
} else {
logger.log(level, "$statusMsg")
logger.log(levelIfNoException, "$statusMsg")
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions scheduler/data-flow/src/main/resources/local.properties
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,7 @@ kafka.sasl.username=seldon
kafka.sasl.secret=
kafka.sasl.password.path=
kafka.sasl.mechanism=PLAIN
topic.create.timeout.millis=60000
topic.describe.timeout.millis=1000
topic.describe.retry.attempts=60
topic.describe.retry.delay.millis=1000