Skip to content

Commit

Permalink
events: Add filters to keep track of local and other subscriptions
Browse files Browse the repository at this point in the history
This adds a very basic implementation of a list of namespace+eventType
combinations that each node is interested in by just running the
glob operations in for-loops. Some parallelization is possible, but
not enabled by default.

It only wires up keeping track of what the local event bus is interested
in for now (but doesn't use it yet to filter messages).
  • Loading branch information
Christopher Swenson committed Nov 20, 2023
1 parent 4cf837d commit a9f78be
Show file tree
Hide file tree
Showing 5 changed files with 233 additions and 26 deletions.
6 changes: 5 additions & 1 deletion vault/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -1261,7 +1261,11 @@ func NewCore(conf *CoreConfig) (*Core, error) {
eventsLogger := conf.Logger.Named("events")
c.allLoggers = append(c.allLoggers, eventsLogger)
// start the event system
events, err := eventbus.NewEventBus(eventsLogger)
nodeID, err := c.LoadNodeID()
if err != nil {
return nil, err
}
events, err := eventbus.NewEventBus(c.clusterID.Load, nodeID, eventsLogger)
if err != nil {
return nil, err
}
Expand Down
66 changes: 51 additions & 15 deletions vault/eventbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,19 @@ var (
"path",
logical.EventMetadataDataPath,
}
initCloudEventsFormatterFilterOnce sync.Once
)

func init() {
// Initialize with a blank source URL until an event bus is created.
cloudEventsFormatterFilter = &cloudevents.FormatterFilter{
Source: &url.URL{},
Predicate: func(_ context.Context, e interface{}) (bool, error) {
return true, nil
},
}
}

// EventBus contains the main logic of running an event broker for Vault.
// Start() must be called before the EventBus will accept events for sending.
type EventBus struct {
Expand All @@ -54,6 +65,7 @@ type EventBus struct {
started atomic.Bool
formatterNodeID eventlogger.NodeID
timeout time.Duration
filters *Filters
}

type pluginEventBus struct {
Expand All @@ -72,6 +84,7 @@ type asyncChanNode struct {
closeOnce sync.Once
cancelFunc context.CancelFunc
pipelineID eventlogger.PipelineID
removeFilter func()
removePipeline func(ctx context.Context, t eventlogger.EventType, id eventlogger.PipelineID) (bool, error)
}

Expand Down Expand Up @@ -162,21 +175,36 @@ func (bus *pluginEventBus) SendEvent(ctx context.Context, eventType logical.Even
return bus.bus.SendEventInternal(ctx, bus.namespace, bus.pluginInfo, eventType, data)
}

func init() {
// TODO: maybe this should relate to the Vault core somehow?
sourceUrl, err := url.Parse("https://vaultproject.io/")
if err != nil {
panic(err)
}
cloudEventsFormatterFilter = &cloudevents.FormatterFilter{
Source: sourceUrl,
Predicate: func(_ context.Context, e interface{}) (bool, error) {
return true, nil
},
func setClusterID(clusterIDFunc func() string, localNodeID string) {
// Use the local node ID, in case we aren't running in a cluster.
if cloudEventsFormatterFilter.Source.Scheme == "" {
cloudEventsFormatterFilter.Source, _ = url.Parse("vault://" + localNodeID)
}
// The cluster ID is not available until after the cluster is unsealed.
// Poll for the cluster ID with exponential backoff
// TODO: refactor the core.clusterID to support condition variable maybe?
go func() {
clusterID := clusterIDFunc()
backoff := 1 * time.Millisecond
for clusterID == "" {
backoff = backoff * 2
if backoff > time.Hour {
backoff = time.Hour
}
time.Sleep(backoff)
}
initCloudEventsFormatterFilterOnce.Do(func() {
sourceUrl, err := url.Parse("vault://" + clusterID)
if err != nil {
panic(err)
}
cloudEventsFormatterFilter.Source = sourceUrl
})
}()
}

func NewEventBus(logger hclog.Logger) (*EventBus, error) {
func NewEventBus(clusterIDFunc func() string, localNodeID string, logger hclog.Logger) (*EventBus, error) {
setClusterID(clusterIDFunc, localNodeID)
broker, err := eventlogger.NewBroker()
if err != nil {
return nil, err
Expand All @@ -197,6 +225,7 @@ func NewEventBus(logger hclog.Logger) (*EventBus, error) {
broker: broker,
formatterNodeID: formatterNodeID,
timeout: defaultTimeout,
filters: NewFilters(localNodeID),
}, nil
}

Expand Down Expand Up @@ -240,7 +269,12 @@ func (bus *EventBus) SubscribeMultipleNamespaces(ctx context.Context, namespaceP
}

ctx, cancel := context.WithCancel(ctx)
asyncNode := newAsyncNode(ctx, bus.logger, bus.broker)

bus.filters.addPattern(bus.filters.self, namespacePathPatterns, pattern)

asyncNode := newAsyncNode(ctx, bus.logger, bus.broker, func() {
bus.filters.removePattern(bus.filters.self, namespacePathPatterns, pattern)
})
err = bus.broker.RegisterNode(eventlogger.NodeID(sinkNodeID), asyncNode)
if err != nil {
defer cancel()
Expand Down Expand Up @@ -301,7 +335,7 @@ func newFilterNode(namespacePatterns []string, pattern string, bexprFilter strin
}
}

// Filter for correct event type, including wildcards.
// NodeFilter for correct event type, including wildcards.
if !glob.Glob(pattern, eventRecv.EventType) {
return false, nil
}
Expand All @@ -315,11 +349,12 @@ func newFilterNode(namespacePatterns []string, pattern string, bexprFilter strin
}, nil
}

func newAsyncNode(ctx context.Context, logger hclog.Logger, broker *eventlogger.Broker) *asyncChanNode {
func newAsyncNode(ctx context.Context, logger hclog.Logger, broker *eventlogger.Broker, removeFilter func()) *asyncChanNode {
return &asyncChanNode{
ctx: ctx,
ch: make(chan *eventlogger.Event),
logger: logger,
removeFilter: removeFilter,
removePipeline: broker.RemovePipelineAndNodes,
}
}
Expand All @@ -328,6 +363,7 @@ func newAsyncNode(ctx context.Context, logger hclog.Logger, broker *eventlogger.
func (node *asyncChanNode) Close(ctx context.Context) {
node.closeOnce.Do(func() {
defer node.cancelFunc()
node.removeFilter()
removed, err := node.removePipeline(ctx, eventTypeAll, node.pipelineID)

switch {
Expand Down
29 changes: 19 additions & 10 deletions vault/eventbus/bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

// TestBusBasics tests that basic event sending and subscribing function.
func TestBusBasics(t *testing.T) {
bus, err := NewEventBus(nil)
bus, err := NewEventBus("", "", nil)

Check failure on line 26 in vault/eventbus/bus_test.go

View workflow job for this annotation

GitHub Actions / Run Go tests with data race detection / test-go (4)

cannot use "" (untyped string constant) as func() string value in argument to NewEventBus
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -78,7 +78,7 @@ func TestBusBasics(t *testing.T) {
// TestBusIgnoresSendContext tests that the context is ignored when sending to an event,
// so that we do not give up too quickly.
func TestBusIgnoresSendContext(t *testing.T) {
bus, err := NewEventBus(nil)
bus, err := NewEventBus("", "", nil)

Check failure on line 81 in vault/eventbus/bus_test.go

View workflow job for this annotation

GitHub Actions / Run Go tests with data race detection / test-go (4)

cannot use "" (untyped string constant) as func() string value in argument to NewEventBus
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -119,7 +119,7 @@ func TestBusIgnoresSendContext(t *testing.T) {
// TestSubscribeNonRootNamespace verifies that events for non-root namespaces
// aren't filtered out by the bus.
func TestSubscribeNonRootNamespace(t *testing.T) {
bus, err := NewEventBus(nil)
bus, err := NewEventBus("", "", nil)

Check failure on line 122 in vault/eventbus/bus_test.go

View workflow job for this annotation

GitHub Actions / Run Go tests with data race detection / test-go (4)

cannot use "" (untyped string constant) as func() string value in argument to NewEventBus
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -162,7 +162,7 @@ func TestSubscribeNonRootNamespace(t *testing.T) {

// TestNamespaceFiltering verifies that events for other namespaces are filtered out by the bus.
func TestNamespaceFiltering(t *testing.T) {
bus, err := NewEventBus(nil)
bus, err := NewEventBus("", "", nil)

Check failure on line 165 in vault/eventbus/bus_test.go

View workflow job for this annotation

GitHub Actions / Run Go tests with data race detection / test-go (4)

cannot use "" (untyped string constant) as func() string value in argument to NewEventBus
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -222,7 +222,7 @@ func TestNamespaceFiltering(t *testing.T) {

// TestBus2Subscriptions verifies that events of different types are successfully routed to the correct subscribers.
func TestBus2Subscriptions(t *testing.T) {
bus, err := NewEventBus(nil)
bus, err := NewEventBus("", "", nil)

Check failure on line 225 in vault/eventbus/bus_test.go

View workflow job for this annotation

GitHub Actions / Run Go tests with data race detection / test-go (4)

cannot use "" (untyped string constant) as func() string value in argument to NewEventBus
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -293,7 +293,7 @@ func TestBusSubscriptionsCancel(t *testing.T) {
for _, tc := range testCases {
t.Run(fmt.Sprintf("cancel=%v", tc.cancel), func(t *testing.T) {
subscriptions.Store(0)
bus, err := NewEventBus(nil)
bus, err := NewEventBus("", "", nil)

Check failure on line 296 in vault/eventbus/bus_test.go

View workflow job for this annotation

GitHub Actions / Run Go tests with data race detection / test-go (4)

cannot use "" (untyped string constant) as func() string value in argument to NewEventBus
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -396,7 +396,7 @@ func waitFor(t *testing.T, maxWait time.Duration, f func() bool) {
// TestBusWildcardSubscriptions tests that a single subscription can receive
// multiple event types using * for glob patterns.
func TestBusWildcardSubscriptions(t *testing.T) {
bus, err := NewEventBus(nil)
bus, err := NewEventBus("", "", nil)

Check failure on line 399 in vault/eventbus/bus_test.go

View workflow job for this annotation

GitHub Actions / Run Go tests with data race detection / test-go (4)

cannot use "" (untyped string constant) as func() string value in argument to NewEventBus
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -471,7 +471,7 @@ func TestBusWildcardSubscriptions(t *testing.T) {
// TestDataPathIsPrependedWithMount tests that "data_path", if present in the
// metadata, is prepended with the plugin's mount.
func TestDataPathIsPrependedWithMount(t *testing.T) {
bus, err := NewEventBus(nil)
bus, err := NewEventBus("", "", nil)

Check failure on line 474 in vault/eventbus/bus_test.go

View workflow job for this annotation

GitHub Actions / Run Go tests with data race detection / test-go (4)

cannot use "" (untyped string constant) as func() string value in argument to NewEventBus
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -591,7 +591,7 @@ func TestDataPathIsPrependedWithMount(t *testing.T) {

// TestBexpr tests go-bexpr filters are evaluated on an event.
func TestBexpr(t *testing.T) {
bus, err := NewEventBus(nil)
bus, err := NewEventBus("", "", nil)

Check failure on line 594 in vault/eventbus/bus_test.go

View workflow job for this annotation

GitHub Actions / Run Go tests with data race detection / test-go (4)

cannot use "" (untyped string constant) as func() string value in argument to NewEventBus
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -671,7 +671,7 @@ func TestBexpr(t *testing.T) {
// TestPipelineCleanedUp ensures pipelines are properly cleaned up after
// subscriptions are closed.
func TestPipelineCleanedUp(t *testing.T) {
bus, err := NewEventBus(nil)
bus, err := NewEventBus("", "", nil)

Check failure on line 674 in vault/eventbus/bus_test.go

View workflow job for this annotation

GitHub Actions / Run Go tests with data race detection / test-go (4)

cannot use "" (untyped string constant) as func() string value in argument to NewEventBus
if err != nil {
t.Fatal(err)
}
Expand All @@ -683,6 +683,10 @@ func TestPipelineCleanedUp(t *testing.T) {
if err != nil {
t.Fatal(err)
}
// check that the filters are set
if !bus.filters.anyMatch(namespace.RootNamespace.Path, eventType) {
t.Fatal()
}
if !bus.broker.IsAnyPipelineRegistered(eventTypeAll) {
cancel()
t.Fatal()
Expand All @@ -693,4 +697,9 @@ func TestPipelineCleanedUp(t *testing.T) {
if bus.broker.IsAnyPipelineRegistered(eventTypeAll) {
t.Fatal()
}

// and that the filters are cleaned up
if bus.filters.anyMatch(namespace.RootNamespace.Path, eventType) {
t.Fatal()
}
}
124 changes: 124 additions & 0 deletions vault/eventbus/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package eventbus

import (
"slices"
"sync"
"sync/atomic"

"github.com/hashicorp/vault/sdk/logical"
"github.com/ryanuber/go-glob"
)

// Filters keeps track of all the event patterns that each node is interested in.
type Filters struct {
lock sync.RWMutex
parallel bool
self nodeID
filters map[nodeID]*NodeFilter
}

// nodeID is used to syntactically indicate that the string is a node's name identifier.
type nodeID string

// pattern is used to represent one or more combinations of patterns
type pattern struct {
eventTypePattern string
namespacePatterns []string
}

// NodeFilter keeps track of all patterns that a particular node is interested in.
type NodeFilter struct {
patterns []pattern
}

func (nf *NodeFilter) match(nsPath string, eventType logical.EventType) bool {
if nf == nil {
return false
}
for _, p := range nf.patterns {
if glob.Glob(p.eventTypePattern, string(eventType)) {
for _, nsp := range p.namespacePatterns {
if glob.Glob(nsp, nsPath) {
return true
}
}
}
}
return false
}

// NewFilters creates an empty set of filters to keep track of each node's pattern interests.
func NewFilters(self string) *Filters {
return &Filters{
self: nodeID(self),
filters: map[nodeID]*NodeFilter{},
}
}

// addPattern adds a pattern to a node's list.
func (f *Filters) addPattern(node nodeID, namespacePatterns []string, eventTypePattern string) {
f.lock.Lock()
defer f.lock.Unlock()
if _, ok := f.filters[node]; !ok {
f.filters[node] = &NodeFilter{}
}
f.filters[node].patterns = append(f.filters[node].patterns, pattern{eventTypePattern: eventTypePattern, namespacePatterns: namespacePatterns})
}

// removePattern removes a pattern from a node's list.
func (f *Filters) removePattern(node nodeID, namespacePatterns []string, eventTypePattern string) {
check := pattern{eventTypePattern: eventTypePattern, namespacePatterns: namespacePatterns}
f.lock.Lock()
defer f.lock.Unlock()
filters, ok := f.filters[node]
if !ok {
return
}
filters.patterns = slices.DeleteFunc(filters.patterns, func(m pattern) bool {
return m.eventTypePattern == check.eventTypePattern &&
slices.Equal(m.namespacePatterns, check.namespacePatterns)
})
}

// anyMatch returns true if any node's pattern list matches the arguments.
func (f *Filters) anyMatch(nsPath string, eventType logical.EventType) bool {
f.lock.RLock()
defer f.lock.RUnlock()
if f.parallel {
wg := sync.WaitGroup{}
anyMatched := atomic.Bool{}
for _, nf := range f.filters {
wg.Add(1)
go func(nf *NodeFilter) {
if nf.match(nsPath, eventType) {
anyMatched.Store(true)
}
wg.Done()
}(nf)
}
wg.Wait()
return anyMatched.Load()
} else {
for _, nf := range f.filters {
if nf.match(nsPath, eventType) {
return true
}
}
return false
}
}

// nodeMatch returns true if the given node's pattern list matches the arguments.
func (f *Filters) nodeMatch(node nodeID, nsPath string, eventType logical.EventType) bool {
f.lock.RLock()
defer f.lock.RUnlock()
return f.filters[node].match(nsPath, eventType)
}

// localMatch returns true if the local node's pattern list matches the arguments.
func (f *Filters) localMatch(nsPath string, eventType logical.EventType) bool {
return f.nodeMatch(f.self, nsPath, eventType)
}
Loading

0 comments on commit a9f78be

Please sign in to comment.