Skip to content

Commit

Permalink
Change to using sobek instead of goja
Browse files Browse the repository at this point in the history
We are moving to a fork of goja under grafana org called sobek.

More info in:
- grafana/k6#3772
- grafana/k6#3773
  • Loading branch information
mstoykov authored and skryukov committed Jun 24, 2024
1 parent 4efc679 commit 3c316ae
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 123 deletions.
27 changes: 15 additions & 12 deletions cable.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@ import (
"go.k6.io/k6/lib"
"go.k6.io/k6/metrics"

"github.com/dop251/goja"
"github.com/gorilla/websocket"
"github.com/grafana/sobek"
"github.com/sirupsen/logrus"
)

// errCableInInitContext is returned when cable used in the init context
var errCableInInitContext = common.NewInitContextError("using cable in the init context is not supported")

// Connect connects to the websocket, creates and starts client, and returns it to the js.
func (c *Cable) Connect(cableUrl string, opts goja.Value) (*Client, error) {
func (c *Cable) Connect(cableUrl string, opts sobek.Value) (*Client, error) {
state := c.vu.State()
if state == nil {
return nil, errCableInInitContext
Expand Down Expand Up @@ -99,20 +99,24 @@ func (c *Cable) Connect(cableUrl string, opts goja.Value) (*Client, error) {

metrics.PushIfNotDone(c.vu.Context(), state.Samples, metrics.ConnectedSamples{
Samples: []metrics.Sample{
{TimeSeries: metrics.TimeSeries{
Metric: state.BuiltinMetrics.WSSessions,
Tags: tagsAndMeta.Tags,
},
{
TimeSeries: metrics.TimeSeries{
Metric: state.BuiltinMetrics.WSSessions,
Tags: tagsAndMeta.Tags,
},
Time: connectionStart,
Metadata: tagsAndMeta.Metadata,
Value: 1},
{TimeSeries: metrics.TimeSeries{
Metric: state.BuiltinMetrics.WSConnecting,
Tags: tagsAndMeta.Tags,
Value: 1,
},
{
TimeSeries: metrics.TimeSeries{
Metric: state.BuiltinMetrics.WSConnecting,
Tags: tagsAndMeta.Tags,
},
Time: connectionStart,
Metadata: tagsAndMeta.Metadata,
Value: metrics.D(connectionEnd.Sub(connectionStart))},
Value: metrics.D(connectionEnd.Sub(connectionStart)),
},
},
Tags: tagsAndMeta.Tags,
Time: connectionStart,
Expand All @@ -138,7 +142,6 @@ func (c *Cable) Connect(cableUrl string, opts goja.Value) (*Client, error) {
}

err = client.start()

if err != nil {
logger.Errorf("failed to initialize Action Cable connection: %v", err)
return nil, nil
Expand Down
34 changes: 15 additions & 19 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"sync"
"time"

"github.com/dop251/goja"
"github.com/grafana/sobek"
"github.com/sirupsen/logrus"
"go.k6.io/k6/js/modules"
)
Expand All @@ -22,7 +22,7 @@ type Channel struct {
ackMu sync.Mutex
readCh chan *cableMsg

asyncHandlers []goja.Callable
asyncHandlers []sobek.Callable

ignoreReads bool

Expand All @@ -42,7 +42,7 @@ func NewChannel(c *Client, identifier string) *Channel {
}

// Perform sends passed action with additional data to the channel
func (ch *Channel) Perform(action string, attr goja.Value) error {
func (ch *Channel) Perform(action string, attr sobek.Value) error {
rt := ch.client.vu.Runtime()
obj := attr.ToObject(rt).Export().(map[string]interface{})
obj["action"] = action
Expand All @@ -64,7 +64,7 @@ func (ch *Channel) IgnoreReads() {
}

// Receive checks channels messages query for message, sugar for ReceiveN(1, attrs)
func (ch *Channel) Receive(attr goja.Value) interface{} {
func (ch *Channel) Receive(attr sobek.Value) interface{} {
results := ch.ReceiveN(1, attr)
if len(results) == 0 {
return nil
Expand All @@ -74,12 +74,11 @@ func (ch *Channel) Receive(attr goja.Value) interface{} {
}

// ReceiveN checks channels messages query for provided number of messages satisfying provided condition.
func (ch *Channel) ReceiveN(n int, cond goja.Value) []interface{} {
func (ch *Channel) ReceiveN(n int, cond sobek.Value) []interface{} {
var results []interface{}
timeout := ch.client.recTimeout
timer := time.NewTimer(timeout)
matcher, err := ch.buildMatcher(cond)

if err != nil {
panic(err)
}
Expand All @@ -105,12 +104,11 @@ func (ch *Channel) ReceiveN(n int, cond goja.Value) []interface{} {
}

// ReceiveAll fethes all messages for a given number of seconds.
func (ch *Channel) ReceiveAll(sec int, cond goja.Value) []interface{} {
func (ch *Channel) ReceiveAll(sec int, cond sobek.Value) []interface{} {
var results []interface{}
timeout := time.Duration(sec) * time.Second
timer := time.NewTimer(timeout)
matcher, err := ch.buildMatcher(cond)

if err != nil {
panic(err)
}
Expand All @@ -129,8 +127,8 @@ func (ch *Channel) ReceiveAll(sec int, cond goja.Value) []interface{} {
}

// Register callback to receive messages asynchronously
func (ch *Channel) OnMessage(fn goja.Value) {
f, isFunc := goja.AssertFunction(fn)
func (ch *Channel) OnMessage(fn sobek.Value) {
f, isFunc := sobek.AssertFunction(fn)

if !isFunc {
panic("argument must be a function")
Expand Down Expand Up @@ -177,8 +175,7 @@ func (ch *Channel) handleAsync(msg *cableMsg) {
}

for _, h := range ch.asyncHandlers {
_, err := h(goja.Undefined(), ch.client.vu.Runtime().ToValue(msg.Message))

_, err := h(sobek.Undefined(), ch.client.vu.Runtime().ToValue(msg.Message))
if err != nil {
if !strings.Contains(err.Error(), "context canceled") {
ch.logger.Errorf("can't call provided function: %s", err)
Expand All @@ -193,12 +190,11 @@ type Matcher interface {

type FuncMatcher struct {
vu modules.VU
f goja.Callable
f sobek.Callable
}

func (m *FuncMatcher) Match(msg interface{}) bool {
result, err := m.f(goja.Undefined(), m.vu.Runtime().ToValue(msg))

result, err := m.f(sobek.Undefined(), m.vu.Runtime().ToValue(msg))
if err != nil {
m.vu.State().Logger.Errorf("can't call provided function: %v", err)
}
Expand Down Expand Up @@ -250,16 +246,16 @@ func (PassthruMatcher) Match(_ interface{}) bool {
// - when condition is a func, result of func(msg) is used as a result of match
// - when condition is a string, match is successful when message matches provided string
// - when condition is an object, match is successful when message includes all object attributes
func (ch *Channel) buildMatcher(cond goja.Value) (Matcher, error) {
if cond == nil || goja.IsUndefined(cond) || goja.IsNull(cond) {
func (ch *Channel) buildMatcher(cond sobek.Value) (Matcher, error) {
if cond == nil || sobek.IsUndefined(cond) || sobek.IsNull(cond) {
return &PassthruMatcher{}, nil
}

if _, ok := cond.(*goja.Symbol); ok {
if _, ok := cond.(*sobek.Symbol); ok {
return &StringMatcher{cond.String()}, nil
}

userFunc, isFunc := goja.AssertFunction(cond)
userFunc, isFunc := sobek.AssertFunction(cond)

if isFunc {
return &FuncMatcher{ch.client.vu, userFunc}, nil
Expand Down
20 changes: 8 additions & 12 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
"go.k6.io/k6/js/modules"
"go.k6.io/k6/metrics"

"github.com/dop251/goja"
"github.com/gorilla/websocket"
"github.com/grafana/sobek"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -46,9 +46,8 @@ type Client struct {
}

// Subscribe creates and returns Channel
func (c *Client) Subscribe(channelName string, paramsIn goja.Value) (*Channel, error) {
func (c *Client) Subscribe(channelName string, paramsIn sobek.Value) (*Channel, error) {
promise, err := c.SubscribeAsync(channelName, paramsIn)

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -87,20 +86,18 @@ func (sp *SubscribePromise) Await(ms int) (*Channel, error) {
}

// Subscribe creates and returns Channel
func (c *Client) SubscribeAsync(channelName string, paramsIn goja.Value) (*SubscribePromise, error) {
func (c *Client) SubscribeAsync(channelName string, paramsIn sobek.Value) (*SubscribePromise, error) {
c.mu.Lock()
defer c.mu.Unlock()

params, err := c.parseParams(paramsIn)

if err != nil {
return nil, err
}

params["channel"] = channelName

identifierJSON, err := json.Marshal(params)

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -136,8 +133,8 @@ func (c *Client) Disconnect() {
}

// Repeat function in a loop until it returns false
func (c *Client) Loop(fn goja.Value) {
f, isFunc := goja.AssertFunction(fn)
func (c *Client) Loop(fn sobek.Value) {
f, isFunc := sobek.AssertFunction(fn)

if !isFunc {
panic("argument must be a function")
Expand All @@ -150,7 +147,7 @@ func (c *Client) Loop(fn goja.Value) {
return
default:
c.mu.Lock()
ret, err := f(goja.Undefined())
ret, err := f(sobek.Undefined())
c.mu.Unlock()

if err != nil {
Expand Down Expand Up @@ -310,13 +307,12 @@ func (c *Client) receiveIgnoringPing() (*cableMsg, error) {

return &msg, nil
}

}

func (c *Client) parseParams(in goja.Value) (map[string]interface{}, error) {
func (c *Client) parseParams(in sobek.Value) (map[string]interface{}, error) {
params := make(map[string]interface{})

if in == nil || goja.IsUndefined(in) || goja.IsNull(in) {
if in == nil || sobek.IsUndefined(in) || sobek.IsNull(in) {
return params, nil
}

Expand Down
1 change: 0 additions & 1 deletion codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ var ProtobufCodec = &Codec{
}

raw, err := ioutil.ReadAll(r)

if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions connect_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"net/http"
"time"

"github.com/dop251/goja"
"github.com/grafana/sobek"
)

type connectOptions struct {
Expand All @@ -25,10 +25,10 @@ const (
defaultReceiveTimeout = 1000
)

func parseOptions(rt *goja.Runtime, inOpts goja.Value) (*connectOptions, error) {
func parseOptions(rt *sobek.Runtime, inOpts sobek.Value) (*connectOptions, error) {
var outOpts connectOptions

if inOpts == nil || goja.IsUndefined(inOpts) || goja.IsNull(inOpts) {
if inOpts == nil || sobek.IsUndefined(inOpts) || sobek.IsNull(inOpts) {
return &outOpts, nil
}

Expand Down
45 changes: 31 additions & 14 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,35 +1,52 @@
module github.com/anycable/xk6-cable

go 1.19
go 1.20

require (
github.com/dop251/goja v0.0.0-20230621100801-7749907a8a20
github.com/golang/protobuf v1.5.3
github.com/gorilla/websocket v1.5.0
github.com/golang/protobuf v1.5.4
github.com/gorilla/websocket v1.5.1
github.com/grafana/sobek v0.0.0-20240607083612-4f0cd64f4e78
github.com/sirupsen/logrus v1.9.3
github.com/vmihailenco/msgpack/v5 v5.3.5
go.k6.io/k6 v0.46.0
go.k6.io/k6 v0.51.1-0.20240610082146-1f01a9bc2365
)

require (
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/dlclark/regexp2 v1.9.0 // indirect
github.com/fatih/color v1.15.0 // indirect
github.com/go-sourcemap/sourcemap v2.1.4-0.20211119122758-180fcef48034+incompatible // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/pprof v0.0.0-20230207041349-798e818bf904 // indirect
github.com/dop251/goja v0.0.0-20240516125602-ccbae20bcec2 // indirect
github.com/evanw/esbuild v0.21.2 // indirect
github.com/fatih/color v1.16.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-sourcemap/sourcemap v2.1.4+incompatible // indirect
github.com/google/pprof v0.0.0-20230728192033-2ba5b33183c6 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mstoykov/atlas v0.0.0-20220811071828-388f114305dd // indirect
github.com/onsi/ginkgo v1.16.5 // indirect
github.com/onsi/gomega v1.18.1 // indirect
github.com/serenize/snaker v0.0.0-20201027110005-a7ad2135616e // indirect
github.com/spf13/afero v1.9.3 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
golang.org/x/sys v0.9.0 // indirect
golang.org/x/text v0.10.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
go.opentelemetry.io/otel v1.24.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.24.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.24.0 // indirect
go.opentelemetry.io/otel/metric v1.24.0 // indirect
go.opentelemetry.io/otel/sdk v1.24.0 // indirect
go.opentelemetry.io/otel/trace v1.24.0 // indirect
go.opentelemetry.io/proto/otlp v1.1.0 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/time v0.5.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de // indirect
google.golang.org/grpc v1.63.2 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/guregu/null.v3 v3.5.0 // indirect
)
Loading

0 comments on commit 3c316ae

Please sign in to comment.