Skip to content

Commit

Permalink
ensure grpc channels updated on deployment add/remove
Browse files Browse the repository at this point in the history
  • Loading branch information
ukclivecox committed Jan 8, 2018
1 parent fe2bc97 commit bc170b5
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ public enum ApiExceptionType {
APIFE_MICROSERVICE_ERROR(103,"Microservice error",500),
APIFE_NO_RUNNING_DEPLOYMENT(104,"No Running Deployment",500),
APIFE_INVALID_RESPONSE_JSON(105,"Invalid Response JSON",400),
APIFE_GRPC_NO_PRINCIPAL_FOUND(105,"No OAuth principal found",400);
APIFE_GRPC_NO_PRINCIPAL_FOUND(105,"No OAuth principal found",400),
APIFE_GRPC_NO_GRPC_CHANNEL_FOUND(106,"No Managed Channel found",400);

int id;
String message;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;

import javax.annotation.PostConstruct;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -33,48 +35,61 @@
import io.seldon.apife.AppProperties;
import io.seldon.apife.api.oauth.InMemoryClientDetailsService;
import io.seldon.apife.deployments.DeploymentStore;
import io.seldon.apife.deployments.DeploymentsHandler;
import io.seldon.apife.deployments.DeploymentsListener;
import io.seldon.apife.exception.SeldonAPIException;
import io.seldon.protos.DeploymentProtos.DeploymentSpec;
import io.seldon.protos.DeploymentProtos.Endpoint;
import io.seldon.protos.DeploymentProtos.SeldonDeployment;

@Component
public class SeldonGrpcServer {
public class SeldonGrpcServer {
protected static Logger logger = LoggerFactory.getLogger(SeldonGrpcServer.class.getName());

public static final int SERVER_PORT = 5000;

private final int port;
private final Server server;
ThreadLocal<String> principalThreadLocal = new ThreadLocal<String>();
ConcurrentHashMap<String,ManagedChannel> channelStore = new ConcurrentHashMap<>();
private ThreadLocal<String> principalThreadLocal = new ThreadLocal<String>();
private ConcurrentHashMap<String,ManagedChannel> channelStore = new ConcurrentHashMap<>();

private final DeploymentStore deploymentStore;
private final TokenStore tokenStore;
private final AppProperties appProperties;

private final grpcDeploymentsListener grpcDeploymentsListener;
private final DeploymentsHandler deploymentsHandler;

@Autowired
public SeldonGrpcServer(AppProperties appProperties,DeploymentStore deploymentStore,TokenStore tokenStore)
public SeldonGrpcServer(AppProperties appProperties,DeploymentStore deploymentStore,TokenStore tokenStore,DeploymentsHandler deploymentsHandler)
{
this(appProperties,deploymentStore,tokenStore,SERVER_PORT);
this(appProperties,deploymentStore,tokenStore,deploymentsHandler,SERVER_PORT);
}

public SeldonGrpcServer(AppProperties appProperties,DeploymentStore deploymentStore,TokenStore tokenStore,int port)
public SeldonGrpcServer(AppProperties appProperties,DeploymentStore deploymentStore,TokenStore tokenStore,DeploymentsHandler deploymentsHandler,int port)
{
this(appProperties,deploymentStore,tokenStore,ServerBuilder.forPort(port), port);
this(appProperties,deploymentStore,tokenStore,ServerBuilder.forPort(port), deploymentsHandler, port);
}


public SeldonGrpcServer(AppProperties appProperties,DeploymentStore deploymentStore,TokenStore tokenStore,ServerBuilder<?> serverBuilder, int port)
public SeldonGrpcServer(AppProperties appProperties,DeploymentStore deploymentStore,TokenStore tokenStore,ServerBuilder<?> serverBuilder,DeploymentsHandler deploymentsHandler, int port)
{
this.appProperties = appProperties;
this.deploymentStore = deploymentStore;
this.tokenStore = tokenStore;
this.grpcDeploymentsListener = new grpcDeploymentsListener(this);
this.deploymentsHandler = deploymentsHandler;
deploymentsHandler.addListener(this.grpcDeploymentsListener);
this.port = port;
server = serverBuilder
.addService(ServerInterceptors.intercept(new SeldonService(this), new HeaderServerInterceptor(this)))
.build();
}

@PostConstruct
private void init() throws Exception{
logger.info("Initializing...");
deploymentsHandler.addListener(this.grpcDeploymentsListener);
}

public void setPrincipal(String principal)
{
Expand Down Expand Up @@ -112,8 +127,7 @@ public ManagedChannel getChannel() {
ManagedChannel channel = channelStore.get(principal);
if (channel == null)
{
channel = ManagedChannelBuilder.forAddress(deploymentSpec.getName(), appProperties.getEngineGrpcContainerPort()).usePlaintext(true).build();
channelStore.putIfAbsent(principal,channel);
throw new SeldonAPIException(SeldonAPIException.ApiExceptionType.APIFE_GRPC_NO_GRPC_CHANNEL_FOUND,"Principal is "+principal);
}
return channel;
}
Expand Down Expand Up @@ -175,8 +189,18 @@ public static void main(String[] args) throws Exception {
AppProperties appProperties = new AppProperties();
appProperties.setEngineGrpcContainerPort(5000);
store.deploymentAdded(dep);
SeldonGrpcServer server = new SeldonGrpcServer(appProperties,store,null,SERVER_PORT);
SeldonGrpcServer server = new SeldonGrpcServer(appProperties,store,null,null,SERVER_PORT);
server.start();
server.blockUntilShutdown();
}

public void deploymentAdded(SeldonDeployment resource) {
ManagedChannel channel = ManagedChannelBuilder.forAddress(resource.getSpec().getName(), appProperties.getEngineGrpcContainerPort()).usePlaintext(true).build();
channelStore.put(resource.getSpec().getOauthKey(),channel);
}

public void deploymentRemoved(SeldonDeployment resource) {
channelStore.remove(resource.getSpec().getOauthKey());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ public SeldonService(SeldonGrpcServer server) {
@Override
public void predict(io.seldon.protos.PredictionProtos.SeldonMessage request,
io.grpc.stub.StreamObserver<io.seldon.protos.PredictionProtos.SeldonMessage> responseObserver) {
logger.debug("Received predict request for "+server.principalThreadLocal.get());
try
{
ManagedChannel channel = server.getChannel();
Expand All @@ -58,7 +57,6 @@ public void predict(io.seldon.protos.PredictionProtos.SeldonMessage request,
@Override
public void sendFeedback(io.seldon.protos.PredictionProtos.Feedback request,
io.grpc.stub.StreamObserver<io.seldon.protos.PredictionProtos.SeldonMessage> responseObserver) {
logger.debug("Received feedback request for "+server.principalThreadLocal.get());
try
{
ManagedChannel channel = server.getChannel();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.seldon.apife.grpc;

import io.seldon.apife.deployments.DeploymentsListener;
import io.seldon.protos.DeploymentProtos.SeldonDeployment;

public class grpcDeploymentsListener implements DeploymentsListener {

private final SeldonGrpcServer server;

public grpcDeploymentsListener(SeldonGrpcServer server) {
super();
this.server = server;
}

@Override
public void deploymentAdded(SeldonDeployment resource) {
server.deploymentAdded(resource);
}

@Override
public void deploymentUpdated(SeldonDeployment resource) {
// Do nothing
}

@Override
public void deploymentRemoved(SeldonDeployment resource) {
server.deploymentRemoved(resource);
}
}

0 comments on commit bc170b5

Please sign in to comment.