Skip to content

Commit

Permalink
add include/exclude filter to S3PutListenerBoundary
Browse files Browse the repository at this point in the history
update scenarios to test filtering
  • Loading branch information
cwensel committed Oct 4, 2023
1 parent 6c658be commit 5219fff
Show file tree
Hide file tree
Showing 18 changed files with 272 additions and 76 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2023 Chris K Wensel <[email protected]>. All Rights Reserved.
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

package clusterless.model.deploy.partial;

import clusterless.model.Struct;

import java.util.ArrayList;
import java.util.List;

/**
 * Declarative description of an include/exclude path filter.
 * <p>
 * This struct only carries the configuration; it performs no matching itself.
 * How the patterns are interpreted is up to whichever component consumes it.
 */
public class PathFilter implements Struct {
    /** Include patterns; an empty list unless configured. */
    List<String> includes = new ArrayList<>();
    /** Exclude patterns; an empty list unless configured. */
    List<String> excludes = new ArrayList<>();
    /** Character separating path segments; defaults to {@code '/'}. */
    char pathSeparator = '/';
    /** Whether matching should disregard case; defaults to {@code false}. */
    boolean ignoreCase = false;

    /**
     * @return the configured include patterns (the live list, not a copy)
     */
    public List<String> includes() {
        return includes;
    }

    /**
     * @return the configured exclude patterns (the live list, not a copy)
     */
    public List<String> excludes() {
        return excludes;
    }

    /**
     * @return the character used to separate path segments
     */
    public char pathSeparator() {
        return pathSeparator;
    }

    /**
     * @return {@code true} if matching should ignore case
     */
    public boolean ignoreCase() {
        return ignoreCase;
    }

