From 5219fff31fc1d9d4195d97cde1e3b11997ae0e20 Mon Sep 17 00:00:00 2001 From: Chris K Wensel Date: Tue, 3 Oct 2023 21:03:41 -0700 Subject: [PATCH] add include/exclude filter to S3PutListenerBoundary update scenarios to test filtering --- .../model/deploy/partial/PathFilter.java | 48 ++++++++++++++ .../s3-copy-arc.jsonnet | 62 +++++++++++++++++++ .../scenario.jsonnet | 34 ++++++++++ .../s3-copy-arc.jsonnet | 10 +-- .../scenario.jsonnet | 2 +- .../aws/arc/s3copy/S3CopyArcProvider.java | 11 +++- ...requentS3PutStrategyBoundaryConstruct.java | 1 + ...requentS3PutStrategyBoundaryConstruct.java | 1 + .../boundary/s3put/S3PutListenerBoundary.java | 7 +++ .../s3put/S3PutListenerBoundaryProvider.java | 21 ++++++- .../clusterless/lambda/util/PathMatcher.java | 6 +- .../lambda/transform/TransformProps.java | 6 ++ .../FrequentS3PutTransformProps.java | 35 +++++++---- .../transform/s3put/S3PutTransformProps.java | 23 ++++--- .../FrequentPutEventTransformHandler.java | 10 +++ .../s3put/PutEventTransformHandler.java | 38 +++++++----- .../lambda/workload/s3copy/S3CopyProps.java | 32 +--------- .../s3copy/S3CopyArcEventHandler.java | 1 - 18 files changed, 272 insertions(+), 76 deletions(-) create mode 100644 clusterless-model/src/main/java/clusterless/model/deploy/partial/PathFilter.java create mode 100644 clusterless-scenario/src/main/cls/scenarios/s3-frequent-filter-boundary/s3-copy-arc.jsonnet create mode 100644 clusterless-scenario/src/main/cls/scenarios/s3-frequent-filter-boundary/scenario.jsonnet rename clusterless-scenario/src/main/cls/scenarios/{s3-frequent => s3-frequent-filter-copy}/s3-copy-arc.jsonnet (84%) rename clusterless-scenario/src/main/cls/scenarios/{s3-frequent => s3-frequent-filter-copy}/scenario.jsonnet (95%) diff --git a/clusterless-model/src/main/java/clusterless/model/deploy/partial/PathFilter.java b/clusterless-model/src/main/java/clusterless/model/deploy/partial/PathFilter.java new file mode 100644 index 00000000..6c884b53 --- /dev/null +++ 
b/clusterless-model/src/main/java/clusterless/model/deploy/partial/PathFilter.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2023 Chris K Wensel . All Rights Reserved. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +package clusterless.model.deploy.partial; + +import clusterless.model.Struct; + +import java.util.ArrayList; +import java.util.List; + +public class PathFilter implements Struct { + List includes = new ArrayList<>(); + List excludes = new ArrayList<>(); + char pathSeparator = '/'; + boolean ignoreCase = false; + + public List includes() { + return includes; + } + + public List excludes() { + return excludes; + } + + public char pathSeparator() { + return pathSeparator; + } + + public boolean ignoreCase() { + return ignoreCase; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("Filter{"); + sb.append("includes=").append(includes); + sb.append(", excludes=").append(excludes); + sb.append(", pathSeparator=").append(pathSeparator); + sb.append(", ignoreCase=").append(ignoreCase); + sb.append('}'); + return sb.toString(); + } +} diff --git a/clusterless-scenario/src/main/cls/scenarios/s3-frequent-filter-boundary/s3-copy-arc.jsonnet b/clusterless-scenario/src/main/cls/scenarios/s3-frequent-filter-boundary/s3-copy-arc.jsonnet new file mode 100644 index 00000000..128541c6 --- /dev/null +++ b/clusterless-scenario/src/main/cls/scenarios/s3-frequent-filter-boundary/s3-copy-arc.jsonnet @@ -0,0 +1,62 @@ +local stage = std.extVar('scenario.stage'); +local account = std.extVar('scenario.aws.account'); +local region = std.extVar('scenario.aws.region'); +local bucketName = 'clusterless-frequent-filter-boundary-test-' + account + '-' + region; +local bucketPrefix = 's3://' + bucketName; +local unit = 'Twelfths'; + +{ + project: { + name: 'S3FreqFiltBndy', + version: 
'20230101-00', + }, + placement: { + stage: stage, + provider: 'aws', + account: account, + region: region, + }, + resources: [ + { + type: 'aws:core:s3Bucket', + name: 'bucket', + bucketName: bucketName, + }, + ], + boundaries: [ + { + type: 'aws:core:s3PutListenerBoundary', + name: 'FreqPutListener', + eventArrival: 'frequent', + dataset: { + name: 'ingress-frequent-boundary', + version: '20220101', + pathURI: bucketPrefix + '/ingress/', + }, + lotUnit: unit, + filter: { + excludes: ['**/_*'], + }, + }, + ], + arcs: [ + { + type: 'aws:core:s3CopyArc', + name: 'copyA', + sources: { + main: { + name: 'ingress-frequent-boundary', + version: '20220101', + pathURI: bucketPrefix + '/ingress/', + }, + }, + sinks: { + main: { + name: 'copy-a-frequent-boundary', + version: '20230101', + pathURI: bucketPrefix + '/copy-a/', + }, + }, + }, + ], +} diff --git a/clusterless-scenario/src/main/cls/scenarios/s3-frequent-filter-boundary/scenario.jsonnet b/clusterless-scenario/src/main/cls/scenarios/s3-frequent-filter-boundary/scenario.jsonnet new file mode 100644 index 00000000..431ff162 --- /dev/null +++ b/clusterless-scenario/src/main/cls/scenarios/s3-frequent-filter-boundary/scenario.jsonnet @@ -0,0 +1,34 @@ +local project = import 's3-copy-arc.jsonnet'; + +{ + name: 's3-frequent-filter-boundary', + description: 'copy workload with frequent arrivals', + projectFiles: [ + 's3-copy-arc.json', + ], + placements: [ + project.placement, + ], + ingressStores: [ + { + region: project.placement.region, + path: project.arcs[0].sources.main.pathURI, + uploadDelaySec: 15, + objectCount: 60/15 * 5 * 3, + }, + { + region: project.placement.region, + path: project.arcs[0].sources.main.pathURI, + uploadDelaySec: 15, + objectCount: 60/15 * 5 * 3, + objectName: '_SUCCESS-%04d-%d.txt', + }, + ], + watchedStores: [ + { + region: project.placement.region, + path: project.arcs[0].sinks.main.pathURI, + objectCount: 60/15 * 5 * 3, + }, + ], +} diff --git 
a/clusterless-scenario/src/main/cls/scenarios/s3-frequent/s3-copy-arc.jsonnet b/clusterless-scenario/src/main/cls/scenarios/s3-frequent-filter-copy/s3-copy-arc.jsonnet similarity index 84% rename from clusterless-scenario/src/main/cls/scenarios/s3-frequent/s3-copy-arc.jsonnet rename to clusterless-scenario/src/main/cls/scenarios/s3-frequent-filter-copy/s3-copy-arc.jsonnet index a785f58d..94183b10 100644 --- a/clusterless-scenario/src/main/cls/scenarios/s3-frequent/s3-copy-arc.jsonnet +++ b/clusterless-scenario/src/main/cls/scenarios/s3-frequent-filter-copy/s3-copy-arc.jsonnet @@ -1,13 +1,13 @@ local stage = std.extVar('scenario.stage'); local account = std.extVar('scenario.aws.account'); local region = std.extVar('scenario.aws.region'); -local bucketName = 'clusterless-frequent-test-' + account + '-' + region; +local bucketName = 'clusterless-frequent-filter-copy-test-' + account + '-' + region; local bucketPrefix = 's3://' + bucketName; local unit = 'Twelfths'; { project: { - name: 'S3Freq', + name: 'S3FreqFiltCp', version: '20230101-00', }, placement: { @@ -29,7 +29,7 @@ local unit = 'Twelfths'; name: 'FreqPutListener', eventArrival: 'frequent', dataset: { - name: 'ingress-frequent', + name: 'ingress-frequent-copy', version: '20220101', pathURI: bucketPrefix + '/ingress/', }, @@ -42,14 +42,14 @@ local unit = 'Twelfths'; name: 'copyA', sources: { main: { - name: 'ingress-frequent', + name: 'ingress-frequent-copy', version: '20220101', pathURI: bucketPrefix + '/ingress/', }, }, sinks: { main: { - name: 'copy-a-frequent', + name: 'copy-a-frequent-copy', version: '20230101', pathURI: bucketPrefix + '/copy-a/', }, diff --git a/clusterless-scenario/src/main/cls/scenarios/s3-frequent/scenario.jsonnet b/clusterless-scenario/src/main/cls/scenarios/s3-frequent-filter-copy/scenario.jsonnet similarity index 95% rename from clusterless-scenario/src/main/cls/scenarios/s3-frequent/scenario.jsonnet rename to 
clusterless-scenario/src/main/cls/scenarios/s3-frequent-filter-copy/scenario.jsonnet index bce6f774..12a9f846 100644 --- a/clusterless-scenario/src/main/cls/scenarios/s3-frequent/scenario.jsonnet +++ b/clusterless-scenario/src/main/cls/scenarios/s3-frequent-filter-copy/scenario.jsonnet @@ -1,7 +1,7 @@ local project = import 's3-copy-arc.jsonnet'; { - name: 's3-frequent', + name: 's3-frequent-filter-copy', description: 'copy workload with frequent arrivals', projectFiles: [ 's3-copy-arc.json', diff --git a/clusterless-substrate-aws-construct-core/src/main/java/clusterless/substrate/aws/arc/s3copy/S3CopyArcProvider.java b/clusterless-substrate-aws-construct-core/src/main/java/clusterless/substrate/aws/arc/s3copy/S3CopyArcProvider.java index 33bb602f..59795df6 100644 --- a/clusterless-substrate-aws-construct-core/src/main/java/clusterless/substrate/aws/arc/s3copy/S3CopyArcProvider.java +++ b/clusterless-substrate-aws-construct-core/src/main/java/clusterless/substrate/aws/arc/s3copy/S3CopyArcProvider.java @@ -20,12 +20,17 @@ synopsis = "The AWS S3 Copy Arc component copies data from one S3 bucket to another S3 bucket.", description = """ All data in the source manifest will be copied to the specified dataset, except those paths that do - not pass the filter. The filter is a list of include and exclude patterns. + not pass the filter, if given. The filter is a list of include and exclude patterns. + + Where: + - ? matches one character + - * matches zero or more characters + - ** matches zero or more directories in a path A common exclude pattern would be '**/_*'. This would exclude all files that start with an underscore, like '_SUCCESS' or '_metadata'. - - workloadProps.filter.includes: A list of include patterns. + + workloadProps.filter.includes: A list of include patterns. Default is '**'. workloadProps.filter.excludes: A list of exclude patterns. workloadProps.filter.pathSeparator: The path separator to use when matching patterns. Default is '/'. 
workloadProps.filter.ignoreCase: Whether to ignore case when matching patterns. Default is false. diff --git a/clusterless-substrate-aws-construct-core/src/main/java/clusterless/substrate/aws/boundary/s3put/FrequentS3PutStrategyBoundaryConstruct.java b/clusterless-substrate-aws-construct-core/src/main/java/clusterless/substrate/aws/boundary/s3put/FrequentS3PutStrategyBoundaryConstruct.java index 7a351d93..b1d5c4ae 100644 --- a/clusterless-substrate-aws-construct-core/src/main/java/clusterless/substrate/aws/boundary/s3put/FrequentS3PutStrategyBoundaryConstruct.java +++ b/clusterless-substrate-aws-construct-core/src/main/java/clusterless/substrate/aws/boundary/s3put/FrequentS3PutStrategyBoundaryConstruct.java @@ -95,6 +95,7 @@ public FrequentS3PutStrategyBoundaryConstruct(@NotNull ManagedComponentContext c .withManifestCompletePath(manifestComplete) .withManifestPartialPath(manifestPartial) .withLotUnit(model.lotUnit()) + .withFilter(model().filter()) .build(); Map environment = Env.toEnv(transformProps); diff --git a/clusterless-substrate-aws-construct-core/src/main/java/clusterless/substrate/aws/boundary/s3put/InfrequentS3PutStrategyBoundaryConstruct.java b/clusterless-substrate-aws-construct-core/src/main/java/clusterless/substrate/aws/boundary/s3put/InfrequentS3PutStrategyBoundaryConstruct.java index 61228bbd..2603e3fe 100644 --- a/clusterless-substrate-aws-construct-core/src/main/java/clusterless/substrate/aws/boundary/s3put/InfrequentS3PutStrategyBoundaryConstruct.java +++ b/clusterless-substrate-aws-construct-core/src/main/java/clusterless/substrate/aws/boundary/s3put/InfrequentS3PutStrategyBoundaryConstruct.java @@ -83,6 +83,7 @@ public InfrequentS3PutStrategyBoundaryConstruct(@NotNull ManagedComponentContext .withLotUnit(model.lotUnit()) .withLotSource(model().infrequent().lotSource()) .withKeyRegex(model().infrequent().keyRegex()) + .withFilter(model().filter()) .build(); Map environment = Env.toEnv(transformProps); diff --git 
a/clusterless-substrate-aws-construct-core/src/main/java/clusterless/substrate/aws/boundary/s3put/S3PutListenerBoundary.java b/clusterless-substrate-aws-construct-core/src/main/java/clusterless/substrate/aws/boundary/s3put/S3PutListenerBoundary.java index 99cf4578..be8e2692 100644 --- a/clusterless-substrate-aws-construct-core/src/main/java/clusterless/substrate/aws/boundary/s3put/S3PutListenerBoundary.java +++ b/clusterless-substrate-aws-construct-core/src/main/java/clusterless/substrate/aws/boundary/s3put/S3PutListenerBoundary.java @@ -10,6 +10,7 @@ import clusterless.lambda.transform.s3put.EventArrival; import clusterless.model.deploy.IngressBoundary; +import clusterless.model.deploy.partial.PathFilter; import clusterless.substrate.aws.props.LambdaJavaRuntimeProps; import clusterless.substrate.aws.props.Memory; @@ -26,6 +27,8 @@ public class S3PutListenerBoundary extends IngressBoundary { Infrequent infrequent = new Infrequent(); Frequent frequent = new Frequent(); + PathFilter filter = new PathFilter(); + public S3PutListenerBoundary() { } @@ -44,4 +47,8 @@ public Infrequent infrequent() { public Frequent frequent() { return frequent; } + + public PathFilter filter() { + return filter; + } } diff --git a/clusterless-substrate-aws-construct-core/src/main/java/clusterless/substrate/aws/boundary/s3put/S3PutListenerBoundaryProvider.java b/clusterless-substrate-aws-construct-core/src/main/java/clusterless/substrate/aws/boundary/s3put/S3PutListenerBoundaryProvider.java index 451ff74c..47a9dde6 100644 --- a/clusterless-substrate-aws-construct-core/src/main/java/clusterless/substrate/aws/boundary/s3put/S3PutListenerBoundaryProvider.java +++ b/clusterless-substrate-aws-construct-core/src/main/java/clusterless/substrate/aws/boundary/s3put/S3PutListenerBoundaryProvider.java @@ -40,13 +40,32 @@ infrequent.enableEventBridge: true|false true will enable event bridge on the bucket if the bucket was not declared with eventBridgeNotification enabled, it must be set here - + 
frequent.queueFetchWaitSec: seconds The duration (in seconds) for which the call waits for a message to arrive in the queue before returning. If a message is available, the call returns sooner than WaitTimeSeconds. If no messages are available and the wait time expires, the call returns successfully with an empty list of messages. It is recommended to leave this value at zero (0). + + For frequently arriving events, all paths are collected until the end of the interval, except those + paths that do not pass the filter, if given. The filter is a list of include and exclude patterns, and is also applied to infrequently arriving events. + + Where: + - ? matches one character + - * matches zero or more characters + - ** matches zero or more directories in a path + + A common exclude pattern would be '**/_*'. This would exclude all files that start with an underscore, + like '_SUCCESS' or '_metadata'. + + filter.includes: A list of include patterns. + Default is '**'. + filter.excludes: A list of exclude patterns. + filter.pathSeparator: The path separator to use when matching patterns. + Default is '/'. + filter.ignoreCase: Whether to ignore case when matching patterns. + Default is false. 
""" ) public class S3PutListenerBoundaryProvider implements BoundaryComponentService { diff --git a/clusterless-substrate-aws-lambda-common/src/main/java/clusterless/lambda/util/PathMatcher.java b/clusterless-substrate-aws-lambda-common/src/main/java/clusterless/lambda/util/PathMatcher.java index 014f71f5..ae197498 100644 --- a/clusterless-substrate-aws-lambda-common/src/main/java/clusterless/lambda/util/PathMatcher.java +++ b/clusterless-substrate-aws-lambda-common/src/main/java/clusterless/lambda/util/PathMatcher.java @@ -10,6 +10,7 @@ import org.jetbrains.annotations.NotNull; +import java.net.URI; import java.util.ArrayList; import java.util.List; import java.util.function.Predicate; @@ -67,11 +68,14 @@ private Predicate predicate(String pattern, AntPathMatcher matcher) { return path -> matcher.match(pattern, path.subSequence(this.path.length(), path.length())); } + public boolean keep(URI uri) { + return keep(uri.getPath()); + } + public boolean keep(String path) { return predicate.test(path); } - public static final class Builder { protected String path; protected char pathSeparator; diff --git a/clusterless-substrate-aws-lambda-transform-model/src/main/java/clusterless/lambda/transform/TransformProps.java b/clusterless-substrate-aws-lambda-transform-model/src/main/java/clusterless/lambda/transform/TransformProps.java index 30d84880..fcc72c52 100644 --- a/clusterless-substrate-aws-lambda-transform-model/src/main/java/clusterless/lambda/transform/TransformProps.java +++ b/clusterless-substrate-aws-lambda-transform-model/src/main/java/clusterless/lambda/transform/TransformProps.java @@ -10,6 +10,7 @@ import clusterless.model.Struct; import clusterless.model.deploy.Dataset; +import clusterless.model.deploy.partial.PathFilter; import clusterless.substrate.uri.ManifestURI; import com.fasterxml.jackson.annotation.JsonProperty; @@ -23,6 +24,7 @@ public class TransformProps implements Struct { @JsonProperty(required = true) protected Dataset dataset; protected String 
eventBusName; + protected PathFilter filter; public String lotUnit() { return lotUnit; @@ -43,4 +45,8 @@ public Dataset dataset() { public String eventBusName() { return eventBusName; } + + public PathFilter filter() { + return filter; + } } diff --git a/clusterless-substrate-aws-lambda-transform-model/src/main/java/clusterless/lambda/transform/frequents3put/FrequentS3PutTransformProps.java b/clusterless-substrate-aws-lambda-transform-model/src/main/java/clusterless/lambda/transform/frequents3put/FrequentS3PutTransformProps.java index 90f03ef6..61cf3a53 100644 --- a/clusterless-substrate-aws-lambda-transform-model/src/main/java/clusterless/lambda/transform/frequents3put/FrequentS3PutTransformProps.java +++ b/clusterless-substrate-aws-lambda-transform-model/src/main/java/clusterless/lambda/transform/frequents3put/FrequentS3PutTransformProps.java @@ -10,9 +10,11 @@ import clusterless.lambda.transform.TransformProps; import clusterless.model.deploy.Dataset; +import clusterless.model.deploy.partial.PathFilter; import clusterless.substrate.uri.ManifestURI; public class FrequentS3PutTransformProps extends TransformProps { + protected String sqsQueueName; protected int sqsWaitTimeSeconds = 0; @@ -30,15 +32,17 @@ public int sqsWaitTimeSeconds() { @Override public String toString() { - String sb = "FrequentS3PutTransformProps{" + "sqsQueueName='" + sqsQueueName + '\'' + - ", sqsWaitTimeSeconds=" + sqsWaitTimeSeconds + - ", lotUnit='" + lotUnit + '\'' + - ", manifestCompletePath=" + manifestCompletePath + - ", manifestPartialPath=" + manifestPartialPath + - ", dataset=" + dataset + - ", eventBusName='" + eventBusName + '\'' + - '}'; - return sb; + final StringBuilder sb = new StringBuilder("FrequentS3PutTransformProps{"); + sb.append("sqsQueueName='").append(sqsQueueName).append('\''); + sb.append(", sqsWaitTimeSeconds=").append(sqsWaitTimeSeconds); + sb.append(", lotUnit='").append(lotUnit).append('\''); + sb.append(", manifestCompletePath=").append(manifestCompletePath); + 
sb.append(", manifestPartialPath=").append(manifestPartialPath); + sb.append(", dataset=").append(dataset); + sb.append(", eventBusName='").append(eventBusName).append('\''); + sb.append(", filter=").append(filter); + sb.append('}'); + return sb.toString(); } public static final class Builder { @@ -47,6 +51,7 @@ public static final class Builder { protected ManifestURI manifestPartialPath; protected Dataset dataset; protected String eventBusName; + protected PathFilter filter; protected String sqsQueueName; protected int sqsWaitTimeSeconds = 0; @@ -82,6 +87,11 @@ public Builder withEventBusName(String eventBusName) { return this; } + public Builder withFilter(PathFilter filter) { + this.filter = filter; + return this; + } + public Builder withSqsQueueName(String sqsQueueName) { this.sqsQueueName = sqsQueueName; return this; @@ -94,12 +104,13 @@ public Builder withSqsWaitTimeSeconds(int sqsWaitTimeSeconds) { public FrequentS3PutTransformProps build() { FrequentS3PutTransformProps frequentS3PutTransformProps = new FrequentS3PutTransformProps(); + frequentS3PutTransformProps.filter = this.filter; frequentS3PutTransformProps.dataset = this.dataset; - frequentS3PutTransformProps.sqsQueueName = this.sqsQueueName; - frequentS3PutTransformProps.manifestCompletePath = this.manifestCompletePath; frequentS3PutTransformProps.lotUnit = this.lotUnit; - frequentS3PutTransformProps.sqsWaitTimeSeconds = this.sqsWaitTimeSeconds; + frequentS3PutTransformProps.manifestCompletePath = this.manifestCompletePath; frequentS3PutTransformProps.manifestPartialPath = this.manifestPartialPath; + frequentS3PutTransformProps.sqsQueueName = this.sqsQueueName; + frequentS3PutTransformProps.sqsWaitTimeSeconds = this.sqsWaitTimeSeconds; frequentS3PutTransformProps.eventBusName = this.eventBusName; return frequentS3PutTransformProps; } diff --git a/clusterless-substrate-aws-lambda-transform-model/src/main/java/clusterless/lambda/transform/s3put/S3PutTransformProps.java 
b/clusterless-substrate-aws-lambda-transform-model/src/main/java/clusterless/lambda/transform/s3put/S3PutTransformProps.java index 1127283b..dcd0c322 100644 --- a/clusterless-substrate-aws-lambda-transform-model/src/main/java/clusterless/lambda/transform/s3put/S3PutTransformProps.java +++ b/clusterless-substrate-aws-lambda-transform-model/src/main/java/clusterless/lambda/transform/s3put/S3PutTransformProps.java @@ -10,6 +10,7 @@ import clusterless.lambda.transform.TransformProps; import clusterless.model.deploy.Dataset; +import clusterless.model.deploy.partial.PathFilter; import clusterless.substrate.uri.ManifestURI; import com.fasterxml.jackson.annotation.JsonProperty; @@ -17,10 +18,8 @@ * */ public class S3PutTransformProps extends TransformProps { - @JsonProperty(required = true) LotSource lotSource; - String keyRegex; public S3PutTransformProps() { @@ -40,14 +39,15 @@ public String keyRegex() { @Override public String toString() { - final StringBuilder sb = new StringBuilder("TransformProps{"); - sb.append("lotUnit='").append(lotUnit).append('\''); - sb.append(", lotSource=").append(lotSource); + final StringBuilder sb = new StringBuilder("S3PutTransformProps{"); + sb.append("lotSource=").append(lotSource); sb.append(", keyRegex='").append(keyRegex).append('\''); + sb.append(", lotUnit='").append(lotUnit).append('\''); sb.append(", manifestCompletePath=").append(manifestCompletePath); sb.append(", manifestPartialPath=").append(manifestPartialPath); sb.append(", dataset=").append(dataset); sb.append(", eventBusName='").append(eventBusName).append('\''); + sb.append(", filter=").append(filter); sb.append('}'); return sb.toString(); } @@ -58,6 +58,7 @@ public static final class Builder { protected ManifestURI manifestPartialPath; protected Dataset dataset; protected String eventBusName; + protected PathFilter filter; LotSource lotSource; String keyRegex; @@ -93,6 +94,11 @@ public Builder withEventBusName(String eventBusName) { return this; } + public Builder 
withFilter(PathFilter filter) { + this.filter = filter; + return this; + } + public Builder withLotSource(LotSource lotSource) { this.lotSource = lotSource; return this; @@ -105,13 +111,14 @@ public Builder withKeyRegex(String keyRegex) { public S3PutTransformProps build() { S3PutTransformProps s3PutTransformProps = new S3PutTransformProps(); - s3PutTransformProps.lotUnit = this.lotUnit; - s3PutTransformProps.manifestCompletePath = this.manifestCompletePath; - s3PutTransformProps.eventBusName = this.eventBusName; + s3PutTransformProps.filter = this.filter; s3PutTransformProps.lotSource = this.lotSource; s3PutTransformProps.dataset = this.dataset; s3PutTransformProps.keyRegex = this.keyRegex; + s3PutTransformProps.lotUnit = this.lotUnit; + s3PutTransformProps.manifestCompletePath = this.manifestCompletePath; s3PutTransformProps.manifestPartialPath = this.manifestPartialPath; + s3PutTransformProps.eventBusName = this.eventBusName; return s3PutTransformProps; } } diff --git a/clusterless-substrate-aws-lambda-transform/src/main/java/clusterless/lambda/transform/frequents3put/FrequentPutEventTransformHandler.java b/clusterless-substrate-aws-lambda-transform/src/main/java/clusterless/lambda/transform/frequents3put/FrequentPutEventTransformHandler.java index 9b05751d..fab89f47 100644 --- a/clusterless-substrate-aws-lambda-transform/src/main/java/clusterless/lambda/transform/frequents3put/FrequentPutEventTransformHandler.java +++ b/clusterless-substrate-aws-lambda-transform/src/main/java/clusterless/lambda/transform/frequents3put/FrequentPutEventTransformHandler.java @@ -12,6 +12,7 @@ import clusterless.lambda.arc.ArcNotifyEventPublisher; import clusterless.lambda.manifest.ManifestWriter; import clusterless.lambda.transform.json.event.AWSEvent; +import clusterless.lambda.util.PathMatcher; import clusterless.model.UriType; import clusterless.substrate.aws.sdk.S3; import clusterless.substrate.aws.sdk.SQS; @@ -62,6 +63,14 @@ public class FrequentPutEventTransformHandler 
extends EventHandler !isEmpty(uri.getPath()) && uri.getPath().charAt(uri.getPath().length() - 1) != '/') // only retain files .peek(eventObserver::applyIdentifierURI) + .filter(pathMatcher::keep) .forEach(uris::add); } diff --git a/clusterless-substrate-aws-lambda-transform/src/main/java/clusterless/lambda/transform/s3put/PutEventTransformHandler.java b/clusterless-substrate-aws-lambda-transform/src/main/java/clusterless/lambda/transform/s3put/PutEventTransformHandler.java index d5fe68c7..d5ae90a2 100644 --- a/clusterless-substrate-aws-lambda-transform/src/main/java/clusterless/lambda/transform/s3put/PutEventTransformHandler.java +++ b/clusterless-substrate-aws-lambda-transform/src/main/java/clusterless/lambda/transform/s3put/PutEventTransformHandler.java @@ -12,6 +12,7 @@ import clusterless.lambda.arc.ArcNotifyEventPublisher; import clusterless.lambda.manifest.ManifestWriter; import clusterless.lambda.transform.json.object.AWSEvent; +import clusterless.lambda.util.PathMatcher; import clusterless.model.UriType; import clusterless.substrate.aws.sdk.S3; import clusterless.temporal.IntervalBuilder; @@ -22,6 +23,7 @@ import java.net.URI; import java.time.OffsetDateTime; +import java.util.Collections; import java.util.List; /** @@ -48,6 +50,14 @@ public class PutEventTransformHandler extends EventHandler intervalBuilder.truncateAndFormat(time); + case objectModifiedTime -> fromModifiedTime(identifier); + case keyTimestampRegex -> null; + }; eventObserver.applyLotId(lotId); - List uris = List.of(identifier); + List uris = Collections.emptyList(); + + if (pathMatcher.keep(key)) { + uris = List.of(identifier); + } eventObserver.applyDatasetItemsSize(uris.size()); - URI manifestURI = manifestWriter.writeSuccessManifest(uris, lotId); + URI manifestURI = uris.isEmpty() ? 
+ manifestWriter.writeEmptyManifest(lotId) : manifestWriter.writeSuccessManifest(uris, lotId); eventObserver.applyManifestURI(manifestURI); diff --git a/clusterless-substrate-aws-lambda-workload-model/src/main/java/clusterless/lambda/workload/s3copy/S3CopyProps.java b/clusterless-substrate-aws-lambda-workload-model/src/main/java/clusterless/lambda/workload/s3copy/S3CopyProps.java index 212a87db..d8bc2e3e 100644 --- a/clusterless-substrate-aws-lambda-workload-model/src/main/java/clusterless/lambda/workload/s3copy/S3CopyProps.java +++ b/clusterless-substrate-aws-lambda-workload-model/src/main/java/clusterless/lambda/workload/s3copy/S3CopyProps.java @@ -8,41 +8,15 @@ package clusterless.lambda.workload.s3copy; -import clusterless.model.Struct; import clusterless.model.deploy.WorkloadProps; - -import java.util.ArrayList; -import java.util.List; +import clusterless.model.deploy.partial.PathFilter; public class S3CopyProps extends WorkloadProps { - public static class Filter implements Struct { - List includes = new ArrayList<>(); - List excludes = new ArrayList<>(); - char pathSeparator = '/'; - boolean ignoreCase = false; - - public List includes() { - return includes; - } - - public List excludes() { - return excludes; - } - - public char pathSeparator() { - return pathSeparator; - } - - public boolean ignoreCase() { - return ignoreCase; - } - } - - Filter filter = new Filter(); + PathFilter filter = new PathFilter(); float failArcOnPartialPercent = 0f; - public Filter filter() { + public PathFilter filter() { return filter; } diff --git a/clusterless-substrate-aws-lambda-workload/src/main/java/clusterless/lambda/workload/s3copy/S3CopyArcEventHandler.java b/clusterless-substrate-aws-lambda-workload/src/main/java/clusterless/lambda/workload/s3copy/S3CopyArcEventHandler.java index 5b9668dc..47c8baf1 100644 --- a/clusterless-substrate-aws-lambda-workload/src/main/java/clusterless/lambda/workload/s3copy/S3CopyArcEventHandler.java +++ 
b/clusterless-substrate-aws-lambda-workload/src/main/java/clusterless/lambda/workload/s3copy/S3CopyArcEventHandler.java @@ -48,7 +48,6 @@ public class S3CopyArcEventHandler extends ArcEventHandler { arcProps().sinkManifestTemplates(), UriType.identifier ); - protected PathMatcher.Builder pathMatcher = PathMatcher.builder() .withPathSeparator(arcProps().workloadProps().filter().pathSeparator()) .withIgnoreCase(arcProps().workloadProps().filter().ignoreCase())