Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keep StoreProto inside JobStore to decouple JobCoordination from SpecService internals #852

Merged
merged 4 commits into from
Jul 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/src/main/java/feast/core/dao/JobRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,6 @@ public interface JobRepository extends JpaRepository<Job, String> {

List<Job> findByFeatureSetJobStatusesIn(List<FeatureSetJobStatus> featureSetsJobStatuses);

// find jobs by feast store name
List<Job> findByStoresName(String storeName);
// find jobs that have at least one store with given name
List<Job> findByJobStoresIdStoreName(String storeName);
pyalex marked this conversation as resolved.
Show resolved Hide resolved
}
12 changes: 6 additions & 6 deletions core/src/main/java/feast/core/job/ConsolidatedJobStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@ public Job getOrCreateJob(Source source, Set<Store> stores) {
.findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
source.getType(), source.getConfig(), null, JobStatus.getTerminalStates())
.orElseGet(
() ->
Job.builder()
.setSource(source)
.setStores(stores)
.setFeatureSetJobStatuses(new HashSet<>())
.build());
() -> {
Job job =
Job.builder().setSource(source).setFeatureSetJobStatuses(new HashSet<>()).build();
job.setStores(stores);
return job;
});
}

@Override
Expand Down
17 changes: 10 additions & 7 deletions core/src/main/java/feast/core/job/JobPerStoreStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,16 @@ public Job getOrCreateJob(Source source, Set<Store> stores) {
.findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
source.getType(), source.getConfig(), store.getName(), JobStatus.getTerminalStates())
.orElseGet(
() ->
Job.builder()
.setSource(source)
.setStoreName(store.getName())
.setStores(stores)
.setFeatureSetJobStatuses(new HashSet<>())
.build());
() -> {
Job job =
Job.builder()
.setSource(source)
.setStoreName(store.getName())
.setFeatureSetJobStatuses(new HashSet<>())
.build();
job.setStores(stores);
return job;
});
}

@Override
Expand Down
42 changes: 29 additions & 13 deletions core/src/main/java/feast/core/model/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,8 @@ public JobBuilder setSource(Source source) {
private String sourceConfig;

// Sinks
@ManyToMany
@JoinTable(
name = "jobs_stores",
joinColumns = @JoinColumn(name = "job_id"),
inverseJoinColumns = @JoinColumn(name = "store_name"),
indexes = {
@Index(name = "idx_jobs_stores_job_id", columnList = "job_id"),
@Index(name = "idx_jobs_stores_store_name", columnList = "store_name")
})
private Set<Store> stores;
@OneToMany(mappedBy = "job", cascade = CascadeType.ALL)
private Set<JobStore> jobStores = new HashSet<>();

@Deprecated
@Column(name = "store_name")
Expand Down Expand Up @@ -144,6 +136,30 @@ public void addAllFeatureSets(Set<FeatureSet> featureSets) {
}
}

/**
* Materialize stores from protos stored in {@link JobStore}
*
* @return set of {@link Store}
*/
public Set<Store> getStores() {
pyalex marked this conversation as resolved.
Show resolved Hide resolved
return getJobStores().stream()
.map(JobStore::getStoreProto)
.map(Store::fromProto)
.collect(Collectors.toSet());
}

/**
* Copy stores as protos to JobStores {@link JobStore} to keep job's version of allocated stores.
*
* @param stores allocated set of {@link Store}
*/
public void setStores(Set<Store> stores) {
jobStores = new HashSet<>();
for (Store store : stores) {
jobStores.add(new JobStore(this, store));
}
}

