Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(dataflow): handle pipeline errors and clear kafka streams state #5358

Merged
merged 4 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package io.seldon.dataflow

import io.klogging.Klogger
import io.klogging.Level
import io.klogging.NoCoLogger
import kotlinx.coroutines.runBlocking

/**
* An interface designed for returning error or status information from various components.
*
* The idea is to leave exception throwing to program logic errors or checking invariants
* that can not be checked at compile time. Use implementations of this interface as function
* return values to indicate errors/status updates that require special handling in the code.
*/
/**
 * An interface designed for returning error or status information from various components.
 *
 * The idea is to leave exception throwing to program logic errors or checking invariants
 * that can not be checked at compile time. Use implementations of this interface as function
 * return values to indicate errors/status updates that require special handling in the code.
 */
interface DataflowStatus {
    var exception : Exception?
    var message : String?

    /**
     * Human-readable description of this status: [message], with the exception's
     * message appended when an exception carrying a message is attached.
     */
    fun getDescription() : String? {
        val exceptionMsg = this.exception?.message
        return if (exceptionMsg != null) {
            "${this.message} Exception: $exceptionMsg"
        } else {
            this.message
        }
    }

    // log status when logger is in a coroutine
    fun log(logger: Klogger, levelIfNoException: Level) {
        val ex = this.exception
        val exceptionMsg = ex?.message
        val statusMsg = this.message
        if (ex != null && exceptionMsg != null) {
            // Log the exception's cause when present, otherwise the exception itself.
            // (Previously a placeholder Exception("") was logged when there was no
            // cause, which discarded the real stack trace.)
            runBlocking {
                logger.error(ex.cause ?: ex, "$statusMsg, Exception: {exception}", exceptionMsg)
            }
        } else {
            runBlocking {
                logger.log(levelIfNoException, "$statusMsg")
            }
        }
    }

    // log status when logger is outside coroutines
    fun log(logger: NoCoLogger, levelIfNoException: Level) {
        val ex = this.exception
        val exceptionMsg = ex?.message
        if (ex != null && exceptionMsg != null) {
            // Same fallback as the coroutine variant: never replace the real
            // stack trace with an empty placeholder exception.
            logger.error(ex.cause ?: ex, "${this.message}, Exception: {exception}", exceptionMsg)
        } else {
            logger.log(levelIfNoException, "${this.message}")
        }
    }
}

/** Attaches [e] to this status and returns the receiver, enabling fluent chaining. */
fun <T: DataflowStatus> T.withException(e: Exception) : T = apply { exception = e }

/** Attaches [msg] to this status and returns the receiver, enabling fluent chaining. */
fun <T: DataflowStatus> T.withMessage(msg: String): T = apply { message = msg }

Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ package io.seldon.dataflow

import io.klogging.Level
import io.klogging.config.loggingConfiguration
import io.klogging.rendering.RENDER_ANSI
import io.klogging.rendering.RENDER_ISO8601
lc525 marked this conversation as resolved.
Show resolved Hide resolved
import io.klogging.sending.STDOUT

