-
Notifications
You must be signed in to change notification settings - Fork 3.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(iot): add Action to put records to a Firehose stream #17466
Changes from 1 commit
72d9e0a
350859a
d8cb2fd
ae9cb88
645766d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,85 @@ | ||||||
import * as iam from '@aws-cdk/aws-iam'; | ||||||
import * as iot from '@aws-cdk/aws-iot'; | ||||||
import * as firehose from '@aws-cdk/aws-kinesisfirehose'; | ||||||
import { CommonActionProps } from './common-action-props'; | ||||||
import { singletonActionRole } from './private/role'; | ||||||
|
||||||
/** | ||||||
* Record Separator to be used to separate records. | ||||||
*/ | ||||||
export enum FirehoseStreamRecordSeparator { | ||||||
/** | ||||||
* Separate by a new line | ||||||
*/ | ||||||
NEWLINE = '\n', | ||||||
/** | ||||||
* Separate by a tab | ||||||
*/ | ||||||
TAB = '\t', | ||||||
/** | ||||||
* Separate by a windows new line | ||||||
*/ | ||||||
WINDOWS_NEWLINE = '\r\n', | ||||||
/** | ||||||
* Separate by a commma | ||||||
*/ | ||||||
COMMA = ',', | ||||||
yamatatsu marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
} | ||||||
|
||||||
/** | ||||||
* Configuration properties of an action for the Kinesis Data Firehose stream. | ||||||
*/ | ||||||
export interface FirehoseStreamProps extends CommonActionProps { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missed this:
Suggested change
|
||||||
/** | ||||||
* Whether to deliver the Kinesis Data Firehose stream as a batch by using `PutRecordBatch`. | ||||||
* When batchMode is true and the rule's SQL statement evaluates to an Array, each Array | ||||||
* element forms one record in the PutRecordBatch request. The resulting array can't have | ||||||
* more than 500 records. | ||||||
* | ||||||
* @default false | ||||||
*/ | ||||||
readonly batchMode?: boolean; | ||||||
|
||||||
/** | ||||||
* A character separator that will be used to separate records written to the Kinesis Data Firehose stream. | ||||||
* | ||||||
* @default None -- the stream uses no separator | ||||||
yamatatsu marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
*/ | ||||||
readonly recordSeparator?: FirehoseStreamRecordSeparator; | ||||||
} | ||||||
|
||||||
|
||||||
/** | ||||||
* The action to put the record from an MQTT message to the Kinesis Data Firehose stream. | ||||||
*/ | ||||||
export class FirehoseStreamAction implements iot.IAction { | ||||||
private readonly batchMode?: boolean; | ||||||
private readonly recordSeparator?: string; | ||||||
private readonly role?: iam.IRole; | ||||||
|
||||||
/** | ||||||
* @param stream The Kinesis Data Firehose stream to which to put records. | ||||||
* @param props Optional properties to not use default | ||||||
*/ | ||||||
constructor(private readonly stream: firehose.IDeliveryStream, props: FirehoseStreamProps = {}) { | ||||||
this.batchMode = props.batchMode; | ||||||
this.recordSeparator = props.recordSeparator; | ||||||
this.role = props.role; | ||||||
} | ||||||
|
||||||
bind(rule: iot.ITopicRule): iot.ActionConfig { | ||||||
const role = this.role ?? singletonActionRole(rule); | ||||||
this.stream.grantPutRecords(role); | ||||||
|
||||||
return { | ||||||
configuration: { | ||||||
firehose: { | ||||||
batchMode: this.batchMode, | ||||||
deliveryStreamName: this.stream.deliveryStreamName, | ||||||
roleArn: role.roleArn, | ||||||
separator: this.recordSeparator, | ||||||
}, | ||||||
}, | ||||||
}; | ||||||
} | ||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,5 @@ | ||
export * from './cloudwatch-logs-action'; | ||
export * from './common-action-props'; | ||
export * from './firehose-stream-action'; | ||
export * from './lambda-function-action'; | ||
export * from './s3-put-object-action'; |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -71,6 +71,7 @@ | |
"license": "Apache-2.0", | ||
"devDependencies": { | ||
"@aws-cdk/assertions": "0.0.0", | ||
"@aws-cdk/aws-kinesisfirehose-destinations": "0.0.0", | ||
"@aws-cdk/cdk-build-tools": "0.0.0", | ||
"@aws-cdk/cdk-integ-tools": "0.0.0", | ||
"@aws-cdk/pkglint": "0.0.0", | ||
|
@@ -79,6 +80,7 @@ | |
"jest": "^27.3.1" | ||
}, | ||
"dependencies": { | ||
"@aws-cdk/aws-kinesisfirehose": "0.0.0", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we keep these in alphabetical order please? |
||
"@aws-cdk/aws-iam": "0.0.0", | ||
"@aws-cdk/aws-iot": "0.0.0", | ||
"@aws-cdk/aws-lambda": "0.0.0", | ||
|
@@ -90,6 +92,7 @@ | |
}, | ||
"homepage": "https://github.com/aws/aws-cdk", | ||
"peerDependencies": { | ||
"@aws-cdk/aws-kinesisfirehose": "0.0.0", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same here (alphabetical order). |
||
"@aws-cdk/aws-iam": "0.0.0", | ||
"@aws-cdk/aws-iot": "0.0.0", | ||
"@aws-cdk/aws-lambda": "0.0.0", | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,143 @@ | ||
import { Template, Match } from '@aws-cdk/assertions'; | ||
import * as iam from '@aws-cdk/aws-iam'; | ||
import * as iot from '@aws-cdk/aws-iot'; | ||
import * as firehose from '@aws-cdk/aws-kinesisfirehose'; | ||
import * as cdk from '@aws-cdk/core'; | ||
import * as actions from '../../lib'; | ||
|
||
test('Default firehose stream action', () => { | ||
// GIVEN | ||
const stack = new cdk.Stack(); | ||
const topicRule = new iot.TopicRule(stack, 'MyTopicRule', { | ||
sql: iot.IotSql.fromStringAsVer20160323("SELECT topic(2) as device_id FROM 'device/+/data'"), | ||
}); | ||
const stream = firehose.DeliveryStream.fromDeliveryStreamArn(stack, 'MyStream', 'arn:aws:firehose:xx-west-1:111122223333:deliverystream/my-stream'); | ||
|
||
// WHEN | ||
topicRule.addAction( | ||
new actions.FirehoseStreamAction(stream), | ||
); | ||
|
||
// THEN | ||
Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', { | ||
TopicRulePayload: { | ||
Actions: [ | ||
{ | ||
Firehose: { | ||
DeliveryStreamName: 'my-stream', | ||
RoleArn: { | ||
'Fn::GetAtt': ['MyTopicRuleTopicRuleActionRoleCE2D05DA', 'Arn'], | ||
}, | ||
}, | ||
}, | ||
], | ||
}, | ||
}); | ||
|
||
Template.fromStack(stack).hasResourceProperties('AWS::IAM::Role', { | ||
AssumeRolePolicyDocument: { | ||
Statement: [ | ||
{ | ||
Action: 'sts:AssumeRole', | ||
Effect: 'Allow', | ||
Principal: { | ||
Service: 'iot.amazonaws.com', | ||
}, | ||
}, | ||
], | ||
Version: '2012-10-17', | ||
}, | ||
}); | ||
|
||
Template.fromStack(stack).hasResourceProperties('AWS::IAM::Policy', { | ||
PolicyDocument: { | ||
Statement: [ | ||
{ | ||
Action: ['firehose:PutRecord', 'firehose:PutRecordBatch'], | ||
Effect: 'Allow', | ||
Resource: 'arn:aws:firehose:xx-west-1:111122223333:deliverystream/my-stream', | ||
}, | ||
], | ||
Version: '2012-10-17', | ||
}, | ||
PolicyName: 'MyTopicRuleTopicRuleActionRoleDefaultPolicy54A701F7', | ||
Roles: [ | ||
{ Ref: 'MyTopicRuleTopicRuleActionRoleCE2D05DA' }, | ||
], | ||
}); | ||
}); | ||
|
||
test('can set batchMode', () => { | ||
// GIVEN | ||
const stack = new cdk.Stack(); | ||
const topicRule = new iot.TopicRule(stack, 'MyTopicRule', { | ||
sql: iot.IotSql.fromStringAsVer20160323("SELECT topic(2) as device_id FROM 'device/+/data'"), | ||
}); | ||
const stream = firehose.DeliveryStream.fromDeliveryStreamArn(stack, 'MyStream', 'arn:aws:firehose:xx-west-1:111122223333:deliverystream/my-stream'); | ||
|
||
// WHEN | ||
topicRule.addAction( | ||
new actions.FirehoseStreamAction(stream, { batchMode: true }), | ||
); | ||
|
||
// THEN | ||
Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', { | ||
TopicRulePayload: { | ||
Actions: [ | ||
Match.objectLike({ Firehose: { BatchMode: true } }), | ||
], | ||
}, | ||
}); | ||
}); | ||
|
||
test('can set separotor', () => { | ||
// GIVEN | ||
const stack = new cdk.Stack(); | ||
const topicRule = new iot.TopicRule(stack, 'MyTopicRule', { | ||
sql: iot.IotSql.fromStringAsVer20160323("SELECT topic(2) as device_id FROM 'device/+/data'"), | ||
}); | ||
const stream = firehose.DeliveryStream.fromDeliveryStreamArn(stack, 'MyStream', 'arn:aws:firehose:xx-west-1:111122223333:deliverystream/my-stream'); | ||
|
||
// WHEN | ||
topicRule.addAction( | ||
new actions.FirehoseStreamAction(stream, { recordSeparator: actions.FirehoseStreamRecordSeparator.NEWLINE }), | ||
); | ||
|
||
// THEN | ||
Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', { | ||
TopicRulePayload: { | ||
Actions: [ | ||
Match.objectLike({ Firehose: { Separator: '\n' } }), | ||
], | ||
}, | ||
}); | ||
}); | ||
|
||
test('can set role', () => { | ||
// GIVEN | ||
const stack = new cdk.Stack(); | ||
const topicRule = new iot.TopicRule(stack, 'MyTopicRule', { | ||
sql: iot.IotSql.fromStringAsVer20160323("SELECT topic(2) as device_id FROM 'device/+/data'"), | ||
}); | ||
const stream = firehose.DeliveryStream.fromDeliveryStreamArn(stack, 'MyStream', 'arn:aws:firehose:xx-west-1:111122223333:deliverystream/my-stream'); | ||
const role = iam.Role.fromRoleArn(stack, 'MyRole', 'arn:aws:iam::123456789012:role/ForTest'); | ||
|
||
// WHEN | ||
topicRule.addAction( | ||
new actions.FirehoseStreamAction(stream, { role }), | ||
); | ||
|
||
// THEN | ||
Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', { | ||
TopicRulePayload: { | ||
Actions: [ | ||
Match.objectLike({ Firehose: { RoleArn: 'arn:aws:iam::123456789012:role/ForTest' } }), | ||
], | ||
}, | ||
}); | ||
|
||
Template.fromStack(stack).hasResourceProperties('AWS::IAM::Policy', { | ||
PolicyName: 'MyRolePolicy64AB00A5', | ||
Roles: ['ForTest'], | ||
}); | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's kill the
removalPolicy
in the example, it doesn't add much: