From 8bb9cbbebaed39b290590f79a8857f5ba01fbf16 Mon Sep 17 00:00:00 2001 From: Peter Wilson Date: Thu, 6 Jul 2023 18:17:02 +0100 Subject: [PATCH] VAULT-17772: bump go-eventlogger to v0.2.1 (#21623) * go-eventlogger: moved to v0.2.1, allows removal of pipeline and nodes --- changelog/21623.txt | 3 +++ go.mod | 2 +- go.sum | 4 ++-- vault/eventbus/bus.go | 25 ++++++++++++++++++------- 4 files changed, 24 insertions(+), 10 deletions(-) create mode 100644 changelog/21623.txt diff --git a/changelog/21623.txt b/changelog/21623.txt new file mode 100644 index 000000000000..7fc272d13b5d --- /dev/null +++ b/changelog/21623.txt @@ -0,0 +1,3 @@ +```release-note:improvement +eventbus: updated go-eventlogger library to allow removal of nodes referenced by pipelines (used for subscriptions) +``` \ No newline at end of file diff --git a/go.mod b/go.mod index 985080a8bb04..b26b637d7400 100644 --- a/go.mod +++ b/go.mod @@ -74,7 +74,7 @@ require ( github.com/hashicorp/consul-template v0.32.0 github.com/hashicorp/consul/api v1.20.0 github.com/hashicorp/errwrap v1.1.0 - github.com/hashicorp/eventlogger v0.1.1 + github.com/hashicorp/eventlogger v0.2.1 github.com/hashicorp/go-cleanhttp v0.5.2 github.com/hashicorp/go-discover v0.0.0-20210818145131-c573d69da192 github.com/hashicorp/go-gcp-common v0.8.0 diff --git a/go.sum b/go.sum index 213099b1a6c6..fde3283d5be4 100644 --- a/go.sum +++ b/go.sum @@ -1713,8 +1713,8 @@ github.com/hashicorp/errwrap v0.0.0-20141028054710-7554cd9344ce/go.mod h1:YH+1FK github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= -github.com/hashicorp/eventlogger v0.1.1 h1:zyCjxsy7KunFsMPZKU5PnwWEakSrp1zjj2vPFmrDaeo= -github.com/hashicorp/eventlogger v0.1.1/go.mod h1://CHt6/j+Q2lc0NlUB5af4aS2M0c0aVBg9/JfcpAyhM= +github.com/hashicorp/eventlogger v0.2.1 h1:sjAOKO62BDDBn10516Uo7QDf5KEqzhU0LkUnbBptVUU= +github.com/hashicorp/eventlogger v0.2.1/go.mod h1://CHt6/j+Q2lc0NlUB5af4aS2M0c0aVBg9/JfcpAyhM= github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= diff --git a/vault/eventbus/bus.go b/vault/eventbus/bus.go index 6f66d423bc4c..bcafec48fad9 100644 --- a/vault/eventbus/bus.go +++ b/vault/eventbus/bus.go @@ -6,6 +6,7 @@ package eventbus import ( "context" "errors" + "fmt" "net/url" "strings" "sync" @@ -146,7 +147,10 @@ func init() { } func NewEventBus(logger hclog.Logger) (*EventBus, error) { - broker := eventlogger.NewBroker() + broker, err := eventlogger.NewBroker() + if err != nil { + return nil, err + } formatterID, err := uuid.GenerateUUID() if err != nil { @@ -218,7 +222,8 @@ func (bus *EventBus) Subscribe(ctx context.Context, ns *namespace.Namespace, pat // add info needed to cancel the subscription asyncNode.pipelineID = eventlogger.PipelineID(pipelineID) asyncNode.cancelFunc = cancel - return asyncNode.ch, asyncNode.Close, nil + // Capture context in a closure for the cancel func + return asyncNode.ch, func() { asyncNode.Close(ctx) }, nil } // SetSendTimeout sets the timeout of sending events. If the events are not accepted by the @@ -257,13 +262,19 @@ func newAsyncNode(ctx context.Context, logger hclog.Logger) *asyncChanNode { } // Close tells the bus to stop sending us events. -func (node *asyncChanNode) Close() { +func (node *asyncChanNode) Close(ctx context.Context) { node.closeOnce.Do(func() { defer node.cancelFunc() if node.broker != nil { - err := node.broker.RemovePipeline(eventTypeAll, node.pipelineID) - if err != nil { - node.logger.Warn("Error removing pipeline for closing node", "error", err) + isPipelineRemoved, err := node.broker.RemovePipelineAndNodes(ctx, eventTypeAll, node.pipelineID) + + switch { + case err != nil && isPipelineRemoved: + msg := fmt.Sprintf("Error removing nodes referenced by pipeline %q", node.pipelineID) + node.logger.Warn(msg, err) + case err != nil: + msg := fmt.Sprintf("Error removing pipeline %q", node.pipelineID) + node.logger.Warn(msg, err) } } addSubscriptions(-1) @@ -283,7 +294,7 @@ func (node *asyncChanNode) Process(ctx context.Context, e *eventlogger.Event) (* } if timeout { node.logger.Info("Subscriber took too long to process event, closing", "ID", e.Payload.(*logical.EventReceived).Event.Id) - node.Close() + node.Close(ctx) } }() return e, nil