forked from EnMasseProject/enmasse
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add support for brokered queues and topics
* Synchronize and store messaging tenants in infra state. This will be used to lookup the tenant of an address to determine how that address will be configured. It will also later be used to drive the creation of vhost policies. * Add 'transactional' scheduler that will schedule all addresses in a namespace on the same broker, if they belong to a tenant with transactional capability. * If a tenant is transactional, instead of creating multiple autolinks and linkroutes on the router, create a single in+out link route pair with the prefix of the tenant (per endpoint for now) so that all links for that tenant go directly to the broker. * Add systemtest that verifies basic transactions work for queues and topics. Issue EnMasseProject#4469
- Loading branch information
Showing
17 changed files
with
563 additions
and
117 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
/* | ||
* Copyright 2020, EnMasse authors. | ||
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). | ||
*/ | ||
|
||
package scheduler | ||
|
||
import ( | ||
"fmt" | ||
|
||
"github.com/enmasseproject/enmasse/pkg/apis/enmasse/v1beta2" | ||
"github.com/enmasseproject/enmasse/pkg/state" | ||
"github.com/enmasseproject/enmasse/pkg/state/broker" | ||
) | ||
|
||
/* | ||
* Very dumb scheduler that doesn't look at broker capacity. | ||
*/ | ||
type dummyScheduler struct { | ||
} | ||
|
||
var _ state.Scheduler = &dummyScheduler{} | ||
|
||
func NewDummyScheduler() state.Scheduler { | ||
return &dummyScheduler{} | ||
} | ||
|
||
func (s *dummyScheduler) ScheduleAddress(address *v1beta2.MessagingAddress, brokers []*broker.BrokerState) error { | ||
if len(brokers) > 0 { | ||
broker := brokers[0] | ||
address.Status.Brokers = append(address.Status.Brokers, v1beta2.MessagingAddressBroker{ | ||
State: v1beta2.MessagingAddressBrokerScheduled, | ||
Host: broker.Host().Hostname, | ||
}) | ||
} else { | ||
return fmt.Errorf("no available broker") | ||
} | ||
return nil | ||
} |
57 changes: 57 additions & 0 deletions
57
pkg/controller/messagingaddress/scheduler/transactional.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
/* | ||
* Copyright 2020, EnMasse authors. | ||
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). | ||
*/ | ||
|
||
package scheduler | ||
|
||
import ( | ||
"fmt" | ||
"sync" | ||
|
||
"github.com/enmasseproject/enmasse/pkg/apis/enmasse/v1beta2" | ||
"github.com/enmasseproject/enmasse/pkg/state" | ||
"github.com/enmasseproject/enmasse/pkg/state/broker" | ||
) | ||
|
||
/* | ||
* Scheduler that forces all addresses for a namespace to the same broker. | ||
*/ | ||
type transactionalScheduler struct { | ||
namespaceToHost map[string]string | ||
// Protects the map from concurrent access by multiple reconcilers | ||
lock sync.Mutex | ||
} | ||
|
||
var _ state.Scheduler = &transactionalScheduler{} | ||
|
||
func NewTransactionalScheduler() state.Scheduler { | ||
return &transactionalScheduler{ | ||
namespaceToHost: make(map[string]string, 0), | ||
} | ||
} | ||
|
||
func (s *transactionalScheduler) ScheduleAddress(address *v1beta2.MessagingAddress, brokers []*broker.BrokerState) error { | ||
if len(brokers) > 0 { | ||
s.lock.Lock() | ||
defer s.lock.Unlock() | ||
host, ok := s.namespaceToHost[address.Namespace] | ||
if !ok { | ||
// TODO: Find the one meeting resource requirements etc. | ||
broker := brokers[0] | ||
address.Status.Brokers = append(address.Status.Brokers, v1beta2.MessagingAddressBroker{ | ||
State: v1beta2.MessagingAddressBrokerScheduled, | ||
Host: broker.Host().Hostname, | ||
}) | ||
s.namespaceToHost[address.Namespace] = broker.Host().Hostname | ||
} else { | ||
address.Status.Brokers = append(address.Status.Brokers, v1beta2.MessagingAddressBroker{ | ||
State: v1beta2.MessagingAddressBrokerScheduled, | ||
Host: host, | ||
}) | ||
} | ||
} else { | ||
return fmt.Errorf("no available broker") | ||
} | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.