diff --git a/clusterless-common/src/main/java/clusterless/json/JSONUtil.java b/clusterless-common/src/main/java/clusterless/json/JSONUtil.java index 3c22065a..e5fec5d3 100644 --- a/clusterless-common/src/main/java/clusterless/json/JSONUtil.java +++ b/clusterless-common/src/main/java/clusterless/json/JSONUtil.java @@ -229,6 +229,14 @@ public static T readAsObject(Path path, TypeReference type) throws IOExce return OBJECT_READER.forType(type).readValue(path.toFile()); } + public static T readAsObjectSafe(InputStream inputStream, TypeReference type) { + try { + return OBJECT_READER.forType(type).readValue(inputStream); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + public static String writeTypedAsStringSafe(Object object) { try { return TYPED_OBJECT_MAPPER.writeValueAsString(object); diff --git a/clusterless-model/src/main/java/clusterless/managed/component/ArcLocalExecutor.java b/clusterless-model/src/main/java/clusterless/managed/component/ArcLocalExecutor.java index f2769a12..62611620 100644 --- a/clusterless-model/src/main/java/clusterless/managed/component/ArcLocalExecutor.java +++ b/clusterless-model/src/main/java/clusterless/managed/component/ArcLocalExecutor.java @@ -8,6 +8,7 @@ package clusterless.managed.component; +import clusterless.managed.dataset.LookupDatasetOwnerLookup; import clusterless.model.manifest.ManifestState; import java.util.LinkedHashMap; @@ -16,7 +17,6 @@ import java.util.Map; public interface ArcLocalExecutor { - class Command { String headerComment; Map environmentComments = new LinkedHashMap<>(); @@ -99,5 +99,5 @@ public Command build() { } } - List commands(String role, String lotId, ManifestState manifestState); + List commands(String role, String lotId, ManifestState manifestState, LookupDatasetOwnerLookup ownerLookup); } diff --git a/clusterless-model/src/main/java/clusterless/managed/dataset/DatasetResolver.java b/clusterless-model/src/main/java/clusterless/managed/dataset/DatasetResolver.java new file 
mode 100644 index 00000000..ac58dd96 --- /dev/null +++ b/clusterless-model/src/main/java/clusterless/managed/dataset/DatasetResolver.java @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2023 Chris K Wensel . All Rights Reserved. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +package clusterless.managed.dataset; + +import clusterless.model.deploy.*; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.jetbrains.annotations.NotNull; + +import java.util.*; +import java.util.stream.Collectors; + +/** + * Maps ReferencedDataset to a SinkDataset by looking up SourceDatasets in the current set of + * Deployables or in the metadata store. + *

+ * This prevents a new project from overwriting an existing dataset. And allows current projects + to declare read permissions against datasets they do not own. + */ +public class DatasetResolver { + private static final Logger LOG = LogManager.getLogger(DatasetResolver.class); + List deployables; + + Map resolved = new HashMap<>(); + private List locallyReferenced; + private List locallyOwned; + + public DatasetResolver(List deployables) { + this.deployables = deployables; + + build(); + } + + private void build() { + locallyReferenced = new LinkedList<>(); + locallyOwned = new LinkedList<>(); + + for (Deployable deployable : deployables) { + deployable.boundaries().stream() + .map(boundary -> new OwnedDataset(deployable.project(), boundary.dataset())) + .forEach(locallyOwned::add); + + for (Arc> arc : deployable.arcs()) { + arc.sources() + .values() + .stream() + .map(source -> new ReferencedDataset(deployable.project(), source)) + .forEach(locallyReferenced::add); + + arc.sinks().values() + .stream() + .map(sink -> new OwnedDataset(deployable.project(), sink)) + .forEach(locallyOwned::add); + } + } + + for (ReferencedDataset referencedDataset : locallyReferenced) { + Set ownedDatasets = locallyOwned.stream() + .filter(ownedDataset -> ownedDataset.dataset().sameDataset(referencedDataset.dataset())) + .collect(Collectors.toSet()); + + // todo: confirm dataset isn't already owned outside this set of deployables + + if (ownedDatasets.isEmpty()) { + String message = "dataset owner not found: %s".formatted(referencedDataset.dataset().id()); + LOG.error(message); + throw new IllegalStateException(message); + } else if (ownedDatasets.size() != 1) { + Set projects = ownedDatasets.stream().map(OwnedDataset::owner).map(Project::id).collect(Collectors.toSet()); + String message = "dataset: %s, is owned by multiple projects: %s".formatted(referencedDataset.dataset().id(), projects); + LOG.error(message); + throw new IllegalStateException(message); + } + + 
resolved.put(referencedDataset, ownedDatasets.stream().findFirst().orElseThrow()); + } + } + + public List locallyOwned() { + return locallyOwned; + } + + public Map locate(@NotNull Project dependent, @NotNull Map sources) { + return sources.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> locate(dependent, e.getValue()).dataset())); + } + + public OwnedDataset locate(Project dependent, SourceDataset value) { + ReferencedDataset referencedDataset = new ReferencedDataset(dependent, value); + OwnedDataset ownedDataset = resolved.get(referencedDataset); + + if (ownedDataset != null) { + return ownedDataset; + } + + String message = "dataset owner not found for: %s".formatted(referencedDataset.dataset().id()); + LOG.error(message); + throw new IllegalStateException(message); + } +} diff --git a/clusterless-model/src/main/java/clusterless/managed/dataset/LookupDatasetOwnerLookup.java b/clusterless-model/src/main/java/clusterless/managed/dataset/LookupDatasetOwnerLookup.java new file mode 100644 index 00000000..b2133ede --- /dev/null +++ b/clusterless-model/src/main/java/clusterless/managed/dataset/LookupDatasetOwnerLookup.java @@ -0,0 +1,16 @@ +/* + * Copyright (c) 2023 Chris K Wensel . All Rights Reserved. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. 
+ */ + +package clusterless.managed.dataset; + +import clusterless.model.deploy.OwnedDataset; +import clusterless.model.deploy.SourceDataset; + +public interface LookupDatasetOwnerLookup { + OwnedDataset lookup(SourceDataset source); +} diff --git a/clusterless-model/src/main/java/clusterless/model/Model.java b/clusterless-model/src/main/java/clusterless/model/Model.java index 83cccf67..2256dbf9 100644 --- a/clusterless-model/src/main/java/clusterless/model/Model.java +++ b/clusterless-model/src/main/java/clusterless/model/Model.java @@ -11,10 +11,6 @@ import clusterless.json.JSONUtil; import clusterless.naming.Label; -/** - * Note {@link #equals(Object)} and {@link #hashCode()} are declared final. No two Model instances - * should ever be equal - */ public abstract class Model implements Struct { public final Label label() { return Label.of(getClass().getSimpleName()); diff --git a/clusterless-model/src/main/java/clusterless/model/deploy/Dataset.java b/clusterless-model/src/main/java/clusterless/model/deploy/Dataset.java index 0bb9fc4c..7c178e97 100644 --- a/clusterless-model/src/main/java/clusterless/model/deploy/Dataset.java +++ b/clusterless-model/src/main/java/clusterless/model/deploy/Dataset.java @@ -10,6 +10,7 @@ import clusterless.json.JsonRequiredProperty; import clusterless.model.Model; +import com.fasterxml.jackson.annotation.JsonIgnore; import java.util.Objects; @@ -47,6 +48,11 @@ public String version() { return version; } + @JsonIgnore + public String id() { + return String.format("%s/%s", name(), version()); + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -55,6 +61,13 @@ public boolean equals(Object o) { return Objects.equals(name, dataset.name) && Objects.equals(version, dataset.version); } + public boolean sameDataset(Object o) { + if (this == o) return true; + if (o == null) return false; + Dataset dataset = (Dataset) o; + return Objects.equals(name, dataset.name) && Objects.equals(version, dataset.version); + } + 
@Override public int hashCode() { return Objects.hash(name, version); diff --git a/clusterless-model/src/main/java/clusterless/model/deploy/OwnedDataset.java b/clusterless-model/src/main/java/clusterless/model/deploy/OwnedDataset.java index 2669f42d..f03bcfcd 100644 --- a/clusterless-model/src/main/java/clusterless/model/deploy/OwnedDataset.java +++ b/clusterless-model/src/main/java/clusterless/model/deploy/OwnedDataset.java @@ -9,13 +9,16 @@ package clusterless.model.deploy; import clusterless.model.Struct; +import com.fasterxml.jackson.annotation.JsonIgnore; + +import java.util.Objects; public class OwnedDataset implements Struct { - Project project; + Project owner; SinkDataset dataset; - public OwnedDataset(Project project, SinkDataset dataset) { - this.project = project; + public OwnedDataset(Project owner, SinkDataset dataset) { + this.owner = owner; this.dataset = dataset; } @@ -23,25 +26,43 @@ public static Builder builder() { return Builder.builder(); } - public Project project() { - return project; + public Project owner() { + return owner; } public SinkDataset dataset() { return dataset; } + @JsonIgnore + public String id() { + return String.format("%s:%s", owner().id(), dataset().id()); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + OwnedDataset that = (OwnedDataset) o; + return Objects.equals(owner, that.owner) && Objects.equals(dataset, that.dataset); + } + + @Override + public int hashCode() { + return Objects.hash(owner, dataset); + } + @Override public String toString() { final StringBuilder sb = new StringBuilder("OwnedDataset{"); - sb.append("project=").append(project); + sb.append("owner=").append(owner); sb.append(", dataset=").append(dataset); sb.append('}'); return sb.toString(); } public static final class Builder { - Project project; + Project owner; SinkDataset dataset; private Builder() { @@ -51,8 +72,8 @@ public static Builder builder() { 
return new Builder(); } - public Builder withProject(Project project) { - this.project = project; + public Builder withOwner(Project owner) { + this.owner = owner; return this; } @@ -62,7 +83,7 @@ public Builder withDataset(SinkDataset dataset) { } public OwnedDataset build() { - return new OwnedDataset(project, dataset); + return new OwnedDataset(owner, dataset); } } } diff --git a/clusterless-model/src/main/java/clusterless/model/deploy/Project.java b/clusterless-model/src/main/java/clusterless/model/deploy/Project.java index 374587f4..fa21733d 100644 --- a/clusterless-model/src/main/java/clusterless/model/deploy/Project.java +++ b/clusterless-model/src/main/java/clusterless/model/deploy/Project.java @@ -11,6 +11,7 @@ import clusterless.json.JsonRequiredProperty; import clusterless.managed.component.DocumentsModel; import clusterless.model.Struct; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonPropertyOrder; import java.util.Objects; @@ -57,6 +58,11 @@ public String version() { return version; } + @JsonIgnore + public String id() { + return String.format("%s/%s", name(), version()); + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/clusterless-model/src/main/java/clusterless/model/deploy/ReferencedDataset.java b/clusterless-model/src/main/java/clusterless/model/deploy/ReferencedDataset.java new file mode 100644 index 00000000..9b82dbe9 --- /dev/null +++ b/clusterless-model/src/main/java/clusterless/model/deploy/ReferencedDataset.java @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2023 Chris K Wensel . All Rights Reserved. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. 
+ */ + +package clusterless.model.deploy; + +import clusterless.model.Struct; +import com.fasterxml.jackson.annotation.JsonIgnore; + +import java.util.Objects; + +public class ReferencedDataset implements Struct { + Project dependent; + SourceDataset dataset; + + public ReferencedDataset(Project dependent, SourceDataset dataset) { + this.dependent = dependent; + this.dataset = dataset; + } + + public static Builder builder() { + return Builder.builder(); + } + + public Project dependent() { + return dependent; + } + + public SourceDataset dataset() { + return dataset; + } + + @JsonIgnore + public String id() { + return String.format("%s:%s", dependent().id(), dataset().id()); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ReferencedDataset that = (ReferencedDataset) o; + return Objects.equals(dependent, that.dependent) && Objects.equals(dataset, that.dataset); + } + + @Override + public int hashCode() { + return Objects.hash(dependent, dataset); + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("ReferencedDataset{"); + sb.append("project=").append(dependent); + sb.append(", dataset=").append(dataset); + sb.append('}'); + return sb.toString(); + } + + public static final class Builder { + Project dependent; + SourceDataset dataset; + + private Builder() { + } + + public static Builder builder() { + return new Builder(); + } + + public Builder withDependent(Project dependent) { + this.dependent = dependent; + return this; + } + + public Builder withDataset(SourceDataset dataset) { + this.dataset = dataset; + return this; + } + + public ReferencedDataset build() { + return new ReferencedDataset(dependent, dataset); + } + } +} diff --git a/clusterless-scenario/src/main/cls/scenarios/glue-simple/s3-copy-arc.jsonnet b/clusterless-scenario/src/main/cls/scenarios/glue-simple/s3-copy-arc.jsonnet index b09e51ee..21feed2d 100644 --- 
a/clusterless-scenario/src/main/cls/scenarios/glue-simple/s3-copy-arc.jsonnet +++ b/clusterless-scenario/src/main/cls/scenarios/glue-simple/s3-copy-arc.jsonnet @@ -76,7 +76,6 @@ local tableName = 'clusterless-glue-test-table'; main: { name: 'ingress-glue', version: '20220101', - pathURI: bucketPrefix + '/ingress/', }, }, sinks: { diff --git a/clusterless-scenario/src/main/cls/scenarios/glue-simple/scenario.jsonnet b/clusterless-scenario/src/main/cls/scenarios/glue-simple/scenario.jsonnet index f8942f6f..ff744f34 100644 --- a/clusterless-scenario/src/main/cls/scenarios/glue-simple/scenario.jsonnet +++ b/clusterless-scenario/src/main/cls/scenarios/glue-simple/scenario.jsonnet @@ -12,7 +12,7 @@ local project = import 's3-copy-arc.jsonnet'; ingressStores: [ { region: project.placement.region, - path: project.arcs[0].sources.main.pathURI, + path: project.boundaries[0].dataset.pathURI, uploadDelaySec: 60 * 5, objectCount: 3, objectName: 'partition=%1$04d/data-%1$04d-%2$d.txt', diff --git a/clusterless-scenario/src/main/cls/scenarios/s3-copy-chain/s3-copy-arc-project-chain.jsonnet b/clusterless-scenario/src/main/cls/scenarios/s3-copy-chain/s3-copy-arc-project-chain.jsonnet index c87e56a9..04edd3e0 100644 --- a/clusterless-scenario/src/main/cls/scenarios/s3-copy-chain/s3-copy-arc-project-chain.jsonnet +++ b/clusterless-scenario/src/main/cls/scenarios/s3-copy-chain/s3-copy-arc-project-chain.jsonnet @@ -43,7 +43,6 @@ local unit = 'Twelfths'; main: { name: 'ingress-chain', version: '20220101', - pathURI: bucketPrefix + '/ingress/', }, }, sinks: { diff --git a/clusterless-scenario/src/main/cls/scenarios/s3-copy-chain/scenario.jsonnet b/clusterless-scenario/src/main/cls/scenarios/s3-copy-chain/scenario.jsonnet index 565773bb..f4ea8569 100644 --- a/clusterless-scenario/src/main/cls/scenarios/s3-copy-chain/scenario.jsonnet +++ b/clusterless-scenario/src/main/cls/scenarios/s3-copy-chain/scenario.jsonnet @@ -12,7 +12,7 @@ local project = import 's3-copy-arc-project-chain.jsonnet'; 
ingressStores: [ { region: project.placement.region, - path: project.arcs[0].sources.main.pathURI, + path: project.boundaries[0].dataset.pathURI, uploadDelaySec: 60 * 5, objectCount: 3, }, diff --git a/clusterless-scenario/src/main/cls/scenarios/s3-frequent-filter-boundary/s3-copy-arc.jsonnet b/clusterless-scenario/src/main/cls/scenarios/s3-frequent-filter-boundary/s3-copy-arc.jsonnet index 128541c6..1fd9816b 100644 --- a/clusterless-scenario/src/main/cls/scenarios/s3-frequent-filter-boundary/s3-copy-arc.jsonnet +++ b/clusterless-scenario/src/main/cls/scenarios/s3-frequent-filter-boundary/s3-copy-arc.jsonnet @@ -47,7 +47,6 @@ local unit = 'Twelfths'; main: { name: 'ingress-frequent-boundary', version: '20220101', - pathURI: bucketPrefix + '/ingress/', }, }, sinks: { diff --git a/clusterless-scenario/src/main/cls/scenarios/s3-frequent-filter-boundary/scenario.jsonnet b/clusterless-scenario/src/main/cls/scenarios/s3-frequent-filter-boundary/scenario.jsonnet index 431ff162..050d3106 100644 --- a/clusterless-scenario/src/main/cls/scenarios/s3-frequent-filter-boundary/scenario.jsonnet +++ b/clusterless-scenario/src/main/cls/scenarios/s3-frequent-filter-boundary/scenario.jsonnet @@ -12,13 +12,13 @@ local project = import 's3-copy-arc.jsonnet'; ingressStores: [ { region: project.placement.region, - path: project.arcs[0].sources.main.pathURI, + path: project.boundaries[0].dataset.pathURI, uploadDelaySec: 15, objectCount: 60/15 * 5 * 3, }, { region: project.placement.region, - path: project.arcs[0].sources.main.pathURI, + path: project.boundaries[0].dataset.pathURI, uploadDelaySec: 15, objectCount: 60/15 * 5 * 3, objectName: '_SUCCESS-%04d-%d.txt', diff --git a/clusterless-scenario/src/main/cls/scenarios/s3-frequent-filter-copy/s3-copy-arc.jsonnet b/clusterless-scenario/src/main/cls/scenarios/s3-frequent-filter-copy/s3-copy-arc.jsonnet index 94183b10..0d24af52 100644 --- a/clusterless-scenario/src/main/cls/scenarios/s3-frequent-filter-copy/s3-copy-arc.jsonnet +++ 
b/clusterless-scenario/src/main/cls/scenarios/s3-frequent-filter-copy/s3-copy-arc.jsonnet @@ -44,7 +44,6 @@ local unit = 'Twelfths'; main: { name: 'ingress-frequent-copy', version: '20220101', - pathURI: bucketPrefix + '/ingress/', }, }, sinks: { diff --git a/clusterless-scenario/src/main/cls/scenarios/s3-frequent-filter-copy/scenario.jsonnet b/clusterless-scenario/src/main/cls/scenarios/s3-frequent-filter-copy/scenario.jsonnet index 12a9f846..ca58fb23 100644 --- a/clusterless-scenario/src/main/cls/scenarios/s3-frequent-filter-copy/scenario.jsonnet +++ b/clusterless-scenario/src/main/cls/scenarios/s3-frequent-filter-copy/scenario.jsonnet @@ -12,13 +12,13 @@ local project = import 's3-copy-arc.jsonnet'; ingressStores: [ { region: project.placement.region, - path: project.arcs[0].sources.main.pathURI, + path: project.boundaries[0].dataset.pathURI, uploadDelaySec: 15, objectCount: 60/15 * 5 * 3, }, { region: project.placement.region, - path: project.arcs[0].sources.main.pathURI, + path: project.boundaries[0].dataset.pathURI, uploadDelaySec: 15, objectCount: 60/15 * 5 * 3, objectName: '_SUCCESS-%04d-%d.txt', diff --git a/clusterless-scenario/src/main/cls/scenarios/s3-simple/s3-copy-arc.jsonnet b/clusterless-scenario/src/main/cls/scenarios/s3-simple/s3-copy-arc.jsonnet index 2e680ab7..6100ce22 100644 --- a/clusterless-scenario/src/main/cls/scenarios/s3-simple/s3-copy-arc.jsonnet +++ b/clusterless-scenario/src/main/cls/scenarios/s3-simple/s3-copy-arc.jsonnet @@ -43,7 +43,6 @@ local unit = 'Twelfths'; main: { name: 'ingress-simple', version: '20220101', - pathURI: bucketPrefix + '/ingress/', }, }, sinks: { diff --git a/clusterless-scenario/src/main/cls/scenarios/s3-simple/scenario.jsonnet b/clusterless-scenario/src/main/cls/scenarios/s3-simple/scenario.jsonnet index db3a7d56..b32ed22e 100644 --- a/clusterless-scenario/src/main/cls/scenarios/s3-simple/scenario.jsonnet +++ b/clusterless-scenario/src/main/cls/scenarios/s3-simple/scenario.jsonnet @@ -12,7 +12,7 @@ local 
project = import 's3-copy-arc.jsonnet'; ingressStores: [ { region: project.placement.region, - path: project.arcs[0].sources.main.pathURI, + path: project.boundaries[0].dataset.pathURI, uploadDelaySec: 60 * 5, objectCount: 3, }, diff --git a/clusterless-substrate-aws-common/src/main/java/clusterless/substrate/aws/event/ArcNotifyEvent.java b/clusterless-substrate-aws-common/src/main/java/clusterless/substrate/aws/event/ArcNotifyEvent.java index d16d3683..8d1820c7 100644 --- a/clusterless-substrate-aws-common/src/main/java/clusterless/substrate/aws/event/ArcNotifyEvent.java +++ b/clusterless-substrate-aws-common/src/main/java/clusterless/substrate/aws/event/ArcNotifyEvent.java @@ -73,11 +73,7 @@ public URI manifest() { */ @JsonProperty(access = JsonProperty.Access.READ_ONLY) public String datasetId() { - return createDatasetId(dataset.name(), dataset.version()); - } - - public static String createDatasetId(String datasetName, String datasetVersion) { - return String.format("%s/%s", datasetName, datasetVersion); + return dataset.id(); } @Override diff --git a/clusterless-substrate-aws-common/src/test/java/clusterless/substrate/aws/event/NotifyEventTest.java b/clusterless-substrate-aws-common/src/test/java/clusterless/substrate/aws/event/NotifyEventTest.java index 31064379..e85d3f4e 100644 --- a/clusterless-substrate-aws-common/src/test/java/clusterless/substrate/aws/event/NotifyEventTest.java +++ b/clusterless-substrate-aws-common/src/test/java/clusterless/substrate/aws/event/NotifyEventTest.java @@ -9,7 +9,7 @@ package clusterless.substrate.aws.event; import clusterless.json.JSONUtil; -import clusterless.model.deploy.Dataset; +import clusterless.model.deploy.LocatedDataset; import com.fasterxml.jackson.databind.JsonNode; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -24,7 +24,7 @@ public class NotifyEventTest { @Test void arcNotifyEvent() throws IOException { ArcNotifyEvent event = ArcNotifyEvent.Builder.builder() - 
.withDataset(Dataset.builder() + .withDataset(LocatedDataset.Builder.builder() .withName("name") .withVersion("version") .build()) diff --git a/clusterless-substrate-aws-construct-common/src/main/java/clusterless/substrate/aws/arc/ArcListener.java b/clusterless-substrate-aws-construct-common/src/main/java/clusterless/substrate/aws/arc/ArcListener.java index 1f885dc9..37792369 100644 --- a/clusterless-substrate-aws-construct-common/src/main/java/clusterless/substrate/aws/arc/ArcListener.java +++ b/clusterless-substrate-aws-construct-common/src/main/java/clusterless/substrate/aws/arc/ArcListener.java @@ -9,6 +9,7 @@ package clusterless.substrate.aws.arc; import clusterless.model.deploy.Arc; +import clusterless.model.deploy.Dataset; import clusterless.model.deploy.SourceDataset; import clusterless.model.deploy.Workload; import clusterless.naming.Label; @@ -39,7 +40,7 @@ public ArcListener(@NotNull ManagedComponentContext context, @NotNull Arc ids = arc.sources().values() .stream() .filter(SourceDataset::subscribe) // only listen for those subscribed too - .map(d -> ArcNotifyEvent.createDatasetId(d.name(), d.version())) + .map(Dataset::id) .collect(Collectors.toList()); Map detail = OrderedSafeMaps.of( diff --git a/clusterless-substrate-aws-construct-common/src/main/java/clusterless/substrate/aws/arc/ArcStack.java b/clusterless-substrate-aws-construct-common/src/main/java/clusterless/substrate/aws/arc/ArcStack.java index 52ca7b3c..61bbd682 100644 --- a/clusterless-substrate-aws-construct-common/src/main/java/clusterless/substrate/aws/arc/ArcStack.java +++ b/clusterless-substrate-aws-construct-common/src/main/java/clusterless/substrate/aws/arc/ArcStack.java @@ -10,6 +10,7 @@ import clusterless.config.Configurations; import clusterless.managed.component.ArcComponent; +import clusterless.managed.dataset.DatasetResolver; import clusterless.model.deploy.Arc; import clusterless.model.deploy.Deployable; import clusterless.model.deploy.Workload; @@ -24,6 +25,7 @@ */ public 
class ArcStack extends ManagedStack { private final Configurations configurations; + private final DatasetResolver resolver; private final ManagedProject managedProject; private final Deployable deployable; private final Arc> arc; @@ -31,20 +33,21 @@ public class ArcStack extends ManagedStack { private ArcOrchestration orchestration; private ArcListener arcListener; - private static Label arcBaseId(Arc arc) { + private static Label arcBaseId(Arc> arc) { return Label.of("Arc").with(arc.name()); } - public ArcStack(Configurations configurations, ManagedProject managedProject, Deployable deployable, Arc arc) { + public ArcStack(Configurations configurations, DatasetResolver resolver, ManagedProject managedProject, Deployable deployable, Arc arc) { super(Stacks.stackName(deployable, arcBaseId(arc)), managedProject, deployable, arcBaseId(arc)); this.configurations = configurations; + this.resolver = resolver; this.managedProject = managedProject; this.deployable = deployable; this.arc = arc; } public void applyArcWorkloadComponent(ArcComponent arcComponent) { - ManagedComponentContext context = new ManagedComponentContext(configurations, managedProject, deployable, this); + ManagedComponentContext context = new ManagedComponentContext(configurations, resolver, managedProject, deployable, this); orchestration = new ArcOrchestration(context, arc); diff --git a/clusterless-substrate-aws-construct-common/src/main/java/clusterless/substrate/aws/construct/ArcConstruct.java b/clusterless-substrate-aws-construct-common/src/main/java/clusterless/substrate/aws/construct/ArcConstruct.java index 9e7f07df..defb43b7 100644 --- a/clusterless-substrate-aws-construct-common/src/main/java/clusterless/substrate/aws/construct/ArcConstruct.java +++ b/clusterless-substrate-aws-construct-common/src/main/java/clusterless/substrate/aws/construct/ArcConstruct.java @@ -42,7 +42,8 @@ protected void grantManifestAndDatasetPermissionsTo(IGrantable grantable) { } protected void 
grantDatasets(IGrantable grantable) { - grantEachBucket(model().sources(), id("Source"), b -> b.grantRead(grantable)); + Map located = context().resolver().locate(context().deployable().project(), model().sources()); + grantEachBucket(located, id("Source"), b -> b.grantRead(grantable)); grantEachBucket(model().sinks(), id("Sink"), b -> b.grantReadWrite(grantable)); } diff --git a/clusterless-substrate-aws-construct-common/src/main/java/clusterless/substrate/aws/managed/ManagedComponentContext.java b/clusterless-substrate-aws-construct-common/src/main/java/clusterless/substrate/aws/managed/ManagedComponentContext.java index a869b454..f3bf85a1 100644 --- a/clusterless-substrate-aws-construct-common/src/main/java/clusterless/substrate/aws/managed/ManagedComponentContext.java +++ b/clusterless-substrate-aws-construct-common/src/main/java/clusterless/substrate/aws/managed/ManagedComponentContext.java @@ -10,6 +10,7 @@ import clusterless.config.Configurations; import clusterless.managed.component.ComponentContext; +import clusterless.managed.dataset.DatasetResolver; import clusterless.model.deploy.Deployable; import software.constructs.Construct; @@ -19,16 +20,18 @@ public class ManagedComponentContext implements ComponentContext { final Configurations configurations; + final DatasetResolver resolver; final ManagedProject managedProject; final Deployable deployable; final Managed parent; - public ManagedComponentContext(Configurations configurations, ManagedProject managedProject, Deployable deployable) { - this(configurations, managedProject, deployable, managedProject); + public ManagedComponentContext(Configurations configurations, DatasetResolver resolver, ManagedProject managedProject, Deployable deployable) { + this(configurations, resolver, managedProject, deployable, managedProject); } - public ManagedComponentContext(Configurations configurations, ManagedProject managedProject, Deployable deployable, Managed parent) { + public 
ManagedComponentContext(Configurations configurations, DatasetResolver resolver, ManagedProject managedProject, Deployable deployable, Managed parent) { this.configurations = configurations; + this.resolver = resolver; this.managedProject = managedProject; this.deployable = deployable; this.parent = parent; @@ -38,6 +41,10 @@ public Configurations configurations() { return configurations; } + public DatasetResolver resolver() { + return resolver; + } + public ManagedProject managedProject() { return managedProject; } diff --git a/clusterless-substrate-aws-construct-common/src/main/java/clusterless/substrate/aws/resources/StateURIs.java b/clusterless-substrate-aws-construct-common/src/main/java/clusterless/substrate/aws/resources/StateURIs.java index f87da334..dbbda656 100644 --- a/clusterless-substrate-aws-construct-common/src/main/java/clusterless/substrate/aws/resources/StateURIs.java +++ b/clusterless-substrate-aws-construct-common/src/main/java/clusterless/substrate/aws/resources/StateURIs.java @@ -8,7 +8,7 @@ package clusterless.substrate.aws.resources; -import clusterless.model.deploy.LocatedDataset; +import clusterless.model.deploy.Dataset; import clusterless.model.deploy.Placement; import clusterless.model.deploy.Project; import clusterless.model.manifest.ManifestState; @@ -22,25 +22,25 @@ import software.constructs.Construct; public class StateURIs { - public static ManifestURI manifestPath(@NotNull ManagedConstruct managedConstruct, LocatedDataset dataset) { + public static ManifestURI manifestPath(@NotNull ManagedConstruct managedConstruct, Dataset dataset) { Placement placement = placementFor(managedConstruct); return manifestPath(placement, dataset); } @NotNull - public static ManifestURI manifestPath(Placement placement, LocatedDataset dataset) { + public static ManifestURI manifestPath(Placement placement, Dataset dataset) { return ManifestURI.builder() .withPlacement(placement) .withDataset(dataset) .build(); } - public static ManifestURI 
manifestPath(@NotNull ManagedConstruct managedConstruct, ManifestState state, LocatedDataset dataset) { + public static ManifestURI manifestPath(@NotNull ManagedConstruct managedConstruct, ManifestState state, Dataset dataset) { return manifestPath(placementFor(managedConstruct), state, dataset); } @NotNull - public static ManifestURI manifestPath(Placement placement, ManifestState state, LocatedDataset dataset) { + public static ManifestURI manifestPath(Placement placement, ManifestState state, Dataset dataset) { return ManifestURI.builder() .withPlacement(placement) .withDataset(dataset) diff --git a/clusterless-substrate-aws-construct-core/src/main/java/clusterless/substrate/aws/arc/batch/BatchExecArcLocalExecutor.java b/clusterless-substrate-aws-construct-core/src/main/java/clusterless/substrate/aws/arc/batch/BatchExecArcLocalExecutor.java index 32c9ce07..d3986cc6 100644 --- a/clusterless-substrate-aws-construct-core/src/main/java/clusterless/substrate/aws/arc/batch/BatchExecArcLocalExecutor.java +++ b/clusterless-substrate-aws-construct-core/src/main/java/clusterless/substrate/aws/arc/batch/BatchExecArcLocalExecutor.java @@ -10,6 +10,7 @@ import clusterless.json.JSONUtil; import clusterless.managed.component.ArcLocalExecutor; +import clusterless.managed.dataset.LookupDatasetOwnerLookup; import clusterless.model.deploy.Placement; import clusterless.model.manifest.ManifestState; import clusterless.substrate.aws.arc.props.ArcEnvBuilder; @@ -57,10 +58,11 @@ public BatchExecArcLocalExecutor(Placement placement, BatchExecArc arc) { } @Override - public List commands(String role, String lotId, ManifestState manifestState) { + public List commands(String role, String lotId, ManifestState manifestState, LookupDatasetOwnerLookup ownerLookup) { ArcEnvBuilder arcEnvBuilder = new ArcEnvBuilder(placement, arc); Map arcEnvironment = arcEnvBuilder.asEnvironment(); - ArcWorkloadContext arcWorkloadContext = arcEnvBuilder.execContext(role, lotId, manifestState); + + 
ArcWorkloadContext arcWorkloadContext = arcEnvBuilder.execContext(role, lotId, manifestState, ownerLookup); Map localComments = new LinkedHashMap<>(); Map localEnvironment = new LinkedHashMap<>(arcEnvironment); diff --git a/clusterless-substrate-aws-construct-core/src/main/java/clusterless/substrate/aws/arc/props/ArcEnvBuilder.java b/clusterless-substrate-aws-construct-core/src/main/java/clusterless/substrate/aws/arc/props/ArcEnvBuilder.java index 65732724..4184e027 100644 --- a/clusterless-substrate-aws-construct-core/src/main/java/clusterless/substrate/aws/arc/props/ArcEnvBuilder.java +++ b/clusterless-substrate-aws-construct-core/src/main/java/clusterless/substrate/aws/arc/props/ArcEnvBuilder.java @@ -9,6 +9,7 @@ package clusterless.substrate.aws.arc.props; import clusterless.lambda.arc.ArcProps; +import clusterless.managed.dataset.LookupDatasetOwnerLookup; import clusterless.model.deploy.*; import clusterless.model.manifest.ManifestState; import clusterless.substrate.aws.event.ArcNotifyEvent; @@ -82,19 +83,21 @@ public Map asEnvironment() { } @NotNull - public ArcWorkloadContext execContext(String role, String lotId, ManifestState manifestState) { + public ArcWorkloadContext execContext(String role, String lotId, ManifestState manifestState, LookupDatasetOwnerLookup lookupOwner) { URI manifest = arcProps.sourceManifestPaths() .get(role) .withState(manifestState) .withLot(lotId) .uri(); - SourceDataset dataset = sources.get(role); + SourceDataset source = sources.get(role); + + OwnedDataset sink = lookupOwner.lookup(source); return ArcWorkloadContext.builder() .withArcNotifyEvent( ArcNotifyEvent.Builder.builder() - .withDataset(dataset) + .withDataset(sink.dataset()) .withManifest(manifest) .withLot(lotId) .build() diff --git a/clusterless-substrate-aws-kernel/src/main/java/clusterless/substrate/aws/cdk/lifecycle/Lifecycle.java b/clusterless-substrate-aws-kernel/src/main/java/clusterless/substrate/aws/cdk/lifecycle/Lifecycle.java index 505be542..96bf6135 100644 
--- a/clusterless-substrate-aws-kernel/src/main/java/clusterless/substrate/aws/cdk/lifecycle/Lifecycle.java +++ b/clusterless-substrate-aws-kernel/src/main/java/clusterless/substrate/aws/cdk/lifecycle/Lifecycle.java @@ -11,6 +11,7 @@ import clusterless.config.Configurations; import clusterless.managed.ModelType; import clusterless.managed.component.*; +import clusterless.managed.dataset.DatasetResolver; import clusterless.model.DeployableLoader; import clusterless.model.Model; import clusterless.model.deploy.Arc; @@ -73,6 +74,8 @@ public ManagedProject mapProject(List deployables) { String version = versions.stream().findFirst().orElseThrow(); String stage = stages.isEmpty() ? null : stages.stream().findFirst().orElseThrow(); + DatasetResolver resolver = new DatasetResolver(deployables); + ManagedProject managedProject = new ManagedProject(name, version, stage, deployables); for (Deployable deployable : deployables) { @@ -80,24 +83,24 @@ public ManagedProject mapProject(List deployables) { // create a stack for each isolatable construct for (ModelType[] stackGroup : stackGroups.independentModels()) { - constructIndependentStacks(managedProject, deployable, stackGroup); + constructIndependentStacks(resolver, managedProject, deployable, stackGroup); } // create a stack for grouped constructs for (ModelType[] stackGroup : stackGroups.groupedModels()) { - constructGroupedStack(managedProject, deployable, stackGroup); + constructGroupedStack(resolver, managedProject, deployable, stackGroup); } // create a stack for independent constructs for (ModelType[] stackGroup : stackGroups.managedModels()) { - constructManagedStacks(managedProject, deployable, stackGroup); + constructManagedStacks(resolver, managedProject, deployable, stackGroup); } } return managedProject; } - private void constructManagedStacks(ManagedProject managedProject, Deployable deployable, ModelType[] independent) { + private void constructManagedStacks(DatasetResolver resolver, ManagedProject 
managedProject, Deployable deployable, ModelType[] independent) { Map> managed = componentServices.componentServicesFor(Isolation.managed, deployable, independent); if (managed.isEmpty()) { @@ -122,13 +125,13 @@ private void constructManagedStacks(ManagedProject managedProject, Deployable de } // construct a stack for every arc - ArcStack stack = new ArcStack(configurations, managedProject, deployable, arc); + ArcStack stack = new ArcStack(configurations, resolver, managedProject, deployable, arc); // force dependency on prior stacks, but not prior arcs priorStacks.forEach(stack::addDependency); // todo: lookup all referenced datasets and retrieve their bucket names - ManagedComponentContext context = new ManagedComponentContext(configurations, managedProject, deployable, stack); + ManagedComponentContext context = new ManagedComponentContext(configurations, resolver, managedProject, deployable, stack); LOG.info(String.format("creating %s embedded construct: %s", arc.label(), arc.type())); ArcComponent construct = (ArcComponent) modelComponentService.create(context, arc); @@ -137,7 +140,7 @@ private void constructManagedStacks(ManagedProject managedProject, Deployable de } - private void constructIndependentStacks(ManagedProject managedProject, Deployable deployable, ModelType[] isolatable) { + private void constructIndependentStacks(DatasetResolver resolver, ManagedProject managedProject, Deployable deployable, ModelType[] isolatable) { Map> isolated = componentServices.componentServicesFor(Isolation.independent, deployable, isolatable); if (isolated.isEmpty()) { @@ -146,10 +149,10 @@ private void constructIndependentStacks(ManagedProject managedProject, Deployabl } // constructs a stack for every isolated declared model - construct(new ManagedComponentContext(configurations, managedProject, deployable), isolated); + construct(new ManagedComponentContext(configurations, resolver, managedProject, deployable), isolated); } - private void 
constructGroupedStack(ManagedProject managedProject, Deployable deployable, ModelType[] includable) { + private void constructGroupedStack(DatasetResolver resolver, ManagedProject managedProject, Deployable deployable, ModelType[] includable) { Map> included = componentServices.componentServicesFor(Isolation.grouped, deployable, includable); if (included.isEmpty()) { @@ -163,7 +166,7 @@ private void constructGroupedStack(ManagedProject managedProject, Deployable dep // make the new stack dependent on the prior stacks so order is retained during deployment managedProject.stacks().forEach(stack::addDependency); - ComponentContext context = new ManagedComponentContext(configurations, managedProject, deployable, stack); + ComponentContext context = new ManagedComponentContext(configurations, resolver, managedProject, deployable, stack); construct(context, included); } diff --git a/clusterless-substrate-aws-kernel/src/main/java/clusterless/substrate/aws/local/Local.java b/clusterless-substrate-aws-kernel/src/main/java/clusterless/substrate/aws/local/Local.java index 0a767ecb..11fc1cef 100644 --- a/clusterless-substrate-aws-kernel/src/main/java/clusterless/substrate/aws/local/Local.java +++ b/clusterless-substrate-aws-kernel/src/main/java/clusterless/substrate/aws/local/Local.java @@ -10,6 +10,7 @@ import clusterless.command.project.LocalCommandOptions; import clusterless.managed.component.*; +import clusterless.managed.dataset.DatasetResolver; import clusterless.model.DeployableLoader; import clusterless.model.Model; import clusterless.model.deploy.Arc; @@ -51,6 +52,8 @@ public List loadProjectModels(List deployFiles) throws IOExcep public Integer call() throws Exception { List deployables = loadProjectModels(commandOptions.projectFiles()); + DatasetResolver resolver = new DatasetResolver(deployables); + Map>> found = new LinkedHashMap<>(); for (Deployable deployable : deployables) { @@ -84,7 +87,12 @@ public Integer call() throws Exception { ArcLocalExecutor executor = 
executorFor(deployable.placement(), arc); String lotId = prompt(commandOptions.lotId(), "Enter lot id: "); - List commands = executor.commands(commandOptions.role(), lotId, commandOptions.manifestState()); + List commands = executor.commands( + commandOptions.role(), + lotId, + commandOptions.manifestState(), + source -> resolver.locate(deployable.project(), source) + ); ShellWriter shellWriter = new ShellWriter(Runtimes.current()); diff --git a/clusterless-substrate-aws-kernel/src/main/java/clusterless/substrate/aws/meta/Metadata.java b/clusterless-substrate-aws-kernel/src/main/java/clusterless/substrate/aws/meta/Metadata.java index ee9b1258..e253d34a 100644 --- a/clusterless-substrate-aws-kernel/src/main/java/clusterless/substrate/aws/meta/Metadata.java +++ b/clusterless-substrate-aws-kernel/src/main/java/clusterless/substrate/aws/meta/Metadata.java @@ -14,6 +14,8 @@ import clusterless.substrate.aws.cdk.bootstrap.BootstrapMeta; import clusterless.substrate.aws.sdk.S3; import clusterless.substrate.uri.ArcURI; +import clusterless.substrate.uri.DatasetURI; +import clusterless.substrate.uri.ProjectMaterialsURI; import clusterless.substrate.uri.ProjectURI; import com.fasterxml.jackson.core.type.TypeReference; import org.jetbrains.annotations.NotNull; @@ -26,6 +28,8 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; import java.util.Optional; import java.util.function.Function; @@ -127,6 +131,8 @@ public static int pushDeployablesMetadata(List deployables) { String profile = System.getenv().get(CDKProcessExec.CLS_CDK_PROFILE); for (Deployable deployable : deployables) { + List materials = new ArrayList<>(); + Placement placement = deployable.placement(); Project project = deployable.project(); @@ -138,6 +144,7 @@ public static int pushDeployablesMetadata(List deployables) { .build() .uri(); + materials.add(metaURI); LOG.info("putting metadata in: {}", 
metaURI); Optional result = s3.put(metaURI, "application/json", deployable) @@ -147,6 +154,8 @@ public static int pushDeployablesMetadata(List deployables) { return 1; } + List sinks = new LinkedList<>(); + for (Arc> arc : deployable.arcs()) { URI arcURI = ArcURI.builder() .withPlacement(placement) @@ -155,6 +164,7 @@ public static int pushDeployablesMetadata(List deployables) { .build() .uri(); + materials.add(arcURI); LOG.info("putting metadata in: {}", arcURI); result = s3.put(arcURI, "application/json", arc) @@ -163,6 +173,46 @@ public static int pushDeployablesMetadata(List deployables) { if (result.isPresent()) { return 1; } + + sinks.addAll(arc.sinks().values()); + } + + for (Boundary boundary : deployable.boundaries()) { + // todo: upload boundary metadata + sinks.add(boundary.dataset()); + } + + for (SinkDataset sinkDataset : sinks) { + URI datasetURI = DatasetURI.builder() + .withPlacement(placement) + .withDataset(sinkDataset) + .build() + .uri(); + + materials.add(datasetURI); + LOG.info("putting metadata in: {}", datasetURI); + + result = s3.put(datasetURI, "application/json", new OwnedDataset(project, sinkDataset)) + .isSuccessOrLog(r -> String.format("unable to upload dataset metadata to: %s, %s", datasetURI, r.errorMessage())); + + if (result.isPresent()) { + return 1; + } + } + + URI materialsURI = ProjectMaterialsURI.builder() + .withPlacement(placement) + .withProject(project) + .build() + .uri(); + + LOG.info("putting metadata in: {}", materialsURI); + + Optional materialsResults = s3.put(materialsURI, "application/json", materials) + .isSuccessOrLog(r -> String.format("unable to upload materials metadata to: %s, %s", metaURI, r.errorMessage())); + + if (materialsResults.isPresent()) { + return 1; } } @@ -178,41 +228,42 @@ public static int removeDeployablesMetadata(List deployables) { S3 s3 = new S3(profile, placement.region()); - ProjectURI uri = ProjectURI.builder() + URI uri = ProjectMaterialsURI.builder() .withPlacement(placement) 
.withProject(project) - .build(); + .build() + .uri(); - URI metaURI = uri.uri(); - LOG.info("removing metadata in: {}", metaURI); + LOG.info("removing metadata in: {}", uri); - Optional result = s3.remove(metaURI) - .isSuccessOrLog(r -> String.format("unable to remove project metadata to: %s, %s", metaURI, r.errorMessage())); + S3.Response response = s3.get(uri); - if (result.isPresent()) { - return 1; + if (!s3.exists(response)) { + throw new IllegalStateException("materials not found: " + uri, response.exception()); } - for (Arc> arc : deployable.arcs()) { - URI arcURI = ArcURI.builder() - .withPlacement(placement) - .withProject(project) - .withArcName(arc.name()) - .build() - .uri(); + List materials = JSONUtil.readAsObjectSafe(response.asInputStream(), new TypeReference<>() {}); - LOG.info("removing metadata in: {}", arcURI); + boolean failed = false; + for (URI material : materials) { + LOG.info("removing metadata in: {}", material); - result = s3.remove(arcURI) - .isSuccessOrLog(r -> String.format("unable to remove arc metadata to: %s, %s", arcURI, r.errorMessage())); + Optional result = s3.remove(material) + .isSuccessOrLog(r -> String.format("unable to remove material metadata to: %s, %s", material, r.errorMessage())); - if (result.isPresent()) { - return 1; - } + failed |= result.isPresent(); + } + + Optional result = s3.remove(uri) + .isSuccessOrLog(r -> String.format("unable to remove materials metadata to: %s, %s", uri, r.errorMessage())); + + if (result.isPresent() || failed) { + return 1; } } return 0; } + } diff --git a/clusterless-substrate-aws-kernel/src/test/java/clusterless/substrate/aws/KernelTest.java b/clusterless-substrate-aws-kernel/src/test/java/clusterless/substrate/aws/KernelTest.java index fd55aa36..90833654 100644 --- a/clusterless-substrate-aws-kernel/src/test/java/clusterless/substrate/aws/KernelTest.java +++ b/clusterless-substrate-aws-kernel/src/test/java/clusterless/substrate/aws/KernelTest.java @@ -35,68 +35,86 @@ public class 
KernelTest { }; @Test - @StdIo("{\n" + - "\"project\": {\n" + - " \"name\" : \"TestProject\",\n" + - " \"version\": \"20230101-00\"\n" + - "},\n" + - "\"placement\": {\n" + - " \"stage\": \"prod\",\n" + - " \"provider\": \"aws\",\n" + - " \"account\": \"abc123\",\n" + - " \"region\": \"us-east-2\"\n" + - "},\n" + - "\"resources\" : [\n" + - " {\n" + - " \"type\" : \"aws:core:s3Bucket\",\n" + - " \"name\" : \"sample-bucket1\",\n" + - " \"bucketName\" : \"sample-bucket1\"\n" + - " },\n" + - " {\n" + - " \"type\" : \"aws:core:s3Bucket\",\n" + - " \"name\" : \"sample-bucket2\",\n" + - " \"bucketName\" : \"sample-bucket2\"\n" + - " }\n" + - " ]\n" + - "}\n") + @StdIo(""" + { + "project": { + "name" : "TestProject", + "version": "20230101-00" + }, + "placement": { + "stage": "prod", + "provider": "aws", + "account": "abc123", + "region": "us-east-2" + }, + "resources" : [ + { + "type" : "aws:core:s3Bucket", + "name" : "sample-bucket1", + "bucketName" : "sample-bucket1" + }, + { + "type" : "aws:core:s3Bucket", + "name" : "sample-bucket2", + "bucketName" : "sample-bucket2" + } + ] + } + """) void createResourcesProject() { Assertions.assertEquals(0, new Kernel().execute(args)); } + /** + * This test confirms the boundary dataset is resolved into the arc source dataset declaration + */ @Test - @StdIo("{\n" + - " \"project\": {\n" + - " \"name\": \"TestProject\",\n" + - " \"version\": \"20230101-00\"\n" + - " },\n" + - " \"placement\": {\n" + - " \"stage\": \"prod\",\n" + - " \"provider\": \"aws\",\n" + - " \"account\": \"abc123\",\n" + - " \"region\": \"us-east-2\"\n" + - " },\n" + - " \"arcs\": [\n" + - " {\n" + - " \"type\": \"aws:core:s3CopyArc\",\n" + - " \"name\": \"copy\",\n" + - " \"sources\": {\n" + - " \"main\": {\n" + - " \"name\": \"ingress\",\n" + - " \"version\": \"20220101\",\n" + - " \"pathURI\": \"s3://clusterless-test/ingress/\"\n" + - " }\n" + - " },\n" + - " \"sinks\": {\n" + - " \"main\": {\n" + - " \"name\": \"copy\",\n" + - " \"version\": 
\"20230101\",\n" + - " \"pathURI\": \"s3://clusterless-test/copy/\"\n" + - " }\n" + - " }\n" + - " }\n" + - " ]\n" + - "}\n" + - " ") + @StdIo(""" + { + "project": { + "name": "TestProject", + "version": "20230101-00" + }, + "placement": { + "stage": "prod", + "provider": "aws", + "account": "abc123", + "region": "us-east-2" + }, + "boundaries": [ + { + "dataset": { + "name": "ingress", + "version": "20220101", + "pathURI": "s3://test-native-copy-0192-us-west-2/ingress/" + }, + "eventArrival": "infrequent", + "lotUnit": "Twelfths", + "name": "IngressPutListener", + "type": "aws:core:s3PutListenerBoundary" + } + ], + "arcs": [ + { + "type": "aws:core:s3CopyArc", + "name": "copy", + "sources": { + "main": { + "name": "ingress", + "version": "20220101" + } + }, + "sinks": { + "main": { + "name": "copy", + "version": "20230101", + "pathURI": "s3://clusterless-test/copy/" + } + } + } + ] + } + """) void copyArcProject() { Assertions.assertEquals(0, new Kernel().execute(args)); } diff --git a/clusterless-substrate-aws-lambda-arc/src/integrationTest/java/clusterless/lambda/arc/ArcStatesEventHandlerTest.java b/clusterless-substrate-aws-lambda-arc/src/integrationTest/java/clusterless/lambda/arc/ArcStatesEventHandlerTest.java index 6a36ffae..971e06ea 100644 --- a/clusterless-substrate-aws-lambda-arc/src/integrationTest/java/clusterless/lambda/arc/ArcStatesEventHandlerTest.java +++ b/clusterless-substrate-aws-lambda-arc/src/integrationTest/java/clusterless/lambda/arc/ArcStatesEventHandlerTest.java @@ -64,7 +64,7 @@ protected ArcStateProps getProps() { Stream events() { return Stream.of( ArcNotifyEvent.Builder.builder() - .withDataset(datasets().datasetMap().get("main")) + .withDataset(datasets().sourceDatasetMap().get("main")) .withManifest(datasets().manifestIdentifierMap("20230227PT5M287", datasets().datasetMap(), ManifestState.complete).get("main").uri()) .withLot("20230227PT5M287") .build() @@ -73,8 +73,6 @@ Stream events() { @BeforeEach void initData() { - ArcStateProps 
props = getProps(); - new CreateDataMachine("20230227PT5M287") .applyBucketsFrom(datasets().sourceDatasetMap()) .applyBucketsFrom(datasets().sinkDatasetMap()) diff --git a/clusterless-substrate-aws-lambda-arc/src/main/java/clusterless/lambda/arc/ArcStateStartHandler.java b/clusterless-substrate-aws-lambda-arc/src/main/java/clusterless/lambda/arc/ArcStateStartHandler.java index 6c9a313c..4eb40ef8 100644 --- a/clusterless-substrate-aws-lambda-arc/src/main/java/clusterless/lambda/arc/ArcStateStartHandler.java +++ b/clusterless-substrate-aws-lambda-arc/src/main/java/clusterless/lambda/arc/ArcStateStartHandler.java @@ -20,7 +20,6 @@ import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; @@ -126,7 +125,7 @@ protected ArcStateContext handleEvent(ArcNotifyEvent event, Context context, Arc List roles = arcStateProps.sources().entrySet() .stream() - .filter(e -> Objects.equals(e.getValue().name(), event.dataset().name()) && Objects.equals(e.getValue().version(), event.dataset().version())) + .filter(e -> e.getValue().sameDataset(event.dataset())) .map(Map.Entry::getKey).collect(Collectors.toList()); eventObserver.applyRoles(roles); diff --git a/clusterless-substrate-aws-lambda-common/src/testFixtures/java/clusterless/lambda/TestDatasets.java b/clusterless-substrate-aws-lambda-common/src/testFixtures/java/clusterless/lambda/TestDatasets.java index cc491687..1b482b11 100644 --- a/clusterless-substrate-aws-lambda-common/src/testFixtures/java/clusterless/lambda/TestDatasets.java +++ b/clusterless-substrate-aws-lambda-common/src/testFixtures/java/clusterless/lambda/TestDatasets.java @@ -107,8 +107,8 @@ public Map manifestPathMap() { return manifestPathMap(datasetMap(), null); } - public List sinkDatasetList() { - return sinkDatasets(roles).collect(Collectors.toList()); + public List sinkDatasetListAsSink() { + return sinkDatasets(roles).map(SinkDataset::new).collect(Collectors.toList()); } public List 
manifestPathList(ManifestState state) { diff --git a/clusterless-substrate-aws-lambda-transform/src/integrationTest/java/clusterless/lambda/transform/frequents3put/FrequentPutEventTransformHandlerTest.java b/clusterless-substrate-aws-lambda-transform/src/integrationTest/java/clusterless/lambda/transform/frequents3put/FrequentPutEventTransformHandlerTest.java index 01a324c6..5ece6654 100644 --- a/clusterless-substrate-aws-lambda-transform/src/integrationTest/java/clusterless/lambda/transform/frequents3put/FrequentPutEventTransformHandlerTest.java +++ b/clusterless-substrate-aws-lambda-transform/src/integrationTest/java/clusterless/lambda/transform/frequents3put/FrequentPutEventTransformHandlerTest.java @@ -56,7 +56,7 @@ protected FrequentS3PutTransformProps getProps() { .withManifestCompletePath(datasets().manifestPathList(ManifestState.complete).get(0)) .withManifestPartialPath(datasets().manifestPathList(ManifestState.partial).get(0)) .withLotUnit(IntervalUnit.TWELFTHS.name()) - .withDataset(datasets().sinkDatasetList().get(0)) + .withDataset(datasets().sinkDatasetListAsSink().get(0)) .withEventBusName(eventBusName()) .withSqsQueueName(sqsQueueName()) .build(); diff --git a/clusterless-substrate-aws-lambda-transform/src/integrationTest/java/clusterless/lambda/transform/s3put/PutEventTransformHandlerTest.java b/clusterless-substrate-aws-lambda-transform/src/integrationTest/java/clusterless/lambda/transform/s3put/PutEventTransformHandlerTest.java index 0521c3cb..68bf85f4 100644 --- a/clusterless-substrate-aws-lambda-transform/src/integrationTest/java/clusterless/lambda/transform/s3put/PutEventTransformHandlerTest.java +++ b/clusterless-substrate-aws-lambda-transform/src/integrationTest/java/clusterless/lambda/transform/s3put/PutEventTransformHandlerTest.java @@ -52,7 +52,7 @@ protected S3PutTransformProps getProps() { .withManifestCompletePath(datasets().manifestPathList(ManifestState.complete).get(0)) 
.withManifestPartialPath(datasets().manifestPathList(ManifestState.partial).get(0)) .withLotUnit(IntervalUnit.TWELFTHS.name()) - .withDataset(datasets().sinkDatasetList().get(0)) + .withDataset(datasets().sinkDatasetListAsSink().get(0)) .withEventBusName(eventBusName()) .build(); } diff --git a/clusterless-substrate/src/main/java/clusterless/substrate/uri/ManifestURI.java b/clusterless-substrate/src/main/java/clusterless/substrate/uri/ManifestURI.java index 4cbbf522..5e3a1e1e 100644 --- a/clusterless-substrate/src/main/java/clusterless/substrate/uri/ManifestURI.java +++ b/clusterless-substrate/src/main/java/clusterless/substrate/uri/ManifestURI.java @@ -8,6 +8,7 @@ package clusterless.substrate.uri; +import clusterless.model.deploy.Dataset; import clusterless.model.deploy.LocatedDataset; import clusterless.model.deploy.Placement; import clusterless.model.manifest.ManifestState; @@ -88,7 +89,7 @@ public ManifestURI deserialize(JsonParser p, DeserializationContext ctxt) throws // todo: make attempt supplier a strategy class/enum so that we can json serialize it protected transient Supplier attemptSupplier = Lazy.of(() -> String.valueOf(System.currentTimeMillis())); - protected LocatedDataset dataset; + protected Dataset dataset; protected ManifestURI() { super(StateStore.Manifest); @@ -153,8 +154,8 @@ private Partition terminalPartition() { Partition attempt = state != null && state.hasAttempts() ? 
Partition.namedOf("attempt", attemptId()) : Partition.NULL; return Partition.of(DATASETS) - .withNamedTerminal("name", Optional.ofNullable(dataset).map(LocatedDataset::name)) - .withNamedTerminal("version", Optional.ofNullable(dataset).map(LocatedDataset::version)) + .withNamedTerminal("name", Optional.ofNullable(dataset).map(Dataset::name)) + .withNamedTerminal("version", Optional.ofNullable(dataset).map(Dataset::version)) .withNamedTerminal("lot", lotId) // retain case .withTerminal(state) .withTerminal(attempt); @@ -229,11 +230,11 @@ protected ManifestURI setAttemptId(String attemptId) { return this; } - public LocatedDataset dataset() { + public Dataset dataset() { return dataset; } - protected ManifestURI setDataset(LocatedDataset dataset) { + protected ManifestURI setDataset(Dataset dataset) { this.dataset = dataset; return this; } @@ -252,7 +253,7 @@ public ManifestURI self() { } public static final class Builder { - private LocatedDataset dataset; + private Dataset dataset; private Placement placement; private String lotId; private ManifestState state; @@ -264,7 +265,7 @@ public static Builder aManifestStateIdentifier() { return new Builder(); } - public Builder withDataset(LocatedDataset dataset) { + public Builder withDataset(Dataset dataset) { this.dataset = dataset; return this; } diff --git a/clusterless-substrate/src/main/java/clusterless/substrate/uri/ProjectMaterialsURI.java b/clusterless-substrate/src/main/java/clusterless/substrate/uri/ProjectMaterialsURI.java new file mode 100644 index 00000000..58bff6e0 --- /dev/null +++ b/clusterless-substrate/src/main/java/clusterless/substrate/uri/ProjectMaterialsURI.java @@ -0,0 +1,215 @@ +/* + * Copyright (c) 2023 Chris K Wensel . All Rights Reserved. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. 
+ */ + +package clusterless.substrate.uri; + +import clusterless.model.deploy.Placement; +import clusterless.model.deploy.Project; +import clusterless.naming.Partition; +import clusterless.substrate.store.StateStore; +import com.fasterxml.jackson.core.JacksonException; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.fasterxml.jackson.databind.deser.std.StdScalarDeserializer; +import com.fasterxml.jackson.databind.ser.std.StdScalarSerializer; + +import java.io.IOException; +import java.net.URI; +import java.util.Objects; +import java.util.Optional; + +import static java.util.Optional.ofNullable; + +/** + * Path + *

+ * {@code {providerService}://{stateStore}/materials/{projectName}/{projectVersion}/} +

+ * Identifier + *

+ * {@code {providerService}://{stateStore}/materials/{projectName}/{projectVersion}/materials.json} + */ +@JsonSerialize(using = ProjectMaterialsURI.Serializer.class) +@JsonDeserialize(using = ProjectMaterialsURI.DeSerializer.class) +public class ProjectMaterialsURI extends MetaURI { + + public static final String MATERIALS = "materials"; + + public static Builder builder() { + return Builder.builder(); + } + + static class Serializer extends StdScalarSerializer<ProjectMaterialsURI> { + protected Serializer() { + super(ProjectMaterialsURI.class); + } + + @Override + public void serialize(ProjectMaterialsURI value, JsonGenerator gen, SerializerProvider provider) throws IOException { + gen.writeString(value.template()); + } + } + + static class DeSerializer extends StdScalarDeserializer<ProjectMaterialsURI> { + protected DeSerializer() { + super(ProjectMaterialsURI.class); + } + + @Override + public ProjectMaterialsURI deserialize(JsonParser p, DeserializationContext ctxt) throws IOException, JacksonException { + // The critical path: ensure we handle the common case first. + if (p.hasToken(JsonToken.VALUE_STRING)) { + return ProjectMaterialsURI.parse(p.getText()); + } + // [databind#381] +// if (p.hasToken(JsonToken.START_ARRAY)) { +// return _deserializeFromArray(p, ctxt); +// } + + return ProjectMaterialsURI.parse(_parseString(p, ctxt, this)); + } + } + + protected Project project; + + protected ProjectMaterialsURI() { + super(StateStore.Meta); + } + + protected ProjectMaterialsURI(ProjectMaterialsURI other) { + super(other); + this.project = other.project; + } + + public Project project() { + return project; + } + + protected ProjectMaterialsURI copy() { + return new ProjectMaterialsURI(this); + } + + @Override + public boolean isPath() { + return project == null || project.name() == null || project.version() == null; + } + + /** + * Template may be a URI or an absolute path. 
+ * + * @param template + * @return + */ + public static ProjectMaterialsURI parse(String template) { + Objects.requireNonNull(template, "template is null"); + + // {providerService}://{stateStore}/materials/{projectName}/{projectVersion}/materials.json + String[] split = template.split("/"); + + boolean isOnlyPath = isOnlyPath(template); + int index = isOnlyPath ? 2 : 4; + String storeName = isOnlyPath ? null : value(split, 2); + return new ProjectMaterialsURI() + .setStoreName(storeName) // the bucket in s3 + .setProject(Project.Builder.builder() + .withName(value(split, index++)) + .withVersion(value(split, index)) + .build()); + } + + @Override + public URI uriPrefix() { + Partition partition = Partition.of(MATERIALS) + .withNamedTerminal("name", ofNullable(project).map(Project::name)) + .withNamedTerminal("version", ofNullable(project).map(Project::version)); + + return createUri(partition.prefix()); + } + + @Override + public URI uriPath() { + String path = Partition.of(MATERIALS) + .withNamedTerminal("name", Optional.ofNullable(project).map(Project::name)) + .withNamedTerminal("version", Optional.ofNullable(project).map(Project::version)) + .with("materials.json") + .prefix(); + + return createUri(path); + } + + @Override + public URI uri() { + require(project, "project"); + + String path = Partition.of(MATERIALS) + .withNamed("name", project.name()) + .withNamed("version", project.version()) + .with("materials.json") + .prefix(); + + return createUri(path); + } + + @Override + public String template() { + String path = Partition.namedOf("name", ofNullable(project.name()).orElse("{projectName}")) + .withNamed("version", ofNullable(project.version()).orElse("{projectVersion}")) + .with("materials.json") + .partition(); + + return String.format("s3://%s/%s/%s", storeName.get(), MATERIALS, path); + } + + protected ProjectMaterialsURI setProject(Project project) { + this.project = project; + return this; + } + + public ProjectMaterialsURI withProject(Project 
project) { + return copy().setProject(project); + } + + @Override + public ProjectMaterialsURI self() { + return this; + } + + public static final class Builder { + protected Placement placement; + protected Project project; + + private Builder() { + } + + public static Builder builder() { + return new Builder(); + } + + public Builder withPlacement(Placement placement) { + this.placement = placement; + return this; + } + + public Builder withProject(Project project) { + this.project = project; + return this; + } + + public ProjectMaterialsURI build() { + ProjectMaterialsURI projectURI = new ProjectMaterialsURI(); + projectURI.setPlacement(placement); + projectURI.setProject(project); + return projectURI; + } + } +}