Make Galera service A/P #229

Merged
18 changes: 16 additions & 2 deletions controllers/galera_controller.go
@@ -426,6 +426,11 @@ func (r *GaleraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
Resources: []string{"pods"},
Verbs: []string{"create", "get", "list", "watch", "update", "patch", "delete"},
},
{
APIGroups: []string{""},
Resources: []string{"services"},
Verbs: []string{"get", "list", "update", "patch"},
},
}
rbacResult, err := common_rbac.ReconcileRbac(ctx, helper, instance, rbacRules)
if err != nil {
@@ -472,7 +477,7 @@ func (r *GaleraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
headless := &corev1.Service{ObjectMeta: pkghl.ObjectMeta}
op, err := controllerutil.CreateOrPatch(ctx, r.Client, headless, func() error {
headless.Spec = pkghl.Spec
err := controllerutil.SetOwnerReference(instance, headless, r.Client.Scheme())
err := controllerutil.SetControllerReference(instance, headless, r.Client.Scheme())
if err != nil {
return err
}
@@ -488,8 +493,17 @@ func (r *GaleraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
pkgsvc := mariadb.ServiceForAdoption(instance, "galera", adoption)
service := &corev1.Service{ObjectMeta: pkgsvc.ObjectMeta}
op, err = controllerutil.CreateOrPatch(ctx, r.Client, service, func() error {
// NOTE(dciabrin) We deploy Galera as an A/P service (i.e. no multi-master writes)
// by setting a pod-name label in the service's label selector.
// This label is dynamically set based on the status of the Galera cluster,
// so in this CreateOrPatch block we must reuse whatever value is present in
// the existing service CR in case we're patching it.
activePod, present := service.Spec.Selector[mariadb.ActivePodSelectorKey]
service.Spec = pkgsvc.Spec
err := controllerutil.SetOwnerReference(instance, service, r.Client.Scheme())
if present {
service.Spec.Selector[mariadb.ActivePodSelectorKey] = activePod
}
err := controllerutil.SetControllerReference(instance, service, r.Client.Scheme())
if err != nil {
return err
}
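The net effect of this CreateOrPatch block is that the controller keeps owning the Service spec while leaving the active-pod label under the notification script's control. As a quick illustration, with a CR name, namespace and label values that are assumed here rather than taken from the PR, the resulting selector can be inspected with:

kubectl -n openstack get service openstack -o jsonpath='{.spec.selector}'
# e.g. {"app":"mariadb","statefulset.kubernetes.io/pod-name":"openstack-galera-0"}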
3 changes: 3 additions & 0 deletions pkg/mariadb/const.go
@@ -3,4 +3,7 @@ package mariadb
const (
// ServiceName -
ServiceName = "mariadb"

// ActivePodSelectorKey - Selector key used to configure A/P service behavior
ActivePodSelectorKey = "statefulset.kubernetes.io/pod-name"
)
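The selector key is not operator-specific: statefulset.kubernetes.io/pod-name is the label the StatefulSet controller automatically stamps on every pod it manages, carrying that pod's own name. A selector containing it therefore matches at most one pod, which is what makes the service A/P. For example (pod name assumed for illustration):

kubectl get pods -l statefulset.kubernetes.io/pod-name=openstack-galera-0
# at most one pod can match, since each pod carries its own name under this label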
23 changes: 22 additions & 1 deletion pkg/mariadb/service.go
@@ -23,14 +23,35 @@ func ServiceForAdoption(db metav1.Object, dbType string, adoption *databasev1bet
}

func internalService(db metav1.Object, dbType string) *corev1.Service {
selectors := LabelSelectors(db, dbType)
// NOTE(dciabrin) we currently deploy the Galera cluster as A/P,
// by configuring the service's label selector to create
// a single endpoint matching a single pod's name.
// This label is later updated by a script called by Galera any
// time the cluster's state changes.
// When the service CR is being created, it is configured to
// point to the first pod. This is fair enough as:
// 1. When there's no pod running, there's no service anyway.
// 2. As soon as a galera node becomes available, the label will
// be reconfigured by the script if needed.
// 3. If the Galera cluster is already running, pointing to the
// first pod will work because Galera is a multi-master service.
// 4. If the service CR gets deleted for whatever reason while the
// cluster is still running, the same reasoning applies, as long
// as the first pod is not in a network partition without quorum.
// TODO improve that fallback pod selection
selectors[ActivePodSelectorKey] = db.GetName() + "-galera-0"
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: db.GetName(),
Namespace: db.GetNamespace(),
Labels: ServiceLabels(db),
},
Spec: corev1.ServiceSpec{
Selector: LabelSelectors(db, dbType),
Selector: selectors,
Ports: []corev1.ServicePort{
{Name: "database", Port: 3306, Protocol: corev1.ProtocolTCP},
},
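With that selector in place, the service exposes a single endpoint even when all replicas are healthy. A sketch of what this looks like at runtime, with assumed names and addresses:

kubectl -n openstack get endpoints openstack
# NAME        ENDPOINTS          AGE
# openstack   10.217.0.32:3306   5m
kubectl -n openstack get pods -o name | grep galera
# pod/openstack-galera-0
# pod/openstack-galera-1
# pod/openstack-galera-2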
4 changes: 4 additions & 0 deletions pkg/mariadb/volumes.go
@@ -109,6 +109,10 @@ func getGaleraVolumes(g *mariadbv1.Galera) []corev1.Volume {
Key: "detect_gcomm_and_start.sh",
Path: "detect_gcomm_and_start.sh",
},
{
Key: "mysql_wsrep_notify.sh",
Path: "mysql_wsrep_notify.sh",
},
},
},
},
222 changes: 222 additions & 0 deletions templates/galera/bin/mysql_wsrep_notify.sh
@@ -0,0 +1,222 @@
#!/bin/bash

# NOTE(dciabrin) we might use downward API to populate those in the future
PODNAME=$HOSTNAME
SERVICE=${PODNAME/-galera-[0-9]*/}
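# e.g. a PODNAME of openstack-galera-0 yields SERVICE=openstack (name illustrative)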

# API server config
APISERVER=https://kubernetes.default.svc
SERVICEACCOUNT=/var/run/secrets/kubernetes.io/serviceaccount
NAMESPACE=$(cat ${SERVICEACCOUNT}/namespace)
TOKEN=$(cat ${SERVICEACCOUNT}/token)
CACERT=${SERVICEACCOUNT}/ca.crt

# Retry config
RETRIES=6
WAIT=1

##
## Utilities functions
##
function log() {
echo "$(date +%F_%H_%M_%S) `basename $0` $*"
}

function log_error() {
echo "$(date +%F_%H_%M_%S) `basename $0` ERROR: $*" >&2
}

function mysql_get_status {
local name=$1
# with -nNE, mysql prints the matching status row vertically; tail -1 keeps only its value
mysql -nNE -uroot -p$DB_ROOT_PASSWORD -e "show status like '${name}';" | tail -1
local rc=${PIPESTATUS[0]}
if [ $rc != 0 ]; then
log_error "could not get value of mysql variable '${name}' (rc=$rc)"
return 1
fi
}

# Refresh environment variables with the latest WSREP state from mysql
function mysql_probe_state {
UUID=$(mysql_get_status wsrep_gcomm_uuid)
PARTITION=$(mysql_get_status wsrep_cluster_status)
INDEX=$(mysql_get_status wsrep_local_index)
SIZE=$(mysql_get_status wsrep_cluster_size)
}

# REST API call to the k8s API server
function api_server {
local request=$1
local service=$2
# NOTE: a PUT request to the API server is basically a conditional write,
# it only succeeds if no other write has been done on the CR in the meantime
# (i.e. if the resourceVersion of the JSON that is being sent to the API
# server still matches the resourceVersion of the service CR in the cluster)
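# For PUT requests, "-d @-" below makes curl read the request body
# (the updated service JSON) from this function's stdin.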
if [ "$request" = "PUT" ]; then
request="$request -d @-"
fi
local output
output=$(curl -s --cacert ${CACERT} --header "Content-Type:application/json" --header "Authorization: Bearer ${TOKEN}" --request $request ${APISERVER}/api/v1/namespaces/${NAMESPACE}/services/${service})

local rc=$?
if [ $rc != 0 ]; then
log_error "call to API server failed for service ${service} (rc=$rc)"
return 1
fi
if echo "${output}" | grep -q '"status": "Failure"'; then
message=$(echo "${output}" | parse_output '["message"]')
code=$(echo "${output}" | parse_output '["code"]')
log_error "API server returned an error for service ${SERVICE}: ${message} (code=${code})"
return 1
fi
echo "${output}"
return 0
}

# Update the service's active endpoint
# (parse JSON with python3 as we don't have jq in the container image)
function service_endpoint {
local endpoint=$1
if [ -n "${endpoint}" ]; then
python3 -c 'import json,sys;s=json.load(sys.stdin);s["spec"]["selector"]["statefulset.kubernetes.io/pod-name"]="'${endpoint}'";print(json.dumps(s,indent=2))'
else
python3 -c 'import json,sys;s=json.load(sys.stdin);s["spec"]["selector"].pop("statefulset.kubernetes.io/pod-name", None);print(json.dumps(s,indent=2))'
fi
[ $? == 0 ] || log_error "Could not parse json endpoint (rc=$?)"
}
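# typical usage: echo "$CURRENT_SVC" | service_endpoint "$PODNAME"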

# retrieve data from a JSON structure
# (parse JSON with python3 as we don't have jq in the container image)
function parse_output {
local key=$1
python3 -c 'import json,sys;s=json.load(sys.stdin);print(s'${key}')'
local rc=$?
[ $rc == 0 ] || log_error "Could not parse json output (rc=$rc)"
return $rc
}

# Generic retry logic for an action function
function retry {
local action=$1
local retries=$RETRIES
local wait=$WAIT
local rc=1
$action
rc=$?
while [ $rc -ne 0 -a $retries -gt 0 ]; do
log_error "previous action failed, retrying."
sleep $wait
$action
rc=$?
retries=$((retries - 1))
# reprobe mysql state now: if the cluster state changed since the
# start of this script, the retried action may no longer be needed
mysql_probe_state
done
if [ $rc -ne 0 ]; then
log_error "Could not run action after ${RETRIES} tries. Stop retrying."
fi
return $rc
}


##
## Actions
##

## Change the current Active endpoint in a service
function reconfigure_service_endpoint {
if [ "$PARTITION" != "Primary" -o "$INDEX" != "0" ]; then
log "Node ${PODNAME} is not the first member of a Primary partition (index: ${INDEX}). Exiting"
return 0
fi

CURRENT_SVC=$(api_server GET "$SERVICE")
[ $? == 0 ] || return 1

CURRENT_ENDPOINT=$(echo "$CURRENT_SVC" | parse_output '["spec"]["selector"].get("statefulset.kubernetes.io/pod-name","")')
[ $? == 0 ] || return 1
if [ "${CURRENT_ENDPOINT}" == "${PODNAME}" ]; then
log "Node ${PODNAME} is currently the active endpoint for service ${SERVICE}. Nothing to be done."
return 0
fi

NEW_SVC=$(echo "$CURRENT_SVC" | service_endpoint "$PODNAME")
[ $? == 0 ] || return 1

log "Setting ${PODNAME} as the new active endpoint for service ${SERVICE}"
UPDATE_RESULT=$(echo "$NEW_SVC" | api_server PUT "$SERVICE")
[ $? == 0 ] || return 1

return 0
}

## Remove the Active endpoint from the service
function remove_service_endpoint {
CURRENT_SVC=$(api_server GET "$SERVICE")
[ $? == 0 ] || return 1

CURRENT_ENDPOINT=$(echo "$CURRENT_SVC" | parse_output '["spec"]["selector"].get("statefulset.kubernetes.io/pod-name","")')
[ $? == 0 ] || return 1
if [ "${CURRENT_ENDPOINT}" != "${PODNAME}" ]; then
log "Node ${PODNAME} is currently not the active endpoint for service ${SERVICE}. Nothing to be done."
return 0
fi

NEW_SVC=$(echo "$CURRENT_SVC" | service_endpoint "")
[ $? == 0 ] || return 1

log "Removing ${PODNAME} endpoint from service ${SERVICE}"
UPDATE_RESULT=$(echo "$NEW_SVC" | api_server PUT "$SERVICE")
[ $? == 0 ] || return 1

return 0
}



## Main

# mysql diverts this script's stdout/stderr, so in order for its output
# to be logged properly, reuse dumb-init's stdout
exec &> >(tee -a /proc/1/fd/1) 2>&1
log "called with args: $*"

# Galera always calls the script with a --status argument.
# All other arguments (uuid, partition, index...) are optional,
# so probe mysql directly to get those values
STATUS=""
PARTITION=""
INDEX=""
while [ $# -gt 0 ]; do
case $1 in
--status)
STATUS=$2
shift;;
--uuid|--members|--primary|--index)
shift;;
esac
shift
done

if [ -z "${STATUS}" ]; then
log_error called without --status STATUS
exit 1
fi

# Condition: disconnecting -> remove oneself from endpoint if Active
if echo "${STATUS}" | grep -i -q -e 'disconnecting'; then
retry "remove_service_endpoint"
exit $?
fi

# Conditions that do not require endpoint updates
if echo "${STATUS}" | grep -i -q -v -e 'synced'; then
exit 0
fi

# At this point mysql is started, query missing arguments
mysql_probe_state

# Condition: first member of the primary partition -> set as Active endpoint
if [ "$PARTITION" = "Primary" -a "$SIZE" -ge 0 -a "$INDEX" = "0" ]; then
retry "reconfigure_service_endpoint"
exit $?
fi
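For reference, Galera invokes the script configured in wsrep_notify_cmd with arguments of the following shape on every cluster state or membership change, which is also a convenient way to exercise the script by hand from inside a galera pod (argument values are illustrative):

/usr/local/bin/mysql_wsrep_notify.sh --status synced --uuid e2c9a15e-6a30-11ee-8b3f-5e93f2e7f3e6 --primary yes --index 0
# any status other than 'synced' or 'disconnecting' exits without touching the service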
1 change: 1 addition & 0 deletions templates/galera/config/galera.cnf.in
@@ -34,6 +34,7 @@ thread_cache_size = 8
thread_stack = 256K
tmpdir = /tmp
user = mysql
wsrep_notify_cmd = /usr/local/bin/mysql_wsrep_notify.sh
wsrep_auto_increment_control = 1
wsrep_causal_reads = 0
wsrep_certify_nonPK = 1
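Once this config change is rolled out, one way to verify that mysqld picked up the notification hook is to query it from inside a galera pod (DB_ROOT_PASSWORD is the same env variable the script above relies on):

mysql -uroot -p$DB_ROOT_PASSWORD -e "show variables like 'wsrep_notify_cmd';"
# expected value: /usr/local/bin/mysql_wsrep_notify.sh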