Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(iot-actions): support for sending messages to iot-events #19953

Merged
merged 9 commits into from
Aug 8, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 33 additions & 4 deletions packages/@aws-cdk/aws-iot-actions/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ Currently supported are:
- Send messages to SQS queues
- Publish messages on SNS topics
- Write messages into columns of DynamoDB
- Put messages IoT Events input

## Republish a message to another MQTT topic

Expand Down Expand Up @@ -73,7 +74,7 @@ new iot.TopicRule(this, 'TopicRule', {

## Put objects to a S3 bucket

The code snippet below creates an AWS IoT Rule that put objects to a S3 bucket
The code snippet below creates an AWS IoT Rule that puts objects to a S3 bucket
when it is triggered.

```ts
Expand Down Expand Up @@ -126,7 +127,7 @@ new iot.TopicRule(this, 'TopicRule', {

## Put logs to CloudWatch Logs

The code snippet below creates an AWS IoT Rule that put logs to CloudWatch Logs
The code snippet below creates an AWS IoT Rule that puts logs to CloudWatch Logs
when it is triggered.

```ts
Expand Down Expand Up @@ -194,7 +195,7 @@ const topicRule = new iot.TopicRule(this, 'TopicRule', {

## Put records to Kinesis Data stream

The code snippet below creates an AWS IoT Rule that put records to Kinesis Data
The code snippet below creates an AWS IoT Rule that puts records to Kinesis Data
stream when it is triggered.

```ts
Expand All @@ -214,7 +215,7 @@ const topicRule = new iot.TopicRule(this, 'TopicRule', {

## Put records to Kinesis Data Firehose stream

The code snippet below creates an AWS IoT Rule that put records to Put records
The code snippet below creates an AWS IoT Rule that puts records to Put records
to Kinesis Data Firehose stream when it is triggered.

```ts
Expand Down Expand Up @@ -299,3 +300,31 @@ const topicRule = new iot.TopicRule(this, 'TopicRule', {
],
});
```

## Put messages IoT Events input

The code snippet below creates an AWS IoT Rule that puts messages
to an IoT Events input when it is triggered:

```ts
import * as iotevents from '@aws-cdk/aws-iotevents';
import * as iam from '@aws-cdk/aws-iam';

declare role: iam.IRole;

const input = new iotevents.Input(this, 'MyInput', {
attributeJsonPaths: ['payload.temperature', 'payload.transactionId'],
});
const topicRule = new iot.TopicRule(this, 'TopicRule', {
sql: iot.IotSql.fromStringAsVer20160323(
"SELECT * FROM 'device/+/data'",
),
actions: [
new actions.IotEventsPutMessageAction(input, {
batchMode: true, // optional property, default is 'false'
messageId: '${payload.transactionId}', // optional property, default is a new UUID
yamatatsu marked this conversation as resolved.
Show resolved Hide resolved
role: // optional property, default is a new UUID
}),
],
});
```
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ export class CloudWatchLogsAction implements iot.IAction {
this.role = props.role;
}

bind(rule: iot.ITopicRule): iot.ActionConfig {
/**
* @internal
*/
public _bind(rule: iot.ITopicRule): iot.ActionConfig {
const role = this.role ?? singletonActionRole(rule);
this.logGroup.grantWrite(role);
this.logGroup.grant(role, 'logs:DescribeLogStreams');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ export class CloudWatchPutMetricAction implements iot.IAction {
constructor(private readonly props: CloudWatchPutMetricActionProps) {
}

bind(rule: iot.ITopicRule): iot.ActionConfig {
/**
* @internal
*/
public _bind(rule: iot.ITopicRule): iot.ActionConfig {
const role = this.props.role ?? singletonActionRole(rule);
cloudwatch.Metric.grantPutMetricData(role);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ export class CloudWatchSetAlarmStateAction implements iot.IAction {
) {
}

bind(topicRule: iot.ITopicRule): iot.ActionConfig {
/**
* @internal
*/
public _bind(topicRule: iot.ITopicRule): iot.ActionConfig {
const role = this.props.role ?? singletonActionRole(topicRule);
role.addToPrincipalPolicy(new iam.PolicyStatement({
actions: ['cloudwatch:SetAlarmState'],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ export class DynamoDBv2PutItemAction implements iot.IAction {
this.role = props.role;
}

bind(rule: iot.ITopicRule): iot.ActionConfig {
/**
* @internal
*/
public _bind(rule: iot.ITopicRule): iot.ActionConfig {
const role = this.role ?? singletonActionRole(rule);
role.addToPrincipalPolicy(new iam.PolicyStatement({
actions: ['dynamodb:PutItem'],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ export class FirehosePutRecordAction implements iot.IAction {
this.role = props.role;
}

bind(rule: iot.ITopicRule): iot.ActionConfig {
/**
* @internal
*/
public _bind(rule: iot.ITopicRule): iot.ActionConfig {
const role = this.role ?? singletonActionRole(rule);
this.stream.grantPutRecords(role);

Expand Down
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-iot-actions/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ export * from './cloudwatch-set-alarm-state-action';
export * from './common-action-props';
export * from './dynamodbv2-put-item-action';
export * from './firehose-put-record-action';
export * from './iotevents-put-message-action';
export * from './iot-republish-action';
export * from './kinesis-put-record-action';
export * from './lambda-function-action';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ export class IotRepublishMqttAction implements iot.IAction {
this.role = props.role;
}

bind(rule: iot.ITopicRule): iot.ActionConfig {
/**
* @internal
*/
public _bind(rule: iot.ITopicRule): iot.ActionConfig {
const role = this.role ?? singletonActionRole(rule);
role.addToPrincipalPolicy(new iam.PolicyStatement({
actions: ['iot:Publish'],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import * as iam from '@aws-cdk/aws-iam';
import * as iot from '@aws-cdk/aws-iot';
import * as iotevents from '@aws-cdk/aws-iotevents';
import { CommonActionProps } from './common-action-props';
import { singletonActionRole } from './private/role';

/**
* Configuration properties of an action for the IoT Events.
*/
export interface IotEventsPutMessageActionProps extends CommonActionProps {
/**
* Whether to process the event actions as a batch.
*
* When batchMode is true, you can't specify a messageId.
*
* When batchMode is true and the rule SQL statement evaluates to an Array,
* each Array element is treated as a separate message when Events by calling BatchPutMessage.
* The resulting array can't have more than 10 messages.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the array does have more than 10 messages how/when does this fail? What's the user experience here? We should probably have some enforcement here, if possible.

Copy link
Contributor Author

@yamatatsu yamatatsu Aug 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not explained in the documentation, I've experimented it with the below command:

aws iot-data publish --topic device/mydevice/data --qos 1 --payload (echo '[{"payload":{"deviceId":"001"}},{"payload":{"deviceId":"002"}},{"payload":{"deviceId":"003"}},{"payload":{"deviceId":"004"}},{"payload":{"deviceId":"005"}},{"payload":{"deviceId":"006"}},{"payload":{"deviceId":"007"}},{"payload":{"deviceId":"008"}},{"payload":{"deviceId":"009"}},{"payload":{"deviceId":"010"}},{"payload":{"deviceId":"011"}}]' | base64) --region us-east-1

and get below error message:

Failed to send message to Iot Events. The payload containing 11 messages cannot have more than 10 messages when batchMode=true.. 

I thing it is not possible to restrict it, because it depends to published payloads (instead of SQL) that is out of CDK.
WDYT?

*
* @default false
*/
readonly batchMode?: boolean;

/**
* The ID of the message.
*
* When batchMode is true, you can't specify a messageId--a new UUID value will be assigned.
* Assign a value to this property to ensure that only one input (message) with a given messageId will be processed by an AWS IoT Events detector.
*
* @default - none -- a new UUID value will be assigned
*/
readonly messageId?: string;
}

/**
* The action to put the message from an MQTT message to the IoT Events input.
*/
export class IotEventsPutMessageAction implements iot.IAction {
private readonly batchMode?: boolean;
private readonly messageId?: string;
private readonly role?: iam.IRole;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the behavior if a role is not set?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a role is not set, a new role will be created and be granted some policies needed.

It's explained in CommonActionProps.

/**
* The IAM role that allows access to AWS service.
*
* @default a new role will be created
*/
readonly role?: iam.IRole;


/**
* @param input The IoT Events input to put messages.
* @param props Optional properties to not use default
*/
constructor(private readonly input: iotevents.IInput, props: IotEventsPutMessageActionProps = {}) {
this.batchMode = props.batchMode;
this.messageId = props.messageId;
this.role = props.role;

if (this.batchMode && this.messageId) {
throw new Error('messageId is not allowed when batchMode is true');
}
}

/**
* @internal
*/
public _bind(rule: iot.ITopicRule): iot.ActionConfig {
const role = this.role ?? singletonActionRole(rule);
this.input.grantWrite(role);

return {
configuration: {
iotEvents: {
batchMode: this.batchMode,
inputName: this.input.inputName,
messageId: this.messageId,
roleArn: role.roleArn,
},
},
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ export class KinesisPutRecordAction implements iot.IAction {
this.role = props.role;
}

bind(rule: iot.ITopicRule): iot.ActionConfig {
/**
* @internal
*/
public _bind(rule: iot.ITopicRule): iot.ActionConfig {
const role = this.role ?? singletonActionRole(rule);
role.addToPrincipalPolicy(new iam.PolicyStatement({
actions: ['kinesis:PutRecord'],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ export class LambdaFunctionAction implements iot.IAction {
*/
constructor(private readonly func: lambda.IFunction) {}

bind(topicRule: iot.ITopicRule): iot.ActionConfig {
/**
* @internal
*/
public _bind(topicRule: iot.ITopicRule): iot.ActionConfig {
this.func.addPermission(`${Names.nodeUniqueId(topicRule.node)}:IotLambdaFunctionAction`, {
action: 'lambda:InvokeFunction',
principal: new iam.ServicePrincipal('iot.amazonaws.com'),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ export class S3PutObjectAction implements iot.IAction {
this.role = props.role;
}

bind(rule: iot.ITopicRule): iot.ActionConfig {
/**
* @internal
*/
public _bind(rule: iot.ITopicRule): iot.ActionConfig {
const role = this.role ?? singletonActionRole(rule);
role.addToPrincipalPolicy(new iam.PolicyStatement({
actions: ['s3:PutObject'],
Expand Down
7 changes: 5 additions & 2 deletions packages/@aws-cdk/aws-iot-actions/lib/sns-topic-action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ export class SnsTopicAction implements iot.IAction {
this.messageFormat = props.messageFormat;
}

bind(rule: iot.ITopicRule): iot.ActionConfig {
/**
* @internal
*/
public _bind(rule: iot.ITopicRule): iot.ActionConfig {
const role = this.role ?? singletonActionRole(rule);
this.topic.grantPublish(role);

Expand All @@ -72,4 +75,4 @@ export class SnsTopicAction implements iot.IAction {
},
};
}
}
}
5 changes: 4 additions & 1 deletion packages/@aws-cdk/aws-iot-actions/lib/sqs-queue-action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ export class SqsQueueAction implements iot.IAction {
this.useBase64 = props.useBase64;
}

bind(rule: iot.ITopicRule): iot.ActionConfig {
/**
* @internal
*/
public _bind(rule: iot.ITopicRule): iot.ActionConfig {
const role = this.role ?? singletonActionRole(rule);
role.addToPrincipalPolicy(new iam.PolicyStatement({
actions: ['sqs:SendMessage'],
Expand Down
2 changes: 2 additions & 0 deletions packages/@aws-cdk/aws-iot-actions/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
"@aws-cdk/aws-dynamodb": "0.0.0",
"@aws-cdk/aws-iam": "0.0.0",
"@aws-cdk/aws-iot": "0.0.0",
"@aws-cdk/aws-iotevents": "0.0.0",
"@aws-cdk/aws-kinesis": "0.0.0",
"@aws-cdk/aws-kinesisfirehose": "0.0.0",
"@aws-cdk/aws-lambda": "0.0.0",
Expand All @@ -109,6 +110,7 @@
"@aws-cdk/aws-dynamodb": "0.0.0",
"@aws-cdk/aws-iam": "0.0.0",
"@aws-cdk/aws-iot": "0.0.0",
"@aws-cdk/aws-iotevents": "0.0.0",
"@aws-cdk/aws-kinesis": "0.0.0",
"@aws-cdk/aws-kinesisfirehose": "0.0.0",
"@aws-cdk/aws-lambda": "0.0.0",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import * as iot from '@aws-cdk/aws-iot';
import * as iotevents from '@aws-cdk/aws-iotevents';
import * as logs from '@aws-cdk/aws-logs';
import * as cdk from '@aws-cdk/core';
import { IntegTest } from '@aws-cdk/integ-tests';
import * as actions from '../../lib';

class TestStack extends cdk.Stack {
public readonly detectorModelName: string;

constructor(scope: cdk.App, id: string, props?: cdk.StackProps) {
super(scope, id, props);

const logGroup = new logs.LogGroup(this, 'logs', { removalPolicy: cdk.RemovalPolicy.DESTROY });
const topicRule = new iot.TopicRule(this, 'TopicRule', {
sql: iot.IotSql.fromStringAsVer20160323(
"SELECT * FROM 'device/+/data'",
),
errorAction: new actions.CloudWatchLogsAction(logGroup),
});

const input = new iotevents.Input(this, 'MyInput', {
attributeJsonPaths: ['payload.deviceId'],
});

const detectorModel = new iotevents.DetectorModel(this, 'MyDetectorModel', {
detectorKey: 'payload.deviceId',
initialState: new iotevents.State({
stateName: 'initialState',
onEnter: [{
eventName: 'enter',
condition: iotevents.Expression.currentInput(input),
}],
}),
});

topicRule.addAction(
new actions.IotEventsPutMessageAction(input, {
batchMode: true,
}),
);

this.detectorModelName = detectorModel.detectorModelName;
}
}

// App
const app = new cdk.App();
const stack = new TestStack(app, 'iotevents-put-message-action-test-stack');
new IntegTest(app, 'iotevents', { testCases: [stack] });

app.synth();
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"version":"20.0.0"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"version": "20.0.0",
"testCases": {
"iotevents/DefaultTest": {
"stacks": [
"iotevents-put-message-action-test-stack"
],
"assertionStack": "ioteventsDefaultTestDeployAssertE216288D"
}
}
}
Loading