Skip to content

Commit

Permalink
Merge pull request #123 from radanalyticsio/test-restarts
Browse files Browse the repository at this point in the history
Tests that restart the pod w/ operator
  • Loading branch information
Jirka Kremser authored Nov 21, 2018
2 parents ec04894 + 73a98e5 commit a6dcf12
Show file tree
Hide file tree
Showing 4 changed files with 240 additions and 7 deletions.
23 changes: 23 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,26 @@ install: true
stages:
- test
- test-oc-and-k8s
- test-restarts
- deploy

jobs:
include:
- stage: test
name: "Maven & cont. image build"
language: java
script: make build-travis test

- stage: test-oc-and-k8s
name: "Specs [oc • CMs]"
env: BIN=oc VERSION=v3.9.0 CRD=0
script: &oc-script-defaults
- make build-travis
- ./.travis/.travis.prepare.openshift.sh
- ./.travis/.travis.test-oc-and-k8s.sh

- stage:
name: "Specs [oc • CRs]"
env: BIN=oc VERSION=v3.9.0 CRD=1
script: *oc-script-defaults

Expand All @@ -33,17 +37,36 @@ jobs:
# script: *oc-script-defaults

- stage:
name: "Specs [K8s • CMs]"
env: BIN=kubectl VERSION=v1.9.0 CRD=0 MINIKUBE_VERSION=v0.25.2
script: &kc-script-defaults
- make build-travis
- ./.travis/.travis.prepare.minikube.sh
- ./.travis/.travis.test-oc-and-k8s.sh

- stage:
name: "Specs [K8s • CRs]"
env: BIN=kubectl VERSION=v1.9.0 CRD=1 MINIKUBE_VERSION=v0.25.2
script: *kc-script-defaults

- stage:
name: "Restarts [oc • CMs]"
env: BIN=oc VERSION=v3.9.0 CRD=0
script:
- make build-travis
- ./.travis/.travis.prepare.openshift.sh
- ./.travis/.travis.test-restarts.sh

- stage:
name: "Restarts [K8s • CRs]"
env: BIN=kubectl VERSION=v1.9.0 CRD=1 MINIKUBE_VERSION=v0.25.2
script:
- make build-travis
- ./.travis/.travis.prepare.minikube.sh
- ./.travis/.travis.test-restarts.sh

- stage: deploy
name: "Push container images"
script:
# release x.y.z or x.y.z-centos if there is a release
# or release the latest image if building the master branch
Expand Down
170 changes: 170 additions & 0 deletions .travis/.travis.test-restarts.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
#!/bin/bash

# Restart tests for the spark operator: repeatedly kill the operator pod and
# verify it recovers and keeps reconciling clusters/apps correctly.
# Environment:
#   DIR - directory of this script (defaults to the script's own location)
#   BIN - CLI to use: oc (default) or kubectl
#   CRD - "1" to manage clusters via CustomResources, otherwise via ConfigMaps
DIR="${DIR:-$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )}"
BIN=${BIN:-oc}
if [ "$CRD" = "1" ]; then
# path fragment and resource kind used when operating on CustomResources
CR="cr/"
KIND="sparkcluster"
else
CR=""
KIND="cm"
fi

# Bring up a local cluster: `oc cluster up` for OpenShift, minikube otherwise.
cluster_up() {
echo -e "\n$(tput setaf 3)docker images:$(tput sgr0)\n"
docker images
echo
if [ "$BIN" = "oc" ]; then
set -x
oc cluster up
# CRD mode needs admin rights to register the CustomResourceDefinition
[ "$CRD" = "1" ] && oc login -u system:admin
set +x
else
echo "minikube"
start_minikube
fi
}

# Start minikube with the --vm-driver=none driver (runs directly on the host)
# and wait until the node and the core kube-system pods report ready.
start_minikube() {
export CHANGE_MINIKUBE_NONE_USER=true
sudo minikube start --vm-driver=none --kubernetes-version=${VERSION} && \
minikube update-context
os::cmd::try_until_text "${BIN} get nodes" '\sReady'

kubectl cluster-info


# kube-addon-manager is responsible for managing other k8s components, such as kube-dns, dashboard, storage-provisioner..
os::cmd::try_until_text "${BIN} -n kube-system get pod -lcomponent=kube-addon-manager -o yaml" 'ready: true'

# Wait for kube-dns to be ready.
os::cmd::try_until_text "${BIN} -n kube-system get pod -lk8s-app=kube-dns -o yaml" 'ready: true'
}

