Skip to content

Commit

Permalink
feat(iot-actions): support for sending messages to iot-events (aws#19953
Browse files Browse the repository at this point in the history
)

This PR includes to support the action for sending messages to IoT Events.

This feature is described [this documentation](https://docs.aws.amazon.com/iot/latest/developerguide/iotevents-rule-action.html).

I actually confirmed that the behavior of the action deployed by integ-test is all right.

----

### All Submissions:

* [x] Have you followed the guidelines in our [Contributing guide?](https://github.com/aws/aws-cdk/blob/master/CONTRIBUTING.md)

### Adding new Unconventional Dependencies:

* [ ] This PR adds new unconventional dependencies following the process described [here](https://github.com/aws/aws-cdk/blob/master/CONTRIBUTING.md/#adding-new-unconventional-dependencies)

### New Features

* [x] Have you added the new feature to an [integration test](https://github.com/aws/aws-cdk/blob/master/INTEGRATION_TESTS.md)?
	* [x] Did you use `yarn integ` to deploy the infrastructure and generate the snapshot (i.e. `yarn integ` without `--dry-run`)?

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
yamatatsu authored and jmortlock committed Aug 8, 2022
1 parent d421984 commit 63d8c13
Show file tree
Hide file tree
Showing 29 changed files with 1,028 additions and 28 deletions.
37 changes: 33 additions & 4 deletions packages/@aws-cdk/aws-iot-actions/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ Currently supported are:
- Send messages to SQS queues
- Publish messages on SNS topics
- Write messages into columns of DynamoDB
- Put messages IoT Events input

## Republish a message to another MQTT topic

Expand Down Expand Up @@ -73,7 +74,7 @@ new iot.TopicRule(this, 'TopicRule', {

## Put objects to a S3 bucket

The code snippet below creates an AWS IoT Rule that put objects to a S3 bucket
The code snippet below creates an AWS IoT Rule that puts objects to a S3 bucket
when it is triggered.

```ts
Expand Down Expand Up @@ -126,7 +127,7 @@ new iot.TopicRule(this, 'TopicRule', {

## Put logs to CloudWatch Logs

The code snippet below creates an AWS IoT Rule that put logs to CloudWatch Logs
The code snippet below creates an AWS IoT Rule that puts logs to CloudWatch Logs
when it is triggered.

```ts
Expand Down Expand Up @@ -194,7 +195,7 @@ const topicRule = new iot.TopicRule(this, 'TopicRule', {

## Put records to Kinesis Data stream

The code snippet below creates an AWS IoT Rule that put records to Kinesis Data
The code snippet below creates an AWS IoT Rule that puts records to Kinesis Data
stream when it is triggered.

```ts
Expand All @@ -214,7 +215,7 @@ const topicRule = new iot.TopicRule(this, 'TopicRule', {

## Put records to Kinesis Data Firehose stream

The code snippet below creates an AWS IoT Rule that put records to Put records
The code snippet below creates an AWS IoT Rule that puts records to Put records
to Kinesis Data Firehose stream when it is triggered.

```ts
Expand Down Expand Up @@ -299,3 +300,31 @@ const topicRule = new iot.TopicRule(this, 'TopicRule', {
],
});
```

## Put messages IoT Events input

The code snippet below creates an AWS IoT Rule that puts messages
to an IoT Events input when it is triggered:

```ts
import * as iotevents from '@aws-cdk/aws-iotevents';
import * as iam from '@aws-cdk/aws-iam';

declare const role: iam.IRole;

const input = new iotevents.Input(this, 'MyInput', {
attributeJsonPaths: ['payload.temperature', 'payload.transactionId'],
});
const topicRule = new iot.TopicRule(this, 'TopicRule', {
sql: iot.IotSql.fromStringAsVer20160323(
"SELECT * FROM 'device/+/data'",
),
actions: [
new actions.IotEventsPutMessageAction(input, {
batchMode: true, // optional property, default is 'false'
messageId: '${payload.transactionId}', // optional property, default is a new UUID
role: role, // optional property, default is a new UUID
}),
],
});
```
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ export class CloudWatchLogsAction implements iot.IAction {
this.role = props.role;
}

bind(rule: iot.ITopicRule): iot.ActionConfig {
/**
* @internal
*/
public _bind(rule: iot.ITopicRule): iot.ActionConfig {
const role = this.role ?? singletonActionRole(rule);
this.logGroup.grantWrite(role);
this.logGroup.grant(role, 'logs:DescribeLogStreams');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ export class CloudWatchPutMetricAction implements iot.IAction {
constructor(private readonly props: CloudWatchPutMetricActionProps) {
}

bind(rule: iot.ITopicRule): iot.ActionConfig {
/**
* @internal
*/
public _bind(rule: iot.ITopicRule): iot.ActionConfig {
const role = this.props.role ?? singletonActionRole(rule);
cloudwatch.Metric.grantPutMetricData(role);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ export class CloudWatchSetAlarmStateAction implements iot.IAction {
) {
}

bind(topicRule: iot.ITopicRule): iot.ActionConfig {
/**
* @internal
*/
public _bind(topicRule: iot.ITopicRule): iot.ActionConfig {
const role = this.props.role ?? singletonActionRole(topicRule);
role.addToPrincipalPolicy(new iam.PolicyStatement({
actions: ['cloudwatch:SetAlarmState'],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ export class DynamoDBv2PutItemAction implements iot.IAction {
this.role = props.role;
}

bind(rule: iot.ITopicRule): iot.ActionConfig {
/**
* @internal
*/
public _bind(rule: iot.ITopicRule): iot.ActionConfig {
const role = this.role ?? singletonActionRole(rule);
role.addToPrincipalPolicy(new iam.PolicyStatement({
actions: ['dynamodb:PutItem'],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ export class FirehosePutRecordAction implements iot.IAction {
this.role = props.role;
}

bind(rule: iot.ITopicRule): iot.ActionConfig {
/**
* @internal
*/
public _bind(rule: iot.ITopicRule): iot.ActionConfig {
const role = this.role ?? singletonActionRole(rule);
this.stream.grantPutRecords(role);

Expand Down
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-iot-actions/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ export * from './cloudwatch-set-alarm-state-action';
export * from './common-action-props';
export * from './dynamodbv2-put-item-action';
export * from './firehose-put-record-action';
export * from './iotevents-put-message-action';
export * from './iot-republish-action';
export * from './kinesis-put-record-action';
export * from './lambda-function-action';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ export class IotRepublishMqttAction implements iot.IAction {
this.role = props.role;
}

bind(rule: iot.ITopicRule): iot.ActionConfig {
/**
* @internal
*/
public _bind(rule: iot.ITopicRule): iot.ActionConfig {
const role = this.role ?? singletonActionRole(rule);
role.addToPrincipalPolicy(new iam.PolicyStatement({
actions: ['iot:Publish'],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import * as iam from '@aws-cdk/aws-iam';
import * as iot from '@aws-cdk/aws-iot';
import * as iotevents from '@aws-cdk/aws-iotevents';
import { CommonActionProps } from './common-action-props';
import { singletonActionRole } from './private/role';

/**
* Configuration properties of an action for the IoT Events.
*/
export interface IotEventsPutMessageActionProps extends CommonActionProps {
/**
* Whether to process the event actions as a batch.
*
* When batchMode is true, you can't specify a messageId.
*
* When batchMode is true and the rule SQL statement evaluates to an Array,
* each Array element is treated as a separate message when Events by calling BatchPutMessage.
* The resulting array can't have more than 10 messages.
*
* @default false
*/
readonly batchMode?: boolean;

/**
* The ID of the message.
*
* When batchMode is true, you can't specify a messageId--a new UUID value will be assigned.
* Assign a value to this property to ensure that only one input (message) with a given messageId will be processed by an AWS IoT Events detector.
*
* @default - none -- a new UUID value will be assigned
*/
readonly messageId?: string;
}

/**
* The action to put the message from an MQTT message to the IoT Events input.
*/
export class IotEventsPutMessageAction implements iot.IAction {
private readonly batchMode?: boolean;
private readonly messageId?: string;
private readonly role?: iam.IRole;

/**
* @param input The IoT Events input to put messages.
* @param props Optional properties to not use default
*/
constructor(private readonly input: iotevents.IInput, props: IotEventsPutMessageActionProps = {}) {
this.batchMode = props.batchMode;
this.messageId = props.messageId;
this.role = props.role;

if (this.batchMode && this.messageId) {
throw new Error('messageId is not allowed when batchMode is true');
}
}

/**
* @internal
*/
public _bind(rule: iot.ITopicRule): iot.ActionConfig {
const role = this.role ?? singletonActionRole(rule);
this.input.grantWrite(role);

return {
configuration: {
iotEvents: {
batchMode: this.batchMode,
inputName: this.input.inputName,
messageId: this.messageId,
roleArn: role.roleArn,
},
},
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ export class KinesisPutRecordAction implements iot.IAction {
this.role = props.role;
}

bind(rule: iot.ITopicRule): iot.ActionConfig {
/**
* @internal
*/
public _bind(rule: iot.ITopicRule): iot.ActionConfig {
const role = this.role ?? singletonActionRole(rule);
role.addToPrincipalPolicy(new iam.PolicyStatement({
actions: ['kinesis:PutRecord'],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ export class LambdaFunctionAction implements iot.IAction {
*/
constructor(private readonly func: lambda.IFunction) {}

bind(topicRule: iot.ITopicRule): iot.ActionConfig {
/**
* @internal
*/
public _bind(topicRule: iot.ITopicRule): iot.ActionConfig {
this.func.addPermission(`${Names.nodeUniqueId(topicRule.node)}:IotLambdaFunctionAction`, {
action: 'lambda:InvokeFunction',
principal: new iam.ServicePrincipal('iot.amazonaws.com'),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ export class S3PutObjectAction implements iot.IAction {
this.role = props.role;
}

bind(rule: iot.ITopicRule): iot.ActionConfig {
/**
* @internal
*/
public _bind(rule: iot.ITopicRule): iot.ActionConfig {
const role = this.role ?? singletonActionRole(rule);
role.addToPrincipalPolicy(new iam.PolicyStatement({
actions: ['s3:PutObject'],
Expand Down
7 changes: 5 additions & 2 deletions packages/@aws-cdk/aws-iot-actions/lib/sns-topic-action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ export class SnsTopicAction implements iot.IAction {
this.messageFormat = props.messageFormat;
}

bind(rule: iot.ITopicRule): iot.ActionConfig {
/**
* @internal
*/
public _bind(rule: iot.ITopicRule): iot.ActionConfig {
const role = this.role ?? singletonActionRole(rule);
this.topic.grantPublish(role);

Expand All @@ -72,4 +75,4 @@ export class SnsTopicAction implements iot.IAction {
},
};
}
}
}
5 changes: 4 additions & 1 deletion packages/@aws-cdk/aws-iot-actions/lib/sqs-queue-action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ export class SqsQueueAction implements iot.IAction {
this.useBase64 = props.useBase64;
}

bind(rule: iot.ITopicRule): iot.ActionConfig {
/**
* @internal
*/
public _bind(rule: iot.ITopicRule): iot.ActionConfig {
const role = this.role ?? singletonActionRole(rule);
role.addToPrincipalPolicy(new iam.PolicyStatement({
actions: ['sqs:SendMessage'],
Expand Down
2 changes: 2 additions & 0 deletions packages/@aws-cdk/aws-iot-actions/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
"@aws-cdk/aws-dynamodb": "0.0.0",
"@aws-cdk/aws-iam": "0.0.0",
"@aws-cdk/aws-iot": "0.0.0",
"@aws-cdk/aws-iotevents": "0.0.0",
"@aws-cdk/aws-kinesis": "0.0.0",
"@aws-cdk/aws-kinesisfirehose": "0.0.0",
"@aws-cdk/aws-lambda": "0.0.0",
Expand All @@ -109,6 +110,7 @@
"@aws-cdk/aws-dynamodb": "0.0.0",
"@aws-cdk/aws-iam": "0.0.0",
"@aws-cdk/aws-iot": "0.0.0",
"@aws-cdk/aws-iotevents": "0.0.0",
"@aws-cdk/aws-kinesis": "0.0.0",
"@aws-cdk/aws-kinesisfirehose": "0.0.0",
"@aws-cdk/aws-lambda": "0.0.0",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import * as iot from '@aws-cdk/aws-iot';
import * as iotevents from '@aws-cdk/aws-iotevents';
import * as logs from '@aws-cdk/aws-logs';
import * as cdk from '@aws-cdk/core';
import { IntegTest } from '@aws-cdk/integ-tests';
import * as actions from '../../lib';

class TestStack extends cdk.Stack {
public readonly detectorModelName: string;

constructor(scope: cdk.App, id: string, props?: cdk.StackProps) {
super(scope, id, props);

const logGroup = new logs.LogGroup(this, 'logs', { removalPolicy: cdk.RemovalPolicy.DESTROY });
const topicRule = new iot.TopicRule(this, 'TopicRule', {
sql: iot.IotSql.fromStringAsVer20160323(
"SELECT * FROM 'device/+/data'",
),
errorAction: new actions.CloudWatchLogsAction(logGroup),
});

const input = new iotevents.Input(this, 'MyInput', {
attributeJsonPaths: ['payload.deviceId'],
});

const detectorModel = new iotevents.DetectorModel(this, 'MyDetectorModel', {
detectorKey: 'payload.deviceId',
initialState: new iotevents.State({
stateName: 'initialState',
onEnter: [{
eventName: 'enter',
condition: iotevents.Expression.currentInput(input),
}],
}),
});

topicRule.addAction(
new actions.IotEventsPutMessageAction(input, {
batchMode: true,
}),
);

this.detectorModelName = detectorModel.detectorModelName;
}
}

// App
const app = new cdk.App();
const stack = new TestStack(app, 'iotevents-put-message-action-test-stack');
new IntegTest(app, 'iotevents', { testCases: [stack] });

app.synth();
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"version":"20.0.0"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"version": "20.0.0",
"testCases": {
"iotevents/DefaultTest": {
"stacks": [
"iotevents-put-message-action-test-stack"
],
"assertionStack": "ioteventsDefaultTestDeployAssertE216288D"
}
}
}
Loading

0 comments on commit 63d8c13

Please sign in to comment.