    /**
     * Renders all four settings for logging and debugging.
     *
     * @return a string of the form
     * {@code PathFilter{includes=[..], excludes=[..], pathSeparator=/, ignoreCase=false}}
     */
    @Override
    public String toString() {
        // The label is the actual class name so log output can be traced back to this type.
        final StringBuilder sb = new StringBuilder("PathFilter{");
        sb.append("includes=").append(includes);
        sb.append(", excludes=").append(excludes);
        sb.append(", pathSeparator=").append(pathSeparator);
        sb.append(", ignoreCase=").append(ignoreCase);
        sb.append('}');
        return sb.toString();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Project definition: an S3 bucket, a put-listener boundary that applies an
// exclude filter to arriving objects, and a copy arc from the ingress dataset
// to the copy-a dataset.

// Values supplied from outside via external variables.
local deployStage = std.extVar('scenario.stage');
local awsAccount = std.extVar('scenario.aws.account');
local awsRegion = std.extVar('scenario.aws.region');

// The bucket name embeds the account and region.
local testBucket = 'clusterless-frequent-filter-boundary-test-' + awsAccount + '-' + awsRegion;
local bucketRoot = 's3://' + testBucket;
local intervalUnit = 'Twelfths';

// Dataset declared by the boundary and consumed by the arc; defined once so
// both references carry the same name, version and path.
local ingressDataset = {
  name: 'ingress-frequent-boundary',
  version: '20220101',
  pathURI: bucketRoot + '/ingress/',
};

// Dataset the copy arc writes to.
local copyDataset = {
  name: 'copy-a-frequent-boundary',
  version: '20230101',
  pathURI: bucketRoot + '/copy-a/',
};

{
  project: {
    name: 'S3FreqFiltBndy',
    version: '20230101-00',
  },
  placement: {
    stage: deployStage,
    provider: 'aws',
    account: awsAccount,
    region: awsRegion,
  },
  resources: [
    {
      type: 'aws:core:s3Bucket',
      name: 'bucket',
      bucketName: testBucket,
    },
  ],
  boundaries: [
    {
      type: 'aws:core:s3PutListenerBoundary',
      name: 'FreqPutListener',
      eventArrival: 'frequent',
      dataset: ingressDataset,
      lotUnit: intervalUnit,
      // Exclude any object whose final path segment begins with an underscore.
      filter: {
        excludes: ['**/_*'],
      },
    },
  ],
  arcs: [
    {
      type: 'aws:core:s3CopyArc',
      name: 'copyA',
      sources: {
        main: ingressDataset,
      },
      sinks: {
        main: copyDataset,
      },
    },
  ],
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Scenario definition: two ingress stores upload into the project's ingress
// path while one watched store observes the copy arc's sink path.
local project = import 's3-copy-arc.jsonnet';

local placementRegion = project.placement.region;
local copyArc = project.arcs[0];

local uploadDelaySec = 15;
// Evaluates to 60, the same value as the literal 60/15 * 5 * 3.
// NOTE(review): presumably uploads-per-minute * minutes-per-lot * lots; confirm.
local objectsPerStore = 60 / uploadDelaySec * 5 * 3;

// Builds an ingress store on the arc's source path; `extra` appends
// store-specific fields after the common ones.
local ingressStore(extra={}) = {
  region: placementRegion,
  path: copyArc.sources.main.pathURI,
  uploadDelaySec: uploadDelaySec,
  objectCount: objectsPerStore,
} + extra;

{
  name: 's3-frequent-filter-boundary',
  description: 'copy workload with frequent arrivals',
  projectFiles: [
    's3-copy-arc.json',
  ],
  placements: [
    project.placement,
  ],
  ingressStores: [
    ingressStore(),
    // Second store uploads underscore-prefixed object names.
    // NOTE(review): these are presumably the objects the boundary filter is
    // expected to drop, since the watched store expects only one store's
    // worth of objects; confirm.
    ingressStore({ objectName: '_SUCCESS-%04d-%d.txt' }),
  ],
  watchedStores: [
    {
      region: placementRegion,
      path: copyArc.sinks.main.pathURI,
      objectCount: objectsPerStore,
    },
  ],
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
local stage = std.extVar('scenario.stage');
local account = std.extVar('scenario.aws.account');
local region = std.extVar('scenario.aws.region');
local bucketName = 'clusterless-frequent-test-' + account + '-' + region;
local bucketName = 'clusterless-frequent-filter-copy-test-' + account + '-' + region;
local bucketPrefix = 's3://' + bucketName;
local unit = 'Twelfths';

{
project: {
name: 'S3Freq',
name: 'S3FreqFiltCp',
version: '20230101-00',
},
placement: {
Expand All @@ -29,7 +29,7 @@ local unit = 'Twelfths';
name: 'FreqPutListener',
eventArrival: 'frequent',
dataset: {
name: 'ingress-frequent',
name: 'ingress-frequent-copy',
version: '20220101',
pathURI: bucketPrefix + '/ingress/',
},
Expand All @@ -42,14 +42,14 @@ local unit = 'Twelfths';
name: 'copyA',
sources: {
main: {
name: 'ingress-frequent',
name: 'ingress-frequent-copy',
version: '20220101',
pathURI: bucketPrefix + '/ingress/',
},
},
sinks: {
main: {
name: 'copy-a-frequent',
name: 'copy-a-frequent-copy',
version: '20230101',
pathURI: bucketPrefix + '/copy-a/',
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
local project = import 's3-copy-arc.jsonnet';

{
name: 's3-frequent',
name: 's3-frequent-filter-copy',
description: 'copy workload with frequent arrivals',
projectFiles: [
's3-copy-arc.json',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,17 @@
synopsis = "The AWS S3 Copy Arc component copies data from one S3 bucket to another S3 bucket.",
description = """
All data in the source manifest will be copied to the specified dataset, except those paths that do
not pass the filter. The filter is a list of include and exclude patterns.
not pass the filter, if given. The filter is a list of include and exclude patterns.
Where:
- ? matches one character
- * matches zero or more characters
- ** matches zero or more directories in a path
A common exclude pattern would be '**/_*'. This would exclude all files that start with an underscore,
like '_SUCCESS' or '_metadata'.
workloadProps.filter.includes: A list of include patterns.
workloadProps.filter.includes: A list of include patterns. Default is '**'.
workloadProps.filter.excludes: A list of exclude patterns.
workloadProps.filter.pathSeparator: The path separator to use when matching patterns. Default is '/'.
workloadProps.filter.ignoreCase: Whether to ignore case when matching patterns. Default is false.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public FrequentS3PutStrategyBoundaryConstruct(@NotNull ManagedComponentContext c
.withManifestCompletePath(manifestComplete)
.withManifestPartialPath(manifestPartial)
.withLotUnit(model.lotUnit())
.withFilter(model().filter())
.build();

Map<String, String> environment = Env.toEnv(transformProps);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public InfrequentS3PutStrategyBoundaryConstruct(@NotNull ManagedComponentContext
.withLotUnit(model.lotUnit())
.withLotSource(model().infrequent().lotSource())
.withKeyRegex(model().infrequent().keyRegex())
.withFilter(model().filter())
.build();

Map<String, String> environment = Env.toEnv(transformProps);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import clusterless.lambda.transform.s3put.EventArrival;
import clusterless.model.deploy.IngressBoundary;
import clusterless.model.deploy.partial.PathFilter;
import clusterless.substrate.aws.props.LambdaJavaRuntimeProps;
import clusterless.substrate.aws.props.Memory;

Expand All @@ -26,6 +27,8 @@ public class S3PutListenerBoundary extends IngressBoundary {
Infrequent infrequent = new Infrequent();
Frequent frequent = new Frequent();

PathFilter filter = new PathFilter();

/**
 * Creates a boundary model whose fields keep their declared initial values.
 */
public S3PutListenerBoundary() {
    super();
}

Expand All @@ -44,4 +47,8 @@ public Infrequent infrequent() {
/**
 * @return the {@code frequent} settings held by this boundary
 */
public Frequent frequent() {
    return this.frequent;
}

/**
 * @return the path filter held by this boundary
 */
public PathFilter filter() {
    return this.filter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,32 @@
infrequent.enableEventBridge: true|false
true will enable event bridge on the bucket
if the bucket was not declared with eventBridgeNotification enabled, it must be set here
frequent.queueFetchWaitSec: seconds
The duration (in seconds) for which the call waits for a message to arrive in the queue
before returning. If a message is available, the call returns sooner than WaitTimeSeconds.
If no messages are available and the wait time expires, the call returns successfully with an
empty list of messages.
It is recommended to leave this value at zero (0).
For frequently arriving events, all paths are collected until the end of the interval, except those
paths that do not pass the filter, if given. The filter is a list of include and exclude patterns.
Where:
- ? matches one character
- * matches zero or more characters
- ** matches zero or more directories in a path
A common exclude pattern would be '**/_*'. This would exclude all files that start with an underscore,
like '_SUCCESS' or '_metadata'.
filter.includes: A list of include patterns.
Default is '**'.
filter.excludes: A list of exclude patterns.
filter.pathSeparator: The path separator to use when matching patterns.
Default is '/'.
filter.ignoreCase: Whether to ignore case when matching patterns.
Default is false.
"""
)
public class S3PutListenerBoundaryProvider implements BoundaryComponentService<ManagedComponentContext, S3PutListenerBoundary, S3PutListenerBoundaryConstruct> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.jetbrains.annotations.NotNull;

import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
Expand Down Expand Up @@ -67,11 +68,14 @@ private Predicate<String> predicate(String pattern, AntPathMatcher matcher) {
return path -> matcher.match(pattern, path.subSequence(this.path.length(), path.length()));
}

/**
 * Applies the filter to the path component of the given URI.
 *
 * @param uri the URI whose {@link URI#getPath() path} is tested
 * @return the result of {@link #keep(String)} for that path
 */
public boolean keep(URI uri) {
    final String uriPath = uri.getPath();
    return keep(uriPath);
}

/**
 * Applies the filter predicate to the given path.
 *
 * @param path the path to test
 * @return whatever the underlying predicate returns for {@code path}
 */
public boolean keep(String path) {
    final boolean retained = predicate.test(path);
    return retained;
}


public static final class Builder {
protected String path;
protected char pathSeparator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import clusterless.model.Struct;
import clusterless.model.deploy.Dataset;
import clusterless.model.deploy.partial.PathFilter;
import clusterless.substrate.uri.ManifestURI;
import com.fasterxml.jackson.annotation.JsonProperty;

Expand All @@ -23,6 +24,7 @@ public class TransformProps implements Struct {
@JsonProperty(required = true)
protected Dataset dataset;
protected String eventBusName;
protected PathFilter filter;

public String lotUnit() {
return lotUnit;
Expand All @@ -43,4 +45,8 @@ public Dataset dataset() {
/**
 * @return the event bus name held by these props; may be {@code null} if never assigned
 */
public String eventBusName() {
    return this.eventBusName;
}

/**
 * @return the path filter held by these props; may be {@code null} if never assigned
 */
public PathFilter filter() {
    return this.filter;
}
}
Loading

0 comments on commit 5219fff

Please sign in to comment.