-
Notifications
You must be signed in to change notification settings - Fork 225
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feature: sqs queue instrumentation (#2013)
* feat: instrument amazon SQS Adds support for Amazon SQS queues via `aws-sdk` instrumentation that partially implements the APM messaging spec https://github.com/elastic/apm/blob/master/specs/agents/tracing-instrumentation-messaging Also adds `queue.latency.min.ms`, `queue.latency.max.ms`, and `queue.latency.avg.ms` metrics for SQS queues.
- Loading branch information
Showing
17 changed files
with
1,265 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
[[message-queues]] | ||
== Message queues | ||
|
||
The Node.js Agent will automatically create spans for activity to/from your Amazon SQS message queues. To record these spans, your message queue activity must occur during a transaction. If you're performing queue operations during an HTTP request from a <<compatibility-frameworks,supported framework>>, the agent will start a transaction automatically. However, if you're performing queue operations in a stand-alone program (such as a message processor), you'll need to use the Node.js Agent's <<apm-start-transaction,`startTransaction()`>> method to manually create transactions for your messages. | ||
|
||
You can see an example of this in the following code sample. | ||
|
||
[source,js] | ||
---- | ||
const apm = require('elastic-apm-node').start({/*...*/}) | ||
const AWS = require('aws-sdk'); | ||
// Set the region | ||
AWS.config.update({region: 'us-west'}); | ||
// Create an SQS service object | ||
const sqs = new AWS.SQS({apiVersion: '2012-11-05'}); | ||
/* ... */ | ||
const transaction = apm.startTransaction("Process Messages", 'cli') <1> | ||
sqs.receiveMessage(params, function(err, data) { | ||
if(err) { | ||
console.log("Receive Error", err); | ||
} else { | ||
console.log(`Length: ${data.Messages.length}`) | ||
/* process messages */ | ||
} | ||
// end the transaction | ||
transaction.end() <2> | ||
}) | ||
---- | ||
<1> Prior to calling the `sqs.receiveMessage` method, start a new transaction. | ||
<2> Only end the transaction _after_ the queue's processing callback finishes executing. The will ensure a transaction is active while processing your queue messages. | ||
|
||
[float] | ||
[[message-queues-distributed-tracing]] | ||
=== Distributed tracing and messaging queues | ||
|
||
To enable queue scheduling and queue processing with distributed tracing, use the Node.js Agent's API to _store_ a `traceparent` header with your queue message; then, provide that `traceparent` header when starting a new transaction. | ||
|
||
Here's a _new_ example that uses the Node.js Agent API to store the `traceparent` as a message attribute and then uses that attribute to link your new transaction with the original. | ||
|
||
**Storing the Traceparent** | ||
|
||
When sending the message, you'll want to add the trace as one of the `MessageAttributes`. | ||
[source,js] | ||
---- | ||
// stores the traceparent when sending the queue message | ||
const traceParent = apm.currentTransaction ? apm.currentTransaction.traceparent : '' | ||
// Use the Amazon SQS `MessageAttributes` to pass | ||
// on the traceparent header | ||
const params = { | ||
/* ... other params ... */ | ||
MessageAttributes: { | ||
/* ... other attributes ... */ | ||
"MyTraceparent":{ | ||
DataType: "String", | ||
StringValue: traceParent | ||
} | ||
} | ||
} | ||
sqs.sendMessage(params, function(err, data) { | ||
/* ... */ | ||
}); | ||
---- | ||
|
||
This will save the traceparent value so we can use it later on when receiving the messages. | ||
|
||
**Applying the Traceparent** | ||
|
||
When we receive our queue messages, we'll check the message for our Traceparent header, and use it to start a new transaction. By starting a transaction with this traceparent header we'll be linking the sending and receiving via distributed tracing. | ||
|
||
[source,js] | ||
---- | ||
// uses the traceparent to start a transaction | ||
sqs.receiveMessage(params, function(err, data) { | ||
if(!data.Messages) { | ||
return | ||
} | ||
// loop over your returned messages | ||
for(const message of data.Messages) { <1> | ||
// start a transaction to process each message, using our previously | ||
// saved distributed tracing traceparent header | ||
let traceparent | ||
if(message.MessageAttributes.MyTraceparent) { | ||
traceparent = message.MessageAttributes.MyTraceparent.StringValue | ||
} | ||
const transactionMessage = apm.startTransaction('RECEIVE_TRANSACTION', 'cli', { | ||
childOf:traceparent <2> | ||
}) | ||
/* ... process message ... */ | ||
transactionMessage.end() <3> | ||
} | ||
}) | ||
---- | ||
<1> Even though we only scheduled one queue message, Amazon's SQS API returns an array of _multiple_ messages. Therefore we'll need to loop over each one. | ||
<2> We extract the traceparent header we'd previously save, and use it to start a transaction. | ||
<3> Once we're done processing a single message, we end the transaction and move on to the next. | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
'use strict' | ||
const semver = require('semver') | ||
const shimmer = require('../shimmer') | ||
const { instrumentationSqs } = require('./aws-sdk/sqs') | ||
|
||
// Called in place of AWS.Request.send and AWS.Request.promise | ||
// | ||
// Determines which amazon service an API request is for | ||
// and then passes call on to an appropriate instrumentation | ||
// function. | ||
function instrumentOperation (orig, origArguments, request, AWS, agent, { version, enabled }) { | ||
if (request.service.serviceIdentifier === 'sqs') { | ||
return instrumentationSqs(orig, origArguments, request, AWS, agent, { version, enabled }) | ||
} | ||
|
||
// if we're still here, then we still need to call the original method | ||
return orig.apply(request, origArguments) | ||
} | ||
|
||
// main entry point for aws-sdk instrumentation | ||
module.exports = function (AWS, agent, { version, enabled }) { | ||
if (!enabled) return AWS | ||
if (!semver.satisfies(version, '>1 <3')) { | ||
agent.logger.debug('aws-sdk version %s not supported - aborting...', version) | ||
return AWS | ||
} | ||
|
||
shimmer.wrap(AWS.Request.prototype, 'send', function (orig) { | ||
return function _wrappedAWSRequestSend () { | ||
return instrumentOperation(orig, arguments, this, AWS, agent, { version, enabled }) | ||
} | ||
}) | ||
|
||
shimmer.wrap(AWS.Request.prototype, 'promise', function (orig) { | ||
return function _wrappedAWSRequestPromise () { | ||
return instrumentOperation(orig, arguments, this, AWS, agent, { version, enabled }) | ||
} | ||
}) | ||
return AWS | ||
} |
Oops, something went wrong.