Skip to content

Commit

Permalink
Refactor metadata enrichment and fix bug in v3 Intake (elastic#3943)
Browse files Browse the repository at this point in the history
Decode request metadata into a struct instead of a map and pass the metadata struct
into a decoder function. The refactoring automatically fixes a bug where the `client.ip`
and `user_agent` were not set for v3 RUM events.

implements elastic#3885
fixes elastic#3942
  • Loading branch information
simitt committed Jul 6, 2020
1 parent 800da06 commit 4d0ae1f
Show file tree
Hide file tree
Showing 24 changed files with 234 additions and 128 deletions.
7 changes: 6 additions & 1 deletion beater/api/intake/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/elastic/apm-server/beater/headers"
"github.com/elastic/apm-server/beater/request"
"github.com/elastic/apm-server/decoder"
"github.com/elastic/apm-server/model"
"github.com/elastic/apm-server/processor/stream"
"github.com/elastic/apm-server/publish"
)
Expand Down Expand Up @@ -63,7 +64,11 @@ func Handler(processor *stream.Processor, report publish.Reporter) request.Handl
return
}

res := processor.HandleStream(c.Request.Context(), c.RateLimiter, c.RequestMetadata, reader, report)
metadata := model.Metadata{
UserAgent: model.UserAgent{Original: c.RequestMetadata.UserAgent},
Client: model.Client{IP: c.RequestMetadata.ClientIP},
System: model.System{IP: c.RequestMetadata.SystemIP}}
res := processor.HandleStream(c.Request.Context(), c.RateLimiter, &metadata, reader, report)
sendResponse(c, res)
}
}
Expand Down
13 changes: 6 additions & 7 deletions beater/api/profile/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"github.com/elastic/apm-server/model/modeldecoder"
"github.com/elastic/apm-server/publish"
"github.com/elastic/apm-server/transform"
"github.com/elastic/apm-server/utility"
"github.com/elastic/apm-server/validation"
)

