Skip to content

Commit

Permalink
Add audit filtering feature (#24554)
Browse files Browse the repository at this point in the history
* Support filter nodes in backend factories and add some tests

* More tests and cleanup

* Attempt to move control of registration for nodes and pipelines to the audit broker (#24505)

* invert control of the pipelines/nodes to the audit broker vs. within each backend

* update noop audit test code to implement the pipeliner interface

* noop mount path has trailing slash

* attempting to make NoopAudit more friendly

* NoopAudit uses known salt

* Refactor audit.ProcessManual to support filter nodes

* HasFiltering

* rename the pipeliner

* use exported AuditEvent in Filter

* Add tests for registering and deregistering backends on the audit broker

* Add missing licence header to one file, fix a typo in two tests

---------

Co-authored-by: Peter Wilson <[email protected]>
  • Loading branch information
kubawi and Peter Wilson authored Dec 15, 2023
1 parent 287da25 commit 18e554a
Show file tree
Hide file tree
Showing 26 changed files with 2,148 additions and 393 deletions.
2 changes: 1 addition & 1 deletion audit/entry_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (f *EntryFilter) Process(ctx context.Context, e *eventlogger.Event) (*event
return nil, fmt.Errorf("%s: event is nil: %w", op, event.ErrInvalidParameter)
}

a, ok := e.Payload.(*auditEvent)
a, ok := e.Payload.(*AuditEvent)
if !ok {
return nil, fmt.Errorf("%s: cannot parse event payload: %w", op, event.ErrInvalidParameter)
}
Expand Down
13 changes: 5 additions & 8 deletions audit/entry_formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,13 @@ import (
"strings"
"time"

"github.com/jefferai/jsonx"

"github.com/hashicorp/vault/helper/namespace"
"github.com/hashicorp/vault/sdk/logical"

"github.com/go-jose/go-jose/v3/jwt"
"github.com/hashicorp/eventlogger"
"github.com/hashicorp/vault/helper/namespace"
"github.com/hashicorp/vault/internal/observability/event"
"github.com/hashicorp/vault/sdk/helper/jsonutil"

"github.com/hashicorp/eventlogger"
"github.com/hashicorp/vault/sdk/logical"
"github.com/jefferai/jsonx"
)

var (
Expand All @@ -29,7 +26,7 @@ var (
)

// NewEntryFormatter should be used to create an EntryFormatter.
// Accepted options: WithPrefix, WithHeaderFormatter.
// Accepted options: WithHeaderFormatter, WithPrefix.
func NewEntryFormatter(config FormatterConfig, salter Salter, opt ...Option) (*EntryFormatter, error) {
const op = "audit.NewEntryFormatter"

Expand Down
24 changes: 17 additions & 7 deletions audit/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ import (
)

// ProcessManual will attempt to create an (audit) event with the specified data
// and manually iterate over the supplied nodes calling Process on each.
// and manually iterate over the supplied nodes calling Process on each until the
// event is nil (which indicates the pipeline has completed).
// Order of IDs in the NodeID slice determines the order they are processed.
// (Audit) Event will be of RequestType (as opposed to ResponseType).
// The last node must be a sink node (eventlogger.NodeTypeSink).
// The last node must be a filter node (eventlogger.NodeTypeFilter) or
// sink node (eventlogger.NodeTypeSink).
func ProcessManual(ctx context.Context, data *logical.LogInput, ids []eventlogger.NodeID, nodes map[eventlogger.NodeID]eventlogger.Node) error {
switch {
case data == nil:
Expand Down Expand Up @@ -52,9 +54,15 @@ func ProcessManual(ctx context.Context, data *logical.LogInput, ids []eventlogge

// Process nodes in order, updating the event with the result.
// This means we *should* do:
// 1. formatter (temporary)
// 2. sink
// 1. filter (optional if configured)
// 2. formatter (temporary)
// 3. sink
for _, id := range ids {
// If the event is nil, we've completed processing the pipeline (hopefully
// by either a filter node or a sink node).
if e == nil {
break
}
node, ok := nodes[id]
if !ok {
return fmt.Errorf("node not found: %v", id)
Expand All @@ -74,12 +82,14 @@ func ProcessManual(ctx context.Context, data *logical.LogInput, ids []eventlogge
return err
}

// Track the last node we have processed, as we should end with a sink.
// Track the last node we have processed, as we should end with a filter or sink.
lastSeen = node.Type()
}

if lastSeen != eventlogger.NodeTypeSink {
return errors.New("last node must be a sink")
switch lastSeen {
case eventlogger.NodeTypeSink, eventlogger.NodeTypeFilter:
default:
return errors.New("last node must be a filter or sink")
}

return nil
Expand Down
70 changes: 66 additions & 4 deletions audit/nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,12 +185,13 @@ func TestProcessManual_LastNodeNotSink(t *testing.T) {

err = ProcessManual(namespace.RootContext(context.Background()), data, ids, nodes)
require.Error(t, err)
require.EqualError(t, err, "last node must be a sink")
require.EqualError(t, err, "last node must be a filter or sink")
}

// TestProcessManual ensures that the manual processing of a test message works
// as expected with proper inputs.
func TestProcessManual(t *testing.T) {
// TestProcessManualEndWithSink ensures that the manual processing of a test
// message works as expected with proper inputs, which mean processing ends with
// sink node.
func TestProcessManualEndWithSink(t *testing.T) {
t.Parallel()

var ids []eventlogger.NodeID
Expand All @@ -215,6 +216,39 @@ func TestProcessManual(t *testing.T) {
require.NoError(t, err)
}

// TestProcessManual_EndWithFilter ensures that the manual processing of a test
// message works as expected with proper inputs, which mean processing ends with
// sink node.
func TestProcessManual_EndWithFilter(t *testing.T) {
t.Parallel()

var ids []eventlogger.NodeID
nodes := make(map[eventlogger.NodeID]eventlogger.Node)

// Filter node
filterId, filterNode := newFilterNode(t)
ids = append(ids, filterId)
nodes[filterId] = filterNode

// Formatter node
formatterId, formatterNode := newFormatterNode(t)
ids = append(ids, formatterId)
nodes[formatterId] = formatterNode

// Sink node
sinkId, sinkNode := newSinkNode(t)
ids = append(ids, sinkId)
nodes[sinkId] = sinkNode

// Data
requestId, err := uuid.GenerateUUID()
require.NoError(t, err)
data := newData(requestId)

err = ProcessManual(namespace.RootContext(context.Background()), data, ids, nodes)
require.NoError(t, err)
}

// newSinkNode creates a new UUID and NoopSink (sink node).
func newSinkNode(t *testing.T) (eventlogger.NodeID, *event.NoopSink) {
t.Helper()
Expand All @@ -226,6 +260,25 @@ func newSinkNode(t *testing.T) (eventlogger.NodeID, *event.NoopSink) {
return sinkId, sinkNode
}

// TestFilter is a trivial implementation of eventlogger.Node used as a placeholder
// for Filter nodes in tests.
type TestFilter struct{}

// Process trivially filters the event preventing it from being processed by subsequent nodes.
func (f *TestFilter) Process(_ context.Context, e *eventlogger.Event) (*eventlogger.Event, error) {
return nil, nil
}

// Reopen does nothing.
func (f *TestFilter) Reopen() error {
return nil
}

// Type returns the eventlogger.NodeTypeFormatter type.
func (f *TestFilter) Type() eventlogger.NodeType {
return eventlogger.NodeTypeFilter
}

// TestFormatter is a trivial implementation of the eventlogger.Node interface
// used as a place-holder for Formatter nodes in tests.
type TestFormatter struct{}
Expand All @@ -248,6 +301,15 @@ func (f *TestFormatter) Type() eventlogger.NodeType {
return eventlogger.NodeTypeFormatter
}

// newFilterNode creates a new TestFormatter (filter node).
func newFilterNode(t *testing.T) (eventlogger.NodeID, *TestFilter) {
nodeId, err := event.GenerateNodeID()
require.NoError(t, err)
node := &TestFilter{}

return nodeId, node
}

// newFormatterNode creates a new TestFormatter (formatter node).
func newFormatterNode(t *testing.T) (eventlogger.NodeID, *TestFormatter) {
nodeId, err := event.GenerateNodeID()
Expand Down
2 changes: 2 additions & 0 deletions audit/sink_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/hashicorp/eventlogger"
)

var _ eventlogger.Node = (*SinkWrapper)(nil)

// SinkWrapper is a wrapper for any kind of Sink Node that processes events
// containing an AuditEvent payload.
type SinkWrapper struct {
Expand Down
12 changes: 5 additions & 7 deletions audit/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"io"
"time"

"github.com/hashicorp/eventlogger"
"github.com/hashicorp/go-bexpr"
"github.com/hashicorp/vault/internal/observability/event"
"github.com/hashicorp/vault/sdk/helper/salt"
"github.com/hashicorp/vault/sdk/logical"
)
Expand Down Expand Up @@ -275,6 +275,10 @@ type Backend interface {
// Salter interface must be implemented by anything implementing Backend.
Salter

// The PipelineReader interface allows backends to surface information about their
// nodes for node and pipeline registration.
event.PipelineReader

// LogRequest is used to synchronously log a request. This is done after the
// request is authorized but before the request is executed. The arguments
// MUST not be modified in any way. They should be deep copied if this is
Expand All @@ -298,12 +302,6 @@ type Backend interface {

// Invalidate is called for path invalidation
Invalidate(context.Context)

// RegisterNodesAndPipeline provides an eventlogger.Broker pointer so that
// the Backend can call its RegisterNode and RegisterPipeline methods with
// the nodes and the pipeline that were created in the corresponding
// Factory function.
RegisterNodesAndPipeline(*eventlogger.Broker, string) error
}

// BackendConfig contains configuration parameters used in the factory func to
Expand Down
Loading

0 comments on commit 18e554a

Please sign in to comment.