Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JobCoordinator use public API to communicate with Core #943

Merged
merged 6 commits into from
Aug 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions core/src/main/java/feast/core/grpc/CoreServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,22 @@ public void getFeatureStatistics(
}
}

@Override
public void updateFeatureSetStatus(
UpdateFeatureSetStatusRequest request,
StreamObserver<UpdateFeatureSetStatusResponse> responseObserver) {
try {
UpdateFeatureSetStatusResponse response = specService.updateFeatureSetStatus(request);

responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (Exception e) {
log.error("Exception has occurred in UpdateFeatureSetStatus method: ", e);
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
}
}

@Override
public void listStores(
ListStoresRequest request, StreamObserver<ListStoresResponse> responseObserver) {
Expand Down
114 changes: 73 additions & 41 deletions core/src/main/java/feast/core/service/JobCoordinatorService.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,24 @@

import static feast.common.models.Store.isSubscribedToFeatureSet;
import static feast.core.model.FeatureSet.parseReference;
import static feast.core.util.StreamUtil.wrapException;

import com.google.common.collect.Sets;
import com.google.protobuf.InvalidProtocolBufferException;
import feast.common.models.FeatureSetReference;
import feast.core.config.FeastProperties;
import feast.core.config.FeastProperties.JobProperties;
import feast.core.dao.FeatureSetRepository;
import feast.core.job.*;
import feast.core.job.task.*;
import feast.core.model.*;
import feast.core.model.FeatureSet;
import feast.core.model.FeatureSetDeliveryStatus;
import feast.core.model.Job;
import feast.core.model.JobStatus;
import feast.proto.core.CoreServiceProto;
import feast.proto.core.CoreServiceProto.ListStoresRequest.Filter;
import feast.proto.core.CoreServiceProto.ListStoresResponse;
import feast.proto.core.FeatureSetProto;
import feast.proto.core.FeatureSetProto.FeatureSet;
import feast.proto.core.FeatureSetProto.FeatureSetSpec;
import feast.proto.core.FeatureSetReferenceProto;
import feast.proto.core.IngestionJobProto;
import feast.proto.core.SourceProto.Source;
import feast.proto.core.StoreProto.Store;
Expand All @@ -59,24 +61,21 @@ public class JobCoordinatorService {
private final int SPEC_PUBLISHING_TIMEOUT_SECONDS = 5;

private final JobRepository jobRepository;
private final FeatureSetRepository featureSetRepository;
private final SpecService specService;
private final JobManager jobManager;
private final JobProperties jobProperties;
private final JobGroupingStrategy groupingStrategy;
private final KafkaTemplate<String, FeatureSetProto.FeatureSetSpec> specPublisher;
private final KafkaTemplate<String, FeatureSetSpec> specPublisher;

@Autowired
public JobCoordinatorService(
JobRepository jobRepository,
FeatureSetRepository featureSetRepository,
SpecService specService,
JobManager jobManager,
FeastProperties feastProperties,
JobGroupingStrategy groupingStrategy,
KafkaTemplate<String, FeatureSetProto.FeatureSetSpec> specPublisher) {
KafkaTemplate<String, FeatureSetSpec> specPublisher) {
this.jobRepository = jobRepository;
this.featureSetRepository = featureSetRepository;
this.specService = specService;
this.jobManager = jobManager;
this.jobProperties = feastProperties.getJobs();
Expand Down Expand Up @@ -236,7 +235,7 @@ private boolean jobRequiresUpgrade(Job job, Set<Store> stores) {
*/
FeatureSet allocateFeatureSetToJobs(FeatureSet featureSet) {
FeatureSetReference ref =
FeatureSetReference.of(featureSet.getProject().getName(), featureSet.getName());
FeatureSetReference.of(featureSet.getSpec().getProject(), featureSet.getSpec().getName());
Set<String> confirmedJobIds = new HashSet<>();

Stream<Pair<Source, Store>> jobArgsStream =
Expand All @@ -245,9 +244,9 @@ FeatureSet allocateFeatureSetToJobs(FeatureSet featureSet) {
s ->
isSubscribedToFeatureSet(
s.getSubscriptionsList(),
featureSet.getProject().getName(),
featureSet.getName()))
.map(s -> Pair.of(featureSet.getSource().toProto(), s));
featureSet.getSpec().getProject(),
featureSet.getSpec().getName()))
.map(s -> Pair.of(featureSet.getSpec().getSource(), s));

// Add featureSet to allocated job if not allocated before
for (Pair<Source, Set<Store>> jobArgs : groupingStrategy.collectSingleJobInput(jobArgsStream)) {
Expand Down Expand Up @@ -320,31 +319,47 @@ Iterable<Pair<Source, Set<Store>>> getSourceToStoreMappings() {
* @param store to get subscribed FeatureSets for
* @return list of FeatureSets that the store subscribes to.
*/
private List<FeatureSetProto.FeatureSet> getFeatureSetsForStore(Store store) {
private List<FeatureSet> getFeatureSetsForStore(Store store) {
return store.getSubscriptionsList().stream()
.flatMap(
subscription ->
featureSetRepository
.findAllByNameLikeAndProject_NameLikeOrderByNameAsc(
subscription.getName().replace('*', '%'),
subscription.getProject().replace('*', '%'))
.stream())
subscription -> {
try {
return specService
.listFeatureSets(
CoreServiceProto.ListFeatureSetsRequest.Filter.newBuilder()
.setProject(subscription.getProject())
.setFeatureSetName(subscription.getName())
.build())
.getFeatureSetsList().stream();
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(
String.format(
"Couldn't fetch featureSets for subscription %s. Reason: %s",
subscription, e.getMessage()));
}
})
.distinct()
.map(wrapException(FeatureSet::toProto))
.collect(Collectors.toList());
}

@Scheduled(fixedDelayString = "${feast.stream.specsOptions.notifyIntervalMilliseconds}")
public void notifyJobsWhenFeatureSetUpdated() {
public void notifyJobsWhenFeatureSetUpdated() throws InvalidProtocolBufferException {
List<FeatureSet> pendingFeatureSets =
featureSetRepository.findAllByStatus(FeatureSetProto.FeatureSetStatus.STATUS_PENDING);
specService
.listFeatureSets(
CoreServiceProto.ListFeatureSetsRequest.Filter.newBuilder()
.setProject("*")
.setFeatureSetName("*")
.setStatus(FeatureSetProto.FeatureSetStatus.STATUS_PENDING)
.build())
.getFeatureSetsList();

pendingFeatureSets.stream()
.map(this::allocateFeatureSetToJobs)
.map(
fs -> {
FeatureSetReference ref =
FeatureSetReference.of(fs.getProject().getName(), fs.getName());
FeatureSetReference.of(fs.getSpec().getProject(), fs.getSpec().getName());
List<FeatureSetDeliveryStatus> deliveryStatuses =
jobRepository.findByFeatureSetReference(ref).stream()
.filter(Job::isRunning)
Expand All @@ -360,13 +375,17 @@ public void notifyJobsWhenFeatureSetUpdated() {
&& pair.getRight().stream()
.anyMatch(
jobStatus ->
jobStatus.getDeliveredVersion() < pair.getLeft().getVersion()))
jobStatus.getDeliveredVersion()
< pair.getLeft().getSpec().getVersion()))
.forEach(
pair -> {
FeatureSet fs = pair.getLeft();
List<FeatureSetDeliveryStatus> deliveryStatuses = pair.getRight();

log.info("Sending new FeatureSet {} to Ingestion", fs.getReference());
FeatureSetReference ref =
FeatureSetReference.of(fs.getSpec().getProject(), fs.getSpec().getName());

log.info("Sending new FeatureSet {} to Ingestion", ref);

// Sending latest version of FeatureSet to all currently running IngestionJobs
// (there's one topic for all sets).
Expand All @@ -375,7 +394,7 @@ public void notifyJobsWhenFeatureSetUpdated() {
// again later.
try {
specPublisher
.sendDefault(fs.getReference(), fs.toProto().getSpec())
.sendDefault(ref.getReference(), fs.getSpec())
.get(SPEC_PUBLISHING_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (Exception e) {
log.error(
Expand All @@ -393,7 +412,7 @@ public void notifyJobsWhenFeatureSetUpdated() {
jobStatus -> {
jobStatus.setDeliveryStatus(
FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS);
jobStatus.setDeliveredVersion(fs.getVersion());
jobStatus.setDeliveredVersion(fs.getSpec().getVersion());
});
});
}
Expand All @@ -412,13 +431,19 @@ public void notifyJobsWhenFeatureSetUpdated() {
@KafkaListener(
topics = {"${feast.stream.specsOptions.specsAckTopic}"},
containerFactory = "kafkaAckListenerContainerFactory")
public void listenAckFromJobs(
ConsumerRecord<String, IngestionJobProto.FeatureSetSpecAck> record) {
public void listenAckFromJobs(ConsumerRecord<String, IngestionJobProto.FeatureSetSpecAck> record)
throws InvalidProtocolBufferException {
String setReference = record.key();
Pair<String, String> projectAndSetName = parseReference(setReference);
FeatureSet featureSet =
featureSetRepository.findFeatureSetByNameAndProject_Name(
projectAndSetName.getRight(), projectAndSetName.getLeft());
specService
.getFeatureSet(
CoreServiceProto.GetFeatureSetRequest.newBuilder()
.setProject(projectAndSetName.getLeft())
.setName(projectAndSetName.getRight())
.build())
.getFeatureSet();

if (featureSet == null) {
log.warn(
String.format("ACKListener received message for unknown FeatureSet %s", setReference));
Expand All @@ -427,18 +452,18 @@ public void listenAckFromJobs(

int ackVersion = record.value().getFeatureSetVersion();

if (featureSet.getVersion() != ackVersion) {
if (featureSet.getSpec().getVersion() != ackVersion) {
log.warn(
String.format(
"ACKListener received outdated ack for %s. Current %d, Received %d",
setReference, featureSet.getVersion(), ackVersion));
setReference, featureSet.getSpec().getVersion(), ackVersion));
return;
}

log.info("Updating featureSet {} delivery statuses.", featureSet.getReference());

FeatureSetReference ref =
FeatureSetReference.of(featureSet.getProject().getName(), featureSet.getName());
FeatureSetReference.of(featureSet.getSpec().getProject(), featureSet.getSpec().getName());

log.info("Updating featureSet {} delivery statuses.", ref);

jobRepository
.findById(record.value().getJobName())
Expand All @@ -459,10 +484,17 @@ public void listenAckFromJobs(
.equals(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED));

if (allDelivered) {
log.info("FeatureSet {} update is completely delivered", featureSet.getReference());

featureSet.setStatus(FeatureSetProto.FeatureSetStatus.STATUS_READY);
featureSetRepository.saveAndFlush(featureSet);
log.info("FeatureSet {} update is completely delivered", ref);

specService.updateFeatureSetStatus(
CoreServiceProto.UpdateFeatureSetStatusRequest.newBuilder()
.setReference(
FeatureSetReferenceProto.FeatureSetReference.newBuilder()
.setName(ref.getFeatureSetName())
.setProject(ref.getProjectName())
.build())
.setStatus(FeatureSetProto.FeatureSetStatus.STATUS_READY)
.build());
}
}
}
39 changes: 31 additions & 8 deletions core/src/main/java/feast/core/service/SpecService.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import feast.proto.core.CoreServiceProto.ListStoresRequest;
import feast.proto.core.CoreServiceProto.ListStoresResponse;
import feast.proto.core.CoreServiceProto.ListStoresResponse.Builder;
import feast.proto.core.CoreServiceProto.UpdateFeatureSetStatusRequest;
import feast.proto.core.CoreServiceProto.UpdateFeatureSetStatusResponse;
import feast.proto.core.CoreServiceProto.UpdateStoreRequest;
import feast.proto.core.CoreServiceProto.UpdateStoreResponse;
import feast.proto.core.FeatureSetProto;
Expand Down Expand Up @@ -89,29 +91,33 @@ public SpecService(
*/
public GetFeatureSetResponse getFeatureSet(GetFeatureSetRequest request)
throws InvalidProtocolBufferException {
FeatureSet featureSet = getFeatureSet(request.getProject(), request.getName());

return GetFeatureSetResponse.newBuilder().setFeatureSet(featureSet.toProto()).build();
}

private FeatureSet getFeatureSet(String projectName, String featureSetName) {
// Validate input arguments
checkValidCharacters(request.getName(), "featureSetName");
checkValidCharacters(featureSetName, "featureSetName");

if (request.getName().isEmpty()) {
if (featureSetName.isEmpty()) {
throw new IllegalArgumentException("No feature set name provided");
}
// Autofill default project if project is not specified
if (request.getProject().isEmpty()) {
request = request.toBuilder().setProject(Project.DEFAULT_NAME).build();
if (projectName.isEmpty()) {
projectName = Project.DEFAULT_NAME;
}

FeatureSet featureSet;

featureSet =
featureSetRepository.findFeatureSetByNameAndProject_Name(
request.getName(), request.getProject());
featureSetRepository.findFeatureSetByNameAndProject_Name(featureSetName, projectName);

if (featureSet == null) {
throw new RetrievalException(
String.format("Feature set with name \"%s\" could not be found.", request.getName()));
String.format("Feature set with name \"%s\" could not be found.", featureSetName));
}
return GetFeatureSetResponse.newBuilder().setFeatureSet(featureSet.toProto()).build();
return featureSet;
}

/**
Expand All @@ -138,6 +144,7 @@ public ListFeatureSetsResponse listFeatureSets(ListFeatureSetsRequest.Filter fil
String name = filter.getFeatureSetName();
String project = filter.getProject();
Map<String, String> labelsFilter = filter.getLabelsMap();
FeatureSetStatus statusFilter = filter.getStatus();

if (name.isEmpty()) {
throw new IllegalArgumentException(
Expand Down Expand Up @@ -195,6 +202,10 @@ public ListFeatureSetsResponse listFeatureSets(ListFeatureSetsRequest.Filter fil
if (featureSets.size() > 0) {
featureSets =
featureSets.stream()
.filter(
featureSet ->
statusFilter.equals(FeatureSetStatus.STATUS_INVALID)
|| featureSet.getStatus().equals(statusFilter))
.filter(featureSet -> featureSet.hasAllLabels(labelsFilter))
.collect(Collectors.toList());
for (FeatureSet featureSet : featureSets) {
Expand Down Expand Up @@ -264,6 +275,18 @@ public ListFeaturesResponse listFeatures(ListFeaturesRequest.Filter filter) {
}
}

/** Update FeatureSet's status by given FeatureSetReference and new status */
public UpdateFeatureSetStatusResponse updateFeatureSetStatus(
UpdateFeatureSetStatusRequest request) {
FeatureSet featureSet =
getFeatureSet(request.getReference().getProject(), request.getReference().getName());

featureSet.setStatus(request.getStatus());
featureSetRepository.saveAndFlush(featureSet);

return UpdateFeatureSetStatusResponse.newBuilder().build();
}

/**
* Get stores matching the store name provided in the filter. If the store name is not provided,
* the method will return all stores currently registered to Feast.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,7 @@ void canApplyFeatureSetIfAuthenticated() {
SimpleAPIClient secureApiClient =
getSecureApiClient("[email protected]");
FeatureSetProto.FeatureSet expectedFeatureSet =
DataGenerator.createFeatureSet(
DataGenerator.getDefaultSource(),
"project_1",
"test_1",
Collections.emptyList(),
Collections.emptyList());
DataGenerator.createFeatureSet(DataGenerator.getDefaultSource(), "project_1", "test_1");

secureApiClient.simpleApplyFeatureSet(expectedFeatureSet);

Expand Down
Loading