diff --git a/api-frontend/src/main/java/io/seldon/apife/exception/SeldonAPIException.java b/api-frontend/src/main/java/io/seldon/apife/exception/SeldonAPIException.java index 409553a554..1ae25e7a3e 100644 --- a/api-frontend/src/main/java/io/seldon/apife/exception/SeldonAPIException.java +++ b/api-frontend/src/main/java/io/seldon/apife/exception/SeldonAPIException.java @@ -32,7 +32,8 @@ public enum ApiExceptionType { APIFE_MICROSERVICE_ERROR(103,"Microservice error",500), APIFE_NO_RUNNING_DEPLOYMENT(104,"No Running Deployment",500), APIFE_INVALID_RESPONSE_JSON(105,"Invalid Response JSON",400), - APIFE_GRPC_NO_PRINCIPAL_FOUND(105,"No OAuth principal found",400); + APIFE_GRPC_NO_PRINCIPAL_FOUND(105,"No OAuth principal found",400), + APIFE_GRPC_NO_GRPC_CHANNEL_FOUND(106,"No Managed Channel found",400); int id; String message; diff --git a/api-frontend/src/main/java/io/seldon/apife/grpc/SeldonGrpcServer.java b/api-frontend/src/main/java/io/seldon/apife/grpc/SeldonGrpcServer.java index 2b7dad0dbe..fc441ec499 100644 --- a/api-frontend/src/main/java/io/seldon/apife/grpc/SeldonGrpcServer.java +++ b/api-frontend/src/main/java/io/seldon/apife/grpc/SeldonGrpcServer.java @@ -18,6 +18,8 @@ import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; +import javax.annotation.PostConstruct; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -33,48 +35,61 @@ import io.seldon.apife.AppProperties; import io.seldon.apife.api.oauth.InMemoryClientDetailsService; import io.seldon.apife.deployments.DeploymentStore; +import io.seldon.apife.deployments.DeploymentsHandler; +import io.seldon.apife.deployments.DeploymentsListener; import io.seldon.apife.exception.SeldonAPIException; import io.seldon.protos.DeploymentProtos.DeploymentSpec; import io.seldon.protos.DeploymentProtos.Endpoint; import io.seldon.protos.DeploymentProtos.SeldonDeployment; @Component -public class SeldonGrpcServer { +public class SeldonGrpcServer { protected static Logger logger = LoggerFactory.getLogger(SeldonGrpcServer.class.getName()); public static final int SERVER_PORT = 5000; private final int port; private final Server server; - ThreadLocal principalThreadLocal = new ThreadLocal(); - ConcurrentHashMap channelStore = new ConcurrentHashMap<>(); + private ThreadLocal principalThreadLocal = new ThreadLocal(); + private ConcurrentHashMap channelStore = new ConcurrentHashMap<>(); private final DeploymentStore deploymentStore; private final TokenStore tokenStore; private final AppProperties appProperties; - + private final grpcDeploymentsListener grpcDeploymentsListener; + private final DeploymentsHandler deploymentsHandler; + @Autowired - public SeldonGrpcServer(AppProperties appProperties,DeploymentStore deploymentStore,TokenStore tokenStore) + public SeldonGrpcServer(AppProperties appProperties,DeploymentStore deploymentStore,TokenStore tokenStore,DeploymentsHandler deploymentsHandler) { - this(appProperties,deploymentStore,tokenStore,SERVER_PORT); + this(appProperties,deploymentStore,tokenStore,deploymentsHandler,SERVER_PORT); } - public SeldonGrpcServer(AppProperties appProperties,DeploymentStore deploymentStore,TokenStore tokenStore,int port) + public SeldonGrpcServer(AppProperties appProperties,DeploymentStore deploymentStore,TokenStore tokenStore,DeploymentsHandler deploymentsHandler,int port) { - this(appProperties,deploymentStore,tokenStore,ServerBuilder.forPort(port), port); + this(appProperties,deploymentStore,tokenStore,ServerBuilder.forPort(port), deploymentsHandler, port); } - public SeldonGrpcServer(AppProperties appProperties,DeploymentStore deploymentStore,TokenStore tokenStore,ServerBuilder serverBuilder, int port) + public SeldonGrpcServer(AppProperties appProperties,DeploymentStore deploymentStore,TokenStore tokenStore,ServerBuilder serverBuilder,DeploymentsHandler deploymentsHandler, int port) { this.appProperties = appProperties; this.deploymentStore = deploymentStore; this.tokenStore = tokenStore; + this.grpcDeploymentsListener = new grpcDeploymentsListener(this); + this.deploymentsHandler = deploymentsHandler; + deploymentsHandler.addListener(this.grpcDeploymentsListener); this.port = port; server = serverBuilder .addService(ServerInterceptors.intercept(new SeldonService(this), new HeaderServerInterceptor(this))) .build(); } + + @PostConstruct + private void init() throws Exception{ + logger.info("Initializing..."); + deploymentsHandler.addListener(this.grpcDeploymentsListener); + } public void setPrincipal(String principal) { @@ -112,8 +127,7 @@ public ManagedChannel getChannel() { ManagedChannel channel = channelStore.get(principal); if (channel == null) { - channel = ManagedChannelBuilder.forAddress(deploymentSpec.getName(), appProperties.getEngineGrpcContainerPort()).usePlaintext(true).build(); - channelStore.putIfAbsent(principal,channel); + throw new SeldonAPIException(SeldonAPIException.ApiExceptionType.APIFE_GRPC_NO_GRPC_CHANNEL_FOUND,"Principal is "+principal); } return channel; } @@ -175,8 +189,18 @@ public static void main(String[] args) throws Exception { AppProperties appProperties = new AppProperties(); appProperties.setEngineGrpcContainerPort(5000); store.deploymentAdded(dep); - SeldonGrpcServer server = new SeldonGrpcServer(appProperties,store,null,SERVER_PORT); + SeldonGrpcServer server = new SeldonGrpcServer(appProperties,store,null,null,SERVER_PORT); server.start(); server.blockUntilShutdown(); } + + public void deploymentAdded(SeldonDeployment resource) { + ManagedChannel channel = ManagedChannelBuilder.forAddress(resource.getSpec().getName(), appProperties.getEngineGrpcContainerPort()).usePlaintext(true).build(); + channelStore.put(resource.getSpec().getOauthKey(),channel); + } + + public void deploymentRemoved(SeldonDeployment resource) { + channelStore.remove(resource.getSpec().getOauthKey()); + } + } diff --git a/api-frontend/src/main/java/io/seldon/apife/grpc/SeldonService.java b/api-frontend/src/main/java/io/seldon/apife/grpc/SeldonService.java index bc5ffe3903..aedc35d2d8 100644 --- a/api-frontend/src/main/java/io/seldon/apife/grpc/SeldonService.java +++ b/api-frontend/src/main/java/io/seldon/apife/grpc/SeldonService.java @@ -41,7 +41,6 @@ public SeldonService(SeldonGrpcServer server) { @Override public void predict(io.seldon.protos.PredictionProtos.SeldonMessage request, io.grpc.stub.StreamObserver responseObserver) { - logger.debug("Received predict request for "+server.principalThreadLocal.get()); try { ManagedChannel channel = server.getChannel(); @@ -58,7 +57,6 @@ public void predict(io.seldon.protos.PredictionProtos.SeldonMessage request, @Override public void sendFeedback(io.seldon.protos.PredictionProtos.Feedback request, io.grpc.stub.StreamObserver responseObserver) { - logger.debug("Received feedback request for "+server.principalThreadLocal.get()); try { ManagedChannel channel = server.getChannel(); diff --git a/api-frontend/src/main/java/io/seldon/apife/grpc/grpcDeploymentsListener.java b/api-frontend/src/main/java/io/seldon/apife/grpc/grpcDeploymentsListener.java new file mode 100644 index 0000000000..93ce156310 --- /dev/null +++ b/api-frontend/src/main/java/io/seldon/apife/grpc/grpcDeploymentsListener.java @@ -0,0 +1,29 @@ +package io.seldon.apife.grpc; + +import io.seldon.apife.deployments.DeploymentsListener; +import io.seldon.protos.DeploymentProtos.SeldonDeployment; + +public class grpcDeploymentsListener implements DeploymentsListener { + + private final SeldonGrpcServer server; + + public grpcDeploymentsListener(SeldonGrpcServer server) { + super(); + this.server = server; + } + + @Override + public void deploymentAdded(SeldonDeployment resource) { + server.deploymentAdded(resource); + } + + @Override + public void deploymentUpdated(SeldonDeployment resource) { + // Do nothing + } + + @Override + public void deploymentRemoved(SeldonDeployment resource) { + server.deploymentRemoved(resource); + } +}