From c37914775418c1b9c10bcb460bb832145f352754 Mon Sep 17 00:00:00 2001 From: "markus.eisl" Date: Mon, 15 Apr 2024 10:31:02 +0200 Subject: [PATCH] Add kafkaQueueProblems featureflag Overloads Kafka queue while simultaneously introducing a consumer side delay leading to a lag spike The result of that featureflag can be observed with numerous metrics in grafana (e.g. kafka_consumer_lag_avg) --- CHANGELOG.md | 2 ++ docker-compose.yml | 2 ++ src/checkoutservice/main.go | 35 ++++++++++++++----- src/flagd/demo.flagd.json | 9 +++++ src/frauddetectionservice/build.gradle.kts | 8 +++++ .../main/kotlin/frauddetectionservice/main.kt | 31 ++++++++++++++++ 6 files changed, 79 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0aa24b8468..f261059b69 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,8 @@ the release. ([#1519](https://github.com/open-telemetry/opentelemetry-demo/pull/1519)) * [flagd] export flagd traces to otel collector ([#1522](https://github.com/open-telemetry/opentelemetry-demo/pull/1522)) +* [kafka] add kafkaQueueProblems feature flag + ([#1528](https://github.com/open-telemetry/opentelemetry-demo/pull/1528)) ## 1.9.0 diff --git a/docker-compose.yml b/docker-compose.yml index 220ce155d4..3fe9f9e043 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -250,6 +250,8 @@ services: memory: 200M restart: unless-stopped environment: + - FLAGD_HOST + - FLAGD_PORT - KAFKA_SERVICE_ADDR - OTEL_EXPORTER_OTLP_ENDPOINT=http://${OTEL_COLLECTOR_HOST}:${OTEL_COLLECTOR_PORT_HTTP} - OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE diff --git a/src/checkoutservice/main.go b/src/checkoutservice/main.go index e67d3d8496..7be3789bc7 100644 --- a/src/checkoutservice/main.go +++ b/src/checkoutservice/main.go @@ -316,6 +316,7 @@ func (cs *checkoutService) PlaceOrder(ctx context.Context, req *pb.PlaceOrderReq // send to kafka only if kafka broker address is set if cs.kafkaBrokerSvcAddr != "" { + log.Infof("sending to postProcessor") cs.sendToPostProcessor(ctx, orderResult) } @@ -439,7 +440,7 @@ func (cs *checkoutService) convertCurrency(ctx context.Context, from *pb.Money, func (cs *checkoutService) chargeCard(ctx context.Context, amount *pb.Money, paymentInfo *pb.CreditCardInfo) (string, error) { paymentService := cs.paymentSvcClient - if cs.checkPaymentFailure(ctx) { + if cs.isFeatureFlagEnabled(ctx, "paymentServiceUnreachable") { badAddress := "badAddress:50051" c := mustCreateClient(context.Background(), badAddress) paymentService = pb.NewPaymentServiceClient(c) @@ -505,6 +506,18 @@ func (cs *checkoutService) sendToPostProcessor(ctx context.Context, result *pb.O cs.KafkaProducerClient.Input() <- &msg successMsg := <-cs.KafkaProducerClient.Successes() log.Infof("Successful to write message. offset: %v", successMsg.Offset) + + if cs.isFeatureFlagEnabled(ctx, "kafakQueueProblems") { + log.Infof("Warning: FeatureFlag 'kafakQueueProblems' is activated, overloading queue now.") + messageCount := 100 + for i := 0; i < messageCount; i++ { + go func(i int) { + cs.KafkaProducerClient.Input() <- &msg + _ = <-cs.KafkaProducerClient.Successes() + }(i) + } + log.Infof("Done with #%d messages for overload simulation.", messageCount) + } } func createProducerSpan(ctx context.Context, msg *sarama.ProducerMessage) trace.Span { @@ -533,11 +546,17 @@ func createProducerSpan(ctx context.Context, msg *sarama.ProducerMessage) trace. return span } -func (cs *checkoutService) checkPaymentFailure(ctx context.Context) bool { - openfeature.AddHooks(otelhooks.NewTracesHook()) - client := openfeature.NewClient("checkout") - failureEnabled, _ := client.BooleanValue( - ctx, "paymentServiceUnreachable", false, openfeature.EvaluationContext{}, - ) - return failureEnabled +func (cs *checkoutService) isFeatureFlagEnabled(ctx context.Context, featureFlagName string) bool { + openfeature.AddHooks(otelhooks.NewTracesHook()) + client := openfeature.NewClient("checkout") + + // Default value is set to false, but you could also make this a parameter. + featureEnabled, _ := client.BooleanValue( + ctx, + featureFlagName, + false, + openfeature.EvaluationContext{}, + ) + + return featureEnabled } diff --git a/src/flagd/demo.flagd.json b/src/flagd/demo.flagd.json index d4c38a390c..31d779913e 100644 --- a/src/flagd/demo.flagd.json +++ b/src/flagd/demo.flagd.json @@ -61,6 +61,15 @@ ] } }, + "kafakQueueProblems": { + "description": "Overloads Kafka queue while simultaneously introducing a consumer side delay leading to a lag spike", + "state": "ENABLED", + "variants": { + "on": true, + "off": false + }, + "defaultVariant": "on" + }, "cartServiceFailure": { "description": "Fail cart service", "state": "ENABLED", diff --git a/src/frauddetectionservice/build.gradle.kts b/src/frauddetectionservice/build.gradle.kts index 10cde2b7fb..1819c7b934 100644 --- a/src/frauddetectionservice/build.gradle.kts +++ b/src/frauddetectionservice/build.gradle.kts @@ -42,6 +42,8 @@ dependencies { implementation("org.apache.logging.log4j:log4j-core:2.21.1") implementation("org.slf4j:slf4j-api:2.0.9") implementation("com.google.protobuf:protobuf-kotlin:${protobufVersion}") + implementation("dev.openfeature:sdk:1.7.4") + implementation("dev.openfeature.contrib.providers:flagd:0.7.0") if (JavaVersion.current().isJava9Compatible) { // Workaround for @javax.annotation.Generated @@ -50,6 +52,12 @@ dependencies { } } +tasks { + shadowJar { + mergeServiceFiles() + } +} + tasks.test { useJUnitPlatform() } diff --git a/src/frauddetectionservice/src/main/kotlin/frauddetectionservice/main.kt b/src/frauddetectionservice/src/main/kotlin/frauddetectionservice/main.kt index 009a849764..bb7061f7bb 100644 --- a/src/frauddetectionservice/src/main/kotlin/frauddetectionservice/main.kt +++ b/src/frauddetectionservice/src/main/kotlin/frauddetectionservice/main.kt @@ -15,6 +15,12 @@ import oteldemo.Demo.* import java.time.Duration.ofMillis import java.util.* import kotlin.system.exitProcess +import dev.openfeature.contrib.providers.flagd.FlagdOptions +import dev.openfeature.contrib.providers.flagd.FlagdProvider +import dev.openfeature.sdk.Client +import dev.openfeature.sdk.EvaluationContext +import dev.openfeature.sdk.MutableContext +import dev.openfeature.sdk.OpenFeatureAPI const val topic = "orders" const val groupID = "frauddetectionservice" @@ -22,6 +28,12 @@ const val groupID = "frauddetectionservice" private val logger: Logger = LogManager.getLogger(groupID) fun main() { + val options = FlagdOptions.builder() + .withGlobalTelemetry(true) + .build() + val flagdProvider = FlagdProvider(options) + OpenFeatureAPI.getInstance().setProvider(flagdProvider) + val props = Properties() props[KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name props[VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java.name @@ -46,8 +58,27 @@ fun main() { val newCount = accumulator + 1 val orders = OrderResult.parseFrom(record.value()) logger.info("Consumed record with orderId: ${orders.orderId}, and updated total count to: $newCount") + if (getFeatureFlagEnabled("kafakQueueProblems")) { + logger.info("FeatureFlag 'kafakQueueProblems' is enabled, sleeping 1 second") + Thread.sleep(1000) + } newCount } } } } + +/** +* Retrieves the status of a feature flag from the Feature Flag service. +* +* @param ff The name of the feature flag to retrieve. +* @return `true` if the feature flag is enabled, `false` otherwise or in case of errors. +*/ +fun getFeatureFlagEnabled(ff: String): Boolean { + val client = OpenFeatureAPI.getInstance().client + // TODO: Plumb the actual session ID from the frontend via baggage? + val uuid = UUID.randomUUID() + client.evaluationContext = MutableContext().add("session", uuid.toString()) + val booleanValue = client.getBooleanValue(ff, false) + return booleanValue +}