feature: sqs queue instrumentation #2013

Merged
merged 62 commits into from
Apr 7, 2021
Changes from 22 commits
Commits
25f0a73
feat: instrument amazon SQS
Mar 15, 2021
e2bfc9b
feat: received message span measures work done to process it, start o…
Mar 15, 2021
77c2e92
feat: configuration
Mar 17, 2021
869e326
feat: tests
Mar 18, 2021
3bf9daa
feat: integration tests
Mar 19, 2021
6852fdb
test: integration test for five main method calls
Mar 19, 2021
fb4dcd7
fix: lint
Mar 19, 2021
20c85b0
fix: sqs is always polling
Mar 19, 2021
e5fa3a4
docs: first draft of documentation
Mar 19, 2021
48bcb77
feat: logging for no transaction, adding semver check
Mar 19, 2021
96251a6
fix: AWS not mysql2
Mar 19, 2021
8eeb926
test: add tav configuration
Mar 19, 2021
29b26d7
docs: post-tech review for docs
Mar 22, 2021
29987f9
Merge branch 'master' into astorm/aws-sdk-sqs
Mar 23, 2021
e963940
fix: merge debris
Mar 23, 2021
63ca069
feat: metrics
Mar 23, 2021
8dca1c7
chore: doc blocks
Mar 23, 2021
0b89dc2
fix: pre-commit fixes
Mar 23, 2021
79bc833
fix: more doc blocks
Mar 23, 2021
a05f7b0
fix: lint, aws sqs request failures
Mar 23, 2021
527df68
fix: node 8 syntax issue
Mar 23, 2021
7d95562
Merge branch 'master' into astorm/aws-sdk-sqs
Mar 24, 2021
90ac7ce
Merge branch 'master' into astorm/aws-sdk-sqs
astorm Mar 26, 2021
2a7ec7c
Merge branch 'master' into astorm/aws-sdk-sqs
Mar 29, 2021
621c1bb
Update docs/message-queues.asciidoc
astorm Mar 29, 2021
0417cf0
Update docs/message-queues.asciidoc
astorm Mar 29, 2021
5677e47
Update docs/message-queues.asciidoc
astorm Mar 29, 2021
df09585
Update docs/message-queues.asciidoc
astorm Mar 29, 2021
b2ab2d3
Update docs/message-queues.asciidoc
astorm Mar 29, 2021
423b65f
Update lib/instrumentation/modules/aws-sdk/sqs.js
astorm Mar 29, 2021
6abec2a
docs: expanded changelog
Mar 29, 2021
504e5e9
Update docs/message-queues.asciidoc
astorm Mar 29, 2021
e67e733
Update docs/message-queues.asciidoc
astorm Mar 29, 2021
de9ee64
chore: grammar
Mar 29, 2021
c35b6ea
chore: subtype
Mar 29, 2021
9851c66
chore: enabled guard
Mar 29, 2021
d38762d
Update docs/message-queues.asciidoc
astorm Mar 29, 2021
a88cc95
docs: remove confusing extra caveat
Mar 29, 2021
0bfefd7
docs: no such things as level 4 titles
Mar 29, 2021
2f8de38
docs: small rewrite of dt
Mar 29, 2021
cb56aa6
chore: flakey test
Mar 30, 2021
e14a6c9
feat: refactor to use bindFunction
Mar 30, 2021
49084cc
chore: test race condition
Mar 30, 2021
a3d3f2b
chore: any is a broad statement
Mar 30, 2021
7f2827f
Update lib/instrumentation/index.js
astorm Apr 1, 2021
058038e
Update lib/instrumentation/index.js
astorm Apr 1, 2021
337c3ea
Update lib/instrumentation/index.js
astorm Apr 1, 2021
1865b85
Update lib/instrumentation/index.js
astorm Apr 1, 2021
d7c4eee
Merge branch 'master' into astorm/aws-sdk-sqs
astorm Apr 1, 2021
02f6620
feat: shift span creation strat., adjust tests, ensure tests run in ci
Apr 1, 2021
7304c29
feat: docs, messaging context
Apr 1, 2021
3bc5a45
feat: adding support for .promise
Apr 1, 2021
9434579
feature: promise and tests for same
Apr 1, 2021
95a5d9b
feat: logger for tests, old age variable
Apr 1, 2021
0d111c8
fix: fixing test fixture to be message aware
Apr 1, 2021
c5603c1
chore: remove error logging
Apr 2, 2021
6c88f7f
chore: change to message instead of messaging
Apr 5, 2021
d99a68f
chore: update comment
Apr 5, 2021
2941bda
chore: unhardcode queue
Apr 5, 2021
e4eee59
chore: what's a Metics
Apr 6, 2021
4646ca0
dive bomb into alan's PR to see if I can temporarily fix tests
trentm Apr 6, 2021
b34cacf
Merge branch 'master' into astorm/aws-sdk-sqs
Apr 6, 2021
5 changes: 5 additions & 0 deletions .tav.yml
@@ -353,3 +353,8 @@ body-parser:
  versions: '>=1.19.0'
  commands:
    - node test/sanitize-field-names/express.js

aws-sdk:
  versions: '>=2.858 <3'
  commands:
    - node test/instrumentation/modules/aws-sdk/sqs.js
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
@@ -36,6 +36,7 @@ Notes:
[float]
===== Features

* Adds support for Amazon SQS queues via `aws-sdk` instrumentation.
* The APM agent's own internal logging now uses structured JSON logging using
the https://getpino.io/#/docs/api?id=logger[pino API], and formatted in
{ecs-logging-ref}/intro.html[ecs-logging] format. The log records on stdout
2 changes: 2 additions & 0 deletions docs/index.asciidoc
@@ -39,6 +39,8 @@ include::./source-maps.asciidoc[]

include::./distributed-tracing.asciidoc[]

include::./message-queues.asciidoc[]

include::./performance-tuning.asciidoc[]

include::./troubleshooting.asciidoc[]
98 changes: 98 additions & 0 deletions docs/message-queues.asciidoc
@@ -0,0 +1,98 @@
[[message-queues]]
== Message Queues

The Node.js Agent automatically creates spans for activity to and from your Amazon SQS message queues. For those spans to be recorded, the queue activity must occur during a transaction.

If you're performing queue operations during an HTTP request from a <<compatibility-frameworks,supported framework>>, the agent will start a transaction automatically. However, if you're performing queue operations in a standalone program (such as a message processor), you'll need to use the Node.js Agent's <<apm-start-transaction,`startTransaction()`>> method to create transactions for your messages.

You can see an example of this in the following code sample.

[source,js]
----
const apm = require('elastic-apm-node').start({/*...*/})
const AWS = require('aws-sdk')
// Set the region
AWS.config.update({ region: 'us-west-2' })

// Create an SQS service object
const sqs = new AWS.SQS({ apiVersion: '2012-11-05' })

/* ... define `params` (QueueUrl and so on) ... */

// start the transaction before polling the queue
const transaction = apm.startTransaction('Process Messages', 'cli')
sqs.receiveMessage(params, function (err, data) {
  if (err) {
    console.log('Receive Error', err)
  } else {
    // data.Messages is absent when the queue returned no messages
    const messages = data.Messages || []
    console.log(`Length: ${messages.length}`)
    /* process messages */
  }
  // end the transaction
  transaction.end()
})
----

In the above code, we start a transaction before calling the `sqs.receiveMessage` method and end it _after_ the queue's processing callback finishes executing. This ensures a transaction is active while your queue messages are processed.

[float]
[[message-queues-distributed-tracing]]
=== Distributed Tracing and Messaging Queues

Similarly, if you want to connect your queue scheduling and queue processing via distributed tracing, you'll need to use the Node.js Agent's API to _store_ a `traceparent` header with your queue message and then provide that `traceparent` header when starting the new transaction.

Here's an example that uses the Node.js Agent API to store the `traceparent` as a message attribute and then uses that attribute to link your new transaction with the original.

First, when sending the message, we add the `traceparent` as one of the `MessageAttributes`.

[source,js]
----
// stores the traceparent when sending the queue message
const traceParent = apm.currentTransaction ? apm.currentTransaction.traceparent : ''

// Use the Amazon SQS `MessageAttributes` to pass
// on the traceparent header
const params = {
  /* ... other params ... */
  MessageAttributes: {
    /* ... other attributes ... */
    MyTraceparent: {
      DataType: 'String',
      StringValue: traceParent
    }
  }
}
sqs.sendMessage(params, function (err, data) {
  /* ... */
})
----
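
One detail worth checking in the sending sample: it falls back to an empty string when no transaction is active, and the SQS documentation states that a message attribute's value must not be empty, so such a send may be rejected. A minimal sketch of one way around that, which only attaches the attribute when there is a value to send. `withTraceparent` is a hypothetical helper, not part of the agent or the AWS SDK, and the attribute name `MyTraceparent` is taken from the sample above.

```javascript
// Hypothetical helper: adds the traceparent attribute only when there is a
// value to send, and leaves any other attributes untouched.
function withTraceparent (messageAttributes, traceparent, attributeName = 'MyTraceparent') {
  if (!traceparent) {
    return Object.assign({}, messageAttributes)
  }
  return Object.assign({}, messageAttributes, {
    [attributeName]: { DataType: 'String', StringValue: traceparent }
  })
}

// Example values for illustration only; in the sample above the traceparent
// comes from apm.currentTransaction.traceparent
const base = { Source: { DataType: 'String', StringValue: 'checkout' } }
const exampleTraceparent = '00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01'

const traced = withTraceparent(base, exampleTraceparent)
const untraced = withTraceparent(base, '')

console.log(Object.keys(traced))
console.log(Object.keys(untraced))
```

The result can be passed as `MessageAttributes` in the `sendMessage` params. Messages sent without the attribute simply start a new trace on the receiving side.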

Then, when receiving the messages, we start and end a transaction for each message as we loop over them.

[source,js]
----
// uses the traceparent to start a transaction

sqs.receiveMessage(params, function (err, data) {
  if (err || !data || !data.Messages) {
    return
  }

  // loop over your returned messages
  for (const message of data.Messages) {
    // start a transaction to process each message, using our previously
    // saved distributed tracing traceparent header
    let traceparent
    // MessageAttributes is only present when `params` requests it
    // (for example via MessageAttributeNames)
    const attributes = message.MessageAttributes || {}
    if (attributes.MyTraceparent) {
      traceparent = attributes.MyTraceparent.StringValue
    }
    const transactionMessage = apm.startTransaction('RECEIVE_TRANSACTION', 'cli', {
      childOf: traceparent
    })
    /* ... process message ... */
    transactionMessage.end()
  }
})
----

Unlike the first code sample, this approach creates one transaction for each processed queue message. This allows you to link each individual message with the transaction it started in. The <<apm-start-transaction,`startTransaction()`>> API allows _you_ to control how your queue data is reported to Elastic APM.
1 change: 1 addition & 0 deletions lib/instrumentation/index.js
@@ -13,6 +13,7 @@ var Transaction = require('./transaction')
var MODULES = [
  '@elastic/elasticsearch',
  'apollo-server-core',
  'aws-sdk',
  'bluebird',
  'cassandra-driver',
  'elasticsearch',
33 changes: 33 additions & 0 deletions lib/instrumentation/modules/aws-sdk.js
@@ -0,0 +1,33 @@
'use strict'
const semver = require('semver')
var shimmer = require('../shimmer')
const { instrumentationSqs } = require('./aws-sdk/sqs')

// Called in place of AWS.Request.send
//
// Determines which Amazon service an API request is for
// and then passes the call on to an appropriate instrumentation
// function.
function instrumentOperation (orig, origArguments, request, AWS, agent, { version, enabled }) {
  if (request.service.serviceIdentifier === 'sqs') {
    return instrumentationSqs(orig, origArguments, request, AWS, agent, { version, enabled })
  }

  // if we're still here, then we still need to call the original method
  return orig.apply(request, origArguments)
}

// main entry point for aws-sdk instrumentation
module.exports = function (AWS, agent, { version, enabled }) {
  if (!semver.satisfies(version, '>1 <3')) {
    agent.logger.debug('aws-sdk version %s not supported - aborting...', version)
    return AWS
  }

  shimmer.wrap(AWS.Request.prototype, 'send', function (orig) {
    return function _wrappedAWSRequestSend () {
      return instrumentOperation(orig, arguments, this, AWS, agent, { version, enabled })
    }
  })
  return AWS
}
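
For readers unfamiliar with the pattern, here is a minimal, self-contained sketch of what wrapping `AWS.Request.prototype.send` accomplishes: every request passes through one function that inspects the service identifier and either hands off to a service-specific handler or calls the original method untouched. `FakeRequest`, `wrapMethod`, and the `handlers` table are stand-ins invented for illustration; the agent itself uses its `shimmer` module and the real `AWS.Request` class.

```javascript
// Stand-in for AWS.Request: just enough shape to show the dispatch
class FakeRequest {
  constructor (serviceIdentifier, operation) {
    this.service = { serviceIdentifier }
    this.operation = operation
  }

  send () {
    return `sent ${this.operation}`
  }
}

// Minimal replacement for shimmer.wrap: swaps a method for a wrapper
function wrapMethod (target, name, wrapper) {
  const original = target[name]
  target[name] = wrapper(original)
}

const instrumented = []

// Per-service handlers; each one must call the original method itself
const handlers = {
  sqs (orig, args, request) {
    instrumented.push(request.operation)
    return orig.apply(request, args)
  }
}

wrapMethod(FakeRequest.prototype, 'send', function (orig) {
  return function wrappedSend () {
    const handler = handlers[this.service.serviceIdentifier]
    if (handler) {
      return handler(orig, arguments, this)
    }
    // No handler for this service: fall through to the original behaviour
    return orig.apply(this, arguments)
  }
})

const sqsResult = new FakeRequest('sqs', 'sendMessage').send()
const s3Result = new FakeRequest('s3', 'getObject').send()
console.log(sqsResult, s3Result, instrumented)
```

The key property is that the original method is always called exactly once, whether or not a handler exists, so uninstrumented services behave as before.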
202 changes: 202 additions & 0 deletions lib/instrumentation/modules/aws-sdk/sqs.js
@@ -0,0 +1,202 @@
'use strict'

const OPERATIONS_TO_ACTIONS = {
  deleteMessage: 'delete',
  deleteMessageBatch: 'delete_batch',
  receiveMessage: 'poll',
  sendMessageBatch: 'send_batch',
  sendMessage: 'send',
  unknown: 'unknown'
}
const OPERATIONS = Object.keys(OPERATIONS_TO_ACTIONS)
const TYPE = 'messaging'
const SUB_TYPE = 'sqs'

const queueMetics = new Map()

// Returns Message Queue action from AWS SDK method name
function getActionFromRequest (request) {
  request = request || {}
  const operation = request.operation ? request.operation : 'unknown'
  const action = OPERATIONS_TO_ACTIONS[operation]

  return action
}

// Returns preposition to use in span name
//
// POLL from ...
// SEND to ...
function getToFromFromOperation (operation) {
  let result = 'from'
  if (operation === 'sendMessage' || operation === 'sendMessageBatch') {
    result = 'to'
  }
  return result
}

// Parses queue/topic name from AWS queue URL
function getQueueNameFromRequest (request) {
  const unknown = 'unknown'
  if (!request || !request.params || !request.params.QueueUrl) {
    return unknown
  }
  try {
    const url = new URL(request.params.QueueUrl)
    return url.pathname.split('/').pop()
  } catch (e) {
    return unknown
  }
}
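
To make the parsing behaviour concrete, here is a small sketch of the same last-path-segment approach applied to a few inputs. `queueNameFromUrl` is a hypothetical standalone copy of the logic, the queue URLs are made-up examples, and `URL` is imported from Node's `url` module so the snippet does not depend on the global being available.

```javascript
const { URL } = require('url')

// Same approach as getQueueNameFromRequest: take the last path segment
function queueNameFromUrl (queueUrl) {
  try {
    return new URL(queueUrl).pathname.split('/').pop()
  } catch (e) {
    return 'unknown'
  }
}

// Hypothetical queue URLs
const standard = queueNameFromUrl('https://sqs.us-west-2.amazonaws.com/123456789012/orders')
const fifo = queueNameFromUrl('https://sqs.us-west-2.amazonaws.com/123456789012/orders.fifo')
const trailingSlash = queueNameFromUrl('https://sqs.us-west-2.amazonaws.com/123456789012/orders/')
const invalid = queueNameFromUrl('not a url')

console.log({ standard, fifo, trailingSlash, invalid })
```

Note the edge case: a URL with a trailing slash produces an empty queue name rather than `unknown`, which may or may not matter for custom endpoints.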

// Parses region name from AWS service configuration
function getRegionFromRequest (request) {
  const region = request && request.service &&
    request.service.config && request.service.config.region
  return region || ''
}

// Creates message destination context suitable for setDestinationContext
function getMessagingDestinationContextFromRequest (request) {
  const destination = {
    service: {
      name: SUB_TYPE,
      resource: `${SUB_TYPE}/${getQueueNameFromRequest(request)}`,
      type: TYPE
    },
    cloud: {
      region: getRegionFromRequest(request)
    }
  }
  return destination
}

// Measures the time spent in the function call that schedules the operation.
function instrumentOperation (orig, origArguments, request, AWS, agent, { version, enabled }) {
  if (!agent.currentTransaction) {
    agent.logger.trace('no active transaction found, skipping sqs instrumentation')
    const originalResult = orig.apply(request, origArguments)
    return originalResult
  }
  const type = TYPE
  const subtype = SUB_TYPE
  const action = getActionFromRequest(request)
  const name = getSpanNameFromRequest(request)
  const span = agent.startSpan(name, type, subtype, action)
  // startSpan can return null, so guard before touching the span
  if (span) {
    span.setDestinationContext(getMessagingDestinationContextFromRequest(request))
  }
  // call original function
  const originalResult = orig.apply(request, origArguments)
  if (span) {
    span.end()
  }
  return originalResult
}

// Record queue related metrics
//
// Creates metric collector objects on first run, and
// updates their data with data from received messages
function recordMetrics (queueName, data, agent) {
  const messages = data && data.Messages
  if (!messages || messages.length < 1) {
    return
  }

  if (!queueMetics.get(queueName)) {
    const collector = agent._metrics.createQueueMetricsCollector(queueName)
    queueMetics.set(queueName, collector)
  }
  const metrics = queueMetics.get(queueName)

  for (const message of messages) {
    const sentTimestamp = message.Attributes && message.Attributes.SentTimestamp
    // skip messages without a SentTimestamp; the subtraction would yield NaN
    if (!sentTimestamp) {
      continue
    }
    const delay = (new Date()).getTime() - sentTimestamp
    metrics.updateStats(delay)
  }
}
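
A note on the delay calculation: SQS returns `SentTimestamp` as a string of epoch milliseconds, and only when the `receiveMessage` call asks for it through `AttributeNames`. The sketch below shows one way to compute the delay that tolerates a missing or malformed value. `queueDelayMs` is a hypothetical helper, and clamping at zero for clock skew is a design choice of this sketch, not something the PR does.

```javascript
// Hypothetical helper: queue delay in milliseconds, or null when the
// message carries no usable SentTimestamp
function queueDelayMs (message, now = Date.now()) {
  const raw = message && message.Attributes && message.Attributes.SentTimestamp
  const sent = Number(raw)
  if (!raw || !Number.isFinite(sent)) {
    return null
  }
  // Clamp at zero in case of clock skew between sender and receiver
  return Math.max(0, now - sent)
}

// Fixed "now" so the example is deterministic
const now = 1617000005000
const delays = [
  queueDelayMs({ Attributes: { SentTimestamp: '1617000000000' } }, now),
  queueDelayMs({ Body: 'no attributes requested' }, now),
  queueDelayMs({ Attributes: { SentTimestamp: 'garbage' } }, now)
]

console.log(delays)
```

Returning `null` lets the caller decide whether to skip the sample or count it separately, instead of feeding `NaN` into the metric.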

// Measures the time spent in the operation's callback
//
// Used to measure calls to receiveMessage
function instrumentReceiveCallback (orig, origArguments, request, AWS, agent, { version, enabled }) {
  origArguments[0] = wrapCallback(origArguments[0])
  // call original function
  const originalResult = orig.apply(request, origArguments)
  return originalResult

  function wrapCallback (cb) {
    return function (err, data) {
      // if there is no transaction, or the request failed, just call the
      // callback and return
      if (!agent.currentTransaction || err) {
        agent.logger.trace('no active transaction or request error, skipping sqs instrumentation')
        const result = cb && cb.apply(this, arguments)
        return result
      }
      recordMetrics(getQueueNameFromRequest(request), data, agent)
      const type = TYPE
      const subtype = SUB_TYPE
      const action = getActionFromRequest(request)
      const name = getSpanNameFromRequest(request)
      const span = agent.startSpan(name, type, subtype, action)
      // startSpan can return null, so guard before touching the span
      if (span) {
        span.setDestinationContext(getMessagingDestinationContextFromRequest(request))
      }
      // call the callback
      const result = cb && cb.apply(this, arguments)
      if (span) {
        span.end()
      }
      return result
    }
  }
}
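
The callback-wrapping technique used for `receiveMessage` can be illustrated in isolation. The sketch below uses a fake agent that only records span start and end events; `createFakeAgent` and this `wrapCallback` signature are invented for the example and are not the agent's API. It also ends the span in a `finally` block, which is a choice of this sketch rather than what the PR does.

```javascript
// Stand-in agent that records span lifecycle events; not the real agent API
function createFakeAgent () {
  const events = []
  return {
    events,
    currentTransaction: {},
    startSpan (name) {
      events.push(`start ${name}`)
      return { end () { events.push(`end ${name}`) } }
    }
  }
}

// Wraps a node-style callback so a span covers its synchronous execution
function wrapCallback (agent, spanName, cb) {
  return function wrappedCallback (err, data) {
    if (!agent.currentTransaction || err) {
      return cb && cb.apply(this, arguments)
    }
    const span = agent.startSpan(spanName)
    try {
      return cb && cb.apply(this, arguments)
    } finally {
      // End the span even if the callback throws
      span.end()
    }
  }
}

const agent = createFakeAgent()
const wrapped = wrapCallback(agent, 'SQS POLL from orders', function (err, data) {
  if (err) {
    agent.events.push('callback saw error')
    return
  }
  agent.events.push(`processing ${data.Messages.length} message(s)`)
})

wrapped(null, { Messages: [{ Body: 'a' }, { Body: 'b' }] })
wrapped(new Error('receive failed'))
console.log(agent.events)
```

A span created this way only covers the synchronous part of the callback. Work that the callback schedules asynchronously finishes after the span has ended.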

// Creates the span name from request information
function getSpanNameFromRequest (request) {
  const action = getActionFromRequest(request)
  const toFrom = getToFromFromOperation(request.operation)
  const queueName = getQueueNameFromRequest(request)

  const name = `${SUB_TYPE.toUpperCase()} ${action.toUpperCase()} ${toFrom} ${queueName}`
  return name
}
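
As a quick illustration of the names this produces, here is a condensed sketch that combines the action lookup and the to/from choice in one function. `spanName` is a hypothetical standalone version; unlike the PR code it falls back to `unknown` for operations missing from the table, whereas the PR relies on `shouldIgnoreRequest` to filter those out before a name is built.

```javascript
// Operation-to-action table, copied from OPERATIONS_TO_ACTIONS
const ACTIONS = {
  deleteMessage: 'delete',
  deleteMessageBatch: 'delete_batch',
  receiveMessage: 'poll',
  sendMessageBatch: 'send_batch',
  sendMessage: 'send'
}

// Hypothetical condensed version of getSpanNameFromRequest
function spanName (operation, queueName) {
  const action = ACTIONS[operation] || 'unknown'
  const isSend = operation === 'sendMessage' || operation === 'sendMessageBatch'
  const toFrom = isSend ? 'to' : 'from'
  return `SQS ${action.toUpperCase()} ${toFrom} ${queueName}`
}

const names = [
  spanName('receiveMessage', 'orders'),
  spanName('sendMessageBatch', 'orders'),
  spanName('deleteMessage', 'orders')
]

console.log(names)
```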

function shouldIgnoreRequest (request, agent) {
  const operation = request && request.operation
  // are we interested in this operation/method call?
  if (OPERATIONS.indexOf(operation) === -1) {
    return true
  }

  // is the named queue on our ignore list?
  if (agent._conf && agent._conf.ignoreMessageQueuesRegExp) {
    const queueName = getQueueNameFromRequest(request)
    for (const rule of agent._conf.ignoreMessageQueuesRegExp) {
      if (rule.test(queueName)) {
        return true
      }
    }
  }
  return false
}
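
The ignore-list check can be sketched on its own, assuming (as the loop implies) that `ignoreMessageQueuesRegExp` is an array of `RegExp` objects. The rules and queue names below are made up, and `isIgnoredQueue` is a hypothetical helper. The second half shows a general JavaScript caveat that applies to any code calling `test()` repeatedly on shared regular expressions.

```javascript
// Hypothetical ignore list, shaped like agent._conf.ignoreMessageQueuesRegExp
const ignoreRules = [/^internal-/, /-dlq$/]

// Returns true when any rule matches the queue name
function isIgnoredQueue (queueName, rules) {
  return (rules || []).some(rule => rule.test(queueName))
}

const results = {
  orders: isIgnoredQueue('orders', ignoreRules),
  internal: isIgnoredQueue('internal-audit', ignoreRules),
  deadLetter: isIgnoredQueue('orders-dlq', ignoreRules)
}

// Caveat: a rule with the `g` flag keeps state in lastIndex, so repeated
// test() calls on the same name can alternate between true and false
const globalRule = /orders/g
const firstCall = globalRule.test('orders')
const secondCall = globalRule.test('orders')

console.log(results, firstCall, secondCall)
```

If the configuration layer ever compiles user-supplied patterns, building them without the `g` flag avoids this alternating behaviour.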

// Main entrypoint for SQS instrumentation
//
// Must call (or one of its function calls must call) the
// `orig` function/method
function instrumentationSqs (orig, origArguments, request, AWS, agent, { version, enabled }) {
  if (shouldIgnoreRequest(request, agent)) {
    return orig.apply(request, origArguments)
  }
  const action = getActionFromRequest(request)

  if (action === 'poll') {
    return instrumentReceiveCallback(orig, origArguments, request, AWS, agent, { version, enabled })
  } else {
    return instrumentOperation(orig, origArguments, request, AWS, agent, { version, enabled })
  }
}

module.exports = {
  instrumentationSqs,
  getToFromFromOperation,
  getActionFromRequest,
  getQueueNameFromRequest,
  getRegionFromRequest,
  getMessagingDestinationContextFromRequest,
  shouldIgnoreRequest
}
9 changes: 9 additions & 0 deletions lib/metrics/index.js
@@ -1,6 +1,7 @@
'use strict'

const MetricsRegistry = require('./registry')
const { createQueueMetrics } = require('./queue')

const registrySymbol = Symbol('metrics-registry')
const agentSymbol = Symbol('metrics-agent')
@@ -54,6 +55,14 @@ class Metrics {
  getOrCreateGauge (...args) {
    return this[registrySymbol].getOrCreateGauge(...args)
  }

  // factory function for creating a queue metrics collector
  //
  // called from instrumentation, only when the agent receives a queue message
  createQueueMetricsCollector (queueOrTopicName) {
    const collector = createQueueMetrics(queueOrTopicName, this[registrySymbol])
    return collector
  }
}

module.exports = Metrics