diff --git a/beater/api/intake/handler.go b/beater/api/intake/handler.go index 5eda75a556c..054ba85b65a 100644 --- a/beater/api/intake/handler.go +++ b/beater/api/intake/handler.go @@ -30,6 +30,7 @@ import ( "github.com/elastic/apm-server/beater/headers" "github.com/elastic/apm-server/beater/request" "github.com/elastic/apm-server/decoder" + "github.com/elastic/apm-server/model" "github.com/elastic/apm-server/processor/stream" "github.com/elastic/apm-server/publish" ) @@ -63,7 +64,11 @@ func Handler(processor *stream.Processor, report publish.Reporter) request.Handl return } - res := processor.HandleStream(c.Request.Context(), c.RateLimiter, c.RequestMetadata, reader, report) + metadata := model.Metadata{ + UserAgent: model.UserAgent{Original: c.RequestMetadata.UserAgent}, + Client: model.Client{IP: c.RequestMetadata.ClientIP}, + System: model.System{IP: c.RequestMetadata.SystemIP}} + res := processor.HandleStream(c.Request.Context(), c.RateLimiter, &metadata, reader, report) sendResponse(c, res) } } diff --git a/beater/api/profile/handler.go b/beater/api/profile/handler.go index 7d74d07ac68..a9ae1c9cb63 100644 --- a/beater/api/profile/handler.go +++ b/beater/api/profile/handler.go @@ -35,7 +35,6 @@ import ( "github.com/elastic/apm-server/model/modeldecoder" "github.com/elastic/apm-server/publish" "github.com/elastic/apm-server/transform" - "github.com/elastic/apm-server/utility" "github.com/elastic/apm-server/validation" ) @@ -123,11 +122,11 @@ func Handler( err: errors.Wrap(err, "failed to decode metadata JSON"), } } - for k, v := range c.RequestMetadata { - utility.InsertInMap(raw, k, v.(map[string]interface{})) - } - metadata, err := modeldecoder.DecodeMetadata(raw, false) - if err != nil { + metadata := model.Metadata{ + UserAgent: model.UserAgent{Original: c.RequestMetadata.UserAgent}, + Client: model.Client{IP: c.RequestMetadata.ClientIP}, + System: model.System{IP: c.RequestMetadata.SystemIP}} + if err := modeldecoder.DecodeMetadata(raw, false, &metadata); err != nil { var ve *validation.Error if errors.As(err, &ve) { return nil, requestError{ @@ -140,7 +139,7 @@ func Handler( err: errors.Wrap(err, "failed to decode metadata"), } } - profileMetadata = *metadata + profileMetadata = metadata case "profile": params, err := validateContentType(http.Header(part.Header), pprofMediaType) diff --git a/beater/middleware/request_metadata_middleware.go b/beater/middleware/request_metadata_middleware.go index 5f5b58e8387..f28f7610df4 100644 --- a/beater/middleware/request_metadata_middleware.go +++ b/beater/middleware/request_metadata_middleware.go @@ -28,13 +28,8 @@ func UserMetadataMiddleware() Middleware { return func(h request.Handler) (request.Handler, error) { return func(c *request.Context) { dec := utility.ManualDecoder{} - user := map[string]interface{}{ - "user-agent": dec.UserAgentHeader(c.Request.Header), - } - if ip := utility.ExtractIP(c.Request); ip != nil { - user["ip"] = ip.String() - } - c.RequestMetadata["user"] = user + c.RequestMetadata.UserAgent = dec.UserAgentHeader(c.Request.Header) + c.RequestMetadata.ClientIP = utility.ExtractIP(c.Request) h(c) }, nil } @@ -45,10 +40,7 @@ func UserMetadataMiddleware() Middleware { func SystemMetadataMiddleware() Middleware { return func(h request.Handler) (request.Handler, error) { return func(c *request.Context) { - if ip := utility.ExtractIP(c.Request); ip != nil { - system := map[string]interface{}{"ip": ip.String()} - c.RequestMetadata["system"] = system - } + c.RequestMetadata.SystemIP = utility.ExtractIP(c.Request) h(c) }, nil } diff --git a/beater/middleware/request_metadata_middleware_test.go b/beater/middleware/request_metadata_middleware_test.go index f07f23a31f9..ac69123c5c3 100644 --- a/beater/middleware/request_metadata_middleware_test.go +++ b/beater/middleware/request_metadata_middleware_test.go @@ -19,6 +19,7 @@ package middleware import ( "fmt" + "net" "testing" "github.com/stretchr/testify/assert" @@ -28,17 +29,17 @@ import ( func TestUserMetadataMiddleware(t *testing.T) { type test struct { - remoteAddr string - userAgent []string - expectIP string - expectUserAgent string + remoteAddr string + userAgent []string + expectedIP net.IP + expectedUserAgent string } ua1 := "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36" ua2 := "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.14; rv:67.0) Gecko/20100101 Firefox/67.0" tests := []test{ - {remoteAddr: "1.2.3.4:1234", expectIP: "1.2.3.4", userAgent: []string{ua1, ua2}, expectUserAgent: fmt.Sprintf("%s, %s", ua1, ua2)}, - {remoteAddr: "not-an-ip:1234", userAgent: []string{ua1}, expectUserAgent: ua1}, + {remoteAddr: "1.2.3.4:1234", expectedIP: net.ParseIP("1.2.3.4"), userAgent: []string{ua1, ua2}, expectedUserAgent: fmt.Sprintf("%s, %s", ua1, ua2)}, + {remoteAddr: "not-an-ip:1234", userAgent: []string{ua1}, expectedUserAgent: ua1}, {remoteAddr: ""}, } @@ -50,23 +51,18 @@ func TestUserMetadataMiddleware(t *testing.T) { } Apply(UserMetadataMiddleware(), beatertest.HandlerIdle)(c) - expect := map[string]interface{}{ - "user-agent": test.expectUserAgent, // even if empty value - } - if test.expectIP != "" { - expect["ip"] = test.expectIP - } - assert.Equal(t, map[string]interface{}{"user": expect}, c.RequestMetadata) + assert.Equal(t, test.expectedUserAgent, c.RequestMetadata.UserAgent) + assert.Equal(t, test.expectedIP, c.RequestMetadata.ClientIP) } } func TestSystemMetadataMiddleware(t *testing.T) { type test struct { remoteAddr string - expectIP string + expectedIP net.IP } tests := []test{ - {remoteAddr: "1.2.3.4:1234", expectIP: "1.2.3.4"}, + {remoteAddr: "1.2.3.4:1234", expectedIP: net.ParseIP("1.2.3.4")}, {remoteAddr: "not-an-ip:1234"}, {remoteAddr: ""}, } @@ -76,14 +72,6 @@ func TestSystemMetadataMiddleware(t *testing.T) { c.Request.RemoteAddr = test.remoteAddr Apply(SystemMetadataMiddleware(), beatertest.HandlerIdle)(c) - if test.expectIP == "" { - assert.Empty(t, c.RequestMetadata) - } else { - assert.Equal(t, map[string]interface{}{ - "system": map[string]interface{}{ - "ip": test.expectIP, - }, - }, c.RequestMetadata) - } + assert.Equal(t, test.expectedIP, c.RequestMetadata.SystemIP) } } diff --git a/beater/request/context.go b/beater/request/context.go index 4bcab326f2a..65bfa26608b 100644 --- a/beater/request/context.go +++ b/beater/request/context.go @@ -19,6 +19,7 @@ package request import ( "encoding/json" + "net" "net/http" "strings" @@ -42,23 +43,29 @@ var ( // Context abstracts request and response information for http requests type Context struct { - Request *http.Request - Logger *logp.Logger - RateLimiter *rate.Limiter - Authorization authorization.Authorization - IsRum bool - Result Result - - // RequestMetadata contains metadata extracted from the request - // by middleware, and should be merged into event metadata. - RequestMetadata map[string]interface{} + Request *http.Request + Logger *logp.Logger + RateLimiter *rate.Limiter + Authorization authorization.Authorization + IsRum bool + Result Result + RequestMetadata Metadata w http.ResponseWriter writeAttempts int } +// Metadata contains metadata extracted from the request by middleware, +// and should be merged into the event metadata. +type Metadata struct { + ClientIP net.IP + SystemIP net.IP + UserAgent string +} + +// NewContext creates an empty Context struct func NewContext() *Context { - return &Context{RequestMetadata: make(map[string]interface{})} + return &Context{} } // Reset allows to reuse a context by removing all request specific information @@ -69,14 +76,19 @@ func (c *Context) Reset(w http.ResponseWriter, r *http.Request) { c.Authorization = &authorization.AllowAuth{} c.IsRum = false c.Result.Reset() - for k := range c.RequestMetadata { - delete(c.RequestMetadata, k) - } + c.RequestMetadata.Reset() c.w = w c.writeAttempts = 0 } +// Reset sets all attribtues of the Metadata instance to it's zero value +func (m *Metadata) Reset() { + m.ClientIP = nil + m.SystemIP = nil + m.UserAgent = "" +} + // Header returns the http.Header of the context's writer func (c *Context) Header() http.Header { return c.w.Header() diff --git a/beater/request/context_test.go b/beater/request/context_test.go index 81142f20087..a0da505e57c 100644 --- a/beater/request/context_test.go +++ b/beater/request/context_test.go @@ -65,6 +65,8 @@ func TestContext_Reset(t *testing.T) { assert.Equal(t, 0, c.writeAttempts) case "Result": assertResultIsEmpty(t, cVal.Field(i).Interface().(Result)) + case "RequestMetadata": + assert.Equal(t, Metadata{}, cVal.Field(i).Interface().(Metadata)) default: assert.Empty(t, cVal.Field(i).Interface(), cType.Field(i).Name) } diff --git a/beater/test_approved_es_documents/TestPublishIntegrationMetricsets.approved.json b/beater/test_approved_es_documents/TestPublishIntegrationMetricsets.approved.json index e2d516d3c9b..a8218df5e08 100644 --- a/beater/test_approved_es_documents/TestPublishIntegrationMetricsets.approved.json +++ b/beater/test_approved_es_documents/TestPublishIntegrationMetricsets.approved.json @@ -23,6 +23,9 @@ "version": "1.5.0" }, "float_gauge": 9.16, + "host": { + "ip": "127.0.0.1" + }, "integer_gauge": 42767, "labels": { "code": 200, @@ -129,6 +132,9 @@ } } }, + "host": { + "ip": "127.0.0.1" + }, "labels": { "tag1": "one", "tag2": 2 diff --git a/docs/data/elasticsearch/generated/metricsets.json b/docs/data/elasticsearch/generated/metricsets.json index 4d16befe0ca..025d4726f28 100644 --- a/docs/data/elasticsearch/generated/metricsets.json +++ b/docs/data/elasticsearch/generated/metricsets.json @@ -20,6 +20,9 @@ } } }, + "host": { + "ip": "127.0.0.1" + }, "labels": { "tag1": "one", "tag2": 2 @@ -74,6 +77,9 @@ "ingested": "2020-04-22T14:55:05.368308Z" }, "float_gauge": 9.16, + "host": { + "ip": "127.0.0.1" + }, "integer_gauge": 42767, "labels": { "code": 200, diff --git a/model/modeldecoder/metadata.go b/model/modeldecoder/metadata.go index 13a9aaf1a53..71468d304d7 100644 --- a/model/modeldecoder/metadata.go +++ b/model/modeldecoder/metadata.go @@ -33,21 +33,13 @@ var ( ) // DecodeRUMV3Metadata decodes v3 RUM metadata. -func DecodeRUMV3Metadata(input interface{}, hasShortFieldNames bool) (*model.Metadata, error) { - var out model.Metadata - if err := decodeMetadata(input, hasShortFieldNames, rumV3MetadataSchema, &out); err != nil { - return nil, err - } - return &out, nil +func DecodeRUMV3Metadata(input interface{}, hasShortFieldNames bool, out *model.Metadata) error { + return decodeMetadata(input, hasShortFieldNames, rumV3MetadataSchema, out) } // DecodeMetadata decodes v2 metadata. -func DecodeMetadata(input interface{}, hasShortFieldNames bool) (*model.Metadata, error) { - var out model.Metadata - if err := decodeMetadata(input, hasShortFieldNames, metadataSchema, &out); err != nil { - return nil, err - } - return &out, nil +func DecodeMetadata(input interface{}, hasShortFieldNames bool, out *model.Metadata) error { + return decodeMetadata(input, hasShortFieldNames, metadataSchema, out) } func decodeMetadata(input interface{}, hasShortFieldNames bool, schema *jsonschema.Schema, out *model.Metadata) error { @@ -61,8 +53,6 @@ func decodeMetadata(input interface{}, hasShortFieldNames bool, schema *jsonsche decodeProcess(getObject(raw, "process"), &out.Process) if userObj := getObject(raw, fieldName("user")); userObj != nil { decodeUser(userObj, hasShortFieldNames, &out.User, &out.Client) - // TODO(axw) stop decoding user.user-agent here (see #3885) - decodeString(userObj, "user-agent", &out.UserAgent.Original) } decodeCloud(getObject(raw, "cloud"), &out.Cloud) decodeLabels(getObject(raw, fieldName("labels")), &out.Labels) diff --git a/model/modeldecoder/metadata_test.go b/model/modeldecoder/metadata_test.go index d078883463d..e503e9e455b 100644 --- a/model/modeldecoder/metadata_test.go +++ b/model/modeldecoder/metadata_test.go @@ -38,7 +38,6 @@ const ( configuredHostname = "configured_hostname" systemArchitecture = "x86_64" systemPlatform = "linux" - systemIP = "192.168.0.1" containerID = "container-123" kubernetesNamespace = "k8s-namespace" @@ -50,7 +49,11 @@ const ( mail = "user@email.com" username = "user" userAgent = "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:15.0) Gecko/20100101 Firefox/15.0.1" - userIP = "192.168.0.1" +) + +var ( + systemIP = net.ParseIP("192.168.0.1") + userIP = net.ParseIP("192.168.0.1") ) var fullInput = map[string]interface{}{ @@ -89,7 +92,6 @@ var fullInput = map[string]interface{}{ "configured_hostname": configuredHostname, "architecture": systemArchitecture, "platform": systemPlatform, - "ip": systemIP, "container": map[string]interface{}{ "id": containerID, }, @@ -105,11 +107,9 @@ var fullInput = map[string]interface{}{ }, }, "user": map[string]interface{}{ - "id": uid, - "email": mail, - "username": username, - "ip": userIP, - "user-agent": userAgent, + "id": uid, + "email": mail, + "username": username, }, "cloud": map[string]interface{}{ "availability_zone": "australia-southeast1-a", @@ -136,9 +136,16 @@ var fullInput = map[string]interface{}{ }, } +func metadata() *model.Metadata { + return &model.Metadata{ + UserAgent: model.UserAgent{Original: userAgent}, + Client: model.Client{IP: userIP}, + System: model.System{IP: systemIP}} +} + func TestDecodeMetadata(t *testing.T) { - output, err := DecodeMetadata(fullInput, false) - require.NoError(t, err) + output := metadata() + require.NoError(t, DecodeMetadata(fullInput, false, output)) assert.Equal(t, &model.Metadata{ Service: model.Service{ Name: serviceName, @@ -161,7 +168,7 @@ func TestDecodeMetadata(t *testing.T) { ConfiguredHostname: configuredHostname, Architecture: systemArchitecture, Platform: systemPlatform, - IP: net.ParseIP(systemIP), + IP: systemIP, Container: model.Container{ID: containerID}, Kubernetes: model.Kubernetes{ Namespace: kubernetesNamespace, @@ -179,7 +186,7 @@ func TestDecodeMetadata(t *testing.T) { Original: userAgent, }, Client: model.Client{ - IP: net.ParseIP(userIP), + IP: userIP, }, Cloud: model.Cloud{ AccountID: "acct123", @@ -200,7 +207,7 @@ func TestDecodeMetadata(t *testing.T) { func BenchmarkDecodeMetadata(b *testing.B) { b.ReportAllocs() for i := 0; i < b.N; i++ { - if _, err := DecodeMetadata(fullInput, false); err != nil { + if err := DecodeMetadata(fullInput, false, &model.Metadata{}); err != nil { b.Fatal(err) } } @@ -210,7 +217,7 @@ func BenchmarkDecodeMetadataRecycled(b *testing.B) { b.ReportAllocs() var meta model.Metadata for i := 0; i < b.N; i++ { - if err := decodeMetadata(fullInput, false, metadataSchema, &meta); err != nil { + if err := DecodeMetadata(fullInput, false, &meta); err != nil { b.Fatal(err) } for k := range meta.Labels { @@ -220,10 +227,10 @@ func BenchmarkDecodeMetadataRecycled(b *testing.B) { } func TestDecodeMetadataInvalid(t *testing.T) { - _, err := DecodeMetadata(nil, false) + err := DecodeMetadata(nil, false, &model.Metadata{}) require.EqualError(t, err, "failed to validate metadata: error validating JSON: input missing") - _, err = DecodeMetadata("", false) + err = DecodeMetadata("", false, &model.Metadata{}) require.EqualError(t, err, "failed to validate metadata: error validating JSON: invalid input type") // baseInput holds the minimal valid input. Test-specific input is added to this. @@ -233,8 +240,7 @@ func TestDecodeMetadataInvalid(t *testing.T) { "name": "name", }, } - _, err = DecodeMetadata(baseInput, false) - require.NoError(t, err) + require.NoError(t, DecodeMetadata(baseInput, false, &model.Metadata{})) for _, test := range []struct { input map[string]interface{} @@ -276,7 +282,7 @@ func TestDecodeMetadataInvalid(t *testing.T) { input[k] = v } } - _, err := DecodeMetadata(input, false) + err = DecodeMetadata(input, false, &model.Metadata{}) require.Error(t, err) assert.Regexp(t, test.err, err.Error()) } diff --git a/model/modeldecoder/metricset.go b/model/modeldecoder/metricset.go index 2aeeae1d19a..31e0579e6d9 100644 --- a/model/modeldecoder/metricset.go +++ b/model/modeldecoder/metricset.go @@ -49,6 +49,7 @@ func DecodeMetricset(input Input, batch *model.Batch) error { return nil } +// DecodeRUMV3Metricset decodes a v3 metricset. func DecodeRUMV3Metricset(input Input, batch *model.Batch) error { metricset, err := decodeMetricset(input, rumV3Schema) if err != nil { diff --git a/model/modeldecoder/transaction_test.go b/model/modeldecoder/transaction_test.go index eff6d6653c4..1e2910721a9 100644 --- a/model/modeldecoder/transaction_test.go +++ b/model/modeldecoder/transaction_test.go @@ -369,14 +369,14 @@ func TestTransactionEventDecode(t *testing.T) { } func BenchmarkDecodeTransaction(b *testing.B) { - fullMetadata, err := DecodeMetadata(fullInput, false) - require.NoError(b, err) + var fullMetadata model.Metadata + require.NoError(b, DecodeMetadata(fullInput, false, &fullMetadata)) b.ResetTimer() b.ReportAllocs() for i := 0; i < b.N; i++ { if err := DecodeTransaction(Input{ - Metadata: *fullMetadata, + Metadata: fullMetadata, Raw: fullTransactionInput, }, &model.Batch{}); err != nil { b.Fatal(err) diff --git a/processor/stream/benchmark_test.go b/processor/stream/benchmark_test.go index eea46f15aeb..ebfea04d63c 100644 --- a/processor/stream/benchmark_test.go +++ b/processor/stream/benchmark_test.go @@ -28,6 +28,7 @@ import ( "golang.org/x/time/rate" "github.com/elastic/apm-server/beater/config" + "github.com/elastic/apm-server/model" "github.com/elastic/apm-server/publish" "github.com/elastic/apm-server/transform" ) @@ -66,7 +67,7 @@ func benchmarkStreamProcessor(b *testing.B, processor *Processor, files []string b.StopTimer() r.Reset(data) b.StartTimer() - processor.HandleStream(context.Background(), rl, map[string]interface{}{}, r, report) + processor.HandleStream(context.Background(), rl, &model.Metadata{}, r, report) } } } diff --git a/processor/stream/package_tests/intake_test_processor.go b/processor/stream/package_tests/intake_test_processor.go index 08983a9e96a..517c1358d77 100644 --- a/processor/stream/package_tests/intake_test_processor.go +++ b/processor/stream/package_tests/intake_test_processor.go @@ -106,7 +106,7 @@ func (p *intakeTestProcessor) Process(buf []byte) ([]beat.Event, error) { var reqs []publish.PendingReq report := tests.TestReporter(&reqs) - result := p.HandleStream(context.TODO(), nil, nil, bytes.NewBuffer(buf), report) + result := p.HandleStream(context.TODO(), nil, &model.Metadata{}, bytes.NewBuffer(buf), report) var events []beat.Event for _, req := range reqs { if req.Transformables != nil { diff --git a/processor/stream/package_tests/metadata_attrs_test.go b/processor/stream/package_tests/metadata_attrs_test.go index 43773269983..33577435aa9 100644 --- a/processor/stream/package_tests/metadata_attrs_test.go +++ b/processor/stream/package_tests/metadata_attrs_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/require" "github.com/elastic/apm-server/decoder" + "github.com/elastic/apm-server/model" "github.com/elastic/apm-server/model/metadata/generated/schema" "github.com/elastic/apm-server/model/modeldecoder" "github.com/elastic/apm-server/processor/stream" @@ -53,8 +54,7 @@ func (p *MetadataProcessor) Validate(data interface{}) error { } // validate the metadata object against our jsonschema - _, err := modeldecoder.DecodeMetadata(rawMetadata, false) - if err != nil { + if err := modeldecoder.DecodeMetadata(rawMetadata, false, &model.Metadata{}); err != nil { return err } } diff --git a/processor/stream/processor.go b/processor/stream/processor.go index 4a517fdf4c9..ed0634892f6 100644 --- a/processor/stream/processor.go +++ b/processor/stream/processor.go @@ -47,7 +47,7 @@ const ( batchSize = 10 ) -type decodeMetadataFunc func(interface{}, bool) (*model.Metadata, error) +type decodeMetadataFunc func(interface{}, bool, *model.Metadata) error // functions with the decodeEventFunc signature decode their input argument into their batch argument (output) type decodeEventFunc func(modeldecoder.Input, *model.Batch) error @@ -105,7 +105,7 @@ func RUMV3Processor(cfg *config.Config, tcfg *transform.Config) *Processor { } } -func (p *Processor) readMetadata(reqMeta map[string]interface{}, reader *streamReader) (*model.Metadata, error) { +func (p *Processor) readMetadata(metadata *model.Metadata, reader *streamReader) (*model.Metadata, error) { rawModel, err := reader.Read() if err != nil { if err == io.EOF { @@ -127,12 +127,8 @@ func (p *Processor) readMetadata(reqMeta map[string]interface{}, reader *streamR Document: string(reader.LatestLine()), } } - for k, v := range reqMeta { - utility.InsertInMap(rawMetadata, k, v.(map[string]interface{})) - } - metadata, err := p.decodeMetadata(rawMetadata, p.Mconfig.HasShortFieldNames) - if err != nil { + if err := p.decodeMetadata(rawMetadata, p.Mconfig.HasShortFieldNames, metadata); err != nil { var ve *validation.Error if errors.As(err, &ve) { return nil, &Error{ @@ -224,7 +220,7 @@ func (p *Processor) readBatch( } // HandleStream processes a stream of events -func (p *Processor) HandleStream(ctx context.Context, ipRateLimiter *rate.Limiter, meta map[string]interface{}, reader io.Reader, report publish.Reporter) *Result { +func (p *Processor) HandleStream(ctx context.Context, ipRateLimiter *rate.Limiter, meta *model.Metadata, reader io.Reader, report publish.Reporter) *Result { res := &Result{} sr := p.getStreamReader(reader) diff --git a/processor/stream/processor_test.go b/processor/stream/processor_test.go index f035bc181c9..c357602faed 100644 --- a/processor/stream/processor_test.go +++ b/processor/stream/processor_test.go @@ -22,12 +22,14 @@ import ( "context" "encoding/json" "fmt" + "net" "path/filepath" "testing" "testing/iotest" "time" "github.com/elastic/apm-server/beater/config" + "github.com/elastic/apm-server/model" "github.com/elastic/apm-server/transform" "github.com/stretchr/testify/assert" @@ -60,7 +62,7 @@ func TestHandlerReadStreamError(t *testing.T) { timeoutReader := iotest.TimeoutReader(bodyReader) sp := BackendProcessor(&config.Config{MaxEventSize: 100 * 1024}) - actualResult := sp.HandleStream(context.Background(), nil, map[string]interface{}{}, timeoutReader, report) + actualResult := sp.HandleStream(context.Background(), nil, &model.Metadata{}, timeoutReader, report) assertApproveResult(t, actualResult, "ReadError") } @@ -87,7 +89,7 @@ func TestHandlerReportingStreamError(t *testing.T) { bodyReader := bytes.NewBuffer(b) sp := BackendProcessor(&config.Config{MaxEventSize: 100 * 1024}) - actualResult := sp.HandleStream(context.Background(), nil, map[string]interface{}{}, bodyReader, test.report) + actualResult := sp.HandleStream(context.Background(), nil, &model.Metadata{}, bodyReader, test.report) assertApproveResult(t, actualResult, test.name) } } @@ -135,11 +137,7 @@ func TestIntegrationESOutput(t *testing.T) { reqTimestamp := time.Date(2018, 8, 1, 10, 0, 0, 0, time.UTC) ctx = utility.ContextWithRequestTime(ctx, reqTimestamp) - reqDecoderMeta := map[string]interface{}{ - "system": map[string]interface{}{ - "ip": "192.0.0.1", - }, - } + reqDecoderMeta := &model.Metadata{System: model.System{IP: net.ParseIP("192.0.0.1")}} p := BackendProcessor(&config.Config{MaxEventSize: 100 * 1024}) actualResult := p.HandleStream(ctx, nil, reqDecoderMeta, bodyReader, report) @@ -179,15 +177,12 @@ func TestIntegrationRum(t *testing.T) { reqTimestamp := time.Date(2018, 8, 1, 10, 0, 0, 0, time.UTC) ctx = utility.ContextWithRequestTime(ctx, reqTimestamp) - reqDecoderMeta := map[string]interface{}{ - "user": map[string]interface{}{ - "user-agent": "rum-2.0", - "ip": "192.0.0.1", - }, - } + reqDecoderMeta := model.Metadata{ + UserAgent: model.UserAgent{Original: "rum-2.0"}, + Client: model.Client{IP: net.ParseIP("192.0.0.1")}} p := RUMProcessor(&config.Config{MaxEventSize: 100 * 1024}, &transform.Config{}) - actualResult := p.HandleStream(ctx, nil, reqDecoderMeta, bodyReader, report) + actualResult := p.HandleStream(ctx, nil, &reqDecoderMeta, bodyReader, report) assertApproveResult(t, actualResult, test.name) }) } @@ -220,16 +215,12 @@ func TestRUMV3(t *testing.T) { name := fmt.Sprintf("test_approved_es_documents/testIntake%s", test.name) reqTimestamp := time.Date(2018, 8, 1, 10, 0, 0, 0, time.UTC) ctx := utility.ContextWithRequestTime(context.Background(), reqTimestamp) - - reqDecoderMeta := map[string]interface{}{ - "user": map[string]interface{}{ - "user-agent": "rum-2.0", - "ip": "192.0.0.1", - }, - } + reqDecoderMeta := model.Metadata{ + UserAgent: model.UserAgent{Original: "rum-2.0"}, + Client: model.Client{IP: net.ParseIP("192.0.0.1")}} p := RUMV3Processor(&config.Config{MaxEventSize: 100 * 1024}, &transform.Config{}) - actualResult := p.HandleStream(ctx, nil, reqDecoderMeta, bodyReader, reporter(name)) + actualResult := p.HandleStream(ctx, nil, &reqDecoderMeta, bodyReader, reporter(name)) assertApproveResult(t, actualResult, test.name) verifyErr := approvals.ApproveEvents(resultEvents, name) @@ -265,7 +256,7 @@ func TestRateLimiting(t *testing.T) { } actualResult := BackendProcessor(&config.Config{MaxEventSize: 100 * 1024}).HandleStream( - context.Background(), test.lim, map[string]interface{}{}, bytes.NewReader(b), report) + context.Background(), test.lim, &model.Metadata{}, bytes.NewReader(b), report) assertApproveResult(t, actualResult, test.name) } } diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationInvalidEvent.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationInvalidEvent.approved.json index 7a220c6c759..52915217442 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationInvalidEvent.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationInvalidEvent.approved.json @@ -6,6 +6,9 @@ "name": "elastic-node", "version": "3.14.0" }, + "host": { + "ip": "192.0.0.1" + }, "parent": { "id": "abcdef0123456789" }, diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationInvalidJSONEvent.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationInvalidJSONEvent.approved.json index 7a220c6c759..52915217442 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationInvalidJSONEvent.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationInvalidJSONEvent.approved.json @@ -6,6 +6,9 @@ "name": "elastic-node", "version": "3.14.0" }, + "host": { + "ip": "192.0.0.1" + }, "parent": { "id": "abcdef0123456789" }, diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationMetadataNullValues.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationMetadataNullValues.approved.json index 06d8e8af0cc..a287923824a 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationMetadataNullValues.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationMetadataNullValues.approved.json @@ -14,6 +14,9 @@ "message": "Cannot read property 'baz' of undefined" } }, + "host": { + "ip": "192.0.0.1" + }, "process": { "pid": 1234 }, diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationMetricsets.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationMetricsets.approved.json index d43c3a011c9..25e0c62cb62 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationMetricsets.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationMetricsets.approved.json @@ -14,6 +14,9 @@ }, "double_gauge": 3.141592653589793, "float_gauge": 9.16, + "host": { + "ip": "192.0.0.1" + }, "integer_gauge": 42767, "labels": { "code": 200, @@ -103,6 +106,9 @@ } } }, + "host": { + "ip": "192.0.0.1" + }, "labels": { "tag1": "one", "tag2": 2 diff --git a/processor/stream/test_approved_es_documents/testIntakeRUMV3Errors.approved.json b/processor/stream/test_approved_es_documents/testIntakeRUMV3Errors.approved.json index 3394feddfab..035bc7d7c87 100644 --- a/processor/stream/test_approved_es_documents/testIntakeRUMV3Errors.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeRUMV3Errors.approved.json @@ -6,6 +6,9 @@ "name": "js-base", "version": "4.8.1" }, + "client": { + "ip": "192.0.0.1" + }, "error": { "culprit": "test/e2e/general-usecase/app.e2e-bundle.min.js?token=secret", "custom": { @@ -65,6 +68,9 @@ "name": "apm-a-rum-test-e2e-general-usecase", "version": "0.0.1" }, + "source": { + "ip": "192.0.0.1" + }, "timestamp": { "us": 1533117600000000 }, @@ -80,6 +86,9 @@ "email": "em", "id": "uId", "name": "un" + }, + "user_agent": { + "original": "rum-2.0" } } ] diff --git a/processor/stream/test_approved_es_documents/testIntakeRUMV3Events.approved.json b/processor/stream/test_approved_es_documents/testIntakeRUMV3Events.approved.json index a2c5567eac6..6caa9bcf697 100644 --- a/processor/stream/test_approved_es_documents/testIntakeRUMV3Events.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeRUMV3Events.approved.json @@ -6,6 +6,9 @@ "name": "js-base", "version": "4.8.1" }, + "client": { + "ip": "192.0.0.1" + }, "http": { "request": { "headers": { @@ -56,6 +59,9 @@ }, "version": "0.0.1" }, + "source": { + "ip": "192.0.0.1" + }, "timestamp": { "us": 1533117600000000 }, @@ -122,6 +128,9 @@ "email": "em", "id": "uId", "name": "un" + }, + "user_agent": { + "original": "rum-2.0" } }, { @@ -130,6 +139,9 @@ "name": "js-base", "version": "4.8.1" }, + "client": { + "ip": "192.0.0.1" + }, "labels": { "testTagKey": "testTagValue" }, @@ -182,6 +194,9 @@ "email": "user@email.com", "id": "123", "name": "John Doe" + }, + "user_agent": { + "original": "rum-2.0" } }, { @@ -190,6 +205,9 @@ "name": "js-base", "version": "4.8.1" }, + "client": { + "ip": "192.0.0.1" + }, "labels": { "testTagKey": "testTagValue" }, @@ -242,6 +260,9 @@ "email": "user@email.com", "id": "123", "name": "John Doe" + }, + "user_agent": { + "original": "rum-2.0" } }, { @@ -250,6 +271,9 @@ "name": "js-base", "version": "4.8.1" }, + "client": { + "ip": "192.0.0.1" + }, "destination": { "address": "localhost", "port": 8000 @@ -323,6 +347,9 @@ "email": "user@email.com", "id": "123", "name": "John Doe" + }, + "user_agent": { + "original": "rum-2.0" } }, { @@ -331,6 +358,9 @@ "name": "js-base", "version": "4.8.1" }, + "client": { + "ip": "192.0.0.1" + }, "labels": { "testTagKey": "testTagValue" }, @@ -382,6 +412,9 @@ "email": "user@email.com", "id": "123", "name": "John Doe" + }, + "user_agent": { + "original": "rum-2.0" } }, { @@ -390,6 +423,9 @@ "name": "js-base", "version": "4.8.1" }, + "client": { + "ip": "192.0.0.1" + }, "destination": { "address": "localhost", "port": 8000 @@ -463,6 +499,9 @@ "email": "user@email.com", "id": "123", "name": "John Doe" + }, + "user_agent": { + "original": "rum-2.0" } }, { @@ -471,6 +510,9 @@ "name": "js-base", "version": "4.8.1" }, + "client": { + "ip": "192.0.0.1" + }, "destination": { "address": "localhost", "port": 8003 @@ -544,6 +586,9 @@ "email": "user@email.com", "id": "123", "name": "John Doe" + }, + "user_agent": { + "original": "rum-2.0" } }, { @@ -552,6 +597,9 @@ "name": "js-base", "version": "4.8.1" }, + "client": { + "ip": "192.0.0.1" + }, "destination": { "address": "localhost", "port": 8003 @@ -626,6 +674,9 @@ "email": "user@email.com", "id": "123", "name": "John Doe" + }, + "user_agent": { + "original": "rum-2.0" } }, { @@ -634,6 +685,9 @@ "name": "js-base", "version": "4.8.1" }, + "client": { + "ip": "192.0.0.1" + }, "labels": { "testTagKey": "testTagValue" }, @@ -708,6 +762,9 @@ "email": "user@email.com", "id": "123", "name": "John Doe" + }, + "user_agent": { + "original": "rum-2.0" } }, { @@ -716,6 +773,9 @@ "name": "js-base", "version": "4.8.1" }, + "client": { + "ip": "192.0.0.1" + }, "labels": { "testTagKey": "testTagValue" }, @@ -757,6 +817,9 @@ "email": "user@email.com", "id": "123", "name": "John Doe" + }, + "user_agent": { + "original": "rum-2.0" } }, { @@ -765,6 +828,9 @@ "name": "js-base", "version": "4.8.1" }, + "client": { + "ip": "192.0.0.1" + }, "labels": { "testTagKey": "testTagValue" }, @@ -806,6 +872,9 @@ "email": "user@email.com", "id": "123", "name": "John Doe" + }, + "user_agent": { + "original": "rum-2.0" } }, { @@ -814,6 +883,9 @@ "name": "js-base", "version": "4.8.1" }, + "client": { + "ip": "192.0.0.1" + }, "labels": { "testTagKey": "testTagValue" }, @@ -855,6 +927,9 @@ "email": "user@email.com", "id": "123", "name": "John Doe" + }, + "user_agent": { + "original": "rum-2.0" } }, { @@ -863,6 +938,9 @@ "name": "js-base", "version": "4.8.1" }, + "client": { + "ip": "192.0.0.1" + }, "labels": { "tag1": "value1", "testTagKey": "testTagValue" @@ -902,6 +980,9 @@ "email": "user@email.com", "id": "123", "name": "John Doe" + }, + "user_agent": { + "original": "rum-2.0" } } ] diff --git a/tests/system/metricset.approved.json b/tests/system/metricset.approved.json index 4d16befe0ca..025d4726f28 100644 --- a/tests/system/metricset.approved.json +++ b/tests/system/metricset.approved.json @@ -20,6 +20,9 @@ } } }, + "host": { + "ip": "127.0.0.1" + }, "labels": { "tag1": "one", "tag2": 2 @@ -74,6 +77,9 @@ "ingested": "2020-04-22T14:55:05.368308Z" }, "float_gauge": 9.16, + "host": { + "ip": "127.0.0.1" + }, "integer_gauge": 42767, "labels": { "code": 200,