Skip to content

Commit

Permalink
keep store proto in JobStore
Browse files Browse the repository at this point in the history
  • Loading branch information
pyalex committed Jul 1, 2020
1 parent 8e5f32d commit df4b057
Show file tree
Hide file tree
Showing 15 changed files with 211 additions and 188 deletions.
2 changes: 1 addition & 1 deletion core/src/main/java/feast/core/dao/JobRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,5 @@ public interface JobRepository extends JpaRepository<Job, String> {
List<Job> findByFeatureSetJobStatusesIn(List<FeatureSetJobStatus> featureSetsJobStatuses);

// find jobs by feast store name
List<Job> findByStoresName(String storeName);
List<Job> findByJobStoresIdStoreName(String storeName);
}
12 changes: 6 additions & 6 deletions core/src/main/java/feast/core/job/ConsolidatedJobStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@ public Job getOrCreateJob(Source source, Set<Store> stores) {
.findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
source.getType(), source.getConfig(), null, JobStatus.getTerminalStates())
.orElseGet(
() ->
Job.builder()
.setSource(source)
.setStores(stores)
.setFeatureSetJobStatuses(new HashSet<>())
.build());
() -> {
Job job =
Job.builder().setSource(source).setFeatureSetJobStatuses(new HashSet<>()).build();
job.setStores(stores);
return job;
});
}

@Override
Expand Down
17 changes: 10 additions & 7 deletions core/src/main/java/feast/core/job/JobPerStoreStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,16 @@ public Job getOrCreateJob(Source source, Set<Store> stores) {
.findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
source.getType(), source.getConfig(), store.getName(), JobStatus.getTerminalStates())
.orElseGet(
() ->
Job.builder()
.setSource(source)
.setStoreName(store.getName())
.setStores(stores)
.setFeatureSetJobStatuses(new HashSet<>())
.build());
() -> {
Job job =
Job.builder()
.setSource(source)
.setStoreName(store.getName())
.setFeatureSetJobStatuses(new HashSet<>())
.build();
job.setStores(stores);
return job;
});
}

@Override
Expand Down
32 changes: 19 additions & 13 deletions core/src/main/java/feast/core/model/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,8 @@ public JobBuilder setSource(Source source) {
private String sourceConfig;

// Sinks
@ManyToMany
@JoinTable(
name = "jobs_stores",
joinColumns = @JoinColumn(name = "job_id"),
inverseJoinColumns = @JoinColumn(name = "store_name"),
indexes = {
@Index(name = "idx_jobs_stores_job_id", columnList = "job_id"),
@Index(name = "idx_jobs_stores_store_name", columnList = "store_name")
})
private Set<Store> stores;
@OneToMany(mappedBy = "job", cascade = CascadeType.ALL)
private Set<JobStore> jobStores = new HashSet<>();

@Deprecated
@Column(name = "store_name")
Expand Down Expand Up @@ -144,6 +136,20 @@ public void addAllFeatureSets(Set<FeatureSet> featureSets) {
}
}

public Set<Store> getStores() {
return getJobStores().stream()
.map(JobStore::getStoreProto)
.map(Store::fromProto)
.collect(Collectors.toSet());
}

public void setStores(Set<Store> stores) {
jobStores = new HashSet<>();
for (Store store : stores) {
jobStores.add(new JobStore(this, store));
}
}