# Best-effort cleanup: kill every running container so the next cluster_up
# starts from a clean slate. Never fails the build.
tear_down() {
  local running
  running=$(docker ps -q)
  # only invoke `docker kill` when there is something to kill; calling it
  # with no arguments is a usage error (the original relied on `|| true`)
  [ -n "$running" ] && docker kill $running || true
}

# Source the OpenShift bash test library which provides the os::cmd::* and
# os::test::junit::* helpers used throughout this script.
setup_testing_framework() {
source "$(dirname "${BASH_SOURCE}")/../test/lib/init.sh"
os::util::environment::setup_time_vars
}

# Dump all resources in the current namespace plus the operator pod's log.
# Reads the pod name from the global $operator_pod (set by testCreateOperator
# and refreshed by testKillOperator).
logs() {
echo -e "\n$(tput setaf 3)oc get all:$(tput sgr0)\n"
${BIN} get all
echo -e "\n$(tput setaf 3)Logs:$(tput sgr0)\n"
${BIN} logs $operator_pod
echo
}

# Failure handler: print a red banner, dump diagnostics and abort the build.
errorLogs() {
echo -e "\n\n$(tput setaf 1)\n 😱 😱 😱\nBUILD FAILED\n\n😱 bad things have happened 😱$(tput sgr0)"
logs
exit 1
}

# Print a "[n / total] - Running <caller>" progress banner; increments the
# global $testIndex counter, $total is exported by main().
info() {
((testIndex++))
echo "$(tput setaf 3)[$testIndex / $total] - Running ${FUNCNAME[1]}$(tput sgr0)"
}

# Deploy the operator from the manifest (CRD or ConfigMap flavour) and wait
# until its pod is ready. Exports $operator_pod for later log collection and
# the restart tests.
testCreateOperator() {
  info
  local suffix=""
  [ "$CRD" = "1" ] && suffix="-crd"
  os::cmd::expect_success_and_text "${BIN} create -f $DIR/../manifest/operator$suffix.yaml" '"?spark-operator"? created' && \
  os::cmd::try_until_text "${BIN} get pod -l app.kubernetes.io/name=spark-operator -o yaml" 'ready: true'
  if [ "$CRD" = "1" ]; then
    # the CRD registration is asynchronous; wait until it is visible
    os::cmd::try_until_text "${BIN} get crd" 'sparkclusters.radanalytics.io'
  fi
  sleep 10  # give the operator a moment to start watching before tests fire
  # plain jsonpath output needs no quote stripping (original piped through sed)
  export operator_pod=$(${BIN} get pod -l app.kubernetes.io/name=spark-operator -o jsonpath='{.items[0].metadata.name}')
}

# Create the example Spark cluster (CR or ConfigMap flavour) and wait until
# both the worker and the master pods report ready.
testCreateCluster() {
  info
  local suffix
  if [ "$CRD" = "1" ]; then suffix="-cr"; else suffix=""; fi
  os::cmd::expect_success_and_text "${BIN} create -f $DIR/../examples/cluster$suffix.yaml" '"?my-spark-cluster"? created' && \
  os::cmd::try_until_text "${BIN} get pod -l radanalytics.io/deployment=my-spark-cluster-w -o yaml" 'ready: true' && \
  os::cmd::try_until_text "${BIN} get pod -l radanalytics.io/deployment=my-spark-cluster-m -o yaml" 'ready: true'
}

# Delete the operator pod and wait for its replacement, then refresh
# $operator_pod with the new pod's name. Returns non-zero when the delete
# itself fails (the original always returned the status of the `export`,
# so a failed delete could never trigger errorLogs in run_tests).
testKillOperator() {
  info
  os::cmd::expect_success_and_text "${BIN} delete pod $operator_pod" 'pod "?'$operator_pod'"? deleted' || return 1
  sleep 10  # give the replacement pod time to be created
  export operator_pod=$(${BIN} get pod -l app.kubernetes.io/name=spark-operator -o jsonpath='{.items[0].metadata.name}')
}