object Logging {
Expand All @@ -20,7 +20,7 @@ object Logging {
fun configure(appLevel: Level = Level.INFO, kafkaLevel: Level = Level.WARN) =
loggingConfiguration {
kloggingMinLogLevel(appLevel)
sink(stdoutSink, RENDER_ANSI, STDOUT)
sink(stdoutSink, RENDER_ISO8601, STDOUT)
logging {
fromLoggerBase("io.seldon")
toSink(stdoutSink)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ package io.seldon.dataflow
import com.github.michaelbull.retry.policy.binaryExponentialBackoff
import com.github.michaelbull.retry.retry
import io.grpc.ManagedChannelBuilder
import io.grpc.StatusException
import io.grpc.StatusRuntimeException
import io.klogging.Level
import io.seldon.dataflow.kafka.*
import io.seldon.mlops.chainer.ChainerGrpcKt
import io.seldon.mlops.chainer.ChainerOuterClass.*
Expand Down Expand Up @@ -92,6 +91,16 @@ class PipelineSubscriber(
if (cause == null) {
logger.info("pipeline subscription completed successfully")
} else {
pipelines
.onEach {
// Defend against any existing pipelines that have failed but are not yet stopped, so that
// kafka streams may clean up resources (including temporary files). This is a catch-all
// and indicates we've missed calling stop in a failure case.
if(it.value.status.isError) {
logger.debug("(bug) pipeline in error state when subscription terminates with error. pipeline id: {pipelineId}", it.key)
it.value.stop()
}
}
logger.error("pipeline subscription terminated with error ${cause}")
}
}
Expand All @@ -112,9 +121,25 @@ class PipelineSubscriber(
kafkaConsumerGroupIdPrefix: String,
namespace: String,
) {
logger.info("Create pipeline ${metadata.name} version: ${metadata.version} id: ${metadata.id}")
val pipeline = Pipeline.forSteps(metadata, steps, kafkaProperties, kafkaDomainParams, kafkaConsumerGroupIdPrefix, namespace)
logger.info("Create pipeline {pipelineName} version: {pipelineVersion} id: {pipelineId}", metadata.name, metadata.version, metadata.id)
lc525 marked this conversation as resolved.
Show resolved Hide resolved
val (pipeline, err) = Pipeline.forSteps(metadata, steps, kafkaProperties, kafkaDomainParams, kafkaConsumerGroupIdPrefix, namespace)
if (err != null) {
err.log(logger, Level.ERROR)
client.pipelineUpdateEvent(
makePipelineUpdateEvent(
metadata = metadata,
operation = PipelineOperation.Create,
success = false,
reason = err.getDescription() ?: "failed to initialize dataflow engine"
)
)

return
}

pipeline!! //assert pipeline is not null when err is null
if (pipeline.size != steps.size) {
pipeline.stop()
client.pipelineUpdateEvent(
makePipelineUpdateEvent(
metadata = metadata,
Expand All @@ -128,31 +153,49 @@ class PipelineSubscriber(
}

val previous = pipelines.putIfAbsent(metadata.id, pipeline)
var pipelineStarted = false
var updateEventReason = "created pipeline"
var pipelineStatus: PipelineStatus
if (previous == null) {
kafkaAdmin.ensureTopicsExist(steps)
pipelineStarted = pipeline.start()
if (pipelineStarted == false) {
updateEventReason = "kafka topic error"
val err = kafkaAdmin.ensureTopicsExist(steps)
if (err == null) {
pipelineStatus = pipeline.start()
} else {
pipelineStatus = PipelineStatus.Error(null)
.withException(err)
.withMessage("kafka streams topic creation error")
pipeline.stop()
}
} else {
pipelineStarted = true
logger.warn("pipeline ${metadata.name} with id ${metadata.id} already exists")
pipelineStatus = previous.status
logger.warn("pipeline {pipelineName} with id {pipelineId} already exists", metadata.name, metadata.id)
if (pipelineStatus.isError) {
// do not try to resuscitate an existing pipeline if in a failed state
// it's up to the scheduler to delete it & reinitialize it, as it might require
// coordination with {model, pipeline}gateway
previous.stop()
}
}

// There is a small chance that pipeline.start() returned a status of PipelineState.StreamStopped(),
// if the process is being signalled to shutdown during its execution, and calls pipeline.stop()
//
// For this case, we don't want to mark the Create operation as successful, so we force the state
// to be an error (despite no actual error having occurred) before sending the update to the scheduler.
if(pipelineStatus is PipelineStatus.StreamStopped) {
pipelineStatus.isError = true
}
pipelineStatus.log(logger, Level.INFO)
client.pipelineUpdateEvent(
makePipelineUpdateEvent(
metadata = metadata,
operation = PipelineOperation.Create,
success = pipelineStarted,
reason = updateEventReason
success = !pipelineStatus.isError,
reason = pipelineStatus.getDescription() ?: "pipeline created"
)
)
}

private suspend fun handleDelete(metadata: PipelineMetadata) {
logger.info("Delete pipeline ${metadata.name} version: ${metadata.version} id: ${metadata.id}")
logger.info("Delete pipeline {pipelineName} version: {pipelineVersion} id: {pipelineId}", metadata.name, metadata.version, metadata.id )
pipelines
.remove(metadata.id)
?.also { pipeline ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ the Change License after the Change Date as each is defined in accordance with t

package io.seldon.dataflow.kafka

import io.seldon.dataflow.kafka.security.KafkaSaslMechanisms
import io.klogging.noCoLogger
import io.seldon.dataflow.kafka.security.SaslConfig
import io.seldon.dataflow.mtls.CertificateConfig
import io.seldon.dataflow.mtls.K8sCertSecretsProvider
Expand All @@ -23,8 +23,13 @@ import org.apache.kafka.common.config.SslConfigs
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.errors.DeserializationExceptionHandler
import org.apache.kafka.streams.errors.ProductionExceptionHandler
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler
import java.util.*

const val KAFKA_UNCAUGHT_EXCEPTION_HANDLER_CLASS_CONFIG = "default.processing.exception.handler"

data class KafkaStreamsParams(
val bootstrapServers: String,
val numPartitions: Int,
Expand All @@ -50,6 +55,8 @@ val kafkaTopicConfig = { maxMessageSizeBytes: Int ->
)
}

private val logger = noCoLogger(Pipeline::class)

fun getKafkaAdminProperties(params: KafkaStreamsParams): KafkaAdminProperties {
return getSecurityProperties(params).apply {
this[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = params.bootstrapServers
Expand Down Expand Up @@ -156,10 +163,11 @@ fun getKafkaProperties(params: KafkaStreamsParams): KafkaProperties {
this[StreamsConfig.NUM_STREAM_THREADS_CONFIG] = 1
this[StreamsConfig.SEND_BUFFER_CONFIG] = params.maxMessageSizeBytes
this[StreamsConfig.RECEIVE_BUFFER_CONFIG] = params.maxMessageSizeBytes
// tell Kafka Streams to optimize the topology
this[StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG] = StreamsConfig.OPTIMIZE
lc525 marked this conversation as resolved.
Show resolved Hide resolved

// Testing
this[StreamsConfig.REPLICATION_FACTOR_CONFIG] = params.replicationFactor
this[StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG] = 0
lc525 marked this conversation as resolved.
Show resolved Hide resolved
this[StreamsConfig.COMMIT_INTERVAL_MS_CONFIG] = 10_000

this[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"
Expand Down Expand Up @@ -201,7 +209,20 @@ fun KafkaProperties.withStreamThreads(n: Int): KafkaProperties {
val properties = KafkaProperties()

properties.putAll(this.toMap())
this[StreamsConfig.NUM_STREAM_THREADS_CONFIG] = n
lc525 marked this conversation as resolved.
Show resolved Hide resolved
properties[StreamsConfig.NUM_STREAM_THREADS_CONFIG] = n

return properties
}

/**
 * Returns a copy of these properties with the Kafka Streams exception-handler
 * config entries set for each non-null handler. Note that only the handler's
 * *class* is registered (Kafka Streams instantiates handlers from config), not
 * the instance itself. The receiver is left unmodified.
 */
fun KafkaProperties.withErrorHandlers(deserializationExceptionHdl: DeserializationExceptionHandler?,
                                      streamExceptionHdl: StreamsUncaughtExceptionHandler?,
                                      productionExceptionHdl: ProductionExceptionHandler?): KafkaProperties {
    val updated = KafkaProperties()
    updated.putAll(this.toMap())

    if (deserializationExceptionHdl != null) {
        updated[StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG] = deserializationExceptionHdl::class.java
    }
    if (streamExceptionHdl != null) {
        updated[KAFKA_UNCAUGHT_EXCEPTION_HANDLER_CLASS_CONFIG] = streamExceptionHdl::class.java
    }
    if (productionExceptionHdl != null) {
        updated[StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG] = productionExceptionHdl::class.java
    }

    return updated
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,32 +25,44 @@ class KafkaAdmin(

suspend fun ensureTopicsExist(
steps: List<PipelineStepUpdate>,
) {
steps
.flatMap { step -> step.sourcesList + step.sink + step.triggersList }
.map { topicName -> parseSource(topicName).first }
.toSet()
.also {
logger.info("Topics found are $it")
}
.map { topicName ->
NewTopic(
topicName,
streamsConfig.numPartitions,
streamsConfig.replicationFactor.toShort(),
).configs(
kafkaTopicConfig(
streamsConfig.maxMessageSizeBytes,
),
)
}
.run { adminClient.createTopics(this) }
.values()
.also { topicCreations ->
topicCreations.entries.forEach { creationResult ->
awaitKafkaResult(creationResult)
) : Exception? {
try {
steps
.flatMap { step -> step.sourcesList + step.sink + step.triggersList }
.map { topicName -> parseSource(topicName).first }
.toSet()
.also {
logger.info("Topics found are $it")
}
}
.map { topicName ->
NewTopic(
topicName,
streamsConfig.numPartitions,
streamsConfig.replicationFactor.toShort(),
).configs(
kafkaTopicConfig(
streamsConfig.maxMessageSizeBytes,
),
)
}
.run {
adminClient.createTopics(this)
}
.values()
.also { topicCreations ->
topicCreations.entries.forEach { creationResult ->
awaitKafkaResult(creationResult)
}
}
} catch (e: Exception) {
// we catch all exceptions here and return them instead, because we want to handle
// errors as part of programming logic, instead of them bubbling up to the scheduler
// subscription event loop. This way, errors for one pipeline don't interfere in the
// execution of others.
return e
}

return null
}

private suspend fun awaitKafkaResult(result: Map.Entry<String, KafkaFuture<Void>>) {
Expand Down
Loading
Loading