Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async job management #361

Merged
merged 12 commits into from
Dec 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .prow/scripts/test-end-to-end-batch.sh
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ nohup /tmp/kafka/bin/zookeeper-server-start.sh /tmp/kafka/config/zookeeper.prope
sleep 5
tail -n10 /var/log/zookeeper.log
nohup /tmp/kafka/bin/kafka-server-start.sh /tmp/kafka/config/server.properties &> /var/log/kafka.log 2>&1 &
sleep 10
sleep 20
tail -n10 /var/log/kafka.log

echo "
Expand Down Expand Up @@ -108,6 +108,8 @@ feast:
jobs:
runner: DirectRunner
options: {}
updates:
timeoutSeconds: 240
metrics:
enabled: false

Expand Down Expand Up @@ -141,7 +143,7 @@ EOF
nohup java -jar core/target/feast-core-0.3.2-SNAPSHOT.jar \
--spring.config.location=file:///tmp/core.application.yml \
&> /var/log/feast-core.log &
sleep 30
sleep 35
tail -n10 /var/log/feast-core.log
echo "
============================================================
Expand Down
6 changes: 4 additions & 2 deletions .prow/scripts/test-end-to-end.sh
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ nohup /tmp/kafka/bin/zookeeper-server-start.sh /tmp/kafka/config/zookeeper.prope
sleep 5
tail -n10 /var/log/zookeeper.log
nohup /tmp/kafka/bin/kafka-server-start.sh /tmp/kafka/config/server.properties &> /var/log/kafka.log 2>&1 &
sleep 10
sleep 20
tail -n10 /var/log/kafka.log

echo "
Expand Down Expand Up @@ -91,6 +91,8 @@ feast:
jobs:
runner: DirectRunner
options: {}
updates:
timeoutSeconds: 240
metrics:
enabled: false

Expand Down Expand Up @@ -124,7 +126,7 @@ EOF
nohup java -jar core/target/feast-core-0.3.2-SNAPSHOT.jar \
--spring.config.location=file:///tmp/core.application.yml \
&> /var/log/feast-core.log &
sleep 30
sleep 35
tail -n10 /var/log/feast-core.log

echo "
Expand Down
8 changes: 8 additions & 0 deletions core/src/main/java/feast/core/config/FeastProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ public static class JobProperties {
private String runner;
private Map<String, String> options;
private MetricsProperties metrics;
private JobUpdatesProperties updates;
}

@Getter
@Setter
public static class JobUpdatesProperties {

private long timeoutSeconds;
}

@Getter
Expand Down
56 changes: 8 additions & 48 deletions core/src/main/java/feast/core/config/JobConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,12 @@
import com.google.api.services.dataflow.DataflowScopes;
import com.google.common.base.Strings;
import feast.core.config.FeastProperties.JobProperties;
import feast.core.config.FeastProperties.JobUpdatesProperties;
import feast.core.job.JobManager;
import feast.core.job.JobMonitor;
import feast.core.job.NoopJobMonitor;
import feast.core.job.Runner;
import feast.core.job.dataflow.DataflowJobManager;
import feast.core.job.dataflow.DataflowJobMonitor;
import feast.core.job.direct.DirectJobRegistry;
import feast.core.job.direct.DirectRunnerJobManager;
import feast.core.job.direct.DirectRunnerJobMonitor;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.HashMap;
Expand All @@ -54,7 +51,7 @@ public class JobConfig {
@Bean
@Autowired
public JobManager getJobManager(
FeastProperties feastProperties, DirectJobRegistry directJobRegistry) throws Exception {
FeastProperties feastProperties, DirectJobRegistry directJobRegistry) {

JobProperties jobProperties = feastProperties.getJobs();
Runner runner = Runner.fromString(jobProperties.getRunner());
Expand Down Expand Up @@ -97,52 +94,15 @@ public JobManager getJobManager(
}
}

/** Get a Job Monitor given the runner type and dataflow configuration. */
@Bean
public JobMonitor getJobMonitor(
FeastProperties feastProperties, DirectJobRegistry directJobRegistry) throws Exception {

JobProperties jobProperties = feastProperties.getJobs();
Runner runner = Runner.fromString(jobProperties.getRunner());
Map<String, String> jobOptions = jobProperties.getOptions();

switch (runner) {
case DATAFLOW:
if (Strings.isNullOrEmpty(jobOptions.getOrDefault("region", null))
|| Strings.isNullOrEmpty(jobOptions.getOrDefault("project", null))) {
log.warn(
"Project and location of the Dataflow runner is not configured, will not do job monitoring");
return new NoopJobMonitor();
}
try {
GoogleCredential credential =
GoogleCredential.getApplicationDefault().createScoped(DataflowScopes.all());
Dataflow dataflow =
new Dataflow(
GoogleNetHttpTransport.newTrustedTransport(),
JacksonFactory.getDefaultInstance(),
credential);

return new DataflowJobMonitor(
dataflow, jobOptions.get("project"), jobOptions.get("region"));
} catch (IOException e) {
log.error(
"Unable to find credential required for Dataflow monitoring API: {}", e.getMessage());
} catch (GeneralSecurityException e) {
log.error("Security exception while ");
} catch (Exception e) {
log.error("Unable to initialize DataflowJobMonitor", e);
}
case DIRECT:
return new DirectRunnerJobMonitor(directJobRegistry);
default:
return new NoopJobMonitor();
}
}

/** Get a direct job registry */
@Bean
public DirectJobRegistry directJobRegistry() {
return new DirectJobRegistry();
}

/** Extracts job update options from feast core options. */
@Bean
public JobUpdatesProperties jobUpdatesProperties(FeastProperties feastProperties) {
return feastProperties.getJobs().getUpdates();
}
}
6 changes: 3 additions & 3 deletions core/src/main/java/feast/core/dao/FeatureSetRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ public interface FeatureSetRepository extends JpaRepository<FeatureSet, String>
List<FeatureSet> findByName(String name);

