From 986381dea069ca33db69de055207ae211332da33 Mon Sep 17 00:00:00 2001 From: Chen Zhiling Date: Mon, 21 Jan 2019 10:13:59 +0800 Subject: [PATCH] Allow submission of kafka jobs (#94) --- .../feast/core/validators/SpecValidator.java | 46 ++++++++---- .../core/validators/SpecValidatorTest.java | 75 ++++++++++++++++--- 2 files changed, 96 insertions(+), 25 deletions(-) diff --git a/core/src/main/java/feast/core/validators/SpecValidator.java b/core/src/main/java/feast/core/validators/SpecValidator.java index 743f21e1b0..c8500ac331 100644 --- a/core/src/main/java/feast/core/validators/SpecValidator.java +++ b/core/src/main/java/feast/core/validators/SpecValidator.java @@ -17,6 +17,10 @@ package feast.core.validators; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static feast.core.validators.Matchers.checkLowerSnakeCase; + import com.google.common.base.Strings; import com.google.common.collect.Lists; import feast.core.dao.EntityInfoRepository; @@ -35,36 +39,28 @@ import feast.specs.ImportSpecProto.Field; import feast.specs.ImportSpecProto.ImportSpec; import feast.specs.StorageSpecProto.StorageSpec; -import org.springframework.beans.factory.annotation.Autowired; - import java.util.Arrays; import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.Stream; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static feast.core.validators.Matchers.checkLowerSnakeCase; +import org.springframework.beans.factory.annotation.Autowired; public class SpecValidator { - private StorageInfoRepository storageInfoRepository; - private EntityInfoRepository entityInfoRepository; - private FeatureGroupInfoRepository featureGroupInfoRepository; - private FeatureInfoRepository featureInfoRepository; private static final String FILE_ERROR_STORE_TYPE = "file.json"; - private static final String NO_STORE = ""; - private static String[] SUPPORTED_WAREHOUSE_STORES = - new String[] { + new String[]{ BigQueryStorageManager.TYPE, FILE_ERROR_STORE_TYPE, }; - private static String[] SUPPORTED_SERVING_STORES = - new String[] { + new String[]{ BigTableStorageManager.TYPE, PostgresStorageManager.TYPE, RedisStorageManager.TYPE, }; + private StorageInfoRepository storageInfoRepository; + private EntityInfoRepository entityInfoRepository; + private FeatureGroupInfoRepository featureGroupInfoRepository; + private FeatureInfoRepository featureInfoRepository; @Autowired public SpecValidator( @@ -130,7 +126,8 @@ public void validateFeatureSpec(FeatureSpec spec) throws IllegalArgumentExceptio servingStoreId = servingStoreId.equals(NO_STORE) ? group.getServingStore().getId() : servingStoreId; warehouseStoreId = - warehouseStoreId.equals(NO_STORE) ? group.getWarehouseStore().getId() : warehouseStoreId; + warehouseStoreId.equals(NO_STORE) ? group.getWarehouseStore().getId() + : warehouseStoreId; } Optional servingStore = storageInfoRepository.findById(servingStoreId); Optional warehouseStore = storageInfoRepository.findById(warehouseStoreId); @@ -221,6 +218,9 @@ public void validateStorageSpec(StorageSpec spec) throws IllegalArgumentExceptio public void validateImportSpec(ImportSpec spec) throws IllegalArgumentException { try { switch (spec.getType()) { + case "kafka": + checkKafkaImportSpecOption(spec); + break; case "pubsub": checkPubSubImportSpecOption(spec); break; @@ -262,6 +262,20 @@ public void validateImportSpec(ImportSpec spec) throws IllegalArgumentException } } + private void checkKafkaImportSpecOption(ImportSpec spec) { + try { + String topics = spec.getOptionsOrDefault("topics", ""); + String server = spec.getOptionsOrDefault("server", ""); + if (topics.equals("") && server.equals("")) { + throw new IllegalArgumentException( + "Kafka ingestion requires either topics or servers"); + } + } catch (NullPointerException | IllegalArgumentException e) { + throw new IllegalArgumentException( + Strings.lenientFormat("Invalid options: %s", e.getMessage())); + } + } + private void checkFileImportSpecOption(ImportSpec spec) throws IllegalArgumentException { try { checkArgument(!spec.getOptionsOrDefault("path", "").equals(""), "File path cannot be empty"); diff --git a/core/src/test/java/feast/core/validators/SpecValidatorTest.java b/core/src/test/java/feast/core/validators/SpecValidatorTest.java index e09e31c6d0..0c666a11a6 100644 --- a/core/src/test/java/feast/core/validators/SpecValidatorTest.java +++ b/core/src/test/java/feast/core/validators/SpecValidatorTest.java @@ -44,13 +44,14 @@ import org.mockito.Mockito; public class SpecValidatorTest { + + @Rule + public final ExpectedException exception = ExpectedException.none(); private FeatureInfoRepository featureInfoRepository; private FeatureGroupInfoRepository featureGroupInfoRepository; private EntityInfoRepository entityInfoRepository; private StorageInfoRepository storageInfoRepository; - @Rule public final ExpectedException exception = ExpectedException.none(); - @Before public void setUp() { featureInfoRepository = Mockito.mock(FeatureInfoRepository.class); @@ -369,7 +370,7 @@ public void featureSpecWithoutExistingWarehouseStoreShouldThrowIllegalArgumentEx StorageInfo redis1 = new StorageInfo(); redis1.setId(servingStoreId); redis1.setType("redis"); - when(storageInfoRepository.findById( servingStoreId)).thenReturn(Optional.of(redis1)); + when(storageInfoRepository.findById(servingStoreId)).thenReturn(Optional.of(redis1)); SpecValidator validator = new SpecValidator( @@ -392,7 +393,8 @@ public void featureSpecWithoutExistingWarehouseStoreShouldThrowIllegalArgumentEx .setDataStores(dataStores) .build(); exception.expect(IllegalArgumentException.class); - exception.expectMessage(String.format("Warehouse store with id %s does not exist", warehouseStoreId)); + exception.expectMessage( + String.format("Warehouse store with id %s does not exist", warehouseStoreId)); validator.validateFeatureSpec(input); } @@ -405,7 +407,7 @@ public void featureSpecWithoutWarehouseStoreShouldBeAllowed() { StorageInfo redis1 = new StorageInfo(); redis1.setId(servingStoreId); redis1.setType("redis"); - when(storageInfoRepository.findById( servingStoreId)).thenReturn(Optional.of(redis1)); + when(storageInfoRepository.findById(servingStoreId)).thenReturn(Optional.of(redis1)); SpecValidator validator = new SpecValidator( @@ -432,18 +434,21 @@ public void featureSpecWithoutWarehouseStoreShouldBeAllowed() { @Test public void featureSpecWithUnsupportedWarehouseStoreShouldThrowIllegalArgumentException() { String servingStoreId = "REDIS1"; - StorageSpec servingStoreSpec = StorageSpec.newBuilder().setId(servingStoreId).setType("redis").build(); + StorageSpec servingStoreSpec = StorageSpec.newBuilder().setId(servingStoreId).setType("redis") + .build(); StorageInfo servingStoreInfo = new StorageInfo(servingStoreSpec); String warehouseStoreId = "REDIS2"; - StorageSpec warehouseStoreSpec = StorageSpec.newBuilder().setId(warehouseStoreId).setType("redis").build(); + StorageSpec warehouseStoreSpec = StorageSpec.newBuilder().setId(warehouseStoreId) + .setType("redis").build(); StorageInfo warehouseStoreInfo = new StorageInfo(warehouseStoreSpec); when(entityInfoRepository.existsById("entity")).thenReturn(true); when(storageInfoRepository.existsById(servingStoreId)).thenReturn(true); when(storageInfoRepository.existsById(warehouseStoreId)).thenReturn(true); when(storageInfoRepository.findById(servingStoreId)).thenReturn(Optional.of(servingStoreInfo)); - when(storageInfoRepository.findById(warehouseStoreId)).thenReturn(Optional.of(warehouseStoreInfo)); + when(storageInfoRepository.findById(warehouseStoreId)) + .thenReturn(Optional.of(warehouseStoreInfo)); SpecValidator validator = new SpecValidator( storageInfoRepository, @@ -488,7 +493,8 @@ public void featureSpecWithUnsupportedServingStoreShouldThrowIllegalArgumentExce when(entityInfoRepository.existsById("entity")).thenReturn(true); when(storageInfoRepository.existsById(servingStoreName)).thenReturn(true); when(storageInfoRepository.existsById(warehouseStorageName)).thenReturn(true); - when(storageInfoRepository.findById(servingStoreName)).thenReturn(Optional.of(redis1StorageInfo)); + when(storageInfoRepository.findById(servingStoreName)) + .thenReturn(Optional.of(redis1StorageInfo)); when(storageInfoRepository.findById(warehouseStorageName)).thenReturn(Optional.of(bqInfo)); SpecValidator validator = new SpecValidator( @@ -778,4 +784,55 @@ public void importSpecWithUnregisteredFeaturesShouldThrowIllegalArgumentExceptio "Validation for import spec failed: Feature some_nonexistent_feature not registered"); validator.validateImportSpec(input); } + + @Test + public void importSpecWithKafkaSourceAndCorrectOptionsShouldPassValidation() { + SpecValidator validator = + new SpecValidator( + storageInfoRepository, + entityInfoRepository, + featureGroupInfoRepository, + featureInfoRepository); + when(featureInfoRepository.existsById("some_existing_feature")).thenReturn(true); + when(entityInfoRepository.existsById("someEntity")).thenReturn(true); + Schema schema = + Schema.newBuilder() + .addFields(Field.newBuilder().setFeatureId("some_existing_feature").build()) + .build(); + ImportSpec input = + ImportSpec.newBuilder() + .setType("kafka") + .putOptions("topics", "my-kafka-topic") + .putOptions("server", "localhost:54321") + .setSchema(schema) + .addEntities("someEntity") + .build(); + validator.validateImportSpec(input); + } + + @Test + public void importSpecWithKafkaSourceWithoutOptionsShouldThrowIllegalArgumentException() { + SpecValidator validator = + new SpecValidator( + storageInfoRepository, + entityInfoRepository, + featureGroupInfoRepository, + featureInfoRepository); + when(featureInfoRepository.existsById("some_existing_feature")).thenReturn(true); + when(entityInfoRepository.existsById("someEntity")).thenReturn(true); + Schema schema = + Schema.newBuilder() + .addFields(Field.newBuilder().setFeatureId("some_existing_feature").build()) + .build(); + ImportSpec input = + ImportSpec.newBuilder() + .setType("kafka") + .setSchema(schema) + .addEntities("someEntity") + .build(); + exception.expect(IllegalArgumentException.class); + exception.expectMessage( + "Validation for import spec failed: Invalid options: Kafka ingestion requires either topics or servers"); + validator.validateImportSpec(input); + } }