Skip to content

Commit

Permalink
Add schema registry support (#40)
Browse files Browse the repository at this point in the history
* Add schema registry support to the conduit type

With schema support in the latest Conduit releases, this allows for multiple
conduit instances to share their schema via another schema registry.

Essentially overriding the builtin implementation.

Details are provided directly via the Conduit resource.
User, password and token can be provided as secrets but will be copied over
into controller owned secrets for management and ease of use.
  • Loading branch information
lyuboxa authored Oct 17, 2024
1 parent e5f2ee1 commit 3f1185c
Show file tree
Hide file tree
Showing 14 changed files with 1,350 additions and 13 deletions.
10 changes: 10 additions & 0 deletions api/v1alpha/conduit_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
const (
ConditionConduitReady ConditionType = "Ready"
ConditionConduitConfigReady ConditionType = "ConfigReady"
ConditionConduitSecretReady ConditionType = "SecretReady"
ConditionConduitVolumeReady ConditionType = "VolumeBound"
ConditionConduitDeploymentRunning ConditionType = "DeploymentRunning"
ConditionConduitServiceReady ConditionType = "ServiceReady"
Expand All @@ -33,6 +34,7 @@ const (
var conduitConditions = NewConditionSet(
ConditionConduitReady,
ConditionConduitConfigReady,
ConditionConduitSecretReady,
ConditionConduitVolumeReady,
ConditionConduitDeploymentRunning,
ConditionConduitServiceReady,
Expand Down Expand Up @@ -67,6 +69,7 @@ type ConduitSpec struct {
Running *bool `json:"running,omitempty"`
Version string `json:"version,omitempty"`

Registry *SchemaRegistry `json:"schemaRegistry,omitempty"`
Connectors []*ConduitConnector `json:"connectors,omitempty"`
Processors []*ConduitProcessor `json:"processors,omitempty"`
}
Expand Down Expand Up @@ -94,6 +97,13 @@ type ConduitProcessor struct {
Settings []SettingsVar `json:"settings,omitempty"`
}

type SchemaRegistry struct {
URL string `json:"url,omitempty"`
Username SettingsVar `json:"basicAuthUser,omitempty"`
Password SettingsVar `json:"basicAuthPassword,omitempty"`
Token SettingsVar `json:"bearerToken,omitempty"`
}

type GlobalConfigMapRef struct {
Namespace string `json:"namespace,omitempty"`
Name string `json:"name,omitempty"`
Expand Down
34 changes: 26 additions & 8 deletions api/v1alpha/conduit_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package v1alpha

import (
"fmt"
"net/url"
"path/filepath"
"strings"

Expand Down Expand Up @@ -123,11 +124,15 @@ func (r *Conduit) ValidateCreate() (admission.Warnings, error) {
))
}

if _, err := validateConnectors(r.Spec.Connectors); err != nil {
if err := validateConnectors(r.Spec.Connectors); err != nil {
errs = multierror.Append(errs, err)
}

if _, err := validateProcessors(r.Spec.Processors); err != nil {
if err := validateProcessors(r.Spec.Processors); err != nil {
errs = multierror.Append(errs, err)
}

if err := validateRegistry(r.Spec.Registry); err != nil {
errs = multierror.Append(errs, err)
}

Expand All @@ -137,7 +142,7 @@ func (r *Conduit) ValidateCreate() (admission.Warnings, error) {
// ValidateUpdate implements webhook.Validator for the Conduit resource.
// Error is returned when the changes to the resource are invalid.
func (r *Conduit) ValidateUpdate(runtime.Object) (admission.Warnings, error) {
return validateConnectors(r.Spec.Connectors)
return nil, validateConnectors(r.Spec.Connectors)
}

// ValidateDelete implements webhook.Validator for the Conduit resource.
Expand All @@ -148,7 +153,7 @@ func (r *Conduit) ValidateDelete() (admission.Warnings, error) {

// validateConnectors validates the attributes of connectors in the slice.
// Error is return when the validation fails.
func validateConnectors(cc []*ConduitConnector) (admission.Warnings, error) {
func validateConnectors(cc []*ConduitConnector) error {
var errs error

for _, c := range cc {
Expand All @@ -161,15 +166,15 @@ func validateConnectors(cc []*ConduitConnector) (admission.Warnings, error) {
}
}

if _, err := validateProcessors(c.Processors); err != nil {
if err := validateProcessors(c.Processors); err != nil {
errs = multierror.Append(errs, err)
}
}

return nil, errs
return errs
}

func validateProcessors(pp []*ConduitProcessor) (admission.Warnings, error) {
func validateProcessors(pp []*ConduitProcessor) error {
var errs error

for _, p := range pp {
Expand All @@ -183,7 +188,7 @@ func validateProcessors(pp []*ConduitProcessor) (admission.Warnings, error) {
}
}

return nil, errs
return errs
}

func validateConduitVersion(ver string) bool {
Expand All @@ -194,3 +199,16 @@ func validateConduitVersion(ver string) bool {
}
return conduitVerConstraint.Check(v)
}

func validateRegistry(sr *SchemaRegistry) error {
if sr.URL == "" {
return nil
}

_, err := url.Parse(sr.URL)
if err != nil {
return fmt.Errorf("failed to validate registry url: %w", err)
}

return nil
}
23 changes: 23 additions & 0 deletions api/v1alpha/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

125 changes: 125 additions & 0 deletions charts/conduit-operator/templates/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,131 @@ spec:
type: array
running:
type: boolean
schemaRegistry:
properties:
basicAuthPassword:
properties:
configMapRef:
properties:
key:
type: string
name:
type: string
namespace:
type: string
type: object
name:
type: string
secretRef:
description: SecretKeySelector selects a key of a Secret.
properties:
key:
description: The key of the secret to select from. Must
be a valid secret key.
type: string
name:
default: ""
description: |-
Name of the referent.
This field is effectively required, but due to backwards compatibility is
allowed to be empty. Instances of this type with an empty value here are
almost certainly wrong.
More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
type: string
optional:
description: Specify whether the Secret or its key must
be defined
type: boolean
required:
- key
type: object
x-kubernetes-map-type: atomic
value:
type: string
type: object
basicAuthUser:
properties:
configMapRef:
properties:
key:
type: string
name:
type: string
namespace:
type: string
type: object
name:
type: string
secretRef:
description: SecretKeySelector selects a key of a Secret.
properties:
key:
description: The key of the secret to select from. Must
be a valid secret key.
type: string
name:
default: ""
description: |-
Name of the referent.
This field is effectively required, but due to backwards compatibility is
allowed to be empty. Instances of this type with an empty value here are
almost certainly wrong.
More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
type: string
optional:
description: Specify whether the Secret or its key must
be defined
type: boolean
required:
- key
type: object
x-kubernetes-map-type: atomic
value:
type: string
type: object
bearerToken:
properties:
configMapRef:
properties:
key:
type: string
name:
type: string
namespace:
type: string
type: object
name:
type: string
secretRef:
description: SecretKeySelector selects a key of a Secret.
properties:
key:
description: The key of the secret to select from. Must
be a valid secret key.
type: string
name:
default: ""
description: |-
Name of the referent.
This field is effectively required, but due to backwards compatibility is
allowed to be empty. Instances of this type with an empty value here are
almost certainly wrong.
More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
type: string
optional:
description: Specify whether the Secret or its key must
be defined
type: boolean
required:
- key
type: object
x-kubernetes-map-type: atomic
value:
type: string
type: object
url:
type: string
type: object
version:
type: string
type: object
Expand Down
Loading

0 comments on commit 3f1185c

Please sign in to comment.