diff --git a/cmd/stanza/init_common.go b/cmd/stanza/init_common.go
index 3e4c5b59a..e39078c7c 100644
--- a/cmd/stanza/init_common.go
+++ b/cmd/stanza/init_common.go
@@ -16,6 +16,7 @@ import (
_ "github.com/observiq/stanza/operator/builtin/parser/severity"
_ "github.com/observiq/stanza/operator/builtin/parser/syslog"
_ "github.com/observiq/stanza/operator/builtin/parser/time"
+ _ "github.com/observiq/stanza/operator/builtin/parser/uri"
_ "github.com/observiq/stanza/operator/builtin/transformer/filter"
_ "github.com/observiq/stanza/operator/builtin/transformer/recombine"
diff --git a/docs/operators/uri_parser.md b/docs/operators/uri_parser.md
new file mode 100644
index 000000000..834e38dd1
--- /dev/null
+++ b/docs/operators/uri_parser.md
@@ -0,0 +1,180 @@
+## `uri_parser` operator
+
+The `uri_parser` operator parses the string-type field selected by `parse_from` as [URI](https://tools.ietf.org/html/rfc3986).
+
+`uri_parser` can handle:
+- Absolute URI
+ - `https://google.com/v1/app?user_id=2&uuid=57b4dad2-063c-4965-941c-adfd4098face`
+- Relative URI
+ - `/app?user=admin`
+- Query string
+ - `?request=681e6fc4-3314-4ccc-933e-4f9c9f0efd24&env=stage&env=dev`
+ - Query string must start with a question mark
+
+### Configuration Fields
+
+| Field | Default | Description |
+| --- | --- | --- |
+| `id` | `uri_parser` | A unique identifier for the operator |
+| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries |
+| `parse_from` | $ | A [field](/docs/types/field.md) that indicates the field to be parsed as JSON |
+| `parse_to` | $ | A [field](/docs/types/field.md) that indicates the field to be parsed as JSON |
+| `preserve_to` | | Preserves the unparsed value at the specified [field](/docs/types/field.md) |
+| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](/docs/types/on_error.md) |
+| `if` | | An [expression](/docs/types/expression.md) that, when set, will be evaluated to determine whether this operator should be used for the given entry. This allows you to do easy conditional parsing without branching logic with routers. |
+
+
+### Output Fields
+
+The following fields are returned. Empty fields are not returned.
+
+| Field | Type | Example | Description |
+| --- | --- | --- | --- |
+| scheme | `string` | `"http"` | [URI Scheme](https://www.iana.org/assignments/uri-schemes/uri-schemes.xhtml). HTTP, HTTPS, FTP, etc. |
+| user | `string` | `"dev"` | [Userinfo](https://tools.ietf.org/html/rfc3986#section-3.2) username. Password is always ignored. |
+| host | `string` | `"golang.org"` | The [hostname](https://tools.ietf.org/html/rfc3986#section-3.2.2) such as `www.example.com`, `example.com`, `example`. A scheme is required in order to parse the `host` field. |
+| port | `string` | `"8443"` | The [port](https://tools.ietf.org/html/rfc3986#section-3.2.3) the request is sent to. A scheme is required in order to parse the `port` field. |
+| path | `string` | `"/v1/app"` | URI request [path](https://tools.ietf.org/html/rfc3986#section-3.3). |
+| query | `map[string][]string` | `"query":{"user":["admin"]}` | Parsed URI [query string](https://tools.ietf.org/html/rfc3986#section-3.4). |
+
+
+### Example Configurations
+
+
+#### Parse the field `message` as absolute URI
+
+Configuration:
+```yaml
+- type: uri_parser
+ parse_from: message
+```
+
+
+ Input record | Output record |
+
+
+
+```json
+{
+ "timestamp": "",
+ "record": {
+ "message": "https://dev:securepass@www.google.com/v1/app?user_id=2&uuid=57b4dad2-063c-4965-941c-adfd4098face"
+ }
+}
+```
+
+ |
+
+
+```json
+{
+ "timestamp": "",
+ "record": {
+ "host": "google.com",
+ "path": "/v1/app",
+ "query": {
+ "user_id": [
+ "2"
+ ],
+ "uuid": [
+ "57b4dad2-063c-4965-941c-adfd4098face"
+ ]
+ },
+ "scheme": "https",
+ "user": "dev"
+ }
+}
+```
+
+ |
+
+
+
+#### Parse the field `message` as relative URI
+
+Configuration:
+```yaml
+- type: uri_parser
+ parse_from: message
+```
+
+
+ Input record | Output record |
+
+
+
+```json
+{
+ "timestamp": "",
+ "record": {
+ "message": "/app?user=admin"
+ }
+}
+```
+
+ |
+
+
+```json
+{
+ "timestamp": "",
+ "record": {
+ "path": "/app",
+ "query": {
+ "user": [
+ "admin"
+ ]
+ }
+ }
+}
+```
+
+ |
+
+
+
+#### Parse the field `query` as URI query string
+
+Configuration:
+```yaml
+- type: uri_parser
+ parse_from: query
+```
+
+
+ Input record | Output record |
+
+
+
+```json
+{
+ "timestamp": "",
+ "record": {
+ "query": "?request=681e6fc4-3314-4ccc-933e-4f9c9f0efd24&env=stage&env=dev"
+ }
+}
+```
+
+ |
+
+
+```json
+{
+ "timestamp": "",
+ "record": {
+ "query": {
+ "env": [
+ "stage",
+ "dev"
+ ],
+ "request": [
+ "681e6fc4-3314-4ccc-933e-4f9c9f0efd24"
+ ]
+ }
+ }
+}
+```
+
+ |
+
+
diff --git a/operator/builtin/parser/uri/uri.go b/operator/builtin/parser/uri/uri.go
new file mode 100644
index 000000000..72ba0ce42
--- /dev/null
+++ b/operator/builtin/parser/uri/uri.go
@@ -0,0 +1,152 @@
+package uri
+
+import (
+ "context"
+ "fmt"
+ "net/url"
+ "strings"
+
+ "github.com/observiq/stanza/entry"
+ "github.com/observiq/stanza/operator"
+ "github.com/observiq/stanza/operator/helper"
+)
+
+func init() {
+ operator.Register("uri_parser", func() operator.Builder { return NewURIParserConfig("") })
+}
+
+// NewURIParserConfig creates a new uri parser config with default values.
+func NewURIParserConfig(operatorID string) *URIParserConfig {
+ return &URIParserConfig{
+ ParserConfig: helper.NewParserConfig(operatorID, "uri_parser"),
+ }
+}
+
+// URIParserConfig is the configuration of a uri parser operator.
+type URIParserConfig struct {
+ helper.ParserConfig `yaml:",inline"`
+}
+
+// Build will build a uri parser operator.
+func (c URIParserConfig) Build(context operator.BuildContext) ([]operator.Operator, error) {
+ parserOperator, err := c.ParserConfig.Build(context)
+ if err != nil {
+ return nil, err
+ }
+
+ uriParser := &URIParser{
+ ParserOperator: parserOperator,
+ }
+
+ return []operator.Operator{uriParser}, nil
+}
+
+// URIParser is an operator that parses a uri.
+type URIParser struct {
+ helper.ParserOperator
+}
+
+// Process will parse an entry.
+func (u *URIParser) Process(ctx context.Context, entry *entry.Entry) error {
+ return u.ParserOperator.ProcessWith(ctx, entry, u.parse)
+}
+
+// parse will parse a uri from a field and attach it to an entry.
+func (u *URIParser) parse(value interface{}) (interface{}, error) {
+ switch m := value.(type) {
+ case string:
+ return parseURI(m)
+ case []byte:
+ return parseURI(string(m))
+ default:
+ return nil, fmt.Errorf("type '%T' cannot be parsed as URI", value)
+ }
+}
+
+// parseURI takes an absolute or relative uri and returns the parsed values.
+func parseURI(value string) (map[string]interface{}, error) {
+ m := make(map[string]interface{})
+
+ if strings.HasPrefix(value, "?") {
+ // remove the query string '?' prefix before parsing
+ v, err := url.ParseQuery(value[1:])
+ if err != nil {
+ return nil, err
+ }
+ return queryToMap(v, m), nil
+ }
+
+ x, err := url.ParseRequestURI(value)
+ if err != nil {
+ return nil, err
+ }
+ return urlToMap(x, m), nil
+}
+
+// urlToMap converts a url.URL to a map, excludes any values that are not set.
+func urlToMap(p *url.URL, m map[string]interface{}) map[string]interface{} {
+ scheme := p.Scheme
+ if scheme != "" {
+ m["scheme"] = scheme
+ }
+
+ user := p.User.Username()
+ if user != "" {
+ m["user"] = user
+ }
+
+ host := p.Hostname()
+ if host != "" {
+ m["host"] = host
+ }
+
+ port := p.Port()
+ if port != "" {
+ m["port"] = port
+ }
+
+ path := p.EscapedPath()
+ if path != "" {
+ m["path"] = path
+ }
+
+ return queryToMap(p.Query(), m)
+}
+
+// queryToMap converts a query string url.Values to a map.
+func queryToMap(query url.Values, m map[string]interface{}) map[string]interface{} {
+ // no-op if query is empty, do not create the key m["query"]
+ if len(query) <= 0 {
+ return m
+ }
+
+ /* 'parameter' will represent url.Values
+ map[string]interface{}{
+ "parameter-a": []interface{}{
+ "a",
+ "b",
+ },
+ "parameter-b": []interface{}{
+ "x",
+ "y",
+ },
+ }
+ */
+ parameters := map[string]interface{}{}
+ for param, values := range query {
+ parameters[param] = queryParamValuesToMap(values)
+ }
+ m["query"] = parameters
+ return m
+}
+
+
+// queryParamValuesToMap takes query string parameter values and
+// returns an []interface populated with the values
+func queryParamValuesToMap(values []string) []interface{} {
+ v := make([]interface{}, len(values))
+ for i, value := range values {
+ v[i] = value
+ }
+ return v
+}
diff --git a/operator/builtin/parser/uri/uri_test.go b/operator/builtin/parser/uri/uri_test.go
new file mode 100644
index 000000000..3b5b18a15
--- /dev/null
+++ b/operator/builtin/parser/uri/uri_test.go
@@ -0,0 +1,596 @@
+package uri
+
+import (
+ "net/url"
+ "testing"
+
+ "github.com/observiq/stanza/testutil"
+
+ "github.com/stretchr/testify/require"
+)
+
+func newTestParser(t *testing.T) *URIParser {
+ cfg := NewURIParserConfig("test")
+ ops, err := cfg.Build(testutil.NewBuildContext(t))
+ require.NoError(t, err)
+ op := ops[0]
+ return op.(*URIParser)
+}
+
+func TestURIParserBuildFailure(t *testing.T) {
+ cfg := NewURIParserConfig("test")
+ cfg.OnError = "invalid_on_error"
+ _, err := cfg.Build(testutil.NewBuildContext(t))
+ require.Error(t, err)
+ require.Contains(t, err.Error(), "invalid `on_error` field")
+}
+
+func TestURIParserStringFailure(t *testing.T) {
+ parser := newTestParser(t)
+ _, err := parser.parse("invalid")
+ require.Error(t, err)
+ require.Contains(t, err.Error(), "parse \"invalid\": invalid URI for request")
+}
+
+func TestURIParserByteFailure(t *testing.T) {
+ parser := newTestParser(t)
+ _, err := parser.parse([]byte("invalid"))
+ require.Error(t, err)
+ require.Contains(t, err.Error(), "parse \"invalid\": invalid URI for request")
+}
+
+func TestRegexParserInvalidType(t *testing.T) {
+ parser := newTestParser(t)
+ _, err := parser.parse([]int{})
+ require.Error(t, err)
+ require.Contains(t, err.Error(), "type '[]int' cannot be parsed as URI")
+}
+
+func TestURIParserParse(t *testing.T) {
+ cases := []struct {
+ name string
+ inputRecord interface{}
+ expectedRecord map[string]interface{}
+ expectErr bool
+ }{
+ {
+ "string",
+ "http://www.google.com/app?env=prod",
+ map[string]interface{}{
+ "scheme": "http",
+ "host": "www.google.com",
+ "path": "/app",
+ "query": map[string]interface{}{
+ "env": []interface{}{
+ "prod",
+ },
+ },
+ },
+ false,
+ },
+ {
+ "byte",
+ []byte("http://google.com/app?env=prod"),
+ map[string]interface{}{
+ "scheme": "http",
+ "host": "google.com",
+ "path": "/app",
+ "query": map[string]interface{}{
+ "env": []interface{}{
+ "prod",
+ },
+ },
+ },
+ false,
+ },
+ {
+ "byte",
+ []int{},
+ map[string]interface{}{},
+ true,
+ },
+ }
+
+ for _, tc := range cases {
+ t.Run(tc.name, func(t *testing.T) {
+ parser := URIParser{}
+ x, err := parser.parse(tc.inputRecord)
+ if tc.expectErr {
+ require.Error(t, err)
+ return
+ }
+ require.NoError(t, err)
+ require.Equal(t, tc.expectedRecord, x)
+ })
+ }
+}
+
+// Test all usecases: absolute uri, relative uri, query string
+func TestParseURI(t *testing.T) {
+ cases := []struct {
+ name string
+ inputRecord string
+ expectedRecord map[string]interface{}
+ expectErr bool
+ }{
+ {
+ "scheme-http",
+ "http://",
+ map[string]interface{}{
+ "scheme": "http",
+ },
+ false,
+ },
+ {
+ "scheme-user",
+ "http://myuser:mypass@",
+ map[string]interface{}{
+ "scheme": "http",
+ "user": "myuser",
+ },
+ false,
+ },
+ {
+ "scheme-host",
+ "http://golang.com",
+ map[string]interface{}{
+ "scheme": "http",
+ "host": "golang.com",
+ },
+ false,
+ },
+ {
+ "scheme-host-root",
+ "http://golang.com/",
+ map[string]interface{}{
+ "scheme": "http",
+ "host": "golang.com",
+ "path": "/",
+ },
+ false,
+ },
+ {
+ "scheme-host-minimal",
+ "http://golang",
+ map[string]interface{}{
+ "scheme": "http",
+ "host": "golang",
+ },
+ false,
+ },
+ {
+ "host-missing-scheme",
+ "golang.org",
+ map[string]interface{}{},
+ true,
+ },
+ {
+ "sheme-port",
+ "http://:8080",
+ map[string]interface{}{
+ "scheme": "http",
+ "port": "8080",
+ },
+ false,
+ },
+ {
+ "port-missing-scheme",
+ ":8080",
+ map[string]interface{}{},
+ true,
+ },
+ {
+ "path",
+ "/docs",
+ map[string]interface{}{
+ "path": "/docs",
+ },
+ false,
+ },
+ {
+ "path-advanced",
+ `/x/y%2Fz`,
+ map[string]interface{}{
+ "path": `/x/y%2Fz`,
+ },
+ false,
+ },
+ {
+ "path-root",
+ "/",
+ map[string]interface{}{
+ "path": "/",
+ },
+ false,
+ },
+ {
+ "path-query",
+ "/v1/app?user=golang",
+ map[string]interface{}{
+ "path": "/v1/app",
+ "query": map[string]interface{}{
+ "user": []interface{}{
+ "golang",
+ },
+ },
+ },
+ false,
+ },
+ {
+ "scheme-path",
+ "http:///v1/app",
+ map[string]interface{}{
+ "scheme": "http",
+ "path": "/v1/app",
+ },
+ false,
+ },
+ {
+ "scheme-host-query",
+ "https://app.com?token=0000&env=prod&env=stage",
+ map[string]interface{}{
+ "scheme": "https",
+ "host": "app.com",
+ "query": map[string]interface{}{
+ "token": []interface{}{
+ "0000",
+ },
+ "env": []interface{}{
+ "prod",
+ "stage",
+ },
+ },
+ },
+ false,
+ },
+ {
+ "minimal",
+ "http://golang.org",
+ map[string]interface{}{
+ "scheme": "http",
+ "host": "golang.org",
+ },
+ false,
+ },
+ {
+ "advanced",
+ "https://go:password@golang.org:8443/v2/app?env=stage&token=456&index=105838&env=prod",
+ map[string]interface{}{
+ "scheme": "https",
+ "user": "go",
+ "host": "golang.org",
+ "port": "8443",
+ "path": "/v2/app",
+ "query": map[string]interface{}{
+ "token": []interface{}{
+ "456",
+ },
+ "index": []interface{}{
+ "105838",
+ },
+ "env": []interface{}{
+ "stage",
+ "prod",
+ },
+ },
+ },
+ false,
+ },
+ {
+ "magnet",
+ "magnet:?xt=urn:sha1:HNCKHTQCWBTRNJIV4WNAE52SJUQCZO6C",
+ map[string]interface{}{
+ "scheme": "magnet",
+ "query": map[string]interface{}{
+ "xt": []interface{}{
+ "urn:sha1:HNCKHTQCWBTRNJIV4WNAE52SJUQCZO6C",
+ },
+ },
+ },
+ false,
+ },
+ {
+ "sftp",
+ "sftp://ftp.com//home/name/employee.csv",
+ map[string]interface{}{
+ "scheme": "sftp",
+ "host": "ftp.com",
+ "path": "//home/name/employee.csv",
+ },
+ false,
+ },
+ {
+ "missing-schema",
+ "golang.org/app",
+ map[string]interface{}{},
+ true,
+ },
+ {
+ "query-advanced",
+ "?token=0000&env=prod&env=stage&task=update&task=new&action=update",
+ map[string]interface{}{
+ "query": map[string]interface{}{
+ "token": []interface{}{
+ "0000",
+ },
+ "env": []interface{}{
+ "prod",
+ "stage",
+ },
+ "task": []interface{}{
+ "update",
+ "new",
+ },
+ "action": []interface{}{
+ "update",
+ },
+ },
+ },
+ false,
+ },
+ {
+ "query",
+ "?token=0000",
+ map[string]interface{}{
+ "query": map[string]interface{}{
+ "token": []interface{}{
+ "0000",
+ },
+ },
+ },
+ false,
+ },
+ {
+ "query-empty",
+ "?",
+ map[string]interface{}{},
+ false,
+ },
+ {
+ "query-empty-key",
+ "?user=",
+ map[string]interface{}{
+ "query": map[string]interface{}{
+ "user": []interface{}{
+ "", // no value
+ },
+ },
+ },
+ false,
+ },
+ // Query string without a ? prefix is treated as a URI, therefor
+ // an error will be returned by url.Parse("user=dev")
+ {
+ "query-no-?-prefix",
+ "user=dev",
+ map[string]interface{}{},
+ true,
+ },
+ }
+
+ for _, tc := range cases {
+ t.Run(tc.name, func(t *testing.T) {
+ x, err := parseURI(tc.inputRecord)
+ if tc.expectErr {
+ require.Error(t, err)
+ return
+ }
+ require.NoError(t, err)
+ require.Equal(t, tc.expectedRecord, x)
+ })
+ }
+}
+
+func TestBuildParserURL(t *testing.T) {
+ newBasicURIParser := func() *URIParserConfig {
+ cfg := NewURIParserConfig("test")
+ cfg.OutputIDs = []string{"test"}
+ return cfg
+ }
+
+ t.Run("BasicConfig", func(t *testing.T) {
+ c := newBasicURIParser()
+ _, err := c.Build(testutil.NewBuildContext(t))
+ require.NoError(t, err)
+ })
+}
+
+func TestURLToMap(t *testing.T) {
+ cases := []struct {
+ name string
+ inputRecord url.URL
+ expectedRecord map[string]interface{}
+ }{
+ {
+ "absolute-uri",
+ url.URL{
+ Scheme: "https",
+ Host: "google.com:8443",
+ Path: "/app",
+ RawQuery: "stage=prod&stage=dev",
+ },
+ map[string]interface{}{
+ "scheme": "https",
+ "host": "google.com",
+ "port": "8443",
+ "path": "/app",
+ "query": map[string]interface{}{
+ "stage": []interface{}{
+ "prod",
+ "dev",
+ },
+ },
+ },
+ },
+ {
+ "absolute-uri-simple",
+ url.URL{
+ Scheme: "http",
+ Host: "google.com",
+ },
+ map[string]interface{}{
+ "scheme": "http",
+ "host": "google.com",
+ },
+ },
+ {
+ "path",
+ url.URL{
+ Path: "/app",
+ RawQuery: "stage=prod&stage=dev",
+ },
+ map[string]interface{}{
+ "path": "/app",
+ "query": map[string]interface{}{
+ "stage": []interface{}{
+ "prod",
+ "dev",
+ },
+ },
+ },
+ },
+ {
+ "path-simple",
+ url.URL{
+ Path: "/app",
+ },
+ map[string]interface{}{
+ "path": "/app",
+ },
+ },
+ {
+ "query",
+ url.URL{
+ RawQuery: "stage=prod&stage=dev",
+ },
+ map[string]interface{}{
+ "query": map[string]interface{}{
+ "stage": []interface{}{
+ "prod",
+ "dev",
+ },
+ },
+ },
+ },
+ }
+
+ for _, tc := range cases {
+ t.Run(tc.name, func(t *testing.T) {
+ m := make(map[string]interface{})
+ require.Equal(t, tc.expectedRecord, urlToMap(&tc.inputRecord, m))
+ })
+ }
+}
+
+func TestQueryToMap(t *testing.T) {
+ cases := []struct {
+ name string
+ inputRecord url.Values
+ expectedRecord map[string]interface{}
+ }{
+ {
+ "query",
+ url.Values{
+ "stage": []string{
+ "prod",
+ "dev",
+ },
+ },
+ map[string]interface{}{
+ "query": map[string]interface{}{
+ "stage": []interface{}{
+ "prod",
+ "dev",
+ },
+ },
+ },
+ },
+ {
+ "empty",
+ url.Values{},
+ map[string]interface{}{},
+ },
+ }
+
+ for _, tc := range cases {
+ t.Run(tc.name, func(t *testing.T) {
+ m := make(map[string]interface{})
+ require.Equal(t, tc.expectedRecord, queryToMap(tc.inputRecord, m))
+ })
+ }
+}
+
+func TestQueryParamValuesToMap(t *testing.T) {
+ cases := []struct {
+ name string
+ inputRecord []string
+ expectedRecord []interface{}
+ }{
+ {
+ "simple",
+ []string{
+ "prod",
+ "dev",
+ },
+ []interface{}{
+ "prod",
+ "dev",
+ },
+ },
+ {
+ "empty",
+ []string{},
+ []interface{}{},
+ },
+ }
+
+ for _, tc := range cases {
+ t.Run(tc.name, func(t *testing.T) {
+ require.Equal(t, tc.expectedRecord, queryParamValuesToMap(tc.inputRecord))
+ })
+ }
+}
+
+func BenchmarkURIParserParse(b *testing.B) {
+ v := "https://dev:password@www.golang.org:8443/v1/app/stage?token=d9e28b1d-2c7b-4853-be6a-d94f34a5d4ab&env=prod&env=stage&token=c6fa29f9-a31b-4584-b98d-aa8473b0e18d®ion=us-east1b&mode=fast"
+ parser := URIParser{}
+ for n := 0; n < b.N; n++ {
+ if _, err := parser.parse(v); err != nil {
+ b.Fatal(err)
+ }
+ }
+}
+
+func BenchmarkURLToMap(b *testing.B) {
+ m := make(map[string]interface{})
+ v := "https://dev:password@www.golang.org:8443/v1/app/stage?token=d9e28b1d-2c7b-4853-be6a-d94f34a5d4ab&env=prod&env=stage&token=c6fa29f9-a31b-4584-b98d-aa8473b0e18d®ion=us-east1b&mode=fast"
+ u, err := url.ParseRequestURI(v)
+ if err != nil {
+ b.Fatal(err)
+ }
+ for n := 0; n < b.N; n++ {
+ urlToMap(u, m)
+ }
+}
+
+func BenchmarkQueryToMap(b *testing.B) {
+ m := make(map[string]interface{})
+ v := "?token=d9e28b1d-2c7b-4853-be6a-d94f34a5d4ab&env=prod&env=stage&token=c6fa29f9-a31b-4584-b98d-aa8473b0e18d®ion=us-east1b&mode=fast"
+ u, err := url.ParseQuery(v)
+ if err != nil {
+ b.Fatal(err)
+ }
+ for n := 0; n < b.N; n++ {
+ queryToMap(u, m)
+ }
+}
+
+func BenchmarkQueryParamValuesToMap(b *testing.B) {
+ v := []string{
+ "d9e28b1d-2c7b-4853-be6a-d94f34a5d4ab",
+ "c6fa29f9-a31b-4584-b98d-aa8473b0e18",
+ }
+ for n := 0; n < b.N; n++ {
+ queryParamValuesToMap(v)
+ }
+}