Skip to content

Commit

Permalink
updates to allow engine to use svc names for all calls
Browse files Browse the repository at this point in the history
  • Loading branch information
ukclivecox committed Jun 13, 2018
1 parent dd266c0 commit 553ea88
Show file tree
Hide file tree
Showing 5 changed files with 332 additions and 109 deletions.
4 changes: 4 additions & 0 deletions cluster-manager/README.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Local testing:

export SELDON_CLUSTER_MANAGER_POD_NAMESPACE=seldon
export ENGINE_CONTAINER_IMAGE_AND_VERSION=seldonio/engine:0.1.8-SNAPSHOT
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@

import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.StringJoiner;

Expand Down Expand Up @@ -187,7 +190,7 @@ private PredictiveUnit findPredictiveUnitForContainer(PredictiveUnit unit,String
}
}

private V1.Container updateContainer(V1.Container c,PredictiveUnit pu,int idx,String deploymentName,String predictorName)
private V1.Container updateContainer(V1.Container c,PredictiveUnit pu,int portNum,String deploymentName,String predictorName)
{
V1.Container.Builder c2Builder = V1.Container.newBuilder(c);

Expand All @@ -199,8 +202,8 @@ private V1.Container updateContainer(V1.Container c,PredictiveUnit pu,int idx,St
{
if (pu.getEndpoint().getType() == Endpoint.EndpointType.REST)
{
c2Builder.addPorts(ContainerPort.newBuilder().setName("http").setContainerPort(clusterManagerProperites.getPuContainerPortBase() + idx));
containerPort = clusterManagerProperites.getPuContainerPortBase() + idx;
c2Builder.addPorts(ContainerPort.newBuilder().setName("http").setContainerPort(portNum));
containerPort = portNum;

if (!c.hasLivenessProbe())
{
Expand All @@ -222,8 +225,8 @@ private V1.Container updateContainer(V1.Container c,PredictiveUnit pu,int idx,St
}
else
{
c2Builder.addPorts(ContainerPort.newBuilder().setName("grpc").setContainerPort(clusterManagerProperites.getPuContainerPortBase() + idx));
containerPort = clusterManagerProperites.getPuContainerPortBase() + idx;
c2Builder.addPorts(ContainerPort.newBuilder().setName("grpc").setContainerPort(portNum));
containerPort = portNum;

if (!c.hasLivenessProbe())
{
Expand All @@ -247,7 +250,7 @@ private V1.Container updateContainer(V1.Container c,PredictiveUnit pu,int idx,St
}
}
else
containerPort = c.getPorts(0).getContainerPort();
// String.format uses java.util.Formatter syntax (%s), not SLF4J-style "{}" placeholders;
// with "{}" the port value was never substituted into the message.
throw new UnsupportedOperationException(String.format("Found container port already set with http or grpc label. This is not presently allowed. Found port %s",containerPort));

// Add environment variable for the port used in case the model needs to access it
final String ENV_PREDICTIVE_UNIT_SERVICE_PORT ="PREDICTIVE_UNIT_SERVICE_PORT";
Expand Down Expand Up @@ -310,9 +313,14 @@ private void updatePredictiveUnitBuilderByName(PredictiveUnit.Builder puBuilder,
}
}

private String getPredictorServiceName(SeldonDeployment mlDep,int predictorIdx,int podTemplateIdx)
private String getPredictorServiceNameValue(SeldonDeployment mlDep,String predictorName,String containerName)
{
return mlDep.getSpec().getName()+"-"+predictorIdx+"-"+podTemplateIdx;
return mlDep.getSpec().getName()+"-"+predictorName+"-"+containerName;
}

/**
 * Builds the key used to label and select the service belonging to a container.
 *
 * @param containerName name of the container the key refers to
 * @return {@code LABEL_SELDON_APP}, a hyphen, and {@code containerName} concatenated
 */
private String getPredictorServiceNameKey(String containerName)
{
    StringBuilder key = new StringBuilder();
    key.append(LABEL_SELDON_APP);
    key.append("-");
    key.append(containerName);
    return key.toString();
}

@Override
Expand All @@ -329,35 +337,37 @@ public SeldonDeployment defaulting(SeldonDeployment mlDep) {
for(int pbIdx=0;pbIdx<mlDep.getSpec().getPredictorsCount();pbIdx++)
{
PredictorSpec p = mlDep.getSpec().getPredictors(pbIdx);
Map<String,Integer> servicePortMap = new HashMap<>();
int currentServicePortNum = clusterManagerProperites.getPuContainerPortBase();
for(int ptsIdx=0;ptsIdx<p.getComponentSpecsCount();ptsIdx++)
{
V1.PodTemplateSpec spec = p.getComponentSpecs(ptsIdx);
// Treat the first podTemplateSpec as special: it will contain the engine service orchestrator, so it needs to have labels for the service
String serviceName;
ObjectMeta.Builder metaBuilder = ObjectMeta.newBuilder(spec.getMetadata());
if (ptsIdx == 0)
{
serviceName = mlDep.getSpec().getName();
metaBuilder.putLabels(LABEL_SELDON_APP, serviceName);
mlBuilder.getSpecBuilder().getPredictorsBuilder(pbIdx).getComponentSpecsBuilder(ptsIdx).setMetadata(metaBuilder);
}
else
{
serviceName = getPredictorServiceName(mlDep, pbIdx, ptsIdx);
metaBuilder.putLabels(LABEL_SELDON_APP, serviceName);
mlBuilder.getSpecBuilder().getPredictorsBuilder(pbIdx).getComponentSpecsBuilder(ptsIdx).setMetadata(metaBuilder);
}
int cIdx = 0;

mlBuilder.getSpecBuilder().getPredictorsBuilder(pbIdx).getComponentSpecsBuilder(ptsIdx).getSpecBuilder().clearContainers();
String predictorName = p.getName();
for(V1.Container c : spec.getSpec().getContainersList())
for(int cIdx = 0;cIdx < spec.getSpec().getContainersCount();cIdx++)
{
V1.Container c2 = this.updateContainer(c, findPredictiveUnitForContainer(mlDep.getSpec().getPredictors(pbIdx).getGraph(),c.getName()),cIdx,deploymentName,predictorName);
V1.Container c = spec.getSpec().getContainers(cIdx);
String containerServiceKey = getPredictorServiceNameKey(c.getName());
String containerServiceValue = getPredictorServiceNameValue(mlDep, p.getName(), c.getName());
metaBuilder.putLabels(containerServiceKey, containerServiceValue);

int portNum;
if (servicePortMap.containsKey(c.getName()))
portNum = servicePortMap.get(c.getName());
else
{
portNum = currentServicePortNum;
servicePortMap.put(c.getName(), portNum);
currentServicePortNum++;
}
V1.Container c2 = this.updateContainer(c, findPredictiveUnitForContainer(mlDep.getSpec().getPredictors(pbIdx).getGraph(),c.getName()),portNum,deploymentName,predictorName);
mlBuilder.getSpecBuilder().getPredictorsBuilder(pbIdx).getComponentSpecsBuilder(ptsIdx).getSpecBuilder().addContainers(cIdx, c2);
String containerHostName = ptsIdx == 0 ? "0.0.0.0" : serviceName ;
updatePredictiveUnitBuilderByName(mlBuilder.getSpecBuilder().getPredictorsBuilder(pbIdx).getGraphBuilder(),c2,containerHostName);
cIdx++;
updatePredictiveUnitBuilderByName(mlBuilder.getSpecBuilder().getPredictorsBuilder(pbIdx).getGraphBuilder(),c2,containerServiceValue);
}
System.out.println("pbIdx"+pbIdx+" ptsIdx "+ptsIdx);
mlBuilder.getSpecBuilder().getPredictorsBuilder(pbIdx).getComponentSpecsBuilder(ptsIdx).setMetadata(metaBuilder);
}
}

Expand Down Expand Up @@ -462,23 +472,24 @@ private String getAmbassadorAnnotation(SeldonDeployment mlDep,String serviceName
}


private void addServicePorts(PredictiveUnit pu,String serviceName,ServiceSpec.Builder svcSpecBuilder)
private void addServicePort(PredictiveUnit pu,String serviceName,ServiceSpec.Builder svcSpecBuilder)
{
if (pu.hasEndpoint())
{
Endpoint e = pu.getEndpoint();
if (e.getServiceHost().equals(serviceName))
{
svcSpecBuilder.addPorts(ServicePort.newBuilder()
.setProtocol("TCP")
.setPort(e.getServicePort())
.setTargetPort(IntOrString.newBuilder().setIntVal(e.getServicePort()))
.setName(e.getType().toString().toLowerCase())
);
.setProtocol("TCP")
.setPort(e.getServicePort())
.setTargetPort(IntOrString.newBuilder().setIntVal(e.getServicePort()))
//.setName("http")
);
return;
}
}
for(int i=0;i<pu.getChildrenCount();i++)
addServicePorts(pu.getChildren(i), serviceName,svcSpecBuilder);
addServicePort(pu.getChildren(i), serviceName,svcSpecBuilder);
}

@Override
Expand All @@ -489,61 +500,96 @@ public DeploymentResources createResources(SeldonDeployment mlDep) throws Seldon
List<Service> services = new ArrayList<>();
// for each predictor Create/replace deployment
String serviceLabel = mlDep.getSpec().getName();
Set<String> createdServices = new HashSet<>();
for(int pbIdx=0;pbIdx<mlDep.getSpec().getPredictorsCount();pbIdx++)
{
PredictorSpec p = mlDep.getSpec().getPredictors(pbIdx);

{//Deployment for engine service orchestrator
PodTemplateSpec.Builder podSpecBuilder = PodTemplateSpec.newBuilder();
podSpecBuilder.getSpecBuilder()
.addContainers(createEngineContainer(mlDep,p))
.setTerminationGracePeriodSeconds(20);
podSpecBuilder.getMetadataBuilder()
.putLabels(LABEL_SELDON_APP, mlDep.getSpec().getName())
.putAnnotations("prometheus.io/path", "/prometheus")
.putAnnotations("prometheus.io/port",""+clusterManagerProperites.getEngineContainerPort())
.putAnnotations("prometheus.io/scrape", "true");
String depName = getKubernetesDeploymentName(mlDep.getSpec().getName(),p.getName(),0);
ObjectMeta.Builder depMetaBuilder = ObjectMeta.newBuilder()
.setName(depName)
.putLabels(SeldonDeploymentOperatorImpl.LABEL_SELDON_APP, serviceLabel)
.putLabels(Constants.LABEL_SELDON_ID, mlDep.getSpec().getName())
.putLabels("app", depName)
.putLabels("version", "v1") //FIXME
.putLabels(SeldonDeploymentOperatorImpl.LABEL_SELDON_TYPE_KEY, SeldonDeploymentOperatorImpl.LABEL_SELDON_TYPE_VAL)
.addOwnerReferences(ownerRef);
Deployment deployment = V1beta1Extensions.Deployment.newBuilder()
.setMetadata(depMetaBuilder)
.setSpec(DeploymentSpec.newBuilder()
.setTemplate(podSpecBuilder.build())
.setStrategy(DeploymentStrategy.newBuilder().setRollingUpdate(RollingUpdateDeployment.newBuilder().setMaxUnavailable(IntOrString.newBuilder().setType(1).setStrVal("10%"))))
.setReplicas(p.getReplicas()))
.build();

deployments.add(deployment);
}


for(int ptsIdx=0;ptsIdx<p.getComponentSpecsCount();ptsIdx++)
{
V1.PodTemplateSpec spec = p.getComponentSpecs(ptsIdx);
String depName = getKubernetesDeploymentName(mlDep.getSpec().getName(),p.getName(),ptsIdx);
String depName = getKubernetesDeploymentName(mlDep.getSpec().getName(),p.getName(),ptsIdx+1);
PodTemplateSpec.Builder podSpecBuilder = PodTemplateSpec.newBuilder(spec);
String depServiceLabelValue;
if (ptsIdx == 0) // Deployment containing engine
{
depServiceLabelValue = serviceLabel;
podSpecBuilder.getSpecBuilder()
.addContainers(createEngineContainer(mlDep,p))
.setTerminationGracePeriodSeconds(20);
podSpecBuilder.getMetadataBuilder()
.putAnnotations("prometheus.io/path", "/prometheus")
.putAnnotations("prometheus.io/port",""+clusterManagerProperites.getEngineContainerPort())
.putAnnotations("prometheus.io/scrape", "true");
}
else
ObjectMeta.Builder depMetaBuilder = ObjectMeta.newBuilder()
.setName(depName)
.putLabels(SeldonDeploymentOperatorImpl.LABEL_SELDON_APP, serviceLabel)
.putLabels(Constants.LABEL_SELDON_ID, mlDep.getSpec().getName())
.putLabels("app", depName)
.putLabels("version", "v1") //FIXME
.putLabels(SeldonDeploymentOperatorImpl.LABEL_SELDON_TYPE_KEY, SeldonDeploymentOperatorImpl.LABEL_SELDON_TYPE_VAL)
.addOwnerReferences(ownerRef);

depMetaBuilder.putAllLabels(spec.getMetadata().getLabelsMap());
//for(Entry<String,String> podLabel : spec.getMetadata().getLabelsMap().entrySet())
//{
// depMetaBuilder.put
//}

for(V1.Container c : spec.getSpec().getContainersList())
{
depServiceLabelValue = getPredictorServiceName(mlDep, pbIdx, ptsIdx);
String containerServiceKey = getPredictorServiceNameKey(c.getName());
String containerServiceValue = getPredictorServiceNameValue(mlDep, p.getName(), c.getName());

podSpecBuilder.getSpecBuilder()
.setTerminationGracePeriodSeconds(20);

//Add service
Service.Builder s = Service.newBuilder()
.setMetadata(ObjectMeta.newBuilder()
.setName(depServiceLabelValue)
.putLabels(SeldonDeploymentOperatorImpl.LABEL_SELDON_APP, depServiceLabelValue)
.putLabels("seldon-deployment-id", mlDep.getSpec().getName())
.addOwnerReferences(ownerRef)
.putAnnotations("getambassador.io/config",getAmbassadorAnnotation(mlDep,serviceLabel))
);
ServiceSpec.Builder svcSpecBuilder = ServiceSpec.newBuilder();
addServicePorts(p.getGraph(), depServiceLabelValue, svcSpecBuilder);
svcSpecBuilder.setType("ClusterIP")
.putSelector(SeldonDeploymentOperatorImpl.LABEL_SELDON_APP,depServiceLabelValue);

s.setSpec(svcSpecBuilder);
services.add(s.build());
if (!createdServices.contains(containerServiceValue))
{
//Add service
Service.Builder s = Service.newBuilder()
.setMetadata(ObjectMeta.newBuilder()
.setName(containerServiceValue)
.putLabels(containerServiceKey, containerServiceValue)
.putLabels("seldon-deployment-id", mlDep.getSpec().getName())
.addOwnerReferences(ownerRef)
);
ServiceSpec.Builder svcSpecBuilder = ServiceSpec.newBuilder();
addServicePort(p.getGraph(), containerServiceValue, svcSpecBuilder);
svcSpecBuilder.setType("ClusterIP")
.putSelector(containerServiceKey,containerServiceValue);

depMetaBuilder.putLabels(containerServiceKey, containerServiceValue);
s.setSpec(svcSpecBuilder);
services.add(s.build());
}
}





Deployment deployment = V1beta1Extensions.Deployment.newBuilder()
.setMetadata(ObjectMeta.newBuilder()
.setName(depName)
.putLabels(SeldonDeploymentOperatorImpl.LABEL_SELDON_APP, depServiceLabelValue)
.putLabels(Constants.LABEL_SELDON_ID, mlDep.getSpec().getName())
.putLabels("app", depName)
.putLabels("version", "v1") //FIXME
.putLabels(SeldonDeploymentOperatorImpl.LABEL_SELDON_TYPE_KEY, SeldonDeploymentOperatorImpl.LABEL_SELDON_TYPE_VAL)
.addOwnerReferences(ownerRef)
)
.setMetadata(depMetaBuilder)
.setSpec(DeploymentSpec.newBuilder()
.setTemplate(podSpecBuilder.build())
.setStrategy(DeploymentStrategy.newBuilder().setRollingUpdate(RollingUpdateDeployment.newBuilder().setMaxUnavailable(IntOrString.newBuilder().setType(1).setStrVal("10%"))))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void testDefaulting() throws IOException
Assert.assertEquals(1,mlDep2.getSpec().getPredictors(0).getComponentSpecs(0).getSpec().getContainers(0).getPortsCount());
Assert.assertEquals("http",mlDep2.getSpec().getPredictors(0).getComponentSpecs(0).getSpec().getContainers(0).getPorts(0).getName());
Assert.assertEquals(Endpoint.EndpointType.REST_VALUE,mlDep2.getSpec().getPredictors(0).getGraph().getEndpoint().getType().getNumber());
Assert.assertEquals("0.0.0.0",mlDep2.getSpec().getPredictors(0).getGraph().getEndpoint().getServiceHost());
Assert.assertEquals("test-deployment-fx-market-predictor-mean-classifier",mlDep2.getSpec().getPredictors(0).getGraph().getEndpoint().getServiceHost());
}

@Test
Expand All @@ -60,6 +60,6 @@ public void testDefaultingGrpc() throws IOException
Assert.assertEquals(1,mlDep2.getSpec().getPredictors(0).getComponentSpecs(0).getSpec().getContainers(0).getPortsCount());
Assert.assertEquals("grpc",mlDep2.getSpec().getPredictors(0).getComponentSpecs(0).getSpec().getContainers(0).getPorts(0).getName());
Assert.assertEquals(Endpoint.EndpointType.GRPC_VALUE,mlDep2.getSpec().getPredictors(0).getGraph().getEndpoint().getType().getNumber());
Assert.assertEquals("0.0.0.0",mlDep2.getSpec().getPredictors(0).getGraph().getEndpoint().getServiceHost());
Assert.assertEquals("test-deployment-fx-market-predictor-mean-classifier",mlDep2.getSpec().getPredictors(0).getGraph().getEndpoint().getServiceHost());
}
}
Loading

0 comments on commit 553ea88

Please sign in to comment.