Expand Down Expand Up @@ -123,11 +122,11 @@ func Handler(
err: errors.Wrap(err, "failed to decode metadata JSON"),
}
}
for k, v := range c.RequestMetadata {
utility.InsertInMap(raw, k, v.(map[string]interface{}))
}
metadata, err := modeldecoder.DecodeMetadata(raw, false)
if err != nil {
metadata := model.Metadata{
UserAgent: model.UserAgent{Original: c.RequestMetadata.UserAgent},
Client: model.Client{IP: c.RequestMetadata.ClientIP},
System: model.System{IP: c.RequestMetadata.SystemIP}}
if err := modeldecoder.DecodeMetadata(raw, false, &metadata); err != nil {
var ve *validation.Error
if errors.As(err, &ve) {
return nil, requestError{
Expand All @@ -140,7 +139,7 @@ func Handler(
err: errors.Wrap(err, "failed to decode metadata"),
}
}
profileMetadata = *metadata
profileMetadata = metadata

case "profile":
params, err := validateContentType(http.Header(part.Header), pprofMediaType)
Expand Down
14 changes: 3 additions & 11 deletions beater/middleware/request_metadata_middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,8 @@ func UserMetadataMiddleware() Middleware {
return func(h request.Handler) (request.Handler, error) {
return func(c *request.Context) {
dec := utility.ManualDecoder{}
user := map[string]interface{}{
"user-agent": dec.UserAgentHeader(c.Request.Header),
}
if ip := utility.ExtractIP(c.Request); ip != nil {
user["ip"] = ip.String()
}
c.RequestMetadata["user"] = user
c.RequestMetadata.UserAgent = dec.UserAgentHeader(c.Request.Header)
c.RequestMetadata.ClientIP = utility.ExtractIP(c.Request)
h(c)
}, nil
}
Expand All @@ -45,10 +40,7 @@ func UserMetadataMiddleware() Middleware {
func SystemMetadataMiddleware() Middleware {
return func(h request.Handler) (request.Handler, error) {
return func(c *request.Context) {
if ip := utility.ExtractIP(c.Request); ip != nil {
system := map[string]interface{}{"ip": ip.String()}
c.RequestMetadata["system"] = system
}
c.RequestMetadata.SystemIP = utility.ExtractIP(c.Request)
h(c)
}, nil
}
Expand Down
36 changes: 12 additions & 24 deletions beater/middleware/request_metadata_middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package middleware

import (
"fmt"
"net"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -28,17 +29,17 @@ import (

func TestUserMetadataMiddleware(t *testing.T) {
type test struct {
remoteAddr string
userAgent []string
expectIP string
expectUserAgent string
remoteAddr string
userAgent []string
expectedIP net.IP
expectedUserAgent string
}

ua1 := "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36"
ua2 := "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.14; rv:67.0) Gecko/20100101 Firefox/67.0"
tests := []test{
{remoteAddr: "1.2.3.4:1234", expectIP: "1.2.3.4", userAgent: []string{ua1, ua2}, expectUserAgent: fmt.Sprintf("%s, %s", ua1, ua2)},
{remoteAddr: "not-an-ip:1234", userAgent: []string{ua1}, expectUserAgent: ua1},
{remoteAddr: "1.2.3.4:1234", expectedIP: net.ParseIP("1.2.3.4"), userAgent: []string{ua1, ua2}, expectedUserAgent: fmt.Sprintf("%s, %s", ua1, ua2)},
{remoteAddr: "not-an-ip:1234", userAgent: []string{ua1}, expectedUserAgent: ua1},
{remoteAddr: ""},
}

Expand All @@ -50,23 +51,18 @@ func TestUserMetadataMiddleware(t *testing.T) {
}

Apply(UserMetadataMiddleware(), beatertest.HandlerIdle)(c)
expect := map[string]interface{}{
"user-agent": test.expectUserAgent, // even if empty value
}
if test.expectIP != "" {
expect["ip"] = test.expectIP
}
assert.Equal(t, map[string]interface{}{"user": expect}, c.RequestMetadata)
assert.Equal(t, test.expectedUserAgent, c.RequestMetadata.UserAgent)
assert.Equal(t, test.expectedIP, c.RequestMetadata.ClientIP)
}
}

func TestSystemMetadataMiddleware(t *testing.T) {
type test struct {
remoteAddr string
expectIP string
expectedIP net.IP
}
tests := []test{
{remoteAddr: "1.2.3.4:1234", expectIP: "1.2.3.4"},
{remoteAddr: "1.2.3.4:1234", expectedIP: net.ParseIP("1.2.3.4")},
{remoteAddr: "not-an-ip:1234"},
{remoteAddr: ""},
}
Expand All @@ -76,14 +72,6 @@ func TestSystemMetadataMiddleware(t *testing.T) {
c.Request.RemoteAddr = test.remoteAddr

Apply(SystemMetadataMiddleware(), beatertest.HandlerIdle)(c)
if test.expectIP == "" {
assert.Empty(t, c.RequestMetadata)
} else {
assert.Equal(t, map[string]interface{}{
"system": map[string]interface{}{
"ip": test.expectIP,
},
}, c.RequestMetadata)
}
assert.Equal(t, test.expectedIP, c.RequestMetadata.SystemIP)
}
}
40 changes: 26 additions & 14 deletions beater/request/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package request

import (
"encoding/json"
"net"
"net/http"
"strings"

Expand All @@ -42,23 +43,29 @@ var (

// Context abstracts request and response information for http requests
type Context struct {
Request *http.Request
Logger *logp.Logger
RateLimiter *rate.Limiter
Authorization authorization.Authorization
IsRum bool
Result Result

// RequestMetadata contains metadata extracted from the request
// by middleware, and should be merged into event metadata.
RequestMetadata map[string]interface{}
Request *http.Request
Logger *logp.Logger
RateLimiter *rate.Limiter
Authorization authorization.Authorization
IsRum bool
Result Result
RequestMetadata Metadata

w http.ResponseWriter
writeAttempts int
}

// Metadata contains metadata extracted from the request by middleware,
// and should be merged into the event metadata.
type Metadata struct {
ClientIP net.IP
SystemIP net.IP
UserAgent string
}

// NewContext creates an empty Context struct
func NewContext() *Context {
return &Context{RequestMetadata: make(map[string]interface{})}
return &Context{}
}

// Reset allows to reuse a context by removing all request specific information
Expand All @@ -69,14 +76,19 @@ func (c *Context) Reset(w http.ResponseWriter, r *http.Request) {
c.Authorization = &authorization.AllowAuth{}
c.IsRum = false
c.Result.Reset()
for k := range c.RequestMetadata {
delete(c.RequestMetadata, k)
}
c.RequestMetadata.Reset()

c.w = w
c.writeAttempts = 0
}

// Reset sets all attribtues of the Metadata instance to it's zero value
func (m *Metadata) Reset() {
m.ClientIP = nil
m.SystemIP = nil
m.UserAgent = ""
}

// Header returns the http.Header of the context's writer
func (c *Context) Header() http.Header {
return c.w.Header()
Expand Down
2 changes: 2 additions & 0 deletions beater/request/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ func TestContext_Reset(t *testing.T) {
assert.Equal(t, 0, c.writeAttempts)
case "Result":
assertResultIsEmpty(t, cVal.Field(i).Interface().(Result))
case "RequestMetadata":
assert.Equal(t, Metadata{}, cVal.Field(i).Interface().(Metadata))
default:
assert.Empty(t, cVal.Field(i).Interface(), cType.Field(i).Name)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
"version": "1.5.0"
},
"float_gauge": 9.16,
"host": {
"ip": "127.0.0.1"
},
"integer_gauge": 42767,
"labels": {
"code": 200,
Expand Down Expand Up @@ -129,6 +132,9 @@
}
}
},
"host": {
"ip": "127.0.0.1"
},
"labels": {
"tag1": "one",
"tag2": 2
Expand Down
6 changes: 6 additions & 0 deletions docs/data/elasticsearch/generated/metricsets.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
}
}
},
"host": {
"ip": "127.0.0.1"
},
"labels": {
"tag1": "one",
"tag2": 2
Expand Down Expand Up @@ -74,6 +77,9 @@
"ingested": "2020-04-22T14:55:05.368308Z"
},
"float_gauge": 9.16,
"host": {
"ip": "127.0.0.1"
},
"integer_gauge": 42767,
"labels": {
"code": 200,
Expand Down
18 changes: 4 additions & 14 deletions model/modeldecoder/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,13 @@ var (
)

// DecodeRUMV3Metadata decodes v3 RUM metadata.
func DecodeRUMV3Metadata(input interface{}, hasShortFieldNames bool) (*model.Metadata, error) {
var out model.Metadata
if err := decodeMetadata(input, hasShortFieldNames, rumV3MetadataSchema, &out); err != nil {
return nil, err
}
return &out, nil
func DecodeRUMV3Metadata(input interface{}, hasShortFieldNames bool, out *model.Metadata) error {
return decodeMetadata(input, hasShortFieldNames, rumV3MetadataSchema, out)
}

// DecodeMetadata decodes v2 metadata.
func DecodeMetadata(input interface{}, hasShortFieldNames bool) (*model.Metadata, error) {
var out model.Metadata
if err := decodeMetadata(input, hasShortFieldNames, metadataSchema, &out); err != nil {
return nil, err
}
return &out, nil
func DecodeMetadata(input interface{}, hasShortFieldNames bool, out *model.Metadata) error {
return decodeMetadata(input, hasShortFieldNames, metadataSchema, out)
}

func decodeMetadata(input interface{}, hasShortFieldNames bool, schema *jsonschema.Schema, out *model.Metadata) error {
Expand All @@ -61,8 +53,6 @@ func decodeMetadata(input interface{}, hasShortFieldNames bool, schema *jsonsche
decodeProcess(getObject(raw, "process"), &out.Process)
if userObj := getObject(raw, fieldName("user")); userObj != nil {
decodeUser(userObj, hasShortFieldNames, &out.User, &out.Client)
// TODO(axw) stop decoding user.user-agent here (see #3885)
decodeString(userObj, "user-agent", &out.UserAgent.Original)
}
decodeCloud(getObject(raw, "cloud"), &out.Cloud)
decodeLabels(getObject(raw, fieldName("labels")), &out.Labels)
Expand Down
Loading

0 comments on commit 4d0ae1f

Please sign in to comment.