Skip to content

Commit

Permalink
use promises and finally working retries
Browse files Browse the repository at this point in the history
  • Loading branch information
claudiadadamo committed Jul 28, 2020
1 parent fa29b66 commit 4b9a9f5
Showing 1 changed file with 61 additions and 40 deletions.
101 changes: 61 additions & 40 deletions azure/activity_logs_monitoring/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2020 Datadog, Inc.

var https = require('https');
const httpsLib = require('https');

const VERSION = '0.1.2';

Expand All @@ -26,63 +26,82 @@ const DD_SOURCE_CATEGORY = process.env.DD_SOURCE_CATEGORY || 'azure';

const ONE_SEC = 1000;

module.exports = function(context, eventHubMessages) {
module.exports = async function(context, eventHubMessages) {
if (!DD_API_KEY || DD_API_KEY === '<DATADOG_API_KEY>') {
context.log.error(
'You must configure your API key before starting this function (see ## Parameters section)'
);
return;
}
handleLogs(sender, eventHubMessages, context);
};

const options = {
hostname: DD_URL,
port: 443,
path: '/v1/input',
method: 'POST',
headers: {
'Content-Type': 'application/json',
'DD-API-KEY': DD_API_KEY
},
timeout: ONE_SEC
};
var sender = tagger => record => {
record = tagger(record, context);

const request = https.request(options, res => {
if (res.statusCode < 200 || res.statusCode > 299) {
context.log.error(
'unable to send message, err code: ' + res.statusCode
);
}
});
function sender(tagger, record, context) {
record = tagger(record, context);
// retry once
asyncSend(tagger, record, context).catch(
asyncSend(tagger, record, context).catch(handleFailure(context))
);
}

request.on('error', e => {
context.log.error('unable to send request');
});
function handleFailure(context) {
context.log.error('Unable to send message');
}

// Write data to request body
request.write(JSON.stringify(record));
request.end();
};
handleLogs(sender, eventHubMessages, context);
context.done();
};
async function asyncSend(tagger, record, context, tries) {
return await send(record, context);
}

async function send(record, context) {
return new Promise((resolve, reject) => {
const options = {
hostname: DD_URL,
port: 443,
path: '/v1/input',
method: 'POST',
headers: {
'Content-Type': 'application/json',
'DD-API-KEY': DD_API_KEY
},
timeout: ONE_SEC
};
const myReq = httpsLib
.request(options, myResponse => {
if (
myResponse.statusCode < 200 ||
myResponse.statusCode > 299
) {
reject(`invalid status code ${myResponse.statusCode}`);
} else {
resolve();
}
})
.on('error', error => {
reject(error);
});

myReq.write(JSON.stringify(record));
myReq.end();
});
}

function handleLogs(sender, logs, context) {
var logsType = getLogFormat(logs);
switch (logsType) {
case STRING:
sender(addTagsToStringLog)(logs);
sender(addTagsToStringLog, logs, context);
break;
case JSON_STRING:
logs = JSON.parse(logs);
sender(addTagsToJsonLog)(logs);
sender(addTagsToJsonLog, logs, context);
break;
case JSON_OBJECT:
sender(addTagsToJsonLog)(logs);
sender(addTagsToJsonLog, logs, context);
break;
case STRING_ARRAY:
logs.forEach(sender(addTagsToStringLog));
logs.forEach(log => {
sender(addTagsToStringLog, log, context);
});
break;
case JSON_ARRAY:
handleJSONArrayLogs(sender, context, logs, JSON_ARRAY);
Expand All @@ -104,14 +123,16 @@ function handleJSONArrayLogs(sender, context, logs, logsType) {
message = JSON.parse(message);
} catch (err) {
context.log.warn('log is malformed json, sending as string');
sender(addTagsToStringLog)(message);
sender(addTagsToStringLog, message, context);
return;
}
}
if (message.records != undefined) {
message.records.forEach(sender(addTagsToJsonLog));
message.records.forEach(log => {
sender(addTagsToJsonLog, log, context);
});
} else {
sender(addTagsToJsonLog)(message);
sender(addTagsToJsonLog, message, context);
}
});
}
Expand Down

0 comments on commit 4b9a9f5

Please sign in to comment.