feature: sqs queue instrumentation (#2013)
* feat: instrument amazon SQS

Adds support for Amazon SQS queues via `aws-sdk` instrumentation that
partially implements the APM messaging spec

https://github.com/elastic/apm/blob/master/specs/agents/tracing-instrumentation-messaging

Also adds `queue.latency.min.ms`, `queue.latency.max.ms`, and
`queue.latency.avg.ms` metrics for SQS queues.
astorm authored Apr 7, 2021
1 parent 8d4f10d commit 5b3af00
Showing 17 changed files with 1,265 additions and 5 deletions.
5 changes: 5 additions & 0 deletions .tav.yml
@@ -360,3 +360,8 @@ body-parser:
  versions: '>=1.19.0'
  commands:
    - node test/sanitize-field-names/express.js

aws-sdk:
  versions: '>=2.858 <3'
  commands:
    - node test/instrumentation/modules/aws-sdk/sqs.js
4 changes: 4 additions & 0 deletions CHANGELOG.asciidoc
@@ -34,6 +34,10 @@ Notes:
[float]
===== Features
* Adds support for Amazon SQS queues via `aws-sdk` instrumentation that
partially implements the https://github.com/elastic/apm/blob/master/specs/agents/tracing-instrumentation-messaging.md[APM messaging spec],
and adds `queue.latency.min.ms`, `queue.latency.max.ms`, and `queue.latency.avg.ms`
metrics for SQS queues.
* The APM agent's own internal logging now uses structured JSON logging using
the https://getpino.io/#/docs/api?id=logger[pino API], and formatted in
{ecs-logging-ref}/intro.html[ecs-logging] format. The log records on stdout
2 changes: 2 additions & 0 deletions docs/index.asciidoc
@@ -39,6 +39,8 @@ include::./source-maps.asciidoc[]

include::./distributed-tracing.asciidoc[]

include::./message-queues.asciidoc[]

include::./performance-tuning.asciidoc[]

include::./troubleshooting.asciidoc[]
104 changes: 104 additions & 0 deletions docs/message-queues.asciidoc
@@ -0,0 +1,104 @@
[[message-queues]]
== Message queues

The Node.js Agent will automatically create spans for activity to/from your Amazon SQS message queues. To record these spans, your message queue activity must occur during a transaction. If you're performing queue operations during an HTTP request from a <<compatibility-frameworks,supported framework>>, the agent will start a transaction automatically. However, if you're performing queue operations in a stand-alone program (such as a message processor), you'll need to use the Node.js Agent's <<apm-start-transaction,`startTransaction()`>> method to manually create transactions for your messages.

You can see an example of this in the following code sample.

[source,js]
----
const apm = require('elastic-apm-node').start({/*...*/})
const AWS = require('aws-sdk');
// Set the region
AWS.config.update({region: 'us-west-2'});
// Create an SQS service object
const sqs = new AWS.SQS({apiVersion: '2012-11-05'});
/* ... */
const transaction = apm.startTransaction("Process Messages", 'cli') <1>
sqs.receiveMessage(params, function(err, data) {
  if(err) {
    console.log("Receive Error", err);
  } else {
    // `data.Messages` is absent when the queue returned no messages
    const messages = data.Messages || []
    console.log(`Length: ${messages.length}`)
    /* process messages */
  }
  // end the transaction
  transaction.end() <2>
})
----
<1> Prior to calling the `sqs.receiveMessage` method, start a new transaction.
<2> Only end the transaction _after_ the queue's processing callback finishes executing. This will ensure a transaction is active while processing your queue messages.

[float]
[[message-queues-distributed-tracing]]
=== Distributed tracing and messaging queues

To enable queue scheduling and queue processing with distributed tracing, use the Node.js Agent's API to _store_ a `traceparent` header with your queue message; then, provide that `traceparent` header when starting a new transaction.

Here's a _new_ example that uses the Node.js Agent API to store the `traceparent` as a message attribute and then uses that attribute to link your new transaction with the original.

**Storing the Traceparent**

When sending the message, you'll want to add the traceparent as one of the `MessageAttributes`.

[source,js]
----
// stores the traceparent when sending the queue message
const traceParent = apm.currentTransaction ? apm.currentTransaction.traceparent : ''
// Use the Amazon SQS `MessageAttributes` to pass
// on the traceparent header
const params = {
  /* ... other params ... */
  MessageAttributes: {
    /* ... other attributes ... */
    "MyTraceparent":{
      DataType: "String",
      StringValue: traceParent
    }
  }
}
sqs.sendMessage(params, function(err, data) {
  /* ... */
});
----

This will save the traceparent value so we can use it later on when receiving the messages.
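
The attribute shape used when sending can be factored into a small helper. The following is only a minimal sketch: the helper name `withTraceparent` is hypothetical (it is not part of the agent API), the attribute name `MyTraceparent` is the one chosen in the example, and the queue URL and traceparent value are made up. It assumes that when there is no active transaction it is better to leave the params untouched than to send an empty attribute value.

```javascript
// Hypothetical helper (not part of the agent API): returns a copy of the
// SQS send params with the traceparent stored as a message attribute.
function withTraceparent (params, traceparent, attributeName = 'MyTraceparent') {
  // No active transaction means nothing to propagate: return params unchanged.
  if (!traceparent) return params
  return {
    ...params,
    MessageAttributes: {
      ...(params.MessageAttributes || {}),
      [attributeName]: { DataType: 'String', StringValue: traceparent }
    }
  }
}

// Usage with a made-up queue URL and an example traceparent value
const exampleParams = withTraceparent(
  { QueueUrl: 'https://example.invalid/queue', MessageBody: 'hello' },
  '00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01'
)
console.log(Object.keys(exampleParams.MessageAttributes))
```

The helper copies the params instead of mutating them, so the same base params object can be reused for several messages.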

**Applying the Traceparent**

When we receive our queue messages, we'll check each message for our traceparent attribute and use it to start a new transaction. Starting a transaction with this traceparent links the sending and receiving sides via distributed tracing.

[source,js]
----
// uses the traceparent to start a transaction
sqs.receiveMessage(params, function(err, data) {
  if(err || !data.Messages) {
    return
  }
  // loop over your returned messages
  for(const message of data.Messages) { <1>
    // start a transaction to process each message, using our previously
    // saved distributed tracing traceparent header
    let traceparent
    // `MessageAttributes` is only present when attributes were returned
    if(message.MessageAttributes && message.MessageAttributes.MyTraceparent) {
      traceparent = message.MessageAttributes.MyTraceparent.StringValue
    }
    const transactionMessage = apm.startTransaction('RECEIVE_TRANSACTION', 'cli', {
      childOf:traceparent <2>
    })
    /* ... process message ... */
    transactionMessage.end() <3>
  }
})
----
<1> Even though we only scheduled one queue message, Amazon's SQS API returns an array that may contain _multiple_ messages. Therefore we'll need to loop over each one.
<2> We extract the traceparent header we'd previously saved, and use it to start a transaction.
<3> Once we're done processing a single message, we end the transaction and move on to the next.
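
The attribute lookup in the receive loop can be isolated into a function that tolerates messages without attributes. This is a sketch under stated assumptions: `getTraceparent` is a hypothetical helper name, and the sample messages are hand-written objects shaped like entries of `data.Messages`. Note that SQS only returns message attributes that the receive request asked for through `MessageAttributeNames`, so the receive `params` would need to list `MyTraceparent` (or `All`).

```javascript
// Hypothetical helper: reads the traceparent stored under `attributeName`
// and returns undefined when the message carries no such attribute.
function getTraceparent (message, attributeName = 'MyTraceparent') {
  const attributes = (message && message.MessageAttributes) || {}
  const attribute = attributes[attributeName]
  return attribute && attribute.StringValue ? attribute.StringValue : undefined
}

// Hand-written messages shaped like the entries of `data.Messages`
const withHeader = {
  Body: 'a',
  MessageAttributes: {
    MyTraceparent: {
      DataType: 'String',
      StringValue: '00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01'
    }
  }
}
const withoutHeader = { Body: 'b' }

console.log(getTraceparent(withHeader))
console.log(getTraceparent(withoutHeader))
```

The returned value can be passed as `childOf` when starting the per-message transaction, as in the example above, where `traceparent` is likewise left undefined for messages without the attribute.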

1 change: 1 addition & 0 deletions docs/supported-technologies.asciidoc
@@ -72,6 +72,7 @@ The Node.js agent will automatically instrument the following modules to give yo
[options="header"]
|=======================================================================
|Module |Version |Note
|https://www.npmjs.com/package/aws-sdk[aws-sdk] |>1 <3 |Will instrument SQS send/receive/delete messages
|https://www.npmjs.com/package/cassandra-driver[cassandra-driver] |>=3.0.0 |Will instrument all queries
|https://www.npmjs.com/package/elasticsearch[elasticsearch] |>=8.0.0 |Will instrument all queries
|https://www.npmjs.com/package/@elastic/elasticsearch[@elastic/elasticsearch] |>=7.0.0 <8.0.0 |Will instrument all queries
1 change: 0 additions & 1 deletion lib/agent.js
@@ -456,7 +456,6 @@ Agent.prototype.handleUncaughtExceptions = function (cb) {
// The `uncaughtException` listener inhibits this behavior, and it's
// therefore necessary to manually do this to not break expectations.
if (agent._conf.logUncaughtExceptions === true) console.error(err)

agent.logger.debug('Elastic APM caught unhandled exception: %s', err.message)

agent.captureError(err, { handled: false }, function () {
30 changes: 30 additions & 0 deletions lib/instrumentation/index.js
@@ -13,6 +13,7 @@ var Transaction = require('./transaction')
var MODULES = [
  '@elastic/elasticsearch',
  'apollo-server-core',
  'aws-sdk',
  'bluebird',
  'cassandra-driver',
  'elasticsearch',
@@ -304,6 +305,35 @@ Instrumentation.prototype.setSpanOutcome = function (outcome) {

var wrapped = Symbol('elastic-apm-wrapped-function')

// Binds a callback function to the currently active span
//
// An instrumentation programmer can use this function to wrap a callback
// function of another function at the call-time of the original function.
// The pattern is
//
// 1. Instrumentation programmer uses shimmer.wrap to wrap a function that also
// has an asynchronous callback as an argument
//
// 2. In the code that executes before calling the original function, extract
// the callback argument and pass it to bindFunction, which will return a
// new function
//
// 3. Pass the function returned by bindFunction in place of the callback
// argument when calling the original function.
//
// bindFunction will "save" the currently active span via closure, and when
// the callback is invoked, the span and transaction that were active when
// the program called the original function will be set as active. This
// ensures the callback function gets instrumented on "the right" transaction
// and span.
//
// The instrumentation programmer is still responsible for starting a span,
// and ending a span. Additionally, this function will set a span's sync
// property to `false` -- it's up to the instrumentation programmer to ensure
// that the callback they're binding is really async. If bindFunction is
// passed a callback that the wrapped function executes synchronously, it will
// still mark the span's `sync` property as `false`.
//
// @param {function} original
Instrumentation.prototype.bindFunction = function (original) {
if (typeof original !== 'function' || original.name === 'elasticAPMCallbackWrapper') return original
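
The three-step pattern described in the comment above can be illustrated without the agent. This is a simplified sketch, not the agent's implementation (the body of `bindFunction` is not shown in this diff): the module-level `activeSpan` variable, `bindToActiveSpan`, and `wrapWithBoundCallback` are all hypothetical stand-ins, and setting the span's `sync` property is left out.

```javascript
// Simplified stand-in for the agent's notion of a "currently active" span.
let activeSpan = null

// Sketch of the closure technique: capture the active span at bind time and
// restore it for the duration of the callback.
function bindToActiveSpan (original) {
  if (typeof original !== 'function') return original
  const savedSpan = activeSpan
  return function boundCallback () {
    const previous = activeSpan
    activeSpan = savedSpan
    try {
      return original.apply(this, arguments)
    } finally {
      activeSpan = previous
    }
  }
}

// Steps 1-3: wrap a function whose last argument is a callback, and pass the
// bound callback in place of the original one.
function wrapWithBoundCallback (orig) {
  return function wrapped (...args) {
    if (args.length > 0) {
      args[args.length - 1] = bindToActiveSpan(args[args.length - 1])
    }
    return orig.apply(this, args)
  }
}

// Demo: the callback runs later, after the active span has changed.
const pending = []
const defer = wrapWithBoundCallback(function (cb) { pending.push(cb) })

let seen
activeSpan = 'span-A'
defer(function () { seen = activeSpan })
activeSpan = 'span-B'
pending[0]()
console.log(seen) // span-A
```

The callback observes `span-A` even though `span-B` was active when it ran, which is the "right transaction and span" behavior the comment describes.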

40 changes: 40 additions & 0 deletions lib/instrumentation/modules/aws-sdk.js
@@ -0,0 +1,40 @@
'use strict'
const semver = require('semver')
const shimmer = require('../shimmer')
const { instrumentationSqs } = require('./aws-sdk/sqs')

// Called in place of AWS.Request.send and AWS.Request.promise
//
// Determines which amazon service an API request is for
// and then passes call on to an appropriate instrumentation
// function.
function instrumentOperation (orig, origArguments, request, AWS, agent, { version, enabled }) {
  if (request.service.serviceIdentifier === 'sqs') {
    return instrumentationSqs(orig, origArguments, request, AWS, agent, { version, enabled })
  }

  // if we're still here, then we still need to call the original method
  return orig.apply(request, origArguments)
}

// main entry point for aws-sdk instrumentation
module.exports = function (AWS, agent, { version, enabled }) {
  if (!enabled) return AWS
  if (!semver.satisfies(version, '>1 <3')) {
    agent.logger.debug('aws-sdk version %s not supported - aborting...', version)
    return AWS
  }

  shimmer.wrap(AWS.Request.prototype, 'send', function (orig) {
    return function _wrappedAWSRequestSend () {
      return instrumentOperation(orig, arguments, this, AWS, agent, { version, enabled })
    }
  })

  shimmer.wrap(AWS.Request.prototype, 'promise', function (orig) {
    return function _wrappedAWSRequestPromise () {
      return instrumentOperation(orig, arguments, this, AWS, agent, { version, enabled })
    }
  })
  return AWS
}
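
The wrap-and-dispatch shape of this module can be exercised in isolation. The sketch below is illustrative only: `wrap` is a minimal stand-in for `shimmer.wrap`, `FakeRequest` is a hypothetical substitute for `AWS.Request`, and the SQS branch merely records the call instead of creating a span.

```javascript
// Minimal stand-in for shimmer.wrap: replaces obj[name] with wrapper(orig).
function wrap (obj, name, wrapper) {
  const orig = obj[name]
  obj[name] = wrapper(orig)
}

// Hypothetical substitute for AWS.Request, carrying only what the dispatch
// logic reads: request.service.serviceIdentifier.
class FakeRequest {
  constructor (serviceIdentifier) {
    this.service = { serviceIdentifier }
  }

  send (payload) {
    return `sent ${payload}`
  }
}

const instrumented = []

// Dispatch on the service identifier, then always call the original method.
function instrumentOperation (orig, origArguments, request) {
  if (request.service.serviceIdentifier === 'sqs') {
    // The real module hands off to the SQS instrumentation here; this
    // sketch only records that the call was seen.
    instrumented.push(origArguments[0])
  }
  return orig.apply(request, origArguments)
}

wrap(FakeRequest.prototype, 'send', function (orig) {
  return function wrappedSend () {
    return instrumentOperation(orig, arguments, this)
  }
})

const sqsResult = new FakeRequest('sqs').send('a')
const otherResult = new FakeRequest('s3').send('b')
console.log(sqsResult, otherResult, instrumented)
```

Requests for other services pass straight through to the original method, so wrapping `send` at the prototype level does not change their behavior.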