Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dart implementation fix - the object storage client aren't serializable #47

Open
wants to merge 3 commits into
base: cosDataStore
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@

import com.gotocompany.dagger.functions.common.Constants;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DartDataStore;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DartDataStoreClient;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DartDataStoreClientProvider;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DefaultDartDataStore;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.cos.CosDartClient;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.gcs.GcsDartClient;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.oss.OssDartClient;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import com.google.gson.Gson;
Expand Down Expand Up @@ -148,23 +145,7 @@ private DartDataStore getDartDataSource() {
bucketID = getConfiguration().getString(Constants.UDF_DART_GCS_BUCKET_ID_KEY, Constants.UDF_DART_GCS_BUCKET_ID_DEFAULT);
}

DartDataStoreClient dartDataStoreClient;
switch (udfStoreProvider) {
case Constants.UDF_STORE_PROVIDER_GCS:
dartDataStoreClient = new GcsDartClient(projectID);
break;
case Constants.UDF_STORE_PROVIDER_OSS:
// TODO Check if OSS SDK supports projectID selection
dartDataStoreClient = new OssDartClient();
break;
case Constants.UDF_STORE_PROVIDER_COS:
// TODO Check if COS SDK supports projectID selection
dartDataStoreClient = new CosDartClient();
break;
default:
throw new IllegalArgumentException("Unknown UDF Store Provider: " + udfStoreProvider);
}
return new DefaultDartDataStore(dartDataStoreClient, bucketID);
return new DefaultDartDataStore(new DartDataStoreClientProvider(udfStoreProvider, projectID), bucketID);
}

private LinkedHashMap<String, String> getProtosInInputStreams() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.gotocompany.dagger.functions.udfs.scalar.dart.store;

import com.gotocompany.dagger.functions.common.Constants;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.cos.CosDartClient;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.gcs.GcsDartClient;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.oss.OssDartClient;

import java.io.Serializable;

