Skip to content

Commit

Permalink
feat(iot): add Action to republish MQTT messages to another MQTT topic (
Browse files Browse the repository at this point in the history
aws#18661)

resolve aws#17701

----

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
yamatatsu authored and TikiTDO committed Feb 21, 2022
1 parent 1189d88 commit 3c654fb
Show file tree
Hide file tree
Showing 6 changed files with 287 additions and 0 deletions.
17 changes: 17 additions & 0 deletions packages/@aws-cdk/aws-iot-actions/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ supported AWS Services. Instances of these classes should be passed to

Currently supported are:

- Republish a message to another MQTT topic
- Invoke a Lambda function
- Put objects to a S3 bucket
- Put logs to CloudWatch Logs
Expand All @@ -30,6 +31,22 @@ Currently supported are:
- Put records to Kinesis Data Firehose stream
- Send messages to SQS queues

## Republish a message to another MQTT topic

The code snippet below creates an AWS IoT Rule that republish a message to
another MQTT topic when it is triggered.

```ts
new iot.TopicRule(this, 'TopicRule', {
sql: iot.IotSql.fromStringAsVer20160323("SELECT topic(2) as device_id, timestamp() as timestamp, temperature FROM 'device/+/data'"),
actions: [
new actions.IotRepublishMqttAction('${topic()}/republish', {
qualityOfService: actions.MqttQualityOfService.AT_LEAST_ONCE, // optional property, default is MqttQualityOfService.ZERO_OR_MORE_TIMES
}),
],
});
```

## Invoke a Lambda function

The code snippet below creates an AWS IoT Rule that invoke a Lambda function
Expand Down
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-iot-actions/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ export * from './cloudwatch-put-metric-action';
export * from './cloudwatch-set-alarm-state-action';
export * from './common-action-props';
export * from './firehose-put-record-action';
export * from './iot-republish-action';
export * from './kinesis-put-record-action';
export * from './lambda-function-action';
export * from './s3-put-object-action';
Expand Down
72 changes: 72 additions & 0 deletions packages/@aws-cdk/aws-iot-actions/lib/iot-republish-action.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import * as iam from '@aws-cdk/aws-iam';
import * as iot from '@aws-cdk/aws-iot';
import { CommonActionProps } from './common-action-props';
import { singletonActionRole } from './private/role';

/**
* MQTT Quality of Service (QoS) indicates the level of assurance for delivery of an MQTT Message.
*
* @see https://docs.aws.amazon.com/iot/latest/developerguide/mqtt.html#mqtt-qos
*/
export enum MqttQualityOfService {
/**
* QoS level 0. Sent zero or more times.
* This level should be used for messages that are sent over reliable communication links or that can be missed without a problem.
*/
ZERO_OR_MORE_TIMES,

/**
* QoS level 1. Sent at least one time, and then repeatedly until a PUBACK response is received.
* The message is not considered complete until the sender receives a PUBACK response to indicate successful delivery.
*/
AT_LEAST_ONCE,
}

/**
* Configuration properties of an action to republish MQTT messages.
*/
export interface IotRepublishMqttActionProps extends CommonActionProps {
/**
* The Quality of Service (QoS) level to use when republishing messages.
*
* @see https://docs.aws.amazon.com/iot/latest/developerguide/mqtt.html#mqtt-qos
*
* @default MqttQualityOfService.ZERO_OR_MORE_TIMES
*/
readonly qualityOfService?: MqttQualityOfService;
}

/**
* The action to put the record from an MQTT message to republish another MQTT topic.
*/
export class IotRepublishMqttAction implements iot.IAction {
private readonly qualityOfService?: MqttQualityOfService;
private readonly role?: iam.IRole;

/**
* @param topic The MQTT topic to which to republish the message.
* @param props Optional properties to not use default.
*/
constructor(private readonly topic: string, props: IotRepublishMqttActionProps = {}) {
this.qualityOfService = props.qualityOfService;
this.role = props.role;
}

bind(rule: iot.ITopicRule): iot.ActionConfig {
const role = this.role ?? singletonActionRole(rule);
role.addToPrincipalPolicy(new iam.PolicyStatement({
actions: ['iot:Publish'],
resources: ['*'],
}));

return {
configuration: {
republish: {
topic: this.topic,
qos: this.qualityOfService,
roleArn: role.roleArn,
},
},
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
{
"Resources": {
"TopicRule40A4EA44": {
"Type": "AWS::IoT::TopicRule",
"Properties": {
"TopicRulePayload": {
"Actions": [
{
"Republish": {
"Qos": 1,
"RoleArn": {
"Fn::GetAtt": [
"TopicRuleTopicRuleActionRole246C4F77",
"Arn"
]
},
"Topic": "${topic()}/republish"
}
}
],
"AwsIotSqlVersion": "2016-03-23",
"Sql": "SELECT * FROM 'device/+/data'"
}
}
},
"TopicRuleTopicRuleActionRole246C4F77": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "iot.amazonaws.com"
}
}
],
"Version": "2012-10-17"
}
}
},
"TopicRuleTopicRuleActionRoleDefaultPolicy99ADD687": {
"Type": "AWS::IAM::Policy",
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": "iot:Publish",
"Effect": "Allow",
"Resource": "*"
}
],
"Version": "2012-10-17"
},
"PolicyName": "TopicRuleTopicRuleActionRoleDefaultPolicy99ADD687",
"Roles": [
{
"Ref": "TopicRuleTopicRuleActionRole246C4F77"
}
]
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import * as iot from '@aws-cdk/aws-iot';
import * as cdk from '@aws-cdk/core';
import * as actions from '../../lib';

