From 51ba6f70995adad0c63ec1d05d52abc482de0bbc Mon Sep 17 00:00:00 2001 From: Clive Cox Date: Fri, 7 Dec 2018 11:49:03 +0000 Subject: [PATCH] update apife to work cluster wide --- .../java/io/seldon/apife/AppProperties.java | 10 +++++- .../apife/deployments/DeploymentStore.java | 36 ++++++++++++------- .../seldon/apife/grpc/SeldonGrpcServer.java | 28 ++------------- .../seldon/apife/k8s/DeploymentWatcher.java | 22 +++++++++--- .../metrics/AuthorizedWebMvcTagsProvider.java | 6 ++-- .../apife/service/PredictionService.java | 18 +++++++--- .../src/main/resources/application.properties | 1 + .../templates/apife-deployment.yaml | 2 ++ notebooks/seldon_utils.py | 8 ++--- 9 files changed, 76 insertions(+), 55 deletions(-) diff --git a/api-frontend/src/main/java/io/seldon/apife/AppProperties.java b/api-frontend/src/main/java/io/seldon/apife/AppProperties.java index 664615c667..bd1efe0c3b 100644 --- a/api-frontend/src/main/java/io/seldon/apife/AppProperties.java +++ b/api-frontend/src/main/java/io/seldon/apife/AppProperties.java @@ -23,6 +23,7 @@ public class AppProperties { private int engineContainerPort; private int engineGrpcContainerPort; private String namespace; + private boolean singleNamespace = true; public int getEngineContainerPort() { return engineContainerPort; @@ -43,7 +44,14 @@ public String getNamespace() { public void setNamespace(String namespace) { this.namespace = namespace; } - @Override + + public boolean isSingleNamespace() { + return singleNamespace; + } + public void setSingleNamespace(boolean singleNamespace) { + this.singleNamespace = singleNamespace; + } + @Override public String toString() { return ReflectionToStringBuilder.toString(this, ToStringStyle.SHORT_PREFIX_STYLE); } diff --git a/api-frontend/src/main/java/io/seldon/apife/deployments/DeploymentStore.java b/api-frontend/src/main/java/io/seldon/apife/deployments/DeploymentStore.java index bdc58d2c05..255cc3c044 100644 --- a/api-frontend/src/main/java/io/seldon/apife/deployments/DeploymentStore.java +++ b/api-frontend/src/main/java/io/seldon/apife/deployments/DeploymentStore.java @@ -25,7 +25,9 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import io.seldon.apife.AppProperties; import io.seldon.apife.api.oauth.InMemoryClientDetailsService; +import io.seldon.apife.k8s.KubernetesUtil; import io.seldon.protos.DeploymentProtos.DeploymentSpec; import io.seldon.protos.DeploymentProtos.SeldonDeployment; @@ -34,17 +36,19 @@ public class DeploymentStore implements DeploymentsListener { protected static Logger logger = LoggerFactory.getLogger(DeploymentStore.class.getName()); //Oauth key to deployment def - private ConcurrentMap deploymentStore = new ConcurrentHashMap<>(); + private ConcurrentMap deploymentStore = new ConcurrentHashMap<>(); private final DeploymentsHandler deploymentsHandler; - - private InMemoryClientDetailsService clientDetailsService; + private final InMemoryClientDetailsService clientDetailsService; + private final AppProperties appProperties; + private final KubernetesUtil k8sUtil = new KubernetesUtil(); @Autowired - public DeploymentStore(DeploymentsHandler deploymentsHandler,InMemoryClientDetailsService clientDetailsService) + public DeploymentStore(DeploymentsHandler deploymentsHandler,InMemoryClientDetailsService clientDetailsService,AppProperties appProperties) { this.deploymentsHandler = deploymentsHandler; this.clientDetailsService = clientDetailsService; + this.appProperties = appProperties; } @PostConstruct @@ -53,18 +57,26 @@ private void init() throws Exception{ deploymentsHandler.addListener(this); } - public DeploymentSpec getDeployment(String clientId) - { - return deploymentStore.get(clientId); - } - - + public SeldonDeployment getDeployment(String clientId) + { + return deploymentStore.get(clientId); + } + @Override public void deploymentAdded(SeldonDeployment mlDep) { final DeploymentSpec deploymentDef = mlDep.getSpec(); + final String namespace = k8sUtil.getNamespace(mlDep); + + if (appProperties.isSingleNamespace()) + { + deploymentStore.put(deploymentDef.getOauthKey(), mlDep); + clientDetailsService.addClient(deploymentDef.getOauthKey(), deploymentDef.getOauthSecret()); + } - deploymentStore.put(deploymentDef.getOauthKey(), deploymentDef); - clientDetailsService.addClient(deploymentDef.getOauthKey(), deploymentDef.getOauthSecret()); + // Always add namespaced key + final String namespacedKey = deploymentDef.getOauthKey() + namespace; + deploymentStore.put(namespacedKey, mlDep); + clientDetailsService.addClient(namespacedKey, deploymentDef.getOauthSecret()); logger.info("Succesfully added or updated deployment "+deploymentDef.getName()); } diff --git a/api-frontend/src/main/java/io/seldon/apife/grpc/SeldonGrpcServer.java b/api-frontend/src/main/java/io/seldon/apife/grpc/SeldonGrpcServer.java index 3ec86f2ba6..2ab3a0779c 100644 --- a/api-frontend/src/main/java/io/seldon/apife/grpc/SeldonGrpcServer.java +++ b/api-frontend/src/main/java/io/seldon/apife/grpc/SeldonGrpcServer.java @@ -34,13 +34,10 @@ import io.grpc.ServerInterceptors; import io.grpc.netty.NettyServerBuilder; import io.seldon.apife.AppProperties; -import io.seldon.apife.api.oauth.InMemoryClientDetailsService; import io.seldon.apife.config.AnnotationsConfig; import io.seldon.apife.deployments.DeploymentStore; import io.seldon.apife.deployments.DeploymentsHandler; import io.seldon.apife.exception.SeldonAPIException; -import io.seldon.apife.k8s.DeploymentWatcher; -import io.seldon.protos.DeploymentProtos.DeploymentSpec; import io.seldon.protos.DeploymentProtos.SeldonDeployment; @Component @@ -154,8 +151,8 @@ public ManagedChannel getChannel() { throw new SeldonAPIException(SeldonAPIException.ApiExceptionType.APIFE_GRPC_NO_PRINCIPAL_FOUND,""); } - final DeploymentSpec deploymentSpec = deploymentStore.getDeployment(principal); - if (deploymentSpec == null) + final SeldonDeployment mlDep = deploymentStore.getDeployment(principal); + if (mlDep == null) { throw new SeldonAPIException(SeldonAPIException.ApiExceptionType.APIFE_NO_RUNNING_DEPLOYMENT,"Principal is "+principal); } @@ -209,27 +206,6 @@ private void blockUntilShutdown() throws InterruptedException { } } - /** - * Main method for basic testing. - */ - public static void main(String[] args) throws Exception { - DeploymentStore store = new DeploymentStore(null,new InMemoryClientDetailsService()); - SeldonDeployment dep = SeldonDeployment.newBuilder() - .setApiVersion(DeploymentWatcher.VERSION) - .setKind("SeldonDeplyment") - .setSpec(DeploymentSpec.newBuilder() - .setName("0.0.0.0") - .setOauthKey("key") - .setOauthSecret("secret") - ).build(); - AppProperties appProperties = new AppProperties(); - appProperties.setEngineGrpcContainerPort(5000); - store.deploymentAdded(dep); - SeldonGrpcServer server = new SeldonGrpcServer(appProperties,store,null,null,null,SERVER_PORT); - server.start(); - server.blockUntilShutdown(); - } - public void deploymentAdded(SeldonDeployment resource) { ManagedChannel channel = ManagedChannelBuilder.forAddress(resource.getSpec().getName(), appProperties.getEngineGrpcContainerPort()).usePlaintext(true).build(); channelStore.put(resource.getSpec().getOauthKey(),channel); diff --git a/api-frontend/src/main/java/io/seldon/apife/k8s/DeploymentWatcher.java b/api-frontend/src/main/java/io/seldon/apife/k8s/DeploymentWatcher.java index a2f8f56353..a070aeccf2 100644 --- a/api-frontend/src/main/java/io/seldon/apife/k8s/DeploymentWatcher.java +++ b/api-frontend/src/main/java/io/seldon/apife/k8s/DeploymentWatcher.java @@ -46,10 +46,10 @@ import io.kubernetes.client.apis.CustomObjectsApi; import io.kubernetes.client.util.Config; import io.kubernetes.client.util.Watch; +import io.seldon.apife.AppProperties; import io.seldon.apife.deployments.DeploymentsHandler; import io.seldon.apife.deployments.DeploymentsListener; import io.seldon.apife.pb.ProtoBufUtils; -import io.seldon.apife.AppProperties; import io.seldon.protos.DeploymentProtos.SeldonDeployment; @Component @@ -67,14 +67,16 @@ public class DeploymentWatcher implements DeploymentsHandler{ private int resourceVersionProcessed = 0; private final Set listeners; private final String namespace; + private final boolean clusterWide; @Autowired - public DeploymentWatcher(AppProperties clusterManagerProperites) throws IOException + public DeploymentWatcher(AppProperties appProperties) throws IOException { this.client = Config.defaultClient(); this.listeners = new HashSet<>(); Configuration.setDefaultApiClient(client); - this.namespace = StringUtils.isEmpty(clusterManagerProperites.getNamespace()) ? "default" : clusterManagerProperites.getNamespace(); + this.namespace = StringUtils.isEmpty(appProperties.getNamespace()) ? "default" : appProperties.getNamespace(); + this.clusterWide = !appProperties.isSingleNamespace(); } private void processWatch(SeldonDeployment mlDep,String action) @@ -127,11 +129,21 @@ public int watchSeldonSeldonDeployments(int resourceVersion,int resourceVersionP rs = ""+resourceVersion; logger.info("Watching with rs "+rs); CustomObjectsApi api = new CustomObjectsApi(); - Watch watch = Watch.createWatch( + Watch watch; + if (this.clusterWide) + { + watch = Watch.createWatch( + client, + api.listClusterCustomObjectCall(GROUP, VERSION, KIND_PLURAL, null, null, rs, true, null, null), + new TypeToken>(){}.getType()); + } + else + { + watch = Watch.createWatch( client, api.listNamespacedCustomObjectCall(GROUP, VERSION, namespace, KIND_PLURAL, null, null, rs, true, null, null), new TypeToken>(){}.getType()); - + } int maxResourceVersion = resourceVersion; try{ for (Watch.Response item : watch) { diff --git a/api-frontend/src/main/java/io/seldon/apife/metrics/AuthorizedWebMvcTagsProvider.java b/api-frontend/src/main/java/io/seldon/apife/metrics/AuthorizedWebMvcTagsProvider.java index 1064f2a132..ac2494b878 100644 --- a/api-frontend/src/main/java/io/seldon/apife/metrics/AuthorizedWebMvcTagsProvider.java +++ b/api-frontend/src/main/java/io/seldon/apife/metrics/AuthorizedWebMvcTagsProvider.java @@ -28,6 +28,7 @@ import io.micrometer.spring.web.servlet.DefaultWebMvcTagsProvider; import io.micrometer.spring.web.servlet.WebMvcTags; import io.seldon.apife.deployments.DeploymentStore; +import io.seldon.protos.DeploymentProtos.SeldonDeployment; @Component public class AuthorizedWebMvcTagsProvider extends DefaultWebMvcTagsProvider { @@ -62,10 +63,11 @@ public Tag principal(String principalName) { public Tag deploymentName(String principalName) { - if (principalName == null || !StringUtils.hasText(deploymentStore.getDeployment(principalName).getName())) + SeldonDeployment mlDep = deploymentStore.getDeployment(principalName); + if (principalName == null || mlDep == null || !StringUtils.hasText(mlDep.getSpec().getName())) return Tag.of(DEPLOYMENT_NAME_METRIC, "None"); else - return Tag.of(DEPLOYMENT_NAME_METRIC,deploymentStore.getDeployment(principalName).getName()); + return Tag.of(DEPLOYMENT_NAME_METRIC,mlDep.getSpec().getName()); } diff --git a/api-frontend/src/main/java/io/seldon/apife/service/PredictionService.java b/api-frontend/src/main/java/io/seldon/apife/service/PredictionService.java index c78faee196..469eef71de 100644 --- a/api-frontend/src/main/java/io/seldon/apife/service/PredictionService.java +++ b/api-frontend/src/main/java/io/seldon/apife/service/PredictionService.java @@ -22,7 +22,8 @@ import io.seldon.apife.deployments.DeploymentStore; import io.seldon.apife.exception.SeldonAPIException; -import io.seldon.protos.DeploymentProtos.DeploymentSpec; +import io.seldon.apife.k8s.KubernetesUtil; +import io.seldon.protos.DeploymentProtos.SeldonDeployment; @Service @@ -36,21 +37,28 @@ public class PredictionService { @Autowired InternalPredictionService internalPredictionService; + private final KubernetesUtil k8sUtil = new KubernetesUtil(); public String predict(String request,String clientId) { - DeploymentSpec deployment = deploymentStore.getDeployment(clientId); + SeldonDeployment deployment = deploymentStore.getDeployment(clientId); if (deployment != null) - return internalPredictionService.getPrediction(request, deployment.getName()); + { + final String endpoint = deployment.getSpec().getName() + "." + k8sUtil.getNamespace(deployment); + return internalPredictionService.getPrediction(request, endpoint); + } else throw new SeldonAPIException(SeldonAPIException.ApiExceptionType.APIFE_NO_RUNNING_DEPLOYMENT,"no deployment with id "+clientId); } public void sendFeedback(String feedback, String deploymentId){ - DeploymentSpec deployment = deploymentStore.getDeployment(deploymentId); + SeldonDeployment deployment = deploymentStore.getDeployment(deploymentId); if (deployment != null) - internalPredictionService.sendFeedback(feedback, deployment.getName()); + { + final String endpoint = deployment.getSpec().getName() + "." + k8sUtil.getNamespace(deployment); + internalPredictionService.sendFeedback(feedback, endpoint); + } else throw new SeldonAPIException(SeldonAPIException.ApiExceptionType.APIFE_NO_RUNNING_DEPLOYMENT,"no deployment with id "+deploymentId); diff --git a/api-frontend/src/main/resources/application.properties b/api-frontend/src/main/resources/application.properties index 59f95d9487..5ccdb7b8b6 100644 --- a/api-frontend/src/main/resources/application.properties +++ b/api-frontend/src/main/resources/application.properties @@ -6,6 +6,7 @@ log4j.logger.org.springframework.security=DEBUG io.seldon.apife.engine-container-port=8000 io.seldon.apife.engine-grpc-container-port=5001 io.seldon.apife.namespace=${SELDON_CLUSTER_MANAGER_POD_NAMESPACE} +io.seldon.apife.single-namespace=${SELDON_SINGLE_NAMESPACE} management.metrics.web.server.requests-metric-name=seldon.api.ingress.server.requests diff --git a/helm-charts/seldon-core/templates/apife-deployment.yaml b/helm-charts/seldon-core/templates/apife-deployment.yaml index 63daf525b5..1c0900d770 100644 --- a/helm-charts/seldon-core/templates/apife-deployment.yaml +++ b/helm-charts/seldon-core/templates/apife-deployment.yaml @@ -34,6 +34,8 @@ spec: value: kafka:9092 - name: SELDON_CLUSTER_MANAGER_REDIS_HOST value: {{ .Release.Name }}-redis + - name: SELDON_SINGLE_NAMESPACE + value: '{{ .Values.single_namespace }}' - name: SELDON_CLUSTER_MANAGER_POD_NAMESPACE valueFrom: fieldRef: diff --git a/notebooks/seldon_utils.py b/notebooks/seldon_utils.py index ab9e1f2b5a..805f35b8f4 100644 --- a/notebooks/seldon_utils.py +++ b/notebooks/seldon_utils.py @@ -10,18 +10,18 @@ def create_random_data(data_size,rows=1): arr = np.random.rand(rows*data_size) return (shape,arr) -def get_token(oauth_key,oauth_secret,endpoint): +def get_token(oauth_key,oauth_secret,namespace,endpoint): payload = {'grant_type': 'client_credentials'} response = requests.post( "http://"+endpoint+"/oauth/token", - auth=HTTPBasicAuth(oauth_key, oauth_secret), + auth=HTTPBasicAuth(oauth_key+namespace, oauth_secret), data=payload) print(response.text) token = response.json()["access_token"] return token -def rest_request_api_gateway(oauth_key,oauth_secret,endpoint="localhost:8002",data_size=5,rows=1,data=None): - token = get_token(oauth_key,oauth_secret,endpoint) +def rest_request_api_gateway(oauth_key,oauth_secret,namespace,endpoint="localhost:8002",data_size=5,rows=1,data=None): + token = get_token(oauth_key,oauth_secret,namespace,endpoint) if data is None: shape, arr = create_random_data(data_size,rows) else: