diff --git a/clusterless-scenario/src/main/cls/scenarios/s3-copy-chain/s3-copy-arc-project-chain.jsonnet b/clusterless-scenario/src/main/cls/scenarios/s3-copy-chain/s3-copy-arc-project-chain.jsonnet index 87f0c651..c87e56a9 100644 --- a/clusterless-scenario/src/main/cls/scenarios/s3-copy-chain/s3-copy-arc-project-chain.jsonnet +++ b/clusterless-scenario/src/main/cls/scenarios/s3-copy-chain/s3-copy-arc-project-chain.jsonnet @@ -2,7 +2,7 @@ local stage = std.extVar('scenario.stage'); local account = std.extVar('scenario.aws.account'); local region = std.extVar('scenario.aws.region'); local bucketName = 'clusterless-chain-test-' + account + '-' + region; -local bucketPrefix = 's3://'+bucketName; +local bucketPrefix = 's3://' + bucketName; local unit = 'Twelfths'; { @@ -30,7 +30,7 @@ local unit = 'Twelfths'; dataset: { name: 'ingress-chain', version: '20220101', - pathURI: bucketPrefix+'/ingress/', + pathURI: bucketPrefix + '/ingress/', }, lotUnit: unit, }, @@ -43,14 +43,14 @@ local unit = 'Twelfths'; main: { name: 'ingress-chain', version: '20220101', - pathURI: bucketPrefix+'/ingress/', + pathURI: bucketPrefix + '/ingress/', }, }, sinks: { main: { name: 'copy-a-chain', version: '20230101', - pathURI: bucketPrefix+'/copy-a/', + pathURI: bucketPrefix + '/copy-a/', }, }, }, @@ -61,14 +61,14 @@ local unit = 'Twelfths'; main: { name: 'copy-a-chain', version: '20230101', - pathURI: bucketPrefix+'/copy-a/', + pathURI: bucketPrefix + '/copy-a/', }, }, sinks: { main: { name: 'copy-b-chain', version: '20230101', - pathURI: bucketPrefix+'/copy-b/', + pathURI: bucketPrefix + '/copy-b/', }, }, workload: { @@ -84,14 +84,14 @@ local unit = 'Twelfths'; main: { name: 'copy-b-chain', version: '20230101', - pathURI: bucketPrefix+'/copy-b/', + pathURI: bucketPrefix + '/copy-b/', }, }, sinks: { main: { name: 'copy-c-chain', version: '20230101', - pathURI: bucketPrefix+'/copy-c/', + pathURI: bucketPrefix + '/copy-c/', }, }, }, diff --git a/clusterless-scenario/src/main/cls/scenarios/s3-frequent/s3-copy-arc.jsonnet b/clusterless-scenario/src/main/cls/scenarios/s3-frequent/s3-copy-arc.jsonnet index 55549d09..a785f58d 100644 --- a/clusterless-scenario/src/main/cls/scenarios/s3-frequent/s3-copy-arc.jsonnet +++ b/clusterless-scenario/src/main/cls/scenarios/s3-frequent/s3-copy-arc.jsonnet @@ -54,6 +54,13 @@ local unit = 'Twelfths'; pathURI: bucketPrefix + '/copy-a/', }, }, + workload: { + workloadProps: { + filter: { + excludes: ['**/_*'], + }, + }, + }, }, ], } diff --git a/clusterless-scenario/src/main/cls/scenarios/s3-frequent/scenario.jsonnet b/clusterless-scenario/src/main/cls/scenarios/s3-frequent/scenario.jsonnet index 7f0514a0..bce6f774 100644 --- a/clusterless-scenario/src/main/cls/scenarios/s3-frequent/scenario.jsonnet +++ b/clusterless-scenario/src/main/cls/scenarios/s3-frequent/scenario.jsonnet @@ -16,6 +16,13 @@ local project = import 's3-copy-arc.jsonnet'; uploadDelaySec: 15, objectCount: 60/15 * 5 * 3, }, + { + region: project.placement.region, + path: project.arcs[0].sources.main.pathURI, + uploadDelaySec: 15, + objectCount: 60/15 * 5 * 3, + objectName: '_SUCCESS-%04d-%d.txt', + }, ], watchedStores: [ { diff --git a/clusterless-substrate-aws-construct-core/src/main/java/clusterless/substrate/aws/arc/s3copy/S3CopyArcProvider.java b/clusterless-substrate-aws-construct-core/src/main/java/clusterless/substrate/aws/arc/s3copy/S3CopyArcProvider.java index e0291e2b..33bb602f 100644 --- a/clusterless-substrate-aws-construct-core/src/main/java/clusterless/substrate/aws/arc/s3copy/S3CopyArcProvider.java +++ b/clusterless-substrate-aws-construct-core/src/main/java/clusterless/substrate/aws/arc/s3copy/S3CopyArcProvider.java @@ -19,7 +19,16 @@ type = "aws:core:s3CopyArc", synopsis = "The AWS S3 Copy Arc component copies data from one S3 bucket to another S3 bucket.", description = """ - All data in the source manifest will be copied to the specified dataset. + All data in the source manifest will be copied to the specified dataset, except those paths that do + not pass the filter. The filter is a list of include and exclude patterns. + + A common exclude pattern would be '**/_*'. This would exclude all files that start with an underscore, + like '_SUCCESS' or '_metadata'. + + workloadProps.filter.includes: A list of include patterns. + workloadProps.filter.excludes: A list of exclude patterns. + workloadProps.filter.pathSeparator: The path separator to use when matching patterns. Default is '/'. + workloadProps.filter.ignoreCase: Whether to ignore case when matching patterns. Default is false. workloadProps.failArcOnPartialPercent: The percentage of files that can fail before the Arc fails. Default is 0.0. """ diff --git a/clusterless-substrate-aws-lambda-common/src/main/java/clusterless/lambda/util/AntPathMatcher.java b/clusterless-substrate-aws-lambda-common/src/main/java/clusterless/lambda/util/AntPathMatcher.java new file mode 100644 index 00000000..aff8bf94 --- /dev/null +++ b/clusterless-substrate-aws-lambda-common/src/main/java/clusterless/lambda/util/AntPathMatcher.java @@ -0,0 +1,484 @@ +/* + * Copyright (c) 2023 Chris K Wensel . All Rights Reserved. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +package clusterless.lambda.util; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + + +public class AntPathMatcher { + private static final String[] EMPTY_STRING_ARRAY = {}; + + /** + * Default path separator: "/". + */ + public static final char DEFAULT_PATH_SEPARATOR = '/'; + + + private static final int CACHE_TURNOFF_THRESHOLD = 65536; + + private static final char[] WILDCARD_CHARS = {'*', '?', '{'}; + + private char pathSeparator; + + private boolean caseSensitive = true; + + private boolean trimTokens = false; + + private volatile Boolean cachePatterns; + + private final Map tokenizedPatternCache = new ConcurrentHashMap<>(256); + + final Map stringMatcherCache = new ConcurrentHashMap<>(256); + + /** + * Create a new instance with the {@link #DEFAULT_PATH_SEPARATOR}. + */ + public AntPathMatcher() { + this.pathSeparator = DEFAULT_PATH_SEPARATOR; + } + + /** + * A convenient, alternative constructor to use with a custom path separator. + * + * @param pathSeparator the path separator to use, must not be {@code null}. + */ + public AntPathMatcher(char pathSeparator) { + Objects.requireNonNull(pathSeparator, "'pathSeparator' is required"); + this.pathSeparator = pathSeparator; + } + + /** + * Specify whether to cache parsed pattern metadata for patterns passed + * into this matcher's {@link #match} method. A value of {@code true} + * activates an unlimited pattern cache; a value of {@code false} turns + * the pattern cache off completely. + *

Default is for the cache to be on, but with the variant to automatically + * turn it off when encountering too many patterns to cache at runtime + * (the threshold is 65536), assuming that arbitrary permutations of patterns + * are coming in, with little chance for encountering a recurring pattern. + * + * @see #getStringMatcher(String) + */ + public void setCachePatterns(boolean cachePatterns) { + this.cachePatterns = cachePatterns; + } + + public AntPathMatcher setCaseSensitive(boolean caseSensitive) { + this.caseSensitive = caseSensitive; + return this; + } + + private void deactivatePatternCache() { + this.cachePatterns = false; + this.tokenizedPatternCache.clear(); + this.stringMatcherCache.clear(); + } + + + public boolean match(String pattern, CharSequence path) { + return doMatch(pattern, path, true, null); + } + + + /** + * Actually match the given {@code path} against the given {@code pattern}. + * + * @param pattern the pattern to match against + * @param path the path to test + * @param fullMatch whether a full pattern match is required (else a pattern match + * as far as the given base path goes is sufficient) + * @return {@code true} if the supplied {@code path} matched, {@code false} if it didn't + */ + protected boolean doMatch(String pattern, CharSequence path, boolean fullMatch, Map uriTemplateVariables) { + + if (path == null) { + return false; + } + + if (path.isEmpty() && pattern.isEmpty()) { + return true; + } + + if ((path.charAt(0) == pathSeparator) != (pattern.charAt(0) == pathSeparator)) { + return false; + } + + String[] pattDirs = tokenizePattern(pattern); + if (fullMatch && this.caseSensitive && !isPotentialMatch(path, pattDirs)) { + return false; + } + + String[] pathDirs = tokenizePath(path); + int pattIdxStart = 0; + int pattIdxEnd = pattDirs.length - 1; + int pathIdxStart = 0; + int pathIdxEnd = pathDirs.length - 1; + + // Match all elements up to the first ** + while (pattIdxStart <= pattIdxEnd && pathIdxStart <= pathIdxEnd) { + String pattDir = pattDirs[pattIdxStart]; + if ("**".equals(pattDir)) { + break; + } + if (!matchStrings(pattDir, pathDirs[pathIdxStart], uriTemplateVariables)) { + return false; + } + pattIdxStart++; + pathIdxStart++; + } + + if (pathIdxStart > pathIdxEnd) { + // Path is exhausted, only match if rest of pattern is * or **'s + if (pattIdxStart > pattIdxEnd) { +// return (pattern.endsWith(this.pathSeparator) == path.endsWith(this.pathSeparator)); + return ((pattern.charAt(pattern.length() - 1) == pathSeparator) == (path.charAt(path.length() - 1) == pathSeparator)); + } + if (!fullMatch) { + return true; + } + if (pattIdxStart == pattIdxEnd && pattDirs[pattIdxStart].equals("*") && (path.charAt(path.length() - 1) == pathSeparator)) { + return true; + } + for (int i = pattIdxStart; i <= pattIdxEnd; i++) { + if (!pattDirs[i].equals("**")) { + return false; + } + } + return true; + } else if (pattIdxStart > pattIdxEnd) { + // String not exhausted, but pattern is. Failure. + return false; + } else if (!fullMatch && "**".equals(pattDirs[pattIdxStart])) { + // Path start definitely matches due to "**" part in pattern. + return true; + } + + // up to last '**' + while (pattIdxStart <= pattIdxEnd && pathIdxStart <= pathIdxEnd) { + String pattDir = pattDirs[pattIdxEnd]; + if (pattDir.equals("**")) { + break; + } + if (!matchStrings(pattDir, pathDirs[pathIdxEnd], uriTemplateVariables)) { + return false; + } + pattIdxEnd--; + pathIdxEnd--; + } + if (pathIdxStart > pathIdxEnd) { + // String is exhausted + for (int i = pattIdxStart; i <= pattIdxEnd; i++) { + if (!pattDirs[i].equals("**")) { + return false; + } + } + return true; + } + + while (pattIdxStart != pattIdxEnd && pathIdxStart <= pathIdxEnd) { + int patIdxTmp = -1; + for (int i = pattIdxStart + 1; i <= pattIdxEnd; i++) { + if (pattDirs[i].equals("**")) { + patIdxTmp = i; + break; + } + } + if (patIdxTmp == pattIdxStart + 1) { + // '**/**' situation, so skip one + pattIdxStart++; + continue; + } + // Find the pattern between padIdxStart & padIdxTmp in str between + // strIdxStart & strIdxEnd + int patLength = (patIdxTmp - pattIdxStart - 1); + int strLength = (pathIdxEnd - pathIdxStart + 1); + int foundIdx = -1; + + strLoop: + for (int i = 0; i <= strLength - patLength; i++) { + for (int j = 0; j < patLength; j++) { + String subPat = pattDirs[pattIdxStart + j + 1]; + String subStr = pathDirs[pathIdxStart + i + j]; + if (!matchStrings(subPat, subStr, uriTemplateVariables)) { + continue strLoop; + } + } + foundIdx = pathIdxStart + i; + break; + } + + if (foundIdx == -1) { + return false; + } + + pattIdxStart = patIdxTmp; + pathIdxStart = foundIdx + patLength; + } + + for (int i = pattIdxStart; i <= pattIdxEnd; i++) { + if (!pattDirs[i].equals("**")) { + return false; + } + } + + return true; + } + + private boolean isPotentialMatch(CharSequence path, String[] pattDirs) { + if (!trimTokens) { + int pos = 0; + for (String pattDir : pattDirs) { + int skipped = skipSeparator(path, pos, pathSeparator); + pos += skipped; + skipped = skipSegment(path, pos, pattDir); + if (skipped < pattDir.length()) { + return (skipped > 0 || (!pattDir.isEmpty() && isWildcardChar(pattDir.charAt(0)))); + } + pos += skipped; + } + } + return true; + } + + private int skipSegment(CharSequence path, int pos, String prefix) { + int skipped = 0; + for (int i = 0; i < prefix.length(); i++) { + char c = prefix.charAt(i); + if (isWildcardChar(c)) { + return skipped; + } + int currPos = pos + skipped; + if (currPos >= path.length()) { + return 0; + } + if (c == path.charAt(currPos)) { + skipped++; + } + } + return skipped; + } + + private int skipSeparator(CharSequence path, int pos, char separator) { + int skipped = 0; + while (path.toString().startsWith(String.valueOf(separator), pos + skipped)) { +// while (path.charAt(pos + skipped) == separator) { + skipped += 1; + } + return skipped; + } + + private boolean isWildcardChar(char c) { + for (char candidate : WILDCARD_CHARS) { + if (c == candidate) { + return true; + } + } + return false; + } + + /** + * Tokenize the given path pattern into parts, based on this matcher's settings. + *

Performs caching based on {@link #setCachePatterns}, delegating to + * {@link #tokenizePath(CharSequence)} for the actual tokenization algorithm. + * + * @param pattern the pattern to tokenize + * @return the tokenized pattern parts + */ + protected String[] tokenizePattern(String pattern) { + String[] tokenized = null; + Boolean cachePatterns = this.cachePatterns; + if (cachePatterns == null || cachePatterns) { + tokenized = this.tokenizedPatternCache.get(pattern); + } + if (tokenized == null) { + tokenized = tokenizePath(pattern); + if (cachePatterns == null && this.tokenizedPatternCache.size() >= CACHE_TURNOFF_THRESHOLD) { + // Try to adapt to the runtime situation that we're encountering: + // There are obviously too many different patterns coming in here... + // So let's turn off the cache since the patterns are unlikely to be reoccurring. + deactivatePatternCache(); + return tokenized; + } + if (cachePatterns == null || cachePatterns) { + this.tokenizedPatternCache.put(pattern, tokenized); + } + } + return tokenized; + } + + /** + * Tokenize the given path into parts, based on this matcher's settings. + * + * @param path the path to tokenize + * @return the tokenized path parts + */ + protected String[] tokenizePath(CharSequence path) { + return tokenizeToStringArray(path, this.pathSeparator, this.trimTokens, true); + } + + public static String[] tokenizeToStringArray(CharSequence str, char delimiters, boolean trimTokens, boolean ignoreEmptyTokens) { + if (str == null) { + return EMPTY_STRING_ARRAY; + } + + StringTokenizer st = new StringTokenizer(str.toString(), String.valueOf(delimiters)); + List tokens = new ArrayList<>(); + while (st.hasMoreTokens()) { + String token = st.nextToken(); + if (trimTokens) { + token = token.trim(); + } + if (!ignoreEmptyTokens || !token.isEmpty()) { + tokens.add(token); + } + } + return toStringArray(tokens); + } + + private static String[] toStringArray(Collection collection) { + return (!(collection == null || collection.isEmpty()) ? collection.toArray(EMPTY_STRING_ARRAY) : EMPTY_STRING_ARRAY); + } + + /** + * Test whether or not a string matches against a pattern. + * + * @param pattern the pattern to match against (never {@code null}) + * @param str the String which must be matched against the pattern (never {@code null}) + * @return {@code true} if the string matches against the pattern, or {@code false} otherwise + */ + private boolean matchStrings(String pattern, String str, Map uriTemplateVariables) { + return getStringMatcher(pattern).matchStrings(str, uriTemplateVariables); + } + + /** + * Build or retrieve an {@link AntPathStringMatcher} for the given pattern. + *

The default implementation checks this AntPathMatcher's internal cache + * (see {@link #setCachePatterns}), creating a new AntPathStringMatcher instance + * if no cached copy is found. + *

When encountering too many patterns to cache at runtime (the threshold is 65536), + * it turns the default cache off, assuming that arbitrary permutations of patterns + * are coming in, with little chance for encountering a recurring pattern. + *

This method may be overridden to implement a custom cache strategy. + * + * @param pattern the pattern to match against (never {@code null}) + * @return a corresponding AntPathStringMatcher (never {@code null}) + * @see #setCachePatterns + */ + protected AntPathStringMatcher getStringMatcher(String pattern) { + AntPathStringMatcher matcher = null; + Boolean cachePatterns = this.cachePatterns; + if (cachePatterns == null || cachePatterns.booleanValue()) { + matcher = this.stringMatcherCache.get(pattern); + } + if (matcher == null) { + matcher = new AntPathStringMatcher(pattern, this.caseSensitive); + if (cachePatterns == null && this.stringMatcherCache.size() >= CACHE_TURNOFF_THRESHOLD) { + // Try to adapt to the runtime situation that we're encountering: + // There are obviously too many different patterns coming in here... + // So let's turn off the cache since the patterns are unlikely to be reoccurring. + deactivatePatternCache(); + return matcher; + } + if (cachePatterns == null || cachePatterns.booleanValue()) { + this.stringMatcherCache.put(pattern, matcher); + } + } + return matcher; + } + + /** + * Tests whether or not a string matches against a pattern via a {@link Pattern}. + *

The pattern may contain special characters: '*' means zero or more characters; '?' means one and + * only one character; '{' and '}' indicate a URI template pattern. For example /users/{user}. + */ + protected static class AntPathStringMatcher { + + private static final Pattern GLOB_PATTERN = Pattern.compile("\\?|\\*|\\{((?:\\{[^/]+?\\}|[^/{}]|\\\\[{}])+?)\\}"); + + private static final String DEFAULT_VARIABLE_PATTERN = "(.*)"; + + private final Pattern pattern; + + private final List variableNames = new LinkedList<>(); + + public AntPathStringMatcher(String pattern) { + this(pattern, true); + } + + public AntPathStringMatcher(String pattern, boolean caseSensitive) { + StringBuilder patternBuilder = new StringBuilder(); + Matcher matcher = GLOB_PATTERN.matcher(pattern); + int end = 0; + while (matcher.find()) { + patternBuilder.append(quote(pattern, end, matcher.start())); + String match = matcher.group(); + if ("?".equals(match)) { + patternBuilder.append('.'); + } else if ("*".equals(match)) { + patternBuilder.append(".*"); + } else if (match.startsWith("{") && match.endsWith("}")) { + int colonIdx = match.indexOf(':'); + if (colonIdx == -1) { + patternBuilder.append(DEFAULT_VARIABLE_PATTERN); + this.variableNames.add(matcher.group(1)); + } else { + String variablePattern = match.substring(colonIdx + 1, match.length() - 1); + patternBuilder.append('('); + patternBuilder.append(variablePattern); + patternBuilder.append(')'); + String variableName = match.substring(1, colonIdx); + this.variableNames.add(variableName); + } + } + end = matcher.end(); + } + patternBuilder.append(quote(pattern, end, pattern.length())); + this.pattern = (caseSensitive ? Pattern.compile(patternBuilder.toString()) : + Pattern.compile(patternBuilder.toString(), Pattern.CASE_INSENSITIVE)); + } + + private String quote(String s, int start, int end) { + if (start == end) { + return ""; + } + return Pattern.quote(s.substring(start, end)); + } + + /** + * Main entry point. + * + * @return {@code true} if the string matches against the pattern, or {@code false} otherwise. + */ + public boolean matchStrings(String str, Map uriTemplateVariables) { + Matcher matcher = this.pattern.matcher(str); + if (matcher.matches()) { + if (uriTemplateVariables != null) { + // SPR-8455 + if (this.variableNames.size() != matcher.groupCount()) { + throw new IllegalArgumentException("The number of capturing groups in the pattern segment " + + this.pattern + " does not match the number of URI template variables it defines, " + + "which can occur if capturing groups are used in a URI template regex. " + + "Use non-capturing groups instead."); + } + for (int i = 1; i <= matcher.groupCount(); i++) { + String name = this.variableNames.get(i - 1); + String value = matcher.group(i); + uriTemplateVariables.put(name, value); + } + } + return true; + } else { + return false; + } + } + } +} diff --git a/clusterless-substrate-aws-lambda-common/src/main/java/clusterless/lambda/util/PathMatcher.java b/clusterless-substrate-aws-lambda-common/src/main/java/clusterless/lambda/util/PathMatcher.java new file mode 100644 index 00000000..014f71f5 --- /dev/null +++ b/clusterless-substrate-aws-lambda-common/src/main/java/clusterless/lambda/util/PathMatcher.java @@ -0,0 +1,118 @@ +/* + * Copyright (c) 2023 Chris K Wensel . All Rights Reserved. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +package clusterless.lambda.util; + +import org.jetbrains.annotations.NotNull; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Predicate; + +public class PathMatcher { + protected String path; + protected char pathSeparator; + protected List includes; + protected List excludes; + protected boolean ignoreCase; + protected final Predicate predicate; + + public PathMatcher(String path, List includes, List excludes, char pathSeparator, boolean ignoreCase) { + this.path = path; + this.includes = includes == null ? new ArrayList<>() : includes; + this.excludes = excludes == null ? new ArrayList<>() : excludes; + this.pathSeparator = pathSeparator == 0 ? '/' : pathSeparator; + this.ignoreCase = ignoreCase; + + AntPathMatcher matcher = new AntPathMatcher(this.pathSeparator); + + matcher.setCaseSensitive(!this.ignoreCase); + + Predicate include = this.includes.stream() + .filter(s -> !s.isEmpty()) + .map(s -> predicate(s, matcher)) + .reduce(Predicate::or).orElse(null); + + Predicate exclude = this.excludes.stream() + .filter(s -> !s.isEmpty()) + .map(s -> predicate(s, matcher)) + .reduce(Predicate::or).orElse(null); + + if (include == null && exclude == null) { + predicate = p -> true; + } else if (include != null && exclude != null) { + predicate = include.and(exclude.negate()); + } else if (include != null & exclude == null) { + predicate = include; + } else { + predicate = exclude.negate(); + } + } + + public static Builder builder() { + return Builder.builder(); + } + + @NotNull + private Predicate predicate(String pattern, AntPathMatcher matcher) { + if (pattern.charAt(0) == pathSeparator) { + return path -> matcher.match(pattern, path); + } + + return path -> matcher.match(pattern, path.subSequence(this.path.length(), path.length())); + } + + public boolean keep(String path) { + return predicate.test(path); + } + + + public static final class Builder { + protected String path; + protected char pathSeparator; + protected List includes; + protected List excludes; + protected boolean ignoreCase; + + private Builder() { + } + + public static Builder builder() { + return new Builder(); + } + + public Builder withPath(String path) { + this.path = path; + return this; + } + + public Builder withPathSeparator(char pathSeparator) { + this.pathSeparator = pathSeparator; + return this; + } + + public Builder withIncludes(List includes) { + this.includes = includes; + return this; + } + + public Builder withExcludes(List excludes) { + this.excludes = excludes; + return this; + } + + public Builder withIgnoreCase(boolean ignoreCase) { + this.ignoreCase = ignoreCase; + return this; + } + + public PathMatcher build() { + return new PathMatcher(path, includes, excludes, pathSeparator, ignoreCase); + } + } +} diff --git a/clusterless-substrate-aws-lambda-common/src/test/java/clusterless/lambda/util/AntPathMatcherTest.java b/clusterless-substrate-aws-lambda-common/src/test/java/clusterless/lambda/util/AntPathMatcherTest.java new file mode 100644 index 00000000..a0a67499 --- /dev/null +++ b/clusterless-substrate-aws-lambda-common/src/test/java/clusterless/lambda/util/AntPathMatcherTest.java @@ -0,0 +1,195 @@ +/* + * Copyright (c) 2023 Chris K Wensel . All Rights Reserved. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +package clusterless.lambda.util; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class AntPathMatcherTest { + + private final AntPathMatcher pathMatcher = new AntPathMatcher(); + + @Test + public void match() { + // test exact matching + Assertions.assertTrue(pathMatcher.match("test", "test")); + Assertions.assertTrue(pathMatcher.match("/test", "/test")); + // SPR-14141 + Assertions.assertTrue(pathMatcher.match("https://example.org", "https://example.org")); + Assertions.assertFalse(pathMatcher.match("/test.jpg", "test.jpg")); + Assertions.assertFalse(pathMatcher.match("test", "/test")); + Assertions.assertFalse(pathMatcher.match("/test", "test")); + + // test matching with ?'s + Assertions.assertTrue(pathMatcher.match("t?st", "test")); + Assertions.assertTrue(pathMatcher.match("??st", "test")); + Assertions.assertTrue(pathMatcher.match("tes?", "test")); + Assertions.assertTrue(pathMatcher.match("te??", "test")); + Assertions.assertTrue(pathMatcher.match("?es?", "test")); + Assertions.assertFalse(pathMatcher.match("tes?", "tes")); + Assertions.assertFalse(pathMatcher.match("tes?", "testt")); + Assertions.assertFalse(pathMatcher.match("tes?", "tsst")); + + // test matching with *'s + Assertions.assertTrue(pathMatcher.match("*", "test")); + Assertions.assertTrue(pathMatcher.match("test*", "test")); + Assertions.assertTrue(pathMatcher.match("test*", "testTest")); + Assertions.assertTrue(pathMatcher.match("test/*", "test/Test")); + Assertions.assertTrue(pathMatcher.match("test/*", "test/t")); + Assertions.assertTrue(pathMatcher.match("test/*", "test/")); + Assertions.assertTrue(pathMatcher.match("*test*", "AnothertestTest")); + Assertions.assertTrue(pathMatcher.match("*test", "Anothertest")); + Assertions.assertTrue(pathMatcher.match("*.*", "test.")); + Assertions.assertTrue(pathMatcher.match("*.*", "test.test")); + Assertions.assertTrue(pathMatcher.match("*.*", "test.test.test")); + Assertions.assertTrue(pathMatcher.match("test*aaa", "testblaaaa")); + Assertions.assertFalse(pathMatcher.match("test*", "tst")); + Assertions.assertFalse(pathMatcher.match("test*", "tsttest")); + Assertions.assertFalse(pathMatcher.match("test*", "test/")); + Assertions.assertFalse(pathMatcher.match("test*", "test/t")); + Assertions.assertFalse(pathMatcher.match("test/*", "test")); + Assertions.assertFalse(pathMatcher.match("*test*", "tsttst")); + Assertions.assertFalse(pathMatcher.match("*test", "tsttst")); + Assertions.assertFalse(pathMatcher.match("*.*", "tsttst")); + Assertions.assertFalse(pathMatcher.match("test*aaa", "test")); + Assertions.assertFalse(pathMatcher.match("test*aaa", "testblaaab")); + + // test matching with ?'s and /'s + Assertions.assertTrue(pathMatcher.match("/?", "/a")); + Assertions.assertTrue(pathMatcher.match("/?/a", "/a/a")); + Assertions.assertTrue(pathMatcher.match("/a/?", "/a/b")); + Assertions.assertTrue(pathMatcher.match("/??/a", "/aa/a")); + Assertions.assertTrue(pathMatcher.match("/a/??", "/a/bb")); + Assertions.assertTrue(pathMatcher.match("/?", "/a")); + + // test matching with **'s + Assertions.assertTrue(pathMatcher.match("/**", "/testing/testing")); + Assertions.assertTrue(pathMatcher.match("/*/**", "/testing/testing")); + Assertions.assertTrue(pathMatcher.match("/**/*", "/testing/testing")); + Assertions.assertTrue(pathMatcher.match("/bla/**/bla", "/bla/testing/testing/bla")); + Assertions.assertTrue(pathMatcher.match("/bla/**/bla", "/bla/testing/testing/bla/bla")); + Assertions.assertTrue(pathMatcher.match("/**/test", "/bla/bla/test")); + Assertions.assertTrue(pathMatcher.match("/bla/**/**/bla", "/bla/bla/bla/bla/bla/bla")); + Assertions.assertTrue(pathMatcher.match("/bla*bla/test", "/blaXXXbla/test")); + Assertions.assertTrue(pathMatcher.match("/*bla/test", "/XXXbla/test")); + Assertions.assertFalse(pathMatcher.match("/bla*bla/test", "/blaXXXbl/test")); + Assertions.assertFalse(pathMatcher.match("/*bla/test", "XXXblab/test")); + Assertions.assertFalse(pathMatcher.match("/*bla/test", "XXXbl/test")); + + Assertions.assertFalse(pathMatcher.match("/????", "/bala/bla")); + Assertions.assertFalse(pathMatcher.match("/**/*bla", "/bla/bla/bla/bbb")); + + Assertions.assertTrue(pathMatcher.match("/*bla*/**/bla/**", "/XXXblaXXXX/testing/testing/bla/testing/testing/")); + Assertions.assertTrue(pathMatcher.match("/*bla*/**/bla/*", "/XXXblaXXXX/testing/testing/bla/testing")); + Assertions.assertTrue(pathMatcher.match("/*bla*/**/bla/**", "/XXXblaXXXX/testing/testing/bla/testing/testing")); + Assertions.assertTrue(pathMatcher.match("/*bla*/**/bla/**", "/XXXblaXXXX/testing/testing/bla/testing/testing.jpg")); + + Assertions.assertTrue(pathMatcher.match("*bla*/**/bla/**", "XXXblaXXXX/testing/testing/bla/testing/testing/")); + Assertions.assertTrue(pathMatcher.match("*bla*/**/bla/*", "XXXblaXXXX/testing/testing/bla/testing")); + Assertions.assertTrue(pathMatcher.match("*bla*/**/bla/**", "XXXblaXXXX/testing/testing/bla/testing/testing")); + Assertions.assertFalse(pathMatcher.match("*bla*/**/bla/*", "XXXblaXXXX/testing/testing/bla/testing/testing")); + + Assertions.assertFalse(pathMatcher.match("/x/x/**/bla", "/x/x/x/")); + + Assertions.assertTrue(pathMatcher.match("/foo/bar/**", "/foo/bar")); + + Assertions.assertTrue(pathMatcher.match("", "")); + + Assertions.assertTrue(pathMatcher.match("/{bla}.*", "/testing.html")); + } + + @Test + public void matchWithNullPath() { + Assertions.assertFalse(pathMatcher.match("/test", null)); + Assertions.assertFalse(pathMatcher.match("/", null)); + Assertions.assertFalse(pathMatcher.match(null, null)); + } + + @Test + public void defaultCacheSetting() { + match(); + assertTrue(pathMatcher.stringMatcherCache.size() > 20); + + for (int i = 0; i < 65536; i++) { + pathMatcher.match("test" + i, "test"); + } + // Cache turned off because it went beyond the threshold + Assertions.assertTrue(pathMatcher.stringMatcherCache.isEmpty()); + } + + @Test + public void cachePatternsSetToTrue() { + pathMatcher.setCachePatterns(true); + match(); + assertTrue(pathMatcher.stringMatcherCache.size() > 20); + + for (int i = 0; i < 65536; i++) { + pathMatcher.match("test" + i, "test" + i); + } + // Cache keeps being alive due to the explicit cache setting + assertTrue(pathMatcher.stringMatcherCache.size() > 65536); + } + + @Test + public void preventCreatingStringMatchersIfPathDoesNotStartsWithPatternPrefix() { + pathMatcher.setCachePatterns(true); + Assertions.assertEquals(0, pathMatcher.stringMatcherCache.size()); + + pathMatcher.match("test?", "test"); + Assertions.assertEquals(1, pathMatcher.stringMatcherCache.size()); + + pathMatcher.match("test?", "best"); + pathMatcher.match("test/*", "view/test.jpg"); + pathMatcher.match("test/**/test.jpg", "view/test.jpg"); + pathMatcher.match("test/{name}.jpg", "view/test.jpg"); + Assertions.assertEquals(1, pathMatcher.stringMatcherCache.size()); + } + + @Test + public void creatingStringMatchersIfPatternPrefixCannotDetermineIfPathMatch() { + pathMatcher.setCachePatterns(true); + Assertions.assertEquals(0, pathMatcher.stringMatcherCache.size()); + + pathMatcher.match("test", "testian"); + pathMatcher.match("test?", "testFf"); + pathMatcher.match("test/*", "test/dir/name.jpg"); + pathMatcher.match("test/{name}.jpg", "test/lorem.jpg"); + pathMatcher.match("bla/**/test.jpg", "bla/test.jpg"); + pathMatcher.match("**/{name}.jpg", "test/lorem.jpg"); + pathMatcher.match("/**/{name}.jpg", "/test/lorem.jpg"); + pathMatcher.match("/*/dir/{name}.jpg", "/*/dir/lorem.jpg"); + + Assertions.assertEquals(7, pathMatcher.stringMatcherCache.size()); + } + + @Test + public void cachePatternsSetToFalse() { + pathMatcher.setCachePatterns(false); + match(); + Assertions.assertTrue(pathMatcher.stringMatcherCache.isEmpty()); + } + + /** + * ... + */ + @Test + public void greedyMatch() { + Assertions.assertFalse(pathMatcher.match("/organizations/*/fields", "/organizations/1/farms/2/fields")); + } + + /** + * ... + */ + @Test + void another() { + Assertions.assertFalse(pathMatcher.match("*.txt", "path/my.txt")); + } +} diff --git a/clusterless-substrate-aws-lambda-common/src/test/java/clusterless/lambda/util/PathMatcherTest.java b/clusterless-substrate-aws-lambda-common/src/test/java/clusterless/lambda/util/PathMatcherTest.java new file mode 100644 index 00000000..a661ccee --- /dev/null +++ b/clusterless-substrate-aws-lambda-common/src/test/java/clusterless/lambda/util/PathMatcherTest.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2023 Chris K Wensel . All Rights Reserved. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +package clusterless.lambda.util; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.List; + +public class PathMatcherTest { + @Test + void excludeAbsolute() { + PathMatcher matcher = PathMatcher.builder() + .withPath("/foo/") + .withExcludes(List.of("/**/_*")) + .build(); + + Assertions.assertTrue(matcher.keep("/foo/bar.parquet")); + Assertions.assertFalse(matcher.keep("/foo/_SUCCESS")); + Assertions.assertFalse(matcher.keep("/foo/bar/_SUCCESS")); + } + + @Test + void excludeRelative() { + PathMatcher matcher = PathMatcher.builder() + .withPath("/foo/") + .withExcludes(List.of("_*")) + .build(); + + Assertions.assertTrue(matcher.keep("/foo/bar.parquet")); + Assertions.assertFalse(matcher.keep("/foo/_SUCCESS")); + } + + @Test + void excludeRelativeDeep() { + PathMatcher matcher = PathMatcher.builder() + .withPath("/foo/") + .withExcludes(List.of("**/_*")) + .build(); + + Assertions.assertTrue(matcher.keep("/foo/a/b/c/bar.parquet")); + Assertions.assertFalse(matcher.keep("/foo/a/b/c/_SUCCESS")); + } +} diff --git a/clusterless-substrate-aws-lambda-workload-model/src/main/java/clusterless/lambda/workload/s3copy/S3CopyProps.java b/clusterless-substrate-aws-lambda-workload-model/src/main/java/clusterless/lambda/workload/s3copy/S3CopyProps.java index 0e2f272f..212a87db 100644 --- a/clusterless-substrate-aws-lambda-workload-model/src/main/java/clusterless/lambda/workload/s3copy/S3CopyProps.java +++ b/clusterless-substrate-aws-lambda-workload-model/src/main/java/clusterless/lambda/workload/s3copy/S3CopyProps.java @@ -8,11 +8,44 @@ package clusterless.lambda.workload.s3copy; +import clusterless.model.Struct; import clusterless.model.deploy.WorkloadProps; +import java.util.ArrayList; +import java.util.List; + public class S3CopyProps extends WorkloadProps { + public static class Filter implements Struct { + List includes = new ArrayList<>(); + List excludes = new ArrayList<>(); + char pathSeparator = '/'; + boolean ignoreCase = false; + + public List includes() { + return includes; + } + + public List excludes() { + return excludes; + } + + public char pathSeparator() { + return pathSeparator; + } + + public boolean ignoreCase() { + return ignoreCase; + } + } + + Filter filter = new Filter(); + float failArcOnPartialPercent = 0f; + public Filter filter() { + return filter; + } + public float failArcOnPartialPercent() { return failArcOnPartialPercent; } diff --git a/clusterless-substrate-aws-lambda-workload/src/main/java/clusterless/lambda/workload/s3copy/S3CopyArcEventHandler.java b/clusterless-substrate-aws-lambda-workload/src/main/java/clusterless/lambda/workload/s3copy/S3CopyArcEventHandler.java index 8aaa4469..5b9668dc 100644 --- a/clusterless-substrate-aws-lambda-workload/src/main/java/clusterless/lambda/workload/s3copy/S3CopyArcEventHandler.java +++ b/clusterless-substrate-aws-lambda-workload/src/main/java/clusterless/lambda/workload/s3copy/S3CopyArcEventHandler.java @@ -13,6 +13,7 @@ import clusterless.lambda.manifest.AttemptCounter; import clusterless.lambda.manifest.ManifestReader; import clusterless.lambda.manifest.ManifestWriter; +import clusterless.lambda.util.PathMatcher; import clusterless.model.UriType; import clusterless.model.deploy.SinkDataset; import clusterless.model.manifest.Manifest; @@ -48,6 +49,12 @@ public class S3CopyArcEventHandler extends ArcEventHandler { UriType.identifier ); + protected PathMatcher.Builder pathMatcher = PathMatcher.builder() + .withPathSeparator(arcProps().workloadProps().filter().pathSeparator()) + .withIgnoreCase(arcProps().workloadProps().filter().ignoreCase()) + .withIncludes(arcProps().workloadProps().filter().includes()) + .withExcludes(arcProps().workloadProps().filter().excludes()); + @Override protected Map handleEvent(ArcWorkloadContext arcWorkloadContext, Context context, ArcEventObserver eventObserver) { String fromRole = arcWorkloadContext.role(); @@ -65,6 +72,9 @@ protected Map handleEvent(ArcWorkloadContext arcWorkloadContext, Co URI fromDatasetPath = notifyEvent.dataset().pathURI(); List fromUris = incomingManifest.uris(); + PathMatcher match = pathMatcher.withPath(fromDatasetPath.getPath()) + .build(); + for (Map.Entry sinkRoleEntry : arcProps().sinks().entrySet()) { String toRole = sinkRoleEntry.getKey(); ManifestWriter manifestWriter = manifestWriters.get(toRole); @@ -86,6 +96,11 @@ protected Map handleEvent(ArcWorkloadContext arcWorkloadContext, Co // not using a map so that collisions can be managed independently on the to/from sides List> toUris = new LinkedList<>(); for (URI fromUri : fromUris) { + + if (!match.keep(fromUri.getPath())) { + continue; + } + URI toURI = URIs.fromTo(fromDatasetPath, fromUri, toDatasetPath); toUris.add(new Tuple2<>(fromUri, toURI)); }