Skip to content

Commit

Permalink
fix: still set dsm consume checkpoint when no DSM parent context is a…
Browse files Browse the repository at this point in the history
…vailable (#4741)

* fix dsm checkpointing when no parent context is available when consuming
  • Loading branch information
wconti27 authored and juan-fernandez committed Oct 1, 2024
1 parent 68b1035 commit dd7e503
Show file tree
Hide file tree
Showing 9 changed files with 27 additions and 17 deletions.
4 changes: 1 addition & 3 deletions packages/datadog-plugin-amqplib/src/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
const { TEXT_MAP } = require('../../../ext/formats')
const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')
const { getAmqpMessageSize } = require('../../dd-trace/src/datastreams/processor')
const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway')
const { getResourceName } = require('./util')

class AmqplibConsumerPlugin extends ConsumerPlugin {
Expand All @@ -30,8 +29,7 @@ class AmqplibConsumerPlugin extends ConsumerPlugin {
})

if (
this.config.dsmEnabled && message?.properties?.headers &&
DsmPathwayCodec.contextExists(message.properties.headers)
this.config.dsmEnabled && message?.properties?.headers
) {
const payloadSize = getAmqpMessageSize({ headers: message.properties.headers, content: message.content })
const queue = fields.queue ? fields.queue : fields.routingKey
Expand Down
2 changes: 1 addition & 1 deletion packages/datadog-plugin-aws-sdk/src/services/kinesis.js
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ class Kinesis extends BaseAwsSdkPlugin {
const parsedAttributes = JSON.parse(Buffer.from(record.Data).toString())

if (
parsedAttributes?._datadog && streamName && DsmPathwayCodec.contextExists(parsedAttributes._datadog)
parsedAttributes?._datadog && streamName
) {
const payloadSize = getSizeOrZero(record.Data)
this.tracer.decodeDataStreamsContext(parsedAttributes._datadog)
Expand Down
2 changes: 1 addition & 1 deletion packages/datadog-plugin-aws-sdk/src/services/sqs.js
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ class Sqs extends BaseAwsSdkPlugin {
parsedAttributes = this.parseDatadogAttributes(message.MessageAttributes._datadog)
}
}
if (parsedAttributes && DsmPathwayCodec.contextExists(parsedAttributes)) {
if (parsedAttributes) {
const payloadSize = getHeadersSize({
Body: message.Body,
MessageAttributes: message.MessageAttributes
Expand Down
3 changes: 1 addition & 2 deletions packages/datadog-plugin-kafkajs/src/batch-consumer.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')
const { getMessageSize } = require('../../dd-trace/src/datastreams/processor')
const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway')

class KafkajsBatchConsumerPlugin extends ConsumerPlugin {
static get id () { return 'kafkajs' }
Expand All @@ -9,7 +8,7 @@ class KafkajsBatchConsumerPlugin extends ConsumerPlugin {
start ({ topic, partition, messages, groupId }) {
if (!this.config.dsmEnabled) return
for (const message of messages) {
if (!message || !message.headers || !DsmPathwayCodec.contextExists(message.headers)) continue
if (!message || !message.headers) continue
const payloadSize = getMessageSize(message)
this.tracer.decodeDataStreamsContext(message.headers)
this.tracer
Expand Down
3 changes: 1 addition & 2 deletions packages/datadog-plugin-kafkajs/src/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

const dc = require('dc-polyfill')
const { getMessageSize } = require('../../dd-trace/src/datastreams/processor')
const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway')
const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')

const afterStartCh = dc.channel('dd-trace:kafkajs:consumer:afterStart')
Expand Down Expand Up @@ -78,7 +77,7 @@ class KafkajsConsumerPlugin extends ConsumerPlugin {
'kafka.partition': partition
}
})
if (this.config.dsmEnabled && message?.headers && DsmPathwayCodec.contextExists(message.headers)) {
if (this.config.dsmEnabled && message?.headers) {
const payloadSize = getMessageSize(message)
this.tracer.decodeDataStreamsContext(message.headers)
this.tracer
Expand Down
4 changes: 1 addition & 3 deletions packages/datadog-plugin-rhea/src/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')
const { storage } = require('../../datadog-core')
const { getAmqpMessageSize } = require('../../dd-trace/src/datastreams/processor')
const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway')

class RheaConsumerPlugin extends ConsumerPlugin {
static get id () { return 'rhea' }
Expand Down Expand Up @@ -34,8 +33,7 @@ class RheaConsumerPlugin extends ConsumerPlugin {

if (
this.config.dsmEnabled &&
msgObj?.message?.delivery_annotations &&
DsmPathwayCodec.contextExists(msgObj.message.delivery_annotations)
msgObj?.message?.delivery_annotations
) {
const payloadSize = getAmqpMessageSize(
{ headers: msgObj.message.delivery_annotations, content: msgObj.message.body }
Expand Down
3 changes: 3 additions & 0 deletions packages/dd-trace/src/data_streams_context.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
const { storage } = require('../../datadog-core')
const log = require('./log')

function getDataStreamsContext () {
const store = storage.getStore()
return (store && store.dataStreamsContext) || null
}

function setDataStreamsContext (dataStreamsContext) {
log.debug(() => `Setting new DSM Context: ${JSON.stringify(dataStreamsContext)}.`)

if (dataStreamsContext) storage.enterWith({ ...(storage.getStore()), dataStreamsContext })
}

Expand Down
17 changes: 12 additions & 5 deletions packages/dd-trace/src/datastreams/pathway.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@
const crypto = require('crypto')
const { encodeVarint, decodeVarint } = require('./encoding')
const LRUCache = require('lru-cache')
const log = require('../log')
const pick = require('../../../datadog-core/src/utils/src/pick')

const options = { max: 500 }
const cache = new LRUCache(options)

const CONTEXT_PROPAGATION_KEY = 'dd-pathway-ctx'
const CONTEXT_PROPAGATION_KEY_BASE64 = 'dd-pathway-ctx-base64'

const logKeys = [CONTEXT_PROPAGATION_KEY, CONTEXT_PROPAGATION_KEY_BASE64]

function shaHash (checkpointString) {
const hash = crypto.createHash('md5').update(checkpointString).digest('hex').slice(0, 16)
return Buffer.from(hash, 'hex')
Expand Down Expand Up @@ -80,9 +84,13 @@ class DsmPathwayCodec {
return
}
carrier[CONTEXT_PROPAGATION_KEY_BASE64] = encodePathwayContextBase64(dataStreamsContext)

log.debug(() => `Injected into DSM carrier: ${JSON.stringify(pick(carrier, logKeys))}.`)
}

static decode (carrier) {
log.debug(() => `Attempting extract from DSM carrier: ${JSON.stringify(pick(carrier, logKeys))}.`)

if (carrier == null) return

let ctx
Expand All @@ -97,13 +105,12 @@ class DsmPathwayCodec {
// pass
}
// cover case where base64 context was received under wrong key
if (!ctx) ctx = decodePathwayContextBase64(carrier[CONTEXT_PROPAGATION_KEY])
if (!ctx && CONTEXT_PROPAGATION_KEY in carrier) {
ctx = decodePathwayContextBase64(carrier[CONTEXT_PROPAGATION_KEY])
}
}
return ctx
}

static contextExists (carrier) {
return CONTEXT_PROPAGATION_KEY_BASE64 in carrier || CONTEXT_PROPAGATION_KEY in carrier
return ctx
}
}

Expand Down
6 changes: 6 additions & 0 deletions packages/dd-trace/src/datastreams/processor.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const { types } = require('util')
const { PATHWAY_HASH } = require('../../../../ext/tags')
const { SchemaBuilder } = require('./schemas/schema_builder')
const { SchemaSampler } = require('./schemas/schema_sampler')
const log = require('../log')

const ENTRY_PARENT_HASH = Buffer.from('0000000000000000', 'hex')

Expand Down Expand Up @@ -272,6 +273,11 @@ class DataStreamsProcessor {
closestOppositeDirectionHash = parentHash
closestOppositeDirectionEdgeStart = edgeStartNs
}
log.debug(
() => `Setting DSM Checkpoint from extracted parent context with hash: ${parentHash} and edge tags: ${edgeTags}`
)
} else {
log.debug(() => 'Setting DSM Checkpoint with empty parent context.')
}
const hash = computePathwayHash(this.service, this.env, edgeTags, parentHash)
const edgeLatencyNs = nowNs - edgeStartNs
Expand Down

0 comments on commit dd7e503

Please sign in to comment.