From c7c5685ce4155882bbe9d5afa8c5005b30f8aef8 Mon Sep 17 00:00:00 2001 From: Chakshu Gupta Date: Thu, 25 Jul 2024 20:24:07 +0530 Subject: [PATCH] docs(step-functions): re-structure distributed map input source explanation **Reason for this change** - current structure for distributed map's input source content does not convey properly how to configure input source as per requirements (also it lacks examples). **Description of changes** - re-structure distributed map's input source explanation to convey how to configure input source as per requirements with examples. - nit: also addressed unrelated issue with README **Description of how you validated changes** - `yarn build+extract` --- .../aws-cdk-lib/aws-stepfunctions/README.md | 144 +++++++++++++----- 1 file changed, 106 insertions(+), 38 deletions(-) diff --git a/packages/aws-cdk-lib/aws-stepfunctions/README.md b/packages/aws-cdk-lib/aws-stepfunctions/README.md index b0aaf8765ee30..f7485c510ffca 100644 --- a/packages/aws-cdk-lib/aws-stepfunctions/README.md +++ b/packages/aws-cdk-lib/aws-stepfunctions/README.md @@ -574,16 +574,113 @@ const distributedMap = new sfn.DistributedMap(this, 'Distributed Map State', { distributedMap.itemProcessor(new sfn.Pass(this, 'Pass State')); ``` -Map states in Distributed mode support multiple sources for an array to iterate: - -* JSON array from the state input payload -* objects in an S3 bucket and optional prefix +`DistributedMap` supports various input source types to determine an array to iterate over: + +* JSON array from the state input + * By default, `DistributedMap` assumes whole state input is an JSON array and iterates over it: + ```json + [ + "item1", + "item2" + ] + ``` + ```ts + const distributedMap = new sfn.DistributedMap(this, 'DistributedMap'); + distributedMap.itemProcessor(new sfn.Pass(this, 'Pass')); + ``` + * When input source is present at a specific path in state input, then `itemsPath` can be utilised to configure the iterator source. + ```json + { + "distributedMapItemList": [ + "item1", + "item2" + ] + } + ``` + ```ts + const distributedMap = new sfn.DistributedMap(this, 'DistributedMap', { + itemsPath: '$.distributedMapItemList', + }); + distributedMap.itemProcessor(new sfn.Pass(this, 'Pass')); + ``` +* Objects in a S3 bucket and optional prefix + * When `DistributedMap` is required to iterate over objects stored in a S3 bucket, then if required parameters like `bucket` are known while creating `StateMachine` (statically or at compile time), then an object of `S3ObjectsItemReader` (implementing `IItemReader`) can be passed to `itemReader` to configure the iterator source. + ```txt + my-bucket + | + +--item1 + | + +--otherItem + | + +--item2 + | + ... + ``` + ```ts + import * as s3 from 'aws-cdk-lib/aws-s3'; + + const bucket = new s3.Bucket(this, 'Bucket', { + bucketName: 'my-bucket', + }); + + const distributedMap = new sfn.DistributedMap(this, 'DistributedMap', { + itemReader: new sfn.S3ObjectsItemReader({ + bucket, + prefix: 'item', + }), + }); + distributedMap.itemProcessor(new sfn.Pass(this, 'Pass')); + ``` + * But if required parameters like `bucketName` are only known while starting execution of `StateMachine` (dynamically or at run-time) via state input, then an object of `S3ObjectsItemReaderPath` (implementing `IItemReaderPath`) can be passed to `itemReaderPath` to configure the iterator source. + ```json + { + "bucketName": "my-bucket", + "prefix": "item" + } + ``` + ```ts + const distributedMap = new sfn.DistributedMap(this, 'DistributedMap', { + itemReaderPath: new sfn.S3ObjectsItemReaderPath({ + bucketNamePath: '$.bucketName', + prefixPath: '$.prefix', + }), + }); + distributedMap.itemProcessor(new sfn.Pass(this, 'Pass')); + ``` + * Both `itemReader` and `itemReaderPath` are mutually exclusive. For example, if `bucket` is known at compile time but `prefix` is only known at run-time, then both cannot be used simultaneously. * JSON array in a JSON file stored in S3 + * When `DistributedMap` is required to iterate over a JSON array stored in a JSON file in a S3 bucket, then if required parameters like `bucket` are known while creating `StateMachine` (statically or at compile time), then an object of `S3JsonItemReader` (implementing `IItemReader`) can be passed to `itemReader` to configure the iterator source. + ```txt + my-bucket + | + +--input.json + | + ... + ``` + ```json + [ + "item1", + "item2" + ] + ``` + ```ts + import * as s3 from 'aws-cdk-lib/aws-s3'; + + const bucket = new s3.Bucket(this, 'Bucket', { + bucketName: 'my-bucket', + }); + + const distributedMap = new sfn.DistributedMap(this, 'DistributedMap', { + itemReader: new sfn.S3JsonItemReader({ + bucket, + key: 'input.json', + }), + }); + distributedMap.itemProcessor(new sfn.Pass(this, 'Pass')); + ``` * CSV file stored in S3 * S3 inventory manifest stored in S3 -There are multiple classes that implement `IItemReader` that can be used to configure the iterator source. These can be provided via the optional `itemReader` property. If S3 source needs to be configured dynamically at runtime using state input, then class implementations of `IItemReaderPath` can be utilised instead of `IItemReader`. These can be provided via the optional `itemReaderPath` property. Both `itemReader` and `itemReaderPath` are mutually exclusive. The default behavior, if both `itemReader` and `itemReaderPath` are omitted, is to use the input payload. - Map states in Distributed mode also support writing results of the iterator to an S3 bucket and optional prefix. Use a `ResultWriter` object provided via the optional `resultWriter` property to configure which S3 location iterator results will be written. The default behavior id `resultWriter` is omitted is to use the state output payload. However, if the iterator results are larger than the 256 kb limit for Step Functions payloads then the State Machine will fail. ```ts @@ -592,38 +689,13 @@ import * as s3 from 'aws-cdk-lib/aws-s3'; // create a bucket const bucket = new s3.Bucket(this, 'Bucket'); -const distributedMapWithStaticS3Source = new sfn.DistributedMap(this, 'Distributed Map State', { - itemReader: new sfn.S3JsonItemReader({ - bucket: bucket, - key: 'my-key.json', - }), - resultWriter: new sfn.ResultWriter({ - bucket: bucket, - prefix: 'my-prefix', - }) -}); -distributedMapWithStaticS3Source.itemProcessor(new sfn.Pass(this, 'Pass State')); - -/** - * State input shall include fields which are given to `S3ObjectItemReaderPath`: - * { - * ... - * bucketName: 'my-bucket', - * prefix: 'objectNamePrefix', - * ... - * } - */ -const distributedMapWithDynamicS3Source = new sfn.DistributedMap(this, 'Distributed Map State', { - itemReaderPath: new sfn.S3ObjectsItemReaderPath({ - bucketNamePath: '$.bucketName', - prefixPath: '$.prefix', - }), +const distributedMap = new sfn.DistributedMap(this, 'Distributed Map State', { resultWriter: new sfn.ResultWriter({ bucket: bucket, prefix: 'my-prefix', }) }); -distributedMapWithDynamicS3Source.itemProcessor(new sfn.Pass(this, 'Pass State')); +distributedMap.itemProcessor(new sfn.Pass(this, 'Pass State')); ``` If you want to specify the execution type for the ItemProcessor in the DistributedMap, you must set the `mapExecutionType` property in the `DistributedMap` class. When using the `DistributedMap` class, the `ProcessorConfig.executionType` property is ignored. @@ -813,12 +885,8 @@ AWS Step functions integrate directly with other services, either through an opt Therefore, it is possible to change the `integrationPattern` of services, to enable additional functionality of the said AWS Service: ```ts -import * as glue from "@aws-cdk/aws-glue-alpha"; - -declare const submitGlue: glue.Job; - const submitJob = new tasks.GlueStartJobRun(this, "Submit Job", { - glueJobName: submitGlue.jobName, + glueJobName: 'JobName', integrationPattern: sfn.IntegrationPattern.RUN_JOB, }); ```