Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Augment error behaviour #920

Merged
merged 9 commits into from
Jan 31, 2023
8 changes: 4 additions & 4 deletions controllers/controller_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,18 @@ func clusterLabelString(cluster *v1beta1.KafkaCluster) string {
// checkBrokerConnectionError is a convenience wrapper for returning from common
// broker connection errors
func checkBrokerConnectionError(logger logr.Logger, err error) (ctrl.Result, error) {
switch errors.Cause(err).(type) {
case errorfactory.BrokersUnreachable:
switch {
case errors.As(err, &errorfactory.BrokersUnreachable{}):
return ctrl.Result{
Requeue: true,
RequeueAfter: time.Duration(15) * time.Second,
}, nil
case errorfactory.BrokersNotReady:
case errors.As(err, &errorfactory.BrokersNotReady{}):
return ctrl.Result{
Requeue: true,
RequeueAfter: time.Duration(15) * time.Second,
}, nil
case errorfactory.ResourceNotReady:
case errors.As(err, &errorfactory.ResourceNotReady{}):
logger.Info("Needed resource for broker connection not found, may not be ready")
return ctrl.Result{
Requeue: true,
Expand Down
24 changes: 14 additions & 10 deletions controllers/kafkacluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,44 +124,48 @@ func (r *KafkaClusterReconciler) Reconcile(ctx context.Context, request ctrl.Req
for _, rec := range reconcilers {
err = rec.Reconcile(log)
if err != nil {
switch errors.Cause(err).(type) {
case errorfactory.BrokersUnreachable:
switch {
case errors.As(err, &errorfactory.BrokersUnreachable{}):
log.Info("Brokers unreachable, may still be starting up", "error", err.Error())
return ctrl.Result{
RequeueAfter: time.Duration(15) * time.Second,
}, nil
case errorfactory.BrokersNotReady:
case errors.As(err, &errorfactory.BrokersNotReady{}):
log.Info("Brokers not ready, may still be starting up", "error", err.Error())
return ctrl.Result{
RequeueAfter: time.Duration(15) * time.Second,
}, nil
case errorfactory.ResourceNotReady:
case errors.As(err, &errorfactory.ResourceNotReady{}):
log.Info("A new resource was not found or may not be ready", "error", err.Error())
return ctrl.Result{
RequeueAfter: time.Duration(7) * time.Second,
}, nil
case errorfactory.ReconcileRollingUpgrade:
case errors.As(err, &errorfactory.ReconcileRollingUpgrade{}):
log.Info("Rolling Upgrade in Progress")
return ctrl.Result{
RequeueAfter: time.Duration(15) * time.Second,
}, nil
case errorfactory.CruiseControlNotReady:
case errors.As(err, &errorfactory.CruiseControlNotReady{}):
return ctrl.Result{
RequeueAfter: time.Duration(15) * time.Second,
}, nil
case errorfactory.CruiseControlTaskRunning:
case errors.As(err, &errorfactory.CruiseControlTaskRunning{}):
return ctrl.Result{
RequeueAfter: time.Duration(20) * time.Second,
}, nil
case errors.As(err, &errorfactory.CruiseControlTaskTimeout{}):
return ctrl.Result{
RequeueAfter: time.Duration(20) * time.Second,
}, nil
case errorfactory.CruiseControlTaskTimeout, errorfactory.CruiseControlTaskFailure:
case errors.As(err, &errorfactory.CruiseControlTaskFailure{}):
return ctrl.Result{
RequeueAfter: time.Duration(20) * time.Second,
}, nil
case errorfactory.PerBrokerConfigNotReady:
case errors.As(err, &errorfactory.PerBrokerConfigNotReady{}):
log.V(1).Info("dynamically updated broker configuration hasn't propagated through yet")
// for exponential backoff
return ctrl.Result{}, err
case errorfactory.LoadBalancerIPNotReady:
case errors.As(err, &errorfactory.LoadBalancerIPNotReady{}):
return ctrl.Result{
RequeueAfter: time.Duration(30) * time.Second,
}, nil
Expand Down
6 changes: 3 additions & 3 deletions controllers/kafkauser_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,14 +211,14 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, request reconcile.R

user, err := pkiManager.ReconcileUserCertificate(ctx, instance, r.Scheme, cluster.Spec.GetKubernetesClusterDomain())
if err != nil {
switch errors.Cause(err).(type) {
case errorfactory.ResourceNotReady:
switch {
case errors.As(err, &errorfactory.ResourceNotReady{}):
reqLogger.Info("generated secret not found, may not be ready")
return ctrl.Result{
Requeue: true,
RequeueAfter: time.Duration(5) * time.Second,
}, nil
case errorfactory.FatalReconcileError:
case errors.As(err, &errorfactory.FatalReconcileError{}):
// TODO: (tinyzimmer) - Sleep for longer for now to give user time to see the error
// But really we should catch these kinds of issues in a pre-admission hook in a future PR
// The user can fix while this is looping and it will pick it up next reconcile attempt
Expand Down
39 changes: 39 additions & 0 deletions pkg/errorfactory/errorfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,60 +19,99 @@ import "emperror.dev/errors"
// ResourceNotReady signals that a resource is not ready yet.
type ResourceNotReady struct{ error }

// Unwrap returns the embedded error, which keeps this type compatible with
// the Go 1.13+ error chaining helpers (errors.Is, errors.As).
func (e ResourceNotReady) Unwrap() error {
	return e.error
}
panyuenlau marked this conversation as resolved.
Show resolved Hide resolved

// APIFailure signals that something went wrong with the API.
type APIFailure struct{ error }

// Unwrap returns the embedded error so that errors.Is and errors.As can walk
// the error chain.
func (e APIFailure) Unwrap() error {
	return e.error
}

// StatusUpdateError signals that the operator failed to update the Status.
type StatusUpdateError struct{ error }

// Unwrap returns the embedded error so that errors.Is and errors.As can walk
// the error chain.
func (e StatusUpdateError) Unwrap() error {
	return e.error
}

// BrokersUnreachable signals that the given broker is unreachable.
type BrokersUnreachable struct{ error }

// Unwrap returns the embedded error so that errors.Is and errors.As can walk
// the error chain.
func (e BrokersUnreachable) Unwrap() error {
	return e.error
}

// BrokersNotReady signals that the broker is not ready.
type BrokersNotReady struct{ error }

// Unwrap returns the embedded error so that errors.Is and errors.As can walk
// the error chain.
func (e BrokersNotReady) Unwrap() error {
	return e.error
}

// BrokersRequestError signals that the broker could not understand the request.
type BrokersRequestError struct{ error }

// Unwrap returns the embedded error so that errors.Is and errors.As can walk
// the error chain.
func (e BrokersRequestError) Unwrap() error {
	return e.error
}

// CreateTopicError signals that the operator could not create the topic.
type CreateTopicError struct{ error }

// Unwrap returns the embedded error so that errors.Is and errors.As can walk
// the error chain.
func (e CreateTopicError) Unwrap() error {
	return e.error
}

// TopicNotFound signals that the given topic was not found.
type TopicNotFound struct{ error }

// Unwrap returns the embedded error so that errors.Is and errors.As can walk
// the error chain.
func (e TopicNotFound) Unwrap() error {
	return e.error
}

// GracefulUpscaleFailed signals that the operator failed to update the
// cluster gracefully.
type GracefulUpscaleFailed struct{ error }

// Unwrap returns the embedded error so that errors.Is and errors.As can walk
// the error chain.
func (e GracefulUpscaleFailed) Unwrap() error {
	return e.error
}

// TooManyResources signals that too many resources were found.
type TooManyResources struct{ error }

// Unwrap returns the embedded error so that errors.Is and errors.As can walk
// the error chain.
func (e TooManyResources) Unwrap() error {
	return e.error
}

// InternalError signals that an internal error happened.
type InternalError struct{ error }

// Unwrap returns the embedded error so that errors.Is and errors.As can walk
// the error chain.
func (e InternalError) Unwrap() error {
	return e.error
}

// FatalReconcileError signals that a fatal error happened.
type FatalReconcileError struct{ error }

// Unwrap returns the embedded error so that errors.Is and errors.As can walk
// the error chain.
func (e FatalReconcileError) Unwrap() error {
	return e.error
}

// ReconcileRollingUpgrade signals that a rolling upgrade is being reconciled.
type ReconcileRollingUpgrade struct{ error }

// Unwrap returns the embedded error so that errors.Is and errors.As can walk
// the error chain.
func (e ReconcileRollingUpgrade) Unwrap() error {
	return e.error
}

// CruiseControlNotReady signals that CC is not ready to receive connections.
type CruiseControlNotReady struct{ error }

// Unwrap returns the embedded error so that errors.Is and errors.As can walk
// the error chain.
func (e CruiseControlNotReady) Unwrap() error {
	return e.error
}

// CruiseControlTaskRunning signals that the CC task is still running.
type CruiseControlTaskRunning struct{ error }

// Unwrap returns the embedded error so that errors.Is and errors.As can walk
// the error chain.
func (e CruiseControlTaskRunning) Unwrap() error {
	return e.error
}

// CruiseControlTaskTimeout signals that the CC task timed out.
type CruiseControlTaskTimeout struct{ error }

// Unwrap returns the embedded error so that errors.Is and errors.As can walk
// the error chain.
func (e CruiseControlTaskTimeout) Unwrap() error {
	return e.error
}

// CruiseControlTaskFailure signals that the CC task was not found
// (CC restart?) or failed.
type CruiseControlTaskFailure struct{ error }

// Unwrap returns the embedded error so that errors.Is and errors.As can walk
// the error chain.
func (e CruiseControlTaskFailure) Unwrap() error {
	return e.error
}

// PerBrokerConfigNotReady signals that the per-broker configuration has been
// updated for a broker.
type PerBrokerConfigNotReady struct{ error }

// Unwrap returns the embedded error so that errors.Is and errors.As can walk
// the error chain.
func (e PerBrokerConfigNotReady) Unwrap() error {
	return e.error
}

// LoadBalancerIPNotReady signals that the LoadBalancer IP has not been
// created yet.
type LoadBalancerIPNotReady struct{ error }

// Unwrap returns the embedded error so that errors.Is and errors.As can walk
// the error chain.
func (e LoadBalancerIPNotReady) Unwrap() error {
	return e.error
}

// New creates a new error factory error
func New(t interface{}, err error, msg string, wrapArgs ...interface{}) error {
wrapped := errors.WrapIfWithDetails(err, msg, wrapArgs...)
Expand Down
26 changes: 26 additions & 0 deletions pkg/errorfactory/errorfactory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,29 @@ func TestNew(t *testing.T) {
}
}
}

// TestUnwrapMethod verifies that every error type built by New keeps the
// wrapped error reachable through Unwrap(), so that errors.Is() and
// errors.As() can find it in the error chain.
func TestUnwrapMethod(t *testing.T) {
	// testError is a custom error type used to test the features of unwrapping errors using errors.Is() and errors.As()
	type testError struct{ error }
	tstErr := testError{errors.New("inner-error")}

	for _, errType := range errorTypes {
		err := New(errType, tstErr, "test-message")

		// This tests the use of errors.Is() using the Unwrap() method
		if !errors.Is(err, tstErr) {
			t.Errorf("Type %T does not Unwrap() correctly using errors.Is(). Expected: %t ; Got: %t", errType, true, false)
		}

		// This tests the use of errors.As() using the Unwrap() method
		var c testError
		if !errors.As(err, &c) {
			t.Errorf("Type %T does not Unwrap() correctly using errors.As(). Expected: %t ; Got: %t", errType, true, false)
			// c was never populated, so comparing it below would only add a
			// second, redundant failure for the same root cause.
			continue
		}

		// This tests whether errors.As() succeeded in extracting the correct wrapped error into the given variable, not just the boolean return value.
		// It must run on the success path: nested inside the failure branch it could never validate a successful extraction.
		if c != tstErr {
			t.Errorf("Type %T does not extract correctly the wrapped error using errors.As(). Expected: %v ; Got: %v", errType, tstErr, c)
		}
	}
}
36 changes: 26 additions & 10 deletions pkg/webhooks/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,36 @@ const (
invalidReplicationFactorErrMsg = "replication factor is larger than the number of nodes in the kafka cluster"
outOfRangeReplicationFactorErrMsg = "replication factor must be larger than 0 (or set it to be -1 to use the broker's default)"
outOfRangePartitionsErrMsg = "number of partitions must be larger than 0 (or set it to be -1 to use the broker's default)"
removingStorageMsg = "removing storage from a broker is not supported"
errorDuringValidationMsg = "error during validation"
unsupportedRemovingStorageMsg = "removing storage from a broker is not supported"

// errorDuringValidationMsg is added to infrastructure errors (e.g. failed to connect), but not to field validation errors
errorDuringValidationMsg = "error during validation"
)

func IsAdmissionCantConnect(err error) bool {
if apierrors.IsInternalError(err) && strings.Contains(err.Error(), cantConnectErrorMsg) {
return true
}
return false
return apierrors.IsInternalError(err) && strings.Contains(err.Error(), cantConnectErrorMsg)
}

func IsCantConnectAPIServer(err error) bool {
panyuenlau marked this conversation as resolved.
Show resolved Hide resolved
return apierrors.IsInternalError(err) && strings.Contains(err.Error(), cantConnectAPIServerMsg)
}

func IsInvalidReplicationFactor(err error) bool {
if apierrors.IsInvalid(err) && strings.Contains(err.Error(), invalidReplicationFactorErrMsg) {
return true
}
return false
return apierrors.IsInvalid(err) && strings.Contains(err.Error(), invalidReplicationFactorErrMsg)
}

// IsOutOfRangeReplicationFactor reports whether apierrors.IsInvalid accepts
// err and the error message contains outOfRangeReplicationFactorErrMsg.
func IsOutOfRangeReplicationFactor(err error) bool {
	// Check the error kind first; err.Error() is only evaluated once it matched.
	if !apierrors.IsInvalid(err) {
		return false
	}
	return strings.Contains(err.Error(), outOfRangeReplicationFactorErrMsg)
}

// IsOutOfRangePartitions reports whether apierrors.IsInvalid accepts err and
// the error message contains outOfRangePartitionsErrMsg.
func IsOutOfRangePartitions(err error) bool {
	// Check the error kind first; err.Error() is only evaluated once it matched.
	if !apierrors.IsInvalid(err) {
		return false
	}
	return strings.Contains(err.Error(), outOfRangePartitionsErrMsg)
}

// IsInvalidRemovingStorage reports whether apierrors.IsInvalid accepts err
// and the error message contains unsupportedRemovingStorageMsg.
func IsInvalidRemovingStorage(err error) bool {
	// Check the error kind first; err.Error() is only evaluated once it matched.
	if !apierrors.IsInvalid(err) {
		return false
	}
	return strings.Contains(err.Error(), unsupportedRemovingStorageMsg)
}

// IsErrorDuringValidation reports whether apierrors.IsInternalError accepts
// err and the error message contains errorDuringValidationMsg.
func IsErrorDuringValidation(err error) bool {
	// Check the error kind first; err.Error() is only evaluated once it matched.
	if !apierrors.IsInternalError(err) {
		return false
	}
	return strings.Contains(err.Error(), errorDuringValidationMsg)
}
Loading