From 0b8b170486fc696ce883b693b51eb4ad9160d95e Mon Sep 17 00:00:00 2001 From: pyalex Date: Sun, 2 Aug 2020 15:49:48 +0800 Subject: [PATCH 1/6] test job with no labels --- .../job/dataflow/DataflowJobManagerTest.java | 50 +++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java b/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java index 7b07d52fee..fadc76d803 100644 --- a/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java +++ b/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java @@ -340,4 +340,54 @@ public void shouldRetrieveRunningJobsWithoutLabels() { List jobs = dfJobManager.listRunningJobs(); assertThat(jobs, hasItem(hasProperty("id", equalTo("kafka-to-redis")))); } + + @Test + @SneakyThrows + public void shouldRetrieveRunningJobsWithoutLabels() { + when(dataflow + .projects() + .locations() + .jobs() + .list("project", "region") + .setFilter("ACTIVE") + .execute()) + .thenReturn( + new ListJobsResponse() + .setJobs( + ImmutableList.of( + new com.google.api.services.dataflow.model.Job().setId("job-1")))); + + JsonFormat.Printer jsonPrinter = JsonFormat.printer(); + + // job with no labels + when(dataflow + .projects() + .locations() + .jobs() + .get("project", "region", "job-1") + .setView("JOB_VIEW_ALL") + .execute()) + .thenReturn( + new com.google.api.services.dataflow.model.Job() + .setId("job-1") + .setEnvironment( + new Environment() + .setSdkPipelineOptions( + ImmutableMap.of( + "options", + ImmutableMap.of( + "jobName", "kafka-to-redis", + "sourceJson", jsonPrinter.print(source), + "storesJson", ImmutableList.of(jsonPrinter.print(store))))))); + + MetricsProperties metricsProperties = new MetricsProperties(); + metricsProperties.setEnabled(false); + + dfJobManager = + new DataflowJobManager( + defaults, metricsProperties, specsStreamingUpdateConfig, ImmutableMap.of(), dataflow); + + List jobs = dfJobManager.listJobs(); + assertThat(jobs, hasItem(hasProperty("id", equalTo("kafka-to-redis")))); + } } From 5d505a416ec51e780b5a11fcb85fa166f9c1fbc6 Mon Sep 17 00:00:00 2001 From: pyalex Date: Mon, 3 Aug 2020 12:34:52 +0800 Subject: [PATCH 2/6] filtering by status & status update in Core API --- .../java/feast/core/grpc/CoreServiceImpl.java | 16 +++ .../core/service/JobCoordinatorService.java | 114 +++++++++++------- .../java/feast/core/service/SpecService.java | 39 ++++-- .../service/JobCoordinatorServiceTest.java | 62 +++++++--- protos/feast/core/CoreService.proto | 12 ++ 5 files changed, 176 insertions(+), 67 deletions(-) diff --git a/core/src/main/java/feast/core/grpc/CoreServiceImpl.java b/core/src/main/java/feast/core/grpc/CoreServiceImpl.java index b2545d2345..3a7819907d 100644 --- a/core/src/main/java/feast/core/grpc/CoreServiceImpl.java +++ b/core/src/main/java/feast/core/grpc/CoreServiceImpl.java @@ -167,6 +167,22 @@ public void getFeatureStatistics( } } + @Override + public void updateFeatureSetStatus( + UpdateFeatureSetStatusRequest request, + StreamObserver responseObserver) { + try { + UpdateFeatureSetStatusResponse response = specService.updateFeatureSetStatus(request); + + responseObserver.onNext(response); + responseObserver.onCompleted(); + } catch (Exception e) { + log.error("Exception has occurred in UpdateFeatureSetStatus method: ", e); + responseObserver.onError( + Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException()); + } + } + @Override public void listStores( ListStoresRequest request, StreamObserver responseObserver) { diff --git a/core/src/main/java/feast/core/service/JobCoordinatorService.java b/core/src/main/java/feast/core/service/JobCoordinatorService.java index 6ee226d5e8..766dc80394 100644 --- a/core/src/main/java/feast/core/service/JobCoordinatorService.java +++ b/core/src/main/java/feast/core/service/JobCoordinatorService.java @@ -18,22 +18,24 @@ import static feast.common.models.Store.isSubscribedToFeatureSet; import static feast.core.model.FeatureSet.parseReference; -import static feast.core.util.StreamUtil.wrapException; import com.google.common.collect.Sets; +import com.google.protobuf.InvalidProtocolBufferException; import feast.common.models.FeatureSetReference; import feast.core.config.FeastProperties; import feast.core.config.FeastProperties.JobProperties; -import feast.core.dao.FeatureSetRepository; import feast.core.job.*; import feast.core.job.task.*; -import feast.core.model.*; -import feast.core.model.FeatureSet; +import feast.core.model.FeatureSetDeliveryStatus; import feast.core.model.Job; import feast.core.model.JobStatus; +import feast.proto.core.CoreServiceProto; import feast.proto.core.CoreServiceProto.ListStoresRequest.Filter; import feast.proto.core.CoreServiceProto.ListStoresResponse; import feast.proto.core.FeatureSetProto; +import feast.proto.core.FeatureSetProto.FeatureSet; +import feast.proto.core.FeatureSetProto.FeatureSetSpec; +import feast.proto.core.FeatureSetReferenceProto; import feast.proto.core.IngestionJobProto; import feast.proto.core.SourceProto.Source; import feast.proto.core.StoreProto.Store; @@ -59,24 +61,21 @@ public class JobCoordinatorService { private final int SPEC_PUBLISHING_TIMEOUT_SECONDS = 5; private final JobRepository jobRepository; - private final FeatureSetRepository featureSetRepository; private final SpecService specService; private final JobManager jobManager; private final JobProperties jobProperties; private final JobGroupingStrategy groupingStrategy; - private final KafkaTemplate specPublisher; + private final KafkaTemplate specPublisher; @Autowired public JobCoordinatorService( JobRepository jobRepository, - FeatureSetRepository featureSetRepository, SpecService specService, JobManager jobManager, FeastProperties feastProperties, JobGroupingStrategy groupingStrategy, - KafkaTemplate specPublisher) { + KafkaTemplate specPublisher) { this.jobRepository = jobRepository; - this.featureSetRepository = featureSetRepository; this.specService = specService; this.jobManager = jobManager; this.jobProperties = feastProperties.getJobs(); @@ -236,7 +235,7 @@ private boolean jobRequiresUpgrade(Job job, Set stores) { */ FeatureSet allocateFeatureSetToJobs(FeatureSet featureSet) { FeatureSetReference ref = - FeatureSetReference.of(featureSet.getProject().getName(), featureSet.getName()); + FeatureSetReference.of(featureSet.getSpec().getProject(), featureSet.getSpec().getName()); Set confirmedJobIds = new HashSet<>(); Stream> jobArgsStream = @@ -245,9 +244,9 @@ FeatureSet allocateFeatureSetToJobs(FeatureSet featureSet) { s -> isSubscribedToFeatureSet( s.getSubscriptionsList(), - featureSet.getProject().getName(), - featureSet.getName())) - .map(s -> Pair.of(featureSet.getSource().toProto(), s)); + featureSet.getSpec().getProject(), + featureSet.getSpec().getName())) + .map(s -> Pair.of(featureSet.getSpec().getSource(), s)); // Add featureSet to allocated job if not allocated before for (Pair> jobArgs : groupingStrategy.collectSingleJobInput(jobArgsStream)) { @@ -320,31 +319,47 @@ Iterable>> getSourceToStoreMappings() { * @param store to get subscribed FeatureSets for * @return list of FeatureSets that the store subscribes to. */ - private List getFeatureSetsForStore(Store store) { + private List getFeatureSetsForStore(Store store) { return store.getSubscriptionsList().stream() .flatMap( - subscription -> - featureSetRepository - .findAllByNameLikeAndProject_NameLikeOrderByNameAsc( - subscription.getName().replace('*', '%'), - subscription.getProject().replace('*', '%')) - .stream()) + subscription -> { + try { + return specService + .listFeatureSets( + CoreServiceProto.ListFeatureSetsRequest.Filter.newBuilder() + .setProject(subscription.getProject()) + .setFeatureSetName(subscription.getName()) + .build()) + .getFeatureSetsList().stream(); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException( + String.format( + "Couldn't fetch featureSets for subscription %s. Reason: %s", + subscription, e.getMessage())); + } + }) .distinct() - .map(wrapException(FeatureSet::toProto)) .collect(Collectors.toList()); } @Scheduled(fixedDelayString = "${feast.stream.specsOptions.notifyIntervalMilliseconds}") - public void notifyJobsWhenFeatureSetUpdated() { + public void notifyJobsWhenFeatureSetUpdated() throws InvalidProtocolBufferException { List pendingFeatureSets = - featureSetRepository.findAllByStatus(FeatureSetProto.FeatureSetStatus.STATUS_PENDING); + specService + .listFeatureSets( + CoreServiceProto.ListFeatureSetsRequest.Filter.newBuilder() + .setProject("*") + .setFeatureSetName("*") + .setStatus(FeatureSetProto.FeatureSetStatus.STATUS_PENDING) + .build()) + .getFeatureSetsList(); pendingFeatureSets.stream() .map(this::allocateFeatureSetToJobs) .map( fs -> { FeatureSetReference ref = - FeatureSetReference.of(fs.getProject().getName(), fs.getName()); + FeatureSetReference.of(fs.getSpec().getProject(), fs.getSpec().getName()); List deliveryStatuses = jobRepository.findByFeatureSetReference(ref).stream() .filter(Job::isRunning) @@ -360,13 +375,17 @@ public void notifyJobsWhenFeatureSetUpdated() { && pair.getRight().stream() .anyMatch( jobStatus -> - jobStatus.getDeliveredVersion() < pair.getLeft().getVersion())) + jobStatus.getDeliveredVersion() + < pair.getLeft().getSpec().getVersion())) .forEach( pair -> { FeatureSet fs = pair.getLeft(); List deliveryStatuses = pair.getRight(); - log.info("Sending new FeatureSet {} to Ingestion", fs.getReference()); + FeatureSetReference ref = + FeatureSetReference.of(fs.getSpec().getProject(), fs.getSpec().getName()); + + log.info("Sending new FeatureSet {} to Ingestion", ref); // Sending latest version of FeatureSet to all currently running IngestionJobs // (there's one topic for all sets). @@ -375,7 +394,7 @@ public void notifyJobsWhenFeatureSetUpdated() { // again later. try { specPublisher - .sendDefault(fs.getReference(), fs.toProto().getSpec()) + .sendDefault(ref.getReference(), fs.getSpec()) .get(SPEC_PUBLISHING_TIMEOUT_SECONDS, TimeUnit.SECONDS); } catch (Exception e) { log.error( @@ -393,7 +412,7 @@ public void notifyJobsWhenFeatureSetUpdated() { jobStatus -> { jobStatus.setDeliveryStatus( FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS); - jobStatus.setDeliveredVersion(fs.getVersion()); + jobStatus.setDeliveredVersion(fs.getSpec().getVersion()); }); }); } @@ -412,13 +431,19 @@ public void notifyJobsWhenFeatureSetUpdated() { @KafkaListener( topics = {"${feast.stream.specsOptions.specsAckTopic}"}, containerFactory = "kafkaAckListenerContainerFactory") - public void listenAckFromJobs( - ConsumerRecord record) { + public void listenAckFromJobs(ConsumerRecord record) + throws InvalidProtocolBufferException { String setReference = record.key(); Pair projectAndSetName = parseReference(setReference); FeatureSet featureSet = - featureSetRepository.findFeatureSetByNameAndProject_Name( - projectAndSetName.getRight(), projectAndSetName.getLeft()); + specService + .getFeatureSet( + CoreServiceProto.GetFeatureSetRequest.newBuilder() + .setProject(projectAndSetName.getLeft()) + .setName(projectAndSetName.getRight()) + .build()) + .getFeatureSet(); + if (featureSet == null) { log.warn( String.format("ACKListener received message for unknown FeatureSet %s", setReference)); @@ -427,18 +452,18 @@ public void listenAckFromJobs( int ackVersion = record.value().getFeatureSetVersion(); - if (featureSet.getVersion() != ackVersion) { + if (featureSet.getSpec().getVersion() != ackVersion) { log.warn( String.format( "ACKListener received outdated ack for %s. Current %d, Received %d", - setReference, featureSet.getVersion(), ackVersion)); + setReference, featureSet.getSpec().getVersion(), ackVersion)); return; } - log.info("Updating featureSet {} delivery statuses.", featureSet.getReference()); - FeatureSetReference ref = - FeatureSetReference.of(featureSet.getProject().getName(), featureSet.getName()); + FeatureSetReference.of(featureSet.getSpec().getProject(), featureSet.getSpec().getName()); + + log.info("Updating featureSet {} delivery statuses.", ref); jobRepository .findById(record.value().getJobName()) @@ -459,10 +484,17 @@ public void listenAckFromJobs( .equals(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED)); if (allDelivered) { - log.info("FeatureSet {} update is completely delivered", featureSet.getReference()); - - featureSet.setStatus(FeatureSetProto.FeatureSetStatus.STATUS_READY); - featureSetRepository.saveAndFlush(featureSet); + log.info("FeatureSet {} update is completely delivered", ref); + + specService.updateFeatureSetStatus( + CoreServiceProto.UpdateFeatureSetStatusRequest.newBuilder() + .setReference( + FeatureSetReferenceProto.FeatureSetReference.newBuilder() + .setName(ref.getFeatureSetName()) + .setProject(ref.getProjectName()) + .build()) + .setStatus(FeatureSetProto.FeatureSetStatus.STATUS_READY) + .build()); } } } diff --git a/core/src/main/java/feast/core/service/SpecService.java b/core/src/main/java/feast/core/service/SpecService.java index 1523e847bd..e4b1ce5c43 100644 --- a/core/src/main/java/feast/core/service/SpecService.java +++ b/core/src/main/java/feast/core/service/SpecService.java @@ -37,6 +37,8 @@ import feast.proto.core.CoreServiceProto.ListStoresRequest; import feast.proto.core.CoreServiceProto.ListStoresResponse; import feast.proto.core.CoreServiceProto.ListStoresResponse.Builder; +import feast.proto.core.CoreServiceProto.UpdateFeatureSetStatusRequest; +import feast.proto.core.CoreServiceProto.UpdateFeatureSetStatusResponse; import feast.proto.core.CoreServiceProto.UpdateStoreRequest; import feast.proto.core.CoreServiceProto.UpdateStoreResponse; import feast.proto.core.FeatureSetProto; @@ -89,29 +91,33 @@ public SpecService( */ public GetFeatureSetResponse getFeatureSet(GetFeatureSetRequest request) throws InvalidProtocolBufferException { + FeatureSet featureSet = getFeatureSet(request.getProject(), request.getName()); + return GetFeatureSetResponse.newBuilder().setFeatureSet(featureSet.toProto()).build(); + } + + private FeatureSet getFeatureSet(String projectName, String featureSetName) { // Validate input arguments - checkValidCharacters(request.getName(), "featureSetName"); + checkValidCharacters(featureSetName, "featureSetName"); - if (request.getName().isEmpty()) { + if (featureSetName.isEmpty()) { throw new IllegalArgumentException("No feature set name provided"); } // Autofill default project if project is not specified - if (request.getProject().isEmpty()) { - request = request.toBuilder().setProject(Project.DEFAULT_NAME).build(); + if (projectName.isEmpty()) { + projectName = Project.DEFAULT_NAME; } FeatureSet featureSet; featureSet = - featureSetRepository.findFeatureSetByNameAndProject_Name( - request.getName(), request.getProject()); + featureSetRepository.findFeatureSetByNameAndProject_Name(featureSetName, projectName); if (featureSet == null) { throw new RetrievalException( - String.format("Feature set with name \"%s\" could not be found.", request.getName())); + String.format("Feature set with name \"%s\" could not be found.", featureSetName)); } - return GetFeatureSetResponse.newBuilder().setFeatureSet(featureSet.toProto()).build(); + return featureSet; } /** @@ -138,6 +144,7 @@ public ListFeatureSetsResponse listFeatureSets(ListFeatureSetsRequest.Filter fil String name = filter.getFeatureSetName(); String project = filter.getProject(); Map labelsFilter = filter.getLabelsMap(); + FeatureSetStatus statusFilter = filter.getStatus(); if (name.isEmpty()) { throw new IllegalArgumentException( @@ -195,6 +202,10 @@ public ListFeatureSetsResponse listFeatureSets(ListFeatureSetsRequest.Filter fil if (featureSets.size() > 0) { featureSets = featureSets.stream() + .filter( + featureSet -> + statusFilter.equals(FeatureSetStatus.STATUS_INVALID) + || featureSet.getStatus().equals(statusFilter)) .filter(featureSet -> featureSet.hasAllLabels(labelsFilter)) .collect(Collectors.toList()); for (FeatureSet featureSet : featureSets) { @@ -264,6 +275,18 @@ public ListFeaturesResponse listFeatures(ListFeaturesRequest.Filter filter) { } } + /** Update FeatureSet's status by given FeatureSetReference and new status */ + public UpdateFeatureSetStatusResponse updateFeatureSetStatus( + UpdateFeatureSetStatusRequest request) { + FeatureSet featureSet = + getFeatureSet(request.getReference().getProject(), request.getReference().getName()); + + featureSet.setStatus(request.getStatus()); + featureSetRepository.saveAndFlush(featureSet); + + return UpdateFeatureSetStatusResponse.newBuilder().build(); + } + /** * Get stores matching the store name provided in the filter. If the store name is not provided, * the method will return all stores currently registered to Feast. diff --git a/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java b/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java index 95cc49874a..74ec5a328c 100644 --- a/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java +++ b/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java @@ -31,21 +31,21 @@ import com.google.protobuf.InvalidProtocolBufferException; import feast.core.config.FeastProperties; import feast.core.config.FeastProperties.JobProperties; -import feast.core.dao.FeatureSetRepository; import feast.core.it.DataGenerator; import feast.core.job.*; import feast.core.job.JobRepository; import feast.core.job.task.*; -import feast.core.model.FeatureSet; import feast.core.model.Job; import feast.core.model.JobStatus; import feast.core.util.TestUtil; import feast.proto.core.CoreServiceProto.ListFeatureSetsRequest.Filter; import feast.proto.core.CoreServiceProto.ListFeatureSetsResponse; import feast.proto.core.CoreServiceProto.ListStoresResponse; +import feast.proto.core.FeatureSetProto.FeatureSet; import feast.proto.core.SourceProto.Source; import feast.proto.core.StoreProto.Store; import java.util.*; +import lombok.SneakyThrows; import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Triple; import org.junit.Before; @@ -62,7 +62,6 @@ public class JobCoordinatorServiceTest { JobRepository jobRepository; @Mock SpecService specService; - @Mock FeatureSetRepository featureSetRepository; private FeastProperties feastProperties; private JobCoordinatorService jcsWithConsolidation; @@ -85,7 +84,6 @@ public void setUp() { jcsWithConsolidation = new JobCoordinatorService( jobRepository, - featureSetRepository, specService, jobManager, feastProperties, @@ -95,7 +93,6 @@ public void setUp() { jcsWithJobPerStore = new JobCoordinatorService( jobRepository, - featureSetRepository, specService, jobManager, feastProperties, @@ -130,6 +127,7 @@ public void shouldDoNothingIfNoMatchingFeatureSetsFound() throws InvalidProtocol } @Test + @SneakyThrows public void shouldGroupJobsBySource() { Store store = DataGenerator.createStore( @@ -139,12 +137,18 @@ public void shouldGroupJobsBySource() { Source source2 = DataGenerator.createSource("others.servers:9092", "topic"); FeatureSet featureSet1 = - TestUtil.createEmptyFeatureSet("features1", feast.core.model.Source.fromProto(source1)); + DataGenerator.createFeatureSet( + source1, "project1", "features1", Collections.emptyList(), Collections.emptyList()); FeatureSet featureSet2 = - TestUtil.createEmptyFeatureSet("features2", feast.core.model.Source.fromProto(source2)); + DataGenerator.createFeatureSet( + source2, "project1", "features2", Collections.emptyList(), Collections.emptyList()); - when(featureSetRepository.findAllByNameLikeAndProject_NameLikeOrderByNameAsc("%", "project1")) - .thenReturn(Lists.newArrayList(featureSet1, featureSet2)); + when(specService.listFeatureSets( + Filter.newBuilder().setFeatureSetName("*").setProject("project1").build())) + .thenReturn( + ListFeatureSetsResponse.newBuilder() + .addAllFeatureSets(Lists.newArrayList(featureSet1, featureSet2)) + .build()); when(specService.listStores(any())) .thenReturn(ListStoresResponse.newBuilder().addStore(store).build()); @@ -157,6 +161,7 @@ public void shouldGroupJobsBySource() { } @Test + @SneakyThrows public void shouldUseStoreSubscriptionToMapStore() { Store store1 = DataGenerator.createStore( @@ -170,14 +175,25 @@ public void shouldUseStoreSubscriptionToMapStore() { Source source2 = DataGenerator.createSource("other.servers:9092", "topic"); FeatureSet featureSet1 = - TestUtil.createEmptyFeatureSet("feature1", feast.core.model.Source.fromProto(source1)); + DataGenerator.createFeatureSet( + source1, "default", "feature1", Collections.emptyList(), Collections.emptyList()); FeatureSet featureSet2 = - TestUtil.createEmptyFeatureSet("feature2", feast.core.model.Source.fromProto(source2)); + DataGenerator.createFeatureSet( + source2, "default", "feature2", Collections.emptyList(), Collections.emptyList()); + + when(specService.listFeatureSets( + Filter.newBuilder().setFeatureSetName("features1").setProject("*").build())) + .thenReturn( + ListFeatureSetsResponse.newBuilder() + .addAllFeatureSets(Lists.newArrayList(featureSet1)) + .build()); - when(featureSetRepository.findAllByNameLikeAndProject_NameLikeOrderByNameAsc("features1", "%")) - .thenReturn(Lists.newArrayList(featureSet1)); - when(featureSetRepository.findAllByNameLikeAndProject_NameLikeOrderByNameAsc("features2", "%")) - .thenReturn(Lists.newArrayList(featureSet2)); + when(specService.listFeatureSets( + Filter.newBuilder().setFeatureSetName("features2").setProject("*").build())) + .thenReturn( + ListFeatureSetsResponse.newBuilder() + .addAllFeatureSets(Lists.newArrayList(featureSet2)) + .build()); when(specService.listStores(any())) .thenReturn(ListStoresResponse.newBuilder().addStore(store1).addStore(store2).build()); @@ -233,10 +249,15 @@ public void shouldUpgradeJobWhenNeeded() { } @Test + @SneakyThrows public void shouldCreateJobIfNoRunning() { Source source = DataGenerator.createSource("kafka:9092", "topic"); Store store = DataGenerator.getDefaultStore(); + when(specService.listFeatureSets( + Filter.newBuilder().setFeatureSetName("*").setProject("*").build())) + .thenReturn(ListFeatureSetsResponse.newBuilder().build()); + List tasks = jcsWithConsolidation.makeJobUpdateTasks( ImmutableList.of(Pair.of(source, ImmutableSet.of(store)))); @@ -256,10 +277,15 @@ public void shouldCreateJobPerStore() throws InvalidProtocolBufferException { Source source = DataGenerator.createSource("servers:9092", "topic"); FeatureSet featureSet = - TestUtil.createEmptyFeatureSet("features1", feast.core.model.Source.fromProto(source)); + DataGenerator.createFeatureSet( + source, "default", "features1", Collections.emptyList(), Collections.emptyList()); - when(featureSetRepository.findAllByNameLikeAndProject_NameLikeOrderByNameAsc("%", "%")) - .thenReturn(Lists.newArrayList(featureSet)); + when(specService.listFeatureSets( + Filter.newBuilder().setFeatureSetName("*").setProject("*").build())) + .thenReturn( + ListFeatureSetsResponse.newBuilder() + .addAllFeatureSets(Lists.newArrayList(featureSet)) + .build()); when(specService.listStores(any())) .thenReturn(ListStoresResponse.newBuilder().addStore(store1).addStore(store2).build()); diff --git a/protos/feast/core/CoreService.proto b/protos/feast/core/CoreService.proto index a9c3f35373..e926542d2a 100644 --- a/protos/feast/core/CoreService.proto +++ b/protos/feast/core/CoreService.proto @@ -108,6 +108,9 @@ service CoreService { // Does not support stopping a job in a transitional (ie pending, suspending, aborting) or unknown status rpc StopIngestionJob(StopIngestionJobRequest) returns (StopIngestionJobResponse); + // Internal API for Job Coordinator to update featureSet's status once responsible ingestion job is running + rpc UpdateFeatureSetStatus(UpdateFeatureSetStatusRequest) returns (UpdateFeatureSetStatusResponse); + } // Request for a single feature set @@ -150,6 +153,8 @@ message ListFeatureSetsRequest { // User defined metadata for feature set. // Feature sets with all matching labels will be returned. map labels = 4; + + FeatureSetStatus status = 5; } } @@ -349,3 +354,10 @@ message GetFeatureStatisticsResponse { // despite the message being of list type. tensorflow.metadata.v0.DatasetFeatureStatisticsList dataset_feature_statistics_list = 1; } + +message UpdateFeatureSetStatusRequest { + FeatureSetReference reference = 1; + FeatureSetStatus status = 2; +} + +message UpdateFeatureSetStatusResponse {} \ No newline at end of file From 0a4da23e0d0157e26be7795ed7d22fdba84ddcc7 Mon Sep 17 00:00:00 2001 From: pyalex Date: Wed, 5 Aug 2020 13:56:11 +0800 Subject: [PATCH 3/6] specs IT --- .../auth/CoreServiceAuthenticationIT.java | 7 +- .../core/auth/CoreServiceAuthorizationIT.java | 21 +- .../java/feast/core/it/DataGenerator.java | 84 ++++-- .../java/feast/core/it/SimpleAPIClient.java | 66 ++++- .../feast/core/service/JobCoordinatorIT.java | 34 +-- .../service/JobCoordinatorServiceTest.java | 20 +- .../feast/core/service/SpecServiceIT.java | 241 ++++++++++++++++++ 7 files changed, 385 insertions(+), 88 deletions(-) create mode 100644 core/src/test/java/feast/core/service/SpecServiceIT.java diff --git a/core/src/test/java/feast/core/auth/CoreServiceAuthenticationIT.java b/core/src/test/java/feast/core/auth/CoreServiceAuthenticationIT.java index aa02e0c66f..71b32ba598 100644 --- a/core/src/test/java/feast/core/auth/CoreServiceAuthenticationIT.java +++ b/core/src/test/java/feast/core/auth/CoreServiceAuthenticationIT.java @@ -162,12 +162,7 @@ void canApplyFeatureSetIfAuthenticated() { SimpleAPIClient secureApiClient = getSecureApiClient("AuthenticatedUserWithoutAuthorization@example.com"); FeatureSetProto.FeatureSet expectedFeatureSet = - DataGenerator.createFeatureSet( - DataGenerator.getDefaultSource(), - "project_1", - "test_1", - Collections.emptyList(), - Collections.emptyList()); + DataGenerator.createFeatureSet(DataGenerator.getDefaultSource(), "project_1", "test_1"); secureApiClient.simpleApplyFeatureSet(expectedFeatureSet); diff --git a/core/src/test/java/feast/core/auth/CoreServiceAuthorizationIT.java b/core/src/test/java/feast/core/auth/CoreServiceAuthorizationIT.java index 2b93ca66a4..073b9605a8 100644 --- a/core/src/test/java/feast/core/auth/CoreServiceAuthorizationIT.java +++ b/core/src/test/java/feast/core/auth/CoreServiceAuthorizationIT.java @@ -208,12 +208,7 @@ void cantApplyFeatureSetIfNotProjectMember() throws InvalidProtocolBufferExcepti String userName = "random_user@example.com"; SimpleAPIClient secureApiClient = getSecureApiClient(userName); FeatureSetProto.FeatureSet expectedFeatureSet = - DataGenerator.createFeatureSet( - DataGenerator.getDefaultSource(), - project, - "test_5", - Collections.emptyList(), - Collections.emptyList()); + DataGenerator.createFeatureSet(DataGenerator.getDefaultSource(), project, "test_5"); StatusRuntimeException exception = assertThrows( @@ -231,12 +226,7 @@ void cantApplyFeatureSetIfNotProjectMember() throws InvalidProtocolBufferExcepti void canApplyFeatureSetIfProjectMember() { SimpleAPIClient secureApiClient = getSecureApiClient(subjectInProject); FeatureSetProto.FeatureSet expectedFeatureSet = - DataGenerator.createFeatureSet( - DataGenerator.getDefaultSource(), - project, - "test_6", - Collections.emptyList(), - Collections.emptyList()); + DataGenerator.createFeatureSet(DataGenerator.getDefaultSource(), project, "test_6"); secureApiClient.simpleApplyFeatureSet(expectedFeatureSet); @@ -253,12 +243,7 @@ void canApplyFeatureSetIfProjectMember() { void canApplyFeatureSetIfAdmin() { SimpleAPIClient secureApiClient = getSecureApiClient(subjectIsAdmin); FeatureSetProto.FeatureSet expectedFeatureSet = - DataGenerator.createFeatureSet( - DataGenerator.getDefaultSource(), - "any_project", - "test_2", - Collections.emptyList(), - Collections.emptyList()); + DataGenerator.createFeatureSet(DataGenerator.getDefaultSource(), "any_project", "test_2"); secureApiClient.simpleApplyFeatureSet(expectedFeatureSet); diff --git a/core/src/test/java/feast/core/it/DataGenerator.java b/core/src/test/java/feast/core/it/DataGenerator.java index b85e0b4f58..27133c54cf 100644 --- a/core/src/test/java/feast/core/it/DataGenerator.java +++ b/core/src/test/java/feast/core/it/DataGenerator.java @@ -22,9 +22,10 @@ import feast.proto.core.StoreProto; import feast.proto.types.ValueProto; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; -import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Triple; public class DataGenerator { @@ -50,12 +51,7 @@ public static SourceProto.Source getDefaultSource() { } public static FeatureSetProto.FeatureSet getDefaultFeatureSet() { - return createFeatureSet( - DataGenerator.getDefaultSource(), - "default", - "test", - Collections.emptyList(), - Collections.emptyList()); + return createFeatureSet(DataGenerator.getDefaultSource(), "default", "test"); } public static SourceProto.Source createSource(String server, String topic) { @@ -106,37 +102,81 @@ public static StoreProto.Store createStore( } } + public static FeatureSetProto.FeatureSpec createFeature( + String name, ValueProto.ValueType.Enum valueType, Map labels) { + return FeatureSetProto.FeatureSpec.newBuilder() + .setName(name) + .setValueType(valueType) + .putAllLabels(labels) + .build(); + } + + public static FeatureSetProto.EntitySpec createEntity( + String name, ValueProto.ValueType.Enum valueType) { + return FeatureSetProto.EntitySpec.newBuilder().setName(name).setValueType(valueType).build(); + } + public static FeatureSetProto.FeatureSet createFeatureSet( SourceProto.Source source, String projectName, String name, - List> entities, - List> features) { + List entities, + List features, + Map labels) { return FeatureSetProto.FeatureSet.newBuilder() .setSpec( FeatureSetProto.FeatureSetSpec.newBuilder() .setSource(source) .setName(name) .setProject(projectName) + .putAllLabels(labels) + .addAllEntities(entities) + .addAllFeatures(features) + .build()) + .build(); + } + + public static FeatureSetProto.FeatureSet createFeatureSet( + SourceProto.Source source, + String projectName, + String name, + Map entities, + Map features, + Map labels) { + return FeatureSetProto.FeatureSet.newBuilder() + .setSpec( + FeatureSetProto.FeatureSetSpec.newBuilder() + .setSource(source) + .setName(name) + .setProject(projectName) + .putAllLabels(labels) .addAllEntities( - entities.stream() - .map( - pair -> - FeatureSetProto.EntitySpec.newBuilder() - .setName(pair.getLeft()) - .setValueType(pair.getRight()) - .build()) + entities.entrySet().stream() + .map(entry -> createEntity(entry.getKey(), entry.getValue())) .collect(Collectors.toList())) .addAllFeatures( - features.stream() + features.entrySet().stream() .map( - pair -> - FeatureSetProto.FeatureSpec.newBuilder() - .setName(pair.getLeft()) - .setValueType(pair.getRight()) - .build()) + entry -> + createFeature( + entry.getKey(), entry.getValue(), Collections.emptyMap())) .collect(Collectors.toList())) .build()) .build(); } + + public static FeatureSetProto.FeatureSet createFeatureSet( + SourceProto.Source source, + String projectName, + String name, + Map entities, + Map features) { + return createFeatureSet(source, projectName, name, entities, features, new HashMap<>()); + } + + public static FeatureSetProto.FeatureSet createFeatureSet( + SourceProto.Source source, String projectName, String name) { + return createFeatureSet( + source, projectName, name, Collections.emptyMap(), Collections.emptyMap()); + } } diff --git a/core/src/test/java/feast/core/it/SimpleAPIClient.java b/core/src/test/java/feast/core/it/SimpleAPIClient.java index 4048a0427b..009cbca1ad 100644 --- a/core/src/test/java/feast/core/it/SimpleAPIClient.java +++ b/core/src/test/java/feast/core/it/SimpleAPIClient.java @@ -17,7 +17,10 @@ package feast.core.it; import feast.proto.core.*; +import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.Map; public class SimpleAPIClient { private CoreServiceGrpc.CoreServiceBlockingStub stub; @@ -31,17 +34,44 @@ public void simpleApplyFeatureSet(FeatureSetProto.FeatureSet featureSet) { CoreServiceProto.ApplyFeatureSetRequest.newBuilder().setFeatureSet(featureSet).build()); } - public List simpleListFeatureSets(String name) { + public List simpleListFeatureSets( + String projectName, String featureSetName, Map labels) { return stub.listFeatureSets( CoreServiceProto.ListFeatureSetsRequest.newBuilder() .setFilter( CoreServiceProto.ListFeatureSetsRequest.Filter.newBuilder() - .setFeatureSetName(name) + .setProject(projectName) + .setFeatureSetName(featureSetName) + .putAllLabels(labels) .build()) .build()) .getFeatureSetsList(); } + public List simpleListFeatureSets( + String projectName, String featureSetName, FeatureSetProto.FeatureSetStatus status) { + return stub.listFeatureSets( + CoreServiceProto.ListFeatureSetsRequest.newBuilder() + .setFilter( + CoreServiceProto.ListFeatureSetsRequest.Filter.newBuilder() + .setProject(projectName) + .setFeatureSetName(featureSetName) + .setStatus(status) + .build()) + .build()) + .getFeatureSetsList(); + } + + public List simpleListFeatureSets( + String projectName, String featureSetName) { + return simpleListFeatureSets( + projectName, featureSetName, FeatureSetProto.FeatureSetStatus.STATUS_INVALID); + } + + public List simpleListFeatureSets(String featureSetName) { + return simpleListFeatureSets("default", featureSetName); + } + public FeatureSetProto.FeatureSet simpleGetFeatureSet(String projectName, String name) { return stub.getFeatureSet( CoreServiceProto.GetFeatureSetRequest.newBuilder() @@ -51,6 +81,38 @@ public FeatureSetProto.FeatureSet simpleGetFeatureSet(String projectName, String .getFeatureSet(); } + public void updateFeatureSetStatus( + String projectName, String name, FeatureSetProto.FeatureSetStatus status) { + stub.updateFeatureSetStatus( + CoreServiceProto.UpdateFeatureSetStatusRequest.newBuilder() + .setReference( + FeatureSetReferenceProto.FeatureSetReference.newBuilder() + .setProject(projectName) + .setName(name) + .build()) + .setStatus(status) + .build()); + } + + public Map simpleListFeatures( + String projectName, Map labels, List entities) { + return stub.listFeatures( + CoreServiceProto.ListFeaturesRequest.newBuilder() + .setFilter( + CoreServiceProto.ListFeaturesRequest.Filter.newBuilder() + .setProject(projectName) + .addAllEntities(entities) + .putAllLabels(labels) + .build()) + .build()) + .getFeaturesMap(); + } + + public Map simpleListFeatures( + String projectName, String... entities) { + return simpleListFeatures(projectName, Collections.emptyMap(), Arrays.asList(entities)); + } + public void updateStore(StoreProto.Store store) { stub.updateStore(CoreServiceProto.UpdateStoreRequest.newBuilder().setStore(store).build()); } diff --git a/core/src/test/java/feast/core/service/JobCoordinatorIT.java b/core/src/test/java/feast/core/service/JobCoordinatorIT.java index 8726ab9787..99b0738b9b 100644 --- a/core/src/test/java/feast/core/service/JobCoordinatorIT.java +++ b/core/src/test/java/feast/core/service/JobCoordinatorIT.java @@ -40,7 +40,6 @@ import io.grpc.ManagedChannelBuilder; import java.util.*; import lombok.SneakyThrows; -import org.apache.commons.lang3.tuple.Pair; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.*; import org.springframework.beans.factory.annotation.Autowired; @@ -103,12 +102,7 @@ public void listenSpecs(ConsumerRecord record) @SneakyThrows public void shouldCreateJobForNewSource() { apiClient.simpleApplyFeatureSet( - DataGenerator.createFeatureSet( - DataGenerator.getDefaultSource(), - "default", - "test", - Collections.emptyList(), - Collections.emptyList())); + DataGenerator.createFeatureSet(DataGenerator.getDefaultSource(), "default", "test")); List featureSets = apiClient.simpleListFeatureSets("*"); assertThat(featureSets.size(), equalTo(1)); @@ -133,12 +127,7 @@ public void shouldCreateJobForNewSource() { @Test public void shouldUpgradeJobWhenStoreChanged() { apiClient.simpleApplyFeatureSet( - DataGenerator.createFeatureSet( - DataGenerator.getDefaultSource(), - "project", - "test", - Collections.emptyList(), - Collections.emptyList())); + DataGenerator.createFeatureSet(DataGenerator.getDefaultSource(), "project", "test")); await().until(jobManager::getAllJobs, hasSize(1)); @@ -162,12 +151,7 @@ public void shouldUpgradeJobWhenStoreChanged() { @Test public void shouldRestoreJobThatStopped() { apiClient.simpleApplyFeatureSet( - DataGenerator.createFeatureSet( - DataGenerator.getDefaultSource(), - "project", - "test", - Collections.emptyList(), - Collections.emptyList())); + DataGenerator.createFeatureSet(DataGenerator.getDefaultSource(), "project", "test")); await().until(jobManager::getAllJobs, hasSize(1)); Job job = jobRepository.findByStatus(JobStatus.RUNNING).get(0); @@ -217,8 +201,8 @@ public void shouldSendNewSpec() { DataGenerator.getDefaultSource(), "default", "test", - ImmutableList.of(Pair.of("entity", ValueProto.ValueType.Enum.BOOL)), - Collections.emptyList())); + ImmutableMap.of("entity", ValueProto.ValueType.Enum.BOOL), + ImmutableMap.of())); FeatureSetProto.FeatureSet featureSet = apiClient.simpleGetFeatureSet("default", "test"); @@ -247,8 +231,8 @@ public void shouldUpdateSpec() { DataGenerator.getDefaultSource(), "default", "test", - ImmutableList.of(Pair.of("entity", ValueProto.ValueType.Enum.BOOL)), - ImmutableList.of(Pair.of("feature", ValueProto.ValueType.Enum.INT32)))); + ImmutableMap.of("entity", ValueProto.ValueType.Enum.BOOL), + ImmutableMap.of("feature", ValueProto.ValueType.Enum.INT32))); await().until(() -> specsMailbox, hasSize(1)); @@ -309,8 +293,8 @@ public void shouldReallocateFeatureSetAfterSourceChanged() { DataGenerator.createSource("localhost", "newTopic"), "default", "test", - ImmutableList.of(Pair.of("entity", ValueProto.ValueType.Enum.BOOL)), - ImmutableList.of(Pair.of("feature", ValueProto.ValueType.Enum.INT32)))); + ImmutableMap.of("entity", ValueProto.ValueType.Enum.BOOL), + ImmutableMap.of("feature", ValueProto.ValueType.Enum.INT32))); await().until(() -> jobManager.getJobStatus(job), equalTo(JobStatus.ABORTED)); diff --git a/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java b/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java index 74ec5a328c..1d80458a71 100644 --- a/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java +++ b/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java @@ -136,12 +136,8 @@ public void shouldGroupJobsBySource() { Source source1 = DataGenerator.createSource("servers:9092", "topic"); Source source2 = DataGenerator.createSource("others.servers:9092", "topic"); - FeatureSet featureSet1 = - DataGenerator.createFeatureSet( - source1, "project1", "features1", Collections.emptyList(), Collections.emptyList()); - FeatureSet featureSet2 = - DataGenerator.createFeatureSet( - source2, "project1", "features2", Collections.emptyList(), Collections.emptyList()); + FeatureSet featureSet1 = DataGenerator.createFeatureSet(source1, "project1", "features1"); + FeatureSet featureSet2 = DataGenerator.createFeatureSet(source2, "project1", "features2"); when(specService.listFeatureSets( Filter.newBuilder().setFeatureSetName("*").setProject("project1").build())) @@ -174,12 +170,8 @@ public void shouldUseStoreSubscriptionToMapStore() { Source source1 = DataGenerator.createSource("servers:9092", "topic"); Source source2 = DataGenerator.createSource("other.servers:9092", "topic"); - FeatureSet featureSet1 = - DataGenerator.createFeatureSet( - source1, "default", "feature1", Collections.emptyList(), Collections.emptyList()); - FeatureSet featureSet2 = - DataGenerator.createFeatureSet( - source2, "default", "feature2", Collections.emptyList(), Collections.emptyList()); + FeatureSet featureSet1 = DataGenerator.createFeatureSet(source1, "default", "feature1"); + FeatureSet featureSet2 = DataGenerator.createFeatureSet(source2, "default", "feature2"); when(specService.listFeatureSets( Filter.newBuilder().setFeatureSetName("features1").setProject("*").build())) @@ -276,9 +268,7 @@ public void shouldCreateJobPerStore() throws InvalidProtocolBufferException { Source source = DataGenerator.createSource("servers:9092", "topic"); - FeatureSet featureSet = - DataGenerator.createFeatureSet( - source, "default", "features1", Collections.emptyList(), Collections.emptyList()); + FeatureSet featureSet = DataGenerator.createFeatureSet(source, "default", "features1"); when(specService.listFeatureSets( Filter.newBuilder().setFeatureSetName("*").setProject("*").build())) diff --git a/core/src/test/java/feast/core/service/SpecServiceIT.java b/core/src/test/java/feast/core/service/SpecServiceIT.java new file mode 100644 index 0000000000..b7850a0819 --- /dev/null +++ b/core/src/test/java/feast/core/service/SpecServiceIT.java @@ -0,0 +1,241 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.core.service; + +import static com.jayway.jsonassert.impl.matcher.IsMapContainingKey.hasKey; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.beans.HasPropertyWithValue.hasProperty; +import static org.hamcrest.collection.IsCollectionWithSize.hasSize; +import static org.hamcrest.collection.IsMapWithSize.aMapWithSize; +import static org.hamcrest.core.IsEqual.equalTo; +import static org.hamcrest.core.IsIterableContaining.hasItem; + +import avro.shaded.com.google.common.collect.ImmutableMap; +import feast.core.it.BaseIT; +import feast.core.it.DataGenerator; +import feast.core.it.SimpleAPIClient; +import feast.proto.core.CoreServiceGrpc; +import feast.proto.core.FeatureSetProto; +import feast.proto.core.SourceProto; +import feast.proto.types.ValueProto; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.test.context.SpringBootTest; +import org.testcontainers.shaded.com.google.common.collect.ImmutableList; + +@SpringBootTest +public class SpecServiceIT extends BaseIT { + + static CoreServiceGrpc.CoreServiceBlockingStub stub; + static SimpleAPIClient apiClient; + + @BeforeAll + public static void globalSetUp(@Value("${grpc.server.port}") int port) { + ManagedChannel channel = + ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build(); + stub = CoreServiceGrpc.newBlockingStub(channel); + apiClient = new SimpleAPIClient(stub); + } + + @BeforeEach + public void initState() { + SourceProto.Source source = DataGenerator.getDefaultSource(); + + apiClient.simpleApplyFeatureSet( + DataGenerator.createFeatureSet( + source, + "default", + "fs1", + ImmutableMap.of("id", ValueProto.ValueType.Enum.STRING), + ImmutableMap.of("total", ValueProto.ValueType.Enum.INT64))); + apiClient.simpleApplyFeatureSet( + DataGenerator.createFeatureSet( + source, + "default", + "fs2", + ImmutableMap.of("user_id", ValueProto.ValueType.Enum.STRING), + ImmutableMap.of("sum", ValueProto.ValueType.Enum.INT64))); + apiClient.simpleApplyFeatureSet( + DataGenerator.createFeatureSet( + source, + "project1", + "fs3", + ImmutableList.of( + DataGenerator.createEntity("user_id", ValueProto.ValueType.Enum.STRING)), + ImmutableList.of( + DataGenerator.createFeature( + "feature1", ValueProto.ValueType.Enum.INT32, Collections.emptyMap()), + DataGenerator.createFeature( + "feature2", ValueProto.ValueType.Enum.INT32, Collections.emptyMap())), + Collections.emptyMap())); + apiClient.simpleApplyFeatureSet( + DataGenerator.createFeatureSet( + source, + "project1", + "fs4", + ImmutableList.of( + DataGenerator.createEntity("customer_id", ValueProto.ValueType.Enum.STRING)), + ImmutableList.of( + DataGenerator.createFeature( + "feature2", + ValueProto.ValueType.Enum.INT32, + ImmutableMap.of("app", "feast", "version", "one"))), + ImmutableMap.of("label", "some"))); + apiClient.simpleApplyFeatureSet( + DataGenerator.createFeatureSet( + source, + "project1", + "fs5", + ImmutableList.of( + DataGenerator.createEntity("customer_id", ValueProto.ValueType.Enum.STRING)), + ImmutableList.of( + DataGenerator.createFeature( + "feature3", + ValueProto.ValueType.Enum.INT32, + ImmutableMap.of("app", "feast", "version", "two"))), + Collections.emptyMap())); + apiClient.simpleApplyFeatureSet(DataGenerator.createFeatureSet(source, "default", "new_fs")); + } + + @Nested + class ListFeatureSets { + + @Test + public void shouldGetAllFeatureSetsIfOnlyWildcardsProvided() { + List featureSets = apiClient.simpleListFeatureSets("*", "*"); + + assertThat(featureSets, hasSize(6)); + } + + @Test + public void shouldGetAllFeatureSetsMatchingNameWithWildcardSearch() { + List featureSets = + apiClient.simpleListFeatureSets("default", "fs*"); + + assertThat(featureSets, hasSize(2)); + assertThat(featureSets, hasItem(hasProperty("spec", hasProperty("name", equalTo("fs1"))))); + assertThat(featureSets, hasItem(hasProperty("spec", hasProperty("name", equalTo("fs2"))))); + } + + @Test + public void shouldFilterFeatureSetsByNameAndProject() { + List featureSets = + apiClient.simpleListFeatureSets("project1", "fs3"); + + assertThat(featureSets, hasItem(hasProperty("spec", hasProperty("name", equalTo("fs3"))))); + } + + @Test + public void shouldFilterFeatureSetsByStatus() { + apiClient.updateFeatureSetStatus( + "project1", "fs3", FeatureSetProto.FeatureSetStatus.STATUS_READY); + + apiClient.updateFeatureSetStatus( + "project1", "fs4", FeatureSetProto.FeatureSetStatus.STATUS_READY); + + List featureSets = + apiClient.simpleListFeatureSets("*", "*", FeatureSetProto.FeatureSetStatus.STATUS_READY); + + assertThat(featureSets, hasSize(2)); + assertThat(featureSets, hasItem(hasProperty("spec", hasProperty("name", equalTo("fs3"))))); + assertThat(featureSets, hasItem(hasProperty("spec", hasProperty("name", equalTo("fs4"))))); + + assertThat( + apiClient.simpleListFeatureSets( + "default", "*", FeatureSetProto.FeatureSetStatus.STATUS_PENDING), + hasSize(3)); + } + + @Test + public void shouldFilterFeatureSetsByLabels() { + List featureSets = + apiClient.simpleListFeatureSets("project1", "*", ImmutableMap.of("label", "some")); + + assertThat(featureSets, hasSize(1)); + assertThat(featureSets, hasItem(hasProperty("spec", hasProperty("name", equalTo("fs4"))))); + } + + @Test + public void shouldUseDefaultProjectIfProjectUnspecified() { + List featureSets = apiClient.simpleListFeatureSets("", "*"); + + assertThat(featureSets, hasSize(3)); + assertThat(featureSets, hasItem(hasProperty("spec", hasProperty("name", equalTo("fs1"))))); + assertThat(featureSets, hasItem(hasProperty("spec", hasProperty("name", equalTo("fs2"))))); + assertThat(featureSets, hasItem(hasProperty("spec", hasProperty("name", equalTo("new_fs"))))); + } + + @Test + public void shouldThrowExceptionGivenMissingFeatureSetName() {} + } + + @Test + public void shouldFilterFeaturesByEntitiesAndLabels() { + // Case 1: Only filter by entities + Map result1 = + apiClient.simpleListFeatures("project1", "user_id"); + + assertThat(result1, aMapWithSize(2)); + assertThat(result1, hasKey(equalTo("project1/fs3:feature1"))); + assertThat(result1, hasKey(equalTo("project1/fs3:feature2"))); + + // Case 2: Filter by entities and labels + Map result2 = + apiClient.simpleListFeatures( + "project1", + ImmutableMap.of("app", "feast", "version", "one"), + ImmutableList.of("customer_id")); + + assertThat(result2, aMapWithSize(1)); + assertThat(result2, hasKey(equalTo("project1/fs4:feature2"))); + + // Case 3: Filter by labels + Map result3 = + apiClient.simpleListFeatures( + "project1", ImmutableMap.of("app", "feast"), Collections.emptyList()); + + assertThat(result3, aMapWithSize(2)); + assertThat(result3, hasKey(equalTo("project1/fs4:feature2"))); + assertThat(result3, hasKey(equalTo("project1/fs5:feature3"))); + + // Case 4: Filter by nothing, except project + Map result4 = + apiClient.simpleListFeatures("project1", ImmutableMap.of(), Collections.emptyList()); + + assertThat(result4, aMapWithSize(4)); + assertThat(result4, hasKey(equalTo("project1/fs3:feature1"))); + assertThat(result4, hasKey(equalTo("project1/fs3:feature1"))); + assertThat(result4, hasKey(equalTo("project1/fs4:feature2"))); + assertThat(result4, hasKey(equalTo("project1/fs5:feature3"))); + + // Case 5: Filter by nothing; will use default project + Map result5 = + apiClient.simpleListFeatures("", ImmutableMap.of(), Collections.emptyList()); + + assertThat(result5, aMapWithSize(2)); + assertThat(result5, hasKey(equalTo("default/fs1:total"))); + assertThat(result5, hasKey(equalTo("default/fs2:sum"))); + } +} From c0ec97251e7916686ac9d1ebb23ddb9ae2d9259d Mon Sep 17 00:00:00 2001 From: pyalex Date: Tue, 11 Aug 2020 13:59:09 +0800 Subject: [PATCH 4/6] SpecServiceIT --- core/src/test/java/feast/core/it/BaseIT.java | 11 +- .../java/feast/core/it/SimpleAPIClient.java | 23 +- .../feast/core/service/JobCoordinatorIT.java | 2 +- .../feast/core/service/SpecServiceIT.java | 553 ++++++++- .../feast/core/service/SpecServiceTest.java | 1037 ----------------- .../test/resources/application-it.properties | 1 + 6 files changed, 531 insertions(+), 1096 deletions(-) delete mode 100644 core/src/test/java/feast/core/service/SpecServiceTest.java diff --git a/core/src/test/java/feast/core/it/BaseIT.java b/core/src/test/java/feast/core/it/BaseIT.java index 1d99e8dcec..e2e6f0c5a6 100644 --- a/core/src/test/java/feast/core/it/BaseIT.java +++ b/core/src/test/java/feast/core/it/BaseIT.java @@ -187,15 +187,20 @@ public static void cleanTables(EntityManager em) throws SQLException { @PersistenceContext EntityManager entityManager; /** Used to determine SequentialFlows */ - public Boolean isNestedTest(TestInfo testInfo) { - return testInfo.getTestClass().get().getAnnotation(Nested.class) != null; + public Boolean isSequentialTest(TestInfo testInfo) { + try { + testInfo.getTestClass().get().asSubclass(SequentialFlow.class); + } catch (ClassCastException e) { + return false; + } + return true; } @AfterEach public void tearDown(TestInfo testInfo) throws Exception { CollectorRegistry.defaultRegistry.clear(); - if (!isNestedTest(testInfo)) { + if (!isSequentialTest(testInfo)) { cleanTables(entityManager); } } diff --git a/core/src/test/java/feast/core/it/SimpleAPIClient.java b/core/src/test/java/feast/core/it/SimpleAPIClient.java index 009cbca1ad..6e6f66b101 100644 --- a/core/src/test/java/feast/core/it/SimpleAPIClient.java +++ b/core/src/test/java/feast/core/it/SimpleAPIClient.java @@ -29,8 +29,9 @@ public SimpleAPIClient(CoreServiceGrpc.CoreServiceBlockingStub stub) { this.stub = stub; } - public void simpleApplyFeatureSet(FeatureSetProto.FeatureSet featureSet) { - stub.applyFeatureSet( + public CoreServiceProto.ApplyFeatureSetResponse simpleApplyFeatureSet( + FeatureSetProto.FeatureSet featureSet) { + return stub.applyFeatureSet( CoreServiceProto.ApplyFeatureSetRequest.newBuilder().setFeatureSet(featureSet).build()); } @@ -113,14 +114,19 @@ public Map simpleListFeatures( return simpleListFeatures(projectName, Collections.emptyMap(), Arrays.asList(entities)); } - public void updateStore(StoreProto.Store store) { - stub.updateStore(CoreServiceProto.UpdateStoreRequest.newBuilder().setStore(store).build()); + public CoreServiceProto.UpdateStoreResponse updateStore(StoreProto.Store store) { + return stub.updateStore( + CoreServiceProto.UpdateStoreRequest.newBuilder().setStore(store).build()); } public void createProject(String name) { stub.createProject(CoreServiceProto.CreateProjectRequest.newBuilder().setName(name).build()); } + public void archiveProject(String name) { + stub.archiveProject(CoreServiceProto.ArchiveProjectRequest.newBuilder().setName(name).build()); + } + public void restartIngestionJob(String jobId) { stub.restartIngestionJob( CoreServiceProto.RestartIngestionJobRequest.newBuilder().setId(jobId).build()); @@ -139,4 +145,13 @@ public String getFeastCoreVersion() { feast.proto.core.CoreServiceProto.GetFeastCoreVersionRequest.getDefaultInstance()) .getVersion(); } + + public FeatureSetProto.FeatureSet getFeatureSet(String projectName, String featureSetName) { + return stub.getFeatureSet( + CoreServiceProto.GetFeatureSetRequest.newBuilder() + .setProject(projectName) + .setName(featureSetName) + .build()) + .getFeatureSet(); + } } diff --git a/core/src/test/java/feast/core/service/JobCoordinatorIT.java b/core/src/test/java/feast/core/service/JobCoordinatorIT.java index 99b0738b9b..afae040d95 100644 --- a/core/src/test/java/feast/core/service/JobCoordinatorIT.java +++ b/core/src/test/java/feast/core/service/JobCoordinatorIT.java @@ -82,7 +82,7 @@ public void setUp(TestInfo testInfo) { specsMailbox = new ArrayList<>(); - if (!isNestedTest(testInfo)) { + if (!isSequentialTest(testInfo)) { jobManager.cleanAll(); jobRepository.deleteAll(); } diff --git a/core/src/test/java/feast/core/service/SpecServiceIT.java b/core/src/test/java/feast/core/service/SpecServiceIT.java index b7850a0819..8bd935febe 100644 --- a/core/src/test/java/feast/core/service/SpecServiceIT.java +++ b/core/src/test/java/feast/core/service/SpecServiceIT.java @@ -17,32 +17,37 @@ package feast.core.service; import static com.jayway.jsonassert.impl.matcher.IsMapContainingKey.hasKey; +import static org.hamcrest.CoreMatchers.allOf; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.beans.HasPropertyWithValue.hasProperty; import static org.hamcrest.collection.IsCollectionWithSize.hasSize; +import static org.hamcrest.collection.IsMapContaining.hasEntry; import static org.hamcrest.collection.IsMapWithSize.aMapWithSize; import static org.hamcrest.core.IsEqual.equalTo; import static org.hamcrest.core.IsIterableContaining.hasItem; +import static org.junit.Assert.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import avro.shaded.com.google.common.collect.ImmutableMap; +import com.google.protobuf.Duration; import feast.core.it.BaseIT; import feast.core.it.DataGenerator; import feast.core.it.SimpleAPIClient; -import feast.proto.core.CoreServiceGrpc; -import feast.proto.core.FeatureSetProto; -import feast.proto.core.SourceProto; +import feast.proto.core.*; import feast.proto.types.ValueProto; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; -import java.util.Collections; -import java.util.List; -import java.util.Map; +import io.grpc.StatusRuntimeException; +import java.util.*; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Triple; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.test.context.SpringBootTest; +import org.tensorflow.metadata.v0.*; import org.testcontainers.shaded.com.google.common.collect.ImmutableList; @SpringBootTest @@ -117,6 +122,7 @@ public void initState() { ImmutableMap.of("app", "feast", "version", "two"))), Collections.emptyMap())); apiClient.simpleApplyFeatureSet(DataGenerator.createFeatureSet(source, "default", "new_fs")); + apiClient.updateStore(DataGenerator.getDefaultStore()); } @Nested @@ -188,54 +194,499 @@ public void shouldUseDefaultProjectIfProjectUnspecified() { } @Test - public void shouldThrowExceptionGivenMissingFeatureSetName() {} + public void shouldThrowExceptionGivenMissingFeatureSetName() { + assertThrows(StatusRuntimeException.class, () -> apiClient.simpleListFeatureSets("", "")); + } } - @Test - public void shouldFilterFeaturesByEntitiesAndLabels() { - // Case 1: Only filter by entities - Map result1 = - apiClient.simpleListFeatures("project1", "user_id"); + @Nested + class ApplyFeatureSet { + @Test + public void shouldThrowExceptionGivenReservedFeatureName() { + List reservedNames = + Arrays.asList("created_timestamp", "event_timestamp", "ingestion_id", "job_id"); + String reservedNamesString = StringUtils.join(reservedNames, ", "); + + StatusRuntimeException exc = + assertThrows( + StatusRuntimeException.class, + () -> + apiClient.simpleApplyFeatureSet( + DataGenerator.createFeatureSet( + DataGenerator.getDefaultSource(), + "project", + "name", + ImmutableMap.of("entity", ValueProto.ValueType.Enum.STRING), + ImmutableMap.of("event_timestamp", ValueProto.ValueType.Enum.STRING)))); - assertThat(result1, aMapWithSize(2)); - assertThat(result1, hasKey(equalTo("project1/fs3:feature1"))); - assertThat(result1, hasKey(equalTo("project1/fs3:feature2"))); + assertThat( + exc.getMessage(), + equalTo( + String.format( + "INTERNAL: Reserved feature names have been used, which are not allowed. These names include %s." + + "You've just used an invalid name, %s.", + reservedNamesString, "event_timestamp"))); + } - // Case 2: Filter by entities and labels - Map result2 = - apiClient.simpleListFeatures( - "project1", - ImmutableMap.of("app", "feast", "version", "one"), - ImmutableList.of("customer_id")); - - assertThat(result2, aMapWithSize(1)); - assertThat(result2, hasKey(equalTo("project1/fs4:feature2"))); - - // Case 3: Filter by labels - Map result3 = - apiClient.simpleListFeatures( - "project1", ImmutableMap.of("app", "feast"), Collections.emptyList()); - - assertThat(result3, aMapWithSize(2)); - assertThat(result3, hasKey(equalTo("project1/fs4:feature2"))); - assertThat(result3, hasKey(equalTo("project1/fs5:feature3"))); - - // Case 4: Filter by nothing, except project - Map result4 = - apiClient.simpleListFeatures("project1", ImmutableMap.of(), Collections.emptyList()); - - assertThat(result4, aMapWithSize(4)); - assertThat(result4, hasKey(equalTo("project1/fs3:feature1"))); - assertThat(result4, hasKey(equalTo("project1/fs3:feature1"))); - assertThat(result4, hasKey(equalTo("project1/fs4:feature2"))); - assertThat(result4, hasKey(equalTo("project1/fs5:feature3"))); - - // Case 5: Filter by nothing; will use default project - Map result5 = - apiClient.simpleListFeatures("", ImmutableMap.of(), Collections.emptyList()); - - assertThat(result5, aMapWithSize(2)); - assertThat(result5, hasKey(equalTo("default/fs1:total"))); - assertThat(result5, hasKey(equalTo("default/fs2:sum"))); + @Test + public void shouldReturnFeatureSetIfFeatureSetHasNotChanged() { + FeatureSetProto.FeatureSet featureSet = apiClient.getFeatureSet("default", "fs1"); + + CoreServiceProto.ApplyFeatureSetResponse response = + apiClient.simpleApplyFeatureSet(featureSet); + + assertThat( + response.getStatus(), equalTo(CoreServiceProto.ApplyFeatureSetResponse.Status.NO_CHANGE)); + assertThat( + response.getFeatureSet().getSpec().getVersion(), + equalTo(featureSet.getSpec().getVersion())); + } + + @Test + public void shouldApplyFeatureSetIfNotExists() { + FeatureSetProto.FeatureSet featureSet = + DataGenerator.createFeatureSet( + DataGenerator.getDefaultSource(), + "default", + "new", + ImmutableMap.of("id", ValueProto.ValueType.Enum.STRING), + ImmutableMap.of("feature", ValueProto.ValueType.Enum.STRING)); + + CoreServiceProto.ApplyFeatureSetResponse response = + apiClient.simpleApplyFeatureSet(featureSet); + + assertThat( + response.getFeatureSet().getSpec(), + equalTo( + featureSet + .getSpec() + .toBuilder() + .setVersion(1) + .setMaxAge(Duration.newBuilder().build()) + .build())); + assertThat( + response.getStatus(), equalTo(CoreServiceProto.ApplyFeatureSetResponse.Status.CREATED)); + } + + @Test + public void shouldUpdateAndSaveFeatureSetIfAlreadyExists() { + CoreServiceProto.ApplyFeatureSetResponse response = + apiClient.simpleApplyFeatureSet( + DataGenerator.createFeatureSet( + DataGenerator.getDefaultSource(), + "default", + "fs1", + ImmutableMap.of("id", ValueProto.ValueType.Enum.STRING), + ImmutableMap.of( + "total", ValueProto.ValueType.Enum.INT64, + "subtotal", ValueProto.ValueType.Enum.INT64))); + + assertThat( + response.getFeatureSet().getSpec().getFeaturesList(), + hasItem(hasProperty("name", equalTo("subtotal")))); + assertThat( + response.getStatus(), equalTo(CoreServiceProto.ApplyFeatureSetResponse.Status.UPDATED)); + assertThat(response.getFeatureSet().getSpec().getVersion(), equalTo(2)); + } + + @Test + public void shouldAcceptPresenceShapeAndDomainConstraints() { + List entitySpecs = new ArrayList<>(); + entitySpecs.add( + FeatureSetProto.EntitySpec.newBuilder() + .setName("entity1") + .setValueType(ValueProto.ValueType.Enum.INT64) + .build()); + entitySpecs.add( + FeatureSetProto.EntitySpec.newBuilder() + .setName("entity2") + .setValueType(ValueProto.ValueType.Enum.INT64) + .build()); + entitySpecs.add( + FeatureSetProto.EntitySpec.newBuilder() + .setName("entity3") + .setValueType(ValueProto.ValueType.Enum.FLOAT) + .build()); + entitySpecs.add( + FeatureSetProto.EntitySpec.newBuilder() + .setName("entity4") + .setValueType(ValueProto.ValueType.Enum.STRING) + .build()); + entitySpecs.add( + FeatureSetProto.EntitySpec.newBuilder() + .setName("entity5") + .setValueType(ValueProto.ValueType.Enum.BOOL) + .build()); + + List featureSpecs = new ArrayList<>(); + featureSpecs.add( + FeatureSetProto.FeatureSpec.newBuilder() + .setName("feature1") + .setValueType(ValueProto.ValueType.Enum.INT64) + .setPresence(FeaturePresence.getDefaultInstance()) + .setShape(FixedShape.getDefaultInstance()) + .setDomain("mydomain") + .build()); + featureSpecs.add( + FeatureSetProto.FeatureSpec.newBuilder() + .setName("feature2") + .setValueType(ValueProto.ValueType.Enum.INT64) + .setGroupPresence(FeaturePresenceWithinGroup.getDefaultInstance()) + .setValueCount(ValueCount.getDefaultInstance()) + .setIntDomain(IntDomain.getDefaultInstance()) + .build()); + featureSpecs.add( + FeatureSetProto.FeatureSpec.newBuilder() + .setName("feature3") + .setValueType(ValueProto.ValueType.Enum.FLOAT) + .setPresence(FeaturePresence.getDefaultInstance()) + .setValueCount(ValueCount.getDefaultInstance()) + .setFloatDomain(FloatDomain.getDefaultInstance()) + .build()); + featureSpecs.add( + FeatureSetProto.FeatureSpec.newBuilder() + .setName("feature4") + .setValueType(ValueProto.ValueType.Enum.STRING) + .setPresence(FeaturePresence.getDefaultInstance()) + .setValueCount(ValueCount.getDefaultInstance()) + .setStringDomain(StringDomain.getDefaultInstance()) + .build()); + featureSpecs.add( + FeatureSetProto.FeatureSpec.newBuilder() + .setName("feature5") + .setValueType(ValueProto.ValueType.Enum.BOOL) + .setPresence(FeaturePresence.getDefaultInstance()) + .setValueCount(ValueCount.getDefaultInstance()) + .setBoolDomain(BoolDomain.getDefaultInstance()) + .build()); + + FeatureSetProto.FeatureSetSpec featureSetSpec = + FeatureSetProto.FeatureSetSpec.newBuilder() + .setProject("project1") + .setName("featureSetWithConstraints") + .addAllEntities(entitySpecs) + .addAllFeatures(featureSpecs) + .build(); + FeatureSetProto.FeatureSet featureSet = + FeatureSetProto.FeatureSet.newBuilder().setSpec(featureSetSpec).build(); + + CoreServiceProto.ApplyFeatureSetResponse applyFeatureSetResponse = + apiClient.simpleApplyFeatureSet(featureSet); + FeatureSetProto.FeatureSetSpec appliedFeatureSetSpec = + applyFeatureSetResponse.getFeatureSet().getSpec(); + + // appliedEntitySpecs needs to be sorted because the list returned by specService may not + // follow the order in the request + List appliedEntitySpecs = + new ArrayList<>(appliedFeatureSetSpec.getEntitiesList()); + appliedEntitySpecs.sort(Comparator.comparing(FeatureSetProto.EntitySpec::getName)); + + // appliedFeatureSpecs needs to be sorted because the list returned by specService may not + // follow the order in the request + List appliedFeatureSpecs = + new ArrayList<>(appliedFeatureSetSpec.getFeaturesList()); + appliedFeatureSpecs.sort(Comparator.comparing(FeatureSetProto.FeatureSpec::getName)); + + assertEquals(appliedEntitySpecs, entitySpecs); + assertEquals(appliedFeatureSpecs, featureSpecs); + } + + @Test + public void shouldUpdateFeatureSetWhenConstraintsAreUpdated() { + CoreServiceProto.ApplyFeatureSetResponse response = + apiClient.simpleApplyFeatureSet( + DataGenerator.createFeatureSet( + DataGenerator.getDefaultSource(), + "default", + "fs1", + ImmutableList.of( + FeatureSetProto.EntitySpec.newBuilder() + .setName("id") + .setValueType(ValueProto.ValueType.Enum.STRING) + .build()), + ImmutableList.of( + FeatureSetProto.FeatureSpec.newBuilder() + .setName("total") + .setValueType(ValueProto.ValueType.Enum.INT64) + .setIntDomain(IntDomain.newBuilder().setMin(0).setMax(100).build()) + .build()), + Collections.emptyMap())); + + assertThat( + response.getStatus(), equalTo(CoreServiceProto.ApplyFeatureSetResponse.Status.UPDATED)); + assertThat( + response.getFeatureSet().getSpec().getFeaturesList(), + hasItem( + hasProperty( + "intDomain", equalTo(IntDomain.newBuilder().setMin(0).setMax(100).build())))); + } + + @Test + public void shouldCreateProjectWhenNotAlreadyExists() { + CoreServiceProto.ApplyFeatureSetResponse response = + apiClient.simpleApplyFeatureSet( + DataGenerator.createFeatureSet( + DataGenerator.getDefaultSource(), + "new_project", + "new_fs", + ImmutableMap.of("id", ValueProto.ValueType.Enum.STRING), + ImmutableMap.of("total", ValueProto.ValueType.Enum.INT64))); + + assertThat( + response.getStatus(), equalTo(CoreServiceProto.ApplyFeatureSetResponse.Status.CREATED)); + assertThat(response.getFeatureSet().getSpec().getProject(), equalTo("new_project")); + } + + @Test + public void shouldUsedDefaultProjectIfUnspecified() { + CoreServiceProto.ApplyFeatureSetResponse response = + apiClient.simpleApplyFeatureSet( + DataGenerator.createFeatureSet( + DataGenerator.getDefaultSource(), + "", + "some", + ImmutableMap.of("id", ValueProto.ValueType.Enum.STRING), + ImmutableMap.of("total", ValueProto.ValueType.Enum.INT64))); + + assertThat( + response.getStatus(), equalTo(CoreServiceProto.ApplyFeatureSetResponse.Status.CREATED)); + assertThat(response.getFeatureSet().getSpec().getProject(), equalTo("default")); + } + + @Test + public void shouldFailWhenProjectIsArchived() { + apiClient.createProject("archived"); + apiClient.archiveProject("archived"); + + StatusRuntimeException exc = + assertThrows( + StatusRuntimeException.class, + () -> + apiClient.simpleApplyFeatureSet( + DataGenerator.createFeatureSet( + DataGenerator.getDefaultSource(), + "archived", + "fs", + ImmutableMap.of("id", ValueProto.ValueType.Enum.STRING), + ImmutableMap.of("total", ValueProto.ValueType.Enum.INT64)))); + assertThat(exc.getMessage(), equalTo("INTERNAL: Project is archived: archived")); + } + + @Test + public void shouldAcceptFeatureLabels() { + CoreServiceProto.ApplyFeatureSetResponse response = + apiClient.simpleApplyFeatureSet( + DataGenerator.createFeatureSet( + DataGenerator.getDefaultSource(), + "default", + "some", + ImmutableList.of( + FeatureSetProto.EntitySpec.newBuilder() + .setName("id") + .setValueType(ValueProto.ValueType.Enum.STRING) + .build()), + ImmutableList.of( + FeatureSetProto.FeatureSpec.newBuilder() + .setName("feature1") + .setValueType(ValueProto.ValueType.Enum.INT64) + .putAllLabels(ImmutableMap.of("type", "integer")) + .build(), + FeatureSetProto.FeatureSpec.newBuilder() + .setName("feature2") + .setValueType(ValueProto.ValueType.Enum.STRING) + .putAllLabels(ImmutableMap.of("type", "string")) + .build()), + Collections.emptyMap())); + + assertThat( + response.getStatus(), equalTo(CoreServiceProto.ApplyFeatureSetResponse.Status.CREATED)); + assertThat( + response.getFeatureSet().getSpec().getFeaturesList(), + hasItem( + allOf( + hasProperty("name", equalTo("feature1")), + hasProperty("labelsMap", hasEntry("type", "integer"))))); + assertThat( + response.getFeatureSet().getSpec().getFeaturesList(), + hasItem( + allOf( + hasProperty("name", equalTo("feature2")), + hasProperty("labelsMap", hasEntry("type", "string"))))); + } + + @Test + public void shouldUpdateLabels() { + CoreServiceProto.ApplyFeatureSetResponse response = + apiClient.simpleApplyFeatureSet( + DataGenerator.createFeatureSet( + DataGenerator.getDefaultSource(), + "project1", + "fs4", + ImmutableList.of( + DataGenerator.createEntity("customer_id", ValueProto.ValueType.Enum.STRING)), + ImmutableList.of( + DataGenerator.createFeature( + "feature2", + ValueProto.ValueType.Enum.INT32, + ImmutableMap.of("app", "feast", "version", "two"))), + ImmutableMap.of("label", "some"))); + + assertThat( + response.getStatus(), equalTo(CoreServiceProto.ApplyFeatureSetResponse.Status.UPDATED)); + assertThat( + response.getFeatureSet().getSpec().getFeaturesList(), + hasItem( + allOf( + hasProperty("name", equalTo("feature2")), + hasProperty("labelsMap", hasEntry("version", "two"))))); + } + + @Test + public void shouldAcceptFeatureSetLabels() { + CoreServiceProto.ApplyFeatureSetResponse response = + apiClient.simpleApplyFeatureSet( + DataGenerator.createFeatureSet( + DataGenerator.getDefaultSource(), + "", + "some", + ImmutableList.of( + DataGenerator.createEntity("customer_id", ValueProto.ValueType.Enum.STRING)), + ImmutableList.of(), + ImmutableMap.of("label", "some"))); + + assertThat( + response.getStatus(), equalTo(CoreServiceProto.ApplyFeatureSetResponse.Status.CREATED)); + assertThat(response.getFeatureSet().getSpec().getLabelsMap(), hasEntry("label", "some")); + } + } + + @Nested + class UpdateStore { + @Test + public void shouldUpdateStoreIfConfigChanges() { + StoreProto.Store defaultStore = DataGenerator.getDefaultStore(); + + StoreProto.Store updatedStore = + DataGenerator.createStore( + defaultStore.getName(), + defaultStore.getType(), + ImmutableList.of(Triple.of("project1", "*", false))); + + CoreServiceProto.UpdateStoreResponse response = apiClient.updateStore(updatedStore); + assertThat( + response.getStatus(), equalTo(CoreServiceProto.UpdateStoreResponse.Status.UPDATED)); + } + + @Test + public void shouldDoNothingIfNoChange() { + CoreServiceProto.UpdateStoreResponse response = + apiClient.updateStore(DataGenerator.getDefaultStore()); + assertThat( + response.getStatus(), equalTo(CoreServiceProto.UpdateStoreResponse.Status.NO_CHANGE)); + } + } + + @Nested + class GetFeatureSet { + @Test + public void shouldThrowExceptionGivenMissingFeatureSet() { + StatusRuntimeException exc = + assertThrows( + StatusRuntimeException.class, () -> apiClient.getFeatureSet("default", "unknown")); + + assertThat( + exc.getMessage(), + equalTo("INTERNAL: Feature set with name \"unknown\" could not be found.")); + } + } + + @Nested + class ListStores { + @Test + public void shouldReturnAllStoresIfNoNameProvided() { + apiClient.updateStore(DataGenerator.getDefaultStore()); + apiClient.updateStore( + DataGenerator.createStore( + "data", StoreProto.Store.StoreType.REDIS, Collections.emptyList())); + + List actual = + stub.listStores( + CoreServiceProto.ListStoresRequest.newBuilder() + .setFilter(CoreServiceProto.ListStoresRequest.Filter.newBuilder().build()) + .build()) + .getStoreList(); + + assertThat(actual, hasSize(2)); + assertThat(actual, hasItem(hasProperty("name", equalTo("test-store")))); + assertThat(actual, hasItem(hasProperty("name", equalTo("data")))); + } + + @Test + public void shouldThrowRetrievalExceptionIfNoStoresFoundWithName() { + StatusRuntimeException exc = + assertThrows( + StatusRuntimeException.class, + () -> + stub.listStores( + CoreServiceProto.ListStoresRequest.newBuilder() + .setFilter( + CoreServiceProto.ListStoresRequest.Filter.newBuilder() + .setName("unknown") + .build()) + .build())); + + assertThat(exc.getMessage(), equalTo("INTERNAL: Store with name 'unknown' not found")); + } + } + + @Nested + class ListFeatures { + @Test + public void shouldFilterFeaturesByEntitiesAndLabels() { + // Case 1: Only filter by entities + Map result1 = + apiClient.simpleListFeatures("project1", "user_id"); + + assertThat(result1, aMapWithSize(2)); + assertThat(result1, hasKey(equalTo("project1/fs3:feature1"))); + assertThat(result1, hasKey(equalTo("project1/fs3:feature2"))); + + // Case 2: Filter by entities and labels + Map result2 = + apiClient.simpleListFeatures( + "project1", + ImmutableMap.of("app", "feast", "version", "one"), + ImmutableList.of("customer_id")); + + assertThat(result2, aMapWithSize(1)); + assertThat(result2, hasKey(equalTo("project1/fs4:feature2"))); + + // Case 3: Filter by labels + Map result3 = + apiClient.simpleListFeatures( + "project1", ImmutableMap.of("app", "feast"), Collections.emptyList()); + + assertThat(result3, aMapWithSize(2)); + assertThat(result3, hasKey(equalTo("project1/fs4:feature2"))); + assertThat(result3, hasKey(equalTo("project1/fs5:feature3"))); + + // Case 4: Filter by nothing, except project + Map result4 = + apiClient.simpleListFeatures("project1", ImmutableMap.of(), Collections.emptyList()); + + assertThat(result4, aMapWithSize(4)); + assertThat(result4, hasKey(equalTo("project1/fs3:feature1"))); + assertThat(result4, hasKey(equalTo("project1/fs3:feature1"))); + assertThat(result4, hasKey(equalTo("project1/fs4:feature2"))); + assertThat(result4, hasKey(equalTo("project1/fs5:feature3"))); + + // Case 5: Filter by nothing; will use default project + Map result5 = + apiClient.simpleListFeatures("", ImmutableMap.of(), Collections.emptyList()); + + assertThat(result5, aMapWithSize(2)); + assertThat(result5, hasKey(equalTo("default/fs1:total"))); + assertThat(result5, hasKey(equalTo("default/fs2:sum"))); + } } } diff --git a/core/src/test/java/feast/core/service/SpecServiceTest.java b/core/src/test/java/feast/core/service/SpecServiceTest.java deleted file mode 100644 index 03b16543ba..0000000000 --- a/core/src/test/java/feast/core/service/SpecServiceTest.java +++ /dev/null @@ -1,1037 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * Copyright 2018-2020 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package feast.core.service; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.core.IsEqual.equalTo; -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.*; -import static org.mockito.MockitoAnnotations.initMocks; - -import com.google.api.client.util.Lists; -import com.google.protobuf.InvalidProtocolBufferException; -import feast.core.dao.FeatureSetRepository; -import feast.core.dao.ProjectRepository; -import feast.core.dao.StoreRepository; -import feast.core.exception.RetrievalException; -import feast.core.model.*; -import feast.core.util.TestUtil; -import feast.proto.core.CoreServiceProto.ApplyFeatureSetResponse; -import feast.proto.core.CoreServiceProto.ApplyFeatureSetResponse.Status; -import feast.proto.core.CoreServiceProto.GetFeatureSetRequest; -import feast.proto.core.CoreServiceProto.GetFeatureSetResponse; -import feast.proto.core.CoreServiceProto.ListFeatureSetsRequest.Filter; -import feast.proto.core.CoreServiceProto.ListFeatureSetsResponse; -import feast.proto.core.CoreServiceProto.ListFeaturesRequest; -import feast.proto.core.CoreServiceProto.ListFeaturesResponse; -import feast.proto.core.CoreServiceProto.ListStoresRequest; -import feast.proto.core.CoreServiceProto.ListStoresResponse; -import feast.proto.core.CoreServiceProto.UpdateStoreRequest; -import feast.proto.core.CoreServiceProto.UpdateStoreResponse; -import feast.proto.core.FeatureSetProto; -import feast.proto.core.FeatureSetProto.EntitySpec; -import feast.proto.core.FeatureSetProto.FeatureSetSpec; -import feast.proto.core.FeatureSetProto.FeatureSpec; -import feast.proto.core.StoreProto; -import feast.proto.core.StoreProto.Store.RedisConfig; -import feast.proto.core.StoreProto.Store.StoreType; -import feast.proto.core.StoreProto.Store.Subscription; -import feast.proto.types.ValueProto.ValueType.Enum; -import java.sql.Date; -import java.time.Instant; -import java.util.*; -import java.util.Map.Entry; -import java.util.stream.Collectors; -import org.apache.commons.lang3.StringUtils; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.mockito.ArgumentCaptor; -import org.mockito.ArgumentMatchers; -import org.mockito.Mock; -import org.tensorflow.metadata.v0.BoolDomain; -import org.tensorflow.metadata.v0.FeaturePresence; -import org.tensorflow.metadata.v0.FeaturePresenceWithinGroup; -import org.tensorflow.metadata.v0.FixedShape; -import org.tensorflow.metadata.v0.FloatDomain; -import org.tensorflow.metadata.v0.ImageDomain; -import org.tensorflow.metadata.v0.IntDomain; -import org.tensorflow.metadata.v0.MIDDomain; -import org.tensorflow.metadata.v0.NaturalLanguageDomain; -import org.tensorflow.metadata.v0.StringDomain; -import org.tensorflow.metadata.v0.StructDomain; -import org.tensorflow.metadata.v0.TimeDomain; -import org.tensorflow.metadata.v0.TimeOfDayDomain; -import org.tensorflow.metadata.v0.URLDomain; -import org.tensorflow.metadata.v0.ValueCount; - -public class SpecServiceTest { - - @Mock private FeatureSetRepository featureSetRepository; - - @Mock private StoreRepository storeRepository; - - @Mock private ProjectRepository projectRepository; - - @Rule public final ExpectedException expectedException = ExpectedException.none(); - - private SpecService specService; - private List featureSets; - private List invalidFeatureSets; - private List features; - private List stores; - private Source defaultSource; - - // TODO: Updates update features in place, so if tests follow the wrong order they might break. - // Refactor this maybe? - @Before - public void setUp() throws InvalidProtocolBufferException { - initMocks(this); - defaultSource = TestUtil.defaultSource; - - FeatureSet featureSet1 = newDummyFeatureSet("f1", "project1"); - FeatureSet featureSet2 = newDummyFeatureSet("f2", "project1"); - - Map featureLabels1 = Map.ofEntries(Map.entry("key1", "val1")); - Map featureLabels2 = Map.ofEntries(Map.entry("key2", "val2")); - Map dummyLabels = Map.ofEntries(Map.entry("key", "value")); - - Feature dummyFeature = TestUtil.CreateFeature("feature", Enum.STRING, dummyLabels); - Feature f3f1 = TestUtil.CreateFeature("f3f1", Enum.INT64); - Feature f3f2 = TestUtil.CreateFeature("f3f2", Enum.INT64); - Entity f3e1 = TestUtil.CreateEntity("f3e1", Enum.STRING); - FeatureSet featureSet3 = - TestUtil.CreateFeatureSet("f3", "project1", Arrays.asList(f3e1), Arrays.asList(f3f2, f3f1)); - - FeatureSet featureSet4 = newDummyFeatureSet("f4", Project.DEFAULT_NAME); - Map singleFeatureSetLabels = - new HashMap<>() { - { - put("fsLabel1", "fsValue1"); - } - }; - Map duoFeatureSetLabels = - new HashMap<>() { - { - put("fsLabel1", "fsValue1"); - put("fsLabel2", "fsValue2"); - } - }; - FeatureSet featureSet5 = newDummyFeatureSet("f5", Project.DEFAULT_NAME); - FeatureSet featureSet6 = newDummyFeatureSet("f6", Project.DEFAULT_NAME); - FeatureSetSpec featureSetSpec5 = featureSet5.toProto().getSpec().toBuilder().build(); - FeatureSetSpec featureSetSpec6 = featureSet6.toProto().getSpec().toBuilder().build(); - FeatureSetProto.FeatureSet fs5 = - FeatureSetProto.FeatureSet.newBuilder() - .setSpec( - featureSetSpec5 - .toBuilder() - .setSource(defaultSource.toProto()) - .putAllLabels(singleFeatureSetLabels) - .build()) - .build(); - FeatureSetProto.FeatureSet fs6 = - FeatureSetProto.FeatureSet.newBuilder() - .setSpec( - featureSetSpec6 - .toBuilder() - .setSource(defaultSource.toProto()) - .putAllLabels(duoFeatureSetLabels) - .build()) - .build(); - - Entity f7e1 = TestUtil.CreateEntity("f7e1", Enum.STRING); - Entity f9e1 = TestUtil.CreateEntity("f9e1", Enum.STRING); - Feature f7f1 = TestUtil.CreateFeature("f7f1", Enum.INT64, featureLabels1); - Feature f8f1 = TestUtil.CreateFeature("f8f1", Enum.INT64, featureLabels2); - FeatureSet featureSet7 = - TestUtil.CreateFeatureSet( - "f7", "project2", Arrays.asList(f7e1), Arrays.asList(f3f1, f3f2, f7f1)); - FeatureSet featureSet8 = - TestUtil.CreateFeatureSet("f8", "project2", Arrays.asList(f7e1), Arrays.asList(f3f1, f8f1)); - FeatureSet featureSet9 = - TestUtil.CreateFeatureSet("f9", "default", Arrays.asList(f9e1), Arrays.asList(f3f1, f8f1)); - features = Arrays.asList(dummyFeature, f3f1, f3f2, f7f1, f8f1); - - featureSets = - Arrays.asList( - featureSet1, - featureSet2, - featureSet3, - featureSet4, - FeatureSet.fromProto(fs5), - FeatureSet.fromProto(fs6), - featureSet7, - featureSet8, - featureSet9); - - when(featureSetRepository.findAll()).thenReturn(featureSets); - when(featureSetRepository.findAllByOrderByNameAsc()).thenReturn(featureSets); - when(featureSetRepository.findFeatureSetByNameAndProject_Name("f1", "project1")) - .thenReturn(featureSets.get(0)); - when(featureSetRepository.findFeatureSetByNameAndProject_Name("f2", "project1")) - .thenReturn(featureSets.get(1)); - when(featureSetRepository.findAllByNameLikeAndProject_NameOrderByNameAsc("f1", "project1")) - .thenReturn(featureSets.subList(0, 1)); - when(featureSetRepository.findAllByNameLikeAndProject_NameOrderByNameAsc("%", "default")) - .thenReturn(featureSets.subList(8, 9)); - when(featureSetRepository.findAllByNameLikeAndProject_NameOrderByNameAsc("%", "project2")) - .thenReturn(featureSets.subList(6, 8)); - when(featureSetRepository.findAllByNameLikeAndProject_NameOrderByNameAsc("asd", "project1")) - .thenReturn(Lists.newArrayList()); - when(featureSetRepository.findAllByNameLikeAndProject_NameOrderByNameAsc("f%", "project1")) - .thenReturn(featureSets); - when(featureSetRepository.findAllByNameLikeAndProject_NameLikeOrderByNameAsc("%", "%")) - .thenReturn(featureSets); - - when(projectRepository.findAllByArchivedIsFalse()) - .thenReturn(Collections.singletonList(new Project("project1"))); - when(projectRepository.findById("project1")).thenReturn(Optional.of(new Project("project1"))); - Project archivedProject = new Project("archivedproject"); - archivedProject.setArchived(true); - when(projectRepository.findById(archivedProject.getName())) - .thenReturn(Optional.of(archivedProject)); - - Store store1 = newDummyStore("SERVING"); - Store store2 = newDummyStore("WAREHOUSE"); - stores = Arrays.asList(store1, store2); - when(storeRepository.findAll()).thenReturn(stores); - when(storeRepository.findById("SERVING")).thenReturn(Optional.of(store1)); - when(storeRepository.findById("NOTFOUND")).thenReturn(Optional.empty()); - - specService = - new SpecService(featureSetRepository, storeRepository, projectRepository, defaultSource); - - Feature invalidFeature1 = TestUtil.CreateFeature("created_timestamp", Enum.INT64); - FeatureSet invalidFeatureSet1 = - TestUtil.CreateFeatureSet( - "f1", "invalid", Arrays.asList(f3e1), Arrays.asList(invalidFeature1)); - invalidFeatureSets = Arrays.asList(invalidFeatureSet1); - } - - @Test - public void shouldGetAllFeatureSetsIfOnlyWildcardsProvided() - throws InvalidProtocolBufferException { - ListFeatureSetsResponse actual = - specService.listFeatureSets( - Filter.newBuilder().setFeatureSetName("*").setProject("*").build()); - List list = new ArrayList<>(); - for (FeatureSet featureSet : featureSets) { - FeatureSetProto.FeatureSet toProto = featureSet.toProto(); - list.add(toProto); - } - ListFeatureSetsResponse expected = - ListFeatureSetsResponse.newBuilder().addAllFeatureSets(list).build(); - assertThat(actual, equalTo(expected)); - } - - @Test - public void shouldGetAllFeatureSetsMatchingNameWithWildcardSearch() - throws InvalidProtocolBufferException { - ListFeatureSetsResponse actual = - specService.listFeatureSets( - Filter.newBuilder().setProject("project1").setFeatureSetName("f*").build()); - List expectedFeatureSets = - featureSets.stream() - .filter(fs -> fs.getName().startsWith("f")) - .collect(Collectors.toList()); - List list = new ArrayList<>(); - for (FeatureSet expectedFeatureSet : expectedFeatureSets) { - FeatureSetProto.FeatureSet toProto = expectedFeatureSet.toProto(); - list.add(toProto); - } - ListFeatureSetsResponse expected = - ListFeatureSetsResponse.newBuilder().addAllFeatureSets(list).build(); - assertThat(actual, equalTo(expected)); - } - - @Test - public void shouldGetFeatureSetsByNameAndProject() throws InvalidProtocolBufferException { - ListFeatureSetsResponse actual = - specService.listFeatureSets( - Filter.newBuilder().setProject("project1").setFeatureSetName("f1").build()); - List expectedFeatureSets = - featureSets.stream().filter(fs -> fs.getName().equals("f1")).collect(Collectors.toList()); - List list = new ArrayList<>(); - for (FeatureSet expectedFeatureSet : expectedFeatureSets) { - FeatureSetProto.FeatureSet toProto = expectedFeatureSet.toProto(); - list.add(toProto); - } - ListFeatureSetsResponse expected = - ListFeatureSetsResponse.newBuilder().addAllFeatureSets(list).build(); - assertThat(actual, equalTo(expected)); - } - - @Test - public void shouldThrowExceptionGivenMissingFeatureSetName() - throws InvalidProtocolBufferException { - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage("No feature set name provided"); - specService.getFeatureSet(GetFeatureSetRequest.newBuilder().build()); - } - - @Test - public void shouldThrowExceptionGivenReservedFeatureName() throws InvalidProtocolBufferException { - List reservedNames = - Arrays.asList("created_timestamp", "event_timestamp", "ingestion_id", "job_id"); - String reservedNamesString = StringUtils.join(reservedNames, ", "); - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage( - String.format( - "Reserved feature names have been used, which are not allowed. These names include %s." - + "You've just used an invalid name, %s.", - reservedNamesString, "created_timestamp")); - FeatureSet invalidFeatureSet = invalidFeatureSets.get(0); - - specService.applyFeatureSet(invalidFeatureSet.toProto()); - } - - @Test - public void shouldThrowExceptionGivenMissingFeatureSet() throws InvalidProtocolBufferException { - expectedException.expect(RetrievalException.class); - expectedException.expectMessage("Feature set with name \"f1000\" could not be found."); - specService.getFeatureSet( - GetFeatureSetRequest.newBuilder().setName("f1000").setProject("project1").build()); - } - - @Test - public void shouldReturnAllStoresIfNoNameProvided() throws InvalidProtocolBufferException { - ListStoresResponse actual = - specService.listStores(ListStoresRequest.Filter.newBuilder().build()); - ListStoresResponse.Builder expected = ListStoresResponse.newBuilder(); - for (Store expectedStore : stores) { - expected.addStore(expectedStore.toProto()); - } - assertThat(actual, equalTo(expected.build())); - } - - @Test - public void shouldReturnStoreWithName() throws InvalidProtocolBufferException { - ListStoresResponse actual = - specService.listStores(ListStoresRequest.Filter.newBuilder().setName("SERVING").build()); - List expectedStores = - stores.stream().filter(s -> s.getName().equals("SERVING")).collect(Collectors.toList()); - ListStoresResponse.Builder expected = ListStoresResponse.newBuilder(); - for (Store expectedStore : expectedStores) { - expected.addStore(expectedStore.toProto()); - } - assertThat(actual, equalTo(expected.build())); - } - - @Test - public void shouldThrowRetrievalExceptionIfNoStoresFoundWithName() { - expectedException.expect(RetrievalException.class); - expectedException.expectMessage("Store with name 'NOTFOUND' not found"); - specService.listStores(ListStoresRequest.Filter.newBuilder().setName("NOTFOUND").build()); - } - - @Test - public void applyFeatureSetShouldReturnFeatureSetIfFeatureSetHasNotChanged() - throws InvalidProtocolBufferException { - FeatureSetSpec incomingFeatureSetSpec = - featureSets.get(0).toProto().getSpec().toBuilder().build(); - - ApplyFeatureSetResponse applyFeatureSetResponse = - specService.applyFeatureSet( - FeatureSetProto.FeatureSet.newBuilder().setSpec(incomingFeatureSetSpec).build()); - - verify(featureSetRepository, times(0)).save(ArgumentMatchers.any(FeatureSet.class)); - assertThat(applyFeatureSetResponse.getStatus(), equalTo(Status.NO_CHANGE)); - assertThat(applyFeatureSetResponse.getFeatureSet(), equalTo(featureSets.get(0).toProto())); - } - - @Test - public void applyFeatureSetShouldApplyFeatureSetIfNotExists() - throws InvalidProtocolBufferException { - when(featureSetRepository.findFeatureSetByNameAndProject_Name("f2", "project1")) - .thenReturn(null); - - FeatureSetProto.FeatureSet incomingFeatureSet = newDummyFeatureSet("f2", "project1").toProto(); - - FeatureSetProto.FeatureSetSpec incomingFeatureSetSpec = - incomingFeatureSet.getSpec().toBuilder().build(); - - ApplyFeatureSetResponse applyFeatureSetResponse = - specService.applyFeatureSet( - FeatureSetProto.FeatureSet.newBuilder().setSpec(incomingFeatureSet.getSpec()).build()); - verify(projectRepository).saveAndFlush(ArgumentMatchers.any(Project.class)); - - FeatureSetProto.FeatureSet expected = - FeatureSetProto.FeatureSet.newBuilder() - .setSpec(incomingFeatureSetSpec.toBuilder().setSource(defaultSource.toProto()).build()) - .build(); - assertThat(applyFeatureSetResponse.getStatus(), equalTo(Status.CREATED)); - assertThat(applyFeatureSetResponse.getFeatureSet().getSpec(), equalTo(expected.getSpec())); - assertThat(applyFeatureSetResponse.getFeatureSet().getSpec().getVersion(), equalTo(1)); - } - - @Test - public void applyFeatureSetShouldUpdateAndSaveFeatureSetIfAlreadyExists() - throws InvalidProtocolBufferException { - FeatureSetProto.FeatureSet incomingFeatureSet = featureSets.get(0).toProto(); - incomingFeatureSet = - incomingFeatureSet - .toBuilder() - .setMeta(incomingFeatureSet.getMeta()) - .setSpec( - incomingFeatureSet - .getSpec() - .toBuilder() - .addFeatures( - FeatureSpec.newBuilder().setName("feature2").setValueType(Enum.STRING)) - .build()) - .build(); - - FeatureSetProto.FeatureSet expected = - incomingFeatureSet - .toBuilder() - .setMeta(incomingFeatureSet.getMeta().toBuilder().build()) - .setSpec( - incomingFeatureSet - .getSpec() - .toBuilder() - .setVersion(2) - .setSource(defaultSource.toProto()) - .build()) - .build(); - - ApplyFeatureSetResponse applyFeatureSetResponse = - specService.applyFeatureSet(incomingFeatureSet); - verify(projectRepository).saveAndFlush(ArgumentMatchers.any(Project.class)); - assertThat(applyFeatureSetResponse.getStatus(), equalTo(Status.UPDATED)); - assertEquals( - FeatureSet.fromProto(applyFeatureSetResponse.getFeatureSet()), - FeatureSet.fromProto(expected)); - - assertThat(applyFeatureSetResponse.getFeatureSet().getSpec().getVersion(), equalTo(2)); - } - - @Test - public void applyFeatureSetShouldNotCreateFeatureSetIfFieldsUnordered() - throws InvalidProtocolBufferException { - - FeatureSet featureSet = featureSets.get(1); - List features = Lists.newArrayList(featureSet.getFeatures()); - Collections.shuffle(features); - featureSet.setFeatures(Set.copyOf(features)); - FeatureSetProto.FeatureSet incomingFeatureSet = featureSet.toProto(); - - ApplyFeatureSetResponse applyFeatureSetResponse = - specService.applyFeatureSet(incomingFeatureSet); - assertThat(applyFeatureSetResponse.getStatus(), equalTo(Status.NO_CHANGE)); - assertThat( - applyFeatureSetResponse.getFeatureSet().getSpec().getMaxAge(), - equalTo(incomingFeatureSet.getSpec().getMaxAge())); - assertThat( - applyFeatureSetResponse.getFeatureSet().getSpec().getEntities(0), - equalTo(incomingFeatureSet.getSpec().getEntities(0))); - assertThat( - applyFeatureSetResponse.getFeatureSet().getSpec().getName(), - equalTo(incomingFeatureSet.getSpec().getName())); - } - - @Test - public void applyFeatureSetShouldAcceptPresenceShapeAndDomainConstraints() - throws InvalidProtocolBufferException { - List entitySpecs = new ArrayList<>(); - entitySpecs.add(EntitySpec.newBuilder().setName("entity1").setValueType(Enum.INT64).build()); - entitySpecs.add(EntitySpec.newBuilder().setName("entity2").setValueType(Enum.INT64).build()); - entitySpecs.add(EntitySpec.newBuilder().setName("entity3").setValueType(Enum.FLOAT).build()); - entitySpecs.add(EntitySpec.newBuilder().setName("entity4").setValueType(Enum.STRING).build()); - entitySpecs.add(EntitySpec.newBuilder().setName("entity5").setValueType(Enum.BOOL).build()); - - List featureSpecs = new ArrayList<>(); - featureSpecs.add( - FeatureSpec.newBuilder() - .setName("feature1") - .setValueType(Enum.INT64) - .setPresence(FeaturePresence.getDefaultInstance()) - .setShape(FixedShape.getDefaultInstance()) - .setDomain("mydomain") - .build()); - featureSpecs.add( - FeatureSpec.newBuilder() - .setName("feature2") - .setValueType(Enum.INT64) - .setGroupPresence(FeaturePresenceWithinGroup.getDefaultInstance()) - .setValueCount(ValueCount.getDefaultInstance()) - .setIntDomain(IntDomain.getDefaultInstance()) - .build()); - featureSpecs.add( - FeatureSpec.newBuilder() - .setName("feature3") - .setValueType(Enum.FLOAT) - .setPresence(FeaturePresence.getDefaultInstance()) - .setValueCount(ValueCount.getDefaultInstance()) - .setFloatDomain(FloatDomain.getDefaultInstance()) - .build()); - featureSpecs.add( - FeatureSpec.newBuilder() - .setName("feature4") - .setValueType(Enum.STRING) - .setPresence(FeaturePresence.getDefaultInstance()) - .setValueCount(ValueCount.getDefaultInstance()) - .setStringDomain(StringDomain.getDefaultInstance()) - .build()); - featureSpecs.add( - FeatureSpec.newBuilder() - .setName("feature5") - .setValueType(Enum.BOOL) - .setPresence(FeaturePresence.getDefaultInstance()) - .setValueCount(ValueCount.getDefaultInstance()) - .setBoolDomain(BoolDomain.getDefaultInstance()) - .build()); - - FeatureSetSpec featureSetSpec = - FeatureSetSpec.newBuilder() - .setProject("project1") - .setName("featureSetWithConstraints") - .addAllEntities(entitySpecs) - .addAllFeatures(featureSpecs) - .build(); - FeatureSetProto.FeatureSet featureSet = - FeatureSetProto.FeatureSet.newBuilder().setSpec(featureSetSpec).build(); - - ApplyFeatureSetResponse applyFeatureSetResponse = specService.applyFeatureSet(featureSet); - FeatureSetSpec appliedFeatureSetSpec = applyFeatureSetResponse.getFeatureSet().getSpec(); - - // appliedEntitySpecs needs to be sorted because the list returned by specService may not - // follow the order in the request - List appliedEntitySpecs = new ArrayList<>(appliedFeatureSetSpec.getEntitiesList()); - appliedEntitySpecs.sort(Comparator.comparing(EntitySpec::getName)); - - // appliedFeatureSpecs needs to be sorted because the list returned by specService may not - // follow the order in the request - List appliedFeatureSpecs = - new ArrayList<>(appliedFeatureSetSpec.getFeaturesList()); - appliedFeatureSpecs.sort(Comparator.comparing(FeatureSpec::getName)); - - assertEquals(appliedEntitySpecs, entitySpecs); - assertEquals(appliedFeatureSpecs, featureSpecs); - } - - @Test - public void applyFeatureSetShouldUpdateFeatureSetWhenConstraintsAreUpdated() - throws InvalidProtocolBufferException { - - // Map of constraint field name -> value, e.g. "shape" -> FixedShape object. - // If any of these fields are updated, SpecService should update the FeatureSet. - Map contraintUpdates = new HashMap<>(); - contraintUpdates.put("presence", FeaturePresence.newBuilder().setMinFraction(0.5).build()); - contraintUpdates.put( - "group_presence", FeaturePresenceWithinGroup.newBuilder().setRequired(true).build()); - contraintUpdates.put("shape", FixedShape.getDefaultInstance()); - contraintUpdates.put("value_count", ValueCount.newBuilder().setMin(2).build()); - contraintUpdates.put("domain", "new_domain"); - contraintUpdates.put("int_domain", IntDomain.newBuilder().setMax(100).build()); - contraintUpdates.put("float_domain", FloatDomain.newBuilder().setMin(-0.5f).build()); - contraintUpdates.put("string_domain", StringDomain.newBuilder().addValue("string1").build()); - contraintUpdates.put("bool_domain", BoolDomain.newBuilder().setFalseValue("falsy").build()); - contraintUpdates.put("struct_domain", StructDomain.getDefaultInstance()); - contraintUpdates.put("natural_language_domain", NaturalLanguageDomain.getDefaultInstance()); - contraintUpdates.put("image_domain", ImageDomain.getDefaultInstance()); - contraintUpdates.put("mid_domain", MIDDomain.getDefaultInstance()); - contraintUpdates.put("url_domain", URLDomain.getDefaultInstance()); - contraintUpdates.put( - "time_domain", TimeDomain.newBuilder().setStringFormat("string_format").build()); - contraintUpdates.put("time_of_day_domain", TimeOfDayDomain.getDefaultInstance()); - - for (Entry constraint : contraintUpdates.entrySet()) { - FeatureSet featureSet = newDummyFeatureSet("constraints", "project1"); - FeatureSetProto.FeatureSet existingFeatureSet = featureSet.toProto(); - when(featureSetRepository.findFeatureSetByNameAndProject_Name("constraints", "project1")) - .thenReturn(featureSet); - String name = constraint.getKey(); - Object value = constraint.getValue(); - FeatureSpec newFeatureSpec = - existingFeatureSet - .getSpec() - .getFeatures(0) - .toBuilder() - .setField(FeatureSpec.getDescriptor().findFieldByName(name), value) - .build(); - FeatureSetSpec newFeatureSetSpec = - existingFeatureSet.getSpec().toBuilder().setFeatures(0, newFeatureSpec).build(); - FeatureSetProto.FeatureSet newFeatureSet = - existingFeatureSet.toBuilder().setSpec(newFeatureSetSpec).build(); - - ApplyFeatureSetResponse response = specService.applyFeatureSet(newFeatureSet); - - assertEquals( - "Response should have CREATED status when field '" + name + "' is updated", - Status.UPDATED, - response.getStatus()); - assertEquals( - "Feature should have field '" + name + "' set correctly", - constraint.getValue(), - response - .getFeatureSet() - .getSpec() - .getFeatures(0) - .getField(FeatureSpec.getDescriptor().findFieldByName(name))); - } - } - - @Test - public void applyFeatureSetShouldCreateProjectWhenNotAlreadyExists() - throws InvalidProtocolBufferException { - Feature f3f1 = TestUtil.CreateFeature("f3f1", Enum.INT64); - Feature f3f2 = TestUtil.CreateFeature("f3f2", Enum.INT64); - Entity f3e1 = TestUtil.CreateEntity("f3e1", Enum.STRING); - FeatureSetProto.FeatureSet incomingFeatureSet = - TestUtil.CreateFeatureSet("f3", "project", Arrays.asList(f3e1), Arrays.asList(f3f2, f3f1)) - .toProto(); - - ApplyFeatureSetResponse applyFeatureSetResponse = - specService.applyFeatureSet(incomingFeatureSet); - assertThat(applyFeatureSetResponse.getStatus(), equalTo(Status.CREATED)); - assertThat( - applyFeatureSetResponse.getFeatureSet().getSpec().getProject(), - equalTo(incomingFeatureSet.getSpec().getProject())); - } - - @Test - public void imputeProjectNameShouldSetDefaultProjectIfUnspecified() - throws InvalidProtocolBufferException { - Feature f3f1 = TestUtil.CreateFeature("f3f1", Enum.INT64); - Feature f3f2 = TestUtil.CreateFeature("f3f2", Enum.INT64); - Entity f3e1 = TestUtil.CreateEntity("f3e1", Enum.STRING); - - // In protov3, unspecified project defaults to "" - FeatureSetProto.FeatureSet incomingFeatureSet = - TestUtil.CreateFeatureSet("f3", "", Arrays.asList(f3e1), Arrays.asList(f3f2, f3f1)) - .toProto(); - FeatureSetProto.FeatureSet imputedFeatureSet = - specService.imputeProjectName(incomingFeatureSet); - assertThat(imputedFeatureSet.getSpec().getProject(), equalTo(Project.DEFAULT_NAME)); - } - - @Test - public void applyFeatureSetShouldFailWhenProjectIsArchived() - throws InvalidProtocolBufferException { - Feature f3f1 = TestUtil.CreateFeature("f3f1", Enum.INT64); - Feature f3f2 = TestUtil.CreateFeature("f3f2", Enum.INT64); - Entity f3e1 = TestUtil.CreateEntity("f3e1", Enum.STRING); - FeatureSetProto.FeatureSet incomingFeatureSet = - TestUtil.CreateFeatureSet( - "f3", "archivedproject", Arrays.asList(f3e1), Arrays.asList(f3f2, f3f1)) - .toProto(); - - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage("Project is archived"); - specService.applyFeatureSet(incomingFeatureSet); - } - - @Test - public void applyFeatureSetShouldAcceptFeatureLabels() throws InvalidProtocolBufferException { - - Map featureLabels0 = - new HashMap<>() { - { - put("label1", "feast1"); - } - }; - - Map featureLabels1 = - new HashMap<>() { - { - put("label1", "feast1"); - put("label2", "feast2"); - } - }; - - List> featureLabels = new ArrayList<>(); - featureLabels.add(featureLabels0); - featureLabels.add(featureLabels1); - - List featureSpecs = new ArrayList<>(); - featureSpecs.add( - FeatureSpec.newBuilder() - .setName("feature1") - .setValueType(Enum.INT64) - .putAllLabels(featureLabels.get(0)) - .build()); - featureSpecs.add( - FeatureSpec.newBuilder() - .setName("feature2") - .setValueType(Enum.INT64) - .putAllLabels(featureLabels.get(1)) - .build()); - - FeatureSetSpec featureSetSpec = - FeatureSetSpec.newBuilder() - .setProject("project1") - .setName("featureSetWithConstraints") - .addAllFeatures(featureSpecs) - .build(); - FeatureSetProto.FeatureSet featureSet = - FeatureSetProto.FeatureSet.newBuilder().setSpec(featureSetSpec).build(); - - ApplyFeatureSetResponse applyFeatureSetResponse = specService.applyFeatureSet(featureSet); - FeatureSetSpec appliedFeatureSetSpec = applyFeatureSetResponse.getFeatureSet().getSpec(); - - // appliedFeatureSpecs needs to be sorted because the list returned by specService may not - // follow the order in the request - List appliedFeatureSpecs = - new ArrayList<>(appliedFeatureSetSpec.getFeaturesList()); - appliedFeatureSpecs.sort(Comparator.comparing(FeatureSpec::getName)); - - var appliedFeatureSpecsLabels = - appliedFeatureSpecs.stream().map(e -> e.getLabelsMap()).collect(Collectors.toList()); - assertEquals(appliedFeatureSpecsLabels, featureLabels); - } - - @Test - public void applyFeatureSetShouldUpdateLabels() throws InvalidProtocolBufferException { - FeatureSpec updatedFeature = - FeatureSpec.newBuilder().setName("feature").setValueType(Enum.STRING).build(); - - FeatureSet featureSet = featureSets.get(0); - FeatureSetSpec featureSetSpec = featureSet.toProto().getSpec().toBuilder().build(); - Map featureSetLabels = - new HashMap<>() { - { - put("fsLabel1", "fsValue1"); - } - }; - - FeatureSetProto.FeatureSet incomingFeatureSet = - FeatureSetProto.FeatureSet.newBuilder() - .setSpec( - featureSetSpec - .toBuilder() - .setFeatures(0, updatedFeature) - .putAllLabels(featureSetLabels) - .build()) - .build(); - - ApplyFeatureSetResponse applyFeatureSetResponse = - specService.applyFeatureSet(incomingFeatureSet); - FeatureSetProto.FeatureSet updatedFs = applyFeatureSetResponse.getFeatureSet(); - Map updatedFsLabels = updatedFs.getSpec().getLabelsMap(); - - Map updatedFeatureLabels = updatedFs.getSpec().getFeatures(0).getLabelsMap(); - Map emptyFeatureLabels = new HashMap<>(); - - assertEquals(featureSetLabels, updatedFsLabels); - assertEquals(emptyFeatureLabels, updatedFeatureLabels); - } - - @Test - public void applyFeatureSetShouldAcceptFeatureSetLabels() throws InvalidProtocolBufferException { - Map featureSetLabels = - new HashMap<>() { - { - put("description", "My precious feature set"); - } - }; - - FeatureSetSpec featureSetSpec = - FeatureSetSpec.newBuilder() - .setProject("project1") - .setName("preciousFeatureSet") - .putAllLabels(featureSetLabels) - .build(); - FeatureSetProto.FeatureSet featureSet = - FeatureSetProto.FeatureSet.newBuilder().setSpec(featureSetSpec).build(); - - ApplyFeatureSetResponse applyFeatureSetResponse = specService.applyFeatureSet(featureSet); - FeatureSetSpec appliedFeatureSetSpec = applyFeatureSetResponse.getFeatureSet().getSpec(); - - var appliedLabels = appliedFeatureSetSpec.getLabelsMap(); - - assertEquals(featureSetLabels, appliedLabels); - } - - @Test - public void shouldFilterFeaturesByEntitiesAndLabels() throws InvalidProtocolBufferException { - // Case 1: Only filter by entities - List entities = new ArrayList<>(); - String currentProject = "project2"; - - entities.add("f7e1"); - ListFeaturesResponse actual1 = - specService.listFeatures( - ListFeaturesRequest.Filter.newBuilder() - .setProject(currentProject) - .addAllEntities(entities) - .build()); - Map expectedMap1 = - Map.ofEntries( - Map.entry( - currentProject - + "/" - + featureSets.get(6).getName() - + ":" - + features.get(1).getName(), - features.get(1).toProto()), - Map.entry( - currentProject - + "/" - + featureSets.get(6).getName() - + ":" - + features.get(2).getName(), - features.get(2).toProto()), - Map.entry( - currentProject - + "/" - + featureSets.get(6).getName() - + ":" - + features.get(3).getName(), - features.get(3).toProto()), - Map.entry( - currentProject - + "/" - + featureSets.get(7).getName() - + ":" - + features.get(1).getName(), - features.get(1).toProto()), - Map.entry( - currentProject - + "/" - + featureSets.get(7).getName() - + ":" - + features.get(4).getName(), - features.get(4).toProto())); - ListFeaturesResponse expected1 = - ListFeaturesResponse.newBuilder().putAllFeatures(expectedMap1).build(); - - // Case 2: Filter by entities and labels - Map featureLabels1 = Map.ofEntries(Map.entry("key1", "val1")); - Map featureLabels2 = Map.ofEntries(Map.entry("key2", "val2")); - ListFeaturesResponse actual2 = - specService.listFeatures( - ListFeaturesRequest.Filter.newBuilder() - .setProject(currentProject) - .addAllEntities(entities) - .putAllLabels(featureLabels1) - .build()); - Map expectedMap2 = - Map.ofEntries( - Map.entry( - currentProject - + "/" - + featureSets.get(6).getName() - + ":" - + features.get(3).getName(), - features.get(3).toProto())); - ListFeaturesResponse expected2 = - ListFeaturesResponse.newBuilder().putAllFeatures(expectedMap2).build(); - - // Case 3: Filter by labels - ListFeaturesResponse actual3 = - specService.listFeatures( - ListFeaturesRequest.Filter.newBuilder() - .setProject(currentProject) - .putAllLabels(featureLabels2) - .build()); - Map expectedMap3 = - Map.ofEntries( - Map.entry( - currentProject - + "/" - + featureSets.get(7).getName() - + ":" - + features.get(4).getName(), - features.get(4).toProto())); - ListFeaturesResponse expected3 = - ListFeaturesResponse.newBuilder().putAllFeatures(expectedMap3).build(); - - // Case 4: Filter by nothing, except project - ListFeaturesResponse actual4 = - specService.listFeatures( - ListFeaturesRequest.Filter.newBuilder().setProject(currentProject).build()); - Map expectedMap4 = - Map.ofEntries( - Map.entry( - currentProject - + "/" - + featureSets.get(6).getName() - + ":" - + features.get(1).getName(), - features.get(1).toProto()), - Map.entry( - currentProject - + "/" - + featureSets.get(6).getName() - + ":" - + features.get(2).getName(), - features.get(2).toProto()), - Map.entry( - currentProject - + "/" - + featureSets.get(6).getName() - + ":" - + features.get(3).getName(), - features.get(3).toProto()), - Map.entry( - currentProject - + "/" - + featureSets.get(7).getName() - + ":" - + features.get(1).getName(), - features.get(1).toProto()), - Map.entry( - currentProject - + "/" - + featureSets.get(7).getName() - + ":" - + features.get(4).getName(), - features.get(4).toProto())); - ListFeaturesResponse expected4 = - ListFeaturesResponse.newBuilder().putAllFeatures(expectedMap4).build(); - - // Case 5: Filter by nothing; will use default project - currentProject = "default"; - ListFeaturesResponse actual5 = - specService.listFeatures(ListFeaturesRequest.Filter.newBuilder().build()); - Map expectedMap5 = - Map.ofEntries( - Map.entry( - currentProject - + "/" - + featureSets.get(8).getName() - + ":" - + features.get(1).getName(), - features.get(1).toProto()), - Map.entry( - currentProject - + "/" - + featureSets.get(8).getName() - + ":" - + features.get(4).getName(), - features.get(4).toProto())); - ListFeaturesResponse expected5 = - ListFeaturesResponse.newBuilder().putAllFeatures(expectedMap5).build(); - - assertThat(actual1, equalTo(expected1)); - assertThat(actual2, equalTo(expected2)); - assertThat(actual3, equalTo(expected3)); - assertThat(actual4, equalTo(expected4)); - assertThat(actual5, equalTo(expected5)); - } - - public void shouldFilterByFeatureSetLabels() throws InvalidProtocolBufferException { - List list = new ArrayList<>(); - ListFeatureSetsResponse actual1 = - specService.listFeatureSets( - Filter.newBuilder() - .setFeatureSetName("*") - .setProject("*") - .putLabels("fsLabel2", "fsValue2") - .build()); - list.add(featureSets.get(5).toProto()); - ListFeatureSetsResponse expected1 = - ListFeatureSetsResponse.newBuilder().addAllFeatureSets(list).build(); - - ListFeatureSetsResponse actual2 = - specService.listFeatureSets( - Filter.newBuilder() - .setFeatureSetName("*") - .setProject("*") - .putLabels("fsLabel1", "fsValue1") - .build()); - list.add(0, featureSets.get(4).toProto()); - ListFeatureSetsResponse expected2 = - ListFeatureSetsResponse.newBuilder().addAllFeatureSets(list).build(); - - assertThat(actual1, equalTo(expected1)); - assertThat(actual2, equalTo(expected2)); - } - - @Test - public void shouldUpdateStoreIfConfigChanges() throws InvalidProtocolBufferException { - when(storeRepository.findById("SERVING")).thenReturn(Optional.of(stores.get(0))); - StoreProto.Store newStore = - StoreProto.Store.newBuilder() - .setName("SERVING") - .setType(StoreType.REDIS) - .setRedisConfig(RedisConfig.newBuilder()) - .addSubscriptions(Subscription.newBuilder().setProject("project1").setName("a")) - .build(); - UpdateStoreResponse actual = - specService.updateStore(UpdateStoreRequest.newBuilder().setStore(newStore).build()); - UpdateStoreResponse expected = - UpdateStoreResponse.newBuilder() - .setStore(newStore) - .setStatus(UpdateStoreResponse.Status.UPDATED) - .build(); - ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(Store.class); - verify(storeRepository, times(1)).save(argumentCaptor.capture()); - assertThat(argumentCaptor.getValue().toProto(), equalTo(newStore)); - assertThat(actual, equalTo(expected)); - } - - @Test - public void shouldDoNothingIfNoChange() throws InvalidProtocolBufferException { - when(storeRepository.findById("SERVING")).thenReturn(Optional.of(stores.get(0))); - UpdateStoreResponse actual = - specService.updateStore( - UpdateStoreRequest.newBuilder().setStore(stores.get(0).toProto()).build()); - UpdateStoreResponse expected = - UpdateStoreResponse.newBuilder() - .setStore(stores.get(0).toProto()) - .setStatus(UpdateStoreResponse.Status.NO_CHANGE) - .build(); - verify(storeRepository, times(0)).save(ArgumentMatchers.any()); - assertThat(actual, equalTo(expected)); - } - - @Test - public void getOrListFeatureSetShouldUseDefaultProjectIfProjectUnspecified() - throws InvalidProtocolBufferException { - when(featureSetRepository.findFeatureSetByNameAndProject_Name("f4", Project.DEFAULT_NAME)) - .thenReturn(featureSets.get(3)); - FeatureSet expected = featureSets.get(3); - // check getFeatureSet() - GetFeatureSetResponse getResponse = - specService.getFeatureSet(GetFeatureSetRequest.newBuilder().setName("f4").build()); - assertThat(getResponse.getFeatureSet(), equalTo(expected.toProto())); - - // check listFeatureSets() - ListFeatureSetsResponse listResponse = - specService.listFeatureSets(Filter.newBuilder().setFeatureSetName("f4").build()); - assertThat(listResponse.getFeatureSetsList(), equalTo(Arrays.asList(expected.toProto()))); - } - - private FeatureSet newDummyFeatureSet(String name, String project) { - FeatureSpec f1 = - FeatureSpec.newBuilder() - .setName("feature") - .setValueType(Enum.STRING) - .putLabels("key", "value") - .build(); - Feature feature = Feature.fromProto(f1); - Entity entity = TestUtil.CreateEntity("entity", Enum.STRING); - - FeatureSet fs = - TestUtil.CreateFeatureSet(name, project, Arrays.asList(entity), Arrays.asList(feature)); - fs.setCreated(Date.from(Instant.ofEpochSecond(10L))); - return fs; - } - - private Store newDummyStore(String name) { - // Add type to this method when we enable filtering by type - Store store = new Store(); - store.setName(name); - store.setType(StoreType.REDIS.toString()); - store.setSubscriptions("*:*"); - store.setConfig(RedisConfig.newBuilder().setPort(6379).build().toByteArray()); - return store; - } -} diff --git a/core/src/test/resources/application-it.properties b/core/src/test/resources/application-it.properties index a275dfabb0..ccb339fa9a 100644 --- a/core/src/test/resources/application-it.properties +++ b/core/src/test/resources/application-it.properties @@ -26,5 +26,6 @@ spring.jpa.properties.hibernate.show_sql=false spring.jpa.hibernate.naming.physical-strategy=org.springframework.boot.orm.jpa.hibernate.SpringPhysicalNamingStrategy spring.jpa.hibernate.ddl-auto=none +spring.datasource.hikari.maximum-pool-size=100 spring.main.allow-bean-definition-overriding=true From a3d9e12df9afddc87096deea81f67f7856df70e4 Mon Sep 17 00:00:00 2001 From: pyalex Date: Tue, 11 Aug 2020 16:57:09 +0800 Subject: [PATCH 5/6] resolve conflicts --- .../job/dataflow/DataflowJobManagerTest.java | 50 ------------------- 1 file changed, 50 deletions(-) diff --git a/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java b/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java index fadc76d803..7b07d52fee 100644 --- a/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java +++ b/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java @@ -340,54 +340,4 @@ public void shouldRetrieveRunningJobsWithoutLabels() { List jobs = dfJobManager.listRunningJobs(); assertThat(jobs, hasItem(hasProperty("id", equalTo("kafka-to-redis")))); } - - @Test - @SneakyThrows - public void shouldRetrieveRunningJobsWithoutLabels() { - when(dataflow - .projects() - .locations() - .jobs() - .list("project", "region") - .setFilter("ACTIVE") - .execute()) - .thenReturn( - new ListJobsResponse() - .setJobs( - ImmutableList.of( - new com.google.api.services.dataflow.model.Job().setId("job-1")))); - - JsonFormat.Printer jsonPrinter = JsonFormat.printer(); - - // job with no labels - when(dataflow - .projects() - .locations() - .jobs() - .get("project", "region", "job-1") - .setView("JOB_VIEW_ALL") - .execute()) - .thenReturn( - new com.google.api.services.dataflow.model.Job() - .setId("job-1") - .setEnvironment( - new Environment() - .setSdkPipelineOptions( - ImmutableMap.of( - "options", - ImmutableMap.of( - "jobName", "kafka-to-redis", - "sourceJson", jsonPrinter.print(source), - "storesJson", ImmutableList.of(jsonPrinter.print(store))))))); - - MetricsProperties metricsProperties = new MetricsProperties(); - metricsProperties.setEnabled(false); - - dfJobManager = - new DataflowJobManager( - defaults, metricsProperties, specsStreamingUpdateConfig, ImmutableMap.of(), dataflow); - - List jobs = dfJobManager.listJobs(); - assertThat(jobs, hasItem(hasProperty("id", equalTo("kafka-to-redis")))); - } } From e055484d09fd04f7b2d2ec4e078f8d5f0e5b48df Mon Sep 17 00:00:00 2001 From: pyalex Date: Tue, 11 Aug 2020 17:16:59 +0800 Subject: [PATCH 6/6] comments --- protos/feast/core/CoreService.proto | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/protos/feast/core/CoreService.proto b/protos/feast/core/CoreService.proto index e926542d2a..22b3fb72e0 100644 --- a/protos/feast/core/CoreService.proto +++ b/protos/feast/core/CoreService.proto @@ -154,6 +154,8 @@ message ListFeatureSetsRequest { // Feature sets with all matching labels will be returned. map labels = 4; + // Filter by FeatureSet's current status + // Project and Feature Set name still must be specified (could be "*") FeatureSetStatus status = 5; } } @@ -356,7 +358,9 @@ message GetFeatureStatisticsResponse { } message UpdateFeatureSetStatusRequest { + // FeatureSetReference of FeatureSet to update FeatureSetReference reference = 1; + // Target status FeatureSetStatus status = 2; }