Skip to content

Commit

Permalink
Chore: Fix linter errors
Browse files Browse the repository at this point in the history
  • Loading branch information
joanlopez committed Apr 19, 2024
1 parent d546366 commit 3e5ddc9
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 64 deletions.
2 changes: 1 addition & 1 deletion js/modules/k6/experimental/streams/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func errToObj(rt *goja.Runtime, err any) goja.Value {
return rt.ToValue(err)
}

if e, ok := err.(*goja.Exception); ok { //nolint:errorlint // we don't really want to unwrap here
if e, ok := err.(*goja.Exception); ok {
return e.Value().ToObject(rt)
}

Expand Down
21 changes: 16 additions & 5 deletions js/modules/k6/experimental/streams/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (mi *ModuleInstance) NewReadableStream(call goja.ConstructorCall) *goja.Obj
var err error

// 1. If underlyingSource is missing, set it to null.
var underlyingSource *goja.Object = nil
var underlyingSource *goja.Object

var (
strategy *goja.Object
Expand Down Expand Up @@ -99,8 +99,13 @@ func (mi *ModuleInstance) NewReadableStream(call goja.ConstructorCall) *goja.Obj
// 5.3. Let highWaterMark be ? ExtractHighWaterMark(strategy, 1).
highWaterMark := extractHighWaterMark(rt, strategy, 1)

// 5.4. Perform ? SetUpReadableStreamDefaultControllerFromUnderlyingSource(this, underlyingSource, underlyingSourceDict, highWaterMark, sizeAlgorithm).
stream.setupReadableStreamDefaultControllerFromUnderlyingSource(underlyingSource, underlyingSourceDict, highWaterMark, sizeAlgorithm)
// 5.4. Perform ? SetUpReadableStreamDefaultControllerFromUnderlyingSource(...).
stream.setupReadableStreamDefaultControllerFromUnderlyingSource(
underlyingSource,
underlyingSourceDict,
highWaterMark,
sizeAlgorithm,
)
}

streamObj := rt.ToValue(stream).ToObject(rt)
Expand Down Expand Up @@ -171,7 +176,11 @@ func (mi *ModuleInstance) NewCountQueuingStrategy(call goja.ConstructorCall) *go
//
// It allows to create a CountQueuingStrategy with or without the 'size' property,
// depending on how the containing ReadableStream is initialized.
func (mi *ModuleInstance) newCountQueuingStrategy(rt *goja.Runtime, call goja.ConstructorCall, size goja.Value) *goja.Object {
func (mi *ModuleInstance) newCountQueuingStrategy(
rt *goja.Runtime,
call goja.ConstructorCall,
size goja.Value,
) *goja.Object {
obj := rt.NewObject()

if len(call.Arguments) != 1 {
Expand Down Expand Up @@ -216,7 +225,9 @@ func extractHighWaterMark(rt *goja.Runtime, strategy *goja.Object, defaultHWM fl
highWaterMark := strategy.Get("highWaterMark")

// 3. If highWaterMark is NaN or highWaterMark < 0, throw a RangeError exception.
if goja.IsNaN(strategy.Get("highWaterMark")) || !isNumber(strategy.Get("highWaterMark")) || !isNonNegativeNumber(strategy.Get("highWaterMark")) {
if goja.IsNaN(strategy.Get("highWaterMark")) ||
!isNumber(strategy.Get("highWaterMark")) ||
!isNonNegativeNumber(strategy.Get("highWaterMark")) {
throw(rt, newError(RangeError, "highWaterMark must be a non-negative number"))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,11 +256,13 @@ func (controller *ReadableStreamDefaultController) enqueue(chunk goja.Value) err
// 2. Let stream be controller.[[stream]].
stream := controller.stream

// 3. If ! IsReadableStreamLocked(stream) is true and ! ReadableStreamGetNumReadRequests(stream) > 0, perform ! ReadableStreamFulfillReadRequest(stream, chunk, false).
// 3. If ! IsReadableStreamLocked(stream) is true and ! ReadableStreamGetNumReadRequests(stream) > 0,
// perform ! ReadableStreamFulfillReadRequest(stream, chunk, false).
if stream.isLocked() && stream.getNumReadRequests() > 0 {
stream.fulfillReadRequest(chunk, false)
} else { // 4. Otherwise,
// 4.1. Let result be the result of performing controller.[[strategySizeAlgorithm]], passing in chunk, and interpreting the result as a completion record.
// 4.1. Let result be the result of performing controller.[[strategySizeAlgorithm]],
// passing in chunk, and interpreting the result as a completion record.
size, err := controller.strategySizeAlgorithm(goja.Undefined(), chunk)
// 4.2 If result is an abrupt completion,
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ func (reader *ReadableStreamDefaultReader) ReleaseLock() {

// release implements the [ReadableStreamDefaultReaderRelease] algorithm.
//
// [ReadableStreamDefaultReaderRelease]: https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaultreaderrelease
// [ReadableStreamDefaultReaderRelease]:
// https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaultreaderrelease
func (reader *ReadableStreamDefaultReader) release() {
// 1. Perform ! ReadableStreamReaderGenericRelease(reader).
reader.BaseReadableStreamReader.release()
Expand Down
5 changes: 2 additions & 3 deletions js/modules/k6/experimental/streams/readable_stream_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,7 @@ func (reader *BaseReadableStreamReader) release() {
}

var streamReader *BaseReadableStreamReader
switch v := stream.reader.(type) {
case *ReadableStreamDefaultReader:
if v, ok := stream.reader.(*ReadableStreamDefaultReader); ok {
streamReader = &v.BaseReadableStreamReader
}

Expand All @@ -134,7 +133,7 @@ func (reader *BaseReadableStreamReader) release() {

// 4. If stream.[[state]] is "readable", reject reader.[[closedPromise]] with a TypeError exception.
if stream.state == ReadableStreamStateReadable {
//reader.closedPromiseRejectFunc(newError(TypeError, "stream is readable"))
reader.closedPromiseRejectFunc(newError(TypeError, "stream is readable"))
} else { // 5. Otherwise, set reader.[[closedPromise]] to a promise rejected with a TypeError exception.
reader.closedPromise = newRejectedPromise(stream.vu, newError(TypeError, "stream is not readable"))
}
Expand Down
97 changes: 46 additions & 51 deletions js/modules/k6/experimental/streams/readable_streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ type ReadableStream struct {
// with the ability to control the state and queue of this stream.
controller ReadableStreamController

// detached is a boolean flag set to true when the stream is transferred
detached bool

// disturbed is true when the stream has been read from or canceled
disturbed bool

Expand Down Expand Up @@ -167,20 +164,7 @@ func (stream *ReadableStream) setupReadableStreamDefaultControllerFromUnderlying
// which returns the result of invoking underlyingSourceDict["start"] with argument
// list « controller » and callback this value underlyingSource.
if underlyingSourceDict.startSet {
call, ok := goja.AssertFunction(underlyingSource.Get("start"))
if !ok {
common.Throw(stream.runtime, errors.New("underlyingSource.[[start]] must be a function"))
}

startAlgorithm = func(obj *goja.Object) (v goja.Value) {
var err error
v, err = call(underlyingSource, obj)
if err != nil {
panic(err)
}

return v
}
startAlgorithm = stream.startAlgorithm(underlyingSource)
}

// 6. If underlyingSourceDict["pull"] exists, then set pullAlgorithm to an algorithm which
Expand All @@ -205,37 +189,58 @@ func (stream *ReadableStream) setupReadableStreamDefaultControllerFromUnderlying
// reason and returns the result of invoking underlyingSourceDict["cancel"] with argument list « reason » and
// callback this value underlyingSource.
if underlyingSourceDict.cancelSet {
call, ok := goja.AssertFunction(underlyingSource.Get("cancel"))
if !ok {
common.Throw(stream.runtime, errors.New("underlyingSource.[[cancel]] must be a function"))
cancelAlgorithm = stream.cancelAlgorithm(underlyingSource)
}

// 8. Perform ? SetUpReadableStreamDefaultController(...)
stream.setupDefaultController(controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm)
}

func (stream *ReadableStream) startAlgorithm(underlyingSource *goja.Object) UnderlyingSourceStartCallback {
call, ok := goja.AssertFunction(underlyingSource.Get("start"))
if !ok {
common.Throw(stream.runtime, errors.New("underlyingSource.[[start]] must be a function"))
}

return func(obj *goja.Object) (v goja.Value) {
var err error
v, err = call(underlyingSource, obj)
if err != nil {
panic(err)
}

cancelAlgorithm = func(reason any) goja.Value {
var p *goja.Promise
return v
}
}

if e := stream.runtime.Try(func() {
res, err := call(underlyingSource, stream.runtime.ToValue(reason))
if err != nil {
panic(err)
}

if cp, ok := res.Export().(*goja.Promise); ok {
p = cp
}
}); e != nil {
p = newRejectedPromise(stream.vu, e.Value())
func (stream *ReadableStream) cancelAlgorithm(underlyingSource *goja.Object) UnderlyingSourceCancelCallback {
call, ok := goja.AssertFunction(underlyingSource.Get("cancel"))
if !ok {
common.Throw(stream.runtime, errors.New("underlyingSource.[[cancel]] must be a function"))
}

return func(reason any) goja.Value {
var p *goja.Promise

if e := stream.runtime.Try(func() {
res, err := call(underlyingSource, stream.runtime.ToValue(reason))
if err != nil {
panic(err)
}

if p == nil {
p = newResolvedPromise(stream.vu, goja.Undefined())
if cp, ok := res.Export().(*goja.Promise); ok {
p = cp
}
}); e != nil {
p = newRejectedPromise(stream.vu, e.Value())
}

return stream.vu.Runtime().ToValue(p)
if p == nil {
p = newResolvedPromise(stream.vu, goja.Undefined())
}
}

// 8. Perform ? SetUpReadableStreamDefaultController(...)
stream.setupDefaultController(controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm)
return stream.vu.Runtime().ToValue(p)
}
}

// setupDefaultController implements the specification's [SetUpReadableStreamDefaultController] abstract operation.
Expand Down Expand Up @@ -264,14 +269,10 @@ func (stream *ReadableStream) setupDefaultController(

// 4. Set controller.[[started]], controller.[[closeRequested]], controller.[[pullAgain]], and
// controller.[[pulling]] to false.
controller.started = false
controller.closeRequested = false
controller.pullAgain = false
controller.pulling = false
controller.started, controller.closeRequested, controller.pullAgain, controller.pulling = false, false, false, false

// 5. Set controller.[[strategySizeAlgorithm]] to sizeAlgorithm and controller.[[strategyHWM]] to highWaterMark.
controller.strategySizeAlgorithm = sizeAlgorithm
controller.strategyHWM = highWaterMark
controller.strategySizeAlgorithm, controller.strategyHWM = sizeAlgorithm, highWaterMark

// 6. Set controller.[[pullAlgorithm]] to pullAlgorithm.
controller.pullAlgorithm = pullAlgorithm
Expand Down Expand Up @@ -301,27 +302,22 @@ func (stream *ReadableStream) setupDefaultController(
} else {
startPromise = newResolvedPromise(controller.stream.vu, startResult)
}

_, err = promiseThen(stream.vu.Runtime(), startPromise,
// 11. Upon fulfillment of startPromise,
func(goja.Value) {
// 11.1. Set controller.[[started]] to true.
controller.started = true

// 11.2. Assert: controller.[[pulling]] is false.
if controller.pulling {
common.Throw(stream.vu.Runtime(), newError(AssertionError, "controller `pulling` state is not false"))
}

// 11.3. Assert: controller.[[pullAgain]] is false.
if controller.pullAgain {
common.Throw(stream.vu.Runtime(), newError(AssertionError, "controller `pullAgain` state is not false"))
}

// 11.4. Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller).
controller.callPullIfNeeded()
},

// 12. Upon rejection of startPromise with reason r,
func(err goja.Value) {
controller.error(err)
Expand Down Expand Up @@ -545,7 +541,6 @@ func (stream *ReadableStream) fulfillReadRequest(chunk any, done bool) {
readRequest.closeSteps()
return nil
})

} else {
// 7. Otherwise, perform readRequest’s chunk steps, given chunk.
callback(func() error {
Expand Down
3 changes: 3 additions & 0 deletions js/modules/k6/experimental/streams/readable_streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
)

func TestReadableStream(t *testing.T) {
t.Parallel()

suites := []string{
"bad-strategies.js",
"bad-underlying-sources.js",
Expand All @@ -26,6 +28,7 @@ func TestReadableStream(t *testing.T) {
}

for _, s := range suites {
s := s
t.Run(s, func(t *testing.T) {
t.Parallel()
ts := newConfiguredRuntime(t)
Expand Down
4 changes: 3 additions & 1 deletion js/modules/k6/experimental/streams/underlying_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ func NewUnderlyingSourceFromObject(rt *goja.Runtime, obj *goja.Object) (Underlyi

// We only accept a valid underlyingSource.[[type]]
underlyingSourceType := obj.Get("type")
if underlyingSourceType != nil && !goja.IsUndefined(obj.Get("type")) && obj.Get("type").String() != ReadableStreamTypeBytes {
if underlyingSourceType != nil &&
!goja.IsUndefined(obj.Get("type")) &&
obj.Get("type").String() != ReadableStreamTypeBytes {
return underlyingSource, newError(TypeError, "invalid underlying source type")
}

Expand Down

0 comments on commit 3e5ddc9

Please sign in to comment.