Skip to content

Commit

Permalink
feat: JQ Processor (#88)
Browse files Browse the repository at this point in the history
* feat: init jq

* fix: jq test

* test: jq

* docs: jq

* refactor: ctx
  • Loading branch information
jshlbrd authored Mar 23, 2023
1 parent e1fcee6 commit 0adf249
Show file tree
Hide file tree
Showing 6 changed files with 250 additions and 1 deletion.
11 changes: 11 additions & 0 deletions build/config/substation.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@
join: {
options: { separator: null },
},
jq: {
options: { query: null },
},
kv_store: {
options: { type: null, prefix: null, offset_ttl: null, kv_options: null },
},
Expand Down Expand Up @@ -461,6 +464,14 @@
type: 'join',
settings: std.mergePatch({ options: opt }, s),
},
jq(options=$.defaults.processor.jq.options,
settings=$.interfaces.processor.settings): {
local opt = std.mergePatch($.defaults.processor.jq.options, options),
local s = std.mergePatch($.interfaces.processor.settings, settings),

type: 'jq',
settings: std.mergePatch({ options: opt }, s),
},
kv_store(options=$.defaults.processor.kv_store.options,
settings=$.interfaces.processor.settings): {
local opt = std.mergePatch($.defaults.processor.kv_store.options, options),
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/hashicorp/go-retryablehttp v0.7.1
github.com/iancoleman/strcase v0.2.0
github.com/ip2location/ip2location-go/v9 v9.5.0
github.com/itchyny/gojq v0.12.11
github.com/jshlbrd/go-aggregate v0.1.1
github.com/oschwald/geoip2-golang v1.8.0
github.com/oschwald/maxminddb-golang v1.10.0
Expand All @@ -30,6 +31,7 @@ require (
require (
github.com/andybalholm/brotli v1.0.4 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/itchyny/timefmt-go v0.1.5 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/compress v1.15.13 // indirect
github.com/pkg/errors v0.9.1 // indirect
Expand Down
7 changes: 6 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ github.com/iancoleman/strcase v0.2.0 h1:05I4QRnGpI0m37iZQRuskXh+w77mr6Z41lwQzuHL
github.com/iancoleman/strcase v0.2.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho=
github.com/ip2location/ip2location-go/v9 v9.5.0 h1:7gqKncm4MhBrpJIK0PmV8o6Bf8YbbSAPjORzyjAv1iM=
github.com/ip2location/ip2location-go/v9 v9.5.0/go.mod h1:s5SV6YZL10TpfPpXw//7fEJC65G/yH7Oh+Tjq9JcQEQ=
github.com/itchyny/gojq v0.12.11 h1:YhLueoHhHiN4mkfM+3AyJV6EPcCxKZsOnYf+aVSwaQw=
github.com/itchyny/gojq v0.12.11/go.mod h1:o3FT8Gkbg/geT4pLI0tF3hvip5F3Y/uskjRz9OYa38g=
github.com/itchyny/timefmt-go v0.1.5 h1:G0INE2la8S6ru/ZI5JecgyzbbJNs5lG1RcBqa7Jm6GE=
github.com/itchyny/timefmt-go v0.1.5/go.mod h1:nEP7L+2YmAbT2kZ2HfSs1d8Xtw9LY8D2stDBckWakZ8=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
Expand Down Expand Up @@ -133,8 +137,9 @@ golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.5.0 h1:+bSpV5HIeWkuvgaMfI3UmKRThoTA5ODJTUd8T17NO+4=
golang.org/x/tools v0.5.0/go.mod h1:N+Kgy78s5I24c24dU8OfWNEotWjutIs8SnJvn5IDq+k=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
Expand Down
89 changes: 89 additions & 0 deletions process/jq.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package process

import (
"context"
gojson "encoding/json"
"fmt"

"github.com/brexhq/substation/config"
"github.com/brexhq/substation/internal/errors"
"github.com/itchyny/gojq"
)

// errJqNoOutputGenerated is returned when the jq query generates no output.
const errJqNoOutputGenerated = errors.Error("no output generated")

// jq processes data by applying jq queries.
//
// This processor supports the data handling pattern.
type procJQ struct {
process
Options procJQOptions `json:"options"`
}

type procJQOptions struct {
// Query is the jq query applied to data.
Query string `json:"query"`
}

// String returns the processor settings as an object.
func (p procJQ) String() string {
return toString(p)
}

// Closes resources opened by the processor.
func (p procJQ) Close(context.Context) error {
return nil
}

// Batch processes one or more capsules with the processor. Conditions are
// optionally applied to the data to enable processing.
func (p procJQ) Batch(ctx context.Context, capsules ...config.Capsule) ([]config.Capsule, error) {
return batchApply(ctx, capsules, p, p.Condition)
}

// Apply processes encapsulated data with the processor.
func (p procJQ) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error) {
query, err := gojq.Parse(p.Options.Query)
if err != nil {
return capsule, err
}

var i interface{}
if err := gojson.Unmarshal(capsule.Data(), &i); err != nil {
return capsule, fmt.Errorf("process: jq: %v", err)
}

var arr []interface{}
iter := query.RunWithContext(ctx, i)

for {
v, ok := iter.Next()
if !ok {
break
}
if err, ok := v.(error); ok {
return capsule, fmt.Errorf("process: jq: %v", err)
}

arr = append(arr, v)
}

var b []byte
switch len(arr) {
case 0:
err = errJqNoOutputGenerated
case 1:
b, err = gojson.Marshal(arr[0])
capsule.SetData(b)
default:
b, err = gojson.Marshal(arr)
capsule.SetData(b)
}

if err != nil {
return capsule, fmt.Errorf("process: jq: %v", err)
}

return capsule, nil
}
134 changes: 134 additions & 0 deletions process/jq_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package process

import (
"bytes"
"context"
"testing"

"github.com/brexhq/substation/config"
)

var jsonTests = []struct {
name string
proc procJQ
test []byte
expected []byte
err error
}{
{
"access",
procJQ{
process: process{},
Options: procJQOptions{
Query: `.a`,
},
},
[]byte(`{"a":"b"}`),
[]byte(`"b"`),
nil,
},
{
"access",
procJQ{
process: process{},
Options: procJQOptions{
Query: `.a, .c`,
},
},
[]byte(`{"a":"b","c":"d"}`),
[]byte(`["b","d"]`),
nil,
},
{
"access",
procJQ{
process: process{},
Options: procJQOptions{
Query: `.a`,
},
},
[]byte(`{"a":{"b":"c"}}`),
[]byte(`{"b":"c"}`),
nil,
},
{
"array",
procJQ{
process: process{},
Options: procJQOptions{
Query: `.a`,
},
},
[]byte(`{"a":["b","c","d"]}`),
[]byte(`["b","c","d"]`),
nil,
},
{
"slice",
procJQ{
process: process{},
Options: procJQOptions{
Query: `.a[-1:]`,
},
},
[]byte(`{"a":["b","c","d","e","f","g"]}`),
[]byte(`["g"]`),
nil,
},
{
"recursion",
procJQ{
process: process{},
Options: procJQOptions{
Query: `walk( if type == "object" then
with_entries( select(
(.value != "") and
(.value != {}) and
(.value != null)
) )
else
. end)`,
},
},
[]byte(`{"a":{"b":{"c":""}},"d":null,"e":"f"}`),
[]byte(`{"e":"f"}`),
nil,
},
}

func TestJq(t *testing.T) {
ctx := context.TODO()
capsule := config.NewCapsule()

for _, test := range jsonTests {
capsule.SetData(test.test)

result, err := test.proc.Apply(ctx, capsule)
if err != nil {
t.Error(err)
}

if !bytes.Equal(result.Data(), test.expected) {
t.Errorf("expected %s, got %s", test.expected, result.Data())
}
}
}

func benchmarkJq(b *testing.B, applier procJQ, test config.Capsule) {
ctx := context.TODO()
for i := 0; i < b.N; i++ {
_, _ = applier.Apply(ctx, test)
}
}

func BenchmarkJq(b *testing.B) {
capsule := config.NewCapsule()
for _, test := range jsonTests {
b.Run(test.name,
func(b *testing.B) {
capsule.SetData(test.test)
benchmarkJq(b, test.proc, capsule)
},
)
}
}
8 changes: 8 additions & 0 deletions process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ func NewApplier(cfg config.Config) (Applier, error) {
var p procJoin
_ = config.Decode(cfg.Settings, &p)
return p, nil
case "jq":
var p procJQ
_ = config.Decode(cfg.Settings, &p)
return p, nil
case "kv_store":
var p procKVStore
_ = config.Decode(cfg.Settings, &p)
Expand Down Expand Up @@ -319,6 +323,10 @@ func NewBatcher(cfg config.Config) (Batcher, error) { //nolint: cyclop, gocyclo
var p procJoin
_ = config.Decode(cfg.Settings, &p)
return p, nil
case "jq":
var p procJQ
_ = config.Decode(cfg.Settings, &p)
return p, nil
case "kv_store":
var p procKVStore
_ = config.Decode(cfg.Settings, &p)
Expand Down

0 comments on commit 0adf249

Please sign in to comment.