Allow concurrent broker restarts from same AZ (broker rack)
Lucian Ilie authored and ctrlaltluc committed Jun 22, 2023
1 parent 782f7f5 commit 3398dc9
Showing 10 changed files with 770 additions and 17 deletions.
12 changes: 10 additions & 2 deletions config/samples/banzaicloud_v1beta1_kafkacluster.yaml
@@ -55,6 +55,14 @@ spec:
# alerts with 'rollingupgrade'
# failureThreshold: 1

# concurrentBrokerRestartsAllowed controls how many brokers can be restarted in parallel during a rolling upgrade. If
# it is set to a value greater than 1, the operator will restart up to that many brokers in parallel, provided the
# brokers are within the same AZ (as specified by "broker.rack" in broker read-only configs). Since using Kafka broker
# racks spreads out the replicas, we know that restarting multiple brokers in the same rack will not cause more than
# 1/Nth of the replicas of a topic-partition to be unavailable at the same time, where N is the number of racks used.
# This is a safe way to speed up the rolling upgrade.
# concurrentBrokerRestartsAllowed: 1

# brokerConfigGroups specifies multiple broker configs with unique name
brokerConfigGroups:
# Specify desired group name (eg., 'default_group')
@@ -189,11 +197,11 @@ spec:
# which has type per-broker
# priorityClassName can be used to set the broker pod's priority
# priorityClassName: "high-priority"

# When "hostNameOverride" and brokerConfig.nodePortExternalIP are empty and NodePort access method is selected for external listener
# nodePortNodeAdddressType defines the Kafka broker's Kubernetes node's address type that shall be used in the advertised.listeners property
# when nodeport is used for an external listener.
# its values can be Hostname, ExternalIP, InternalIP, InternalDNS,ExternalDNS
# its values can be Hostname, ExternalIP, InternalIP, InternalDNS,ExternalDNS
#nodePortNodeAddressType: "ExternalIP"
config: |
sasl.enabled.mechanisms=PLAIN
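For orientation, the sketch below (not part of this commit) expresses the same settings against the Go API types this change touches; the import path, broker IDs, and rack values are illustrative assumptions. With six brokers spread over three racks and rack-aware replica placement, restarting two brokers that share a rack leaves at most one third of any partition's replicas offline at a time. The rack values use plain alphanumerics so they survive the `\w+` capture used by the broker.rack parser added later in this commit.

package example

import "github.com/banzaicloud/koperator/api/v1beta1"

// exampleSpec sketches a KafkaCluster spec that opts in to concurrent restarts:
// up to two brokers may restart at once, and only when they share a broker.rack.
func exampleSpec() v1beta1.KafkaClusterSpec {
	return v1beta1.KafkaClusterSpec{
		RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{
			FailureThreshold:                1,
			ConcurrentBrokerRestartsAllowed: 2,
		},
		Brokers: []v1beta1.Broker{
			{Id: 0, ReadOnlyConfig: "broker.rack=az1"},
			{Id: 1, ReadOnlyConfig: "broker.rack=az1"},
			{Id: 2, ReadOnlyConfig: "broker.rack=az2"},
			{Id: 3, ReadOnlyConfig: "broker.rack=az2"},
			{Id: 4, ReadOnlyConfig: "broker.rack=az3"},
			{Id: 5, ReadOnlyConfig: "broker.rack=az3"},
		},
	}
}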
2 changes: 1 addition & 1 deletion controllers/tests/common_test.go
@@ -124,7 +124,7 @@ func createMinimalKafkaClusterCR(name, namespace string) *v1beta1.KafkaCluster {
CCJMXExporterConfig: "custom_property: custom_value",
},
ReadOnlyConfig: "cruise.control.metrics.topic.auto.create=true",
RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{FailureThreshold: 1},
RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{FailureThreshold: 1, ConcurrentBrokerRestartsAllowed: 1},
},
}
}
1 change: 1 addition & 0 deletions go.mod
@@ -38,6 +38,7 @@ require (
require (
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 // indirect
github.com/stretchr/objx v0.5.0 // indirect
golang.org/x/tools v0.7.0 // indirect
)

1 change: 1 addition & 0 deletions go.sum
@@ -433,6 +433,7 @@ github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
11 changes: 11 additions & 0 deletions pkg/kafkaclient/provider.go
@@ -15,6 +15,7 @@
package kafkaclient

import (
"github.com/stretchr/testify/mock"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/banzaicloud/koperator/api/v1beta1"
@@ -45,3 +46,13 @@ func NewDefaultProvider() Provider {
func (dp *defaultProvider) NewFromCluster(client client.Client, cluster *v1beta1.KafkaCluster) (KafkaClient, func(), error) {
return NewFromCluster(client, cluster)
}

// MockedProvider is a testify mock for the Provider interface; the Kafka clients it provides can themselves be mocks
type MockedProvider struct {
mock.Mock
}

func (m *MockedProvider) NewFromCluster(client client.Client, cluster *v1beta1.KafkaCluster) (KafkaClient, func(), error) {
args := m.Called(client, cluster)
return args.Get(0).(KafkaClient), args.Get(1).(func()), args.Error(2)
}
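A brief sketch of how a test could consume MockedProvider follows (not part of this commit; the package name, import path, and the kafkaClient argument are assumptions). testify's On/Return records canned return values, which NewFromCluster above then hands back to the caller.

package kafka_test

import (
	"github.com/stretchr/testify/mock"

	"github.com/banzaicloud/koperator/pkg/kafkaclient"
)

// newMockedProvider stubs the provider so the code under test receives the given
// KafkaClient, a no-op close function, and no error, regardless of the arguments.
func newMockedProvider(kafkaClient kafkaclient.KafkaClient) *kafkaclient.MockedProvider {
	provider := new(kafkaclient.MockedProvider)
	provider.On("NewFromCluster", mock.Anything, mock.Anything).
		Return(kafkaClient, func() {}, nil)
	return provider
}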
1 change: 0 additions & 1 deletion pkg/resources/kafka/configmap_test.go
@@ -615,7 +615,6 @@ zookeeper.connect=example.zk:2181/`,

t.Run(test.testName, func(t *testing.T) {
mockClient := mocks.NewMockClient(mockCtrl)
mockClient.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
r := Reconciler{
Reconciler: resources.Reconciler{
Client: mockClient,
146 changes: 133 additions & 13 deletions pkg/resources/kafka/kafka.go
@@ -18,6 +18,7 @@ import (
"context"
"fmt"
"reflect"
"regexp"
"sort"
"strconv"
"strings"
Expand All @@ -27,6 +28,7 @@ import (
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -86,12 +88,23 @@ const (
nonControllerBrokerReconcilePriority
// controllerBrokerReconcilePriority the priority used for controller broker used to define its priority in the reconciliation order
controllerBrokerReconcilePriority

// defaultConcurrentBrokerRestartsAllowed the default number of brokers that can be restarted in parallel
defaultConcurrentBrokerRestartsAllowed = 1
)

var (
// kafkaConfigBrokerRackRegex the regex to parse the "broker.rack" Kafka property used in read-only configs
kafkaConfigBrokerRackRegex = regexp.MustCompile(`broker\.rack\s*=\s*(\w+)`)
)

// Reconciler implements the Component Reconciler
type Reconciler struct {
resources.Reconciler
// kafkaClientProvider is used to create a new KafkaClient
kafkaClientProvider kafkaclient.Provider
// kafkaBrokerAvailabilityZoneMap is a map of broker id to availability zone used in concurrent broker restarts logic
kafkaBrokerAvailabilityZoneMap map[int32]string
}

// New creates a new reconciler for Kafka
@@ -102,9 +115,39 @@ func New(client client.Client, directClient client.Reader, cluster *v1beta1.Kafk
DirectClient: directClient,
KafkaCluster: cluster,
},
kafkaClientProvider: kafkaClientProvider,
kafkaClientProvider: kafkaClientProvider,
kafkaBrokerAvailabilityZoneMap: getBrokerAzMap(cluster),
}
}

func getBrokerAzMap(cluster *v1beta1.KafkaCluster) map[int32]string {
brokerAzMap := make(map[int32]string)
for _, broker := range cluster.Spec.Brokers {
brokerRack := getBrokerRack(broker.ReadOnlyConfig)
if brokerRack != "" {
brokerAzMap[broker.Id] = brokerRack
}
}
// if the broker AZ information is incomplete, treat every broker as being in its own AZ
if len(brokerAzMap) != len(cluster.Spec.Brokers) {
for _, broker := range cluster.Spec.Brokers {
brokerAzMap[broker.Id] = fmt.Sprintf("%d", broker.Id)
}
}
return brokerAzMap
}

func getBrokerRack(readOnlyConfig string) string {
if readOnlyConfig == "" {
return ""
}
match := kafkaConfigBrokerRackRegex.FindStringSubmatch(readOnlyConfig)
if len(match) == 2 {
return match[1]
}
return ""
}
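To make the parsing concrete, a short illustration follows (assumed to sit next to getBrokerRack in this package; it is not part of the commit). Two properties of the code above are worth noting: the `\w+` capture stops at the first non-word character, so a rack value such as "eu-west-1a" would be truncated to "eu", and getBrokerAzMap falls back to one AZ per broker as soon as any broker lacks broker.rack, which effectively disables concurrent restarts.

// exampleBrokerRackParsing is illustrative only; fmt is already imported by this file.
func exampleBrokerRackParsing() {
	fmt.Println(getBrokerRack("broker.rack=az1\nauto.create.topics.enable=false")) // "az1"
	fmt.Println(getBrokerRack("auto.create.topics.enable=false"))                  // ""
	fmt.Println(getBrokerRack("broker.rack=eu-west-1a"))                           // "eu", truncated at '-'
}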

func getCreatedPvcForBroker(
ctx context.Context,
c client.Reader,
@@ -874,18 +917,23 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo
if err != nil {
return errors.WrapIf(err, "failed to reconcile resource")
}
for _, pod := range podList.Items {
pod := pod
if k8sutil.IsMarkedForDeletion(pod.ObjectMeta) {
return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("pod is still terminating"), "rolling upgrade in progress")
}
if k8sutil.IsPodContainsPendingContainer(&pod) {
return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("pod is still creating"), "rolling upgrade in progress")
}
if len(podList.Items) < len(r.KafkaCluster.Spec.Brokers) {
return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("pod count differs from brokers spec"), "rolling upgrade in progress")
}

errorCount := r.KafkaCluster.Status.RollingUpgrade.ErrorCount
// Check whether multiple concurrent broker restarts are allowed; if so, restart additional brokers only within the same AZ, otherwise restart one broker at a time
concurrentBrokerRestartsAllowed := r.getConcurrentBrokerRestartsAllowed()
terminatingOrPendingPods := getPodsInTerminatingOrPendingState(podList.Items)
if len(terminatingOrPendingPods) >= concurrentBrokerRestartsAllowed {
return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New(strconv.Itoa(concurrentBrokerRestartsAllowed)+" pod(s) is still terminating or creating"), "rolling upgrade in progress")
}
currentPodAz := r.getBrokerAz(currentPod)
if concurrentBrokerRestartsAllowed > 1 && r.existsTerminatingPodFromAnotherAz(currentPodAz, terminatingOrPendingPods) {
return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("pod is still terminating or creating from another AZ"), "rolling upgrade in progress")
}

// Check broker count with out-of-sync and offline replicas against the rolling upgrade failure threshold
errorCount := r.KafkaCluster.Status.RollingUpgrade.ErrorCount
kClient, close, err := r.kafkaClientProvider.NewFromCluster(r.Client, r.KafkaCluster)
if err != nil {
return errorfactory.New(errorfactory.BrokersUnreachable{}, err, "could not connect to kafka brokers")
@@ -905,20 +953,24 @@
if len(outOfSyncReplicas) > 0 {
log.Info("out-of-sync replicas", "IDs", outOfSyncReplicas)
}

impactedReplicas := make(map[int32]struct{})
for _, brokerID := range allOfflineReplicas {
impactedReplicas[brokerID] = struct{}{}
}
for _, brokerID := range outOfSyncReplicas {
impactedReplicas[brokerID] = struct{}{}
}

errorCount += len(impactedReplicas)

if errorCount >= r.KafkaCluster.Spec.RollingUpgradeConfig.FailureThreshold {
return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("cluster is not healthy"), "rolling upgrade in progress")
}

// If multiple concurrent restarts and broker failures allowed, restart only brokers from the same AZ
if concurrentBrokerRestartsAllowed > 1 && r.KafkaCluster.Spec.RollingUpgradeConfig.FailureThreshold > 1 {
if r.existsFailedBrokerFromAnotherRack(currentPodAz, impactedReplicas) {
return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("broker is not healthy from another AZ"), "rolling upgrade in progress")
}
}
}
}

@@ -940,6 +992,53 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo
return nil
}

func (r *Reconciler) existsFailedBrokerFromAnotherRack(currentPodAz string, impactedReplicas map[int32]struct{}) bool {
if currentPodAz == "" && len(impactedReplicas) > 0 {
return true
}
for brokerWithFailure := range impactedReplicas {
if currentPodAz != r.kafkaBrokerAvailabilityZoneMap[brokerWithFailure] {
return true
}
}
return false
}

func (r *Reconciler) getConcurrentBrokerRestartsAllowed() int {
if r.KafkaCluster.Spec.RollingUpgradeConfig.ConcurrentBrokerRestartsAllowed > 1 {
return r.KafkaCluster.Spec.RollingUpgradeConfig.ConcurrentBrokerRestartsAllowed
}
return defaultConcurrentBrokerRestartsAllowed
}

func (r *Reconciler) existsTerminatingPodFromAnotherAz(currentPodAz string, terminatingOrPendingPods []corev1.Pod) bool {
if currentPodAz == "" && len(terminatingOrPendingPods) > 0 {
return true
}
for _, terminatingOrPendingPod := range terminatingOrPendingPods {
if currentPodAz != r.getBrokerAz(&terminatingOrPendingPod) {
return true
}
}
return false
}
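Taken together, the pod-count guard in handleRollingUpgrade and the AZ guard above amount to the following rule, restated here as a standalone function for illustration (not code from this commit); the separate out-of-sync/offline-replica checks against the failure threshold still apply on top of it.

// canRestartAnotherBroker paraphrases the restart gating: stay within the configured
// budget of restarting pods, and allow concurrency only among brokers of one known AZ.
func canRestartAnotherBroker(currentAz string, restartingPodAzs []string, concurrentRestartsAllowed int) bool {
	// Never exceed the allowed number of pods that are already terminating or pending.
	if len(restartingPodAzs) >= concurrentRestartsAllowed {
		return false
	}
	// An unknown AZ is treated conservatively: block if anything else is restarting.
	if currentAz == "" && len(restartingPodAzs) > 0 {
		return false
	}
	// Concurrency beyond one broker is only permitted within a single AZ.
	for _, az := range restartingPodAzs {
		if az != currentAz {
			return false
		}
	}
	return true
}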

// Checks for match between pod labels and TaintedBrokersSelector
func (r *Reconciler) isPodTainted(log logr.Logger, pod *corev1.Pod) bool {
selector, err := metav1.LabelSelectorAsSelector(r.KafkaCluster.Spec.TaintedBrokersSelector)

if err != nil {
log.Error(err, "Invalid tainted brokers label selector")
return false
}

if selector.Empty() {
return false
}
return selector.Matches(labels.Set(pod.Labels))
}

//nolint:funlen
func (r *Reconciler) reconcileKafkaPvc(ctx context.Context, log logr.Logger, brokersDesiredPvcs map[string][]*corev1.PersistentVolumeClaim) error {
brokersVolumesState := make(map[string]map[string]v1beta1.VolumeState)
var brokerIds []string
@@ -1289,6 +1388,27 @@ func (r *Reconciler) determineControllerId() (int32, error) {
return controllerID, nil
}

func getPodsInTerminatingOrPendingState(items []corev1.Pod) []corev1.Pod {
var pods []corev1.Pod
for _, pod := range items {
if k8sutil.IsMarkedForDeletion(pod.ObjectMeta) {
pods = append(pods, pod)
}
if k8sutil.IsPodContainsPendingContainer(&pod) {
pods = append(pods, pod)
}
}
return pods
}

func (r *Reconciler) getBrokerAz(pod *corev1.Pod) string {
brokerId, err := strconv.ParseInt(pod.Labels[v1beta1.BrokerIdLabelKey], 10, 32)
if err != nil {
return ""
}
return r.kafkaBrokerAvailabilityZoneMap[int32(brokerId)]
}

func getServiceFromExternalListener(client client.Client, cluster *v1beta1.KafkaCluster,
eListenerName string, ingressConfigName string) (*corev1.Service, error) {
foundLBService := &corev1.Service{}