class TestStack extends cdk.Stack {
constructor(scope: cdk.App, id: string, props?: cdk.StackProps) {
super(scope, id, props);

const topicRule = new iot.TopicRule(this, 'TopicRule', {
sql: iot.IotSql.fromStringAsVer20160323(
"SELECT * FROM 'device/+/data'",
),
});

topicRule.addAction(
new actions.IotRepublishMqttAction('${topic()}/republish', {
qualityOfService: actions.MqttQualityOfService.AT_LEAST_ONCE,
}),
);
}
}

const app = new cdk.App();
new TestStack(app, 'iot-republish-action-test-stack');
app.synth();
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import { Template, Match } from '@aws-cdk/assertions';
import * as iam from '@aws-cdk/aws-iam';
import * as iot from '@aws-cdk/aws-iot';
import * as cdk from '@aws-cdk/core';
import * as actions from '../../lib';

let stack: cdk.Stack;
let topicRule:iot.TopicRule;
beforeEach(() => {
stack = new cdk.Stack();
topicRule = new iot.TopicRule(stack, 'MyTopicRule', {
sql: iot.IotSql.fromStringAsVer20160323("SELECT topic(2) as device_id FROM 'device/+/data'"),
});
});

test('Default IoT republish action', () => {
// WHEN
topicRule.addAction(
new actions.IotRepublishMqttAction('test-topic'),
);

// THEN
Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', {
TopicRulePayload: {
Actions: [
{
Republish: {
Topic: 'test-topic',
RoleArn: {
'Fn::GetAtt': ['MyTopicRuleTopicRuleActionRoleCE2D05DA', 'Arn'],
},
},
},
],
},
});

Template.fromStack(stack).hasResourceProperties('AWS::IAM::Role', {
AssumeRolePolicyDocument: {
Statement: [
{
Action: 'sts:AssumeRole',
Effect: 'Allow',
Principal: {
Service: 'iot.amazonaws.com',
},
},
],
Version: '2012-10-17',
},
});

Template.fromStack(stack).hasResourceProperties('AWS::IAM::Policy', {
PolicyDocument: {
Statement: [
{
Action: 'iot:Publish',
Effect: 'Allow',
Resource: '*',
},
],
Version: '2012-10-17',
},
PolicyName: 'MyTopicRuleTopicRuleActionRoleDefaultPolicy54A701F7',
Roles: [
{ Ref: 'MyTopicRuleTopicRuleActionRoleCE2D05DA' },
],
});
});

test('can set qualityOfService', () => {
// WHEN
topicRule.addAction(
new actions.IotRepublishMqttAction('test-topic', { qualityOfService: actions.MqttQualityOfService.AT_LEAST_ONCE }),
);

// THEN
Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', {
TopicRulePayload: {
Actions: [
Match.objectLike({ Republish: { Qos: 1 } }),
],
},
});
});

test('can set role', () => {
// WHEN
const role = iam.Role.fromRoleArn(stack, 'MyRole', 'arn:aws:iam::123456789012:role/ForTest');
topicRule.addAction(
new actions.IotRepublishMqttAction('test-topic', { role }),
);

// THEN
Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', {
TopicRulePayload: {
Actions: [
Match.objectLike({ Republish: { RoleArn: 'arn:aws:iam::123456789012:role/ForTest' } }),
],
},
});

Template.fromStack(stack).hasResourceProperties('AWS::IAM::Policy', {
PolicyName: 'MyRolePolicy64AB00A5',
Roles: ['ForTest'],
});
});

0 comments on commit 3c654fb

Please sign in to comment.