From 673df8e92ed9a3080b2de1a4cecf7dd270c2fa48 Mon Sep 17 00:00:00 2001 From: rajuGT Date: Wed, 23 Oct 2024 22:43:06 +0530 Subject: [PATCH 1/3] Dart implementation fix - the object storage client aren't serializable Most of the client implementation including GCS, is not serializable, so fixed this issue by making client implementation not part of the serialization, and when the client is passed over wire and the client doesn't exist, it initializes as and when it is required. // In a distributed system, we don't intend the client to be serialized and most of the implementations like // GCP Storage implementation doesn't implement java.io.Serializable interface and you may see the below error // Caused by: org.apache.flink.api.common.InvalidProgramException: com.google.api.services.storage.Storage@1c666a8f // is not serializable. The object probably contains or references non serializable fields. // Caused by: java.io.NotSerializableException: com.google.api.services.storage.Storage --- .../udfs/factories/FunctionFactory.java | 19 +------- .../store/DartDataStoreClientProvider.java | 48 +++++++++++++++++++ .../dart/store/DefaultDartDataStore.java | 17 +++---- .../dart/store/DefaultDartDataStoreTest.java | 9 ++-- 4 files changed, 62 insertions(+), 31 deletions(-) create mode 100644 dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DartDataStoreClientProvider.java diff --git a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/factories/FunctionFactory.java b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/factories/FunctionFactory.java index 97490d1f1..9588a1c44 100644 --- a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/factories/FunctionFactory.java +++ b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/factories/FunctionFactory.java @@ -3,6 +3,7 @@ import com.gotocompany.dagger.functions.common.Constants; import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DartDataStore; import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DartDataStoreClient; +import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DartDataStoreClientProvider; import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DefaultDartDataStore; import com.gotocompany.dagger.functions.udfs.scalar.dart.store.cos.CosDartClient; import com.gotocompany.dagger.functions.udfs.scalar.dart.store.gcs.GcsDartClient; @@ -148,23 +149,7 @@ private DartDataStore getDartDataSource() { bucketID = getConfiguration().getString(Constants.UDF_DART_GCS_BUCKET_ID_KEY, Constants.UDF_DART_GCS_BUCKET_ID_DEFAULT); } - DartDataStoreClient dartDataStoreClient; - switch (udfStoreProvider) { - case Constants.UDF_STORE_PROVIDER_GCS: - dartDataStoreClient = new GcsDartClient(projectID); - break; - case Constants.UDF_STORE_PROVIDER_OSS: - // TODO Check if OSS SDK supports projectID selection - dartDataStoreClient = new OssDartClient(); - break; - case Constants.UDF_STORE_PROVIDER_COS: - // TODO Check if COS SDK supports projectID selection - dartDataStoreClient = new CosDartClient(); - break; - default: - throw new IllegalArgumentException("Unknown UDF Store Provider: " + udfStoreProvider); - } - return new DefaultDartDataStore(dartDataStoreClient, bucketID); + return new DefaultDartDataStore(new DartDataStoreClientProvider(udfStoreProvider, projectID), bucketID); } private LinkedHashMap getProtosInInputStreams() { diff --git a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DartDataStoreClientProvider.java b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DartDataStoreClientProvider.java new file mode 100644 index 000000000..3a9572f22 --- /dev/null +++ b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DartDataStoreClientProvider.java @@ -0,0 +1,48 @@ +package com.gotocompany.dagger.functions.udfs.scalar.dart.store; + +import com.gotocompany.dagger.functions.common.Constants; +import com.gotocompany.dagger.functions.udfs.scalar.dart.store.cos.CosDartClient; +import com.gotocompany.dagger.functions.udfs.scalar.dart.store.gcs.GcsDartClient; +import com.gotocompany.dagger.functions.udfs.scalar.dart.store.oss.OssDartClient; + +import java.io.Serializable; + +public class DartDataStoreClientProvider implements Serializable { + private final String udfStoreProvider; + private final String projectID; + + // Do not make this final, if so then the implementation of client should be Serializable + private DartDataStoreClient dartDataStoreClient; + + public DartDataStoreClientProvider(String udfStoreProvider, String projectID) { + this.udfStoreProvider = udfStoreProvider; + this.projectID = projectID; + } + + public DartDataStoreClient getDartDataStoreClient() { + // In a distributed system, we don't intend the client to be serialized and most of the implementations like + // GCP Storage implementation doesn't implement java.io.Serializable interface and you may see the below error + // Caused by: org.apache.flink.api.common.InvalidProgramException: com.google.api.services.storage.Storage@1c666a8f + // is not serializable. The object probably contains or references non serializable fields. + // Caused by: java.io.NotSerializableException: com.google.api.services.storage.Storage + if (dartDataStoreClient != null) { + return dartDataStoreClient; + } + switch (udfStoreProvider) { + case Constants.UDF_STORE_PROVIDER_GCS: + dartDataStoreClient = new GcsDartClient(projectID); + break; + case Constants.UDF_STORE_PROVIDER_OSS: + // TODO Check if OSS SDK supports projectID selection + dartDataStoreClient = new OssDartClient(); + break; + case Constants.UDF_STORE_PROVIDER_COS: + // TODO Check if COS SDK supports projectID selection + dartDataStoreClient = new CosDartClient(); + break; + default: + throw new IllegalArgumentException("Unknown UDF Store Provider: " + udfStoreProvider); + } + return dartDataStoreClient; + } +} diff --git a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DefaultDartDataStore.java b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DefaultDartDataStore.java index a1e61b613..7a9f814fc 100644 --- a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DefaultDartDataStore.java +++ b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DefaultDartDataStore.java @@ -7,7 +7,6 @@ import com.gotocompany.dagger.functions.udfs.scalar.dart.types.SetCache; import com.gotocompany.dagger.functions.udfs.scalar.DartContains; import com.gotocompany.dagger.functions.udfs.scalar.DartGet; -import lombok.Getter; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; @@ -30,19 +29,17 @@ public class DefaultDartDataStore implements DartDataStore, Serializable { public static final String DART_GET_DIRECTORY = "dart-get/"; public static final String DART_CONTAINS_DIRECTORY = "dart-contains/"; + private final DartDataStoreClientProvider clientProvider; private final String bucketId; - @Getter - private final DartDataStoreClient storeClient; - /** * Instantiates a new data store. * - * @param storeClient a {@link DartDataStoreClient} implementation for the respective object storage provider - * @param bucketId the bucket id + * @param clientProvider a {@link DartDataStoreClient} implementation for the respective object storage provider + * @param bucketId the bucket id */ - public DefaultDartDataStore(DartDataStoreClient storeClient, String bucketId) { - this.storeClient = storeClient; + public DefaultDartDataStore(DartDataStoreClientProvider clientProvider, String bucketId) { + this.clientProvider = clientProvider; this.bucketId = bucketId; } @@ -58,7 +55,7 @@ public MapCache getMap(String mapName, MeterStatsManager meterStatsManager, Gaug } private Map getMapOfObjects(String dartName, MeterStatsManager meterManager, GaugeStatsManager gaugeManager) { - String jsonData = getStoreClient().fetchJsonData( + String jsonData = clientProvider.getDartDataStoreClient().fetchJsonData( DartGet.class.getSimpleName(), gaugeManager, this.bucketId, @@ -77,7 +74,7 @@ private Map getMapOfObjects(String dartName, MeterStatsManager m } private Set getSetOfObjects(String dartName, MeterStatsManager meterManager, GaugeStatsManager gaugeManager) { - String jsonData = getStoreClient().fetchJsonData(DartContains.class.getSimpleName(), gaugeManager, this.bucketId, DART_CONTAINS_DIRECTORY + dartName); + String jsonData = clientProvider.getDartDataStoreClient().fetchJsonData(DartContains.class.getSimpleName(), gaugeManager, this.bucketId, DART_CONTAINS_DIRECTORY + dartName); ObjectMapper mapper = new ObjectMapper(); try { ObjectNode node = (ObjectNode) mapper.readTree(jsonData); diff --git a/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DefaultDartDataStoreTest.java b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DefaultDartDataStoreTest.java index d17f23eaf..eb2f95d70 100644 --- a/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DefaultDartDataStoreTest.java +++ b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DefaultDartDataStoreTest.java @@ -35,13 +35,14 @@ public class DefaultDartDataStoreTest { @Before public void setUp() { - defaultDartDataStore = mock(DefaultDartDataStore.class); + // Subject + DartDataStoreClientProvider dartDataStoreClientProvider = mock(DartDataStoreClientProvider.class); + defaultDartDataStore = new DefaultDartDataStore(dartDataStoreClientProvider, "test-bucket"); + gcsDartClient = mock(GcsDartClient.class); meterStatsManager = mock(MeterStatsManager.class); gaugeStatsManager = mock(GaugeStatsManager.class); - when(defaultDartDataStore.getSet(anyString(), any(), any())).thenCallRealMethod(); - when(defaultDartDataStore.getMap(anyString(), any(), any())).thenCallRealMethod(); - when(defaultDartDataStore.getStoreClient()).thenReturn(gcsDartClient); + when(dartDataStoreClientProvider.getDartDataStoreClient()).thenReturn(gcsDartClient); listContent = Arrays.asList("listContent"); mapContent = Collections.singletonMap("key", "value"); } From 85b6455f5f081d3ef41cb8b99430d683ad76bd8c Mon Sep 17 00:00:00 2001 From: rajuGT Date: Wed, 23 Oct 2024 23:11:41 +0530 Subject: [PATCH 2/3] checkstyle fix --- .../dagger/functions/udfs/factories/FunctionFactory.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/factories/FunctionFactory.java b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/factories/FunctionFactory.java index 9588a1c44..4a2709a48 100644 --- a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/factories/FunctionFactory.java +++ b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/factories/FunctionFactory.java @@ -2,12 +2,8 @@ import com.gotocompany.dagger.functions.common.Constants; import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DartDataStore; -import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DartDataStoreClient; import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DartDataStoreClientProvider; import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DefaultDartDataStore; -import com.gotocompany.dagger.functions.udfs.scalar.dart.store.cos.CosDartClient; -import com.gotocompany.dagger.functions.udfs.scalar.dart.store.gcs.GcsDartClient; -import com.gotocompany.dagger.functions.udfs.scalar.dart.store.oss.OssDartClient; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import com.google.gson.Gson; From 5d37d29d6a6168b5db1aa9d5f8202a19b65a1a1b Mon Sep 17 00:00:00 2001 From: rajuGT Date: Wed, 23 Oct 2024 23:54:34 +0530 Subject: [PATCH 3/3] Add unit tests for DartDataStoreClientProvider --- .../DartDataStoreClientProviderTest.java | 68 +++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100644 dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DartDataStoreClientProviderTest.java diff --git a/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DartDataStoreClientProviderTest.java b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DartDataStoreClientProviderTest.java new file mode 100644 index 000000000..f706c776c --- /dev/null +++ b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/scalar/dart/store/DartDataStoreClientProviderTest.java @@ -0,0 +1,68 @@ +package com.gotocompany.dagger.functions.udfs.scalar.dart.store; + +import com.gotocompany.dagger.functions.common.Constants; +import com.gotocompany.dagger.functions.udfs.scalar.dart.store.gcs.GcsDartClient; +import com.gotocompany.dagger.functions.udfs.scalar.dart.store.oss.OssDartClient; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertTrue; + +public class DartDataStoreClientProviderTest { + private DartDataStoreClientProvider dartDataStoreClientProvider; + + @Before + public void setUp() { + dartDataStoreClientProvider = null; + } + + @Test + public void shouldReturnGcsDartClientWhenUdfStoreProviderIsGcs() { + String udfStoreProvider = Constants.UDF_STORE_PROVIDER_GCS; + String projectID = "test-project"; + + dartDataStoreClientProvider = new DartDataStoreClientProvider(udfStoreProvider, projectID); + + DartDataStoreClient client = dartDataStoreClientProvider.getDartDataStoreClient(); + + assertTrue(client instanceof GcsDartClient); + } + + @Test + public void shouldReturnOssDartClientWhenUdfStoreProviderIsOss() { + String udfStoreProvider = Constants.UDF_STORE_PROVIDER_OSS; + + dartDataStoreClientProvider = new DartDataStoreClientProvider(udfStoreProvider, null); + DartDataStoreClient client = dartDataStoreClientProvider.getDartDataStoreClient(); + + assertTrue(client instanceof OssDartClient); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowIllegalArgumentExceptionForUnknownUdfStoreProvider() { + String udfStoreProvider = "UNKNOWN-PROVIDER"; + + dartDataStoreClientProvider = new DartDataStoreClientProvider(udfStoreProvider, null); + + try { + dartDataStoreClientProvider.getDartDataStoreClient(); + } catch (IllegalArgumentException e) { + Assert.assertEquals("Unknown UDF Store Provider: UNKNOWN-PROVIDER", e.getMessage()); + throw e; + } + } + + @Test + public void shouldReturnSameClientOnSubsequentCalls() { + String udfStoreProvider = Constants.UDF_STORE_PROVIDER_GCS; + String projectID = "test-project"; + + dartDataStoreClientProvider = new DartDataStoreClientProvider(udfStoreProvider, projectID); + + DartDataStoreClient firstClient = dartDataStoreClientProvider.getDartDataStoreClient(); + DartDataStoreClient secondClient = dartDataStoreClientProvider.getDartDataStoreClient(); + + Assert.assertEquals(firstClient, secondClient); + } +}