diff --git a/docs/operators/uri_parser.md b/docs/operators/uri_parser.md new file mode 100644 index 00000000..607bbc68 --- /dev/null +++ b/docs/operators/uri_parser.md @@ -0,0 +1,180 @@ +## `uri_parser` operator + +The `uri_parser` operator parses the string-type field selected by `parse_from` as [URI](https://tools.ietf.org/html/rfc3986). + +`uri_parser` can handle: +- Absolute URI + - `https://google.com/v1/app?user_id=2&uuid=57b4dad2-063c-4965-941c-adfd4098face` +- Relative URI + - `/app?user=admin` +- Query string + - `?request=681e6fc4-3314-4ccc-933e-4f9c9f0efd24&env=stage&env=dev` + - Query string must start with a question mark + +### Configuration Fields + +| Field | Default | Description | +| --- | --- | --- | +| `id` | `uri_parser` | A unique identifier for the operator | +| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries | +| `parse_from` | $ | A [field](/docs/types/field.md) that indicates the field to be parsed as JSON | +| `parse_to` | $ | A [field](/docs/types/field.md) that indicates the field to be parsed as JSON | +| `preserve_to` | | Preserves the unparsed value at the specified [field](/docs/types/field.md) | +| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](/docs/types/on_error.md) | +| `if` | | An [expression](/docs/types/expression.md) that, when set, will be evaluated to determine whether this operator should be used for the given entry. This allows you to do easy conditional parsing without branching logic with routers. | + + +### Output Fields + +The following fields are returned. Empty fields are not returned. + +| Field | Type | Example | Description | +| --- | --- | --- | --- | +| scheme | `string` | `"http"` | [URI Scheme](https://www.iana.org/assignments/uri-schemes/uri-schemes.xhtml). HTTP, HTTPS, FTP, etc. | +| user | `string` | `"dev"` | [Userinfo](https://tools.ietf.org/html/rfc3986#section-3.2) username. Password is always ignored. | +| host | `string` | `"golang.org"` | The [hostname](https://tools.ietf.org/html/rfc3986#section-3.2.2) such as `www.example.com`, `example.com`, `example`. A scheme is required in order to parse the `host` field. | +| port | `string` | `"8443"` | The [port](https://tools.ietf.org/html/rfc3986#section-3.2.3) the request is sent to. A scheme is required in order to parse the `port` field. | +| path | `string` | `"/v1/app"` | URI request [path](https://tools.ietf.org/html/rfc3986#section-3.3). | +| query | `map[string][]string` | `"query":{"user":["admin"]}` | Parsed URI [query string](https://tools.ietf.org/html/rfc3986#section-3.4). | + + +### Example Configurations + + +#### Parse the field `message` as absolute URI + +Configuration: +```yaml +- type: uri_parser + parse_from: message +``` + + + + + + + +
Input record Output record
+ +```json +{ + "timestamp": "", + "record": { + "message": "https://dev:pass@google.com/app?user_id=2&token=001" + } +} +``` + + + +```json +{ + "timestamp": "", + "record": { + "host": "google.com", + "path": "/app", + "query": { + "user_id": [ + "2" + ], + "token": [ + "001" + ] + }, + "scheme": "https", + "user": "dev" + } +} +``` + +
+ +#### Parse the field `message` as relative URI + +Configuration: +```yaml +- type: uri_parser + parse_from: message +``` + + + + + + + +
Input record Output record
+ +```json +{ + "timestamp": "", + "record": { + "message": "/app?user=admin" + } +} +``` + + + +```json +{ + "timestamp": "", + "record": { + "path": "/app", + "query": { + "user": [ + "admin" + ] + } + } +} +``` + +
+ +#### Parse the field `query` as URI query string + +Configuration: +```yaml +- type: uri_parser + parse_from: query +``` + + + + + + + +
Input record Output record
+ +```json +{ + "timestamp": "", + "record": { + "query": "?request=681e6fc4-3314-4ccc-933e-4f9c9f0efd24&env=stage&env=dev" + } +} +``` + + + +```json +{ + "timestamp": "", + "record": { + "query": { + "env": [ + "stage", + "dev" + ], + "request": [ + "681e6fc4-3314-4ccc-933e-4f9c9f0efd24" + ] + } + } +} +``` + +
diff --git a/operator/builtin/parser/uri/uri.go b/operator/builtin/parser/uri/uri.go new file mode 100644 index 00000000..968e0d13 --- /dev/null +++ b/operator/builtin/parser/uri/uri.go @@ -0,0 +1,166 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package uri + +import ( + "context" + "fmt" + "net/url" + "strings" + + "github.com/open-telemetry/opentelemetry-log-collection/entry" + "github.com/open-telemetry/opentelemetry-log-collection/operator" + "github.com/open-telemetry/opentelemetry-log-collection/operator/helper" +) + +func init() { + operator.Register("uri_parser", func() operator.Builder { return NewURIParserConfig("") }) +} + +// NewURIParserConfig creates a new uri parser config with default values. +func NewURIParserConfig(operatorID string) *URIParserConfig { + return &URIParserConfig{ + ParserConfig: helper.NewParserConfig(operatorID, "uri_parser"), + } +} + +// URIParserConfig is the configuration of a uri parser operator. +type URIParserConfig struct { + helper.ParserConfig `yaml:",inline"` +} + +// Build will build a uri parser operator. +func (c URIParserConfig) Build(context operator.BuildContext) ([]operator.Operator, error) { + parserOperator, err := c.ParserConfig.Build(context) + if err != nil { + return nil, err + } + + uriParser := &URIParser{ + ParserOperator: parserOperator, + } + + return []operator.Operator{uriParser}, nil +} + +// URIParser is an operator that parses a uri. +type URIParser struct { + helper.ParserOperator +} + +// Process will parse an entry. +func (u *URIParser) Process(ctx context.Context, entry *entry.Entry) error { + return u.ParserOperator.ProcessWith(ctx, entry, u.parse) +} + +// parse will parse a uri from a field and attach it to an entry. +func (u *URIParser) parse(value interface{}) (interface{}, error) { + switch m := value.(type) { + case string: + return parseURI(m) + case []byte: + return parseURI(string(m)) + default: + return nil, fmt.Errorf("type '%T' cannot be parsed as URI", value) + } +} + +// parseURI takes an absolute or relative uri and returns the parsed values. +func parseURI(value string) (map[string]interface{}, error) { + m := make(map[string]interface{}) + + if strings.HasPrefix(value, "?") { + // remove the query string '?' prefix before parsing + v, err := url.ParseQuery(value[1:]) + if err != nil { + return nil, err + } + return queryToMap(v, m), nil + } + + x, err := url.ParseRequestURI(value) + if err != nil { + return nil, err + } + return urlToMap(x, m), nil +} + +// urlToMap converts a url.URL to a map, excludes any values that are not set. +func urlToMap(p *url.URL, m map[string]interface{}) map[string]interface{} { + scheme := p.Scheme + if scheme != "" { + m["scheme"] = scheme + } + + user := p.User.Username() + if user != "" { + m["user"] = user + } + + host := p.Hostname() + if host != "" { + m["host"] = host + } + + port := p.Port() + if port != "" { + m["port"] = port + } + + path := p.EscapedPath() + if path != "" { + m["path"] = path + } + + return queryToMap(p.Query(), m) +} + +// queryToMap converts a query string url.Values to a map. +func queryToMap(query url.Values, m map[string]interface{}) map[string]interface{} { + // no-op if query is empty, do not create the key m["query"] + if len(query) <= 0 { + return m + } + + /* 'parameter' will represent url.Values + map[string]interface{}{ + "parameter-a": []interface{}{ + "a", + "b", + }, + "parameter-b": []interface{}{ + "x", + "y", + }, + } + */ + parameters := map[string]interface{}{} + for param, values := range query { + parameters[param] = queryParamValuesToMap(values) + } + m["query"] = parameters + return m +} + + +// queryParamValuesToMap takes query string parameter values and +// returns an []interface populated with the values +func queryParamValuesToMap(values []string) []interface{} { + v := make([]interface{}, len(values)) + for i, value := range values { + v[i] = value + } + return v +} diff --git a/operator/builtin/parser/uri/uri_test.go b/operator/builtin/parser/uri/uri_test.go new file mode 100644 index 00000000..594dfb82 --- /dev/null +++ b/operator/builtin/parser/uri/uri_test.go @@ -0,0 +1,610 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package uri + +import ( + "net/url" + "testing" + + "github.com/open-telemetry/opentelemetry-log-collection/testutil" + + "github.com/stretchr/testify/require" +) + +func newTestParser(t *testing.T) *URIParser { + cfg := NewURIParserConfig("test") + ops, err := cfg.Build(testutil.NewBuildContext(t)) + require.NoError(t, err) + op := ops[0] + return op.(*URIParser) +} + +func TestURIParserBuildFailure(t *testing.T) { + cfg := NewURIParserConfig("test") + cfg.OnError = "invalid_on_error" + _, err := cfg.Build(testutil.NewBuildContext(t)) + require.Error(t, err) + require.Contains(t, err.Error(), "invalid `on_error` field") +} + +func TestURIParserStringFailure(t *testing.T) { + parser := newTestParser(t) + _, err := parser.parse("invalid") + require.Error(t, err) + require.Contains(t, err.Error(), "parse \"invalid\": invalid URI for request") +} + +func TestURIParserByteFailure(t *testing.T) { + parser := newTestParser(t) + _, err := parser.parse([]byte("invalid")) + require.Error(t, err) + require.Contains(t, err.Error(), "parse \"invalid\": invalid URI for request") +} + +func TestRegexParserInvalidType(t *testing.T) { + parser := newTestParser(t) + _, err := parser.parse([]int{}) + require.Error(t, err) + require.Contains(t, err.Error(), "type '[]int' cannot be parsed as URI") +} + +func TestURIParserParse(t *testing.T) { + cases := []struct { + name string + inputRecord interface{} + expectedRecord map[string]interface{} + expectErr bool + }{ + { + "string", + "http://www.google.com/app?env=prod", + map[string]interface{}{ + "scheme": "http", + "host": "www.google.com", + "path": "/app", + "query": map[string]interface{}{ + "env": []interface{}{ + "prod", + }, + }, + }, + false, + }, + { + "byte", + []byte("http://google.com/app?env=prod"), + map[string]interface{}{ + "scheme": "http", + "host": "google.com", + "path": "/app", + "query": map[string]interface{}{ + "env": []interface{}{ + "prod", + }, + }, + }, + false, + }, + { + "byte", + []int{}, + map[string]interface{}{}, + true, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + parser := URIParser{} + x, err := parser.parse(tc.inputRecord) + if tc.expectErr { + require.Error(t, err) + return + } + require.NoError(t, err) + require.Equal(t, tc.expectedRecord, x) + }) + } +} + +// Test all usecases: absolute uri, relative uri, query string +func TestParseURI(t *testing.T) { + cases := []struct { + name string + inputRecord string + expectedRecord map[string]interface{} + expectErr bool + }{ + { + "scheme-http", + "http://", + map[string]interface{}{ + "scheme": "http", + }, + false, + }, + { + "scheme-user", + "http://myuser:mypass@", + map[string]interface{}{ + "scheme": "http", + "user": "myuser", + }, + false, + }, + { + "scheme-host", + "http://golang.com", + map[string]interface{}{ + "scheme": "http", + "host": "golang.com", + }, + false, + }, + { + "scheme-host-root", + "http://golang.com/", + map[string]interface{}{ + "scheme": "http", + "host": "golang.com", + "path": "/", + }, + false, + }, + { + "scheme-host-minimal", + "http://golang", + map[string]interface{}{ + "scheme": "http", + "host": "golang", + }, + false, + }, + { + "host-missing-scheme", + "golang.org", + map[string]interface{}{}, + true, + }, + { + "sheme-port", + "http://:8080", + map[string]interface{}{ + "scheme": "http", + "port": "8080", + }, + false, + }, + { + "port-missing-scheme", + ":8080", + map[string]interface{}{}, + true, + }, + { + "path", + "/docs", + map[string]interface{}{ + "path": "/docs", + }, + false, + }, + { + "path-advanced", + `/x/y%2Fz`, + map[string]interface{}{ + "path": `/x/y%2Fz`, + }, + false, + }, + { + "path-root", + "/", + map[string]interface{}{ + "path": "/", + }, + false, + }, + { + "path-query", + "/v1/app?user=golang", + map[string]interface{}{ + "path": "/v1/app", + "query": map[string]interface{}{ + "user": []interface{}{ + "golang", + }, + }, + }, + false, + }, + { + "scheme-path", + "http:///v1/app", + map[string]interface{}{ + "scheme": "http", + "path": "/v1/app", + }, + false, + }, + { + "scheme-host-query", + "https://app.com?token=0000&env=prod&env=stage", + map[string]interface{}{ + "scheme": "https", + "host": "app.com", + "query": map[string]interface{}{ + "token": []interface{}{ + "0000", + }, + "env": []interface{}{ + "prod", + "stage", + }, + }, + }, + false, + }, + { + "minimal", + "http://golang.org", + map[string]interface{}{ + "scheme": "http", + "host": "golang.org", + }, + false, + }, + { + "advanced", + "https://go:password@golang.org:8443/v2/app?env=stage&token=456&index=105838&env=prod", + map[string]interface{}{ + "scheme": "https", + "user": "go", + "host": "golang.org", + "port": "8443", + "path": "/v2/app", + "query": map[string]interface{}{ + "token": []interface{}{ + "456", + }, + "index": []interface{}{ + "105838", + }, + "env": []interface{}{ + "stage", + "prod", + }, + }, + }, + false, + }, + { + "magnet", + "magnet:?xt=urn:sha1:HNCKHTQCWBTRNJIV4WNAE52SJUQCZO6C", + map[string]interface{}{ + "scheme": "magnet", + "query": map[string]interface{}{ + "xt": []interface{}{ + "urn:sha1:HNCKHTQCWBTRNJIV4WNAE52SJUQCZO6C", + }, + }, + }, + false, + }, + { + "sftp", + "sftp://ftp.com//home/name/employee.csv", + map[string]interface{}{ + "scheme": "sftp", + "host": "ftp.com", + "path": "//home/name/employee.csv", + }, + false, + }, + { + "missing-schema", + "golang.org/app", + map[string]interface{}{}, + true, + }, + { + "query-advanced", + "?token=0000&env=prod&env=stage&task=update&task=new&action=update", + map[string]interface{}{ + "query": map[string]interface{}{ + "token": []interface{}{ + "0000", + }, + "env": []interface{}{ + "prod", + "stage", + }, + "task": []interface{}{ + "update", + "new", + }, + "action": []interface{}{ + "update", + }, + }, + }, + false, + }, + { + "query", + "?token=0000", + map[string]interface{}{ + "query": map[string]interface{}{ + "token": []interface{}{ + "0000", + }, + }, + }, + false, + }, + { + "query-empty", + "?", + map[string]interface{}{}, + false, + }, + { + "query-empty-key", + "?user=", + map[string]interface{}{ + "query": map[string]interface{}{ + "user": []interface{}{ + "", // no value + }, + }, + }, + false, + }, + // Query string without a ? prefix is treated as a URI, therefor + // an error will be returned by url.Parse("user=dev") + { + "query-no-?-prefix", + "user=dev", + map[string]interface{}{}, + true, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + x, err := parseURI(tc.inputRecord) + if tc.expectErr { + require.Error(t, err) + return + } + require.NoError(t, err) + require.Equal(t, tc.expectedRecord, x) + }) + } +} + +func TestBuildParserURL(t *testing.T) { + newBasicURIParser := func() *URIParserConfig { + cfg := NewURIParserConfig("test") + cfg.OutputIDs = []string{"test"} + return cfg + } + + t.Run("BasicConfig", func(t *testing.T) { + c := newBasicURIParser() + _, err := c.Build(testutil.NewBuildContext(t)) + require.NoError(t, err) + }) +} + +func TestURLToMap(t *testing.T) { + cases := []struct { + name string + inputRecord url.URL + expectedRecord map[string]interface{} + }{ + { + "absolute-uri", + url.URL{ + Scheme: "https", + Host: "google.com:8443", + Path: "/app", + RawQuery: "stage=prod&stage=dev", + }, + map[string]interface{}{ + "scheme": "https", + "host": "google.com", + "port": "8443", + "path": "/app", + "query": map[string]interface{}{ + "stage": []interface{}{ + "prod", + "dev", + }, + }, + }, + }, + { + "absolute-uri-simple", + url.URL{ + Scheme: "http", + Host: "google.com", + }, + map[string]interface{}{ + "scheme": "http", + "host": "google.com", + }, + }, + { + "path", + url.URL{ + Path: "/app", + RawQuery: "stage=prod&stage=dev", + }, + map[string]interface{}{ + "path": "/app", + "query": map[string]interface{}{ + "stage": []interface{}{ + "prod", + "dev", + }, + }, + }, + }, + { + "path-simple", + url.URL{ + Path: "/app", + }, + map[string]interface{}{ + "path": "/app", + }, + }, + { + "query", + url.URL{ + RawQuery: "stage=prod&stage=dev", + }, + map[string]interface{}{ + "query": map[string]interface{}{ + "stage": []interface{}{ + "prod", + "dev", + }, + }, + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + m := make(map[string]interface{}) + require.Equal(t, tc.expectedRecord, urlToMap(&tc.inputRecord, m)) + }) + } +} + +func TestQueryToMap(t *testing.T) { + cases := []struct { + name string + inputRecord url.Values + expectedRecord map[string]interface{} + }{ + { + "query", + url.Values{ + "stage": []string{ + "prod", + "dev", + }, + }, + map[string]interface{}{ + "query": map[string]interface{}{ + "stage": []interface{}{ + "prod", + "dev", + }, + }, + }, + }, + { + "empty", + url.Values{}, + map[string]interface{}{}, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + m := make(map[string]interface{}) + require.Equal(t, tc.expectedRecord, queryToMap(tc.inputRecord, m)) + }) + } +} + +func TestQueryParamValuesToMap(t *testing.T) { + cases := []struct { + name string + inputRecord []string + expectedRecord []interface{} + }{ + { + "simple", + []string{ + "prod", + "dev", + }, + []interface{}{ + "prod", + "dev", + }, + }, + { + "empty", + []string{}, + []interface{}{}, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + require.Equal(t, tc.expectedRecord, queryParamValuesToMap(tc.inputRecord)) + }) + } +} + +func BenchmarkURIParserParse(b *testing.B) { + v := "https://dev:password@www.golang.org:8443/v1/app/stage?token=d9e28b1d-2c7b-4853-be6a-d94f34a5d4ab&env=prod&env=stage&token=c6fa29f9-a31b-4584-b98d-aa8473b0e18d®ion=us-east1b&mode=fast" + parser := URIParser{} + for n := 0; n < b.N; n++ { + if _, err := parser.parse(v); err != nil { + b.Fatal(err) + } + } +} + +func BenchmarkURLToMap(b *testing.B) { + m := make(map[string]interface{}) + v := "https://dev:password@www.golang.org:8443/v1/app/stage?token=d9e28b1d-2c7b-4853-be6a-d94f34a5d4ab&env=prod&env=stage&token=c6fa29f9-a31b-4584-b98d-aa8473b0e18d®ion=us-east1b&mode=fast" + u, err := url.ParseRequestURI(v) + if err != nil { + b.Fatal(err) + } + for n := 0; n < b.N; n++ { + urlToMap(u, m) + } +} + +func BenchmarkQueryToMap(b *testing.B) { + m := make(map[string]interface{}) + v := "?token=d9e28b1d-2c7b-4853-be6a-d94f34a5d4ab&env=prod&env=stage&token=c6fa29f9-a31b-4584-b98d-aa8473b0e18d®ion=us-east1b&mode=fast" + u, err := url.ParseQuery(v) + if err != nil { + b.Fatal(err) + } + for n := 0; n < b.N; n++ { + queryToMap(u, m) + } +} + +func BenchmarkQueryParamValuesToMap(b *testing.B) { + v := []string{ + "d9e28b1d-2c7b-4853-be6a-d94f34a5d4ab", + "c6fa29f9-a31b-4584-b98d-aa8473b0e18", + } + for n := 0; n < b.N; n++ { + queryParamValuesToMap(v) + } +}