Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: still set dsm consume checkpoint when no DSM parent context is available #4741

Merged
merged 4 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions packages/datadog-plugin-amqplib/src/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
const { TEXT_MAP } = require('../../../ext/formats')
const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')
const { getAmqpMessageSize } = require('../../dd-trace/src/datastreams/processor')
const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway')
const { getResourceName } = require('./util')

class AmqplibConsumerPlugin extends ConsumerPlugin {
Expand All @@ -30,8 +29,7 @@ class AmqplibConsumerPlugin extends ConsumerPlugin {
})

if (
this.config.dsmEnabled && message?.properties?.headers &&
DsmPathwayCodec.contextExists(message.properties.headers)
this.config.dsmEnabled && message?.properties?.headers
) {
const payloadSize = getAmqpMessageSize({ headers: message.properties.headers, content: message.content })
const queue = fields.queue ? fields.queue : fields.routingKey
Expand Down
2 changes: 1 addition & 1 deletion packages/datadog-plugin-aws-sdk/src/services/kinesis.js
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ class Kinesis extends BaseAwsSdkPlugin {
const parsedAttributes = JSON.parse(Buffer.from(record.Data).toString())

if (
parsedAttributes?._datadog && streamName && DsmPathwayCodec.contextExists(parsedAttributes._datadog)
parsedAttributes?._datadog && streamName
) {
const payloadSize = getSizeOrZero(record.Data)
this.tracer.decodeDataStreamsContext(parsedAttributes._datadog)
Expand Down
2 changes: 1 addition & 1 deletion packages/datadog-plugin-aws-sdk/src/services/sqs.js
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ class Sqs extends BaseAwsSdkPlugin {
parsedAttributes = this.parseDatadogAttributes(message.MessageAttributes._datadog)
}
}
if (parsedAttributes && DsmPathwayCodec.contextExists(parsedAttributes)) {
if (parsedAttributes) {
const payloadSize = getHeadersSize({
Body: message.Body,
MessageAttributes: message.MessageAttributes
Expand Down
3 changes: 1 addition & 2 deletions packages/datadog-plugin-kafkajs/src/batch-consumer.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')
const { getMessageSize } = require('../../dd-trace/src/datastreams/processor')
const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway')

class KafkajsBatchConsumerPlugin extends ConsumerPlugin {
static get id () { return 'kafkajs' }
Expand All @@ -9,7 +8,7 @@ class KafkajsBatchConsumerPlugin extends ConsumerPlugin {
start ({ topic, partition, messages, groupId }) {
if (!this.config.dsmEnabled) return
for (const message of messages) {
if (!message || !message.headers || !DsmPathwayCodec.contextExists(message.headers)) continue
if (!message || !message.headers) continue
const payloadSize = getMessageSize(message)
this.tracer.decodeDataStreamsContext(message.headers)
this.tracer
Expand Down
3 changes: 1 addition & 2 deletions packages/datadog-plugin-kafkajs/src/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

const dc = require('dc-polyfill')
const { getMessageSize } = require('../../dd-trace/src/datastreams/processor')
const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway')
const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')

const afterStartCh = dc.channel('dd-trace:kafkajs:consumer:afterStart')
Expand Down Expand Up @@ -78,7 +77,7 @@ class KafkajsConsumerPlugin extends ConsumerPlugin {
'kafka.partition': partition
}
})
if (this.config.dsmEnabled && message?.headers && DsmPathwayCodec.contextExists(message.headers)) {
if (this.config.dsmEnabled && message?.headers) {
const payloadSize = getMessageSize(message)
this.tracer.decodeDataStreamsContext(message.headers)
this.tracer
Expand Down
4 changes: 1 addition & 3 deletions packages/datadog-plugin-rhea/src/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')
const { storage } = require('../../datadog-core')
const { getAmqpMessageSize } = require('../../dd-trace/src/datastreams/processor')
const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway')

class RheaConsumerPlugin extends ConsumerPlugin {
static get id () { return 'rhea' }
Expand Down Expand Up @@ -34,8 +33,7 @@ class RheaConsumerPlugin extends ConsumerPlugin {

if (
this.config.dsmEnabled &&
msgObj?.message?.delivery_annotations &&
DsmPathwayCodec.contextExists(msgObj.message.delivery_annotations)
msgObj?.message?.delivery_annotations
) {
const payloadSize = getAmqpMessageSize(
{ headers: msgObj.message.delivery_annotations, content: msgObj.message.body }
Expand Down
3 changes: 3 additions & 0 deletions packages/dd-trace/src/data_streams_context.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
const { storage } = require('../../datadog-core')
const log = require('./log')

function getDataStreamsContext () {
const store = storage.getStore()
return (store && store.dataStreamsContext) || null
}

function setDataStreamsContext (dataStreamsContext) {
log.debug(() => `Setting new DSM Context: ${JSON.stringify(dataStreamsContext)}.`)

if (dataStreamsContext) storage.enterWith({ ...(storage.getStore()), dataStreamsContext })
}

Expand Down
17 changes: 12 additions & 5 deletions packages/dd-trace/src/datastreams/pathway.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@
const crypto = require('crypto')
const { encodeVarint, decodeVarint } = require('./encoding')
const LRUCache = require('lru-cache')
const log = require('../log')
const pick = require('../../../datadog-core/src/utils/src/pick')

const options = { max: 500 }
const cache = new LRUCache(options)

const CONTEXT_PROPAGATION_KEY = 'dd-pathway-ctx'
const CONTEXT_PROPAGATION_KEY_BASE64 = 'dd-pathway-ctx-base64'

const logKeys = [CONTEXT_PROPAGATION_KEY, CONTEXT_PROPAGATION_KEY_BASE64]

function shaHash (checkpointString) {
const hash = crypto.createHash('md5').update(checkpointString).digest('hex').slice(0, 16)
return Buffer.from(hash, 'hex')
Expand Down Expand Up @@ -80,9 +84,13 @@ class DsmPathwayCodec {
return
}
carrier[CONTEXT_PROPAGATION_KEY_BASE64] = encodePathwayContextBase64(dataStreamsContext)

log.debug(() => `Injected into DSM carrier: ${JSON.stringify(pick(carrier, logKeys))}.`)
}

static decode (carrier) {
log.debug(() => `Attempting extract from DSM carrier: ${JSON.stringify(pick(carrier, logKeys))}.`)

if (carrier == null) return

let ctx
Expand All @@ -97,13 +105,12 @@ class DsmPathwayCodec {
// pass
}
// cover case where base64 context was received under wrong key
if (!ctx) ctx = decodePathwayContextBase64(carrier[CONTEXT_PROPAGATION_KEY])
if (!ctx && CONTEXT_PROPAGATION_KEY in carrier) {
ctx = decodePathwayContextBase64(carrier[CONTEXT_PROPAGATION_KEY])
}
}
return ctx
}

static contextExists (carrier) {
return CONTEXT_PROPAGATION_KEY_BASE64 in carrier || CONTEXT_PROPAGATION_KEY in carrier
return ctx
}
}

Expand Down
6 changes: 6 additions & 0 deletions packages/dd-trace/src/datastreams/processor.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const { types } = require('util')
const { PATHWAY_HASH } = require('../../../../ext/tags')
const { SchemaBuilder } = require('./schemas/schema_builder')
const { SchemaSampler } = require('./schemas/schema_sampler')
const log = require('../log')

const ENTRY_PARENT_HASH = Buffer.from('0000000000000000', 'hex')

Expand Down Expand Up @@ -272,6 +273,11 @@ class DataStreamsProcessor {
closestOppositeDirectionHash = parentHash
closestOppositeDirectionEdgeStart = edgeStartNs
}
log.debug(
() => `Setting DSM Checkpoint from extracted parent context with hash: ${parentHash} and edge tags: ${edgeTags}`
)
} else {
log.debug(() => 'Setting DSM Checkpoint with empty parent context.')
}
const hash = computePathwayHash(this.service, this.env, edgeTags, parentHash)
const edgeLatencyNs = nowNs - edgeStartNs
Expand Down
Loading