From df4b05746cb653e1cb906acabc7d1868d24e0ba6 Mon Sep 17 00:00:00 2001 From: pyalex Date: Wed, 1 Jul 2020 09:00:43 +0300 Subject: [PATCH 1/4] keep store proto in JobStore --- .../java/feast/core/dao/JobRepository.java | 2 +- .../core/job/ConsolidatedJobStrategy.java | 12 +- .../feast/core/job/JobPerStoreStrategy.java | 17 +- core/src/main/java/feast/core/model/Job.java | 32 ++-- .../main/java/feast/core/model/JobStore.java | 110 +++++++++++++ .../src/main/java/feast/core/model/Store.java | 2 +- .../core/service/JobCoordinatorService.java | 4 - .../java/feast/core/service/JobService.java | 2 +- .../db/migration/V2.4__Store_Timestamps.sql | 2 - .../db/migration/V2.4__Store_proto.sql | 1 + .../java/feast/core/job/JobTasksTest.java | 20 ++- .../job/dataflow/DataflowJobManagerTest.java | 16 +- .../direct/DirectRunnerJobManagerTest.java | 3 +- .../service/JobCoordinatorServiceTest.java | 154 ++++-------------- .../feast/core/service/JobServiceTest.java | 22 +-- 15 files changed, 211 insertions(+), 188 deletions(-) create mode 100644 core/src/main/java/feast/core/model/JobStore.java delete mode 100644 core/src/main/resources/db/migration/V2.4__Store_Timestamps.sql create mode 100644 core/src/main/resources/db/migration/V2.4__Store_proto.sql diff --git a/core/src/main/java/feast/core/dao/JobRepository.java b/core/src/main/java/feast/core/dao/JobRepository.java index 244c0d5eff..4f1e527fb1 100644 --- a/core/src/main/java/feast/core/dao/JobRepository.java +++ b/core/src/main/java/feast/core/dao/JobRepository.java @@ -41,5 +41,5 @@ public interface JobRepository extends JpaRepository { List findByFeatureSetJobStatusesIn(List featureSetsJobStatuses); // find jobs by feast store name - List findByStoresName(String storeName); + List findByJobStoresIdStoreName(String storeName); } diff --git a/core/src/main/java/feast/core/job/ConsolidatedJobStrategy.java b/core/src/main/java/feast/core/job/ConsolidatedJobStrategy.java index e9eb59ed61..1aab91532e 100644 --- a/core/src/main/java/feast/core/job/ConsolidatedJobStrategy.java +++ b/core/src/main/java/feast/core/job/ConsolidatedJobStrategy.java @@ -50,12 +50,12 @@ public Job getOrCreateJob(Source source, Set stores) { .findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc( source.getType(), source.getConfig(), null, JobStatus.getTerminalStates()) .orElseGet( - () -> - Job.builder() - .setSource(source) - .setStores(stores) - .setFeatureSetJobStatuses(new HashSet<>()) - .build()); + () -> { + Job job = + Job.builder().setSource(source).setFeatureSetJobStatuses(new HashSet<>()).build(); + job.setStores(stores); + return job; + }); } @Override diff --git a/core/src/main/java/feast/core/job/JobPerStoreStrategy.java b/core/src/main/java/feast/core/job/JobPerStoreStrategy.java index a5d76b6358..c3b1d629a9 100644 --- a/core/src/main/java/feast/core/job/JobPerStoreStrategy.java +++ b/core/src/main/java/feast/core/job/JobPerStoreStrategy.java @@ -55,13 +55,16 @@ public Job getOrCreateJob(Source source, Set stores) { .findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc( source.getType(), source.getConfig(), store.getName(), JobStatus.getTerminalStates()) .orElseGet( - () -> - Job.builder() - .setSource(source) - .setStoreName(store.getName()) - .setStores(stores) - .setFeatureSetJobStatuses(new HashSet<>()) - .build()); + () -> { + Job job = + Job.builder() + .setSource(source) + .setStoreName(store.getName()) + .setFeatureSetJobStatuses(new HashSet<>()) + .build(); + job.setStores(stores); + return job; + }); } @Override diff --git a/core/src/main/java/feast/core/model/Job.java b/core/src/main/java/feast/core/model/Job.java index 90794292d1..07f94bb828 100644 --- a/core/src/main/java/feast/core/model/Job.java +++ b/core/src/main/java/feast/core/model/Job.java @@ -76,16 +76,8 @@ public JobBuilder setSource(Source source) { private String sourceConfig; // Sinks - @ManyToMany - @JoinTable( - name = "jobs_stores", - joinColumns = @JoinColumn(name = "job_id"), - inverseJoinColumns = @JoinColumn(name = "store_name"), - indexes = { - @Index(name = "idx_jobs_stores_job_id", columnList = "job_id"), - @Index(name = "idx_jobs_stores_store_name", columnList = "store_name") - }) - private Set stores; + @OneToMany(mappedBy = "job", cascade = CascadeType.ALL) + private Set jobStores = new HashSet<>(); @Deprecated @Column(name = "store_name") @@ -144,6 +136,20 @@ public void addAllFeatureSets(Set featureSets) { } } + public Set getStores() { + return getJobStores().stream() + .map(JobStore::getStoreProto) + .map(Store::fromProto) + .collect(Collectors.toSet()); + } + + public void setStores(Set stores) { + jobStores = new HashSet<>(); + for (Store store : stores) { + jobStores.add(new JobStore(this, store)); + } + } + /** * Convert a job model to ingestion job proto * @@ -177,7 +183,6 @@ public IngestionJobProto.IngestionJob toProto() throws InvalidProtocolBufferExce public Job clone() { Job job = Job.builder() - .setStores(getStores()) .setStoreName(getStoreName()) .setSourceConfig(getSourceConfig()) .setSourceType(getSourceType()) @@ -185,13 +190,14 @@ public Job clone() { .setRunner(getRunner()) .setStatus(JobStatus.UNKNOWN) .build(); + job.setStores(getStores()); job.addAllFeatureSets(getFeatureSets()); return job; } @Override public int hashCode() { - return Objects.hash(getSource(), this.stores, this.runner); + return Objects.hash(getSource(), getStores(), this.runner); } @Override @@ -204,7 +210,7 @@ public boolean equals(Object obj) { return false; } else if (!getSource().equals(other.getSource())) { return false; - } else if (!stores.equals(other.stores)) { + } else if (!getStores().equals(other.getStores())) { return false; } return true; diff --git a/core/src/main/java/feast/core/model/JobStore.java b/core/src/main/java/feast/core/model/JobStore.java new file mode 100644 index 0000000000..621d3ec0e2 --- /dev/null +++ b/core/src/main/java/feast/core/model/JobStore.java @@ -0,0 +1,110 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.core.model; + +import com.google.common.base.Objects; +import com.google.protobuf.InvalidProtocolBufferException; +import feast.proto.core.StoreProto; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.Serializable; +import javax.persistence.*; +import javax.persistence.Entity; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; + +@Entity +@Table( + name = "jobs_stores", + indexes = { + @Index(name = "idx_jobs_stores_job_id", columnList = "job_id"), + @Index(name = "idx_jobs_stores_store_name", columnList = "store_name") + }) +@Getter +@Setter +public class JobStore { + @Embeddable + @EqualsAndHashCode + @AllArgsConstructor + public static class JobStoreKey implements Serializable { + public JobStoreKey() {} + + @Column(name = "job_id") + String jobId; + + @Column(name = "store_name") + String storeName; + } + + @EmbeddedId private JobStoreKey id = new JobStoreKey(); + + @ManyToOne + @MapsId("jobId") + @JoinColumn(name = "job_id") + private Job job; + + @Column(name = "store_proto", nullable = false) + @Lob + private byte[] storeProto; + + public JobStore() {} + + public JobStore(Job job, Store store) { + this.job = job; + this.id.storeName = store.getName(); + try { + setStoreProto(store.toProto()); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException("Couldn't convert Store to proto. Reason: %s", e.getCause()); + } + } + + public StoreProto.Store getStoreProto() { + try { + return StoreProto.Store.parseFrom(storeProto); + } catch (InvalidProtocolBufferException e) { + return StoreProto.Store.newBuilder().build(); + } + } + + public void setStoreProto(StoreProto.Store storeProto) { + ByteArrayOutputStream output = new ByteArrayOutputStream(); + try { + storeProto.writeTo(output); + } catch (IOException e) { + throw new RuntimeException( + String.format("Couldn't write StoreProto to byteArray: %s", e.getCause())); + } + + this.storeProto = output.toByteArray(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + JobStore jobStore = (JobStore) o; + return Objects.equal(id, jobStore.id) && Objects.equal(storeProto, jobStore.storeProto); + } + + @Override + public int hashCode() { + return Objects.hashCode(id, storeProto); + } +} diff --git a/core/src/main/java/feast/core/model/Store.java b/core/src/main/java/feast/core/model/Store.java index bd16a5cf3e..9288217e74 100644 --- a/core/src/main/java/feast/core/model/Store.java +++ b/core/src/main/java/feast/core/model/Store.java @@ -47,7 +47,7 @@ @AllArgsConstructor @Entity @Table(name = "stores") -public class Store extends AbstractTimestampEntity { +public class Store { // Name of the store. Must be unique @Id diff --git a/core/src/main/java/feast/core/service/JobCoordinatorService.java b/core/src/main/java/feast/core/service/JobCoordinatorService.java index 55180b6abe..59cc619fa6 100644 --- a/core/src/main/java/feast/core/service/JobCoordinatorService.java +++ b/core/src/main/java/feast/core/service/JobCoordinatorService.java @@ -227,10 +227,6 @@ private boolean jobRequiresUpgrade(Job job, Set stores) { return true; } - if (stores.stream().anyMatch(s -> s.getLastUpdated().after(job.getCreated()))) { - return true; - } - return false; } diff --git a/core/src/main/java/feast/core/service/JobService.java b/core/src/main/java/feast/core/service/JobService.java index 315988c3d3..edc2bb2015 100644 --- a/core/src/main/java/feast/core/service/JobService.java +++ b/core/src/main/java/feast/core/service/JobService.java @@ -108,7 +108,7 @@ public ListIngestionJobsResponse listJobs(ListIngestionJobsRequest request) // multiple filters can apply together in an 'and' operation if (!filter.getStoreName().isEmpty()) { // find jobs by name - List jobs = this.jobRepository.findByStoresName(filter.getStoreName()); + List jobs = this.jobRepository.findByJobStoresIdStoreName(filter.getStoreName()); Set jobIds = jobs.stream().map(Job::getId).collect(Collectors.toSet()); matchingJobIds = this.mergeResults(matchingJobIds, jobIds); } diff --git a/core/src/main/resources/db/migration/V2.4__Store_Timestamps.sql b/core/src/main/resources/db/migration/V2.4__Store_Timestamps.sql deleted file mode 100644 index b69cd9c633..0000000000 --- a/core/src/main/resources/db/migration/V2.4__Store_Timestamps.sql +++ /dev/null @@ -1,2 +0,0 @@ -ALTER TABLE stores ADD COLUMN created timestamp default now(); -ALTER TABLE stores ADD COLUMN last_updated timestamp default now(); \ No newline at end of file diff --git a/core/src/main/resources/db/migration/V2.4__Store_proto.sql b/core/src/main/resources/db/migration/V2.4__Store_proto.sql new file mode 100644 index 0000000000..9e18313e95 --- /dev/null +++ b/core/src/main/resources/db/migration/V2.4__Store_proto.sql @@ -0,0 +1 @@ +ALTER TABLE jobs_stores ADD COLUMN store_proto oid not null; \ No newline at end of file diff --git a/core/src/test/java/feast/core/job/JobTasksTest.java b/core/src/test/java/feast/core/job/JobTasksTest.java index e0c0cd182c..d1e1b651c1 100644 --- a/core/src/test/java/feast/core/job/JobTasksTest.java +++ b/core/src/test/java/feast/core/job/JobTasksTest.java @@ -76,15 +76,17 @@ public void setUp() { } Job makeJob(String extId, List featureSets, JobStatus status) { - return Job.builder() - .setId("job") - .setExtId(extId) - .setRunner(RUNNER) - .setSource(source) - .setStores(ImmutableSet.of(store)) - .setFeatureSetJobStatuses(TestUtil.makeFeatureSetJobStatus(featureSets)) - .setStatus(status) - .build(); + Job job = + Job.builder() + .setId("job") + .setExtId(extId) + .setRunner(RUNNER) + .setSource(source) + .setFeatureSetJobStatuses(TestUtil.makeFeatureSetJobStatus(featureSets)) + .setStatus(status) + .build(); + job.setStores(ImmutableSet.of(store)); + return job; } CreateJobTask makeCreateTask(Job currentJob) { diff --git a/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java b/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java index 0c182a2319..5b5c6a6340 100644 --- a/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java +++ b/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java @@ -16,6 +16,7 @@ */ package feast.core.job.dataflow; +import static feast.common.models.Store.convertStringToSubscription; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.mockito.ArgumentMatchers.any; @@ -25,6 +26,7 @@ import com.google.api.client.auth.oauth2.Credential; import com.google.api.client.googleapis.testing.auth.oauth2.MockGoogleCredential; import com.google.api.services.dataflow.Dataflow; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -35,6 +37,7 @@ import feast.core.exception.JobExecutionException; import feast.core.job.Runner; import feast.core.model.*; +import feast.core.util.TestUtil; import feast.ingestion.options.ImportOptions; import feast.proto.core.FeatureSetProto; import feast.proto.core.FeatureSetProto.FeatureSetMeta; @@ -170,10 +173,10 @@ public void shouldStartJobWithCorrectPipelineOptions() throws IOException { .setExtId("") .setRunner(Runner.DATAFLOW) .setSource(Source.fromProto(source)) - .setStores(ImmutableSet.of(Store.fromProto(store))) .setFeatureSetJobStatuses(Sets.newHashSet(featureSetJobStatus)) .setStatus(JobStatus.PENDING) .build(); + job.setStores(ImmutableSet.of(Store.fromProto(store))); Job actual = dfJobManager.startJob(job); verify(dfJobManager, times(1)).runPipeline(captor.capture()); @@ -215,12 +218,8 @@ public void shouldStartJobWithCorrectPipelineOptions() throws IOException { @Test public void shouldThrowExceptionWhenJobStateTerminal() throws IOException { - StoreProto.Store store = - StoreProto.Store.newBuilder() - .setName("SERVING") - .setType(StoreType.REDIS) - .setRedisConfig(RedisConfig.newBuilder().setHost("localhost").setPort(6379).build()) - .build(); + Store store = + TestUtil.createStore("store", ImmutableList.of(convertStringToSubscription("*:*"))); SourceProto.Source source = SourceProto.Source.newBuilder() @@ -253,11 +252,10 @@ public void shouldThrowExceptionWhenJobStateTerminal() throws IOException { .setExtId("") .setRunner(Runner.DATAFLOW) .setSource(Source.fromProto(source)) - .setStores(ImmutableSet.of(Store.fromProto(store))) .setFeatureSetJobStatuses(Sets.newHashSet(featureSetJobStatus)) .setStatus(JobStatus.PENDING) .build(); - + job.setStores(ImmutableSet.of(store)); expectedException.expect(JobExecutionException.class); dfJobManager.startJob(job); } diff --git a/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java b/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java index e9db2fde0e..5846cf5490 100644 --- a/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java +++ b/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java @@ -150,11 +150,10 @@ public void shouldStartDirectJobAndRegisterPipelineResult() throws IOException { .setExtId("") .setRunner(Runner.DIRECT) .setSource(Source.fromProto(source)) - .setStores(ImmutableSet.of(Store.fromProto(store))) .setFeatureSetJobStatuses(makeFeatureSetJobStatus(FeatureSet.fromProto(featureSet))) .setStatus(JobStatus.PENDING) .build(); - + job.setStores(ImmutableSet.of(Store.fromProto(store))); Job actual = drJobManager.startJob(job); verify(drJobManager, times(1)).runPipeline(pipelineOptionsCaptor.capture()); diff --git a/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java b/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java index 066953104d..621590bd06 100644 --- a/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java +++ b/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java @@ -55,7 +55,6 @@ import feast.proto.core.SourceProto.SourceType; import feast.proto.core.StoreProto; import feast.proto.core.StoreProto.Store.Subscription; -import java.time.Instant; import java.util.*; import java.util.concurrent.CancellationException; import lombok.SneakyThrows; @@ -180,16 +179,7 @@ public void shouldGenerateAndSubmitJobsIfAny() throws InvalidProtocolBufferExcep FeatureSet featureSet2 = FeatureSet.fromProto(featureSetSpec2); ArgumentCaptor> jobArgCaptor = ArgumentCaptor.forClass(List.class); - Job expectedInput = - Job.builder() - .setId("id") - .setExtId("") - .setRunner(Runner.DATAFLOW) - .setSource(source) - .setStores(ImmutableSet.of(store)) - .setFeatureSetJobStatuses(TestUtil.makeFeatureSetJobStatus(featureSet1, featureSet2)) - .setStatus(JobStatus.PENDING) - .build(); + Job expectedInput = newJob("id", store, source, featureSet1, featureSet2); Job expected = expectedInput.toBuilder().setExtId("extid1").setStatus(JobStatus.RUNNING).build(); @@ -220,6 +210,20 @@ public void shouldGenerateAndSubmitJobsIfAny() throws InvalidProtocolBufferExcep assertThat(actual, containsInAnyOrder(expected)); } + private Job newJob(String id, Store store, Source source, FeatureSet... featureSets) { + Job job = + Job.builder() + .setId(id) + .setExtId("") + .setRunner(Runner.DATAFLOW) + .setSource(source) + .setFeatureSetJobStatuses(TestUtil.makeFeatureSetJobStatus(featureSets)) + .setStatus(JobStatus.PENDING) + .build(); + job.setStores(ImmutableSet.of(store)); + return job; + } + @Test public void shouldGroupJobsBySource() throws InvalidProtocolBufferException { Store store = @@ -233,30 +237,12 @@ public void shouldGroupJobsBySource() throws InvalidProtocolBufferException { FeatureSet featureSet1 = TestUtil.createEmptyFeatureSet("features1", source1); FeatureSet featureSet2 = TestUtil.createEmptyFeatureSet("features2", source2); - Job expectedInput1 = - Job.builder() - .setId("id1") - .setExtId("") - .setRunner(Runner.DATAFLOW) - .setSource(source1) - .setStores(ImmutableSet.of(store)) - .setFeatureSetJobStatuses(TestUtil.makeFeatureSetJobStatus(featureSet1)) - .setStatus(JobStatus.PENDING) - .build(); + Job expectedInput1 = newJob("id1", store, source1, featureSet1); Job expected1 = expectedInput1.toBuilder().setExtId("extid1").setStatus(JobStatus.RUNNING).build(); - Job expectedInput2 = - Job.builder() - .setId("id2") - .setExtId("") - .setRunner(Runner.DATAFLOW) - .setSource(source2) - .setStores(ImmutableSet.of(store)) - .setFeatureSetJobStatuses(TestUtil.makeFeatureSetJobStatus(featureSet2)) - .setStatus(JobStatus.PENDING) - .build(); + Job expectedInput2 = newJob("id2", store, source2, featureSet2); Job expected2 = expectedInput2.toBuilder().setExtId("extid2").setStatus(JobStatus.RUNNING).build(); @@ -315,16 +301,7 @@ public void shouldGroupJobsBySourceAndIgnoreDuplicateSourceObjects() FeatureSet featureSet1 = TestUtil.createEmptyFeatureSet("features1", source1); FeatureSet featureSet2 = TestUtil.createEmptyFeatureSet("features2", source2); - Job expectedInput = - Job.builder() - .setId("id") - .setExtId("") - .setSource(source1) - .setStores(ImmutableSet.of(store)) - .setRunner(Runner.DATAFLOW) - .setFeatureSetJobStatuses(TestUtil.makeFeatureSetJobStatus(featureSet1, featureSet2)) - .setStatus(JobStatus.PENDING) - .build(); + Job expectedInput = newJob("id", store, source1, featureSet1, featureSet2); Job expected = expectedInput.toBuilder().setExtId("extid1").setStatus(JobStatus.RUNNING).build(); @@ -374,16 +351,10 @@ public void shouldStopDuplicateJobsForSource() throws InvalidProtocolBufferExcep List expectedJobs = new ArrayList<>(); List extraJobs = new ArrayList(); for (int i = 0; i < 3; i++) { - inputJobs.add( - Job.builder() - .setId(String.format("id%d", i)) - .setExtId(String.format("extid%d", i)) - .setSource(source) - .setStores(ImmutableSet.of(store)) - .setRunner(Runner.DATAFLOW) - .setFeatureSetJobStatuses(TestUtil.makeFeatureSetJobStatus(featureSet)) - .setStatus(JobStatus.RUNNING) - .build()); + Job job = newJob(String.format("id%d", i), store, source, featureSet); + job.setExtId(String.format("extid%d", i)); + job.setStatus(JobStatus.RUNNING); + inputJobs.add(job); JobStatus targetStatus = (i >= 1) ? JobStatus.ABORTED : JobStatus.RUNNING; expectedJobs.add(inputJobs.get(i).toBuilder().setStatus(targetStatus).build()); @@ -449,30 +420,12 @@ public void shouldUseStoreSubscriptionToMapStore() throws InvalidProtocolBufferE FeatureSet featureSet1 = TestUtil.createEmptyFeatureSet("feature1", source1); FeatureSet featureSet2 = TestUtil.createEmptyFeatureSet("feature2", source2); - Job expectedInput1 = - Job.builder() - .setId("id1") - .setExtId("") - .setRunner(Runner.DATAFLOW) - .setSource(source1) - .setStores(ImmutableSet.of(store1)) - .setFeatureSetJobStatuses(TestUtil.makeFeatureSetJobStatus(featureSet1)) - .setStatus(JobStatus.PENDING) - .build(); + Job expectedInput1 = newJob("id1", store1, source1, featureSet1); Job expected1 = expectedInput1.toBuilder().setExtId("extid1").setStatus(JobStatus.RUNNING).build(); - Job expectedInput2 = - Job.builder() - .setId("id2") - .setExtId("") - .setRunner(Runner.DATAFLOW) - .setSource(source2) - .setStores(ImmutableSet.of(store2)) - .setFeatureSetJobStatuses(TestUtil.makeFeatureSetJobStatus(featureSet2)) - .setStatus(JobStatus.PENDING) - .build(); + Job expectedInput2 = newJob("id2", store2, source2, featureSet2); Job expected2 = expectedInput2.toBuilder().setExtId("extid2").setStatus(JobStatus.RUNNING).build(); @@ -734,39 +687,10 @@ public void shouldUpgradeJobWhenNeeded() { .setStatus(JobStatus.RUNNING) .setFeatureSetJobStatuses(new HashSet<>()) .setSource(source) - .setStores(new HashSet<>()) - .setExtId("extId") - .build(); - - when(jobRepository - .findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc( - source.getType(), source.getConfig(), null, JobStatus.getTerminalStates())) - .thenReturn(Optional.of(job)); - - List tasks = - jcsWithConsolidation.makeJobUpdateTasks( - ImmutableList.of(Pair.of(source, ImmutableSet.of(store)))); - - assertThat("CreateTask is expected", tasks.get(0) instanceof CreateJobTask); - } - - @Test - public void shouldUpgradeJobWhenStoreChanged() { - Source source = TestUtil.createKafkaSource("kafka:9092", "topic", false); - Store store = TestUtil.createStore("store", Collections.emptyList()); - - Job job = - Job.builder() - .setStatus(JobStatus.RUNNING) - .setFeatureSetJobStatuses(new HashSet<>()) - .setSource(source) - .setStores(ImmutableSet.of(store)) .setExtId("extId") + .setJobStores(new HashSet<>()) .build(); - job.setCreated(Date.from(Instant.now())); - store.setLastUpdated(Date.from(Instant.now().plusSeconds(1))); - when(jobRepository .findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc( source.getType(), source.getConfig(), null, JobStatus.getTerminalStates())) @@ -825,19 +749,8 @@ public void shouldCreateJobPerStore() throws InvalidProtocolBufferException { eq(JobStatus.getTerminalStates()))) .thenReturn(Optional.empty()); - Job expected1 = - Job.builder() - .setSource(source) - .setStores(ImmutableSet.of(store1)) - .setRunner(Runner.DATAFLOW) - .build(); - - Job expected2 = - Job.builder() - .setSource(source) - .setStores(ImmutableSet.of(store2)) - .setRunner(Runner.DATAFLOW) - .build(); + Job expected1 = newJob("", store1, source); + Job expected2 = newJob("", store2, source); when(jobManager.startJob(any())).thenReturn(new Job()); when(jobManager.getRunnerType()).thenReturn(Runner.DATAFLOW); @@ -887,15 +800,10 @@ public void shouldCloneRunningJobOnUpgrade() throws InvalidProtocolBufferExcepti when(featureSetRepository.findAllByNameLikeAndProject_NameLikeOrderByNameAsc("%", "%")) .thenReturn(ImmutableList.of(TestUtil.createEmptyFeatureSet("fs", source))); - Job existingJob = - Job.builder() - .setStores(ImmutableSet.of(store1)) - .setSource(source) - .setExtId("extId") - .setId("some-id") - .setStatus(JobStatus.RUNNING) - .setFeatureSetJobStatuses(new HashSet<>()) - .build(); + Job existingJob = newJob("some-id", store1, source); + existingJob.setExtId("extId"); + existingJob.setFeatureSetJobStatuses(new HashSet<>()); + existingJob.setStatus(JobStatus.RUNNING); when(jobRepository .findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc( diff --git a/core/src/test/java/feast/core/service/JobServiceTest.java b/core/src/test/java/feast/core/service/JobServiceTest.java index 967949e565..1e0bec76da 100644 --- a/core/src/test/java/feast/core/service/JobServiceTest.java +++ b/core/src/test/java/feast/core/service/JobServiceTest.java @@ -123,7 +123,7 @@ public void setupSpecService() { public void setupJobRepository() { when(this.jobRepository.findById(this.job.getId())).thenReturn(Optional.of(this.job)); - when(this.jobRepository.findByStoresName(this.dataStore.getName())) + when(this.jobRepository.findByJobStoresIdStoreName(this.dataStore.getName())) .thenReturn(Arrays.asList(this.job)); when(this.jobRepository.findByFeatureSetJobStatusesIn( Lists.newArrayList((this.featureSet.getJobStatuses())))) @@ -148,15 +148,17 @@ private FeatureSet newDummyFeatureSet(String name, String project) { } private Job newDummyJob(String id, String extId, JobStatus status) { - return job.builder() - .setId(id) - .setExtId(extId) - .setRunner(Runner.DATAFLOW) - .setSource(this.dataSource) - .setStores(ImmutableSet.of(this.dataStore)) - .setFeatureSetJobStatuses(makeFeatureSetJobStatus(this.featureSet)) - .setStatus(status) - .build(); + Job job = + Job.builder() + .setId(id) + .setExtId(extId) + .setRunner(Runner.DATAFLOW) + .setSource(this.dataSource) + .setFeatureSetJobStatuses(makeFeatureSetJobStatus(this.featureSet)) + .setStatus(status) + .build(); + job.setStores(ImmutableSet.of(this.dataStore)); + return job; } private List newDummyFeatureSetReferences() { From 62728b1248e1d93d8605578c598c4367b9e01ab2 Mon Sep 17 00:00:00 2001 From: pyalex Date: Wed, 1 Jul 2020 09:20:56 +0300 Subject: [PATCH 2/4] lint python --- sdk/python/feast/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py index b6c1b45f9e..4ef72fc36c 100644 --- a/sdk/python/feast/client.py +++ b/sdk/python/feast/client.py @@ -15,8 +15,8 @@ import datetime import logging -import os import multiprocessing +import os import shutil import tempfile import time From 9f4d92c0dd0c208d33a63b94d0540780ca1bef14 Mon Sep 17 00:00:00 2001 From: pyalex Date: Wed, 1 Jul 2020 10:31:09 +0300 Subject: [PATCH 3/4] pr comments --- core/src/main/java/feast/core/dao/JobRepository.java | 2 +- core/src/main/java/feast/core/model/Job.java | 10 ++++++++++ core/src/main/java/feast/core/model/JobStore.java | 7 ++++--- 3 files changed, 15 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/feast/core/dao/JobRepository.java b/core/src/main/java/feast/core/dao/JobRepository.java index 4f1e527fb1..d7c6673897 100644 --- a/core/src/main/java/feast/core/dao/JobRepository.java +++ b/core/src/main/java/feast/core/dao/JobRepository.java @@ -40,6 +40,6 @@ public interface JobRepository extends JpaRepository { List findByFeatureSetJobStatusesIn(List featureSetsJobStatuses); - // find jobs by feast store name + // find jobs that have at least one store with given name List findByJobStoresIdStoreName(String storeName); } diff --git a/core/src/main/java/feast/core/model/Job.java b/core/src/main/java/feast/core/model/Job.java index 07f94bb828..51637e2a01 100644 --- a/core/src/main/java/feast/core/model/Job.java +++ b/core/src/main/java/feast/core/model/Job.java @@ -136,6 +136,11 @@ public void addAllFeatureSets(Set featureSets) { } } + /** + * Materialize stores from protos stored in {@link JobStore} + * + * @return set of {@link Store} + */ public Set getStores() { return getJobStores().stream() .map(JobStore::getStoreProto) @@ -143,6 +148,11 @@ public Set getStores() { .collect(Collectors.toSet()); } + /** + * Copy stores as protos to JobStores {@link JobStore} to keep job's version of allocated stores. + * + * @param stores allocated set of {@link Store} + */ public void setStores(Set stores) { jobStores = new HashSet<>(); for (Store store : stores) { diff --git a/core/src/main/java/feast/core/model/JobStore.java b/core/src/main/java/feast/core/model/JobStore.java index 621d3ec0e2..5eaa7dee35 100644 --- a/core/src/main/java/feast/core/model/JobStore.java +++ b/core/src/main/java/feast/core/model/JobStore.java @@ -77,7 +77,7 @@ public JobStore(Job job, Store store) { public StoreProto.Store getStoreProto() { try { - return StoreProto.Store.parseFrom(storeProto); + return StoreProto.Store.parseFrom(this.storeProto); } catch (InvalidProtocolBufferException e) { return StoreProto.Store.newBuilder().build(); } @@ -100,11 +100,12 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; JobStore jobStore = (JobStore) o; - return Objects.equal(id, jobStore.id) && Objects.equal(storeProto, jobStore.storeProto); + return Objects.equal(this.id, jobStore.id) + && Objects.equal(this.storeProto, jobStore.storeProto); } @Override public int hashCode() { - return Objects.hashCode(id, storeProto); + return Objects.hashCode(this.id, this.storeProto); } } From 9414f2c24b244e7801840710787700aa5158eb35 Mon Sep 17 00:00:00 2001 From: pyalex Date: Wed, 1 Jul 2020 10:57:21 +0300 Subject: [PATCH 4/4] more comments --- core/src/main/java/feast/core/model/JobStore.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/src/main/java/feast/core/model/JobStore.java b/core/src/main/java/feast/core/model/JobStore.java index 5eaa7dee35..bd83a233c5 100644 --- a/core/src/main/java/feast/core/model/JobStore.java +++ b/core/src/main/java/feast/core/model/JobStore.java @@ -29,6 +29,10 @@ import lombok.Getter; import lombok.Setter; +/** + * Represents {@link Store}s attached to one {@link Job}. Keeps copy of Store's proto to detect + * changes in original Store. + */ @Entity @Table( name = "jobs_stores",