docs(step-functions): re-structure distributed map input source explanation

**Reason for this change**
- The current structure of the distributed map input source documentation does not clearly convey how to configure the input source for a given requirement, and it lacks examples.

**Description of changes**
- Re-structure the distributed map input source explanation to show, with examples, how to configure the input source for each requirement.
- nit: also addressed an unrelated issue in the README

**Description of how you validated changes**
- `yarn build+extract`
Chakshu Gupta authored and ChakshuGupta13 committed Jul 25, 2024
1 parent c2d1af1 commit c7c5685
Showing 1 changed file with 106 additions and 38 deletions.
144 changes: 106 additions & 38 deletions packages/aws-cdk-lib/aws-stepfunctions/README.md
@@ -574,16 +574,113 @@ const distributedMap = new sfn.DistributedMap(this, 'Distributed Map State', {
distributedMap.itemProcessor(new sfn.Pass(this, 'Pass State'));
```

Map states in Distributed mode support multiple sources for an array to iterate:

* JSON array from the state input payload
* objects in an S3 bucket and optional prefix
`DistributedMap` supports various input source types to determine an array to iterate over:

* JSON array from the state input
  * By default, `DistributedMap` assumes the whole state input is a JSON array and iterates over it:
    ```json
    [
      "item1",
      "item2"
    ]
    ```
    ```ts
    const distributedMap = new sfn.DistributedMap(this, 'DistributedMap');
    distributedMap.itemProcessor(new sfn.Pass(this, 'Pass'));
    ```
  * When the array is located at a specific path in the state input, use `itemsPath` to point the iterator source at it:
    ```json
    {
      "distributedMapItemList": [
        "item1",
        "item2"
      ]
    }
    ```
    ```ts
    const distributedMap = new sfn.DistributedMap(this, 'DistributedMap', {
      itemsPath: '$.distributedMapItemList',
    });
    distributedMap.itemProcessor(new sfn.Pass(this, 'Pass'));
    ```
* Objects in an S3 bucket with an optional prefix
  * To iterate over objects stored in an S3 bucket when the required parameters, such as `bucket`, are known while creating the `StateMachine` (statically, at compile time), pass an `S3ObjectsItemReader` (which implements `IItemReader`) to `itemReader`:
    ```txt
    my-bucket
    |
    +--item1
    |
    +--otherItem
    |
    +--item2
    |
    ...
    ```
    ```ts
    import * as s3 from 'aws-cdk-lib/aws-s3';

    const bucket = new s3.Bucket(this, 'Bucket', {
      bucketName: 'my-bucket',
    });

    const distributedMap = new sfn.DistributedMap(this, 'DistributedMap', {
      itemReader: new sfn.S3ObjectsItemReader({
        bucket,
        prefix: 'item',
      }),
    });
    distributedMap.itemProcessor(new sfn.Pass(this, 'Pass'));
    ```
  * When the required parameters, such as `bucketName`, are only known when an execution of the `StateMachine` starts (dynamically, at run time) and arrive via the state input, pass an `S3ObjectsItemReaderPath` (which implements `IItemReaderPath`) to `itemReaderPath`:
    ```json
    {
      "bucketName": "my-bucket",
      "prefix": "item"
    }
    ```
    ```ts
    const distributedMap = new sfn.DistributedMap(this, 'DistributedMap', {
      itemReaderPath: new sfn.S3ObjectsItemReaderPath({
        bucketNamePath: '$.bucketName',
        prefixPath: '$.prefix',
      }),
    });
    distributedMap.itemProcessor(new sfn.Pass(this, 'Pass'));
    ```
  * `itemReader` and `itemReaderPath` are mutually exclusive. For example, if `bucket` is known at compile time but `prefix` is only known at run time, the two properties cannot both be used at the same time.
* JSON array in a JSON file stored in S3
  * To iterate over a JSON array stored in a JSON file in an S3 bucket when the required parameters, such as `bucket`, are known while creating the `StateMachine` (statically, at compile time), pass an `S3JsonItemReader` (which implements `IItemReader`) to `itemReader`:
    ```txt
    my-bucket
    |
    +--input.json
    |
    ...
    ```
    ```json
    [
      "item1",
      "item2"
    ]
    ```
    ```ts
    import * as s3 from 'aws-cdk-lib/aws-s3';

    const bucket = new s3.Bucket(this, 'Bucket', {
      bucketName: 'my-bucket',
    });

    const distributedMap = new sfn.DistributedMap(this, 'DistributedMap', {
      itemReader: new sfn.S3JsonItemReader({
        bucket,
        key: 'input.json',
      }),
    });
    distributedMap.itemProcessor(new sfn.Pass(this, 'Pass'));
    ```
* CSV file stored in S3
* S3 inventory manifest stored in S3
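
To make the role of `itemsPath` concrete, the following is a minimal sketch of how a simple path such as `$.distributedMapItemList` selects the array to iterate over. The `resolveItems` helper and the `Json` type are hypothetical names introduced for illustration; this is not how Step Functions or the CDK evaluates paths, and it supports only `$` and plain dotted paths.

```ts
// Hypothetical helper for illustration only: real Step Functions paths
// support more syntax than the '$' and '$.a.b' forms handled here.
type Json = null | boolean | number | string | Json[] | { [key: string]: Json };

function resolveItems(stateInput: Json, itemsPath: string = '$'): Json[] {
  if (itemsPath !== '$' && itemsPath.indexOf('$.') !== 0) {
    throw new Error(`Unsupported path in this sketch: ${itemsPath}`);
  }
  // '$' means the whole state input; '$.a.b' walks keys 'a' then 'b'.
  const keys = itemsPath === '$' ? [] : itemsPath.slice(2).split('.');
  let current: Json = stateInput;
  for (const key of keys) {
    if (current === null || typeof current !== 'object' || Array.isArray(current)) {
      throw new Error(`Path ${itemsPath} not found in state input`);
    }
    const next: Json | undefined = current[key];
    if (next === undefined) {
      throw new Error(`Path ${itemsPath} not found in state input`);
    }
    current = next;
  }
  // The selected value must be a JSON array to be iterable.
  if (!Array.isArray(current)) {
    throw new Error(`Path ${itemsPath} does not point to a JSON array`);
  }
  return current;
}
```

With the default path, `resolveItems(['item1', 'item2'])` returns the whole input, while `resolveItems({ distributedMapItemList: ['item1', 'item2'] }, '$.distributedMapItemList')` returns the nested array.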

There are multiple classes that implement `IItemReader` that can be used to configure the iterator source. These can be provided via the optional `itemReader` property. If S3 source needs to be configured dynamically at runtime using state input, then class implementations of `IItemReaderPath` can be utilised instead of `IItemReader`. These can be provided via the optional `itemReaderPath` property. Both `itemReader` and `itemReaderPath` are mutually exclusive. The default behavior, if both `itemReader` and `itemReaderPath` are omitted, is to use the input payload.
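
The selection rule described here (static reader, dynamic reader, or the state input by default, with the two readers mutually exclusive) can be summarised in a short sketch. The `InputSourceProps` shape, the `InputSource` labels, and `selectInputSource` are hypothetical simplifications for illustration, not the CDK's own types or validation code.

```ts
// Hypothetical, simplified stand-ins for the relevant DistributedMap props.
interface InputSourceProps {
  itemReader?: { description: string };     // stands in for an IItemReader
  itemReaderPath?: { description: string }; // stands in for an IItemReaderPath
}

type InputSource = 'STATE_INPUT' | 'STATIC_S3' | 'DYNAMIC_S3';

function selectInputSource(props: InputSourceProps): InputSource {
  // The two properties are mutually exclusive.
  if (props.itemReader && props.itemReaderPath) {
    throw new Error('Provide either itemReader or itemReaderPath, not both');
  }
  if (props.itemReader) {
    return 'STATIC_S3'; // S3 location known when the state machine is defined
  }
  if (props.itemReaderPath) {
    return 'DYNAMIC_S3'; // S3 location read from the state input at run time
  }
  return 'STATE_INPUT'; // default when both are omitted
}
```

Calling `selectInputSource({})` yields `'STATE_INPUT'`, which mirrors the documented default of iterating over the input payload.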

Map states in Distributed mode also support writing the results of the iterator to an S3 bucket and optional prefix. Use a `ResultWriter` object, provided via the optional `resultWriter` property, to configure the S3 location to which iterator results are written. If `resultWriter` is omitted, the default behavior is to use the state output payload. However, if the iterator results exceed the 256 KiB limit for Step Functions payloads, the state machine execution will fail.

```ts
@@ -592,38 +689,13 @@ import * as s3 from 'aws-cdk-lib/aws-s3';
// create a bucket
const bucket = new s3.Bucket(this, 'Bucket');

-const distributedMapWithStaticS3Source = new sfn.DistributedMap(this, 'Distributed Map State', {
-  itemReader: new sfn.S3JsonItemReader({
-    bucket: bucket,
-    key: 'my-key.json',
-  }),
-  resultWriter: new sfn.ResultWriter({
-    bucket: bucket,
-    prefix: 'my-prefix',
-  })
-});
-distributedMapWithStaticS3Source.itemProcessor(new sfn.Pass(this, 'Pass State'));
-
-/**
- * State input shall include fields which are given to `S3ObjectItemReaderPath`:
- * {
- *   ...
- *   bucketName: 'my-bucket',
- *   prefix: 'objectNamePrefix',
- *   ...
- * }
- */
-const distributedMapWithDynamicS3Source = new sfn.DistributedMap(this, 'Distributed Map State', {
-  itemReaderPath: new sfn.S3ObjectsItemReaderPath({
-    bucketNamePath: '$.bucketName',
-    prefixPath: '$.prefix',
-  }),
+const distributedMap = new sfn.DistributedMap(this, 'Distributed Map State', {
   resultWriter: new sfn.ResultWriter({
     bucket: bucket,
     prefix: 'my-prefix',
   })
 });
-distributedMapWithDynamicS3Source.itemProcessor(new sfn.Pass(this, 'Pass State'));
+distributedMap.itemProcessor(new sfn.Pass(this, 'Pass State'));
```
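
As a rough aid for deciding when a `resultWriter` is needed, the sketch below estimates whether a set of iterator results would exceed the payload limit. It assumes the limit is 256 KiB (262,144 bytes) measured on the UTF-8 encoded JSON, and the names `MAX_PAYLOAD_BYTES`, `utf8ByteLength`, and `exceedsPayloadLimit` are hypothetical; Step Functions performs its own check at run time.

```ts
// Assumption: the payload limit is 256 KiB of UTF-8 encoded JSON.
const MAX_PAYLOAD_BYTES = 256 * 1024;

// Counts UTF-8 bytes for a well-formed UTF-16 string without relying on
// platform-specific helpers.
function utf8ByteLength(text: string): number {
  let bytes = 0;
  for (let i = 0; i < text.length; i++) {
    const code = text.charCodeAt(i);
    if (code < 0x80) {
      bytes += 1;
    } else if (code < 0x800) {
      bytes += 2;
    } else if (code >= 0xd800 && code <= 0xdbff) {
      bytes += 4; // surrogate pair encodes one 4-byte code point
      i++;        // skip the low surrogate
    } else {
      bytes += 3;
    }
  }
  return bytes;
}

// Returns true when the serialized results would not fit in the state output.
function exceedsPayloadLimit(results: unknown[]): boolean {
  return utf8ByteLength(JSON.stringify(results)) > MAX_PAYLOAD_BYTES;
}
```

If `exceedsPayloadLimit` is likely to return `true` for realistic workloads, configuring a `ResultWriter` avoids relying on the state output payload.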

If you want to specify the execution type for the ItemProcessor in the DistributedMap, you must set the `mapExecutionType` property in the `DistributedMap` class. When using the `DistributedMap` class, the `ProcessorConfig.executionType` property is ignored.
@@ -813,12 +885,8 @@ AWS Step functions integrate directly with other services, either through an opt
Therefore, it is possible to change the `integrationPattern` of services, to enable additional functionality of the said AWS Service:

```ts
-import * as glue from "@aws-cdk/aws-glue-alpha";
-
-declare const submitGlue: glue.Job;
-
 const submitJob = new tasks.GlueStartJobRun(this, "Submit Job", {
-  glueJobName: submitGlue.jobName,
+  glueJobName: 'JobName',
   integrationPattern: sfn.IntegrationPattern.RUN_JOB,
 });
```