From dd7e50395d6703c52564f8b4712a795bcabf9289 Mon Sep 17 00:00:00 2001
From: William Conti <58711692+wconti27@users.noreply.github.com>
Date: Mon, 30 Sep 2024 15:07:19 -0400
Subject: [PATCH] fix: still set dsm consume checkpoint when no DSM parent
 context is available (#4741)

* fix dsm checkpointing when no parent context is available when consuming
---
 packages/datadog-plugin-amqplib/src/consumer.js |  4 +---
 .../src/services/kinesis.js                     |  2 +-
 .../datadog-plugin-aws-sdk/src/services/sqs.js  |  2 +-
 .../src/batch-consumer.js                       |  3 +--
 packages/datadog-plugin-kafkajs/src/consumer.js |  3 +--
 packages/datadog-plugin-rhea/src/consumer.js    |  4 +---
 packages/dd-trace/src/data_streams_context.js   |  3 +++
 packages/dd-trace/src/datastreams/pathway.js    | 17 ++++++++++++-----
 packages/dd-trace/src/datastreams/processor.js  |  6 ++++++
 9 files changed, 27 insertions(+), 17 deletions(-)

diff --git a/packages/datadog-plugin-amqplib/src/consumer.js b/packages/datadog-plugin-amqplib/src/consumer.js
index da4efb33fd0..92684e3f9dc 100644
--- a/packages/datadog-plugin-amqplib/src/consumer.js
+++ b/packages/datadog-plugin-amqplib/src/consumer.js
@@ -3,7 +3,6 @@
 const { TEXT_MAP } = require('../../../ext/formats')
 const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')
 const { getAmqpMessageSize } = require('../../dd-trace/src/datastreams/processor')
-const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway')
 const { getResourceName } = require('./util')
 
 class AmqplibConsumerPlugin extends ConsumerPlugin {
@@ -30,8 +29,7 @@ class AmqplibConsumerPlugin extends ConsumerPlugin {
     })
 
     if (
-      this.config.dsmEnabled && message?.properties?.headers &&
-      DsmPathwayCodec.contextExists(message.properties.headers)
+      this.config.dsmEnabled && message?.properties?.headers
     ) {
       const payloadSize = getAmqpMessageSize({ headers: message.properties.headers, content: message.content })
       const queue = fields.queue ? fields.queue : fields.routingKey
diff --git a/packages/datadog-plugin-aws-sdk/src/services/kinesis.js b/packages/datadog-plugin-aws-sdk/src/services/kinesis.js
index 98547c564f8..60802bfc448 100644
--- a/packages/datadog-plugin-aws-sdk/src/services/kinesis.js
+++ b/packages/datadog-plugin-aws-sdk/src/services/kinesis.js
@@ -113,7 +113,7 @@ class Kinesis extends BaseAwsSdkPlugin {
       const parsedAttributes = JSON.parse(Buffer.from(record.Data).toString())
 
       if (
-        parsedAttributes?._datadog && streamName && DsmPathwayCodec.contextExists(parsedAttributes._datadog)
+        parsedAttributes?._datadog && streamName
       ) {
         const payloadSize = getSizeOrZero(record.Data)
         this.tracer.decodeDataStreamsContext(parsedAttributes._datadog)
diff --git a/packages/datadog-plugin-aws-sdk/src/services/sqs.js b/packages/datadog-plugin-aws-sdk/src/services/sqs.js
index 35854ed3c1d..54a3e7e756c 100644
--- a/packages/datadog-plugin-aws-sdk/src/services/sqs.js
+++ b/packages/datadog-plugin-aws-sdk/src/services/sqs.js
@@ -194,7 +194,7 @@ class Sqs extends BaseAwsSdkPlugin {
           parsedAttributes = this.parseDatadogAttributes(message.MessageAttributes._datadog)
         }
       }
-      if (parsedAttributes && DsmPathwayCodec.contextExists(parsedAttributes)) {
+      if (parsedAttributes) {
        const payloadSize = getHeadersSize({
          Body: message.Body,
          MessageAttributes: message.MessageAttributes
diff --git a/packages/datadog-plugin-kafkajs/src/batch-consumer.js b/packages/datadog-plugin-kafkajs/src/batch-consumer.js
index 5a531267e9b..8415b037644 100644
--- a/packages/datadog-plugin-kafkajs/src/batch-consumer.js
+++ b/packages/datadog-plugin-kafkajs/src/batch-consumer.js
@@ -1,6 +1,5 @@
 const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')
 const { getMessageSize } = require('../../dd-trace/src/datastreams/processor')
-const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway')
 
 class KafkajsBatchConsumerPlugin extends ConsumerPlugin {
   static get id () { return 'kafkajs' }
@@ -9,7 +8,7 @@ class KafkajsBatchConsumerPlugin extends ConsumerPlugin {
   start ({ topic, partition, messages, groupId }) {
     if (!this.config.dsmEnabled) return
     for (const message of messages) {
-      if (!message || !message.headers || !DsmPathwayCodec.contextExists(message.headers)) continue
+      if (!message || !message.headers) continue
       const payloadSize = getMessageSize(message)
       this.tracer.decodeDataStreamsContext(message.headers)
       this.tracer
diff --git a/packages/datadog-plugin-kafkajs/src/consumer.js b/packages/datadog-plugin-kafkajs/src/consumer.js
index 420fea10902..84b6a02fdda 100644
--- a/packages/datadog-plugin-kafkajs/src/consumer.js
+++ b/packages/datadog-plugin-kafkajs/src/consumer.js
@@ -2,7 +2,6 @@
 
 const dc = require('dc-polyfill')
 const { getMessageSize } = require('../../dd-trace/src/datastreams/processor')
-const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway')
 const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')
 
 const afterStartCh = dc.channel('dd-trace:kafkajs:consumer:afterStart')
@@ -78,7 +77,7 @@ class KafkajsConsumerPlugin extends ConsumerPlugin {
         'kafka.partition': partition
       }
     })
-    if (this.config.dsmEnabled && message?.headers && DsmPathwayCodec.contextExists(message.headers)) {
+    if (this.config.dsmEnabled && message?.headers) {
      const payloadSize = getMessageSize(message)
      this.tracer.decodeDataStreamsContext(message.headers)
      this.tracer
diff --git a/packages/datadog-plugin-rhea/src/consumer.js b/packages/datadog-plugin-rhea/src/consumer.js
index 226834885be..56aad8f7b9d 100644
--- a/packages/datadog-plugin-rhea/src/consumer.js
+++ b/packages/datadog-plugin-rhea/src/consumer.js
@@ -3,7 +3,6 @@
 const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')
 const { storage } = require('../../datadog-core')
 const { getAmqpMessageSize } = require('../../dd-trace/src/datastreams/processor')
-const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway')
 
 class RheaConsumerPlugin extends ConsumerPlugin {
   static get id () { return 'rhea' }
@@ -34,8 +33,7 @@ class RheaConsumerPlugin extends ConsumerPlugin {
 
     if (
       this.config.dsmEnabled &&
-      msgObj?.message?.delivery_annotations &&
-      DsmPathwayCodec.contextExists(msgObj.message.delivery_annotations)
+      msgObj?.message?.delivery_annotations
     ) {
       const payloadSize = getAmqpMessageSize(
         { headers: msgObj.message.delivery_annotations, content: msgObj.message.body }
diff --git a/packages/dd-trace/src/data_streams_context.js b/packages/dd-trace/src/data_streams_context.js
index 33354920443..e3c62d35e25 100644
--- a/packages/dd-trace/src/data_streams_context.js
+++ b/packages/dd-trace/src/data_streams_context.js
@@ -1,4 +1,5 @@
 const { storage } = require('../../datadog-core')
+const log = require('./log')
 
 function getDataStreamsContext () {
   const store = storage.getStore()
@@ -6,6 +7,8 @@ function getDataStreamsContext () {
 }
 
 function setDataStreamsContext (dataStreamsContext) {
+  log.debug(() => `Setting new DSM Context: ${JSON.stringify(dataStreamsContext)}.`)
+
   if (dataStreamsContext) storage.enterWith({ ...(storage.getStore()), dataStreamsContext })
 }
 
diff --git a/packages/dd-trace/src/datastreams/pathway.js b/packages/dd-trace/src/datastreams/pathway.js
index 5d587a4768f..066af789e64 100644
--- a/packages/dd-trace/src/datastreams/pathway.js
+++ b/packages/dd-trace/src/datastreams/pathway.js
@@ -4,6 +4,8 @@
 const crypto = require('crypto')
 const { encodeVarint, decodeVarint } = require('./encoding')
 const LRUCache = require('lru-cache')
+const log = require('../log')
+const pick = require('../../../datadog-core/src/utils/src/pick')
 
 const options = { max: 500 }
 const cache = new LRUCache(options)
@@ -11,6 +13,8 @@
 const CONTEXT_PROPAGATION_KEY = 'dd-pathway-ctx'
 const CONTEXT_PROPAGATION_KEY_BASE64 = 'dd-pathway-ctx-base64'
 
+const logKeys = [CONTEXT_PROPAGATION_KEY, CONTEXT_PROPAGATION_KEY_BASE64]
+
 function shaHash (checkpointString) {
   const hash = crypto.createHash('md5').update(checkpointString).digest('hex').slice(0, 16)
   return Buffer.from(hash, 'hex')
@@ -80,9 +84,13 @@ class DsmPathwayCodec {
       return
     }
     carrier[CONTEXT_PROPAGATION_KEY_BASE64] = encodePathwayContextBase64(dataStreamsContext)
+
+    log.debug(() => `Injected into DSM carrier: ${JSON.stringify(pick(carrier, logKeys))}.`)
   }
 
   static decode (carrier) {
+    log.debug(() => `Attempting extract from DSM carrier: ${JSON.stringify(pick(carrier, logKeys))}.`)
+
     if (carrier == null) return
 
     let ctx
@@ -97,13 +105,12 @@ class DsmPathwayCodec {
         // pass
       }
       // cover case where base64 context was received under wrong key
-      if (!ctx) ctx = decodePathwayContextBase64(carrier[CONTEXT_PROPAGATION_KEY])
+      if (!ctx && CONTEXT_PROPAGATION_KEY in carrier) {
+        ctx = decodePathwayContextBase64(carrier[CONTEXT_PROPAGATION_KEY])
+      }
     }
-    return ctx
-  }
 
-  static contextExists (carrier) {
-    return CONTEXT_PROPAGATION_KEY_BASE64 in carrier || CONTEXT_PROPAGATION_KEY in carrier
+    return ctx
   }
 }
 
diff --git a/packages/dd-trace/src/datastreams/processor.js b/packages/dd-trace/src/datastreams/processor.js
index 8670c1571f5..d036af805a7 100644
--- a/packages/dd-trace/src/datastreams/processor.js
+++ b/packages/dd-trace/src/datastreams/processor.js
@@ -11,6 +11,7 @@
 const { types } = require('util')
 const { PATHWAY_HASH } = require('../../../../ext/tags')
 const { SchemaBuilder } = require('./schemas/schema_builder')
 const { SchemaSampler } = require('./schemas/schema_sampler')
+const log = require('../log')
 
 const ENTRY_PARENT_HASH = Buffer.from('0000000000000000', 'hex')
@@ -272,6 +273,11 @@ class DataStreamsProcessor {
         closestOppositeDirectionHash = parentHash
         closestOppositeDirectionEdgeStart = edgeStartNs
       }
+      log.debug(
+        () => `Setting DSM Checkpoint from extracted parent context with hash: ${parentHash} and edge tags: ${edgeTags}`
+      )
+    } else {
+      log.debug(() => 'Setting DSM Checkpoint with empty parent context.')
     }
     const hash = computePathwayHash(this.service, this.env, edgeTags, parentHash)
     const edgeLatencyNs = nowNs - edgeStartNs
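
For context, a minimal sketch of the consumer-side flow this change enables, using the identifiers from the kafkajs consumer hunk above. The trailing setCheckpoint call and the span, groupId, and topic variables it uses come from the surrounding plugin code that is not part of this diff and are shown here only as assumptions:

    // Sketch: the guard now only requires that headers exist, not that they
    // already carry a dd-pathway-ctx / dd-pathway-ctx-base64 entry.
    if (this.config.dsmEnabled && message?.headers) {
      const payloadSize = getMessageSize(message)
      // decodeDataStreamsContext() may find no parent pathway context in the
      // headers; the checkpoint below is still recorded, and the processor
      // starts a new pathway, logging
      // 'Setting DSM Checkpoint with empty parent context.'
      this.tracer.decodeDataStreamsContext(message.headers)
      this.tracer
        .setCheckpoint(['direction:in', `group:${groupId}`, `topic:${topic}`, 'type:kafka'], span, payloadSize)
    }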