Skip to content

Commit

Permalink
SQS SendMessageBatch Function (#84)
Browse files Browse the repository at this point in the history
Created new function sendMessageBatch() to allow sending up to
10 messages at a time to SQS

Created private function prepareMessage() that is used by sendMessage()
and new sendMessageBatch() function

Updated all calls to use POST vs GET to match AWS documentation examples
Added comments regarding MessageDeduplicationId and MessageGroupId
as defined in AWS Docs
  • Loading branch information
GiancarloGomez authored Nov 3, 2023
1 parent bf0a6ed commit b910c54
Showing 1 changed file with 100 additions and 22 deletions.
122 changes: 100 additions & 22 deletions services/sqs.cfc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ component {
var requestSettings = api.resolveRequestSettings( argumentCollection = arguments );
var apiResponse = apiCall(
requestSettings,
'GET',
'POST',
'/',
{ 'Action': 'ListQueues' }
);
Expand All @@ -40,8 +40,12 @@ component {
* Example: [{'Name':'AttName','Value':'AttVal','DataType':'String'},{'Name':'AttName3','Value':34,'DataType':'Number'}]
* @messageDeduplicationId Optional string. This parameter applies only to FIFO (first-in-first-out) queues.
* The token used for deduplication of sent messages.
* The maximum length of MessageDeduplicationId is 128 characters. MessageDeduplicationId can contain alphanumeric
* characters (a-z, A-Z, 0-9) and punctuation (!"#$%&'()*+,-./:;<=>?@[\]^_`{|}~).
* @messageGroupId Optional string. This parameter applies only to FIFO (first-in-first-out) queues.
* Note: MessageGroupId is required for FIFO queues. You can't use it for Standard queues.
* The maximum length of MessageGroupId is 128 characters. messageGroupId can contain alphanumeric
* characters (a-z, A-Z, 0-9) and punctuation (!"#$%&'()*+,-./:;<=>?@[\]^_`{|}~).
*/
public any function sendMessage(
required string queueName,
Expand All @@ -52,29 +56,48 @@ component {
string messageGroupId
) {
var requestSettings = api.resolveRequestSettings( argumentCollection = arguments );
var payload = { 'Action': 'SendMessage', 'MessageBody': message };
if ( structKeyExists( arguments, 'delaySeconds' ) && isNumeric( delaySeconds ) ) {
structAppend( payload, { 'DelaySeconds': delaySeconds } );
var payload = { 'Action': 'SendMessage' };
// prepare message and append to payload
structAppend( payload, prepareMessage( argumentCollection = arguments ) );
var apiResponse = apiCall(
requestSettings,
'POST',
'/' & queueName,
payload
);
if ( apiResponse.statusCode == 200 ) {
apiResponse[ 'data' ] = utils.parseXmlDocument( apiResponse.rawData );
}
for ( var idx = 1; idx <= arrayLen( messageAttributes ); idx++ ) {
return apiResponse;
}

/**
* Sends a batch of messages ( up to 10 messages )
* https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessageBatch.html
* @queueName Required string. The name of the queue to send to (e.g. "123456789/my-sqs-queue").
* @messages Required array. An array containing messages to post. Maximum 10.
* Each array item must contain a structure that resembles the arguments required by prepareMessage()
*/
public any function sendMessageBatch(
required string queueName,
required array messages
) {
var requestSettings = api.resolveRequestSettings( argumentCollection = arguments );
var payload = { 'Action': 'SendMessageBatch' };
if ( arrayLen(arguments.messages) > 10 ){
throw( type = 'aws.sqs', message = 'Maximum allowed of messages for a batch is 10.' );
}
for ( var idx = 1; idx <= arrayLen(arguments.messages); idx++ ) {
// add the id per message to declare as a batch job
arguments.messages[idx].batchID = idx;
structAppend(
payload,
{
'MessageAttribute.#idx#.Name': messageAttributes[ idx ].Name,
'MessageAttribute.#idx#.Value.StringValue': messageAttributes[ idx ].Value,
'MessageAttribute.#idx#.Value.DataType': messageAttributes[ idx ].DataType
}
prepareMessage( argumentCollection = arguments.messages[idx] )
);
}
if ( structKeyExists( arguments, 'messageDeduplicationId' ) && len( messageDeduplicationId ) ) {
structAppend( payload, { 'MessageDeduplicationId': messageDeduplicationId } );
}
if ( structKeyExists( arguments, 'messageGroupId' ) && len( messageGroupId ) ) {
structAppend( payload, { 'MessageGroupId': messageGroupId } );
}
var apiResponse = apiCall(
requestSettings,
'GET',
'POST',
'/' & queueName,
payload
);
Expand Down Expand Up @@ -124,7 +147,7 @@ component {
}
var apiResponse = apiCall(
requestSettings,
'GET',
'POST',
'/' & queueName,
payload
);
Expand All @@ -147,7 +170,7 @@ component {
var requestSettings = api.resolveRequestSettings( argumentCollection = arguments );
var apiResponse = apiCall(
requestSettings,
'GET',
'POST',
'/' & queueName,
{ 'Action': 'DeleteMessage', 'ReceiptHandle': receiptHandle }
);
Expand Down Expand Up @@ -210,7 +233,7 @@ component {
}
);
}
var apiResponse = apiCall( requestSettings, 'GET', '/', payload );
var apiResponse = apiCall( requestSettings, 'POST', '/', payload );
if ( apiResponse.statusCode == 200 ) {
apiResponse[ 'data' ] = utils.parseXmlDocument( apiResponse.rawData );
}
Expand All @@ -228,7 +251,7 @@ component {
var requestSettings = api.resolveRequestSettings( argumentCollection = arguments );
var apiResponse = apiCall(
requestSettings,
'GET',
'POST',
'/' & queueName,
{ 'Action': 'DeleteQueue', 'QueueName': queueName }
);
Expand All @@ -249,7 +272,7 @@ component {
var requestSettings = api.resolveRequestSettings( argumentCollection = arguments );
var apiResponse = apiCall(
requestSettings,
'GET',
'POST',
'/' & queueName,
{ 'Action': 'PurgeQueue' }
);
Expand All @@ -261,6 +284,61 @@ component {

// private

/**
* @message Required string. The message to post, text format.
* @delaySeconds Optional numeric. The length of time, in seconds, for which to delay a specific message.
* Valid values: 0 to 900. Maximum: 15 minutes. Messages with a positive DelaySeconds value become available for
* processing after the delay period is finished. If you don't specify a value, the default value for the queue applies.
* Note: When you set FifoQueue, you can't set DelaySeconds per message. You can set this parameter only on a queue level.
* @messageAttributes Optional array. An array of message attributes to be added to the message. Each array item must contain a structure
* containing 3 keys: Name, Value, DataType. String and Number are supported DataType values.
* Example: [{'Name':'AttName','Value':'AttVal','DataType':'String'},{'Name':'AttName3','Value':34,'DataType':'Number'}]
* @messageDeduplicationId Optional string. This parameter applies only to FIFO (first-in-first-out) queues.
* The token used for deduplication of sent messages.
* The maximum length of MessageDeduplicationId is 128 characters. MessageDeduplicationId can contain alphanumeric
* characters (a-z, A-Z, 0-9) and punctuation (!"#$%&'()*+,-./:;<=>?@[\]^_`{|}~).
* @messageGroupId Optional string. This parameter applies only to FIFO (first-in-first-out) queues.
* Note: MessageGroupId is required for FIFO queues. You can't use it for Standard queues.
* The maximum length of MessageGroupId is 128 characters. messageGroupId can contain alphanumeric
* characters (a-z, A-Z, 0-9) and punctuation (!"#$%&'()*+,-./:;<=>?@[\]^_`{|}~).
* @batchID Optional numeric. This is used for batch jobs, if sent, it will prepend SendMessageBatchRequestEntry.{{id}}. to each key create the required Id key
*/
private struct function prepareMessage(
required string message,
numeric delaySeconds,
array messageAttributes = [ ],
string messageDeduplicationId,
string messageGroupId,
numeric batchID
){
var isBatch = structKeyExists( arguments, 'batchID' ) && isNumeric( batchID );
var batchKey = isBatch ? 'SendMessageBatchRequestEntry.#arguments.batchID#.' : '';
var payload = { '#batchKey#MessageBody': message };
if ( isBatch ) {
payload['#batchKey#Id'] = arguments.batchID;
}
if ( structKeyExists( arguments, 'delaySeconds' ) && isNumeric( delaySeconds ) ) {
structAppend( payload, { '#batchKey#DelaySeconds': delaySeconds } );
}
for ( var idx = 1; idx <= arrayLen( messageAttributes ); idx++ ) {
structAppend(
payload,
{
'#batchKey#MessageAttribute.#idx#.Name': messageAttributes[ idx ].Name,
'#batchKey#MessageAttribute.#idx#.Value.StringValue': messageAttributes[ idx ].Value,
'#batchKey#MessageAttribute.#idx#.Value.DataType': messageAttributes[ idx ].DataType
}
);
}
if ( structKeyExists( arguments, 'messageDeduplicationId' ) && len( messageDeduplicationId ) ) {
structAppend( payload, { '#batchKey#MessageDeduplicationId': messageDeduplicationId } );
}
if ( structKeyExists( arguments, 'messageGroupId' ) && len( messageGroupId ) ) {
structAppend( payload, { '#batchKey#MessageGroupId': messageGroupId } );
}
return payload;
}

private string function getHost(
required string region
) {
Expand Down

0 comments on commit b910c54

Please sign in to comment.