/**
* Convert a job model to ingestion job proto
*
Expand Down Expand Up @@ -177,21 +183,21 @@ public IngestionJobProto.IngestionJob toProto() throws InvalidProtocolBufferExce
public Job clone() {
Job job =
Job.builder()
.setStores(getStores())
.setStoreName(getStoreName())
.setSourceConfig(getSourceConfig())
.setSourceType(getSourceType())
.setFeatureSetJobStatuses(new HashSet<>())
.setRunner(getRunner())
.setStatus(JobStatus.UNKNOWN)
.build();
job.setStores(getStores());
job.addAllFeatureSets(getFeatureSets());
return job;
}

@Override
public int hashCode() {
return Objects.hash(getSource(), this.stores, this.runner);
return Objects.hash(getSource(), getStores(), this.runner);
}

@Override
Expand All @@ -204,7 +210,7 @@ public boolean equals(Object obj) {
return false;
} else if (!getSource().equals(other.getSource())) {
return false;
} else if (!stores.equals(other.stores)) {
} else if (!getStores().equals(other.getStores())) {
return false;
}
return true;
Expand Down
110 changes: 110 additions & 0 deletions core/src/main/java/feast/core/model/JobStore.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.core.model;

import com.google.common.base.Objects;
import com.google.protobuf.InvalidProtocolBufferException;
import feast.proto.core.StoreProto;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import javax.persistence.*;
import javax.persistence.Entity;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;

@Entity
@Table(
name = "jobs_stores",
indexes = {
@Index(name = "idx_jobs_stores_job_id", columnList = "job_id"),
@Index(name = "idx_jobs_stores_store_name", columnList = "store_name")
})
@Getter
@Setter
public class JobStore {
@Embeddable
@EqualsAndHashCode
@AllArgsConstructor
public static class JobStoreKey implements Serializable {
public JobStoreKey() {}

@Column(name = "job_id")
String jobId;

@Column(name = "store_name")
String storeName;
}

@EmbeddedId private JobStoreKey id = new JobStoreKey();

@ManyToOne
@MapsId("jobId")
@JoinColumn(name = "job_id")
private Job job;

@Column(name = "store_proto", nullable = false)
@Lob
private byte[] storeProto;

public JobStore() {}

public JobStore(Job job, Store store) {
this.job = job;
this.id.storeName = store.getName();
try {
setStoreProto(store.toProto());
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException("Couldn't convert Store to proto. Reason: %s", e.getCause());
}
}

public StoreProto.Store getStoreProto() {
try {
return StoreProto.Store.parseFrom(storeProto);
} catch (InvalidProtocolBufferException e) {
return StoreProto.Store.newBuilder().build();
}
}

public void setStoreProto(StoreProto.Store storeProto) {
ByteArrayOutputStream output = new ByteArrayOutputStream();
try {
storeProto.writeTo(output);
} catch (IOException e) {
throw new RuntimeException(
String.format("Couldn't write StoreProto to byteArray: %s", e.getCause()));
}

this.storeProto = output.toByteArray();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
JobStore jobStore = (JobStore) o;
return Objects.equal(id, jobStore.id) && Objects.equal(storeProto, jobStore.storeProto);
}

@Override
public int hashCode() {
return Objects.hashCode(id, storeProto);
}
}
2 changes: 1 addition & 1 deletion core/src/main/java/feast/core/model/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
@AllArgsConstructor
@Entity
@Table(name = "stores")
public class Store extends AbstractTimestampEntity {
public class Store {

// Name of the store. Must be unique
@Id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,10 +227,6 @@ private boolean jobRequiresUpgrade(Job job, Set<Store> stores) {
return true;
}

if (stores.stream().anyMatch(s -> s.getLastUpdated().after(job.getCreated()))) {
return true;
}

return false;
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/feast/core/service/JobService.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public ListIngestionJobsResponse listJobs(ListIngestionJobsRequest request)
// multiple filters can apply together in an 'and' operation
if (!filter.getStoreName().isEmpty()) {
// find jobs by name
List<Job> jobs = this.jobRepository.findByStoresName(filter.getStoreName());
List<Job> jobs = this.jobRepository.findByJobStoresIdStoreName(filter.getStoreName());
Set<String> jobIds = jobs.stream().map(Job::getId).collect(Collectors.toSet());
matchingJobIds = this.mergeResults(matchingJobIds, jobIds);
}
Expand Down

This file was deleted.

1 change: 1 addition & 0 deletions core/src/main/resources/db/migration/V2.4__Store_proto.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE jobs_stores ADD COLUMN store_proto oid not null;
20 changes: 11 additions & 9 deletions core/src/test/java/feast/core/job/JobTasksTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,17 @@ public void setUp() {
}

Job makeJob(String extId, List<FeatureSet> featureSets, JobStatus status) {
return Job.builder()
.setId("job")
.setExtId(extId)
.setRunner(RUNNER)
.setSource(source)
.setStores(ImmutableSet.of(store))
.setFeatureSetJobStatuses(TestUtil.makeFeatureSetJobStatus(featureSets))
.setStatus(status)
.build();
Job job =
Job.builder()
.setId("job")
.setExtId(extId)
.setRunner(RUNNER)
.setSource(source)
.setFeatureSetJobStatuses(TestUtil.makeFeatureSetJobStatus(featureSets))
.setStatus(status)
.build();
job.setStores(ImmutableSet.of(store));
return job;
}

CreateJobTask makeCreateTask(Job currentJob) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package feast.core.job.dataflow;

import static feast.common.models.Store.convertStringToSubscription;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.ArgumentMatchers.any;
Expand All @@ -25,6 +26,7 @@
import com.google.api.client.auth.oauth2.Credential;
import com.google.api.client.googleapis.testing.auth.oauth2.MockGoogleCredential;
import com.google.api.services.dataflow.Dataflow;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
Expand All @@ -35,6 +37,7 @@
import feast.core.exception.JobExecutionException;
import feast.core.job.Runner;
import feast.core.model.*;
import feast.core.util.TestUtil;
import feast.ingestion.options.ImportOptions;
import feast.proto.core.FeatureSetProto;
import feast.proto.core.FeatureSetProto.FeatureSetMeta;
Expand Down Expand Up @@ -170,10 +173,10 @@ public void shouldStartJobWithCorrectPipelineOptions() throws IOException {
.setExtId("")
.setRunner(Runner.DATAFLOW)
.setSource(Source.fromProto(source))
.setStores(ImmutableSet.of(Store.fromProto(store)))
.setFeatureSetJobStatuses(Sets.newHashSet(featureSetJobStatus))
.setStatus(JobStatus.PENDING)
.build();
job.setStores(ImmutableSet.of(Store.fromProto(store)));
Job actual = dfJobManager.startJob(job);

verify(dfJobManager, times(1)).runPipeline(captor.capture());
Expand Down Expand Up @@ -215,12 +218,8 @@ public void shouldStartJobWithCorrectPipelineOptions() throws IOException {

@Test
public void shouldThrowExceptionWhenJobStateTerminal() throws IOException {
StoreProto.Store store =
StoreProto.Store.newBuilder()
.setName("SERVING")
.setType(StoreType.REDIS)
.setRedisConfig(RedisConfig.newBuilder().setHost("localhost").setPort(6379).build())
.build();
Store store =
TestUtil.createStore("store", ImmutableList.of(convertStringToSubscription("*:*")));

SourceProto.Source source =
SourceProto.Source.newBuilder()
Expand Down Expand Up @@ -253,11 +252,10 @@ public void shouldThrowExceptionWhenJobStateTerminal() throws IOException {
.setExtId("")
.setRunner(Runner.DATAFLOW)
.setSource(Source.fromProto(source))
.setStores(ImmutableSet.of(Store.fromProto(store)))
.setFeatureSetJobStatuses(Sets.newHashSet(featureSetJobStatus))
.setStatus(JobStatus.PENDING)
.build();

job.setStores(ImmutableSet.of(store));
expectedException.expect(JobExecutionException.class);
dfJobManager.startJob(job);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,10 @@ public void shouldStartDirectJobAndRegisterPipelineResult() throws IOException {
.setExtId("")
.setRunner(Runner.DIRECT)
.setSource(Source.fromProto(source))
.setStores(ImmutableSet.of(Store.fromProto(store)))
.setFeatureSetJobStatuses(makeFeatureSetJobStatus(FeatureSet.fromProto(featureSet)))
.setStatus(JobStatus.PENDING)
.build();

job.setStores(ImmutableSet.of(Store.fromProto(store)));
Job actual = drJobManager.startJob(job);

verify(drJobManager, times(1)).runPipeline(pipelineOptionsCaptor.capture());
Expand Down
Loading

0 comments on commit df4b057

Please sign in to comment.