Tests that restart the pod w/ operator #123

Merged: 11 commits, Nov 21, 2018
23 changes: 23 additions & 0 deletions .travis.yml
@@ -9,22 +9,26 @@ install: true
stages:
- test
- test-oc-and-k8s
- test-restarts
- deploy

jobs:
include:
- stage: test
name: "Maven & cont. image build"
language: java
script: make build-travis test

- stage: test-oc-and-k8s
name: "Specs [oc • CMs]"
env: BIN=oc VERSION=v3.9.0 CRD=0
script: &oc-script-defaults
- make build-travis
- ./.travis/.travis.prepare.openshift.sh
- ./.travis/.travis.test-oc-and-k8s.sh

- stage:
name: "Specs [oc • CRs]"
env: BIN=oc VERSION=v3.9.0 CRD=1
script: *oc-script-defaults

@@ -33,17 +37,36 @@ jobs:
# script: *oc-script-defaults

- stage:
name: "Specs [K8s • CMs]"
env: BIN=kubectl VERSION=v1.9.0 CRD=0 MINIKUBE_VERSION=v0.25.2
script: &kc-script-defaults
- make build-travis
- ./.travis/.travis.prepare.minikube.sh
- ./.travis/.travis.test-oc-and-k8s.sh

- stage:
name: "Specs [K8s • CRs]"
env: BIN=kubectl VERSION=v1.9.0 CRD=1 MINIKUBE_VERSION=v0.25.2
script: *kc-script-defaults

- stage:
name: "Restarts [oc • CMs]"
env: BIN=oc VERSION=v3.9.0 CRD=0
script:
- make build-travis
- ./.travis/.travis.prepare.openshift.sh
- ./.travis/.travis.test-restarts.sh

- stage:
name: "Restarts [K8s • CRs]"
env: BIN=kubectl VERSION=v1.9.0 CRD=1 MINIKUBE_VERSION=v0.25.2
script:
- make build-travis
- ./.travis/.travis.prepare.minikube.sh
- ./.travis/.travis.test-restarts.sh

- stage: deploy
name: "Push container images"
script:
# release x.y.z or x.y.z-centos if there is a release
# or release the latest image if building the master branch
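The two new test-restarts stages reuse the same build and cluster-preparation steps as the spec stages above and then call the new .travis/.travis.test-restarts.sh script. A rough local equivalent of the "Restarts [oc • CMs]" stage, as a sketch only, assuming a Linux host with Docker and the oc client available and the repository root as the working directory:

export BIN=oc VERSION=v3.9.0 CRD=0      # same variables the Travis stage sets
make build-travis                       # build the operator and container image (per the "Maven & cont. image build" stage)
./.travis/.travis.prepare.openshift.sh  # prepare the OpenShift environment, as in the spec stages
./.travis/.travis.test-restarts.sh      # run the restart tests (the script starts the cluster itself)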
170 changes: 170 additions & 0 deletions .travis/.travis.test-restarts.sh
@@ -0,0 +1,170 @@
#!/bin/bash

DIR="${DIR:-$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )}"
BIN=${BIN:-oc}
if [ "$CRD" = "1" ]; then
CR="cr/"
KIND="sparkcluster"
else
CR=""
KIND="cm"
fi

cluster_up() {
echo -e "\n$(tput setaf 3)docker images:$(tput sgr0)\n"
docker images
echo
if [ "$BIN" = "oc" ]; then
set -x
oc cluster up
[ "$CRD" = "1" ] && oc login -u system:admin
set +x
else
echo "minikube"
start_minikube
fi
}

start_minikube() {
export CHANGE_MINIKUBE_NONE_USER=true
sudo minikube start --vm-driver=none --kubernetes-version=${VERSION} && \
minikube update-context
os::cmd::try_until_text "${BIN} get nodes" '\sReady'

kubectl cluster-info


# kube-addon-manager is responsible for managing other k8s components, such as kube-dns, dashboard, storage-provisioner, etc.
os::cmd::try_until_text "${BIN} -n kube-system get pod -lcomponent=kube-addon-manager -o yaml" 'ready: true'

# Wait for kube-dns to be ready.
os::cmd::try_until_text "${BIN} -n kube-system get pod -lk8s-app=kube-dns -o yaml" 'ready: true'
}

tear_down() {
docker kill `docker ps -q` || true
}

setup_testing_framework() {
source "$(dirname "${BASH_SOURCE}")/../test/lib/init.sh"
os::util::environment::setup_time_vars
}

logs() {
echo -e "\n$(tput setaf 3)oc get all:$(tput sgr0)\n"
${BIN} get all
echo -e "\n$(tput setaf 3)Logs:$(tput sgr0)\n"
${BIN} logs $operator_pod
echo
}

errorLogs() {
echo -e "\n\n$(tput setaf 1)\n 😱 😱 😱\nBUILD FAILED\n\n😱 bad things have happened 😱$(tput sgr0)"
logs
exit 1
}

info() {
((testIndex++))
echo "$(tput setaf 3)[$testIndex / $total] - Running ${FUNCNAME[1]}$(tput sgr0)"
}

testCreateOperator() {
info
[ "$CRD" = "1" ] && FOO="-crd" || FOO=""
os::cmd::expect_success_and_text "${BIN} create -f $DIR/../manifest/operator$FOO.yaml" '"?spark-operator"? created' && \
os::cmd::try_until_text "${BIN} get pod -l app.kubernetes.io/name=spark-operator -o yaml" 'ready: true'
if [ "$CRD" = "1" ]; then
os::cmd::try_until_text "${BIN} get crd" 'sparkclusters.radanalytics.io'
fi
sleep 10
export operator_pod=`${BIN} get pod -l app.kubernetes.io/name=spark-operator -o='jsonpath="{.items[0].metadata.name}"' | sed 's/"//g'`
}

testCreateCluster() {
info
[ "$CRD" = "1" ] && FOO="-cr" || FOO=""
os::cmd::expect_success_and_text "${BIN} create -f $DIR/../examples/cluster$FOO.yaml" '"?my-spark-cluster"? created' && \
os::cmd::try_until_text "${BIN} get pod -l radanalytics.io/deployment=my-spark-cluster-w -o yaml" 'ready: true' && \
os::cmd::try_until_text "${BIN} get pod -l radanalytics.io/deployment=my-spark-cluster-m -o yaml" 'ready: true'
}

testKillOperator() {
info
os::cmd::expect_success_and_text "${BIN} delete pod $operator_pod" 'pod "?'$operator_pod'"? deleted' && \
sleep 10
export operator_pod=`${BIN} get pod -l app.kubernetes.io/name=spark-operator -o='jsonpath="{.items[0].metadata.name}"' | sed 's/"//g'`
}

testScaleCluster() {
info
if [ "$CRD" = "1" ]; then
os::cmd::expect_success_and_text '${BIN} patch sparkcluster my-spark-cluster -p "{\"spec\":{\"worker\": {\"instances\": 1}}}" --type=merge' '"?my-spark-cluster"? patched' || errorLogs
else
os::cmd::expect_success_and_text '${BIN} patch cm my-spark-cluster -p "{\"data\":{\"config\": \"worker:\n instances: 1\"}}"' '"?my-spark-cluster"? patched' || errorLogs
fi
os::cmd::try_until_text "${BIN} get pods --no-headers -l radanalytics.io/sparkcluster=my-spark-cluster | wc -l" '2'
}

testDeleteCluster() {
info
os::cmd::expect_success_and_text '${BIN} delete ${KIND} my-spark-cluster' '"?my-spark-cluster"? deleted' && \
os::cmd::try_until_text "${BIN} get pods --no-headers -l radanalytics.io/sparkcluster=my-spark-cluster 2> /dev/null | wc -l" '0'
}

testApp() {
info
[ "$CRD" = "1" ] && FOO="test/cr/" || FOO=""
os::cmd::expect_success_and_text '${BIN} create -f examples/${FOO}app.yaml' '"?my-spark-app"? created' && \
os::cmd::try_until_text "${BIN} get pods --no-headers -l radanalytics.io/sparkapplication=my-spark-app 2> /dev/null | wc -l" '3'
}

testAppResult() {
info
sleep 2
local driver_pod=`${BIN} get pods --no-headers -l radanalytics.io/sparkapplication=my-spark-app -l spark-role=driver -o='jsonpath="{.items[0].metadata.name}"' | sed 's/"//g'` && \
os::cmd::try_until_text "${BIN} logs $driver_pod" 'Pi is roughly 3.1'
}

testDeleteApp() {
info
[ "$CRD" = "1" ] && FOO="sparkapplication" || FOO="cm"
os::cmd::expect_success_and_text '${BIN} delete ${FOO} my-spark-app' '"my-spark-app" deleted' && \
os::cmd::try_until_text "${BIN} get pods --no-headers -l radanalytics.io/sparkapplication=my-spark-app 2> /dev/null | wc -l" '0'
}

run_tests() {
testKillOperator || errorLogs
testCreateCluster || errorLogs
testKillOperator || errorLogs
testScaleCluster || errorLogs
testKillOperator || errorLogs
testDeleteCluster || errorLogs
testKillOperator || errorLogs

sleep 10
testApp || errorLogs
testKillOperator || errorLogs
testAppResult || errorLogs
logs
}

main() {
export total=17
export testIndex=0
tear_down
setup_testing_framework
os::test::junit::declare_suite_start "operator/tests-restarts"
cluster_up
testCreateOperator || { ${BIN} get events; ${BIN} get pods; exit 1; }
if [ "$#" -gt 0 ]; then
# run single test that is passed as arg
$1
else
run_tests
fi
os::test::junit::declare_suite_end
tear_down
}

main "$@"
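
Note that main() always brings the cluster up and runs testCreateOperator, and then runs either the full run_tests sequence or, when a test name is passed as the first argument, just that single test. A hypothetical invocation of one test outside Travis (assuming oc is already installed and the operator image has been built with make build-travis):

BIN=oc VERSION=v3.9.0 CRD=0 ./.travis/.travis.test-restarts.sh testCreateCluster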
RunningClusters.java
@@ -23,7 +23,7 @@ public class RunningClusters {
.labelNames("cluster")
.register();

public static final Counter startedTotal = Counter.build()
public static final Gauge startedTotal = Gauge.build()
.name("operator_started_total")
.help("Spark clusters has been started by operator.")
.register();
@@ -43,13 +43,21 @@ public void put(SparkCluster ci) {
}

public void delete(String name) {
runningClusters.dec();
workers.labels(name).set(0);
clusters.remove(name);
if (clusters.containsKey(name)) {
runningClusters.dec();
workers.labels(name).set(0);
clusters.remove(name);
}
}

public SparkCluster getCluster(String name) {
return this.clusters.get(name);
}

public void resetMetrics() {
startedTotal.set(0);
workers.clear();
}

}
@@ -1,5 +1,6 @@
package io.radanalytics.operator.cluster;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Functions;
import com.google.common.collect.Sets;
import io.fabric8.kubernetes.api.model.DoneableReplicationController;
@@ -18,7 +19,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static io.radanalytics.operator.common.AnsiColors.*;
@@ -82,6 +85,7 @@ public void fullReconciliation() {
// 5. modify / scale

log.info("Running full reconciliation for namespace {} and kind {}..", namespace, entityName);
final AtomicBoolean change = new AtomicBoolean(false);
Set<SparkCluster> desiredSet = super.getDesiredSet();
Map<String, SparkCluster> desiredMap = desiredSet.stream().collect(Collectors.toMap(SparkCluster::getName, Functions.identity()));
Map<String, Integer> actual = getActual();
@@ -94,9 +98,11 @@ public void fullReconciliation() {

if (!toBeCreated.isEmpty()) {
log.info("toBeCreated: {}", toBeCreated);
change.set(true);
}
if (!toBeDeleted.isEmpty()) {
log.info("toBeDeleted: {}", toBeDeleted);
change.set(true);
}

// add new
@@ -118,12 +124,38 @@ public void fullReconciliation() {
int desiredWorkers = Optional.ofNullable(dCluster.getWorker()).orElse(new RCSpec()).getInstances();
Integer actualWorkers = actual.get(dCluster.getName());
if (actualWorkers != null && desiredWorkers != actualWorkers) {
// update the internal representation with the actual # of workers
Optional.ofNullable(clusters.getCluster(dCluster.getName()).getWorker())
.ifPresent(worker -> worker.setInstances(actualWorkers));
change.set(true);
// update the internal representation with the actual # of workers and call onModify
if (clusters.getCluster(dCluster.getName()) == null) {
// deep copy via json -> room for optimization
ObjectMapper om = new ObjectMapper();
try {
SparkCluster actualCluster = om.readValue(om.writeValueAsString(dCluster), SparkCluster.class);
Optional.ofNullable(actualCluster.getWorker()).ifPresent(w -> w.setInstances(actualWorkers));
clusters.put(actualCluster);
} catch (IOException e) {
log.warn(e.getMessage());
e.printStackTrace();
return;
}
} else {
Optional.ofNullable(clusters.getCluster(dCluster.getName())).map(SparkCluster::getWorker)
.ifPresent(worker -> worker.setInstances(actualWorkers));
}
log.info("scaling cluster {}", dCluster.getName());
onModify(dCluster);
}
});

// first reconciliation after (re)start -> update the clusters instance
if (!fullReconciliationRun) {
clusters.resetMetrics();
desiredMap.entrySet().forEach(e -> clusters.put(e.getValue()));
}

if (!change.get()) {
log.info("no change was detected during the reconciliation");
}
}

private Map<String, Integer> getActual() {