Skip to content

Commit

Permalink
Merge pull request #408 from cliveseldon/cluster_manager_cache_bug
Browse files Browse the repository at this point in the history
Cluster Manager Cache Fix
  • Loading branch information
gsunner authored Feb 5, 2019
2 parents 40f2d9e + 42c8d5e commit 876c5d8
Show file tree
Hide file tree
Showing 19 changed files with 494 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.UninitializedMessageException;

import io.kubernetes.client.ApiClient;
import io.kubernetes.client.ApiException;
Expand Down Expand Up @@ -165,6 +166,11 @@ public SeldonDeployment getSeldonDeployment(String name,String namespace) {
logger.error("Failed to parse "+json,e);
return null;
}
catch (UninitializedMessageException e)
{
logger.error("Failed to parse "+json,e);
return null;
}
} catch (ApiException e) {
return null;
} catch (IOException e) {
Expand All @@ -178,7 +184,7 @@ public ExtensionsV1beta1DeploymentList getOwnedDeployments(String seldonDeployme
try
{
ApiClient client = k8sClientProvider.getClient();
ExtensionsV1beta1Api api = new ExtensionsV1beta1Api(client);
ExtensionsV1beta1Api api = k8sApiProvider.getExtensionsV1beta1Api(client);
ExtensionsV1beta1DeploymentList l = api.listNamespacedDeployment(namespace, null, null, null, false, Constants.LABEL_SELDON_ID+"="+seldonDeploymentName, null, null, null, false);
return l;
} catch (IOException e) {
Expand All @@ -195,7 +201,7 @@ public V1ServiceList getOwnedServices(String seldonDeploymentName,String namespa
try
{
ApiClient client = k8sClientProvider.getClient();
io.kubernetes.client.apis.CoreV1Api api = new CoreV1Api(client);
io.kubernetes.client.apis.CoreV1Api api = k8sApiProvider.getCoreV1Api(client);
V1ServiceList l = api.listNamespacedService(namespace, null, null, null, false, Constants.LABEL_SELDON_ID+"="+seldonDeploymentName, null, null, null, null);
return l;
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import io.seldon.protos.DeploymentProtos.SeldonDeployment;

public interface SeldonDeploymentController {
public void createOrReplaceSeldonDeployment(SeldonDeployment mlDep);
public void createOrReplaceSeldonDeployment(SeldonDeployment mlDep,boolean added);
public void removeInitialUnusedResources(SeldonDeployment mlDep);
public void removeAllUnusedResources(SeldonDeployment mlDep);
}
Original file line number Diff line number Diff line change
Expand Up @@ -177,40 +177,6 @@ private void removeServices(ApiClient client,String namespace,SeldonDeployment s
}
}

/**
* Currently Not used as issue with proto client needs further investigation
* @param client
* @param namespace
* @param seldonDeployment
* @param services
* @throws ApiException
* @throws IOException
* @throws SeldonDeploymentException
*/
private void removeServices(ProtoClient client,String namespace,SeldonDeployment seldonDeployment,List<Service> services) throws ApiException, IOException, SeldonDeploymentException
{
Set<String> names = getServiceNames(services);
V1ServiceList svcList = crdHandler.getOwnedServices(seldonNameCreator.getSeldonId(seldonDeployment),namespace);
for(V1Service s : svcList.getItems())
{
if (!names.contains(s.getMetadata().getName()))
{
final String deleteApiPath = "/apis/v1/namespaces/{namespace}/services/{name}"
.replaceAll("\\{" + "name" + "\\}", client.getApiClient().escapeString(s.getMetadata().getName()))
.replaceAll("\\{" + "namespace" + "\\}", client.getApiClient().escapeString(namespace));
DeleteOptions options = DeleteOptions.newBuilder().setPropagationPolicy("Foreground").build();
ObjectOrStatus<Deployment> os = client.delete(Service.newBuilder(),deleteApiPath,options);
if (os.status != null) {
logger.error("Error deleting service:"+ProtoBufUtils.toJson(os.status));
throw new SeldonDeploymentException("Failed to delete service "+s.getMetadata().getName());
}
else {
logger.debug("Deleted deployment:"+ProtoBufUtils.toJson(os.object));
}
}
}
}

private Set<String> getServiceNames(List<Service> services)
{
Set<String> names = new HashSet<>();
Expand Down Expand Up @@ -317,7 +283,7 @@ public void removeAllUnusedResources(SeldonDeployment mlDep) {


@Override
public void createOrReplaceSeldonDeployment(SeldonDeployment mlDep) {
public void createOrReplaceSeldonDeployment(SeldonDeployment mlDep,boolean added) {

if (mlDep.hasStatus() && mlDep.getStatus().hasState() && mlDep.getStatus().getState().equals(Constants.STATE_FAILED))
{
Expand All @@ -328,7 +294,7 @@ public void createOrReplaceSeldonDeployment(SeldonDeployment mlDep) {
{
String namespace = SeldonDeploymentUtils.getNamespace(mlDep);
SeldonDeployment existing = mlCache.get(mlDep);
if (existing == null || !existing.getSpec().equals(mlDep.getSpec()))
if (added || existing == null || !existing.getSpec().equals(mlDep.getSpec()))
{
logger.debug("Running updates for "+mlDep.getMetadata().getName());
mlCache.put(mlDep);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,10 @@ private void processWatch(SeldonDeployment mldep,String action) throws InvalidPr
switch(action)
{
case "ADDED":
seldonDeploymentController.createOrReplaceSeldonDeployment(mldep,true);
break;
case "MODIFIED":
seldonDeploymentController.createOrReplaceSeldonDeployment(mldep);
seldonDeploymentController.createOrReplaceSeldonDeployment(mldep,false);
break;
case "DELETED":
mlCache.remove(mldep);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package io.seldon.clustermanager.k8s.client;

import io.kubernetes.client.ApiClient;
import io.kubernetes.client.apis.CoreV1Api;
import io.kubernetes.client.apis.CustomObjectsApi;
import io.kubernetes.client.apis.ExtensionsV1beta1Api;

public interface K8sApiProvider {

public CustomObjectsApi getCustomObjectsApi(ApiClient client);
public ExtensionsV1beta1Api getExtensionsV1beta1Api(ApiClient client);
public CoreV1Api getCoreV1Api(ApiClient client);
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import org.springframework.stereotype.Component;

import io.kubernetes.client.ApiClient;
import io.kubernetes.client.apis.CoreV1Api;
import io.kubernetes.client.apis.CustomObjectsApi;
import io.kubernetes.client.apis.ExtensionsV1beta1Api;

Expand All @@ -19,4 +20,9 @@ public ExtensionsV1beta1Api getExtensionsV1beta1Api(ApiClient client) {
return new ExtensionsV1beta1Api(client);
}

@Override
public CoreV1Api getCoreV1Api(ApiClient client) {
return new CoreV1Api(client);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void createMocks(String resourceFilename) throws Exception
Request.Builder requestBuilder = new Request.Builder().url("http://0.0.0.0:8000");
Response.Builder responseBuilder = new Response.Builder();
String jsonStr = readFile(resourceFilename,StandardCharsets.UTF_8).replaceAll("\n", "") + "\n";

Gson gson = new GsonBuilder().create();

ResponseBody responseBody = ResponseBody.create(MediaType.parse("Content-Type: application/json"), jsonStr);
Response response = responseBuilder.code(200).request(requestBuilder.build()).protocol(Protocol.HTTP_2).body(responseBody).build();
Expand All @@ -83,13 +83,14 @@ public void createMocks(String resourceFilename) throws Exception
any(Boolean.class),isNull(ProgressListener.class),isNull(ProgressRequestListener.class))).thenReturn(mockListNamespaceCall);
// Use in crdHandler
when(mockCustomObjectsApi.replaceNamespacedCustomObjectStatus(any(String.class), any(String.class), any(String.class), any(String.class), any(String.class), any(byte[].class))).thenThrow(ApiException.class);
when(mockCustomObjectsApi.replaceNamespacedCustomObject(any(String.class), any(String.class), any(String.class), any(String.class), any(String.class), any(byte[].class))).thenReturn(null);

when(mockCustomObjectsApi.replaceNamespacedCustomObject(any(String.class), any(String.class), any(String.class), any(String.class), any(String.class), any(byte[].class))).thenReturn(null);
// Can use this for get Custom Object
Object coObject = gson.fromJson(jsonStr, LinkedTreeMap.class);
when(mockCustomObjectsApi.getNamespacedCustomObject(any(String.class), any(String.class), any(String.class), any(String.class), any(String.class))).thenReturn(coObject);
mockK8sApiProvider = mock(K8sApiProvider.class);
when(mockK8sApiProvider.getCustomObjectsApi(any(ApiClient.class))).thenReturn(mockCustomObjectsApi);

Watch.Response<Object> watchResponse = mock(Watch.Response.class);
Gson gson = new GsonBuilder().create();
watchResponse.object = gson.fromJson(jsonStr, LinkedTreeMap.class);
watchResponse.type = "ADDED";

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package io.seldon.clustermanager.k8s;

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.junit.Assert;
import org.junit.Test;

import io.kubernetes.client.ApiClient;
import io.kubernetes.client.ApiException;
import io.kubernetes.client.apis.CoreV1Api;
import io.kubernetes.client.apis.ExtensionsV1beta1Api;
import io.kubernetes.client.models.ExtensionsV1beta1DeploymentList;
import io.kubernetes.client.models.V1ServiceList;
import io.seldon.clustermanager.k8s.client.K8sApiProvider;
import io.seldon.protos.DeploymentProtos.SeldonDeployment;


public class KubeCRDHandlerImplTest extends End2EndBase {

@Test
public void validSeldonDeploymentTest() throws Exception
{
createMocks("src/test/resources/model_simple.json");
KubeCRDHandler crdHandler = new KubeCRDHandlerImpl(mockK8sApiProvider, mockK8sClientProvider, props);

SeldonDeployment sDep = crdHandler.getSeldonDeployment("a", "b");
Assert.assertNotNull(sDep);
}

@Test
public void invalidSeldonDeploymentJsonTest() throws Exception
{
createMocks("src/test/resources/invalid.json");
KubeCRDHandler crdHandler = new KubeCRDHandlerImpl(mockK8sApiProvider, mockK8sClientProvider, props);

SeldonDeployment sDep = crdHandler.getSeldonDeployment("a", "b");
Assert.assertNull(sDep);
}

@Test
public void invalidSeldonDeploymentTest() throws Exception
{
createMocks("src/test/resources/model_invalid.json");
KubeCRDHandler crdHandler = new KubeCRDHandlerImpl(mockK8sApiProvider, mockK8sClientProvider, props);

SeldonDeployment sDep = crdHandler.getSeldonDeployment("a", "b");
Assert.assertNull(sDep);
}


@Test
public void getOwnedDeploymentsTest() throws Exception
{
createMocks("src/test/resources/model_simple.json");
K8sApiProvider mockApiProvider = mock(K8sApiProvider.class);
ExtensionsV1beta1DeploymentList l = new ExtensionsV1beta1DeploymentList();
ExtensionsV1beta1Api mockExtensionsApi = mock(ExtensionsV1beta1Api.class);
when(mockApiProvider.getExtensionsV1beta1Api(any(ApiClient.class))).thenReturn(mockExtensionsApi);
when(mockExtensionsApi.listNamespacedDeployment(any(String.class), any(String.class), any(String.class),
any(String.class), any(Boolean.class), any(String.class), any(Integer.class),
any(String.class),any(Integer.class),any(Boolean.class))).thenReturn(l);
KubeCRDHandler crdHandler = new KubeCRDHandlerImpl(mockApiProvider, mockK8sClientProvider, props);
ExtensionsV1beta1DeploymentList l2 = crdHandler.getOwnedDeployments("a", "b");
Assert.assertNotNull(l2);
Assert.assertEquals(l, l2);
}

@Test
public void getOwnedDeploymentsExceptionTest() throws Exception
{
createMocks("src/test/resources/model_simple.json");
K8sApiProvider mockApiProvider = mock(K8sApiProvider.class);
ExtensionsV1beta1DeploymentList l = new ExtensionsV1beta1DeploymentList();
ExtensionsV1beta1Api mockExtensionsApi = mock(ExtensionsV1beta1Api.class);
when(mockApiProvider.getExtensionsV1beta1Api(any(ApiClient.class))).thenReturn(mockExtensionsApi);
when(mockExtensionsApi.listNamespacedDeployment(any(String.class), any(String.class), any(String.class),
any(String.class), any(Boolean.class), any(String.class), any(Integer.class),
any(String.class),any(Integer.class),any(Boolean.class))).thenThrow(new ApiException());
KubeCRDHandler crdHandler = new KubeCRDHandlerImpl(mockApiProvider, mockK8sClientProvider, props);
ExtensionsV1beta1DeploymentList l2 = crdHandler.getOwnedDeployments("a", "b");
Assert.assertNull(l2);
}

@Test
public void getOwnedServicesTest() throws Exception
{
createMocks("src/test/resources/model_simple.json");
K8sApiProvider mockApiProvider = mock(K8sApiProvider.class);
V1ServiceList l = new V1ServiceList();
CoreV1Api mockCoreApi = mock(CoreV1Api.class);
when(mockApiProvider.getCoreV1Api(any(ApiClient.class))).thenReturn(mockCoreApi);
when(mockCoreApi.listNamespacedService(any(String.class), any(String.class), any(String.class),
any(String.class), any(Boolean.class), any(String.class), any(Integer.class),
any(String.class),any(Integer.class),any(Boolean.class))).thenReturn(l);
KubeCRDHandler crdHandler = new KubeCRDHandlerImpl(mockApiProvider, mockK8sClientProvider, props);
V1ServiceList l2 = crdHandler.getOwnedServices("a", "b");
Assert.assertNotNull(l2);
Assert.assertEquals(l, l2);
}

@Test
public void getOwnedServicesExceptionTest() throws Exception
{
createMocks("src/test/resources/model_simple.json");
K8sApiProvider mockApiProvider = mock(K8sApiProvider.class);
V1ServiceList l = new V1ServiceList();
CoreV1Api mockCoreApi = mock(CoreV1Api.class);
when(mockApiProvider.getCoreV1Api(any(ApiClient.class))).thenReturn(mockCoreApi);
when(mockCoreApi.listNamespacedService(any(String.class), any(String.class), any(String.class),
any(String.class), any(Boolean.class), any(String.class), any(Integer.class),
any(String.class),any(Integer.class),any(Boolean.class))).thenThrow(new ApiException());
KubeCRDHandler crdHandler = new KubeCRDHandlerImpl(mockApiProvider, mockK8sClientProvider, props);
V1ServiceList l2 = crdHandler.getOwnedServices("a", "b");
Assert.assertNull(l2);
}

}
Loading

0 comments on commit 876c5d8

Please sign in to comment.