diff --git a/core/src/main/java/feast/core/job/JobUpdateTask.java b/core/src/main/java/feast/core/job/JobUpdateTask.java index 985e4eb31e..bb876f47f2 100644 --- a/core/src/main/java/feast/core/job/JobUpdateTask.java +++ b/core/src/main/java/feast/core/job/JobUpdateTask.java @@ -16,6 +16,7 @@ */ package feast.core.job; +import com.google.common.collect.Sets; import feast.core.log.Action; import feast.core.log.AuditLogger; import feast.core.log.Resource; @@ -35,7 +36,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.stream.Collectors; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -102,14 +102,8 @@ public Job call() { } boolean featureSetsChangedFor(Job job) { - Set existingFeatureSetsPopulatedByJob = - job.getFeatureSets().stream() - .map(fs -> fs.getProject() + "/" + fs.getName()) - .collect(Collectors.toSet()); - Set newFeatureSetsPopulatedByJob = - featureSets.stream() - .map(fs -> fs.getProject() + "/" + fs.getName()) - .collect(Collectors.toSet()); + Set existingFeatureSetsPopulatedByJob = Sets.newHashSet(job.getFeatureSets()); + Set newFeatureSetsPopulatedByJob = Sets.newHashSet(featureSets); return !newFeatureSetsPopulatedByJob.equals(existingFeatureSetsPopulatedByJob); } diff --git a/core/src/main/java/feast/core/model/Entity.java b/core/src/main/java/feast/core/model/Entity.java index 24bafdd403..4148b8a2a1 100644 --- a/core/src/main/java/feast/core/model/Entity.java +++ b/core/src/main/java/feast/core/model/Entity.java @@ -32,6 +32,7 @@ public class Entity { @EmbeddedId private EntityReference reference; + /** Data type of the entity. String representation of {@link ValueType} * */ private String type; public Entity() {} diff --git a/core/src/main/java/feast/core/model/FeatureSet.java b/core/src/main/java/feast/core/model/FeatureSet.java index a6e46a2d74..8ffe334d35 100644 --- a/core/src/main/java/feast/core/model/FeatureSet.java +++ b/core/src/main/java/feast/core/model/FeatureSet.java @@ -33,7 +33,9 @@ @Getter @Setter @javax.persistence.Entity -@Table(name = "feature_sets") +@Table( + name = "feature_sets", + uniqueConstraints = @UniqueConstraint(columnNames = {"name", "version", "project_name"})) public class FeatureSet extends AbstractTimestampEntity implements Comparable { // Id of the featureSet, defined as project/feature_set_name:feature_set_version diff --git a/core/src/main/java/feast/core/service/JobCoordinatorService.java b/core/src/main/java/feast/core/service/JobCoordinatorService.java index a0d77ca232..c021576790 100644 --- a/core/src/main/java/feast/core/service/JobCoordinatorService.java +++ b/core/src/main/java/feast/core/service/JobCoordinatorService.java @@ -195,6 +195,7 @@ public Optional getJob(Source source, Store store) { return Optional.of(jobs.get(0)); } + // TODO: optimize this to make less calls to the database. private List featureSetsFromProto(List protos) { return protos.stream() .map(FeatureSetProto.FeatureSet::getSpec) diff --git a/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java b/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java index 59fdc32b20..26cf331c13 100644 --- a/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java +++ b/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; +import com.google.common.collect.Lists; import com.google.protobuf.InvalidProtocolBufferException; import feast.core.CoreServiceProto.ListFeatureSetsRequest.Filter; import feast.core.CoreServiceProto.ListFeatureSetsResponse; @@ -194,6 +195,13 @@ public void shouldGenerateAndSubmitJobsIfAny() throws InvalidProtocolBufferExcep when(specService.listStores(any())) .thenReturn(ListStoresResponse.newBuilder().addStore(store).build()); + for (FeatureSetProto.FeatureSet fs : Lists.newArrayList(featureSet1, featureSet2)) { + FeatureSetSpec spec = fs.getSpec(); + when(featureSetRepository.findFeatureSetByNameAndProject_NameAndVersion( + spec.getName(), spec.getProject(), spec.getVersion())) + .thenReturn(FeatureSet.fromProto(fs)); + } + when(jobManager.startJob(argThat(new JobMatcher(expectedInput)))).thenReturn(expected); when(jobManager.getRunnerType()).thenReturn(Runner.DATAFLOW); @@ -318,6 +326,12 @@ public void shouldGroupJobsBySource() throws InvalidProtocolBufferException { when(jobManager.startJob(argThat(new JobMatcher(expectedInput1)))).thenReturn(expected1); when(jobManager.startJob(argThat(new JobMatcher(expectedInput2)))).thenReturn(expected2); when(jobManager.getRunnerType()).thenReturn(Runner.DATAFLOW); + for (FeatureSetProto.FeatureSet fs : Lists.newArrayList(featureSet1, featureSet2)) { + FeatureSetSpec spec = fs.getSpec(); + when(featureSetRepository.findFeatureSetByNameAndProject_NameAndVersion( + spec.getName(), spec.getProject(), spec.getVersion())) + .thenReturn(FeatureSet.fromProto(fs)); + } JobCoordinatorService jcs = new JobCoordinatorService( diff --git a/protos/feast/core/FeatureSet.proto b/protos/feast/core/FeatureSet.proto index f45d47d055..e7e69ede56 100644 --- a/protos/feast/core/FeatureSet.proto +++ b/protos/feast/core/FeatureSet.proto @@ -69,7 +69,7 @@ message EntitySpec { // Name of the entity. string name = 1; - // Value type of the feature. + // Value type of the entity. feast.types.ValueType.Enum value_type = 2; }