Skip to content

Commit

Permalink
VAULT-17772: bump go-eventlogger to v0.2.1 (#21623)
Browse files Browse the repository at this point in the history
* go-eventlogger: moved to v0.2.1, allows removal of pipeline and nodes
  • Loading branch information
Peter Wilson authored Jul 6, 2023
1 parent afc8f7d commit 8bb9cbb
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 10 deletions.
3 changes: 3 additions & 0 deletions changelog/21623.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
eventbus: updated go-eventlogger library to allow removal of nodes referenced by pipelines (used for subscriptions)
```
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ require (
github.com/hashicorp/consul-template v0.32.0
github.com/hashicorp/consul/api v1.20.0
github.com/hashicorp/errwrap v1.1.0
github.com/hashicorp/eventlogger v0.1.1
github.com/hashicorp/eventlogger v0.2.1
github.com/hashicorp/go-cleanhttp v0.5.2
github.com/hashicorp/go-discover v0.0.0-20210818145131-c573d69da192
github.com/hashicorp/go-gcp-common v0.8.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1713,8 +1713,8 @@ github.com/hashicorp/errwrap v0.0.0-20141028054710-7554cd9344ce/go.mod h1:YH+1FK
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/eventlogger v0.1.1 h1:zyCjxsy7KunFsMPZKU5PnwWEakSrp1zjj2vPFmrDaeo=
github.com/hashicorp/eventlogger v0.1.1/go.mod h1://CHt6/j+Q2lc0NlUB5af4aS2M0c0aVBg9/JfcpAyhM=
github.com/hashicorp/eventlogger v0.2.1 h1:sjAOKO62BDDBn10516Uo7QDf5KEqzhU0LkUnbBptVUU=
github.com/hashicorp/eventlogger v0.2.1/go.mod h1://CHt6/j+Q2lc0NlUB5af4aS2M0c0aVBg9/JfcpAyhM=
github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
Expand Down
25 changes: 18 additions & 7 deletions vault/eventbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package eventbus
import (
"context"
"errors"
"fmt"
"net/url"
"strings"
"sync"
Expand Down Expand Up @@ -146,7 +147,10 @@ func init() {
}

func NewEventBus(logger hclog.Logger) (*EventBus, error) {
broker := eventlogger.NewBroker()
broker, err := eventlogger.NewBroker()
if err != nil {
return nil, err
}

formatterID, err := uuid.GenerateUUID()
if err != nil {
Expand Down Expand Up @@ -218,7 +222,8 @@ func (bus *EventBus) Subscribe(ctx context.Context, ns *namespace.Namespace, pat
// add info needed to cancel the subscription
asyncNode.pipelineID = eventlogger.PipelineID(pipelineID)
asyncNode.cancelFunc = cancel
return asyncNode.ch, asyncNode.Close, nil
// Capture context in a closure for the cancel func
return asyncNode.ch, func() { asyncNode.Close(ctx) }, nil
}

// SetSendTimeout sets the timeout of sending events. If the events are not accepted by the
Expand Down Expand Up @@ -257,13 +262,19 @@ func newAsyncNode(ctx context.Context, logger hclog.Logger) *asyncChanNode {
}

// Close tells the bus to stop sending us events.
func (node *asyncChanNode) Close() {
func (node *asyncChanNode) Close(ctx context.Context) {
node.closeOnce.Do(func() {
defer node.cancelFunc()
if node.broker != nil {
err := node.broker.RemovePipeline(eventTypeAll, node.pipelineID)
if err != nil {
node.logger.Warn("Error removing pipeline for closing node", "error", err)
isPipelineRemoved, err := node.broker.RemovePipelineAndNodes(ctx, eventTypeAll, node.pipelineID)

switch {
case err != nil && isPipelineRemoved:
msg := fmt.Sprintf("Error removing nodes referenced by pipeline %q", node.pipelineID)
node.logger.Warn(msg, err)
case err != nil:
msg := fmt.Sprintf("Error removing pipeline %q", node.pipelineID)
node.logger.Warn(msg, err)
}
}
addSubscriptions(-1)
Expand All @@ -283,7 +294,7 @@ func (node *asyncChanNode) Process(ctx context.Context, e *eventlogger.Event) (*
}
if timeout {
node.logger.Info("Subscriber took too long to process event, closing", "ID", e.Payload.(*logical.EventReceived).Event.Id)
node.Close()
node.Close(ctx)
}
}()
return e, nil
Expand Down

0 comments on commit 8bb9cbb

Please sign in to comment.