Allow parallel broker restarts from same AZ (broker rack)
ctrlaltluc committed May 25, 2023
1 parent c776e21 commit 420ba3a
Showing 9 changed files with 645 additions and 16 deletions.
8 changes: 8 additions & 0 deletions api/v1beta1/kafkacluster_types.go
@@ -138,6 +138,14 @@ type RollingUpgradeConfig struct {
// distinct broker replicas with either offline replicas or out of sync replicas and the number of alerts triggered by
// alerts with 'rollingupgrade'
FailureThreshold int `json:"failureThreshold"`

// ConcurrentBrokerRestartsAllowed controls how many brokers can be restarted in parallel during a rolling upgrade.
// If it is set to a value greater than 1, the operator restarts up to that many brokers in parallel, provided the
// brokers are within the same AZ (as specified by "broker.rack" in the broker read-only configs). Since Kafka broker
// racks spread replicas across racks, restarting multiple brokers within the same rack makes at most one replica per
// topic-partition unavailable at a time, which makes this a safe way to speed up the rolling upgrade.
// +optional
ConcurrentBrokerRestartsAllowed int `json:"concurrentBrokerRestartsAllowed,omitempty"`
}

// DisruptionBudget defines the configuration for PodDisruptionBudget where the workload is managed by the kafka-operator
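To make the safety argument in the comment above concrete, here is a small, self-contained Go sketch; it is illustrative only and not part of this commit (broker ids, racks, and partition assignments are made up). With rack-aware placement each partition has at most one replica per rack, so restarting any number of brokers from a single rack leaves at most one replica per topic-partition unavailable:

package main

import "fmt"

func main() {
	// Hypothetical rack-aware layout: brokers 0 and 3 are in rack "az1",
	// brokers 1 and 4 in "az2", brokers 2 and 5 in "az3". With replication
	// factor 3, rack-aware placement puts one replica per rack for each partition.
	partitionReplicas := map[string][]int32{
		"topicA-0": {0, 1, 2},
		"topicA-1": {3, 4, 5},
		"topicA-2": {3, 1, 2},
	}

	// Restart two brokers concurrently, both from rack "az1".
	restarting := map[int32]bool{0: true, 3: true}

	for partition, replicas := range partitionReplicas {
		unavailable := 0
		for _, id := range replicas {
			if restarting[id] {
				unavailable++
			}
		}
		// Every partition reports at most 1 of 3 replicas unavailable.
		fmt.Printf("%s: %d of %d replicas unavailable\n", partition, unavailable, len(replicas))
	}
}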
7 changes: 7 additions & 0 deletions charts/kafka-operator/templates/crds.yaml
@@ -21177,6 +21177,13 @@ spec:
with either offline replicas or out of sync replicas and the
number of alerts triggered by alerts with 'rollingupgrade'
type: integer
concurrentBrokerRestartsAllowed:
description: ConcurrentBrokerRestartsAllowed controls how many brokers can be restarted in parallel during a rolling
upgrade. If it is set to a value greater than 1, the operator restarts up to that many brokers in parallel, provided
the brokers are within the same AZ (as specified by "broker.rack" in the broker read-only configs). Since Kafka broker
racks spread replicas across racks, restarting multiple brokers within the same rack makes at most one replica per
topic-partition unavailable at a time, which makes this a safe way to speed up the rolling upgrade.
type: integer
required:
- failureThreshold
type: object
7 changes: 7 additions & 0 deletions config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml
@@ -21014,6 +21014,13 @@ spec:
with either offline replicas or out of sync replicas and the
number of alerts triggered by alerts with 'rollingupgrade'
type: integer
concurrentBrokerRestartsAllowed:
description: ConcurrentBrokerRestartsAllowed controls how many brokers can be restarted in parallel during a rolling
upgrade. If it is set to a value greater than 1, the operator restarts up to that many brokers in parallel, provided
the brokers are within the same AZ (as specified by "broker.rack" in the broker read-only configs). Since Kafka broker
racks spread replicas across racks, restarting multiple brokers within the same rack makes at most one replica per
topic-partition unavailable at a time, which makes this a safe way to speed up the rolling upgrade.
type: integer
required:
- failureThreshold
type: object
11 changes: 9 additions & 2 deletions config/samples/banzaicloud_v1beta1_kafkacluster.yaml
@@ -55,6 +55,13 @@ spec:
# alerts with 'rollingupgrade'
# failureThreshold: 1

# concurrentBrokerRestartsAllowed controls how many brokers can be restarted in parallel during a rolling upgrade.
# If it is set to a value greater than 1, the operator restarts up to that many brokers in parallel, provided the
# brokers are within the same AZ (as specified by "broker.rack" in the broker read-only configs). Since Kafka broker
# racks spread replicas across racks, restarting multiple brokers within the same rack makes at most one replica per
# topic-partition unavailable at a time, which makes this a safe way to speed up the rolling upgrade.
# concurrentBrokerRestartsAllowed: 1

# brokerConfigGroups specifies multiple broker configs with unique name
brokerConfigGroups:
# Specify desired group name (eg., 'default_group')
@@ -189,11 +196,11 @@ spec:
# which has type per-broker
# priorityClassName can be used to set the broker pod's priority
# priorityClassName: "high-priority"

# When "hostNameOverride" and brokerConfig.nodePortExternalIP are empty and NodePort access method is selected for external listener
# nodePortNodeAddressType defines the Kafka broker's Kubernetes node's address type that shall be used in the advertised.listeners property
# when nodeport is used for an external listener.
# its values can be Hostname, ExternalIP, InternalIP, InternalDNS, ExternalDNS
#nodePortNodeAddressType: "ExternalIP"
config: |
sasl.enabled.mechanisms=PLAIN
2 changes: 1 addition & 1 deletion controllers/tests/common_test.go
@@ -124,7 +124,7 @@ func createMinimalKafkaClusterCR(name, namespace string) *v1beta1.KafkaCluster {
CCJMXExporterConfig: "custom_property: custom_value",
},
ReadOnlyConfig: "cruise.control.metrics.topic.auto.create=true",
RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{FailureThreshold: 1},
RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{FailureThreshold: 1, ConcurrentBrokerRestartsAllowed: 1},
},
}
}
11 changes: 11 additions & 0 deletions pkg/kafkaclient/provider.go
@@ -15,6 +15,7 @@
package kafkaclient

import (
"github.com/stretchr/testify/mock"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/banzaicloud/koperator/api/v1beta1"
@@ -45,3 +46,13 @@ func NewDefaultProvider() Provider {
func (dp *defaultProvider) NewFromCluster(client client.Client, cluster *v1beta1.KafkaCluster) (KafkaClient, func(), error) {
return NewFromCluster(client, cluster)
}

// MockedProvider is a testify mock for providing Kafka clients that can themselves be mocks
type MockedProvider struct {
mock.Mock
}

func (m *MockedProvider) NewFromCluster(client client.Client, cluster *v1beta1.KafkaCluster) (KafkaClient, func(), error) {
args := m.Called(client, cluster)
return args.Get(0).(KafkaClient), args.Get(1).(func()), args.Error(2)
}
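As a usage illustration, the following is a toy, self-contained analogue of the pattern above rather than actual koperator test code (the real KafkaClient interface is much larger, so a stand-in interface is used; all names below are made up). A test can program the mocked provider to hand back a mocked client together with a no-op close function:

package main

import (
	"fmt"

	"github.com/stretchr/testify/mock"
)

// clientLike stands in for the much larger KafkaClient interface.
type clientLike interface {
	ClusterID() string
}

// mockClient is a testify mock of clientLike, analogous to a mocked KafkaClient.
type mockClient struct{ mock.Mock }

func (m *mockClient) ClusterID() string { return m.Called().String(0) }

// mockProvider mirrors MockedProvider: a testify mock whose return values are themselves mocks.
type mockProvider struct{ mock.Mock }

func (p *mockProvider) NewFromCluster() (clientLike, func(), error) {
	args := p.Called()
	return args.Get(0).(clientLike), args.Get(1).(func()), args.Error(2)
}

func main() {
	client := &mockClient{}
	client.On("ClusterID").Return("test-cluster")

	provider := &mockProvider{}
	provider.On("NewFromCluster").Return(client, func() {}, nil)

	c, closeFn, _ := provider.NewFromCluster()
	defer closeFn()
	fmt.Println(c.ClusterID()) // prints "test-cluster"
}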
129 changes: 116 additions & 13 deletions pkg/resources/kafka/kafka.go
@@ -18,6 +18,7 @@ import (
"context"
"fmt"
"reflect"
"regexp"
"sort"
"strconv"
"strings"
@@ -87,12 +88,23 @@ const (
nonControllerBrokerReconcilePriority
// controllerBrokerReconcilePriority the priority used for controller broker used to define its priority in the reconciliation order
controllerBrokerReconcilePriority

// defaultConcurrentBrokerRestartsAllowed the default number of brokers that can be restarted in parallel
defaultConcurrentBrokerRestartsAllowed = 1
)

var (
// kafkaConfigBrokerRackRegex the regex to parse the "broker.rack" Kafka property used in read-only configs
kafkaConfigBrokerRackRegex = regexp.MustCompile(`broker\.rack\s*=\s*(\w+)`)
)

// Reconciler implements the Component Reconciler
type Reconciler struct {
resources.Reconciler
// kafkaClientProvider is used to create a new KafkaClient
kafkaClientProvider kafkaclient.Provider
// kafkaBrokerAvailabilityZoneMap is a map of broker id to availability zone used in concurrent broker restarts logic
kafkaBrokerAvailabilityZoneMap map[int32]string
}

// New creates a new reconciler for Kafka
@@ -103,9 +115,39 @@ func New(client client.Client, directClient client.Reader, cluster *v1beta1.Kafk
DirectClient: directClient,
KafkaCluster: cluster,
},
kafkaClientProvider: kafkaClientProvider,
kafkaBrokerAvailabilityZoneMap: getBrokerAzMap(cluster),
}
}

func getBrokerAzMap(cluster *v1beta1.KafkaCluster) map[int32]string {
brokerAzMap := make(map[int32]string)
for _, broker := range cluster.Spec.Brokers {
brokerRack := getBrokerRack(broker.ReadOnlyConfig)
if brokerRack != "" {
brokerAzMap[broker.Id] = brokerRack
}
}
// if the broker AZ (rack) information is incomplete, treat each broker as being in its own AZ
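// (Illustrative note, not in the committed code: with per-broker pseudo-AZs the same-AZ checks in
// handleRollingUpgrade can never match two distinct brokers, so an incomplete broker.rack configuration
// effectively falls back to restarting one broker at a time.)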
if len(brokerAzMap) != len(cluster.Spec.Brokers) {
for _, broker := range cluster.Spec.Brokers {
brokerAzMap[broker.Id] = fmt.Sprintf("%d", broker.Id)
}
}
return brokerAzMap
}

func getBrokerRack(readOnlyConfig string) string {
if readOnlyConfig == "" {
return ""
}
match := kafkaConfigBrokerRackRegex.FindStringSubmatch(readOnlyConfig)
if len(match) == 2 {
return match[1]
}
return ""
}
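
// Illustrative examples of getBrokerRack, not part of the committed code; read-only configs are assumed
// to be in the usual Java-properties form:
//
//	getBrokerRack("broker.rack=az1\nauto.create.topics.enable=false") // returns "az1"
//	getBrokerRack("broker.rack = az2")                                // returns "az2"
//	getBrokerRack("auto.create.topics.enable=false")                  // returns ""
//
// Note that \w+ in the regex does not match '-', so a value such as "eu-west-1a" would be captured
// as "eu"; rack ids are assumed here to consist of word characters only.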

func getCreatedPvcForBroker(
ctx context.Context,
c client.Reader,
@@ -877,18 +919,23 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo
if err != nil {
return errors.WrapIf(err, "failed to reconcile resource")
}
for _, pod := range podList.Items {
pod := pod
if k8sutil.IsMarkedForDeletion(pod.ObjectMeta) {
return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("pod is still terminating"), "rolling upgrade in progress")
}
if k8sutil.IsPodContainsPendingContainer(&pod) {
return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("pod is still creating"), "rolling upgrade in progress")
}
if len(podList.Items) < len(r.KafkaCluster.Spec.Brokers) {
return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("pod count differs from brokers spec"), "rolling upgrade in progress")
}

errorCount := r.KafkaCluster.Status.RollingUpgrade.ErrorCount
// If concurrent broker restarts are allowed, restart multiple brokers only within the same AZ; otherwise restart one broker at a time
concurrentBrokerRestartsAllowed := r.getConcurrentBrokerRestartsAllowed()
terminatingOrPendingPods := getPodsInTerminatingOrPendingState(podList.Items)
if len(terminatingOrPendingPods) >= concurrentBrokerRestartsAllowed {
return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New(strconv.Itoa(concurrentBrokerRestartsAllowed)+" pod(s) are still terminating or creating"), "rolling upgrade in progress")
}
currentPodAz := r.getBrokerAz(currentPod)
if concurrentBrokerRestartsAllowed > 1 && r.existsTerminatingPodFromAnotherAz(currentPodAz, terminatingOrPendingPods) {
return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("pod is still terminating or creating from another AZ"), "rolling upgrade in progress")
}

// Check broker count with out-of-sync and offline replicas against the rolling upgrade failure threshold
errorCount := r.KafkaCluster.Status.RollingUpgrade.ErrorCount
kClient, close, err := r.kafkaClientProvider.NewFromCluster(r.Client, r.KafkaCluster)
if err != nil {
return errorfactory.New(errorfactory.BrokersUnreachable{}, err, "could not connect to kafka brokers")
@@ -908,20 +955,24 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo
if len(outOfSyncReplicas) > 0 {
log.Info("out-of-sync replicas", "IDs", outOfSyncReplicas)
}

impactedReplicas := make(map[int32]struct{})
for _, brokerID := range allOfflineReplicas {
impactedReplicas[brokerID] = struct{}{}
}
for _, brokerID := range outOfSyncReplicas {
impactedReplicas[brokerID] = struct{}{}
}

errorCount += len(impactedReplicas)

if errorCount >= r.KafkaCluster.Spec.RollingUpgradeConfig.FailureThreshold {
return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("cluster is not healthy"), "rolling upgrade in progress")
}

// If concurrent restarts are allowed and the failure threshold tolerates more than one failure, only proceed when all impacted brokers are in the current pod's AZ
if concurrentBrokerRestartsAllowed > 1 && r.KafkaCluster.Spec.RollingUpgradeConfig.FailureThreshold > 1 {
if r.existsFailedBrokerFromAnotherAz(currentPodAz, impactedReplicas) {
return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("broker is not healthy from another AZ"), "rolling upgrade in progress")
}
}
}
}

@@ -943,6 +994,37 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo
return nil
}
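
// Summary added for illustration (not part of the committed code): taken together, the checks above let the
// current broker pod restart only when
//  1. every broker pod from the spec exists (no pods missing from the cluster),
//  2. fewer than concurrentBrokerRestartsAllowed pods are already terminating or pending,
//  3. with concurrent restarts enabled, all terminating or pending pods are in the current pod's AZ,
//  4. the error count (brokers with offline or out-of-sync replicas plus prior errors) stays below failureThreshold,
//  5. with concurrent restarts enabled and a failure threshold above 1, all impacted brokers are in the current pod's AZ.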

func (r *Reconciler) existsFailedBrokerFromAnotherAz(currentPodAz string, impactedReplicas map[int32]struct{}) bool {
if currentPodAz == "" && len(impactedReplicas) > 0 {
return true
}
for brokerWithFailure := range impactedReplicas {
if currentPodAz != r.kafkaBrokerAvailabilityZoneMap[brokerWithFailure] {
return true
}
}
return false
}

func (r *Reconciler) getConcurrentBrokerRestartsAllowed() int {
if r.KafkaCluster.Spec.RollingUpgradeConfig.ConcurrentBrokerRestartsAllowed > 1 {
return r.KafkaCluster.Spec.RollingUpgradeConfig.ConcurrentBrokerRestartsAllowed
}
return defaultConcurrentBrokerRestartsAllowed
}

func (r *Reconciler) existsTerminatingPodFromAnotherAz(currentPodAz string, terminatingOrPendingPods []corev1.Pod) bool {
if currentPodAz == "" && len(terminatingOrPendingPods) > 0 {
return true
}
for _, terminatingOrPendingPod := range terminatingOrPendingPods {
if currentPodAz != r.getBrokerAz(&terminatingOrPendingPod) {
return true
}
}
return false
}

// Checks for match between pod labels and TaintedBrokersSelector
func (r *Reconciler) isPodTainted(log logr.Logger, pod *corev1.Pod) bool {
selector, err := metav1.LabelSelectorAsSelector(r.KafkaCluster.Spec.TaintedBrokersSelector)
@@ -1382,6 +1464,27 @@ func (r *Reconciler) determineControllerId() (int32, error) {
return controllerID, nil
}

func getPodsInTerminatingOrPendingState(items []corev1.Pod) []corev1.Pod {
var pods []corev1.Pod
for _, pod := range items {
if k8sutil.IsMarkedForDeletion(pod.ObjectMeta) {
pods = append(pods, pod)
}
if k8sutil.IsPodContainsPendingContainer(&pod) {
pods = append(pods, pod)
}
}
return pods
}

func (r *Reconciler) getBrokerAz(pod *corev1.Pod) string {
brokerId, err := strconv.ParseInt(pod.Labels[v1beta1.BrokerIdLabelKey], 10, 32)
if err != nil {
return ""
}
return r.kafkaBrokerAvailabilityZoneMap[int32(brokerId)]
}

func getServiceFromExternalListener(client client.Client, cluster *v1beta1.KafkaCluster,
eListenerName string, ingressConfigName string) (*corev1.Service, error) {
foundLBService := &corev1.Service{}