Skip to content

Commit

Permalink
Merge pull request #151 from radanalyticsio/namespace-fix
Browse files Browse the repository at this point in the history
Fix #149: operator should deploy resources to the same namespace it is watching
  • Loading branch information
Jirka Kremser authored Nov 21, 2018
2 parents f835a25 + d53bfcc commit ec04894
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 13 deletions.
8 changes: 4 additions & 4 deletions src/main/java/io/radanalytics/operator/app/AppOperator.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ protected void onInit() {
@Override
protected void onAdd(SparkApplication app) {
KubernetesResourceList list = deployer.getResourceList(app, namespace);
client.resourceList(list).createOrReplace();
client.resourceList(list).inNamespace(namespace).createOrReplace();
}

@Override
protected void onDelete(SparkApplication app) {
String name = app.getName();
client.services().withLabels(deployer.getLabelsForDeletion(name)).delete();
client.replicationControllers().withLabels(deployer.getLabelsForDeletion(name)).delete();
client.pods().withLabels(deployer.getLabelsForDeletion(name)).delete();
client.services().inNamespace(namespace).withLabels(deployer.getLabelsForDeletion(name)).delete();
client.replicationControllers().inNamespace(namespace).withLabels(deployer.getLabelsForDeletion(name)).delete();
client.pods().inNamespace(namespace).withLabels(deployer.getLabelsForDeletion(name)).delete();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ public class KubernetesSparkClusterDeployer {
private KubernetesClient client;
private String entityName;
private String prefix;
private String namespace;

KubernetesSparkClusterDeployer(KubernetesClient client, String entityName, String prefix) {
KubernetesSparkClusterDeployer(KubernetesClient client, String entityName, String prefix, String namespace) {
this.client = client;
this.entityName = entityName;
this.prefix = prefix;
this.namespace = namespace;
}

public KubernetesResourceList getResourceList(SparkCluster cluster) {
Expand Down Expand Up @@ -269,7 +271,7 @@ private ReplicationController addInitContainers(ReplicationController rc,
}

private boolean cmExists(String name) {
ConfigMap configMap = client.configMaps().withName(name).get();
ConfigMap configMap = client.configMaps().inNamespace(namespace).withName(name).get();
return configMap != null && configMap.getData() != null && !configMap.getData().isEmpty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@ public SparkClusterOperator() {

protected void onAdd(SparkCluster cluster) {
KubernetesResourceList list = getDeployer().getResourceList(cluster);
client.resourceList(list).createOrReplace();
client.resourceList(list).inNamespace(namespace).createOrReplace();
clusters.put(cluster);
}

protected void onDelete(SparkCluster cluster) {
String name = cluster.getName();
client.services().withLabels(getDeployer().getDefaultLabels(name)).delete();
client.replicationControllers().withLabels(getDeployer().getDefaultLabels(name)).delete();
client.pods().withLabels(getDeployer().getDefaultLabels(name)).delete();
client.services().inNamespace(namespace).withLabels(getDeployer().getDefaultLabels(name)).delete();
client.replicationControllers().inNamespace(namespace).withLabels(getDeployer().getDefaultLabels(name)).delete();
client.pods().inNamespace(namespace).withLabels(getDeployer().getDefaultLabels(name)).delete();
clusters.delete(name);
}

Expand All @@ -64,11 +64,11 @@ protected void onModify(SparkCluster newCluster) {
if (isOnlyScale(existingCluster, newCluster)) {
log.info("{}scaling{} from {}{}{} worker replicas to {}{}{}", re(), xx(), ye(),
existingCluster.getWorker().getInstances(), xx(), ye(), newWorkers, xx());
client.replicationControllers().withName(name + "-w").scale(newWorkers);
client.replicationControllers().inNamespace(namespace).withName(name + "-w").scale(newWorkers);
} else {
log.info("{}recreating{} cluster {}{}{}", re(), xx(), ye(), existingCluster.getName(), xx());
KubernetesResourceList list = getDeployer().getResourceList(newCluster);
client.resourceList(list).createOrReplace();
client.resourceList(list).inNamespace(namespace).createOrReplace();
clusters.put(newCluster);
}
}
Expand Down Expand Up @@ -144,7 +144,7 @@ private Map<String, Integer> getActual() {

public KubernetesSparkClusterDeployer getDeployer() {
if (this.deployer == null) {
this.deployer = new KubernetesSparkClusterDeployer(client, entityName, prefix);
this.deployer = new KubernetesSparkClusterDeployer(client, entityName, prefix, namespace);
}
return deployer;
}
Expand Down

0 comments on commit ec04894

Please sign in to comment.