// find all versions of featureSets with names matching the regex
@Query(nativeQuery = true, value = "SELECT * FROM feature_sets "
+ "WHERE name LIKE ?1 ORDER BY name ASC, version ASC")
@Query(
nativeQuery = true,
value = "SELECT * FROM feature_sets " + "WHERE name LIKE ?1 ORDER BY name ASC, version ASC")
List<FeatureSet> findByNameWithWildcardOrderByNameAscVersionAsc(String name);

// find all feature sets and order by name and version
List<FeatureSet> findAllByOrderByNameAscVersionAsc();

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@
*/
package feast.core.dao;

import feast.core.model.JobInfo;
import feast.core.model.Job;
import feast.core.model.JobStatus;
import java.util.Collection;
import java.util.List;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

/** JPA repository supplying JobInfo objects keyed by ID. */
/** JPA repository supplying Job objects keyed by ID. */
@Repository
public interface JobInfoRepository extends JpaRepository<JobInfo, String> {
List<JobInfo> findByStatusNotIn(Collection<JobStatus> statuses);
public interface JobRepository extends JpaRepository<Job, String> {
List<Job> findByStatusNotIn(Collection<JobStatus> statuses);

List<JobInfo> findBySourceIdAndStoreName(String sourceId, String storeName);
List<Job> findBySourceIdAndStoreNameOrderByLastUpdatedDesc(String sourceId, String storeName);
}
2 changes: 1 addition & 1 deletion core/src/main/java/feast/core/dao/MetricsRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@

@Repository
public interface MetricsRepository extends JpaRepository<Metrics, Long> {
List<Metrics> findByJobInfo_Id(String id);
List<Metrics> findByJob_Id(String id);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.core.job;
package feast.core.dao;

import feast.core.model.JobInfo;
import feast.core.model.JobStatus;
import feast.core.model.Source;
import org.springframework.data.jpa.repository.JpaRepository;

public class NoopJobMonitor implements JobMonitor {

@Override
public JobStatus getJobStatus(JobInfo job) {
return JobStatus.UNKNOWN;
}
}
/** JPA repository supplying Source objects keyed by id. */
public interface SourceRepository extends JpaRepository<Source, String> {}
69 changes: 2 additions & 67 deletions core/src/main/java/feast/core/grpc/CoreServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package feast.core.grpc;

import com.google.common.collect.Lists;
import com.google.protobuf.InvalidProtocolBufferException;
import feast.core.CoreServiceGrpc.CoreServiceImplBase;
import feast.core.CoreServiceProto.ApplyFeatureSetRequest;
Expand All @@ -28,43 +27,28 @@
import feast.core.CoreServiceProto.ListFeatureSetsRequest;
import feast.core.CoreServiceProto.ListFeatureSetsResponse;
import feast.core.CoreServiceProto.ListStoresRequest;
import feast.core.CoreServiceProto.ListStoresRequest.Filter;
import feast.core.CoreServiceProto.ListStoresResponse;
import feast.core.CoreServiceProto.UpdateStoreRequest;
import feast.core.CoreServiceProto.UpdateStoreResponse;
import feast.core.CoreServiceProto.UpdateStoreResponse.Status;
import feast.core.FeatureSetProto.FeatureSetSpec;
import feast.core.SourceProto;
import feast.core.StoreProto.Store;
import feast.core.StoreProto.Store.Subscription;
import feast.core.exception.RetrievalException;
import feast.core.grpc.interceptors.MonitoringInterceptor;
import feast.core.service.JobCoordinatorService;
import feast.core.service.SpecService;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.lognet.springboot.grpc.GRpcService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;

/**
* Implementation of the feast core GRPC service.
*/
/** Implementation of the feast core GRPC service. */
@Slf4j
@GRpcService(interceptors = {MonitoringInterceptor.class})
public class CoreServiceImpl extends CoreServiceImplBase {

private SpecService specService;
private JobCoordinatorService jobCoordinatorService;

@Autowired
public CoreServiceImpl(SpecService specService, JobCoordinatorService jobCoordinatorService) {
public CoreServiceImpl(SpecService specService) {
this.specService = specService;
this.jobCoordinatorService = jobCoordinatorService;
}

@Override
Expand Down Expand Up @@ -118,31 +102,6 @@ public void applyFeatureSet(
ApplyFeatureSetRequest request, StreamObserver<ApplyFeatureSetResponse> responseObserver) {
try {
ApplyFeatureSetResponse response = specService.applyFeatureSet(request.getFeatureSet());
ListStoresResponse stores = specService.listStores(Filter.newBuilder().build());
for (Store store : stores.getStoreList()) {
Set<FeatureSetSpec> featureSetSpecs = new HashSet<>();
for (Subscription subscription : store.getSubscriptionsList()) {
featureSetSpecs.addAll(
specService
.listFeatureSets(
ListFeatureSetsRequest.Filter.newBuilder()
.setFeatureSetName(subscription.getName())
.setFeatureSetVersion(subscription.getVersion())
.build())
.getFeatureSetsList());
}
if (!featureSetSpecs.isEmpty() && featureSetSpecs.contains(response.getFeatureSet())) {
// We use the response featureSet source because it contains the information
// about whether to default to the default feature stream or not
SourceProto.Source source = response.getFeatureSet().getSource();
featureSetSpecs =
featureSetSpecs.stream()
.filter(fs -> fs.getSource().equals(source))
.collect(Collectors.toSet());
jobCoordinatorService.startOrUpdateJob(
Lists.newArrayList(featureSetSpecs), source, store);
}
}
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (Exception e) {
Expand All @@ -158,30 +117,6 @@ public void updateStore(
UpdateStoreResponse response = specService.updateStore(request);
responseObserver.onNext(response);
responseObserver.onCompleted();

if (!response.getStatus().equals(Status.NO_CHANGE)) {
Set<FeatureSetSpec> featureSetSpecs = new HashSet<>();
Store store = response.getStore();
for (Subscription subscription : store.getSubscriptionsList()) {
featureSetSpecs.addAll(
specService
.listFeatureSets(
ListFeatureSetsRequest.Filter.newBuilder()
.setFeatureSetName(subscription.getName())
.setFeatureSetVersion(subscription.getVersion())
.build())
.getFeatureSetsList());
}
if (featureSetSpecs.size() == 0) {
return;
}
featureSetSpecs.stream()
.collect(Collectors.groupingBy(FeatureSetSpec::getSource))
.entrySet()
.stream()
.forEach(
kv -> jobCoordinatorService.startOrUpdateJob(kv.getValue(), kv.getKey(), store));
}
} catch (Exception e) {
log.error("Exception has occurred in UpdateStore method: ", e);
responseObserver.onError(e);
Expand Down
28 changes: 16 additions & 12 deletions core/src/main/java/feast/core/job/JobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@
*/
package feast.core.job;

import feast.core.FeatureSetProto.FeatureSetSpec;
import feast.core.StoreProto.Store;
import feast.core.model.JobInfo;
import java.util.List;
import feast.core.model.Job;
import feast.core.model.JobStatus;

public interface JobManager {

Expand All @@ -33,25 +31,31 @@ public interface JobManager {
/**
* Start an import job.
*
* @param name of job to run
* @param featureSets list of featureSets to be populated by the job
* @param sink Store to sink features to
* @return runner specific job id
* @param job job to start
* @return Job
*/
String startJob(String name, List<FeatureSetSpec> featureSets, Store sink);
Job startJob(Job job);

/**
* Update already running job with new set of features to ingest.
*
* @param jobInfo jobInfo of target job to change
* @return job runner specific job id
* @param job job of target job to change
* @return Job
*/
String updateJob(JobInfo jobInfo);
Job updateJob(Job job);

/**
* Abort a job given runner-specific job ID.
*
* @param extId runner specific job id.
*/
void abortJob(String extId);

/**
* Get status of a job given runner-specific job ID.
*
* @param job job.
* @return job status.
*/
JobStatus getJobStatus(Job job);
}
Loading