Skip to content

Commit

Permalink
Allow submission of kafka jobs (#94)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhilingc authored and feast-ci-bot committed Jan 21, 2019
1 parent c9e20d4 commit 986381d
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 25 deletions.
46 changes: 30 additions & 16 deletions core/src/main/java/feast/core/validators/SpecValidator.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

package feast.core.validators;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static feast.core.validators.Matchers.checkLowerSnakeCase;

import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import feast.core.dao.EntityInfoRepository;
Expand All @@ -35,36 +39,28 @@
import feast.specs.ImportSpecProto.Field;
import feast.specs.ImportSpecProto.ImportSpec;
import feast.specs.StorageSpecProto.StorageSpec;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.Arrays;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static feast.core.validators.Matchers.checkLowerSnakeCase;
import org.springframework.beans.factory.annotation.Autowired;

public class SpecValidator {

private StorageInfoRepository storageInfoRepository;
private EntityInfoRepository entityInfoRepository;
private FeatureGroupInfoRepository featureGroupInfoRepository;
private FeatureInfoRepository featureInfoRepository;
private static final String FILE_ERROR_STORE_TYPE = "file.json";

private static final String NO_STORE = "";

private static String[] SUPPORTED_WAREHOUSE_STORES =
new String[] {
new String[]{
BigQueryStorageManager.TYPE, FILE_ERROR_STORE_TYPE,
};

private static String[] SUPPORTED_SERVING_STORES =
new String[] {
new String[]{
BigTableStorageManager.TYPE, PostgresStorageManager.TYPE, RedisStorageManager.TYPE,
};
private StorageInfoRepository storageInfoRepository;
private EntityInfoRepository entityInfoRepository;
private FeatureGroupInfoRepository featureGroupInfoRepository;
private FeatureInfoRepository featureInfoRepository;

@Autowired
public SpecValidator(
Expand Down Expand Up @@ -130,7 +126,8 @@ public void validateFeatureSpec(FeatureSpec spec) throws IllegalArgumentExceptio
servingStoreId =
servingStoreId.equals(NO_STORE) ? group.getServingStore().getId() : servingStoreId;
warehouseStoreId =
warehouseStoreId.equals(NO_STORE) ? group.getWarehouseStore().getId() : warehouseStoreId;
warehouseStoreId.equals(NO_STORE) ? group.getWarehouseStore().getId()
: warehouseStoreId;
}
Optional<StorageInfo> servingStore = storageInfoRepository.findById(servingStoreId);
Optional<StorageInfo> warehouseStore = storageInfoRepository.findById(warehouseStoreId);
Expand Down Expand Up @@ -221,6 +218,9 @@ public void validateStorageSpec(StorageSpec spec) throws IllegalArgumentExceptio
public void validateImportSpec(ImportSpec spec) throws IllegalArgumentException {
try {
switch (spec.getType()) {
case "kafka":
checkKafkaImportSpecOption(spec);
break;
case "pubsub":
checkPubSubImportSpecOption(spec);
break;
Expand Down Expand Up @@ -262,6 +262,20 @@ public void validateImportSpec(ImportSpec spec) throws IllegalArgumentException
}
}

private void checkKafkaImportSpecOption(ImportSpec spec) {
try {
String topics = spec.getOptionsOrDefault("topics", "");
String server = spec.getOptionsOrDefault("server", "");
if (topics.equals("") && server.equals("")) {
throw new IllegalArgumentException(
"Kafka ingestion requires either topics or servers");
}
} catch (NullPointerException | IllegalArgumentException e) {
throw new IllegalArgumentException(
Strings.lenientFormat("Invalid options: %s", e.getMessage()));
}
}

private void checkFileImportSpecOption(ImportSpec spec) throws IllegalArgumentException {
try {
checkArgument(!spec.getOptionsOrDefault("path", "").equals(""), "File path cannot be empty");
Expand Down
75 changes: 66 additions & 9 deletions core/src/test/java/feast/core/validators/SpecValidatorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,14 @@
import org.mockito.Mockito;

public class SpecValidatorTest {

@Rule
public final ExpectedException exception = ExpectedException.none();
private FeatureInfoRepository featureInfoRepository;
private FeatureGroupInfoRepository featureGroupInfoRepository;
private EntityInfoRepository entityInfoRepository;
private StorageInfoRepository storageInfoRepository;

@Rule public final ExpectedException exception = ExpectedException.none();

@Before
public void setUp() {
featureInfoRepository = Mockito.mock(FeatureInfoRepository.class);
Expand Down Expand Up @@ -369,7 +370,7 @@ public void featureSpecWithoutExistingWarehouseStoreShouldThrowIllegalArgumentEx
StorageInfo redis1 = new StorageInfo();
redis1.setId(servingStoreId);
redis1.setType("redis");
when(storageInfoRepository.findById( servingStoreId)).thenReturn(Optional.of(redis1));
when(storageInfoRepository.findById(servingStoreId)).thenReturn(Optional.of(redis1));

SpecValidator validator =
new SpecValidator(
Expand All @@ -392,7 +393,8 @@ public void featureSpecWithoutExistingWarehouseStoreShouldThrowIllegalArgumentEx
.setDataStores(dataStores)
.build();
exception.expect(IllegalArgumentException.class);
exception.expectMessage(String.format("Warehouse store with id %s does not exist", warehouseStoreId));
exception.expectMessage(
String.format("Warehouse store with id %s does not exist", warehouseStoreId));
validator.validateFeatureSpec(input);
}

Expand All @@ -405,7 +407,7 @@ public void featureSpecWithoutWarehouseStoreShouldBeAllowed() {
StorageInfo redis1 = new StorageInfo();
redis1.setId(servingStoreId);
redis1.setType("redis");
when(storageInfoRepository.findById( servingStoreId)).thenReturn(Optional.of(redis1));
when(storageInfoRepository.findById(servingStoreId)).thenReturn(Optional.of(redis1));

SpecValidator validator =
new SpecValidator(
Expand All @@ -432,18 +434,21 @@ public void featureSpecWithoutWarehouseStoreShouldBeAllowed() {
@Test
public void featureSpecWithUnsupportedWarehouseStoreShouldThrowIllegalArgumentException() {
String servingStoreId = "REDIS1";
StorageSpec servingStoreSpec = StorageSpec.newBuilder().setId(servingStoreId).setType("redis").build();
StorageSpec servingStoreSpec = StorageSpec.newBuilder().setId(servingStoreId).setType("redis")
.build();
StorageInfo servingStoreInfo = new StorageInfo(servingStoreSpec);

String warehouseStoreId = "REDIS2";
StorageSpec warehouseStoreSpec = StorageSpec.newBuilder().setId(warehouseStoreId).setType("redis").build();
StorageSpec warehouseStoreSpec = StorageSpec.newBuilder().setId(warehouseStoreId)
.setType("redis").build();
StorageInfo warehouseStoreInfo = new StorageInfo(warehouseStoreSpec);

when(entityInfoRepository.existsById("entity")).thenReturn(true);
when(storageInfoRepository.existsById(servingStoreId)).thenReturn(true);
when(storageInfoRepository.existsById(warehouseStoreId)).thenReturn(true);
when(storageInfoRepository.findById(servingStoreId)).thenReturn(Optional.of(servingStoreInfo));
when(storageInfoRepository.findById(warehouseStoreId)).thenReturn(Optional.of(warehouseStoreInfo));
when(storageInfoRepository.findById(warehouseStoreId))
.thenReturn(Optional.of(warehouseStoreInfo));
SpecValidator validator =
new SpecValidator(
storageInfoRepository,
Expand Down Expand Up @@ -488,7 +493,8 @@ public void featureSpecWithUnsupportedServingStoreShouldThrowIllegalArgumentExce
when(entityInfoRepository.existsById("entity")).thenReturn(true);
when(storageInfoRepository.existsById(servingStoreName)).thenReturn(true);
when(storageInfoRepository.existsById(warehouseStorageName)).thenReturn(true);
when(storageInfoRepository.findById(servingStoreName)).thenReturn(Optional.of(redis1StorageInfo));
when(storageInfoRepository.findById(servingStoreName))
.thenReturn(Optional.of(redis1StorageInfo));
when(storageInfoRepository.findById(warehouseStorageName)).thenReturn(Optional.of(bqInfo));
SpecValidator validator =
new SpecValidator(
Expand Down Expand Up @@ -778,4 +784,55 @@ public void importSpecWithUnregisteredFeaturesShouldThrowIllegalArgumentExceptio
"Validation for import spec failed: Feature some_nonexistent_feature not registered");
validator.validateImportSpec(input);
}

@Test
public void importSpecWithKafkaSourceAndCorrectOptionsShouldPassValidation() {
SpecValidator validator =
new SpecValidator(
storageInfoRepository,
entityInfoRepository,
featureGroupInfoRepository,
featureInfoRepository);
when(featureInfoRepository.existsById("some_existing_feature")).thenReturn(true);
when(entityInfoRepository.existsById("someEntity")).thenReturn(true);
Schema schema =
Schema.newBuilder()
.addFields(Field.newBuilder().setFeatureId("some_existing_feature").build())
.build();
ImportSpec input =
ImportSpec.newBuilder()
.setType("kafka")
.putOptions("topics", "my-kafka-topic")
.putOptions("server", "localhost:54321")
.setSchema(schema)
.addEntities("someEntity")
.build();
validator.validateImportSpec(input);
}

@Test
public void importSpecWithKafkaSourceWithoutOptionsShouldThrowIllegalArgumentException() {
SpecValidator validator =
new SpecValidator(
storageInfoRepository,
entityInfoRepository,
featureGroupInfoRepository,
featureInfoRepository);
when(featureInfoRepository.existsById("some_existing_feature")).thenReturn(true);
when(entityInfoRepository.existsById("someEntity")).thenReturn(true);
Schema schema =
Schema.newBuilder()
.addFields(Field.newBuilder().setFeatureId("some_existing_feature").build())
.build();
ImportSpec input =
ImportSpec.newBuilder()
.setType("kafka")
.setSchema(schema)
.addEntities("someEntity")
.build();
exception.expect(IllegalArgumentException.class);
exception.expectMessage(
"Validation for import spec failed: Invalid options: Kafka ingestion requires either topics or servers");
validator.validateImportSpec(input);
}
}

0 comments on commit 986381d

Please sign in to comment.