From 78673a8745bb08898bf07abafff690658b81f358 Mon Sep 17 00:00:00 2001 From: William Conti Date: Mon, 30 Sep 2024 12:42:03 -0400 Subject: [PATCH 1/4] fix dsm checkpointing when no parent context is available when consuming --- packages/datadog-plugin-amqplib/src/consumer.js | 3 +-- .../src/services/kinesis.js | 2 +- .../datadog-plugin-aws-sdk/src/services/sqs.js | 2 +- .../src/batch-consumer.js | 2 +- packages/datadog-plugin-kafkajs/src/consumer.js | 2 +- packages/datadog-plugin-rhea/src/consumer.js | 3 +-- packages/dd-trace/src/data_streams_context.js | 3 +++ packages/dd-trace/src/datastreams/pathway.js | 17 ++++++++++++----- packages/dd-trace/src/datastreams/processor.js | 6 ++++++ 9 files changed, 27 insertions(+), 13 deletions(-) diff --git a/packages/datadog-plugin-amqplib/src/consumer.js b/packages/datadog-plugin-amqplib/src/consumer.js index da4efb33fd0..be0e8a48acc 100644 --- a/packages/datadog-plugin-amqplib/src/consumer.js +++ b/packages/datadog-plugin-amqplib/src/consumer.js @@ -30,8 +30,7 @@ class AmqplibConsumerPlugin extends ConsumerPlugin { }) if ( - this.config.dsmEnabled && message?.properties?.headers && - DsmPathwayCodec.contextExists(message.properties.headers) + this.config.dsmEnabled && message?.properties?.headers ) { const payloadSize = getAmqpMessageSize({ headers: message.properties.headers, content: message.content }) const queue = fields.queue ? fields.queue : fields.routingKey diff --git a/packages/datadog-plugin-aws-sdk/src/services/kinesis.js b/packages/datadog-plugin-aws-sdk/src/services/kinesis.js index 98547c564f8..60802bfc448 100644 --- a/packages/datadog-plugin-aws-sdk/src/services/kinesis.js +++ b/packages/datadog-plugin-aws-sdk/src/services/kinesis.js @@ -113,7 +113,7 @@ class Kinesis extends BaseAwsSdkPlugin { const parsedAttributes = JSON.parse(Buffer.from(record.Data).toString()) if ( - parsedAttributes?._datadog && streamName && DsmPathwayCodec.contextExists(parsedAttributes._datadog) + parsedAttributes?._datadog && streamName ) { const payloadSize = getSizeOrZero(record.Data) this.tracer.decodeDataStreamsContext(parsedAttributes._datadog) diff --git a/packages/datadog-plugin-aws-sdk/src/services/sqs.js b/packages/datadog-plugin-aws-sdk/src/services/sqs.js index 35854ed3c1d..54a3e7e756c 100644 --- a/packages/datadog-plugin-aws-sdk/src/services/sqs.js +++ b/packages/datadog-plugin-aws-sdk/src/services/sqs.js @@ -194,7 +194,7 @@ class Sqs extends BaseAwsSdkPlugin { parsedAttributes = this.parseDatadogAttributes(message.MessageAttributes._datadog) } } - if (parsedAttributes && DsmPathwayCodec.contextExists(parsedAttributes)) { + if (parsedAttributes) { const payloadSize = getHeadersSize({ Body: message.Body, MessageAttributes: message.MessageAttributes diff --git a/packages/datadog-plugin-kafkajs/src/batch-consumer.js b/packages/datadog-plugin-kafkajs/src/batch-consumer.js index 5a531267e9b..2185b245d9f 100644 --- a/packages/datadog-plugin-kafkajs/src/batch-consumer.js +++ b/packages/datadog-plugin-kafkajs/src/batch-consumer.js @@ -9,7 +9,7 @@ class KafkajsBatchConsumerPlugin extends ConsumerPlugin { start ({ topic, partition, messages, groupId }) { if (!this.config.dsmEnabled) return for (const message of messages) { - if (!message || !message.headers || !DsmPathwayCodec.contextExists(message.headers)) continue + if (!message || !message.headers) continue const payloadSize = getMessageSize(message) this.tracer.decodeDataStreamsContext(message.headers) this.tracer diff --git a/packages/datadog-plugin-kafkajs/src/consumer.js b/packages/datadog-plugin-kafkajs/src/consumer.js index 420fea10902..0bc02e5cace 100644 --- a/packages/datadog-plugin-kafkajs/src/consumer.js +++ b/packages/datadog-plugin-kafkajs/src/consumer.js @@ -78,7 +78,7 @@ class KafkajsConsumerPlugin extends ConsumerPlugin { 'kafka.partition': partition } }) - if (this.config.dsmEnabled && message?.headers && DsmPathwayCodec.contextExists(message.headers)) { + if (this.config.dsmEnabled && message?.headers) { const payloadSize = getMessageSize(message) this.tracer.decodeDataStreamsContext(message.headers) this.tracer diff --git a/packages/datadog-plugin-rhea/src/consumer.js b/packages/datadog-plugin-rhea/src/consumer.js index 226834885be..45e287dde22 100644 --- a/packages/datadog-plugin-rhea/src/consumer.js +++ b/packages/datadog-plugin-rhea/src/consumer.js @@ -34,8 +34,7 @@ class RheaConsumerPlugin extends ConsumerPlugin { if ( this.config.dsmEnabled && - msgObj?.message?.delivery_annotations && - DsmPathwayCodec.contextExists(msgObj.message.delivery_annotations) + msgObj?.message?.delivery_annotations ) { const payloadSize = getAmqpMessageSize( { headers: msgObj.message.delivery_annotations, content: msgObj.message.body } diff --git a/packages/dd-trace/src/data_streams_context.js b/packages/dd-trace/src/data_streams_context.js index 33354920443..e3c62d35e25 100644 --- a/packages/dd-trace/src/data_streams_context.js +++ b/packages/dd-trace/src/data_streams_context.js @@ -1,4 +1,5 @@ const { storage } = require('../../datadog-core') +const log = require('./log') function getDataStreamsContext () { const store = storage.getStore() @@ -6,6 +7,8 @@ function getDataStreamsContext () { } function setDataStreamsContext (dataStreamsContext) { + log.debug(() => `Setting new DSM Context: ${JSON.stringify(dataStreamsContext)}.`) + if (dataStreamsContext) storage.enterWith({ ...(storage.getStore()), dataStreamsContext }) } diff --git a/packages/dd-trace/src/datastreams/pathway.js b/packages/dd-trace/src/datastreams/pathway.js index 5d587a4768f..066af789e64 100644 --- a/packages/dd-trace/src/datastreams/pathway.js +++ b/packages/dd-trace/src/datastreams/pathway.js @@ -4,6 +4,8 @@ const crypto = require('crypto') const { encodeVarint, decodeVarint } = require('./encoding') const LRUCache = require('lru-cache') +const log = require('../log') +const pick = require('../../../datadog-core/src/utils/src/pick') const options = { max: 500 } const cache = new LRUCache(options) @@ -11,6 +13,8 @@ const cache = new LRUCache(options) const CONTEXT_PROPAGATION_KEY = 'dd-pathway-ctx' const CONTEXT_PROPAGATION_KEY_BASE64 = 'dd-pathway-ctx-base64' +const logKeys = [CONTEXT_PROPAGATION_KEY, CONTEXT_PROPAGATION_KEY_BASE64] + function shaHash (checkpointString) { const hash = crypto.createHash('md5').update(checkpointString).digest('hex').slice(0, 16) return Buffer.from(hash, 'hex') @@ -80,9 +84,13 @@ class DsmPathwayCodec { return } carrier[CONTEXT_PROPAGATION_KEY_BASE64] = encodePathwayContextBase64(dataStreamsContext) + + log.debug(() => `Injected into DSM carrier: ${JSON.stringify(pick(carrier, logKeys))}.`) } static decode (carrier) { + log.debug(() => `Attempting extract from DSM carrier: ${JSON.stringify(pick(carrier, logKeys))}.`) + if (carrier == null) return let ctx @@ -97,13 +105,12 @@ class DsmPathwayCodec { // pass } // cover case where base64 context was received under wrong key - if (!ctx) ctx = decodePathwayContextBase64(carrier[CONTEXT_PROPAGATION_KEY]) + if (!ctx && CONTEXT_PROPAGATION_KEY in carrier) { + ctx = decodePathwayContextBase64(carrier[CONTEXT_PROPAGATION_KEY]) + } } - return ctx - } - static contextExists (carrier) { - return CONTEXT_PROPAGATION_KEY_BASE64 in carrier || CONTEXT_PROPAGATION_KEY in carrier + return ctx } } diff --git a/packages/dd-trace/src/datastreams/processor.js b/packages/dd-trace/src/datastreams/processor.js index 8670c1571f5..7a46a32b3b9 100644 --- a/packages/dd-trace/src/datastreams/processor.js +++ b/packages/dd-trace/src/datastreams/processor.js @@ -11,6 +11,7 @@ const { types } = require('util') const { PATHWAY_HASH } = require('../../../../ext/tags') const { SchemaBuilder } = require('./schemas/schema_builder') const { SchemaSampler } = require('./schemas/schema_sampler') +const { log } = require('../log') const ENTRY_PARENT_HASH = Buffer.from('0000000000000000', 'hex') @@ -272,6 +273,11 @@ class DataStreamsProcessor { closestOppositeDirectionHash = parentHash closestOppositeDirectionEdgeStart = edgeStartNs } + log.debug( + () => `Setting DSM Checkpoint from extracted parent context with hash: ${parentHash} and edge tags: ${edgeTags}` + ) + } else { + log.debug(() => 'Setting DSM Checkpoint with empty parent context.') } const hash = computePathwayHash(this.service, this.env, edgeTags, parentHash) const edgeLatencyNs = nowNs - edgeStartNs From 0f9dc738d92cb290c7475fcdde33c05a60679d2e Mon Sep 17 00:00:00 2001 From: William Conti Date: Mon, 30 Sep 2024 13:00:19 -0400 Subject: [PATCH 2/4] fix log --- .../dd-trace/src/datastreams/processor.js | 22 +------------------ 1 file changed, 1 insertion(+), 21 deletions(-) diff --git a/packages/dd-trace/src/datastreams/processor.js b/packages/dd-trace/src/datastreams/processor.js index 7a46a32b3b9..708eb55cc19 100644 --- a/packages/dd-trace/src/datastreams/processor.js +++ b/packages/dd-trace/src/datastreams/processor.js @@ -11,7 +11,7 @@ const { types } = require('util') const { PATHWAY_HASH } = require('../../../../ext/tags') const { SchemaBuilder } = require('./schemas/schema_builder') const { SchemaSampler } = require('./schemas/schema_sampler') -const { log } = require('../log') +const log = require('../log') const ENTRY_PARENT_HASH = Buffer.from('0000000000000000', 'hex') @@ -47,26 +47,6 @@ class StatsPoint { } } -class Backlog { - constructor ({ offset, ...tags }) { - this._tags = Object.keys(tags).sort().map(key => `${key}:${tags[key]}`) - this._hash = this._tags.join(',') - this._offset = offset - } - - get hash () { return this._hash } - - get offset () { return this._offset } - - get tags () { return this._tags } - - encode () { - return { - Tags: this.tags, - Value: this.offset - } - } -} class StatsBucket { constructor () { From 00d135569ebec7debe04afdc1740cf2ade6afb1b Mon Sep 17 00:00:00 2001 From: William Conti Date: Mon, 30 Sep 2024 14:28:48 -0400 Subject: [PATCH 3/4] fix lint --- packages/datadog-plugin-amqplib/src/consumer.js | 1 - packages/datadog-plugin-kafkajs/src/batch-consumer.js | 1 - packages/datadog-plugin-kafkajs/src/consumer.js | 1 - packages/datadog-plugin-rhea/src/consumer.js | 1 - 4 files changed, 4 deletions(-) diff --git a/packages/datadog-plugin-amqplib/src/consumer.js b/packages/datadog-plugin-amqplib/src/consumer.js index be0e8a48acc..92684e3f9dc 100644 --- a/packages/datadog-plugin-amqplib/src/consumer.js +++ b/packages/datadog-plugin-amqplib/src/consumer.js @@ -3,7 +3,6 @@ const { TEXT_MAP } = require('../../../ext/formats') const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer') const { getAmqpMessageSize } = require('../../dd-trace/src/datastreams/processor') -const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway') const { getResourceName } = require('./util') class AmqplibConsumerPlugin extends ConsumerPlugin { diff --git a/packages/datadog-plugin-kafkajs/src/batch-consumer.js b/packages/datadog-plugin-kafkajs/src/batch-consumer.js index 2185b245d9f..8415b037644 100644 --- a/packages/datadog-plugin-kafkajs/src/batch-consumer.js +++ b/packages/datadog-plugin-kafkajs/src/batch-consumer.js @@ -1,6 +1,5 @@ const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer') const { getMessageSize } = require('../../dd-trace/src/datastreams/processor') -const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway') class KafkajsBatchConsumerPlugin extends ConsumerPlugin { static get id () { return 'kafkajs' } diff --git a/packages/datadog-plugin-kafkajs/src/consumer.js b/packages/datadog-plugin-kafkajs/src/consumer.js index 0bc02e5cace..84b6a02fdda 100644 --- a/packages/datadog-plugin-kafkajs/src/consumer.js +++ b/packages/datadog-plugin-kafkajs/src/consumer.js @@ -2,7 +2,6 @@ const dc = require('dc-polyfill') const { getMessageSize } = require('../../dd-trace/src/datastreams/processor') -const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway') const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer') const afterStartCh = dc.channel('dd-trace:kafkajs:consumer:afterStart') diff --git a/packages/datadog-plugin-rhea/src/consumer.js b/packages/datadog-plugin-rhea/src/consumer.js index 45e287dde22..56aad8f7b9d 100644 --- a/packages/datadog-plugin-rhea/src/consumer.js +++ b/packages/datadog-plugin-rhea/src/consumer.js @@ -3,7 +3,6 @@ const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer') const { storage } = require('../../datadog-core') const { getAmqpMessageSize } = require('../../dd-trace/src/datastreams/processor') -const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway') class RheaConsumerPlugin extends ConsumerPlugin { static get id () { return 'rhea' } From b65263180cdf38a5b4c1b9b4f789eccb17c1ffd9 Mon Sep 17 00:00:00 2001 From: William Conti Date: Mon, 30 Sep 2024 14:49:08 -0400 Subject: [PATCH 4/4] fix incorrect code removal --- .../dd-trace/src/datastreams/processor.js | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/packages/dd-trace/src/datastreams/processor.js b/packages/dd-trace/src/datastreams/processor.js index 708eb55cc19..d036af805a7 100644 --- a/packages/dd-trace/src/datastreams/processor.js +++ b/packages/dd-trace/src/datastreams/processor.js @@ -47,6 +47,26 @@ class StatsPoint { } } +class Backlog { + constructor ({ offset, ...tags }) { + this._tags = Object.keys(tags).sort().map(key => `${key}:${tags[key]}`) + this._hash = this._tags.join(',') + this._offset = offset + } + + get hash () { return this._hash } + + get offset () { return this._offset } + + get tags () { return this._tags } + + encode () { + return { + Tags: this.tags, + Value: this.offset + } + } +} class StatsBucket { constructor () {