Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(aws-stepfunctions-tasks): allow specifying waitForTaskToken suffix in resourceArn #2686

Merged
merged 9 commits into from
Jun 18, 2019
55 changes: 53 additions & 2 deletions packages/@aws-cdk/aws-stepfunctions-tasks/lib/invoke-function.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,28 @@
import iam = require('@aws-cdk/aws-iam');
import lambda = require('@aws-cdk/aws-lambda');
import sfn = require('@aws-cdk/aws-stepfunctions');
import { FieldUtils } from '../../aws-stepfunctions/lib/fields';

/**
* Properties for InvokeFunction
*/
export interface InvokeFunctionProps {
/**
* The JSON that you want to provide to your Lambda function as input.
*/
readonly payload?: { [key: string]: any };

/**
* Whether to pause the workflow until a task token is returned
*
* If this is set to true, the Context.taskToken value must be included
* somewhere in the payload and the Lambda must call SendTaskSuccess
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SendTaskSuccess, SendTaskFailure or SendTaskHeartbeat

* using that token.
*
* @default false
*/
readonly waitForTaskToken?: boolean;
}

/**
* A StepFunctions Task to invoke a Lambda function.
Expand All @@ -9,19 +31,48 @@ import sfn = require('@aws-cdk/aws-stepfunctions');
* integration with other AWS services via a specific class instance.
*/
export class InvokeFunction implements sfn.IStepFunctionsTask {
constructor(private readonly lambdaFunction: lambda.IFunction) {
private readonly waitForTaskToken: boolean;

constructor(private readonly lambdaFunction: lambda.IFunction, private readonly props: InvokeFunctionProps = {}) {
this.waitForTaskToken = !!props.waitForTaskToken;

if (this.waitForTaskToken && !FieldUtils.containsTaskToken(props.payload)) {
throw new Error('Task Token is missing in payload (pass Context.taskToken somewhere in payload)');
}
}

public bind(_task: sfn.Task): sfn.StepFunctionsTaskConfig {
const resourceArn = this.waitForTaskToken
? 'arn:aws:states:::lambda:invoke.waitForTaskToken'
: this.lambdaFunction.functionArn;

let parameters: any;
if (this.waitForTaskToken) {
parameters = {
FunctionName: this.lambdaFunction.functionName,
Payload: nonEmptyObject(this.props.payload),
};
} else {
parameters = this.props.payload;
}

return {
resourceArn: this.lambdaFunction.functionArn,
resourceArn,
policyStatements: [new iam.PolicyStatement()
.addResource(this.lambdaFunction.functionArn)
.addActions("lambda:InvokeFunction")
],
metricPrefixSingular: 'LambdaFunction',
metricPrefixPlural: 'LambdaFunctions',
metricDimensions: { LambdaFunctionArn: this.lambdaFunction.functionArn },
parameters: nonEmptyObject(parameters)
};
}
}

function nonEmptyObject(x: any): any {
if (typeof x === 'object' && x !== null && Object.entries(x).length === 0) {
return undefined;
}
return x;
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ export interface PublishToTopicProps {
* Message subject
*/
readonly subject?: string;

/**
* Whether to pause the workflow until a task token is returned
*
* @default false
*/
readonly waitForTaskToken?: boolean;
}

/**
Expand All @@ -36,12 +43,16 @@ export interface PublishToTopicProps {
* integration with other AWS services via a specific class instance.
*/
export class PublishToTopic implements sfn.IStepFunctionsTask {

private readonly waitForTaskToken: boolean;

constructor(private readonly topic: sns.ITopic, private readonly props: PublishToTopicProps) {
this.waitForTaskToken = props.waitForTaskToken === true;
}

public bind(_task: sfn.Task): sfn.StepFunctionsTaskConfig {
return {
resourceArn: 'arn:aws:states:::sns:publish',
resourceArn: 'arn:aws:states:::sns:publish' + (this.waitForTaskToken ? '.waitForTaskToken' : ''),
policyStatements: [new iam.PolicyStatement()
.addAction('sns:Publish')
.addResource(this.topic.topicArn)
Expand Down
13 changes: 12 additions & 1 deletion packages/@aws-cdk/aws-stepfunctions-tasks/lib/send-to-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ export interface SendToQueueProps {
* @default No group ID
*/
readonly messageGroupId?: string;

/**
* Whether to pause the workflow until a task token is returned
*
* @default false
*/
readonly waitForTaskToken?: boolean;
}

/**
Expand All @@ -45,12 +52,16 @@ export interface SendToQueueProps {
* integration with other AWS services via a specific class instance.
*/
export class SendToQueue implements sfn.IStepFunctionsTask {

private readonly waitForTaskToken: boolean;

constructor(private readonly queue: sqs.IQueue, private readonly props: SendToQueueProps) {
this.waitForTaskToken = props.waitForTaskToken === true;
}

public bind(_task: sfn.Task): sfn.StepFunctionsTaskConfig {
return {
resourceArn: 'arn:aws:states:::sqs:sendMessage',
resourceArn: 'arn:aws:states:::sqs:sendMessage' + (this.waitForTaskToken ? '.waitForTaskToken' : ''),
policyStatements: [new iam.PolicyStatement()
.addAction('sqs:SendMessage')
.addResource(this.queue.queueArn)
Expand Down
Loading