Skip to content

Commit

Permalink
update apife to work cluster wide
Browse files Browse the repository at this point in the history
  • Loading branch information
ukclivecox committed Dec 7, 2018
1 parent 5489337 commit 51ba6f7
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 55 deletions.
10 changes: 9 additions & 1 deletion api-frontend/src/main/java/io/seldon/apife/AppProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class AppProperties {
private int engineContainerPort;
private int engineGrpcContainerPort;
private String namespace;
private boolean singleNamespace = true;

public int getEngineContainerPort() {
return engineContainerPort;
Expand All @@ -43,7 +44,14 @@ public String getNamespace() {
public void setNamespace(String namespace) {
this.namespace = namespace;
}
@Override

public boolean isSingleNamespace() {
return singleNamespace;
}
public void setSingleNamespace(boolean singleNamespace) {
this.singleNamespace = singleNamespace;
}
@Override
public String toString() {
return ReflectionToStringBuilder.toString(this, ToStringStyle.SHORT_PREFIX_STYLE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import io.seldon.apife.AppProperties;
import io.seldon.apife.api.oauth.InMemoryClientDetailsService;
import io.seldon.apife.k8s.KubernetesUtil;
import io.seldon.protos.DeploymentProtos.DeploymentSpec;
import io.seldon.protos.DeploymentProtos.SeldonDeployment;

Expand All @@ -34,17 +36,19 @@ public class DeploymentStore implements DeploymentsListener {
protected static Logger logger = LoggerFactory.getLogger(DeploymentStore.class.getName());

//Oauth key to deployment def
private ConcurrentMap<String, DeploymentSpec> deploymentStore = new ConcurrentHashMap<>();
private ConcurrentMap<String, SeldonDeployment> deploymentStore = new ConcurrentHashMap<>();

private final DeploymentsHandler deploymentsHandler;

private InMemoryClientDetailsService clientDetailsService;
private final InMemoryClientDetailsService clientDetailsService;
private final AppProperties appProperties;
private final KubernetesUtil k8sUtil = new KubernetesUtil();

@Autowired
public DeploymentStore(DeploymentsHandler deploymentsHandler,InMemoryClientDetailsService clientDetailsService)
public DeploymentStore(DeploymentsHandler deploymentsHandler,InMemoryClientDetailsService clientDetailsService,AppProperties appProperties)
{
this.deploymentsHandler = deploymentsHandler;
this.clientDetailsService = clientDetailsService;
this.appProperties = appProperties;
}

@PostConstruct
Expand All @@ -53,18 +57,26 @@ private void init() throws Exception{
deploymentsHandler.addListener(this);
}

public DeploymentSpec getDeployment(String clientId)
{
return deploymentStore.get(clientId);
}


public SeldonDeployment getDeployment(String clientId)
{
return deploymentStore.get(clientId);
}

@Override
public void deploymentAdded(SeldonDeployment mlDep) {
final DeploymentSpec deploymentDef = mlDep.getSpec();
final String namespace = k8sUtil.getNamespace(mlDep);

if (appProperties.isSingleNamespace())
{
deploymentStore.put(deploymentDef.getOauthKey(), mlDep);
clientDetailsService.addClient(deploymentDef.getOauthKey(), deploymentDef.getOauthSecret());
}

deploymentStore.put(deploymentDef.getOauthKey(), deploymentDef);
clientDetailsService.addClient(deploymentDef.getOauthKey(), deploymentDef.getOauthSecret());
// Always add namespaced key
final String namespacedKey = deploymentDef.getOauthKey() + namespace;
deploymentStore.put(namespacedKey, mlDep);
clientDetailsService.addClient(namespacedKey, deploymentDef.getOauthSecret());

logger.info("Succesfully added or updated deployment "+deploymentDef.getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,10 @@
import io.grpc.ServerInterceptors;
import io.grpc.netty.NettyServerBuilder;
import io.seldon.apife.AppProperties;
import io.seldon.apife.api.oauth.InMemoryClientDetailsService;
import io.seldon.apife.config.AnnotationsConfig;
import io.seldon.apife.deployments.DeploymentStore;
import io.seldon.apife.deployments.DeploymentsHandler;
import io.seldon.apife.exception.SeldonAPIException;
import io.seldon.apife.k8s.DeploymentWatcher;
import io.seldon.protos.DeploymentProtos.DeploymentSpec;
import io.seldon.protos.DeploymentProtos.SeldonDeployment;

@Component
Expand Down Expand Up @@ -154,8 +151,8 @@ public ManagedChannel getChannel() {
throw new SeldonAPIException(SeldonAPIException.ApiExceptionType.APIFE_GRPC_NO_PRINCIPAL_FOUND,"");
}

final DeploymentSpec deploymentSpec = deploymentStore.getDeployment(principal);
if (deploymentSpec == null)
final SeldonDeployment mlDep = deploymentStore.getDeployment(principal);
if (mlDep == null)
{
throw new SeldonAPIException(SeldonAPIException.ApiExceptionType.APIFE_NO_RUNNING_DEPLOYMENT,"Principal is "+principal);
}
Expand Down Expand Up @@ -209,27 +206,6 @@ private void blockUntilShutdown() throws InterruptedException {
}
}

/**
* Main method for basic testing.
*/
public static void main(String[] args) throws Exception {
DeploymentStore store = new DeploymentStore(null,new InMemoryClientDetailsService());
SeldonDeployment dep = SeldonDeployment.newBuilder()
.setApiVersion(DeploymentWatcher.VERSION)
.setKind("SeldonDeplyment")
.setSpec(DeploymentSpec.newBuilder()
.setName("0.0.0.0")
.setOauthKey("key")
.setOauthSecret("secret")
).build();
AppProperties appProperties = new AppProperties();
appProperties.setEngineGrpcContainerPort(5000);
store.deploymentAdded(dep);
SeldonGrpcServer server = new SeldonGrpcServer(appProperties,store,null,null,null,SERVER_PORT);
server.start();
server.blockUntilShutdown();
}

public void deploymentAdded(SeldonDeployment resource) {
ManagedChannel channel = ManagedChannelBuilder.forAddress(resource.getSpec().getName(), appProperties.getEngineGrpcContainerPort()).usePlaintext(true).build();
channelStore.put(resource.getSpec().getOauthKey(),channel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@
import io.kubernetes.client.apis.CustomObjectsApi;
import io.kubernetes.client.util.Config;
import io.kubernetes.client.util.Watch;
import io.seldon.apife.AppProperties;
import io.seldon.apife.deployments.DeploymentsHandler;
import io.seldon.apife.deployments.DeploymentsListener;
import io.seldon.apife.pb.ProtoBufUtils;
import io.seldon.apife.AppProperties;
import io.seldon.protos.DeploymentProtos.SeldonDeployment;

@Component
Expand All @@ -67,14 +67,16 @@ public class DeploymentWatcher implements DeploymentsHandler{
private int resourceVersionProcessed = 0;
private final Set<DeploymentsListener> listeners;
private final String namespace;
private final boolean clusterWide;

@Autowired
public DeploymentWatcher(AppProperties clusterManagerProperites) throws IOException
public DeploymentWatcher(AppProperties appProperties) throws IOException
{
this.client = Config.defaultClient();
this.listeners = new HashSet<>();
Configuration.setDefaultApiClient(client);
this.namespace = StringUtils.isEmpty(clusterManagerProperites.getNamespace()) ? "default" : clusterManagerProperites.getNamespace();
this.namespace = StringUtils.isEmpty(appProperties.getNamespace()) ? "default" : appProperties.getNamespace();
this.clusterWide = !appProperties.isSingleNamespace();
}

private void processWatch(SeldonDeployment mlDep,String action)
Expand Down Expand Up @@ -127,11 +129,21 @@ public int watchSeldonSeldonDeployments(int resourceVersion,int resourceVersionP
rs = ""+resourceVersion;
logger.info("Watching with rs "+rs);
CustomObjectsApi api = new CustomObjectsApi();
Watch<Object> watch = Watch.createWatch(
Watch<Object> watch;
if (this.clusterWide)
{
watch = Watch.createWatch(
client,
api.listClusterCustomObjectCall(GROUP, VERSION, KIND_PLURAL, null, null, rs, true, null, null),
new TypeToken<Watch.Response<Object>>(){}.getType());
}
else
{
watch = Watch.createWatch(
client,
api.listNamespacedCustomObjectCall(GROUP, VERSION, namespace, KIND_PLURAL, null, null, rs, true, null, null),
new TypeToken<Watch.Response<Object>>(){}.getType());

}
int maxResourceVersion = resourceVersion;
try{
for (Watch.Response<Object> item : watch) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.micrometer.spring.web.servlet.DefaultWebMvcTagsProvider;
import io.micrometer.spring.web.servlet.WebMvcTags;
import io.seldon.apife.deployments.DeploymentStore;
import io.seldon.protos.DeploymentProtos.SeldonDeployment;

@Component
public class AuthorizedWebMvcTagsProvider extends DefaultWebMvcTagsProvider {
Expand Down Expand Up @@ -62,10 +63,11 @@ public Tag principal(String principalName) {

public Tag deploymentName(String principalName)
{
if (principalName == null || !StringUtils.hasText(deploymentStore.getDeployment(principalName).getName()))
SeldonDeployment mlDep = deploymentStore.getDeployment(principalName);
if (principalName == null || mlDep == null || !StringUtils.hasText(mlDep.getSpec().getName()))
return Tag.of(DEPLOYMENT_NAME_METRIC, "None");
else
return Tag.of(DEPLOYMENT_NAME_METRIC,deploymentStore.getDeployment(principalName).getName());
return Tag.of(DEPLOYMENT_NAME_METRIC,mlDep.getSpec().getName());
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@

import io.seldon.apife.deployments.DeploymentStore;
import io.seldon.apife.exception.SeldonAPIException;
import io.seldon.protos.DeploymentProtos.DeploymentSpec;
import io.seldon.apife.k8s.KubernetesUtil;
import io.seldon.protos.DeploymentProtos.SeldonDeployment;


@Service
Expand All @@ -36,21 +37,28 @@ public class PredictionService {
@Autowired
InternalPredictionService internalPredictionService;

private final KubernetesUtil k8sUtil = new KubernetesUtil();

public String predict(String request,String clientId) {

DeploymentSpec deployment = deploymentStore.getDeployment(clientId);
SeldonDeployment deployment = deploymentStore.getDeployment(clientId);
if (deployment != null)
return internalPredictionService.getPrediction(request, deployment.getName());
{
final String endpoint = deployment.getSpec().getName() + "." + k8sUtil.getNamespace(deployment);
return internalPredictionService.getPrediction(request, endpoint);
}
else
throw new SeldonAPIException(SeldonAPIException.ApiExceptionType.APIFE_NO_RUNNING_DEPLOYMENT,"no deployment with id "+clientId);

}

public void sendFeedback(String feedback, String deploymentId){
DeploymentSpec deployment = deploymentStore.getDeployment(deploymentId);
SeldonDeployment deployment = deploymentStore.getDeployment(deploymentId);
if (deployment != null)
internalPredictionService.sendFeedback(feedback, deployment.getName());
{
final String endpoint = deployment.getSpec().getName() + "." + k8sUtil.getNamespace(deployment);
internalPredictionService.sendFeedback(feedback, endpoint);
}
else
throw new SeldonAPIException(SeldonAPIException.ApiExceptionType.APIFE_NO_RUNNING_DEPLOYMENT,"no deployment with id "+deploymentId);

Expand Down
1 change: 1 addition & 0 deletions api-frontend/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ log4j.logger.org.springframework.security=DEBUG
io.seldon.apife.engine-container-port=8000
io.seldon.apife.engine-grpc-container-port=5001
io.seldon.apife.namespace=${SELDON_CLUSTER_MANAGER_POD_NAMESPACE}
io.seldon.apife.single-namespace=${SELDON_SINGLE_NAMESPACE}


management.metrics.web.server.requests-metric-name=seldon.api.ingress.server.requests
Expand Down
2 changes: 2 additions & 0 deletions helm-charts/seldon-core/templates/apife-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ spec:
value: kafka:9092
- name: SELDON_CLUSTER_MANAGER_REDIS_HOST
value: {{ .Release.Name }}-redis
- name: SELDON_SINGLE_NAMESPACE
value: '{{ .Values.single_namespace }}'
- name: SELDON_CLUSTER_MANAGER_POD_NAMESPACE
valueFrom:
fieldRef:
Expand Down
8 changes: 4 additions & 4 deletions notebooks/seldon_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,18 @@ def create_random_data(data_size,rows=1):
arr = np.random.rand(rows*data_size)
return (shape,arr)

def get_token(oauth_key,oauth_secret,endpoint):
def get_token(oauth_key,oauth_secret,namespace,endpoint):
payload = {'grant_type': 'client_credentials'}
response = requests.post(
"http://"+endpoint+"/oauth/token",
auth=HTTPBasicAuth(oauth_key, oauth_secret),
auth=HTTPBasicAuth(oauth_key+namespace, oauth_secret),
data=payload)
print(response.text)
token = response.json()["access_token"]
return token

def rest_request_api_gateway(oauth_key,oauth_secret,endpoint="localhost:8002",data_size=5,rows=1,data=None):
token = get_token(oauth_key,oauth_secret,endpoint)
def rest_request_api_gateway(oauth_key,oauth_secret,namespace,endpoint="localhost:8002",data_size=5,rows=1,data=None):
token = get_token(oauth_key,oauth_secret,namespace,endpoint)
if data is None:
shape, arr = create_random_data(data_size,rows)
else:
Expand Down

0 comments on commit 51ba6f7

Please sign in to comment.