Skip to content

Commit

Permalink
Add uniqueness constraint to FeatureSets, fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
zhilingc committed May 4, 2020
1 parent fd9a9d7 commit d579424
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 11 deletions.
12 changes: 3 additions & 9 deletions core/src/main/java/feast/core/job/JobUpdateTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package feast.core.job;

import com.google.common.collect.Sets;
import feast.core.log.Action;
import feast.core.log.AuditLogger;
import feast.core.log.Resource;
Expand All @@ -35,7 +36,6 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -102,14 +102,8 @@ public Job call() {
}

boolean featureSetsChangedFor(Job job) {
Set<String> existingFeatureSetsPopulatedByJob =
job.getFeatureSets().stream()
.map(fs -> fs.getProject() + "/" + fs.getName())
.collect(Collectors.toSet());
Set<String> newFeatureSetsPopulatedByJob =
featureSets.stream()
.map(fs -> fs.getProject() + "/" + fs.getName())
.collect(Collectors.toSet());
Set<FeatureSet> existingFeatureSetsPopulatedByJob = Sets.newHashSet(job.getFeatureSets());
Set<FeatureSet> newFeatureSetsPopulatedByJob = Sets.newHashSet(featureSets);

return !newFeatureSetsPopulatedByJob.equals(existingFeatureSetsPopulatedByJob);
}
Expand Down
1 change: 1 addition & 0 deletions core/src/main/java/feast/core/model/Entity.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
public class Entity {
@EmbeddedId private EntityReference reference;

/** Data type of the entity. String representation of {@link ValueType} * */
private String type;

public Entity() {}
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/java/feast/core/model/FeatureSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
@Getter
@Setter
@javax.persistence.Entity
@Table(name = "feature_sets")
@Table(
name = "feature_sets",
uniqueConstraints = @UniqueConstraint(columnNames = {"name", "version", "project_name"}))
public class FeatureSet extends AbstractTimestampEntity implements Comparable<FeatureSet> {

// Id of the featureSet, defined as project/feature_set_name:feature_set_version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ public Optional<Job> getJob(Source source, Store store) {
return Optional.of(jobs.get(0));
}

// TODO: optimize this to make less calls to the database.
private List<FeatureSet> featureSetsFromProto(List<FeatureSetProto.FeatureSet> protos) {
return protos.stream()
.map(FeatureSetProto.FeatureSet::getSpec)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;

import com.google.common.collect.Lists;
import com.google.protobuf.InvalidProtocolBufferException;
import feast.core.CoreServiceProto.ListFeatureSetsRequest.Filter;
import feast.core.CoreServiceProto.ListFeatureSetsResponse;
Expand Down Expand Up @@ -194,6 +195,13 @@ public void shouldGenerateAndSubmitJobsIfAny() throws InvalidProtocolBufferExcep
when(specService.listStores(any()))
.thenReturn(ListStoresResponse.newBuilder().addStore(store).build());

for (FeatureSetProto.FeatureSet fs : Lists.newArrayList(featureSet1, featureSet2)) {
FeatureSetSpec spec = fs.getSpec();
when(featureSetRepository.findFeatureSetByNameAndProject_NameAndVersion(
spec.getName(), spec.getProject(), spec.getVersion()))
.thenReturn(FeatureSet.fromProto(fs));
}

when(jobManager.startJob(argThat(new JobMatcher(expectedInput)))).thenReturn(expected);
when(jobManager.getRunnerType()).thenReturn(Runner.DATAFLOW);

Expand Down Expand Up @@ -318,6 +326,12 @@ public void shouldGroupJobsBySource() throws InvalidProtocolBufferException {
when(jobManager.startJob(argThat(new JobMatcher(expectedInput1)))).thenReturn(expected1);
when(jobManager.startJob(argThat(new JobMatcher(expectedInput2)))).thenReturn(expected2);
when(jobManager.getRunnerType()).thenReturn(Runner.DATAFLOW);
for (FeatureSetProto.FeatureSet fs : Lists.newArrayList(featureSet1, featureSet2)) {
FeatureSetSpec spec = fs.getSpec();
when(featureSetRepository.findFeatureSetByNameAndProject_NameAndVersion(
spec.getName(), spec.getProject(), spec.getVersion()))
.thenReturn(FeatureSet.fromProto(fs));
}

JobCoordinatorService jcs =
new JobCoordinatorService(
Expand Down
2 changes: 1 addition & 1 deletion protos/feast/core/FeatureSet.proto
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ message EntitySpec {
// Name of the entity.
string name = 1;

// Value type of the feature.
// Value type of the entity.
feast.types.ValueType.Enum value_type = 2;
}

Expand Down

0 comments on commit d579424

Please sign in to comment.