Skip to content

Commit

Permalink
fix(stepfunctions): fix passing of Token in RunLambdaTask (#2939)
Browse files Browse the repository at this point in the history
Fixes #2937.
  • Loading branch information
wqzoww authored and rix0rrr committed Jun 21, 2019
1 parent f30bdd3 commit 58a80ab
Show file tree
Hide file tree
Showing 14 changed files with 221 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,9 @@ export class PublishToTopic implements sfn.IStepFunctionsTask {
})],
parameters: {
TopicArn: this.topic.topicArn,
...sfn.FieldUtils.renderObject({
Message: this.props.message.value,
MessageStructure: this.props.messagePerSubscriptionType ? "json" : undefined,
Subject: this.props.subject,
})
Message: this.props.message.value,
MessageStructure: this.props.messagePerSubscriptionType ? "json" : undefined,
Subject: this.props.subject,
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ function renderOverrides(containerOverrides?: ContainerOverride[]) {

const ret = new Array<any>();
for (const override of containerOverrides) {
ret.push(sfn.FieldUtils.renderObject({
ret.push({
Name: override.containerName,
Command: override.command,
Cpu: override.cpu,
Expand All @@ -176,7 +176,7 @@ function renderOverrides(containerOverrides?: ContainerOverride[]) {
Name: e.name,
Value: e.value,
}))
}));
});
}

return { ContainerOverrides: ret };
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ export interface RunLambdaTaskProps {
* @default - No context
*/
readonly clientContext?: string;

/**
* Version or alias of the function to be invoked
*
* @default - No qualifier
*/
readonly qualifier?: string;
}