/**
* Convert a job model to ingestion job proto
*
Expand Down Expand Up @@ -177,21 +193,21 @@ public IngestionJobProto.IngestionJob toProto() throws InvalidProtocolBufferExce
public Job clone() {
Job job =
Job.builder()
.setStores(getStores())
.setStoreName(getStoreName())
.setSourceConfig(getSourceConfig())
.setSourceType(getSourceType())
.setFeatureSetJobStatuses(new HashSet<>())
.setRunner(getRunner())
.setStatus(JobStatus.UNKNOWN)
.build();
job.setStores(getStores());
job.addAllFeatureSets(getFeatureSets());
return job;
}

@Override
public int hashCode() {
return Objects.hash(getSource(), this.stores, this.runner);
return Objects.hash(getSource(), getStores(), this.runner);
}

@Override
Expand All @@ -204,7 +220,7 @@ public boolean equals(Object obj) {
return false;
} else if (!getSource().equals(other.getSource())) {
return false;
} else if (!stores.equals(other.stores)) {
} else if (!getStores().equals(other.getStores())) {
return false;
}
return true;
Expand Down
115 changes: 115 additions & 0 deletions core/src/main/java/feast/core/model/JobStore.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.core.model;

import com.google.common.base.Objects;
import com.google.protobuf.InvalidProtocolBufferException;
import feast.proto.core.StoreProto;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import javax.persistence.*;
import javax.persistence.Entity;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;

/**
* Represents {@link Store}s attached to one {@link Job}. Keeps copy of Store's proto to detect
* changes in original Store.
*/
@Entity
@Table(
name = "jobs_stores",
indexes = {
@Index(name = "idx_jobs_stores_job_id", columnList = "job_id"),
@Index(name = "idx_jobs_stores_store_name", columnList = "store_name")
})
@Getter
@Setter
public class JobStore {
@Embeddable
@EqualsAndHashCode
@AllArgsConstructor
public static class JobStoreKey implements Serializable {
public JobStoreKey() {}

@Column(name = "job_id")
String jobId;

@Column(name = "store_name")
String storeName;
}

@EmbeddedId private JobStoreKey id = new JobStoreKey();

@ManyToOne
@MapsId("jobId")
@JoinColumn(name = "job_id")
private Job job;

@Column(name = "store_proto", nullable = false)
@Lob
private byte[] storeProto;

public JobStore() {}

public JobStore(Job job, Store store) {
this.job = job;
this.id.storeName = store.getName();
try {
setStoreProto(store.toProto());
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException("Couldn't convert Store to proto. Reason: %s", e.getCause());
}
}

public StoreProto.Store getStoreProto() {
try {
return StoreProto.Store.parseFrom(this.storeProto);
} catch (InvalidProtocolBufferException e) {
return StoreProto.Store.newBuilder().build();
}
}

public void setStoreProto(StoreProto.Store storeProto) {
ByteArrayOutputStream output = new ByteArrayOutputStream();
try {
storeProto.writeTo(output);
} catch (IOException e) {
throw new RuntimeException(
String.format("Couldn't write StoreProto to byteArray: %s", e.getCause()));
}

this.storeProto = output.toByteArray();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
JobStore jobStore = (JobStore) o;
return Objects.equal(this.id, jobStore.id)
&& Objects.equal(this.storeProto, jobStore.storeProto);
}

@Override
public int hashCode() {
return Objects.hashCode(this.id, this.storeProto);
}
}
2 changes: 1 addition & 1 deletion core/src/main/java/feast/core/model/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
@AllArgsConstructor
@Entity
@Table(name = "stores")
public class Store extends AbstractTimestampEntity {
public class Store {

// Name of the store. Must be unique
@Id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,10 +227,6 @@ private boolean jobRequiresUpgrade(Job job, Set<Store> stores) {
return true;
}

if (stores.stream().anyMatch(s -> s.getLastUpdated().after(job.getCreated()))) {
return true;
}

return false;
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/feast/core/service/JobService.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public ListIngestionJobsResponse listJobs(ListIngestionJobsRequest request)
// multiple filters can apply together in an 'and' operation
if (!filter.getStoreName().isEmpty()) {
// find jobs by name
List<Job> jobs = this.jobRepository.findByStoresName(filter.getStoreName());
List<Job> jobs = this.jobRepository.findByJobStoresIdStoreName(filter.getStoreName());
Set<String> jobIds = jobs.stream().map(Job::getId).collect(Collectors.toSet());
matchingJobIds = this.mergeResults(matchingJobIds, jobIds);
}
Expand Down

This file was deleted.

1 change: 1 addition & 0 deletions core/src/main/resources/db/migration/V2.4__Store_proto.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE jobs_stores ADD COLUMN store_proto oid not null;
20 changes: 11 additions & 9 deletions core/src/test/java/feast/core/job/JobTasksTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,17 @@ public void setUp() {
}

Job makeJob(String extId, List<FeatureSet> featureSets, JobStatus status) {
return Job.builder()
.setId("job")
.setExtId(extId)
.setRunner(RUNNER)
.setSource(source)
.setStores(ImmutableSet.of(store))
.setFeatureSetJobStatuses(TestUtil.makeFeatureSetJobStatus(featureSets))
.setStatus(status)
.build();
Job job =
Job.builder()
.setId("job")
.setExtId(extId)
.setRunner(RUNNER)
.setSource(source)
.setFeatureSetJobStatuses(TestUtil.makeFeatureSetJobStatus(featureSets))
.setStatus(status)
.build();
job.setStores(ImmutableSet.of(store));
return job;
}

CreateJobTask makeCreateTask(Job currentJob) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package feast.core.job.dataflow;

import static feast.common.models.Store.convertStringToSubscription;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.ArgumentMatchers.any;
Expand All @@ -25,6 +26,7 @@
import com.google.api.client.auth.oauth2.Credential;
import com.google.api.client.googleapis.testing.auth.oauth2.MockGoogleCredential;
import com.google.api.services.dataflow.Dataflow;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
Expand All @@ -35,6 +37,7 @@
import feast.core.exception.JobExecutionException;
import feast.core.job.Runner;
import feast.core.model.*;
import feast.core.util.TestUtil;
import feast.ingestion.options.ImportOptions;
import feast.proto.core.FeatureSetProto;
import feast.proto.core.FeatureSetProto.FeatureSetMeta;
Expand Down Expand Up @@ -170,10 +173,10 @@ public void shouldStartJobWithCorrectPipelineOptions() throws IOException {
.setExtId("")
.setRunner(Runner.DATAFLOW)
.setSource(Source.fromProto(source))
.setStores(ImmutableSet.of(Store.fromProto(store)))
.setFeatureSetJobStatuses(Sets.newHashSet(featureSetJobStatus))
.setStatus(JobStatus.PENDING)
.build();
job.setStores(ImmutableSet.of(Store.fromProto(store)));
Job actual = dfJobManager.startJob(job);

verify(dfJobManager, times(1)).runPipeline(captor.capture());
Expand Down Expand Up @@ -215,12 +218,8 @@ public void shouldStartJobWithCorrectPipelineOptions() throws IOException {

@Test
public void shouldThrowExceptionWhenJobStateTerminal() throws IOException {
StoreProto.Store store =
StoreProto.Store.newBuilder()
.setName("SERVING")
.setType(StoreType.REDIS)
.setRedisConfig(RedisConfig.newBuilder().setHost("localhost").setPort(6379).build())
.build();
Store store =
TestUtil.createStore("store", ImmutableList.of(convertStringToSubscription("*:*")));

SourceProto.Source source =
SourceProto.Source.newBuilder()
Expand Down Expand Up @@ -253,11 +252,10 @@ public void shouldThrowExceptionWhenJobStateTerminal() throws IOException {
.setExtId("")
.setRunner(Runner.DATAFLOW)
.setSource(Source.fromProto(source))
.setStores(ImmutableSet.of(Store.fromProto(store)))
.setFeatureSetJobStatuses(Sets.newHashSet(featureSetJobStatus))
.setStatus(JobStatus.PENDING)
.build();

job.setStores(ImmutableSet.of(store));
expectedException.expect(JobExecutionException.class);
dfJobManager.startJob(job);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,10 @@ public void shouldStartDirectJobAndRegisterPipelineResult() throws IOException {
.setExtId("")
.setRunner(Runner.DIRECT)
.setSource(Source.fromProto(source))
.setStores(ImmutableSet.of(Store.fromProto(store)))
.setFeatureSetJobStatuses(makeFeatureSetJobStatus(FeatureSet.fromProto(featureSet)))
.setStatus(JobStatus.PENDING)
.build();

job.setStores(ImmutableSet.of(Store.fromProto(store)));
Job actual = drJobManager.startJob(job);

verify(drJobManager, times(1)).runPipeline(pipelineOptionsCaptor.capture());
Expand Down
Loading