public class DartDataStoreClientProvider implements Serializable {
private final String udfStoreProvider;
private final String projectID;

// Do not make this final, if so then the implementation of client should be Serializable
private DartDataStoreClient dartDataStoreClient;

public DartDataStoreClientProvider(String udfStoreProvider, String projectID) {
this.udfStoreProvider = udfStoreProvider;
this.projectID = projectID;
}

public DartDataStoreClient getDartDataStoreClient() {
// In a distributed system, we don't intend the client to be serialized and most of the implementations like
// GCP Storage implementation doesn't implement java.io.Serializable interface and you may see the below error
// Caused by: org.apache.flink.api.common.InvalidProgramException: com.google.api.services.storage.Storage@1c666a8f
// is not serializable. The object probably contains or references non serializable fields.
// Caused by: java.io.NotSerializableException: com.google.api.services.storage.Storage
if (dartDataStoreClient != null) {
return dartDataStoreClient;
}
switch (udfStoreProvider) {
case Constants.UDF_STORE_PROVIDER_GCS:
dartDataStoreClient = new GcsDartClient(projectID);
break;
case Constants.UDF_STORE_PROVIDER_OSS:
// TODO Check if OSS SDK supports projectID selection
dartDataStoreClient = new OssDartClient();
break;
case Constants.UDF_STORE_PROVIDER_COS:
// TODO Check if COS SDK supports projectID selection
dartDataStoreClient = new CosDartClient();
break;
default:
throw new IllegalArgumentException("Unknown UDF Store Provider: " + udfStoreProvider);
}
return dartDataStoreClient;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.gotocompany.dagger.functions.udfs.scalar.dart.types.SetCache;
import com.gotocompany.dagger.functions.udfs.scalar.DartContains;
import com.gotocompany.dagger.functions.udfs.scalar.DartGet;
import lombok.Getter;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -30,19 +29,17 @@ public class DefaultDartDataStore implements DartDataStore, Serializable {
public static final String DART_GET_DIRECTORY = "dart-get/";
public static final String DART_CONTAINS_DIRECTORY = "dart-contains/";

private final DartDataStoreClientProvider clientProvider;
private final String bucketId;

@Getter
private final DartDataStoreClient storeClient;

/**
* Instantiates a new data store.
*
* @param storeClient a {@link DartDataStoreClient} implementation for the respective object storage provider
* @param bucketId the bucket id
* @param clientProvider a {@link DartDataStoreClient} implementation for the respective object storage provider
* @param bucketId the bucket id
*/
public DefaultDartDataStore(DartDataStoreClient storeClient, String bucketId) {
this.storeClient = storeClient;
public DefaultDartDataStore(DartDataStoreClientProvider clientProvider, String bucketId) {
this.clientProvider = clientProvider;
this.bucketId = bucketId;
}

Expand All @@ -58,7 +55,7 @@ public MapCache getMap(String mapName, MeterStatsManager meterStatsManager, Gaug
}

private Map<String, String> getMapOfObjects(String dartName, MeterStatsManager meterManager, GaugeStatsManager gaugeManager) {
String jsonData = getStoreClient().fetchJsonData(
String jsonData = clientProvider.getDartDataStoreClient().fetchJsonData(
DartGet.class.getSimpleName(),
gaugeManager,
this.bucketId,
Expand All @@ -77,7 +74,7 @@ private Map<String, String> getMapOfObjects(String dartName, MeterStatsManager m
}

private Set<String> getSetOfObjects(String dartName, MeterStatsManager meterManager, GaugeStatsManager gaugeManager) {
String jsonData = getStoreClient().fetchJsonData(DartContains.class.getSimpleName(), gaugeManager, this.bucketId, DART_CONTAINS_DIRECTORY + dartName);
String jsonData = clientProvider.getDartDataStoreClient().fetchJsonData(DartContains.class.getSimpleName(), gaugeManager, this.bucketId, DART_CONTAINS_DIRECTORY + dartName);
ObjectMapper mapper = new ObjectMapper();
try {
ObjectNode node = (ObjectNode) mapper.readTree(jsonData);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package com.gotocompany.dagger.functions.udfs.scalar.dart.store;

import com.gotocompany.dagger.functions.common.Constants;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.gcs.GcsDartClient;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.oss.OssDartClient;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.assertTrue;

public class DartDataStoreClientProviderTest {
private DartDataStoreClientProvider dartDataStoreClientProvider;

@Before
public void setUp() {
dartDataStoreClientProvider = null;
}

@Test
public void shouldReturnGcsDartClientWhenUdfStoreProviderIsGcs() {
String udfStoreProvider = Constants.UDF_STORE_PROVIDER_GCS;
String projectID = "test-project";

dartDataStoreClientProvider = new DartDataStoreClientProvider(udfStoreProvider, projectID);

DartDataStoreClient client = dartDataStoreClientProvider.getDartDataStoreClient();

assertTrue(client instanceof GcsDartClient);
}

@Test
public void shouldReturnOssDartClientWhenUdfStoreProviderIsOss() {
String udfStoreProvider = Constants.UDF_STORE_PROVIDER_OSS;

dartDataStoreClientProvider = new DartDataStoreClientProvider(udfStoreProvider, null);
DartDataStoreClient client = dartDataStoreClientProvider.getDartDataStoreClient();

assertTrue(client instanceof OssDartClient);
}

@Test(expected = IllegalArgumentException.class)
public void shouldThrowIllegalArgumentExceptionForUnknownUdfStoreProvider() {
String udfStoreProvider = "UNKNOWN-PROVIDER";

dartDataStoreClientProvider = new DartDataStoreClientProvider(udfStoreProvider, null);

try {
dartDataStoreClientProvider.getDartDataStoreClient();
} catch (IllegalArgumentException e) {
Assert.assertEquals("Unknown UDF Store Provider: UNKNOWN-PROVIDER", e.getMessage());
throw e;
}
}

@Test
public void shouldReturnSameClientOnSubsequentCalls() {
String udfStoreProvider = Constants.UDF_STORE_PROVIDER_GCS;
String projectID = "test-project";

dartDataStoreClientProvider = new DartDataStoreClientProvider(udfStoreProvider, projectID);

DartDataStoreClient firstClient = dartDataStoreClientProvider.getDartDataStoreClient();
DartDataStoreClient secondClient = dartDataStoreClientProvider.getDartDataStoreClient();

Assert.assertEquals(firstClient, secondClient);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@ public class DefaultDartDataStoreTest {

@Before
public void setUp() {
defaultDartDataStore = mock(DefaultDartDataStore.class);
// Subject
DartDataStoreClientProvider dartDataStoreClientProvider = mock(DartDataStoreClientProvider.class);
defaultDartDataStore = new DefaultDartDataStore(dartDataStoreClientProvider, "test-bucket");

gcsDartClient = mock(GcsDartClient.class);
meterStatsManager = mock(MeterStatsManager.class);
gaugeStatsManager = mock(GaugeStatsManager.class);
when(defaultDartDataStore.getSet(anyString(), any(), any())).thenCallRealMethod();
when(defaultDartDataStore.getMap(anyString(), any(), any())).thenCallRealMethod();
when(defaultDartDataStore.getStoreClient()).thenReturn(gcsDartClient);
when(dartDataStoreClientProvider.getDartDataStoreClient()).thenReturn(gcsDartClient);
listContent = Arrays.asList("listContent");
mapContent = Collections.singletonMap("key", "value");
}
Expand Down
Loading