Skip to content

Commit

Permalink
Add kafkaQueueProblems featureflag
Browse files Browse the repository at this point in the history
Overloads Kafka queue while simultaneously introducing a consumer side delay leading to a lag spike

The result of that featureflag can be observed with numerous metrics in grafana (e.g. kafka_consumer_lag_avg)
  • Loading branch information
EislM0203 committed Apr 15, 2024
1 parent 742594f commit c379147
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 8 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ the release.
([#1519](https://github.com/open-telemetry/opentelemetry-demo/pull/1519))
* [flagd] export flagd traces to otel collector
([#1522](https://github.com/open-telemetry/opentelemetry-demo/pull/1522))
* [kafka] add kafkaQueueProblems feature flag
([#1528](https://github.com/open-telemetry/opentelemetry-demo/pull/1528))

## 1.9.0

Expand Down
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,8 @@ services:
memory: 200M
restart: unless-stopped
environment:
- FLAGD_HOST
- FLAGD_PORT
- KAFKA_SERVICE_ADDR
- OTEL_EXPORTER_OTLP_ENDPOINT=http://${OTEL_COLLECTOR_HOST}:${OTEL_COLLECTOR_PORT_HTTP}
- OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE
Expand Down
35 changes: 27 additions & 8 deletions src/checkoutservice/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ func (cs *checkoutService) PlaceOrder(ctx context.Context, req *pb.PlaceOrderReq

// send to kafka only if kafka broker address is set
if cs.kafkaBrokerSvcAddr != "" {
log.Infof("sending to postProcessor")
cs.sendToPostProcessor(ctx, orderResult)
}

Expand Down Expand Up @@ -439,7 +440,7 @@ func (cs *checkoutService) convertCurrency(ctx context.Context, from *pb.Money,

func (cs *checkoutService) chargeCard(ctx context.Context, amount *pb.Money, paymentInfo *pb.CreditCardInfo) (string, error) {
paymentService := cs.paymentSvcClient
if cs.checkPaymentFailure(ctx) {
if cs.isFeatureFlagEnabled(ctx, "paymentServiceUnreachable") {
badAddress := "badAddress:50051"
c := mustCreateClient(context.Background(), badAddress)
paymentService = pb.NewPaymentServiceClient(c)
Expand Down Expand Up @@ -505,6 +506,18 @@ func (cs *checkoutService) sendToPostProcessor(ctx context.Context, result *pb.O
cs.KafkaProducerClient.Input() <- &msg
successMsg := <-cs.KafkaProducerClient.Successes()
log.Infof("Successful to write message. offset: %v", successMsg.Offset)

if cs.isFeatureFlagEnabled(ctx, "kafakQueueProblems") {
log.Infof("Warning: FeatureFlag 'kafakQueueProblems' is activated, overloading queue now.")
messageCount := 100
for i := 0; i < messageCount; i++ {
go func(i int) {
cs.KafkaProducerClient.Input() <- &msg
_ = <-cs.KafkaProducerClient.Successes()
}(i)
}
log.Infof("Done with #%d messages for overload simulation.", messageCount)
}
}

func createProducerSpan(ctx context.Context, msg *sarama.ProducerMessage) trace.Span {
Expand Down Expand Up @@ -533,11 +546,17 @@ func createProducerSpan(ctx context.Context, msg *sarama.ProducerMessage) trace.
return span
}

func (cs *checkoutService) checkPaymentFailure(ctx context.Context) bool {
openfeature.AddHooks(otelhooks.NewTracesHook())
client := openfeature.NewClient("checkout")
failureEnabled, _ := client.BooleanValue(
ctx, "paymentServiceUnreachable", false, openfeature.EvaluationContext{},
)
return failureEnabled
func (cs *checkoutService) isFeatureFlagEnabled(ctx context.Context, featureFlagName string) bool {
openfeature.AddHooks(otelhooks.NewTracesHook())
client := openfeature.NewClient("checkout")

// Default value is set to false, but you could also make this a parameter.
featureEnabled, _ := client.BooleanValue(
ctx,
featureFlagName,
false,
openfeature.EvaluationContext{},
)

return featureEnabled
}
9 changes: 9 additions & 0 deletions src/flagd/demo.flagd.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@
]
}
},
"kafakQueueProblems": {
"description": "Overloads Kafka queue while simultaneously introducing a consumer side delay leading to a lag spike",
"state": "ENABLED",
"variants": {
"on": true,
"off": false
},
"defaultVariant": "on"
},
"cartServiceFailure": {
"description": "Fail cart service",
"state": "ENABLED",
Expand Down
8 changes: 8 additions & 0 deletions src/frauddetectionservice/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ dependencies {
implementation("org.apache.logging.log4j:log4j-core:2.21.1")
implementation("org.slf4j:slf4j-api:2.0.9")
implementation("com.google.protobuf:protobuf-kotlin:${protobufVersion}")
implementation("dev.openfeature:sdk:1.7.4")
implementation("dev.openfeature.contrib.providers:flagd:0.7.0")

if (JavaVersion.current().isJava9Compatible) {
// Workaround for @javax.annotation.Generated
Expand All @@ -50,6 +52,12 @@ dependencies {
}
}

tasks {
shadowJar {
mergeServiceFiles()
}
}

tasks.test {
useJUnitPlatform()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,25 @@ import oteldemo.Demo.*
import java.time.Duration.ofMillis
import java.util.*
import kotlin.system.exitProcess
import dev.openfeature.contrib.providers.flagd.FlagdOptions
import dev.openfeature.contrib.providers.flagd.FlagdProvider
import dev.openfeature.sdk.Client
import dev.openfeature.sdk.EvaluationContext
import dev.openfeature.sdk.MutableContext
import dev.openfeature.sdk.OpenFeatureAPI

const val topic = "orders"
const val groupID = "frauddetectionservice"

private val logger: Logger = LogManager.getLogger(groupID)

fun main() {
val options = FlagdOptions.builder()
.withGlobalTelemetry(true)
.build()
val flagdProvider = FlagdProvider(options)
OpenFeatureAPI.getInstance().setProvider(flagdProvider)

val props = Properties()
props[KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
props[VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java.name
Expand All @@ -46,8 +58,27 @@ fun main() {
val newCount = accumulator + 1
val orders = OrderResult.parseFrom(record.value())
logger.info("Consumed record with orderId: ${orders.orderId}, and updated total count to: $newCount")
if (getFeatureFlagEnabled("kafakQueueProblems")) {
logger.info("FeatureFlag 'kafakQueueProblems' is enabled, sleeping 1 second")
Thread.sleep(1000)
}
newCount
}
}
}
}

/**
* Retrieves the status of a feature flag from the Feature Flag service.
*
* @param ff The name of the feature flag to retrieve.
* @return `true` if the feature flag is enabled, `false` otherwise or in case of errors.
*/
fun getFeatureFlagEnabled(ff: String): Boolean {
val client = OpenFeatureAPI.getInstance().client
// TODO: Plumb the actual session ID from the frontend via baggage?
val uuid = UUID.randomUUID()
client.evaluationContext = MutableContext().add("session", uuid.toString())
val booleanValue = client.getBooleanValue(ff, false)
return booleanValue
}

0 comments on commit c379147

Please sign in to comment.