Skip to content

Commit

Permalink
Merge pull request #3756 from hashicorp/sepehrfrgh-add-write-observat…
Browse files Browse the repository at this point in the history
…ion-interceptors

Telemetry: Add request and response interceptors with deny filter update
  • Loading branch information
sepehrfrgh authored Sep 20, 2023
2 parents 1ef8831 + 5b55278 commit 4425ef2
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 29 deletions.
4 changes: 2 additions & 2 deletions internal/daemon/controller/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,9 @@ func newGrpcServer(
unaryCtxInterceptor, // populated requestInfo from headers into the request ctx
errorInterceptor(ctx), // convert domain and api errors into headers for the http proxy
subtypes.AttributeTransformerInterceptor(ctx), // convert to/from generic attributes from/to subtype specific attributes
auditRequestInterceptor(ctx), // before we get started, audit the request
eventsRequestInterceptor(ctx), // before we get started, send the required events with the request
statusCodeInterceptor(ctx), // convert grpc codes into http status codes for the http proxy (can modify the resp)
auditResponseInterceptor(ctx), // as we finish, audit the response
eventsResponseInterceptor(ctx), // as we finish, send the required events with the response
grpc_recovery.UnaryServerInterceptor( // recover from panics with a grpc internal error
grpc_recovery.WithRecoveryHandlerContext(recoveryHandler()),
),
Expand Down
14 changes: 10 additions & 4 deletions internal/daemon/controller/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,10 +364,10 @@ func isNil(i any) bool {
return false
}

func auditRequestInterceptor(
func eventsRequestInterceptor(
_ context.Context,
) grpc.UnaryServerInterceptor {
const op = "controller.auditRequestInterceptor"
const op = "controller.eventsRequestInterceptor"
return func(interceptorCtx context.Context,
req any,
_ *grpc.UnaryServerInfo,
Expand All @@ -380,16 +380,19 @@ func auditRequestInterceptor(
if err := event.WriteAudit(interceptorCtx, op, event.WithRequest(&event.Request{Details: clonedMsg})); err != nil {
return req, status.Errorf(codes.Internal, "unable to write request msg audit: %s", err)
}
if err := event.WriteObservation(interceptorCtx, op, event.WithRequest(&event.Request{Details: clonedMsg})); err != nil {
return req, status.Errorf(codes.Internal, "unable to write request msg observation: %s", err)
}
}

return handler(interceptorCtx, req)
}
}

func auditResponseInterceptor(
func eventsResponseInterceptor(
_ context.Context,
) grpc.UnaryServerInterceptor {
const op = "controller.auditResponseInterceptor"
const op = "controller.eventsResponseInterceptor"
return func(interceptorCtx context.Context,
req any,
_ *grpc.UnaryServerInfo,
Expand All @@ -405,6 +408,9 @@ func auditResponseInterceptor(
if err := event.WriteAudit(interceptorCtx, op, event.WithResponse(&event.Response{Details: clonedMsg})); err != nil {
return req, status.Errorf(codes.Internal, "unable to write response msg audit: %s", err)
}
if err := event.WriteObservation(interceptorCtx, op, event.WithResponse(&event.Response{Details: clonedMsg})); err != nil {
return req, status.Errorf(codes.Internal, "unable to write response msg observation: %s", err)
}
}

return resp, err
Expand Down
4 changes: 2 additions & 2 deletions internal/daemon/controller/listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,8 @@ func (c *Controller) configureForCluster(ln *base.ServerListener) (func(), error
grpc.UnaryInterceptor(
grpc_middleware.ChainUnaryServer(
workerReqInterceptor,
auditRequestInterceptor(c.baseContext), // before we get started, audit the request
auditResponseInterceptor(c.baseContext), // as we finish, audit the response
eventsRequestInterceptor(c.baseContext), // before we get started, send the required events with the request
eventsResponseInterceptor(c.baseContext), // as we finish, send the required events with the response
),
),
)
Expand Down
13 changes: 7 additions & 6 deletions internal/observability/event/cloudevents_formatter_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,13 @@ func newCloudEventsFormatterFilter(source *url.URL, format cloudevents.Format, o
}
n.deny = append(n.deny, f)
}
} else {
defaultDenyFilters, err := defaultCloudEventsDenyFilters()
if err != nil {
return nil, err
}
n.deny = append(n.deny, defaultDenyFilters...)
}
defaultDenyFilters, err := defaultCloudEventsDenyFilters()
if err != nil {
return nil, err
}
n.deny = append(n.deny, defaultDenyFilters...)
n.Predicate = newPredicate(n.allow, n.deny)
return &n, nil
}
Expand All @@ -83,7 +84,7 @@ func defaultCloudEventsDenyFilters() ([]*filter, error) {
const (
op = "event.defaultCloudEventsDenyFilters"
// denyWorkStatusEvents is a default filter for worker to controller API status requests
denyWorkStatusEvents = `"/Data/RequestInfo/Method" contains "ServerCoordinationService/Status"`
denyWorkStatusEvents = `"/type" contains "observation" and "/data/request_info/method" contains "ServerCoordinationService/Status"`
)
f, err := newFilter(denyWorkStatusEvents)
if err != nil {
Expand Down
10 changes: 6 additions & 4 deletions internal/observability/event/cloudevents_formatter_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@ func Test_newCloudEventsFormatterFilter(t *testing.T) {
wantDeny []string
}{
{
name: "no-opts",
source: testSource,
format: cloudevents.FormatJSON,
// default case should have default deny for filtering ServerCoordinationService/Status for observation events
name: "no-opts",
source: testSource,
format: cloudevents.FormatJSON,
wantDeny: []string{`"/type" contains "observation" and "/data/request_info/method" contains "ServerCoordinationService/Status"`},
},
{
name: "bad-allow-filter",
Expand Down Expand Up @@ -133,7 +135,7 @@ func Test_newCloudEventsFormatterFilter(t *testing.T) {
for _, f := range got.allow {
assert.Contains(tt.wantAllow, f.raw)
}
assert.Len(got.deny, len(tt.wantDeny)+1) // +1 since there's always a default deny
assert.Len(got.deny, len(tt.wantDeny))
defs, err := defaultCloudEventsDenyFilters()
require.NoError(err)
for _, f := range defs {
Expand Down
13 changes: 11 additions & 2 deletions internal/observability/event/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,17 @@ func WriteObservation(ctx context.Context, caller Op, opt ...Option) error {
}
}
opts := getOpts(opt...)
if opts.withDetails == nil && opts.withHeader == nil && !opts.withFlush {
return fmt.Errorf("%s: specify either header or details options for an event payload: %w", op, ErrInvalidParameter)
if opts.withDetails == nil && opts.withHeader == nil && !opts.withFlush && opts.withRequest == nil && opts.withResponse == nil {
return fmt.Errorf("%s: specify either header or details options or request or response for an event payload: "+
"%w", op, ErrInvalidParameter)
}
// For the case that the telemetry is not enabled, and we have events coming from interceptors.
if !eventer.conf.TelemetryEnabled && (opts.withRequest != nil || opts.withResponse != nil) {
return nil
}
// If telemetry is enabled, we add it to the options.
if eventer.conf.TelemetryEnabled {
opt = append(opt, WithTelemetry())
}
if opts.withRequestInfo == nil {
var err error
Expand Down
52 changes: 43 additions & 9 deletions internal/observability/event/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,9 +313,12 @@ func Test_WriteObservation(t *testing.T) {
name string
noOperation bool
noFlush bool
telemetryFlag bool
observationPayload []observationPayload
header map[string]any
details map[string]any
Request *event.Request
Response *event.Response
ctx context.Context
observationSinkFileName string
setup func() error
Expand All @@ -324,9 +327,10 @@ func Test_WriteObservation(t *testing.T) {
wantErrContains string
}{
{
name: "no-info-event-id",
noFlush: true,
ctx: testCtxNoEventInfoId,
name: "no-info-event-id",
noFlush: true,
telemetryFlag: true,
ctx: testCtxNoEventInfoId,
observationPayload: []observationPayload{
{
header: []any{"name", "bar"},
Expand Down Expand Up @@ -365,6 +369,27 @@ func Test_WriteObservation(t *testing.T) {
wantErrIs: event.ErrInvalidParameter,
wantErrContains: "specify either header or details options",
},
{
name: "no-header-or-details-in-payload-no-request-no-response",
ctx: testCtx,
noFlush: true,
observationPayload: []observationPayload{
{},
},
wantErrIs: event.ErrInvalidParameter,
wantErrContains: "specify either header or details options or request or response for an event payload",
},
{
name: "telemetry-not-enabled-but-request-or-response-available",
ctx: testCtx,
noFlush: true,
telemetryFlag: false,
Request: &event.Request{
Operation: "create-test",
Endpoint: "0.0.0.0",
},
observationPayload: testPayloads,
},
{
name: "no-ctx-eventer-and-syseventer-not-initialized",
ctx: context.Background(),
Expand All @@ -373,9 +398,10 @@ func Test_WriteObservation(t *testing.T) {
wantErrContains: "missing both context and system eventer",
},
{
name: "use-syseventer",
noFlush: true,
ctx: context.Background(),
name: "use-syseventer",
noFlush: true,
telemetryFlag: true,
ctx: context.Background(),
observationPayload: []observationPayload{
{
header: []any{"name", "bar"},
Expand All @@ -391,8 +417,9 @@ func Test_WriteObservation(t *testing.T) {
cleanup: func() { event.TestResetSystEventer(t) },
},
{
name: "use-syseventer-with-cancelled-ctx",
noFlush: true,
name: "use-syseventer-with-cancelled-ctx",
noFlush: true,
telemetryFlag: true,
ctx: func() context.Context {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -415,6 +442,7 @@ func Test_WriteObservation(t *testing.T) {
{
name: "simple",
ctx: testCtx,
telemetryFlag: true,
observationPayload: testPayloads,
header: testWantHeader,
details: testWantDetails,
Expand All @@ -436,7 +464,9 @@ func Test_WriteObservation(t *testing.T) {
}
require.Greater(len(tt.observationPayload), 0)
for _, p := range tt.observationPayload {
err := event.WriteObservation(tt.ctx, event.Op(op), event.WithHeader(p.header...), event.WithDetails(p.details...))
err := event.WriteObservation(tt.ctx, event.Op(op), event.WithHeader(p.header...), event.WithDetails(p.details...),
event.WithRequest(tt.Request),
event.WithResponse(tt.Response))
if tt.wantErrIs != nil {
assert.ErrorIs(err, tt.wantErrIs)
if tt.wantErrContains != "" {
Expand All @@ -450,6 +480,10 @@ func Test_WriteObservation(t *testing.T) {
require.NoError(event.WriteObservation(tt.ctx, event.Op(tt.name), event.WithFlush()))
}

if !tt.telemetryFlag {
require.Nil(event.WriteObservation(tt.ctx, event.Op(tt.name), event.WithRequest(tt.Request),
event.WithResponse(tt.Response)))
}
if tt.observationSinkFileName != "" {
defer func() { _ = os.WriteFile(tt.observationSinkFileName, nil, 0o666) }()
b, err := ioutil.ReadFile(tt.observationSinkFileName)
Expand Down
3 changes: 3 additions & 0 deletions internal/observability/event/event_observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ func (o *observation) ComposeFrom(events []*eventlogger.Event) (eventlogger.Even
payload[hdrK] = hdrV
}
}
if g.RequestInfo != nil {
payload[RequestInfoField] = g.RequestInfo
}
if g.Detail != nil {
if _, ok := payload[DetailsField]; !ok {
payload[DetailsField] = []gated.EventPayloadDetails{}
Expand Down

0 comments on commit 4425ef2

Please sign in to comment.