Skip to content

Commit

Permalink
Add support for brokered queues and topics
Browse files Browse the repository at this point in the history
* Synchronize and store messaging tenants in infra state. This will be
used to lookup the tenant of an address to determine how that address
will be configured. It will also later be used to drive the creation
of vhost policies.
* Add 'transactional' scheduler that will schedule all addresses in a
namespace on the same broker, if they belong to a tenant with
transactional capability.
* If a tenant is transactional, instead of creating multiple autolinks
and linkroutes on the router, create a single in+out link route pair
with the prefix of the tenant (per endpoint for now) so that all
links for that tenant go directly to the broker.
* Add systemtest that verifies basic transactions work for queues and topics.

Issue EnMasseProject#4469
  • Loading branch information
lulf committed Jun 23, 2020
1 parent 97de89a commit 2a08478
Show file tree
Hide file tree
Showing 17 changed files with 563 additions and 117 deletions.
18 changes: 18 additions & 0 deletions api-model/src/main/resources/schema/kube-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1135,6 +1135,15 @@
"type": "object",
"description": "",
"properties": {
"capabilities": {
"type": "array",
"description": "",
"javaOmitEmpty": true,
"items": {
"type": "string",
"description": ""
}
},
"messagingInfrastructureRef": {
"$ref": "#/definitions/github_com_enmasseproject_enmasse_pkg_apis_enmasse_v1beta2_MessagingInfrastructureReference",
"javaType": "MessagingInfrastructureReference"
Expand All @@ -1150,6 +1159,15 @@
"type": "object",
"description": "",
"properties": {
"capabilities": {
"type": "array",
"description": "",
"javaOmitEmpty": true,
"items": {
"type": "string",
"description": ""
}
},
"conditions": {
"type": "array",
"description": "",
Expand Down
6 changes: 6 additions & 0 deletions pkg/apis/enmasse/v1beta2/types_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ type NamespaceSelector struct {
MatchNames []string `json:"matchNames,omitempty"`
}

type MessagingCapability string

const (
MessagingCapabilityTransactional MessagingCapability = "transactional"
)

type MessagingInfrastructureReference struct {
// Name of referenced MessagingInfra.
Name string `json:"name"`
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/enmasse/v1beta2/types_tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type MessagingTenant struct {
type MessagingTenantSpec struct {
// Reference to a specific MessagingInfra to use (must be available for this tenant).
MessagingInfrastructureRef *MessagingInfrastructureReference `json:"messagingInfrastructureRef,omitempty"`
// The desired capabilities common to all addresses for this tenant.
Capabilities []MessagingCapability `json:"capabilities,omitempty"`
}

type MessagingTenantStatus struct {
Expand All @@ -41,6 +43,8 @@ type MessagingTenantStatus struct {
// MessagingInfra this tenant is bound to.
MessagingInfrastructureRef MessagingInfrastructureReference `json:"messagingInfrastructureRef,omitempty"`
Conditions []MessagingTenantCondition `json:"conditions,omitempty"`
// The actual capabilities common to all addresses for this tenant.
Capabilities []MessagingCapability `json:"capabilities,omitempty"`
}

type MessagingTenantCondition struct {
Expand All @@ -56,6 +60,7 @@ type MessagingTenantConditionType string
const (
MessagingTenantBound MessagingTenantConditionType = "Bound"
MessagingTenantCaCreated MessagingTenantConditionType = "CaCreated"
MessagingTenantCreated MessagingTenantConditionType = "Created"
MessagingTenantReady MessagingTenantConditionType = "Ready"
)

Expand Down
10 changes: 10 additions & 0 deletions pkg/apis/enmasse/v1beta2/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 15 additions & 25 deletions pkg/controller/messagingaddress/messagingaddress_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ import (

amqp "github.com/enmasseproject/enmasse/pkg/amqpcommand"
v1beta2 "github.com/enmasseproject/enmasse/pkg/apis/enmasse/v1beta2"
"github.com/enmasseproject/enmasse/pkg/controller/messagingaddress/scheduler"
"github.com/enmasseproject/enmasse/pkg/controller/messaginginfra"
"github.com/enmasseproject/enmasse/pkg/state"
"github.com/enmasseproject/enmasse/pkg/state/broker"
stateerrors "github.com/enmasseproject/enmasse/pkg/state/errors"
"github.com/enmasseproject/enmasse/pkg/util"
utilerrors "github.com/enmasseproject/enmasse/pkg/util/errors"
Expand All @@ -43,6 +43,9 @@ import (
var log = logf.Log.WithName("controller_messagingaddress")
var _ reconcile.Reconciler = &ReconcileMessagingAddress{}

var defaultScheduler = scheduler.NewDummyScheduler()
var transactionalScheduler = scheduler.NewTransactionalScheduler()

type ReconcileMessagingAddress struct {
client client.Client
reader client.Reader
Expand Down Expand Up @@ -100,27 +103,6 @@ func add(mgr manager.Manager, r *ReconcileMessagingAddress) error {
return err
}

/*
* Very dumb scheduler that doesn't look at broker capacity.
*/
type DummyScheduler struct {
}

var _ state.Scheduler = &DummyScheduler{}

func (s *DummyScheduler) ScheduleAddress(address *v1beta2.MessagingAddress, brokers []*broker.BrokerState) error {
if len(brokers) > 0 {
broker := brokers[0]
address.Status.Brokers = append(address.Status.Brokers, v1beta2.MessagingAddressBroker{
State: v1beta2.MessagingAddressBrokerScheduled,
Host: broker.Host().Hostname,
})
} else {
return fmt.Errorf("no available broker")
}
return nil
}

func (r *ReconcileMessagingAddress) Reconcile(request reconcile.Request) (reconcile.Result, error) {

logger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
Expand Down Expand Up @@ -198,7 +180,7 @@ func (r *ReconcileMessagingAddress) Reconcile(request reconcile.Request) (reconc
return reconcile.Result{}, fmt.Errorf("provided wrong object type to finalizer, only supports MessagingAddress")
}

infra, err := messaginginfra.LookupInfra(ctx, r.client, address.Namespace)
_, infra, err := messaginginfra.LookupInfra(ctx, r.client, address.Namespace)
if err != nil {
// Not bound - allow dropping finalizer
if utilerrors.IsNotBound(err) || utilerrors.IsNotFound(err) {
Expand Down Expand Up @@ -268,17 +250,19 @@ func (r *ReconcileMessagingAddress) Reconcile(request reconcile.Request) (reconc
return result.Result(), err
}

var tenant *v1beta2.MessagingTenant
var infra *v1beta2.MessagingInfrastructure
// Retrieve the MessagingInfra for this MessagingAddress
result, err = rc.Process(func(address *v1beta2.MessagingAddress) (processorResult, error) {
i, err := messaginginfra.LookupInfra(ctx, r.client, found.Namespace)
t, i, err := messaginginfra.LookupInfra(ctx, r.client, found.Namespace)
if err != nil && (k8errors.IsNotFound(err) || utilerrors.IsNotBound(err)) {
foundTenant.SetStatus(corev1.ConditionFalse, "", err.Error())
address.Status.Message = err.Error()
return processorResult{RequeueAfter: 10 * time.Second}, nil
}
foundTenant.SetStatus(corev1.ConditionTrue, "", "")
infra = i
tenant = t
return processorResult{}, err

})
Expand Down Expand Up @@ -368,7 +352,13 @@ func (r *ReconcileMessagingAddress) Reconcile(request reconcile.Request) (reconc
}

// TODO: Make configurable and a better scheduler
scheduler := &DummyScheduler{}
scheduler := defaultScheduler
for _, capability := range tenant.Status.Capabilities {
if capability == v1beta2.MessagingCapabilityTransactional {
scheduler = transactionalScheduler
break
}
}

client := r.clientManager.GetClient(infra)
err := client.ScheduleAddress(address, scheduler)
Expand Down
39 changes: 39 additions & 0 deletions pkg/controller/messagingaddress/scheduler/dummy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2020, EnMasse authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/

package scheduler

import (
"fmt"

"github.com/enmasseproject/enmasse/pkg/apis/enmasse/v1beta2"
"github.com/enmasseproject/enmasse/pkg/state"
"github.com/enmasseproject/enmasse/pkg/state/broker"
)

/*
* Very dumb scheduler that doesn't look at broker capacity.
*/
type dummyScheduler struct {
}

var _ state.Scheduler = &dummyScheduler{}

func NewDummyScheduler() state.Scheduler {
return &dummyScheduler{}
}

func (s *dummyScheduler) ScheduleAddress(address *v1beta2.MessagingAddress, brokers []*broker.BrokerState) error {
if len(brokers) > 0 {
broker := brokers[0]
address.Status.Brokers = append(address.Status.Brokers, v1beta2.MessagingAddressBroker{
State: v1beta2.MessagingAddressBrokerScheduled,
Host: broker.Host().Hostname,
})
} else {
return fmt.Errorf("no available broker")
}
return nil
}
57 changes: 57 additions & 0 deletions pkg/controller/messagingaddress/scheduler/transactional.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2020, EnMasse authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/

package scheduler

import (
"fmt"
"sync"

"github.com/enmasseproject/enmasse/pkg/apis/enmasse/v1beta2"
"github.com/enmasseproject/enmasse/pkg/state"
"github.com/enmasseproject/enmasse/pkg/state/broker"
)

/*
* Scheduler that forces all addresses for a namespace to the same broker.
*/
type transactionalScheduler struct {
namespaceToHost map[string]string
// Protects the map from concurrent access by multiple reconcilers
lock sync.Mutex
}

var _ state.Scheduler = &transactionalScheduler{}

func NewTransactionalScheduler() state.Scheduler {
return &transactionalScheduler{
namespaceToHost: make(map[string]string, 0),
}
}

func (s *transactionalScheduler) ScheduleAddress(address *v1beta2.MessagingAddress, brokers []*broker.BrokerState) error {
if len(brokers) > 0 {
s.lock.Lock()
defer s.lock.Unlock()
host, ok := s.namespaceToHost[address.Namespace]
if !ok {
// TODO: Find the one meeting resource requirements etc.
broker := brokers[0]
address.Status.Brokers = append(address.Status.Brokers, v1beta2.MessagingAddressBroker{
State: v1beta2.MessagingAddressBrokerScheduled,
Host: broker.Host().Hostname,
})
s.namespaceToHost[address.Namespace] = broker.Host().Hostname
} else {
address.Status.Brokers = append(address.Status.Brokers, v1beta2.MessagingAddressBroker{
State: v1beta2.MessagingAddressBrokerScheduled,
Host: host,
})
}
} else {
return fmt.Errorf("no available broker")
}
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (r *ReconcileMessagingEndpoint) Reconcile(request reconcile.Request) (recon
// Retrieve the MessagingInfra for this MessagingEndpoint
var infra *v1beta2.MessagingInfrastructure
result, err = rc.Process(func(endpoint *v1beta2.MessagingEndpoint) (processorResult, error) {
i, err := messaginginfra.LookupInfra(ctx, r.client, found.Namespace)
_, i, err := messaginginfra.LookupInfra(ctx, r.client, found.Namespace)
if err != nil && (k8errors.IsNotFound(err) || utilerrors.IsNotBound(err)) {
endpoint.Status.GetMessagingEndpointCondition(v1beta2.MessagingEndpointFoundTenant).SetStatus(corev1.ConditionFalse, "", err.Error())
endpoint.Status.Message = err.Error()
Expand Down Expand Up @@ -330,7 +330,7 @@ func (r *ReconcileMessagingEndpoint) reconcileFinalizer(ctx context.Context, log
return reconcile.Result{}, fmt.Errorf("provided wrong object type to finalizer, only supports MessagingEndpoint")
}

infra, err := messaginginfra.LookupInfra(ctx, r.client, endpoint.Namespace)
_, infra, err := messaginginfra.LookupInfra(ctx, r.client, endpoint.Namespace)
if err != nil {
// Not bound - allow dropping finalizer
if utilerrors.IsNotBound(err) || utilerrors.IsNotFound(err) {
Expand Down
12 changes: 6 additions & 6 deletions pkg/controller/messaginginfra/messaginginfra_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,26 +631,26 @@ func (r *processorResult) Result() reconcile.Result {
}

// Find the MessagingInfra servicing a given namespace
func LookupInfra(ctx context.Context, c client.Client, namespace string) (*v1beta2.MessagingInfrastructure, error) {
func LookupInfra(ctx context.Context, c client.Client, namespace string) (*v1beta2.MessagingTenant, *v1beta2.MessagingInfrastructure, error) {
// Retrieve the MessagingTenant for this namespace
tenant := &v1beta2.MessagingTenant{}
err := c.Get(ctx, types.NamespacedName{Name: messagingtenant.TENANT_RESOURCE_NAME, Namespace: namespace}, tenant)
if err != nil {
if k8errors.IsNotFound(err) {
return nil, utilerrors.NewNotFoundError("MessagingTenant", messagingtenant.TENANT_RESOURCE_NAME, namespace)
return nil, nil, utilerrors.NewNotFoundError("MessagingTenant", messagingtenant.TENANT_RESOURCE_NAME, namespace)
}
return nil, err
return nil, nil, err
}

if !tenant.IsBound() {
return nil, utilerrors.NewNotBoundError(namespace)
return tenant, nil, utilerrors.NewNotBoundError(namespace)
}

// Retrieve the MessagingInfra for this MessagingTenant
infra := &v1beta2.MessagingInfrastructure{}
err = c.Get(ctx, types.NamespacedName{Name: tenant.Status.MessagingInfrastructureRef.Name, Namespace: tenant.Status.MessagingInfrastructureRef.Namespace}, infra)
if err != nil {
return nil, err
return tenant, nil, err
}
return infra, nil
return tenant, infra, nil
}
Loading

0 comments on commit 2a08478

Please sign in to comment.