Skip to content

Commit

Permalink
Add support for brokered queues and topics
Browse files Browse the repository at this point in the history
* Synchronize and store messaging tenants in infra state. This will be
used to lookup the tenant of an address to determine how that address
will be configured. It will also later be used to drive the creation
of vhost policies.
* Add scheduling of tenants that are transactional to ensure that addresses
for that tenant end up on the same broker.
* If a tenant is transactional, instead of creating multiple autolinks
and linkroutes on the router, create a single in+out link route pair
with the prefix of the tenant (per endpoint for now) so that all
links for that tenant go directly to the broker.
* Add systemtest that verifies basic transactions work for queues and
topics. Tests for durable subscriptions are disabled for now.
* Prevent deletion of brokers that are in use.

Issue EnMasseProject#4469
  • Loading branch information
lulf committed Jun 29, 2020
1 parent 1bafdfa commit 46d2952
Show file tree
Hide file tree
Showing 26 changed files with 1,306 additions and 113 deletions.
18 changes: 18 additions & 0 deletions api-model/src/main/resources/schema/kube-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1135,6 +1135,15 @@
"type": "object",
"description": "",
"properties": {
"capabilities": {
"type": "array",
"description": "",
"javaOmitEmpty": true,
"items": {
"type": "string",
"description": ""
}
},
"messagingInfrastructureRef": {
"$ref": "#/definitions/github_com_enmasseproject_enmasse_pkg_apis_enmasse_v1beta2_MessagingInfrastructureReference",
"javaType": "MessagingInfrastructureReference"
Expand All @@ -1150,6 +1159,15 @@
"type": "object",
"description": "",
"properties": {
"capabilities": {
"type": "array",
"description": "",
"javaOmitEmpty": true,
"items": {
"type": "string",
"description": ""
}
},
"conditions": {
"type": "array",
"description": "",
Expand Down
6 changes: 6 additions & 0 deletions pkg/apis/enmasse/v1beta2/types_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ type NamespaceSelector struct {
MatchNames []string `json:"matchNames,omitempty"`
}

type MessagingCapability string

const (
MessagingCapabilityTransactional MessagingCapability = "transactional"
)

type MessagingInfrastructureReference struct {
// Name of referenced MessagingInfra.
Name string `json:"name"`
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/enmasse/v1beta2/types_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
// +kubebuilder:printcolumn:name="Host",type="string",JSONPath=".status.host",description="The hostname."
// +kubebuilder:printcolumn:name="Message",type="string",JSONPath=".status.message",priority=1,description="Message describing the reason for the current Phase."
// +kubebuilder:printcolumn:name="Protocols",type="string",JSONPath=".spec.protocols",priority=1,description="Supported protocols."
// +kubebuilder:printcolumn:name="CertficateExpiry",type="string",JSONPath=".status.tls.certificateInfo.notAfter",priority=1,description="Certificate expiry."
// +kubebuilder:printcolumn:name="CertficateExpiry",type="string",JSONPath=".status.tls.certificateValidity.notAfter",priority=1,description="Certificate expiry."
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
type MessagingEndpoint struct {
metav1.TypeMeta `json:",inline"`
Expand Down
8 changes: 8 additions & 0 deletions pkg/apis/enmasse/v1beta2/types_tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type MessagingTenant struct {
type MessagingTenantSpec struct {
// Reference to a specific MessagingInfra to use (must be available for this tenant).
MessagingInfrastructureRef *MessagingInfrastructureReference `json:"messagingInfrastructureRef,omitempty"`
// The desired capabilities common to all addresses for this tenant.
Capabilities []MessagingCapability `json:"capabilities,omitempty"`
}

type MessagingTenantStatus struct {
Expand All @@ -41,6 +43,10 @@ type MessagingTenantStatus struct {
// MessagingInfra this tenant is bound to.
MessagingInfrastructureRef MessagingInfrastructureReference `json:"messagingInfrastructureRef,omitempty"`
Conditions []MessagingTenantCondition `json:"conditions,omitempty"`
// The actual capabilities common to all addresses for this tenant.
Capabilities []MessagingCapability `json:"capabilities,omitempty"`
// For transactional tenants, the broker addresses should be scheduled todo
Broker *MessagingAddressBroker `json:"broker,omitempty"`
}

type MessagingTenantCondition struct {
Expand All @@ -56,6 +62,8 @@ type MessagingTenantConditionType string
const (
MessagingTenantBound MessagingTenantConditionType = "Bound"
MessagingTenantCaCreated MessagingTenantConditionType = "CaCreated"
MessagingTenantScheduled MessagingTenantConditionType = "Scheduled"
MessagingTenantCreated MessagingTenantConditionType = "Created"
MessagingTenantReady MessagingTenantConditionType = "Ready"
)

Expand Down
15 changes: 15 additions & 0 deletions pkg/apis/enmasse/v1beta2/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/controller/iotconfig/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (r *ReconcileIoTConfig) processAdapterInfraCert(ctx context.Context, config

// for all adapters

infra, err := messaginginfra.LookupInfra(ctx, r.client, config.Namespace)
_, infra, err := messaginginfra.LookupInfra(ctx, r.client, config.Namespace)
if err != nil {
return reconcile.Result{}, err
}
Expand Down
34 changes: 5 additions & 29 deletions pkg/controller/messagingaddress/messagingaddress_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
v1beta2 "github.com/enmasseproject/enmasse/pkg/apis/enmasse/v1beta2"
"github.com/enmasseproject/enmasse/pkg/controller/messaginginfra"
"github.com/enmasseproject/enmasse/pkg/state"
"github.com/enmasseproject/enmasse/pkg/state/broker"
stateerrors "github.com/enmasseproject/enmasse/pkg/state/errors"
"github.com/enmasseproject/enmasse/pkg/util"
utilerrors "github.com/enmasseproject/enmasse/pkg/util/errors"
Expand Down Expand Up @@ -100,27 +99,6 @@ func add(mgr manager.Manager, r *ReconcileMessagingAddress) error {
return err
}

/*
* Very dumb scheduler that doesn't look at broker capacity.
*/
type DummyScheduler struct {
}

var _ state.Scheduler = &DummyScheduler{}

func (s *DummyScheduler) ScheduleAddress(address *v1beta2.MessagingAddress, brokers []*broker.BrokerState) error {
if len(brokers) > 0 {
broker := brokers[0]
address.Status.Brokers = append(address.Status.Brokers, v1beta2.MessagingAddressBroker{
State: v1beta2.MessagingAddressBrokerScheduled,
Host: broker.Host().Hostname,
})
} else {
return fmt.Errorf("no available broker")
}
return nil
}

func (r *ReconcileMessagingAddress) Reconcile(request reconcile.Request) (reconcile.Result, error) {

logger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
Expand Down Expand Up @@ -198,7 +176,7 @@ func (r *ReconcileMessagingAddress) Reconcile(request reconcile.Request) (reconc
return reconcile.Result{}, fmt.Errorf("provided wrong object type to finalizer, only supports MessagingAddress")
}

infra, err := messaginginfra.LookupInfra(ctx, r.client, address.Namespace)
_, infra, err := messaginginfra.LookupInfra(ctx, r.client, address.Namespace)
if err != nil {
// Not bound - allow dropping finalizer
if utilerrors.IsNotBound(err) || utilerrors.IsNotFound(err) {
Expand Down Expand Up @@ -271,7 +249,7 @@ func (r *ReconcileMessagingAddress) Reconcile(request reconcile.Request) (reconc
var infra *v1beta2.MessagingInfrastructure
// Retrieve the MessagingInfra for this MessagingAddress
result, err = rc.Process(func(address *v1beta2.MessagingAddress) (processorResult, error) {
i, err := messaginginfra.LookupInfra(ctx, r.client, found.Namespace)
_, i, err := messaginginfra.LookupInfra(ctx, r.client, found.Namespace)
if err != nil && (k8errors.IsNotFound(err) || utilerrors.IsNotBound(err)) {
foundTenant.SetStatus(corev1.ConditionFalse, "", err.Error())
address.Status.Message = err.Error()
Expand Down Expand Up @@ -356,22 +334,20 @@ func (r *ReconcileMessagingAddress) Reconcile(request reconcile.Request) (reconc

result, err = rc.Process(func(address *v1beta2.MessagingAddress) (processorResult, error) {
// TODO: Handle changes to partitions etc.
// We're already scheduled so just make sure scheduler is synced
if len(address.Status.Brokers) > 0 {
// We're already scheduled so don't change
scheduled.SetStatus(corev1.ConditionTrue, "", "")
return processorResult{}, nil
}

// These addresses don't require scheduling
if address.Spec.Anycast != nil || address.Spec.Multicast != nil {
scheduled.SetStatus(corev1.ConditionTrue, "", "")
return processorResult{}, nil
}

// TODO: Make configurable and a better scheduler
scheduler := &DummyScheduler{}

client := r.clientManager.GetClient(infra)
err := client.ScheduleAddress(address, scheduler)
err := client.ScheduleAddress(address)
if err != nil {
scheduled.SetStatus(corev1.ConditionFalse, "", err.Error())
address.Status.Message = err.Error()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (r *ReconcileMessagingEndpoint) Reconcile(request reconcile.Request) (recon
// Retrieve the MessagingInfra for this MessagingEndpoint
var infra *v1beta2.MessagingInfrastructure
result, err = rc.Process(func(endpoint *v1beta2.MessagingEndpoint) (processorResult, error) {
i, err := messaginginfra.LookupInfra(ctx, r.client, found.Namespace)
_, i, err := messaginginfra.LookupInfra(ctx, r.client, found.Namespace)
if err != nil && (k8errors.IsNotFound(err) || utilerrors.IsNotBound(err)) {
endpoint.Status.GetMessagingEndpointCondition(v1beta2.MessagingEndpointFoundTenant).SetStatus(corev1.ConditionFalse, "", err.Error())
endpoint.Status.Message = err.Error()
Expand Down Expand Up @@ -330,7 +330,7 @@ func (r *ReconcileMessagingEndpoint) reconcileFinalizer(ctx context.Context, log
return reconcile.Result{}, fmt.Errorf("provided wrong object type to finalizer, only supports MessagingEndpoint")
}

infra, err := messaginginfra.LookupInfra(ctx, r.client, endpoint.Namespace)
_, infra, err := messaginginfra.LookupInfra(ctx, r.client, endpoint.Namespace)
if err != nil {
// Not bound - allow dropping finalizer
if utilerrors.IsNotBound(err) || utilerrors.IsNotFound(err) {
Expand Down
23 changes: 21 additions & 2 deletions pkg/controller/messaginginfra/broker/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package broker

import (
"context"
"errors"
"fmt"
"strings"

Expand All @@ -15,7 +16,9 @@ import (
v1beta2 "github.com/enmasseproject/enmasse/pkg/apis/enmasse/v1beta2"
"github.com/enmasseproject/enmasse/pkg/controller/messaginginfra/cert"
"github.com/enmasseproject/enmasse/pkg/controller/messaginginfra/common"
"github.com/enmasseproject/enmasse/pkg/state"
. "github.com/enmasseproject/enmasse/pkg/state/common"
stateerrors "github.com/enmasseproject/enmasse/pkg/state/errors"
"github.com/enmasseproject/enmasse/pkg/util"
"github.com/enmasseproject/enmasse/pkg/util/install"

Expand All @@ -34,13 +37,16 @@ type BrokerController struct {
client client.Client
scheme *runtime.Scheme
certController *cert.CertController
clientManager state.ClientManager
}

func NewBrokerController(client client.Client, scheme *runtime.Scheme, certController *cert.CertController) *BrokerController {
clientManager := state.GetClientManager()
return &BrokerController{
client: client,
scheme: scheme,
certController: certController,
clientManager: clientManager,
}
}

Expand Down Expand Up @@ -96,18 +102,31 @@ func (b *BrokerController) ReconcileBrokers(ctx context.Context, logger logr.Log

}

infraClient := b.clientManager.GetClient(infra)
toDelete := numBrokersToDelete(infra.Spec.Broker.ScalingStrategy, brokers.Items)
newSize := len(brokers.Items) - toDelete
if toDelete > 0 {
logger.Info("Removing brokers", "toDelete", toDelete)
for i := len(brokers.Items) - 1; toDelete > 0; i-- {
err := b.client.Delete(ctx, &brokers.Items[i])
for i := len(brokers.Items) - 1; toDelete > 0 && i > 0; i-- {
err := infraClient.DeleteBroker(toHost(&brokers.Items[i]))
if err != nil {
if errors.Is(err, stateerrors.BrokerInUseError) {
continue
}
return nil, err
}
err = b.client.Delete(ctx, &brokers.Items[i])
if err != nil {
return nil, err
}
delete(hosts, toHost(&brokers.Items[i]))
toDelete--
}
}
// TODO: Depending on scaling strategy, support migrating queues
if toDelete > 0 {
return nil, fmt.Errorf("unable to scale down to %d brokers: %d brokers are still needed", newSize, newSize+toDelete)
}

// Update discoverable brokers
brokerPods := corev1.PodList{}
Expand Down
12 changes: 6 additions & 6 deletions pkg/controller/messaginginfra/messaginginfra_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,26 +631,26 @@ func (r *processorResult) Result() reconcile.Result {
}

// Find the MessagingInfra servicing a given namespace
func LookupInfra(ctx context.Context, c client.Client, namespace string) (*v1beta2.MessagingInfrastructure, error) {
func LookupInfra(ctx context.Context, c client.Client, namespace string) (*v1beta2.MessagingTenant, *v1beta2.MessagingInfrastructure, error) {
// Retrieve the MessagingTenant for this namespace
tenant := &v1beta2.MessagingTenant{}
err := c.Get(ctx, types.NamespacedName{Name: messagingtenant.TENANT_RESOURCE_NAME, Namespace: namespace}, tenant)
if err != nil {
if k8errors.IsNotFound(err) {
return nil, utilerrors.NewNotFoundError("MessagingTenant", messagingtenant.TENANT_RESOURCE_NAME, namespace)
return nil, nil, utilerrors.NewNotFoundError("MessagingTenant", messagingtenant.TENANT_RESOURCE_NAME, namespace)
}
return nil, err
return nil, nil, err
}

if !tenant.IsBound() {
return nil, utilerrors.NewNotBoundError(namespace)
return tenant, nil, utilerrors.NewNotBoundError(namespace)
}

// Retrieve the MessagingInfra for this MessagingTenant
infra := &v1beta2.MessagingInfrastructure{}
err = c.Get(ctx, types.NamespacedName{Name: tenant.Status.MessagingInfrastructureRef.Name, Namespace: tenant.Status.MessagingInfrastructureRef.Namespace}, infra)
if err != nil {
return nil, err
return tenant, nil, err
}
return infra, nil
return tenant, infra, nil
}
Loading

0 comments on commit 46d2952

Please sign in to comment.