/**
Expand Down Expand Up @@ -75,6 +82,7 @@ export class RunLambdaTask implements sfn.IStepFunctionsTask {
Payload: this.props.payload,
InvocationType: this.props.invocationType,
ClientContext: this.props.clientContext,
Qualifier: this.props.qualifier
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ export class SagemakerTrainTask implements ec2.IConnectable, sfn.IStepFunctionsT
public bind(task: sfn.Task): sfn.StepFunctionsTaskConfig {
return {
resourceArn: 'arn:aws:states:::sagemaker:createTrainingJob' + (this.props.synchronous ? '.sync' : ''),
parameters: sfn.FieldUtils.renderObject(this.renderParameters()),
parameters: this.renderParameters(),
policyStatements: this.makePolicyStatements(task),
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ export class SagemakerTransformTask implements sfn.IStepFunctionsTask {
public bind(task: sfn.Task): sfn.StepFunctionsTaskConfig {
return {
resourceArn: 'arn:aws:states:::sagemaker:createTransformJob' + (this.props.synchronous ? '.sync' : ''),
parameters: sfn.FieldUtils.renderObject(this.renderParameters()),
parameters: this.renderParameters(),
policyStatements: this.makePolicyStatements(task),
};
}
Expand Down
10 changes: 4 additions & 6 deletions packages/@aws-cdk/aws-stepfunctions-tasks/lib/send-to-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,10 @@ export class SendToQueue implements sfn.IStepFunctionsTask {
})],
parameters: {
QueueUrl: this.queue.queueUrl,
...sfn.FieldUtils.renderObject({
MessageBody: this.props.messageBody.value,
DelaySeconds: this.props.delay && this.props.delay.toSeconds(),
MessageDeduplicationId: this.props.messageDeduplicationId,
MessageGroupId: this.props.messageGroupId,
})
MessageBody: this.props.messageBody.value,
DelaySeconds: this.props.delay && this.props.delay.toSeconds(),
MessageDeduplicationId: this.props.messageDeduplicationId,
MessageGroupId: this.props.messageGroupId,
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@
{
"Ref": "CallbackHandler4434C38D"
},
"\",\"Payload\":{\"token\":\"$$.Task.Token\"}},\"Type\":\"Task\",\"Resource\":\"arn:aws:states:::lambda:invoke.waitForTaskToken\",\"ResultPath\":\"$.status\"},\"Job Complete?\":{\"Type\":\"Choice\",\"Choices\":[{\"Variable\":\"$.status\",\"StringEquals\":\"FAILED\",\"Next\":\"Job Failed\"},{\"Variable\":\"$.status\",\"StringEquals\":\"SUCCEEDED\",\"Next\":\"Final step\"}]},\"Job Failed\":{\"Type\":\"Fail\",\"Error\":\"DescribeJob returned FAILED\",\"Cause\":\"AWS Batch Job Failed\"},\"Final step\":{\"Type\":\"Pass\",\"End\":true}},\"TimeoutSeconds\":30}"
"\",\"Payload\":{\"token.$\":\"$$.Task.Token\"}},\"Type\":\"Task\",\"Resource\":\"arn:aws:states:::lambda:invoke.waitForTaskToken\",\"ResultPath\":\"$.status\"},\"Job Complete?\":{\"Type\":\"Choice\",\"Choices\":[{\"Variable\":\"$.status\",\"StringEquals\":\"FAILED\",\"Next\":\"Job Failed\"},{\"Variable\":\"$.status\",\"StringEquals\":\"SUCCEEDED\",\"Next\":\"Final step\"}]},\"Job Failed\":{\"Type\":\"Fail\",\"Error\":\"DescribeJob returned FAILED\",\"Cause\":\"AWS Batch Job Failed\"},\"Final step\":{\"Type\":\"Pass\",\"End\":true}},\"TimeoutSeconds\":30}"
]
]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ beforeEach(() => {
});
});

test('Lambda function can be used in a Task', () => {
test('Invoke lambda with function ARN', () => {
// WHEN
const task = new sfn.Task(stack, 'Task', { task: new tasks.InvokeFunction(fn) });
new sfn.StateMachine(stack, 'SM', {
Expand All @@ -39,7 +39,7 @@ test('Lambda function payload ends up in Parameters', () => {
definition: new sfn.Task(stack, 'Task', {
task: new tasks.InvokeFunction(fn, {
payload: {
foo: 'bar'
foo: sfn.Data.stringAt('$.bar')
}
})
})
Expand All @@ -48,45 +48,10 @@ test('Lambda function payload ends up in Parameters', () => {
expect(stack).toHaveResource('AWS::StepFunctions::StateMachine', {
DefinitionString: {
"Fn::Join": ["", [
"{\"StartAt\":\"Task\",\"States\":{\"Task\":{\"End\":true,\"Parameters\":{\"foo\":\"bar\"},\"Type\":\"Task\",\"Resource\":\"",
"{\"StartAt\":\"Task\",\"States\":{\"Task\":{\"End\":true,\"Parameters\":{\"foo.$\":\"$.bar\"},\"Type\":\"Task\",\"Resource\":\"",
{ "Fn::GetAtt": ["Fn9270CBC0", "Arn"] },
"\"}}}"
]]
},
});
});

test('Lambda function can be used in a Task with Task Token', () => {
const task = new sfn.Task(stack, 'Task', {
task: new tasks.RunLambdaTask(fn, {
waitForTaskToken: true,
payload: {
token: sfn.Context.taskToken
}
})
});
new sfn.StateMachine(stack, 'SM', {
definition: task
});

// THEN
expect(stack).toHaveResource('AWS::StepFunctions::StateMachine', {
DefinitionString: {
"Fn::Join": ["", [
"{\"StartAt\":\"Task\",\"States\":{\"Task\":{\"End\":true,\"Parameters\":{\"FunctionName\":\"",
{ Ref: "Fn9270CBC0" },
"\",\"Payload\":{\"token\":\"$$.Task.Token\"}},\"Type\":\"Task\",\"Resource\":\"arn:aws:states:::lambda:invoke.waitForTaskToken\"}}}"
]]
},
});
});

test('Task throws if waitForTaskToken is supplied but task token is not included', () => {
expect(() => {
new sfn.Task(stack, 'Task', {
task: new tasks.RunLambdaTask(fn, {
waitForTaskToken: true
})
});
}).toThrow(/Task Token is missing in payload/i);
});
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ import sfn = require('@aws-cdk/aws-stepfunctions');
import cdk = require('@aws-cdk/cdk');
import tasks = require('../lib');

test('publish to SNS', () => {
test('Publish literal message to SNS topic', () => {
// GIVEN
const stack = new cdk.Stack();
const topic = new sns.Topic(stack, 'Topic');

// WHEN
const pub = new sfn.Task(stack, 'Publish', { task: new tasks.PublishToTopic(topic, {
message: sfn.TaskInput.fromText('Send this message')
message: sfn.TaskInput.fromText('Publish this message')
}) });

// THEN
Expand All @@ -20,33 +20,72 @@ test('publish to SNS', () => {
End: true,
Parameters: {
TopicArn: { Ref: 'TopicBFC7AF6E' },
Message: 'Send this message'
Message: 'Publish this message'
},
});
});

test('publish JSON to SNS', () => {
test('Publish JSON to SNS topic with task token', () => {
// GIVEN
const stack = new cdk.Stack();
const topic = new sns.Topic(stack, 'Topic');

// WHEN
const pub = new sfn.Task(stack, 'Publish', { task: new tasks.PublishToTopic(topic, {
waitForTaskToken: true,
message: sfn.TaskInput.fromObject({
Input: 'Send this message'
Input: 'Publish this message',
Token: sfn.Context.taskToken
})
}) });

// THEN
expect(stack.resolve(pub.toStateJson())).toEqual({
Type: 'Task',
Resource: 'arn:aws:states:::sns:publish',
Resource: 'arn:aws:states:::sns:publish.waitForTaskToken',
End: true,
Parameters: {
TopicArn: { Ref: 'TopicBFC7AF6E' },
Message: {
Input: 'Send this message'
'Input': 'Publish this message',
'Token.$': '$$.Task.Token'
}
},
});
});

test('Task throws if waitForTaskToken is supplied but task token is not included in message', () => {
expect(() => {
// GIVEN
const stack = new cdk.Stack();
const topic = new sns.Topic(stack, 'Topic');
// WHEN
new sfn.Task(stack, 'Publish', { task: new tasks.PublishToTopic(topic, {
waitForTaskToken: true,
message: sfn.TaskInput.fromText('Publish this message')
}) });
// THEN
}).toThrow(/Task Token is missing in message/i);
});

test('Publish to topic with ARN from payload', () => {
// GIVEN
const stack = new cdk.Stack();
const topic = sns.Topic.fromTopicArn(stack, 'Topic', sfn.Data.stringAt('$.topicArn'));

// WHEN
const pub = new sfn.Task(stack, 'Publish', { task: new tasks.PublishToTopic(topic, {
message: sfn.TaskInput.fromText('Publish this message')
}) });

// THEN
expect(stack.resolve(pub.toStateJson())).toEqual({
Type: 'Task',
Resource: 'arn:aws:states:::sns:publish',
End: true,
Parameters: {
'TopicArn.$': '$.topicArn',
'Message': 'Publish this message'
},
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import '@aws-cdk/assert/jest';
import lambda = require('@aws-cdk/aws-lambda');
import sfn = require('@aws-cdk/aws-stepfunctions');
import { Stack } from '@aws-cdk/cdk';
import tasks = require('../lib');

let stack: Stack;
let fn: lambda.Function;
beforeEach(() => {
stack = new Stack();
fn = new lambda.Function(stack, 'Fn', {
code: lambda.Code.inline('hello'),
handler: 'index.hello',
runtime: lambda.Runtime.Python27,
});
});

test('Invoke lambda with default magic ARN', () => {
const task = new sfn.Task(stack, 'Task', {
task: new tasks.RunLambdaTask(fn, {
payload: {
foo: 'bar'
},
invocationType: tasks.InvocationType.RequestResponse,
clientContext: "eyJoZWxsbyI6IndvcmxkIn0=",
qualifier: "1",
})
});
new sfn.StateMachine(stack, 'SM', {
definition: task
});

expect(stack).toHaveResource('AWS::StepFunctions::StateMachine', {
DefinitionString: {
"Fn::Join": ["", [
"{\"StartAt\":\"Task\",\"States\":{\"Task\":{\"End\":true,\"Parameters\":{\"FunctionName\":\"",
{ Ref: "Fn9270CBC0" },
"\",\"Payload\":{\"foo\":\"bar\"},\"InvocationType\":\"RequestResponse\",\"ClientContext\":\"eyJoZWxsbyI6IndvcmxkIn0=\","
+ "\"Qualifier\":\"1\"},\"Type\":\"Task\",\"Resource\":\"arn:aws:states:::lambda:invoke\"}}}"
]]
},
});
});

test('Lambda function can be used in a Task with Task Token', () => {
const task = new sfn.Task(stack, 'Task', {
task: new tasks.RunLambdaTask(fn, {
waitForTaskToken: true,
payload: {
token: sfn.Context.taskToken
}
})
});
new sfn.StateMachine(stack, 'SM', {
definition: task
});

expect(stack).toHaveResource('AWS::StepFunctions::StateMachine', {
DefinitionString: {
"Fn::Join": ["", [
"{\"StartAt\":\"Task\",\"States\":{\"Task\":{\"End\":true,\"Parameters\":{\"FunctionName\":\"",
{ Ref: "Fn9270CBC0" },
"\",\"Payload\":{\"token.$\":\"$$.Task.Token\"}},\"Type\":\"Task\",\"Resource\":\"arn:aws:states:::lambda:invoke.waitForTaskToken\"}}}"
]]
},
});
});

test('Task throws if waitForTaskToken is supplied but task token is not included in payLoad', () => {
expect(() => {
new sfn.Task(stack, 'Task', {
task: new tasks.RunLambdaTask(fn, {
waitForTaskToken: true
})
});
}).toThrow(/Task Token is missing in payload/i);
});
Loading

0 comments on commit 58a80ab

Please sign in to comment.