# Scale the cluster down to a single worker — via a merge patch on the CR's
# .spec.worker.instances, or by rewriting the ConfigMap's embedded YAML —
# then wait until exactly 2 pods carry the cluster label.
testScaleCluster() {
info
if [ "$CRD" = "1" ]; then
os::cmd::expect_success_and_text '${BIN} patch sparkcluster my-spark-cluster -p "{\"spec\":{\"worker\": {\"instances\": 1}}}" --type=merge' '"?my-spark-cluster"? patched' || errorLogs
else
# the ConfigMap stores the cluster config as a YAML string under .data.config
os::cmd::expect_success_and_text '${BIN} patch cm my-spark-cluster -p "{\"data\":{\"config\": \"worker:\n instances: 1\"}}"' '"?my-spark-cluster"? patched' || errorLogs
fi
# 2 pods expected after scaling — presumably master + one worker (TODO confirm)
os::cmd::try_until_text "${BIN} get pods --no-headers -l radanalytics.io/sparkcluster=my-spark-cluster | wc -l" '2'
}

# Delete the cluster resource ($KIND is sparkcluster or cm, chosen at the top
# of the script) and wait until no pods with the cluster label remain.
testDeleteCluster() {
  info
  os::cmd::expect_success_and_text "${BIN} delete ${KIND} my-spark-cluster" '"?my-spark-cluster"? deleted' && \
  os::cmd::try_until_text "${BIN} get pods --no-headers -l radanalytics.io/sparkcluster=my-spark-cluster 2> /dev/null | wc -l" '0'
}

# Submit the example Spark application (CR or ConfigMap flavour) and wait
# until 3 pods carry the application label.
testApp() {
  info
  local app_dir=""
  [ "$CRD" = "1" ] && app_dir="test/cr/"
  os::cmd::expect_success_and_text "${BIN} create -f examples/${app_dir}app.yaml" '"?my-spark-app"? created' && \
  os::cmd::try_until_text "${BIN} get pods --no-headers -l radanalytics.io/sparkapplication=my-spark-app 2> /dev/null | wc -l" '3'
}

# Verify the example app produced a result: locate its driver pod and wait
# for the Pi approximation to appear in its log.
testAppResult() {
  info
  sleep 2  # give the driver pod a moment to appear before querying for it
  # declare and assign separately: `local x=$(...)` always returns the status
  # of `local`, which silently masked a failed lookup in the original
  local driver_pod
  driver_pod=$(${BIN} get pods --no-headers -l radanalytics.io/sparkapplication=my-spark-app -l spark-role=driver -o jsonpath='{.items[0].metadata.name}') && \
  os::cmd::try_until_text "${BIN} logs $driver_pod" 'Pi is roughly 3.1'
}

# Delete the Spark application resource and wait until its pods are gone.
testDeleteApp() {
  info
  local kind
  if [ "$CRD" = "1" ]; then kind="sparkapplication"; else kind="cm"; fi
  os::cmd::expect_success_and_text "${BIN} delete ${kind} my-spark-app" '"my-spark-app" deleted' && \
  os::cmd::try_until_text "${BIN} get pods --no-headers -l radanalytics.io/sparkapplication=my-spark-app 2> /dev/null | wc -l" '0'
}

# The restart suite proper: the operator pod is killed after every
# state-changing step, so each subsequent step also proves the operator
# recovers its internal state after a restart. Any failure aborts via errorLogs.
run_tests() {
testKillOperator || errorLogs
testCreateCluster || errorLogs
testKillOperator || errorLogs
testScaleCluster || errorLogs
testKillOperator || errorLogs
testDeleteCluster || errorLogs
testKillOperator || errorLogs

sleep 10
testApp || errorLogs
testKillOperator || errorLogs
testAppResult || errorLogs
logs
}

# Entry point: clean up leftovers, bring the cluster up, deploy the operator,
# then run the full restart suite — or just the single test named in $1.
main() {
  # banner denominator: a full run prints 11 info() banners
  # (testCreateOperator + the 10 tests invoked from run_tests);
  # the original said 17, which made the "[n / total]" progress wrong
  export total=11
  export testIndex=0
  tear_down
  setup_testing_framework
  os::test::junit::declare_suite_start "operator/tests-restarts"
  cluster_up
  testCreateOperator || { ${BIN} get events; ${BIN} get pods; exit 1; }
  if [ "$#" -gt 0 ]; then
    # run single test that is passed as arg
    "$1"
  else
    run_tests
  fi
  os::test::junit::declare_suite_end
  tear_down
}

