Make Galera service A/P

The service created for a Galera cluster always load-balances traffic
across all available Galera replicas, which means the cluster is
effectively exposed as an active/active (A/A) service.

To spare clients the burden of supporting A/A semantics, change the
way the service is configured so that it acts as active/passive (A/P)
instead.

A new callback script is introduced in pods. It is called by mysqld
automatically every time the Galera library detects a change in the
cluster (node join, crash, network partition...). This callback
script reconfigures the service CR via label selectors to drive
traffic to a single pod at a time.
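As a hedged illustration of the resulting behavior (resource names
hypothetical, other selector labels omitted): once a node reports itself
Synced, the service selector matches exactly one pod, so the service
resolves to a single endpoint.

    $ kubectl get service openstack -o jsonpath='{.spec.selector}'
    {"statefulset.kubernetes.io/pod-name":"openstack-galera-0"}
    $ kubectl get endpoints openstack
    NAME        ENDPOINTS          AGE
    openstack   172.17.0.12:3306   5m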

Jira: OSPRH-7405
dciabrin committed Jun 13, 2024
1 parent ce9687c commit 5fb18af
Showing 6 changed files with 268 additions and 3 deletions.
18 changes: 16 additions & 2 deletions controllers/galera_controller.go
@@ -426,6 +426,11 @@ func (r *GaleraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
Resources: []string{"pods"},
Verbs: []string{"create", "get", "list", "watch", "update", "patch", "delete"},
},
{
APIGroups: []string{""},
Resources: []string{"services"},
Verbs: []string{"create", "get", "list", "watch", "update", "patch", "delete"},
},
}
rbacResult, err := common_rbac.ReconcileRbac(ctx, helper, instance, rbacRules)
if err != nil {
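The new "services" rule is what lets the notification script, which runs
under the pod's service account, read and update the Service from inside
the pod. A hedged sanity check (namespace and service account name
hypothetical):

    $ kubectl auth can-i update services \
        --as=system:serviceaccount:openstack:galera-openstack
    yes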
@@ -472,7 +477,7 @@ func (r *GaleraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
headless := &corev1.Service{ObjectMeta: pkghl.ObjectMeta}
op, err := controllerutil.CreateOrPatch(ctx, r.Client, headless, func() error {
headless.Spec = pkghl.Spec
err := controllerutil.SetOwnerReference(instance, headless, r.Client.Scheme())
err := controllerutil.SetControllerReference(instance, headless, r.Client.Scheme())
if err != nil {
return err
}
@@ -488,8 +493,17 @@ func (r *GaleraReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
pkgsvc := mariadb.ServiceForAdoption(instance, "galera", adoption)
service := &corev1.Service{ObjectMeta: pkgsvc.ObjectMeta}
op, err = controllerutil.CreateOrPatch(ctx, r.Client, service, func() error {
// NOTE(dciabrin) We deploy Galera as an A/P service (i.e. no multi-master
// writes) by setting a label in the service's label selector.
// This label is set dynamically based on the state of the Galera cluster,
// so in this CreateOrPatch block we must reuse whatever value is present
// in the existing service CR in case we're patching it.
activePod, present := service.Spec.Selector[mariadb.ActivePodSelectorKey]
service.Spec = pkgsvc.Spec
err := controllerutil.SetOwnerReference(instance, service, r.Client.Scheme())
if present {
service.Spec.Selector[mariadb.ActivePodSelectorKey] = activePod
}
err := controllerutil.SetControllerReference(instance, service, r.Client.Scheme())
if err != nil {
return err
}
3 changes: 3 additions & 0 deletions pkg/mariadb/const.go
@@ -3,4 +3,7 @@ package mariadb
const (
// ServiceName -
ServiceName = "mariadb"

// ActivePodSelectorKey - Selector key used to configure A/P service behavior
ActivePodSelectorKey = "statefulset.kubernetes.io/pod-name"
)
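Note that statefulset.kubernetes.io/pod-name is not an operator-specific
label: the StatefulSet controller applies it automatically to every pod it
creates, which is what makes a per-pod service selector possible without
any extra labelling. A hedged sketch (the app=galera pod label is
illustrative):

    $ kubectl get pods -l app=galera \
        -o custom-columns='NAME:.metadata.name,POD-LABEL:.metadata.labels.statefulset\.kubernetes\.io/pod-name'
    NAME                 POD-LABEL
    openstack-galera-0   openstack-galera-0
    openstack-galera-1   openstack-galera-1
    openstack-galera-2   openstack-galera-2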
23 changes: 22 additions & 1 deletion pkg/mariadb/service.go
@@ -23,14 +23,35 @@ func ServiceForAdoption(db metav1.Object, dbType string, adoption *databasev1bet
}

func internalService(db metav1.Object, dbType string) *corev1.Service {
selectors := LabelSelectors(db, dbType)
// NOTE(dciabrin) we currently deploy the Galera cluster as A/P,
// by configuring the service's label selector to create
// a single endpoint matching a single pod's name.
// This label is later updated by a script that Galera calls any
// time the cluster's state changes.
// When the service CR is being created, it is configured to
// point to the first pod. This is acceptable because:
// 1. when there's no pod running, there's no service anyway.
// 2. as soon as a Galera node becomes available, the label will
// be reconfigured by the script if needed.
// 3. if the Galera cluster is already running, picking a random
// node out of the running pods will work because Galera is
// a multi-master service.
// 4. if the service CR gets deleted for whatever reason while the
// cluster is still running, picking a random node out of the
// running pods will work because Galera is a multi-master
// service. This holds as long as the first pod is not in a
// network partition without quorum.
// TODO: improve the fallback pod selection
selectors[ActivePodSelectorKey] = db.GetName() + "-galera-0"
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: db.GetName(),
Namespace: db.GetNamespace(),
Labels: ServiceLabels(db),
},
Spec: corev1.ServiceSpec{
Selector: LabelSelectors(db, dbType),
Selector: selectors,
Ports: []corev1.ServicePort{
{Name: "database", Port: 3306, Protocol: corev1.ProtocolTCP},
},
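If the service CR is ever deleted while the cluster is still running, the
operator recreates it pinned to the first pod, and the script re-pins it on
the next cluster event. A hedged way to observe this (names hypothetical):

    $ kubectl delete service openstack
    $ kubectl get service openstack \
        -o jsonpath='{.spec.selector.statefulset\.kubernetes\.io/pod-name}'
    openstack-galera-0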
4 changes: 4 additions & 0 deletions pkg/mariadb/volumes.go
@@ -109,6 +109,10 @@ func getGaleraVolumes(g *mariadbv1.Galera) []corev1.Volume {
Key: "detect_gcomm_and_start.sh",
Path: "detect_gcomm_and_start.sh",
},
{
Key: "mysql_wsrep_notify.sh",
Path: "mysql_wsrep_notify.sh",
},
},
},
},
222 changes: 222 additions & 0 deletions templates/galera/bin/mysql_wsrep_notify.sh
@@ -0,0 +1,222 @@
#!/bin/bash

# NOTE(dciabrin) we might use downward API to populate those in the future
PODNAME=$HOSTNAME
SERVICE=${PODNAME/-galera-[0-9]*/}

# API server config
APISERVER=https://kubernetes.default.svc
SERVICEACCOUNT=/var/run/secrets/kubernetes.io/serviceaccount
NAMESPACE=$(cat ${SERVICEACCOUNT}/namespace)
TOKEN=$(cat ${SERVICEACCOUNT}/token)
CACERT=${SERVICEACCOUNT}/ca.crt

# Retry config
RETRIES=6
WAIT=1

##
## Utility functions
##
function log() {
echo "$(date +%F_%H_%M_%S) `basename $0` $*"
}

function log_error() {
echo "$(date +%F_%H_%M_%S) `basename $0` ERROR: $*" >&2
}

function mysql_get_status {
local name=$1
mysql -nNE -uroot -p$DB_ROOT_PASSWORD -e "show status like '${name}';" | tail -1
local rc=$?
if [ $rc != 0 ]; then
log_error "could not get value of mysql variable '${name}' (rc=${rc})"
return 1
fi
}

# Refresh environment variables with the latest WSREP state from mysql
function mysql_probe_state {
UUID=$(mysql_get_status wsrep_gcomm_uuid)
PARTITION=$(mysql_get_status wsrep_cluster_status)
INDEX=$(mysql_get_status wsrep_local_index)
SIZE=$(mysql_get_status wsrep_cluster_size)
}
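
# Example of the probed values on a healthy 3-node cluster where this
# pod is the first member (values illustrative):
#   PARTITION=Primary INDEX=0 SIZE=3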

# REST API call to the k8s API server
function api_server {
local request=$1
local service=$2
# NOTE: a PUT request to the API server is effectively a conditional write:
# it only succeeds if no other write has been done to the CR in the meantime
# (i.e. if the resourceVersion in the JSON being sent to the API server still
# matches the resourceVersion of the service CR in the cluster)
if [ "$request" = "PUT" ]; then
request="$request -d @-"
fi
local output
output=$(curl -s --cacert ${CACERT} --header "Content-Type:application/json" --header "Authorization: Bearer ${TOKEN}" --request $request ${APISERVER}/api/v1/namespaces/${NAMESPACE}/services/${service})

local rc=$?
if [ $rc != 0 ]; then
log_error "call to API server failed for service ${service} (rc=$rc)"
return 1
fi
if echo "${output}" | grep -q '"status": "Failure"'; then
message=$(echo "${output}" | parse_output '["message"]')
code=$(echo "${output}" | parse_output '["code"]')
log_error "API server returned an error for service ${SERVICE}: ${message} (code=${code})"
return 1
fi
echo "${output}"
return 0
}

# Update the service's active endpoint
# (parse JSON with python3 as we don't have jq in the container image)
function service_endpoint {
local endpoint=$1
if [ -n "${endpoint}" ]; then
python3 -c 'import json,sys;s=json.load(sys.stdin);s["spec"]["selector"]["statefulset.kubernetes.io/pod-name"]="'${endpoint}'";print(json.dumps(s,indent=2))'
else
python3 -c 'import json,sys;s=json.load(sys.stdin);s["spec"]["selector"].pop("statefulset.kubernetes.io/pod-name", None);print(json.dumps(s,indent=2))'
fi
local rc=$?
[ $rc == 0 ] || log_error "Could not update json endpoint (rc=${rc})"
}

# retrieve data from a JSON structure
# (parse JSON with python3 as we don't have jq in the container image)
function parse_output {
local key=$1
python3 -c 'import json,sys;s=json.load(sys.stdin);print(s'${key}')'
local rc=$?
[ $rc == 0 ] || log_error "Could not parse json output (rc=${rc})"
}

# Generic retry logic for an action function
function retry {
local action=$1
local retries=$RETRIES
local wait=$WAIT
local rc=1
$action
rc=$?
while [ $rc -ne 0 -a $retries -gt 0 ]; do
log_error "previous action failed, retrying."
sleep $wait
$action
rc=$?
retries=$((retries - 1))
# re-probe mysql state now: if the cluster state changed since
# the start of this script, we might not need to retry the action
mysql_probe_state
done
if [ $rc -ne 0 ]; then
log_error "Could not run action after ${RETRIES} tries. Stop retrying."
fi
return $rc
}


##
## Actions
##

## Change the current Active endpoint in a service
function reconfigure_service_endpoint {
if [ "$PARTITION" != "Primary" -o "$INDEX" != "0" ]; then
log "Node ${PODNAME} is not the first member of a Primary partition (index: ${INDEX}). Exiting"
return 0
fi

CURRENT_SVC=$(api_server GET "$SERVICE")
[ $? == 0 ] || return 1

CURRENT_ENDPOINT=$(echo "$CURRENT_SVC" | parse_output '["spec"]["selector"].get("statefulset.kubernetes.io/pod-name","")')
[ $? == 0 ] || return 1
if [ "${CURRENT_ENDPOINT}" == "${PODNAME}" ]; then
log "Node ${PODNAME} is currently the active endpoint for service ${SERVICE}. Nothing to be done."
return 0
fi

NEW_SVC=$(echo "$CURRENT_SVC" | service_endpoint "$PODNAME")
[ $? == 0 ] || return 1

log "Setting ${PODNAME} as the new active endpoint for service ${SERVICE}"
UPDATE_RESULT=$(echo "$NEW_SVC" | api_server PUT "$SERVICE")
[ $? == 0 ] || return 1

return 0
}

## Remove the Active endpoint from the service
function remove_service_endpoint {
CURRENT_SVC=$(api_server GET "$SERVICE")
[ $? == 0 ] || return 1

CURRENT_ENDPOINT=$(echo "$CURRENT_SVC" | parse_output '["spec"]["selector"].get("statefulset.kubernetes.io/pod-name","")')
[ $? == 0 ] || return 1
if [ "${CURRENT_ENDPOINT}" != "${PODNAME}" ]; then
log "Node ${PODNAME} is currently not the active endpoint for service ${SERVICE}. Nothing to be done."
return 0
fi

NEW_SVC=$(echo "$CURRENT_SVC" | service_endpoint "")
[ $? == 0 ] || return 1

log "Removing ${PODNAME} endpoint from service ${SERVICE}"
UPDATE_RESULT=$(echo "$NEW_SVC" | api_server PUT "$SERVICE")
[ $? == 0 ] || return 1

return 0
}



## Main

# mysql diverts this script's stdout/stderr, so in order for its output
# to be logged properly, reuse dumb-init's stdout
exec &> >(tee -a /proc/1/fd/1) 2>&1
log "called with args: $*"

# Galera always calls the script with a --status argument.
# All other arguments (uuid, members, primary, index) are optional,
# so get those values by probing mysql directly
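# Example invocations from mysqld via wsrep_notify_cmd (values illustrative):
#   mysql_wsrep_notify.sh --status synced --uuid <uuid> --primary yes --index 0
#   mysql_wsrep_notify.sh --status disconnecting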
STATUS=""
PARTITION=""
INDEX=""
while [ $# -gt 0 ]; do
case $1 in
--status)
STATUS=$2
shift;;
--uuid|--members|--primary|--index)
shift;;
esac
shift
done

if [ -z "${STATUS}" ]; then
log_error "called without --status STATUS"
exit 1
fi

# Condition: disconnecting -> remove oneself from endpoint if Active
if echo "${STATUS}" | grep -i -q -e 'disconnecting'; then
retry "remove_service_endpoint"
exit $?
fi

# Conditions that do not require endpoint updates
if echo "${STATUS}" | grep -i -q -v -e 'synced'; then
exit 0
fi

# At this point mysql is running; probe it for the values not passed as arguments
mysql_probe_state

# Condition: first member of the primary partition -> set as Active endpoint
if [ "$PARTITION" = "Primary" -a "$SIZE" -ge 0 -a "$INDEX" = "0" ]; then
retry "reconfigure_service_endpoint"
exit $?
fi
1 change: 1 addition & 0 deletions templates/galera/config/galera.cnf.in
@@ -34,6 +34,7 @@ thread_cache_size = 8
thread_stack = 256K
tmpdir = /tmp
user = mysql
wsrep_notify_cmd = /usr/local/bin/mysql_wsrep_notify.sh
wsrep_auto_increment_control = 1
wsrep_causal_reads = 0
wsrep_certify_nonPK = 1
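With wsrep_notify_cmd in place, a hedged end-to-end check (resource names
hypothetical) is to delete the active pod and watch the A/P endpoint move
to the next Synced node:

    $ kubectl delete pod openstack-galera-0
    $ kubectl get service openstack -w \
        -o jsonpath='{.spec.selector.statefulset\.kubernetes\.io/pod-name}{"\n"}'
    openstack-galera-0
    openstack-galera-1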
