-
Notifications
You must be signed in to change notification settings - Fork 3.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(iot): add Action to put record to Kinesis Data stream #18321
Changes from 4 commits
c1799a3
9249bd6
c1c4bd4
d64b3ff
4b362f1
e32e2d7
dd3edbb
1457dcc
7851981
1e6c4a8
b2cd603
b74dc6c
9fe5bb1
103df62
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
import * as iam from '@aws-cdk/aws-iam'; | ||
import * as iot from '@aws-cdk/aws-iot'; | ||
import * as kinesis from '@aws-cdk/aws-kinesis'; | ||
import { CommonActionProps } from './common-action-props'; | ||
import { singletonActionRole } from './private/role'; | ||
|
||
/** | ||
* Configuration properties of an action for the Kinesis Data stream. | ||
*/ | ||
export interface KinesisPutRecordActionProps extends CommonActionProps { | ||
/** | ||
* The partition key used to determine to which shard the data is written. | ||
* The partition key is usually composed of an expression (for example, ${topic()} or ${timestamp()}). | ||
* For more information @see https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#API_PutRecord_RequestParameters | ||
yamatatsu marked this conversation as resolved.
Show resolved
Hide resolved
|
||
* | ||
* @default - None | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you elaborate what does "None" mean here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry, I tried to elaborate, but I could not😞. Proberbly, IoT Core rule is fill default value (e.g. MQTT payload JSON string or hash of it) because There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Did you try deploying an IoT app without this property filled? Maybe this is simply a mistake in the CloudFormation resource, and this should actually be required? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yes I tried and the deploying secceeded. Again, I deployed, and I tested following for confirmation to be spreaded records to shards:
Results:
The IoT Core may be filling in a static There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK. I think that's a good guess. Let's put that in the docs - it's certainly better than just "None", which gives you no clue what that actually means 🙂. Thanks for the detailed testing! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank you! OK. If we beleave this guess, I think it is better that fill default value. Because in Kinesis Stream, it is pretty not helpful to not spread records by shards. I found that This is the documentation of I try to add default value There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't hate it, but I'm a little concerned that the L2 won't have all of the capabilities of the underlying service (in this case, leaving the partition key as empty). Does the Console allow you to leave it as empty, or do you have to provide a value? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
It is so reasonable I think! So I will never mind to revert my commit that add default value.
The Console requre PartitionKey, but CloudFormation does not. For keeping compatibility for CloudFormation, let's not add a default value, instead add a detailed description in JSDoc and Readme? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's do this. Make this field required, like it is in the console. But, allow specifying an empty string there ( This way, we will be close to the Console experience, while covering the entire service capabilities. What do you think about this @yamatatsu? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's good I think! 👍🏻 |
||
*/ | ||
readonly partitionKey?: string; | ||
} | ||
|
||
/** | ||
* The action to put the record from an MQTT message to the Kinesis Data stream. | ||
*/ | ||
export class KinesisPutRecordAction implements iot.IAction { | ||
yamatatsu marked this conversation as resolved.
Show resolved
Hide resolved
|
||
private readonly partitionKey?: string; | ||
private readonly role?: iam.IRole; | ||
|
||
/** | ||
* @param stream The Kinesis Data stream to which to put records. | ||
* @param props Optional properties to not use default | ||
*/ | ||
constructor(private readonly stream: kinesis.IStream, props: KinesisPutRecordActionProps = {}) { | ||
this.partitionKey = props.partitionKey; | ||
this.role = props.role; | ||
} | ||
|
||
bind(rule: iot.ITopicRule): iot.ActionConfig { | ||
const role = this.role ?? singletonActionRole(rule); | ||
role.addToPrincipalPolicy(new iam.PolicyStatement({ | ||
actions: ['kinesis:PutRecord'], | ||
skinny85 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
resources: [this.stream.streamArn], | ||
})); | ||
|
||
return { | ||
configuration: { | ||
kinesis: { | ||
streamName: this.stream.streamName, | ||
partitionKey: this.partitionKey, | ||
roleArn: role.roleArn, | ||
}, | ||
}, | ||
}; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,157 @@ | ||
{ | ||
"Resources": { | ||
"Logs6819BB44": { | ||
"Type": "AWS::Logs::LogGroup", | ||
"Properties": { | ||
"RetentionInDays": 731 | ||
}, | ||
"UpdateReplacePolicy": "Delete", | ||
"DeletionPolicy": "Delete" | ||
}, | ||
"TopicRuleTopicRuleActionRole246C4F77": { | ||
"Type": "AWS::IAM::Role", | ||
"Properties": { | ||
"AssumeRolePolicyDocument": { | ||
"Statement": [ | ||
{ | ||
"Action": "sts:AssumeRole", | ||
"Effect": "Allow", | ||
"Principal": { | ||
"Service": "iot.amazonaws.com" | ||
} | ||
} | ||
], | ||
"Version": "2012-10-17" | ||
} | ||
} | ||
}, | ||
"TopicRuleTopicRuleActionRoleDefaultPolicy99ADD687": { | ||
"Type": "AWS::IAM::Policy", | ||
"Properties": { | ||
"PolicyDocument": { | ||
"Statement": [ | ||
{ | ||
"Action": [ | ||
"logs:CreateLogStream", | ||
"logs:PutLogEvents" | ||
], | ||
"Effect": "Allow", | ||
"Resource": { | ||
"Fn::GetAtt": [ | ||
"Logs6819BB44", | ||
"Arn" | ||
] | ||
} | ||
}, | ||
{ | ||
"Action": "logs:DescribeLogStreams", | ||
"Effect": "Allow", | ||
"Resource": { | ||
"Fn::GetAtt": [ | ||
"Logs6819BB44", | ||
"Arn" | ||
] | ||
} | ||
}, | ||
{ | ||
"Action": "kinesis:PutRecord", | ||
"Effect": "Allow", | ||
"Resource": { | ||
"Fn::GetAtt": [ | ||
"MyStream5C050E93", | ||
"Arn" | ||
] | ||
} | ||
} | ||
], | ||
"Version": "2012-10-17" | ||
}, | ||
"PolicyName": "TopicRuleTopicRuleActionRoleDefaultPolicy99ADD687", | ||
"Roles": [ | ||
{ | ||
"Ref": "TopicRuleTopicRuleActionRole246C4F77" | ||
} | ||
] | ||
} | ||
}, | ||
"TopicRule40A4EA44": { | ||
"Type": "AWS::IoT::TopicRule", | ||
"Properties": { | ||
"TopicRulePayload": { | ||
"Actions": [ | ||
{ | ||
"Kinesis": { | ||
"PartitionKey": "${timestamp()}", | ||
"RoleArn": { | ||
"Fn::GetAtt": [ | ||
"TopicRuleTopicRuleActionRole246C4F77", | ||
"Arn" | ||
] | ||
}, | ||
"StreamName": { | ||
"Ref": "MyStream5C050E93" | ||
} | ||
} | ||
} | ||
], | ||
"AwsIotSqlVersion": "2016-03-23", | ||
"ErrorAction": { | ||
"CloudwatchLogs": { | ||
"LogGroupName": { | ||
"Ref": "Logs6819BB44" | ||
}, | ||
"RoleArn": { | ||
"Fn::GetAtt": [ | ||
"TopicRuleTopicRuleActionRole246C4F77", | ||
"Arn" | ||
] | ||
} | ||
} | ||
}, | ||
"Sql": "SELECT * FROM 'device/+/data'" | ||
} | ||
} | ||
}, | ||
"MyStream5C050E93": { | ||
"Type": "AWS::Kinesis::Stream", | ||
"Properties": { | ||
"ShardCount": 3, | ||
"RetentionPeriodHours": 24, | ||
"StreamEncryption": { | ||
"Fn::If": [ | ||
"AwsCdkKinesisEncryptedStreamsUnsupportedRegions", | ||
{ | ||
"Ref": "AWS::NoValue" | ||
}, | ||
{ | ||
"EncryptionType": "KMS", | ||
"KeyId": "alias/aws/kinesis" | ||
} | ||
] | ||
} | ||
} | ||
} | ||
}, | ||
"Conditions": { | ||
"AwsCdkKinesisEncryptedStreamsUnsupportedRegions": { | ||
"Fn::Or": [ | ||
{ | ||
"Fn::Equals": [ | ||
{ | ||
"Ref": "AWS::Region" | ||
}, | ||
"cn-north-1" | ||
] | ||
}, | ||
{ | ||
"Fn::Equals": [ | ||
{ | ||
"Ref": "AWS::Region" | ||
}, | ||
"cn-northwest-1" | ||
] | ||
} | ||
] | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
import * as iot from '@aws-cdk/aws-iot'; | ||
import * as kinesis from '@aws-cdk/aws-kinesis'; | ||
import * as logs from '@aws-cdk/aws-logs'; | ||
import * as cdk from '@aws-cdk/core'; | ||
import * as actions from '../../lib'; | ||
|
||
|
||
yamatatsu marked this conversation as resolved.
Show resolved
Hide resolved
|
||
const app = new cdk.App(); | ||
yamatatsu marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
class TestStack extends cdk.Stack { | ||
constructor(scope: cdk.App, id: string, props?: cdk.StackProps) { | ||
super(scope, id, props); | ||
|
||
const topicRule = new iot.TopicRule(this, 'TopicRule', { | ||
sql: iot.IotSql.fromStringAsVer20160323( | ||
"SELECT * FROM 'device/+/data'", | ||
), | ||
errorAction: new actions.CloudWatchLogsAction( | ||
new logs.LogGroup(this, 'Logs', { removalPolicy: cdk.RemovalPolicy.DESTROY }), | ||
), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need this in this test? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh it is not needed, but for confirmation that it is work on aws 😅 . |
||
}); | ||
|
||
const stream = new kinesis.Stream(this, 'MyStream', { | ||
shardCount: 3, | ||
}); | ||
topicRule.addAction( | ||
new actions.KinesisPutRecordAction(stream, { | ||
partitionKey: '${timestamp()}', | ||
}), | ||
); | ||
yamatatsu marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
|
||
new TestStack(app, 'test-stack'); | ||
yamatatsu marked this conversation as resolved.
Show resolved
Hide resolved
|
||
app.synth(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's mention what's the default.