# quote "$@" so arguments survive word splitting
main "$@"
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class RunningClusters {
.labelNames("cluster")
.register();

public static final Counter startedTotal = Counter.build()
public static final Gauge startedTotal = Gauge.build()
.name("operator_started_total")
.help("Spark clusters has been started by operator.")
.register();
Expand All @@ -43,13 +43,21 @@ public void put(SparkCluster ci) {
}

public void delete(String name) {
runningClusters.dec();
workers.labels(name).set(0);
clusters.remove(name);
if (clusters.containsKey(name)) {
runningClusters.dec();
workers.labels(name).set(0);
clusters.remove(name);
}
}

/**
 * Looks up the cached cluster definition by name.
 *
 * @param name the cluster name
 * @return the cached {@link SparkCluster}, or {@code null} if no cluster
 *         with that name is tracked
 */
public SparkCluster getCluster(String name) {
return this.clusters.get(name);
}

/**
 * Resets the Prometheus metrics to their initial state. Invoked on the first
 * full reconciliation after an operator (re)start so the fresh process does
 * not report stale counts.
 */
public void resetMetrics() {
    // the original called startedTotal.set(0) twice; once is enough
    startedTotal.set(0);
    workers.clear();
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.radanalytics.operator.cluster;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Functions;
import com.google.common.collect.Sets;
import io.fabric8.kubernetes.api.model.DoneableReplicationController;
Expand All @@ -18,7 +19,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static io.radanalytics.operator.common.AnsiColors.*;
Expand Down Expand Up @@ -82,6 +85,7 @@ public void fullReconciliation() {
// 5. modify / scale

log.info("Running full reconciliation for namespace {} and kind {}..", namespace, entityName);
final AtomicBoolean change = new AtomicBoolean(false);
Set<SparkCluster> desiredSet = super.getDesiredSet();
Map<String, SparkCluster> desiredMap = desiredSet.stream().collect(Collectors.toMap(SparkCluster::getName, Functions.identity()));
Map<String, Integer> actual = getActual();
Expand All @@ -94,9 +98,11 @@ public void fullReconciliation() {

if (!toBeCreated.isEmpty()) {
log.info("toBeCreated: {}", toBeCreated);
change.set(true);
}
if (!toBeDeleted.isEmpty()) {
log.info("toBeDeleted: {}", toBeDeleted);
change.set(true);
}

// add new
Expand All @@ -118,12 +124,38 @@ public void fullReconciliation() {
int desiredWorkers = Optional.ofNullable(dCluster.getWorker()).orElse(new RCSpec()).getInstances();
Integer actualWorkers = actual.get(dCluster.getName());
if (actualWorkers != null && desiredWorkers != actualWorkers) {
// update the internal representation with the actual # of workers
Optional.ofNullable(clusters.getCluster(dCluster.getName()).getWorker())
.ifPresent(worker -> worker.setInstances(actualWorkers));
change.set(true);
// update the internal representation with the actual # of workers and call onModify
if (clusters.getCluster(dCluster.getName()) == null) {
// deep copy via json -> room for optimization
ObjectMapper om = new ObjectMapper();
try {
SparkCluster actualCluster = om.readValue(om.writeValueAsString(dCluster), SparkCluster.class);
Optional.ofNullable(actualCluster.getWorker()).ifPresent(w -> w.setInstances(actualWorkers));
clusters.put(actualCluster);
} catch (IOException e) {
log.warn(e.getMessage());
e.printStackTrace();
return;
}
} else {
Optional.ofNullable(clusters.getCluster(dCluster.getName())).map(SparkCluster::getWorker)
.ifPresent(worker -> worker.setInstances(actualWorkers));
}
log.info("scaling cluster {}", dCluster.getName());
onModify(dCluster);
}
});

// first reconciliation after (re)start -> update the clusters instance
if (!fullReconciliationRun) {
clusters.resetMetrics();
desiredMap.entrySet().forEach(e -> clusters.put(e.getValue()));
}

if (!change.get()) {
log.info("no change was detected during the reconciliation");
}
}

private Map<String, Integer> getActual() {
Expand Down

0 comments on commit a6dcf12

Please sign in to comment.