Skip to content

Commit

Permalink
push and remove owned dataset metadata
Browse files Browse the repository at this point in the history
resolve source dataset path against sink datasets
refactors the push metadata framework to include a materials list
  • Loading branch information
cwensel committed Oct 14, 2023
1 parent 70d5dbd commit 11edbd6
Show file tree
Hide file tree
Showing 39 changed files with 713 additions and 158 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,14 @@ public static <T> T readAsObject(Path path, TypeReference<T> type) throws IOExce
return OBJECT_READER.forType(type).readValue(path.toFile());
}

public static <T> T readAsObjectSafe(InputStream inputStream, TypeReference<T> type) {
try {
return OBJECT_READER.forType(type).readValue(inputStream);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

public static String writeTypedAsStringSafe(Object object) {
try {
return TYPED_OBJECT_MAPPER.writeValueAsString(object);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package clusterless.managed.component;

import clusterless.managed.dataset.LookupDatasetOwnerLookup;
import clusterless.model.manifest.ManifestState;

import java.util.LinkedHashMap;
Expand All @@ -16,7 +17,6 @@
import java.util.Map;

public interface ArcLocalExecutor {

class Command {
String headerComment;
Map<String, String> environmentComments = new LinkedHashMap<>();
Expand Down Expand Up @@ -99,5 +99,5 @@ public Command build() {
}
}

List<Command> commands(String role, String lotId, ManifestState manifestState);
List<Command> commands(String role, String lotId, ManifestState manifestState, LookupDatasetOwnerLookup ownerLookup);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright (c) 2023 Chris K Wensel <[email protected]>. All Rights Reserved.
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

package clusterless.managed.dataset;

import clusterless.model.deploy.*;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;

import java.util.*;
import java.util.stream.Collectors;

/**
* Maps ReferencedDataset to a SinkDataset by looking up SourceDatasets in the current set of
* Deployables or in the metadata store.
* <p>
* This prevents a new project from overwriting and existing dataset. And allows current projects
* to declare read permissions against datasets they do not own.
*/
public class DatasetResolver {
private static final Logger LOG = LogManager.getLogger(DatasetResolver.class);
List<Deployable> deployables;

Map<ReferencedDataset, OwnedDataset> resolved = new HashMap<>();
private List<ReferencedDataset> locallyReferenced;
private List<OwnedDataset> locallyOwned;

public DatasetResolver(List<Deployable> deployables) {
this.deployables = deployables;

build();
}

private void build() {
locallyReferenced = new LinkedList<>();
locallyOwned = new LinkedList<>();

for (Deployable deployable : deployables) {
deployable.boundaries().stream()
.map(boundary -> new OwnedDataset(deployable.project(), boundary.dataset()))
.forEach(locallyOwned::add);

for (Arc<? extends Workload<?>> arc : deployable.arcs()) {
arc.sources()
.values()
.stream()
.map(source -> new ReferencedDataset(deployable.project(), source))
.forEach(locallyReferenced::add);

arc.sinks().values()
.stream()
.map(sink -> new OwnedDataset(deployable.project(), sink))
.forEach(locallyOwned::add);
}
}

for (ReferencedDataset referencedDataset : locallyReferenced) {
Set<OwnedDataset> ownedDatasets = locallyOwned.stream()
.filter(ownedDataset -> ownedDataset.dataset().sameDataset(referencedDataset.dataset()))
.collect(Collectors.toSet());

// todo: confirm dataset isn't already owned outside this set of deployables

if (ownedDatasets.isEmpty()) {
String message = "dataset owner not found: %s".formatted(referencedDataset.dataset().id());
LOG.error(message);
throw new IllegalStateException(message);
} else if (ownedDatasets.size() != 1) {
Set<String> projects = ownedDatasets.stream().map(OwnedDataset::owner).map(Project::id).collect(Collectors.toSet());
String message = "dataset: %s, is owned by multiple projects: %s".formatted(referencedDataset.dataset().id(), projects);
LOG.error(message);
throw new IllegalStateException(message);
}

resolved.put(referencedDataset, ownedDatasets.stream().findFirst().orElseThrow());
}
}

public List<OwnedDataset> locallyOwned() {
return locallyOwned;
}

public Map<String, ? extends LocatedDataset> locate(@NotNull Project dependent, @NotNull Map<String, SourceDataset> sources) {
return sources.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> locate(dependent, e.getValue()).dataset()));
}

public OwnedDataset locate(Project dependent, SourceDataset value) {
ReferencedDataset referencedDataset = new ReferencedDataset(dependent, value);
OwnedDataset ownedDataset = resolved.get(referencedDataset);

if (ownedDataset != null) {
return ownedDataset;
}

String message = "dataset owner not found for: %s".formatted(referencedDataset.dataset().id());
LOG.error(message);
throw new IllegalStateException(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright (c) 2023 Chris K Wensel <[email protected]>. All Rights Reserved.
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

package clusterless.managed.dataset;

import clusterless.model.deploy.OwnedDataset;
import clusterless.model.deploy.SourceDataset;

public interface LookupDatasetOwnerLookup {
OwnedDataset lookup(SourceDataset source);
}
4 changes: 0 additions & 4 deletions clusterless-model/src/main/java/clusterless/model/Model.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,6 @@
import clusterless.json.JSONUtil;
import clusterless.naming.Label;

/**
* Note {@link #equals(Object)} and {@link #hashCode()} are declared final. No two Model instances
* should ever be equal
*/
public abstract class Model implements Struct {
public final Label label() {
return Label.of(getClass().getSimpleName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import clusterless.json.JsonRequiredProperty;
import clusterless.model.Model;
import com.fasterxml.jackson.annotation.JsonIgnore;

import java.util.Objects;

Expand Down Expand Up @@ -47,6 +48,11 @@ public String version() {
return version;
}

@JsonIgnore
public String id() {
return String.format("%s/%s", name(), version());
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -55,6 +61,13 @@ public boolean equals(Object o) {
return Objects.equals(name, dataset.name) && Objects.equals(version, dataset.version);
}

public boolean sameDataset(Object o) {
if (this == o) return true;
if (o == null) return false;
Dataset dataset = (Dataset) o;
return Objects.equals(name, dataset.name) && Objects.equals(version, dataset.version);
}

@Override
public int hashCode() {
return Objects.hash(name, version);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,39 +9,60 @@
package clusterless.model.deploy;

import clusterless.model.Struct;
import com.fasterxml.jackson.annotation.JsonIgnore;

import java.util.Objects;

public class OwnedDataset implements Struct {
Project project;
Project owner;
SinkDataset dataset;

public OwnedDataset(Project project, SinkDataset dataset) {
this.project = project;
public OwnedDataset(Project owner, SinkDataset dataset) {
this.owner = owner;
this.dataset = dataset;
}

public static Builder builder() {
return Builder.builder();
}

public Project project() {
return project;
public Project owner() {
return owner;
}

public SinkDataset dataset() {
return dataset;
}

@JsonIgnore
public String id() {
return String.format("%s:%s", owner().id(), dataset().id());
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
OwnedDataset that = (OwnedDataset) o;
return Objects.equals(owner, that.owner) && Objects.equals(dataset, that.dataset);
}

@Override
public int hashCode() {
return Objects.hash(owner, dataset);
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("OwnedDataset{");
sb.append("project=").append(project);
sb.append("owner=").append(owner);
sb.append(", dataset=").append(dataset);
sb.append('}');
return sb.toString();
}

public static final class Builder {
Project project;
Project owner;
SinkDataset dataset;

private Builder() {
Expand All @@ -51,8 +72,8 @@ public static Builder builder() {
return new Builder();
}

public Builder withProject(Project project) {
this.project = project;
public Builder withOwner(Project owner) {
this.owner = owner;
return this;
}

Expand All @@ -62,7 +83,7 @@ public Builder withDataset(SinkDataset dataset) {
}

public OwnedDataset build() {
return new OwnedDataset(project, dataset);
return new OwnedDataset(owner, dataset);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import clusterless.json.JsonRequiredProperty;
import clusterless.managed.component.DocumentsModel;
import clusterless.model.Struct;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;

import java.util.Objects;
Expand Down Expand Up @@ -57,6 +58,11 @@ public String version() {
return version;
}

@JsonIgnore
public String id() {
return String.format("%s/%s", name(), version());
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Loading

0 comments on commit 11edbd6

Please sign in to comment.