diff --git a/controllers/controller_common.go b/controllers/controller_common.go
index bf56789c6..c86d8810b 100644
--- a/controllers/controller_common.go
+++ b/controllers/controller_common.go
@@ -81,18 +81,18 @@ func clusterLabelString(cluster *v1beta1.KafkaCluster) string {
 // checkBrokerConnectionError is a convenience wrapper for returning from common
 // broker connection errors
 func checkBrokerConnectionError(logger logr.Logger, err error) (ctrl.Result, error) {
-    switch errors.Cause(err).(type) {
-    case errorfactory.BrokersUnreachable:
+    switch {
+    case errors.As(err, &errorfactory.BrokersUnreachable{}):
         return ctrl.Result{
             Requeue:      true,
             RequeueAfter: time.Duration(15) * time.Second,
         }, nil
-    case errorfactory.BrokersNotReady:
+    case errors.As(err, &errorfactory.BrokersNotReady{}):
         return ctrl.Result{
             Requeue:      true,
             RequeueAfter: time.Duration(15) * time.Second,
         }, nil
-    case errorfactory.ResourceNotReady:
+    case errors.As(err, &errorfactory.ResourceNotReady{}):
         logger.Info("Needed resource for broker connection not found, may not be ready")
         return ctrl.Result{
             Requeue:      true,
diff --git a/controllers/kafkacluster_controller.go b/controllers/kafkacluster_controller.go
index 9e7fdf8e1..15366c667 100644
--- a/controllers/kafkacluster_controller.go
+++ b/controllers/kafkacluster_controller.go
@@ -124,44 +124,48 @@ func (r *KafkaClusterReconciler) Reconcile(ctx context.Context, request ctrl.Req
     for _, rec := range reconcilers {
         err = rec.Reconcile(log)
         if err != nil {
-            switch errors.Cause(err).(type) {
-            case errorfactory.BrokersUnreachable:
+            switch {
+            case errors.As(err, &errorfactory.BrokersUnreachable{}):
                 log.Info("Brokers unreachable, may still be starting up", "error", err.Error())
                 return ctrl.Result{
                     RequeueAfter: time.Duration(15) * time.Second,
                 }, nil
-            case errorfactory.BrokersNotReady:
+            case errors.As(err, &errorfactory.BrokersNotReady{}):
                 log.Info("Brokers not ready, may still be starting up", "error", err.Error())
                 return ctrl.Result{
                     RequeueAfter: time.Duration(15) * time.Second,
                 }, nil
-            case errorfactory.ResourceNotReady:
+            case errors.As(err, &errorfactory.ResourceNotReady{}):
                 log.Info("A new resource was not found or may not be ready", "error", err.Error())
                 return ctrl.Result{
                     RequeueAfter: time.Duration(7) * time.Second,
                 }, nil
-            case errorfactory.ReconcileRollingUpgrade:
+            case errors.As(err, &errorfactory.ReconcileRollingUpgrade{}):
                 log.Info("Rolling Upgrade in Progress")
                 return ctrl.Result{
                     RequeueAfter: time.Duration(15) * time.Second,
                 }, nil
-            case errorfactory.CruiseControlNotReady:
+            case errors.As(err, &errorfactory.CruiseControlNotReady{}):
                 return ctrl.Result{
                     RequeueAfter: time.Duration(15) * time.Second,
                 }, nil
-            case errorfactory.CruiseControlTaskRunning:
+            case errors.As(err, &errorfactory.CruiseControlTaskRunning{}):
+                return ctrl.Result{
+                    RequeueAfter: time.Duration(20) * time.Second,
+                }, nil
+            case errors.As(err, &errorfactory.CruiseControlTaskTimeout{}):
                 return ctrl.Result{
                     RequeueAfter: time.Duration(20) * time.Second,
                 }, nil
-            case errorfactory.CruiseControlTaskTimeout, errorfactory.CruiseControlTaskFailure:
+            case errors.As(err, &errorfactory.CruiseControlTaskFailure{}):
                 return ctrl.Result{
                     RequeueAfter: time.Duration(20) * time.Second,
                 }, nil
-            case errorfactory.PerBrokerConfigNotReady:
+            case errors.As(err, &errorfactory.PerBrokerConfigNotReady{}):
                 log.V(1).Info("dynamically updated broker configuration hasn't propagated through yet")
                 // for exponential backoff
                 return ctrl.Result{}, err
-            case errorfactory.LoadBalancerIPNotReady:
+            case errors.As(err, &errorfactory.LoadBalancerIPNotReady{}):
                 return ctrl.Result{
                     RequeueAfter: time.Duration(30) * time.Second,
                 }, nil
diff --git a/controllers/kafkauser_controller.go b/controllers/kafkauser_controller.go
index 99c41d574..169840bb8 100644
--- a/controllers/kafkauser_controller.go
+++ b/controllers/kafkauser_controller.go
@@ -211,14 +211,14 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, request reconcile.R
 
     user, err := pkiManager.ReconcileUserCertificate(ctx, instance, r.Scheme, cluster.Spec.GetKubernetesClusterDomain())
     if err != nil {
-        switch errors.Cause(err).(type) {
-        case errorfactory.ResourceNotReady:
+        switch {
+        case errors.As(err, &errorfactory.ResourceNotReady{}):
             reqLogger.Info("generated secret not found, may not be ready")
             return ctrl.Result{
                 Requeue:      true,
                 RequeueAfter: time.Duration(5) * time.Second,
             }, nil
-        case errorfactory.FatalReconcileError:
+        case errors.As(err, &errorfactory.FatalReconcileError{}):
             // TODO: (tinyzimmer) - Sleep for longer for now to give user time to see the error
             // But really we should catch these kinds of issues in a pre-admission hook in a future PR
             // The user can fix while this is looping and it will pick it up next reconcile attempt
diff --git a/pkg/errorfactory/errorfactory.go b/pkg/errorfactory/errorfactory.go
index 2257a1d98..96d17a5a1 100644
--- a/pkg/errorfactory/errorfactory.go
+++ b/pkg/errorfactory/errorfactory.go
@@ -19,60 +19,99 @@ import "emperror.dev/errors"
 // ResourceNotReady states that resource is not ready
 type ResourceNotReady struct{ error }
 
+// Unwrap() allows compliance with Go1.13+ error chaining behavior
+func (e ResourceNotReady) Unwrap() error { return e.error }
+
 // APIFailure states that something went wrong with the api
 type APIFailure struct{ error }
 
+func (e APIFailure) Unwrap() error { return e.error }
+
 // StatusUpdateError states that the operator failed to update the Status
 type StatusUpdateError struct{ error }
 
+func (e StatusUpdateError) Unwrap() error { return e.error }
+
 // BrokersUnreachable states that the given broker is unreachable
 type BrokersUnreachable struct{ error }
 
+func (e BrokersUnreachable) Unwrap() error { return e.error }
+
 // BrokersNotReady states that the broker is not ready
 type BrokersNotReady struct{ error }
 
+func (e BrokersNotReady) Unwrap() error { return e.error }
+
 // BrokersRequestError states that the broker could not understand the request
 type BrokersRequestError struct{ error }
 
+func (e BrokersRequestError) Unwrap() error { return e.error }
+
 // CreateTopicError states that the operator could not create the topic
 type CreateTopicError struct{ error }
 
+func (e CreateTopicError) Unwrap() error { return e.error }
+
 // TopicNotFound states that the given topic is not found
 type TopicNotFound struct{ error }
 
+func (e TopicNotFound) Unwrap() error { return e.error }
+
 // GracefulUpscaleFailed states that the operator failed to update the cluster gracefully
 type GracefulUpscaleFailed struct{ error }
 
+func (e GracefulUpscaleFailed) Unwrap() error { return e.error }
+
 // TooManyResources states that too many resource found
 type TooManyResources struct{ error }
 
+func (e TooManyResources) Unwrap() error { return e.error }
+
 // InternalError states that internal error happened
 type InternalError struct{ error }
 
+func (e InternalError) Unwrap() error { return e.error }
+
 // FatalReconcileError states that a fatal error happened
 type FatalReconcileError struct{ error }
 
+func (e FatalReconcileError) Unwrap() error { return e.error }
+
 // ReconcileRollingUpgrade states that rolling upgrade is reconciling
 type ReconcileRollingUpgrade struct{ error }
 
+func (e ReconcileRollingUpgrade) Unwrap() error { return e.error }
+
 // CruiseControlNotReady states that CC is not ready to receive connection
 type CruiseControlNotReady struct{ error }
 
+func (e CruiseControlNotReady) Unwrap() error { return e.error }
+
 // CruiseControlTaskRunning states that CC task is still running
 type CruiseControlTaskRunning struct{ error }
 
+func (e CruiseControlTaskRunning) Unwrap() error { return e.error }
+
 // CruiseControlTaskRunning states that CC task timed out
 type CruiseControlTaskTimeout struct{ error }
 
+func (e CruiseControlTaskTimeout) Unwrap() error { return e.error }
+
 // CruiseControlTaskFailure states that CC task was not found (CC restart?) or failed
 type CruiseControlTaskFailure struct{ error }
 
+func (e CruiseControlTaskFailure) Unwrap() error { return e.error }
+
 // PerBrokerConfigNotReady states that per-broker configurations has been updated for a broker
 type PerBrokerConfigNotReady struct{ error }
 
+func (e PerBrokerConfigNotReady) Unwrap() error { return e.error }
+
 // LoadBalancerIPNotReady states that the LoadBalancer IP is not yet created
 type LoadBalancerIPNotReady struct{ error }
 
+func (e LoadBalancerIPNotReady) Unwrap() error { return e.error }
+
 // New creates a new error factory error
 func New(t interface{}, err error, msg string, wrapArgs ...interface{}) error {
     wrapped := errors.WrapIfWithDetails(err, msg, wrapArgs...)
diff --git a/pkg/errorfactory/errorfactory_test.go b/pkg/errorfactory/errorfactory_test.go
index 2d7c3d7a0..50f28c576 100644
--- a/pkg/errorfactory/errorfactory_test.go
+++ b/pkg/errorfactory/errorfactory_test.go
@@ -53,3 +53,29 @@ func TestNew(t *testing.T) {
         }
     }
 }
+
+func TestUnwrapMethod(t *testing.T) {
+    // testError is a custom error type used to test the features of unwrapping errors using errors.Is() and errors.As()
+    type testError struct{ error }
+    var tstErr = testError{errors.New("inner-error")}
+
+    for _, errType := range errorTypes {
+        errType := errType
+        err := New(errType, tstErr, "test-message")
+
+        // This tests the use of errors.Is() using the Unwrap() method
+        if ok := errors.Is(err, tstErr); !ok {
+            t.Errorf("Type %T does not Unwrap() correctly using errors.Is(). Expected: %t ; Got: %t", errType, true, ok)
+        }
+
+        var c testError
+        // This tests the use of errors.As() using the Unwrap() method
+        if ok := errors.As(err, &c); !ok {
+            t.Errorf("Type %T does not Unwrap() correctly using errors.As(). Expected: %t ; Got: %t", errType, true, ok)
+            // This tests whether errors.As() succeeded in extracting the correct wrapped error into the given variable, not just the boolean return value
+            if c != tstErr {
+                t.Errorf("Type %T does not extract correctly the wrapped error using errors.As(). Expected: %v ; Got: %v", errType, tstErr, c)
+            }
+        }
+    }
+}
diff --git a/pkg/webhooks/errors.go b/pkg/webhooks/errors.go
index 45659542a..9e7c1e5f1 100644
--- a/pkg/webhooks/errors.go
+++ b/pkg/webhooks/errors.go
@@ -26,20 +26,36 @@ const (
     invalidReplicationFactorErrMsg    = "replication factor is larger than the number of nodes in the kafka cluster"
     outOfRangeReplicationFactorErrMsg = "replication factor must be larger than 0 (or set it to be -1 to use the broker's default)"
     outOfRangePartitionsErrMsg        = "number of partitions must be larger than 0 (or set it to be -1 to use the broker's default)"
-    removingStorageMsg                = "removing storage from a broker is not supported"
-    errorDuringValidationMsg          = "error during validation"
+    unsupportedRemovingStorageMsg     = "removing storage from a broker is not supported"
+
+    // errorDuringValidationMsg is added to infrastructure errors (e.g. failed to connect), but not to field validation errors
+    errorDuringValidationMsg = "error during validation"
 )
 
 func IsAdmissionCantConnect(err error) bool {
-    if apierrors.IsInternalError(err) && strings.Contains(err.Error(), cantConnectErrorMsg) {
-        return true
-    }
-    return false
+    return apierrors.IsInternalError(err) && strings.Contains(err.Error(), cantConnectErrorMsg)
+}
+
+func IsCantConnectAPIServer(err error) bool {
+    return apierrors.IsInternalError(err) && strings.Contains(err.Error(), cantConnectAPIServerMsg)
 }
 
 func IsInvalidReplicationFactor(err error) bool {
-    if apierrors.IsInvalid(err) && strings.Contains(err.Error(), invalidReplicationFactorErrMsg) {
-        return true
-    }
-    return false
+    return apierrors.IsInvalid(err) && strings.Contains(err.Error(), invalidReplicationFactorErrMsg)
+}
+
+func IsOutOfRangeReplicationFactor(err error) bool {
+    return apierrors.IsInvalid(err) && strings.Contains(err.Error(), outOfRangeReplicationFactorErrMsg)
+}
+
+func IsOutOfRangePartitions(err error) bool {
+    return apierrors.IsInvalid(err) && strings.Contains(err.Error(), outOfRangePartitionsErrMsg)
+}
+
+func IsInvalidRemovingStorage(err error) bool {
+    return apierrors.IsInvalid(err) && strings.Contains(err.Error(), unsupportedRemovingStorageMsg)
+}
+
+func IsErrorDuringValidation(err error) bool {
+    return apierrors.IsInternalError(err) && strings.Contains(err.Error(), errorDuringValidationMsg)
 }
diff --git a/pkg/webhooks/errors_test.go b/pkg/webhooks/errors_test.go
index 2948907ab..84cb81675 100644
--- a/pkg/webhooks/errors_test.go
+++ b/pkg/webhooks/errors_test.go
@@ -23,6 +23,9 @@ import (
     "k8s.io/apimachinery/pkg/util/validation/field"
 
     banzaicloudv1alpha1 "github.com/banzaicloud/koperator/api/v1alpha1"
+    banzaicloudv1beta1 "github.com/banzaicloud/koperator/api/v1beta1"
+
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
 func TestIsAdmissionConnectionError(t *testing.T) {
@@ -66,3 +69,126 @@ func TestIsInvalidReplicationFactor(t *testing.T) {
         t.Error("Expected is invalid replication error to be false, got true")
     }
 }
+
+func TestIsCantConnectAPIServer(t *testing.T) {
+    testCases := []struct {
+        testName string
+        err      error
+        want     bool
+    }{
+        {
+            testName: "cantConnectAPIServer",
+            err:      apierrors.NewInternalError(errors.Wrap(errors.New("..."), cantConnectAPIServerMsg)),
+            want:     true,
+        },
+        {
+            testName: "wrong-error-message",
+            err:      apierrors.NewInternalError(errors.Wrap(errors.New("..."), "wrong-error-message")),
+            want:     false,
+        },
+    }
+
+    for _, tc := range testCases {
+        t.Run(tc.testName, func(t *testing.T) {
+            if got := IsCantConnectAPIServer(tc.err); got != tc.want {
+                t.Errorf("Check connection to API Server error message. Expected: %t ; Got: %t", tc.want, got)
+            }
+        })
+    }
+}
+
+func TestIsOutOfRangeReplicationFactor(t *testing.T) {
+    kafkaTopic := banzaicloudv1alpha1.KafkaTopic{ObjectMeta: metav1.ObjectMeta{Name: "test-KafkaTopic"}}
+    var fieldErrs field.ErrorList
+    fieldErrs = append(fieldErrs, field.Invalid(field.NewPath("spec").Child("replicationFactor"), "-2", outOfRangeReplicationFactorErrMsg))
+    err := apierrors.NewInvalid(
+        kafkaTopic.GetObjectKind().GroupVersionKind().GroupKind(),
+        kafkaTopic.Name, fieldErrs)
+
+    if ok := IsOutOfRangeReplicationFactor(err); !ok {
+        t.Errorf("Check Out of Range ReplicationFactor error message. Expected: %t ; Got: %t", true, ok)
+    }
+}
+
+func TestIsOutOfRangePartitions(t *testing.T) {
+    kafkaTopic := banzaicloudv1alpha1.KafkaTopic{ObjectMeta: metav1.ObjectMeta{Name: "test-KafkaTopic"}}
+    var fieldErrs field.ErrorList
+    fieldErrs = append(fieldErrs, field.Invalid(field.NewPath("spec").Child("partitions"), "-2", outOfRangePartitionsErrMsg))
+    err := apierrors.NewInvalid(
+        kafkaTopic.GetObjectKind().GroupVersionKind().GroupKind(),
+        kafkaTopic.Name, fieldErrs)
+
+    if ok := IsOutOfRangePartitions(err); !ok {
+        t.Errorf("Check Out of Range Partitions error message. Expected: %t ; Got: %t", true, ok)
+    }
+}
+
+func TestIsInvalidRemovingStorage(t *testing.T) {
+    testCases := []struct {
+        testName  string
+        fieldErrs field.ErrorList
+        want      bool
+    }{
+        {
+            testName:  "field.Invalid_removingStorage",
+            fieldErrs: append(field.ErrorList{}, field.Invalid(field.NewPath("spec").Child("brokers").Index(0).Child("brokerConfigGroup"), "test-broker-config-group", unsupportedRemovingStorageMsg+", provided brokerConfigGroup not found")),
+            want:      true,
+        },
+        {
+            testName:  "field.NotFound_removingStorage",
+            fieldErrs: append(field.ErrorList{}, field.NotFound(field.NewPath("spec").Child("brokers").Index(0).Child("storageConfig").Index(0), "/test/storageConfig/mount/path"+", "+unsupportedRemovingStorageMsg)),
+            want:      true,
+        },
+        {
+            testName:  "field.Invalid_wrong-error-message",
+            fieldErrs: append(field.ErrorList{}, field.Invalid(field.NewPath("spec").Child("brokers").Index(0).Child("brokerConfigGroup"), "test-broker-config-group", "wrong-error-message"+", provided brokerConfigGroup not found")),
+            want:      false,
+        },
+        {
+            testName:  "field.NotFound_wrong-error-message",
+            fieldErrs: append(field.ErrorList{}, field.NotFound(field.NewPath("spec").Child("brokers").Index(0).Child("storageConfig").Index(0), "/test/storageConfig/mount/path"+", "+"wrong-error-message")),
+            want:      false,
+        },
+    }
+
+    for _, tc := range testCases {
+        t.Run(tc.testName, func(t *testing.T) {
+            kafkaCluster := banzaicloudv1beta1.KafkaCluster{ObjectMeta: metav1.ObjectMeta{Name: "test-KafkaCluster"}}
+
+            err := apierrors.NewInvalid(
+                kafkaCluster.GetObjectKind().GroupVersionKind().GroupKind(),
+                kafkaCluster.Name, tc.fieldErrs)
+
+            if got := IsInvalidRemovingStorage(err); got != tc.want {
+                t.Errorf("Check Storage Removal Error message. Expected: %t ; Got: %t", tc.want, got)
+            }
+        })
+    }
+}
+
+func TestIsErrorDuringValidation(t *testing.T) {
+    testCases := []struct {
+        testName string
+        err      error
+        want     bool
+    }{
+        {
+            testName: "errorDuringValidation",
+            err:      apierrors.NewInternalError(errors.WithMessage(errors.New("..."), errorDuringValidationMsg)),
+            want:     true,
+        },
+        {
+            testName: "wrong-error-message",
+            err:      apierrors.NewInternalError(errors.WithMessage(errors.New("..."), "wrong-error-message")),
+            want:     false,
+        },
+    }
+
+    for _, tc := range testCases {
+        t.Run(tc.testName, func(t *testing.T) {
+            if got := IsErrorDuringValidation(tc.err); got != tc.want {
+                t.Errorf("Check overall Error During Validation error message. Expected: %t ; Got: %t", tc.want, got)
+            }
+        })
+    }
+}
diff --git a/pkg/webhooks/kafkacluster_validator.go b/pkg/webhooks/kafkacluster_validator.go
index 685960a82..8222a2103 100644
--- a/pkg/webhooks/kafkacluster_validator.go
+++ b/pkg/webhooks/kafkacluster_validator.go
@@ -20,7 +20,7 @@ import (
     "emperror.dev/errors"
     "golang.org/x/exp/slices"
 
-    apiErrors "k8s.io/apimachinery/pkg/api/errors"
+    apierrors "k8s.io/apimachinery/pkg/api/errors"
     "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/apimachinery/pkg/util/validation/field"
 
@@ -42,7 +42,7 @@ func (s KafkaClusterValidator) ValidateUpdate(ctx context.Context, oldObj, newOb
     fieldErr, err := checkBrokerStorageRemoval(&kafkaClusterOld.Spec, &kafkaClusterNew.Spec)
     if err != nil {
         log.Error(err, errorDuringValidationMsg)
-        return apiErrors.NewInternalError(errors.WithMessage(err, errorDuringValidationMsg))
+        return apierrors.NewInternalError(errors.WithMessage(err, errorDuringValidationMsg))
     }
     if fieldErr != nil {
         allErrs = append(allErrs, fieldErr)
@@ -51,7 +51,7 @@ func (s KafkaClusterValidator) ValidateUpdate(ctx context.Context, oldObj, newOb
         return nil
     }
     log.Info("rejected", "invalid field(s)", allErrs.ToAggregate().Error())
-    return apiErrors.NewInvalid(
+    return apierrors.NewInvalid(
         kafkaClusterNew.GroupVersionKind().GroupKind(),
         kafkaClusterNew.Name, allErrs)
 }
@@ -78,7 +78,7 @@ func checkBrokerStorageRemoval(kafkaClusterSpecOld, kafkaClusterSpecNew *banzaic
         // checking broukerConfigGroup existence
         if brokerNew.BrokerConfigGroup != "" {
             if _, exists := kafkaClusterSpecNew.BrokerConfigGroups[brokerNew.BrokerConfigGroup]; !exists {
-                return field.Invalid(field.NewPath("spec").Child("brokers").Index(int(brokerNew.Id)).Child("brokerConfigGroup"), brokerNew.BrokerConfigGroup, removingStorageMsg+", provided brokerConfigGroup not found"), nil
+                return field.Invalid(field.NewPath("spec").Child("brokers").Index(int(brokerNew.Id)).Child("brokerConfigGroup"), brokerNew.BrokerConfigGroup, unsupportedRemovingStorageMsg+", provided brokerConfigGroup not found"), nil
             }
         }
         brokerConfigsNew, err := brokerNew.GetBrokerConfig(*kafkaClusterSpecNew)
@@ -99,9 +99,9 @@ func checkBrokerStorageRemoval(kafkaClusterSpecOld, kafkaClusterSpecNew *banzaic
             if !isStorageFound {
                 fromConfigGroup := getMissingMounthPathLocation(storageConfigOld.MountPath, kafkaClusterSpecOld, int32(k))
                 if fromConfigGroup != nil && *fromConfigGroup {
-                    return field.Invalid(field.NewPath("spec").Child("brokers").Index(k).Child("brokerConfigGroup"), brokerNew.BrokerConfigGroup, fmt.Sprintf("%s, missing storageConfig mounthPath: %s", removingStorageMsg, storageConfigOld.MountPath)), nil
+                    return field.Invalid(field.NewPath("spec").Child("brokers").Index(k).Child("brokerConfigGroup"), brokerNew.BrokerConfigGroup, fmt.Sprintf("%s, missing storageConfig mounthPath: %s", unsupportedRemovingStorageMsg, storageConfigOld.MountPath)), nil
                 }
-                return field.NotFound(field.NewPath("spec").Child("brokers").Index(k).Child("storageConfig").Index(e), storageConfigOld.MountPath+", "+removingStorageMsg), nil
+                return field.NotFound(field.NewPath("spec").Child("brokers").Index(k).Child("storageConfig").Index(e), storageConfigOld.MountPath+", "+unsupportedRemovingStorageMsg), nil
             }
         }
     }
diff --git a/pkg/webhooks/kafkatopic_validator.go b/pkg/webhooks/kafkatopic_validator.go
index 7a9227667..39b94f4b5 100644
--- a/pkg/webhooks/kafkatopic_validator.go
+++ b/pkg/webhooks/kafkatopic_validator.go
@@ -20,7 +20,7 @@ import (
 
     "emperror.dev/errors"
 
-    apiErrors "k8s.io/apimachinery/pkg/api/errors"
+    apierrors "k8s.io/apimachinery/pkg/api/errors"
     "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/apimachinery/pkg/types"
     "k8s.io/apimachinery/pkg/util/validation/field"
@@ -59,13 +59,13 @@ func (s *KafkaTopicValidator) validate(ctx context.Context, obj runtime.Object)
     fieldErrs, err := s.validateKafkaTopic(ctx, kafkaTopic, log)
     if err != nil {
         log.Error(err, errorDuringValidationMsg)
-        return apiErrors.NewInternalError(errors.WithMessage(err, errorDuringValidationMsg))
+        return apierrors.NewInternalError(errors.WithMessage(err, errorDuringValidationMsg))
     }
     if len(fieldErrs) == 0 {
         return nil
     }
     log.Info("rejected", "invalid field(s)", fieldErrs.ToAggregate().Error())
-    return apiErrors.NewInvalid(
+    return apierrors.NewInvalid(
         kafkaTopic.GetObjectKind().GroupVersionKind().GroupKind(),
         kafkaTopic.Name, fieldErrs)
 }
@@ -94,7 +94,7 @@ func (s *KafkaTopicValidator) validateKafkaTopic(ctx context.Context, topic *ban
 
     // Check if the cluster being referenced actually exists
     if cluster, err = k8sutil.LookupKafkaCluster(ctx, s.Client, clusterName, clusterNamespace); err != nil {
-        if !apiErrors.IsNotFound(err) {
+        if !apierrors.IsNotFound(err) {
             return nil, errors.Wrap(err, cantConnectAPIServerMsg)
         }
         if k8sutil.IsMarkedForDeletion(topic.ObjectMeta) {
@@ -159,7 +159,7 @@ func (s *KafkaTopicValidator) checkKafka(ctx context.Context, topic *banzaicloud
     // Check if this is the correct CR for this topic
     topicCR := &banzaicloudv1alpha1.KafkaTopic{}
     if err := s.Client.Get(ctx, types.NamespacedName{Name: topic.Name, Namespace: topic.Namespace}, topicCR); err != nil {
-        if apiErrors.IsNotFound(err) {
+        if apierrors.IsNotFound(err) {
             // User is trying to overwrite an existing topic - bad user
             logMsg := fmt.Sprintf("topic already exists on kafka cluster '%s'", topic.Spec.ClusterRef.Name)
             return field.Invalid(field.NewPath("spec").Child("name"), topic.Spec.Name, logMsg), nil
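
The heart of this change is that every errorfactory wrapper type now implements Unwrap(), so callers can replace the old errors.Cause(err).(type) switches with errors.As/errors.Is checks that see through any additional wrapping. Below is a minimal, self-contained sketch of that pattern using only the standard library; the brokersUnreachable type is a hypothetical stand-in for the real errorfactory types, not code from this patch.

package main

import (
	"errors"
	"fmt"
)

// brokersUnreachable mirrors the errorfactory pattern: a struct embedding an
// error plus an Unwrap method, so errors.Is and errors.As can walk the chain.
type brokersUnreachable struct{ error }

func (e brokersUnreachable) Unwrap() error { return e.error }

func main() {
	inner := errors.New("connection refused")

	// Wrap the typed error with more context, roughly as errorfactory.New
	// does via emperror.dev/errors.
	err := fmt.Errorf("reconcile attempt failed: %w", brokersUnreachable{inner})

	// errors.As finds the typed wrapper anywhere in the chain; this is what
	// the controllers' new switch cases rely on.
	fmt.Println(errors.As(err, &brokersUnreachable{})) // true

	// errors.Is still reaches the innermost cause through Unwrap().
	fmt.Println(errors.Is(err, inner)) // true
}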