Skip to content

Commit

Permalink
Address Pull Request review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
oleiade committed Nov 6, 2023
1 parent bcf150c commit 922bbfa
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 57 deletions.
6 changes: 3 additions & 3 deletions js/modules/k6/experimental/fs/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,16 +125,16 @@ type SeekMode = int

const (
// SeekModeStart sets the offset relative to the start of the file.
SeekModeStart SeekMode = iota + 1
SeekModeStart SeekMode = 0

// SeekModeCurrent seeks relative to the current offset.
SeekModeCurrent
SeekModeCurrent = 1

// SeekModeEnd seeks relative to the end of the file.
//
// When using this mode the seek operation will move backwards from
// the end of the file.
SeekModeEnd
SeekModeEnd = 2
)

func (f *file) size() int64 {
Expand Down
92 changes: 47 additions & 45 deletions js/modules/k6/experimental/fs/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,57 +186,51 @@ func (f *File) Stat() *goja.Promise {
//
// It is possible for a read to successfully return with 0 bytes.
// This does not indicate EOF.
//
//nolint:funlen
func (f *File) Read(into goja.Value) *goja.Promise {
// This method performs an asynchronous read operation and modifies the provided Uint8Array in place.
// To ensure thread safety and avoid concurrency issues, we take special precautions when creating the promise:
//
// 1. Instead of using the standard [promises.New] method, we manually create a promise.
// 2. We register a callback to be executed by the VU's runtime. This ensures that the modification
// of the JS runtime's `buffer` occurs on the main thread during the promise's resolution.
promise, resolveFunc, rejectFunc := f.vu.Runtime().NewPromise()
promise, resolve, reject := f.vu.Runtime().NewPromise()

if common.IsNullish(into) {
rejectFunc(newFsError(TypeError, "read() failed; reason: into cannot be null or undefined"))
reject(newFsError(TypeError, "read() failed; reason: into cannot be null or undefined"))
return promise
}

// We expect the into argument to be a `Uint8Array` instance

// intoObj := into.ToObject(f.vu.Runtime())
// uint8ArrayConstructor := f.vu.Runtime().Get("Uint8Array")
// if isUint8Array := intoObj.Get("constructor").SameAs(uint8ArrayConstructor); !isUint8Array {
intoObj := into.ToObject(f.vu.Runtime())
uint8ArrayConstructor := f.vu.Runtime().Get("Uint8Array")
if isUint8Array := intoObj.Get("constructor").SameAs(uint8ArrayConstructor); !isUint8Array {
rejectFunc(newFsError(TypeError, "read() failed; reason: into argument must be a Uint8Array"))
if !isUint8Array(f.vu.Runtime(), intoObj) {
reject(newFsError(TypeError, "read() failed; reason: into argument must be a Uint8Array"))
return promise
}

// Obtain the underlying ArrayBuffer from the Uint8Array
ab, ok := intoObj.Get("buffer").Export().(goja.ArrayBuffer)
if !ok {
rejectFunc(newFsError(TypeError, "read() failed; reason: into argument must be a Uint8Array"))
reject(newFsError(TypeError, "read() failed; reason: into argument must be a Uint8Array"))
return promise
}

// Copy the ArrayBuffer into a byte slice, so that we can pass it to the
// [file.Read] method without risking to modify the original ArrayBuffer, and
// running into concurrency issues (data race).
// To avoid concurrency linked to modifying the runtime's `into` buffer from multiple
// goroutines we make sure to work on a separate copy, and will copy the bytes back
// into the runtime's `into` buffer once the promise is resolved.
intoBytes := ab.Bytes()
buffer := make([]byte, len(intoBytes))

// We register a callback to be executed by the VU's runtime.
// This ensures that the modification of the JS runtime's `into` buffer
// occurs on the main thread, during the promise's resolution.
callback := f.vu.RegisterCallback()
go func() {
n, err := f.file.Read(buffer)
n, readErr := f.file.Read(buffer)
callback(func() error {
if err == nil {
// Although the read operation happens as part of the goroutine, we
// still need to make sure that:
// 1. Any side effects, like modifying the `buffer`, are deferred and
// executed on the main thread via the registered callback.
// 2. This approach ensures that while the file read operation can proceed
// asynchronously, any side effects that might interfere with the JS runtime
// are executed in a controlled and sequential manner on the main thread.
_ = copy(intoBytes, buffer)
resolveFunc(n)
_ = copy(intoBytes[0:n], buffer)

// Read was successful, resolve early with the number of
// bytes read.
if readErr == nil {
resolve(n)
return nil
}

Expand All @@ -246,23 +240,18 @@ func (f *File) Read(into goja.Value) *goja.Promise {
// However, following deno's behavior, we express
// EOF to users by returning null, when and only when there aren't any
// more bytes to read.
//
// Thus, although the [file.Read] method will return an EOFError, and
// an n > 0, we make sure to take the EOFError returned into consideration
// only when n == 0.
var fsErr *fsError
isFsErr := errors.As(err, &fsErr)
isFSErr := errors.As(readErr, &fsErr)

if !isFsErr {
rejectFunc(err)
if !isFSErr {
reject(readErr)
return nil
}

_ = copy(intoBytes, buffer)
if fsErr.kind == EOFError && n == 0 {
resolveFunc(nil)
resolve(goja.Null())
} else {
resolveFunc(n)
resolve(n)
}

return nil
Expand All @@ -278,7 +267,7 @@ func (f *File) Read(into goja.Value) *goja.Promise {
// is expressed in bytes from the selected start, current, or end position depending
// the provided `whence`.
func (f *File) Seek(offset goja.Value, whence goja.Value) *goja.Promise {
promise, resolve, reject := promises.New(f.vu)
promise, resolve, reject := f.vu.Runtime().NewPromise()

if common.IsNullish(offset) {
reject(newFsError(TypeError, "seek() failed; reason: the offset argument cannot be null or undefined"))
Expand All @@ -297,7 +286,7 @@ func (f *File) Seek(offset goja.Value, whence goja.Value) *goja.Promise {
}

var intWhence int64
if err := f.vu.Runtime().ExportTo(whence, &intWhence); err != nil {
if err := f.vu.Runtime().ExportTo(whence, intWhence); err != nil {
reject(newFsError(TypeError, "seek() failed; reason: the whence argument cannot be interpreted as integer"))
return promise
}
Expand All @@ -311,15 +300,28 @@ func (f *File) Seek(offset goja.Value, whence goja.Value) *goja.Promise {
return promise
}

callback := f.vu.RegisterCallback()
go func() {
newOffset, err := f.file.Seek(intOffset, seekMode)
if err != nil {
reject(err)
return
}
callback(func() error {
if err != nil {
reject(err)
return err
}

resolve(newOffset)
resolve(newOffset)
return nil
})
}()

return promise
}

func isUint8Array(rt *goja.Runtime, o *goja.Object) bool {
uint8ArrayConstructor := rt.Get("Uint8Array")
if isUint8Array := o.Get("constructor").SameAs(uint8ArrayConstructor); !isUint8Array {
return false
}

return true
}
41 changes: 32 additions & 9 deletions js/modules/k6/experimental/fs/module_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func TestFile(t *testing.T) {

testFilePath := fsext.FilePathSeparator + testFileName
fs := newTestFs(t, func(fs afero.Fs) error {
return afero.WriteFile(fs, testFilePath, []byte("hello"), 0o644)
return afero.WriteFile(fs, testFilePath, []byte("01234"), 0o644)
})
runtime.VU.InitEnvField.FileSystems["file"] = fs

Expand All @@ -253,19 +253,37 @@ func TestFile(t *testing.T) {
throw 'expected read to return 3, got ' + bytesRead + ' instead';
}
// We expect the buffer to be filled with the three first
// bytes of the file, namely '012'.
if (buffer[0] !== 48 || buffer[1] !== 49 || buffer[2] !== 50) {
throw 'expected buffer to be [48, 49, 50], got ' + buffer + ' instead';
}
fileContent.set(buffer, 0);
bytesRead = await file.read(buffer)
if (bytesRead !== 2) {
throw 'expected read to return 2, got ' + bytesRead + ' instead';
}
// We expect the buffer to hold the two last bytes of the
// file, namely '34', and as we read only two bytes, its last
// one is expected to be untouched from the previous read.
if (buffer[0] !== 51 || buffer[1] !== 52 || buffer[2] !== 50) {
throw 'expected buffer to be [51, 52, 50], got ' + buffer + ' instead';
}
fileContent.set(buffer.subarray(0, bytesRead), 3);
bytesRead = await file.read(buffer)
if (bytesRead !== null) {
throw 'expected read to return null, got ' + bytesRead + ' instead';
}
// We expect the buffer to be untouched.
if (buffer[0] !== 51 || buffer[1] !== 52 || buffer[2] !== 50) {
throw 'expected buffer to be [51, 52, 50], got ' + buffer + ' instead';
}
`, testFilePath)))

assert.NoError(t, err)
Expand All @@ -279,25 +297,30 @@ func TestFile(t *testing.T) {

testFilePath := fsext.FilePathSeparator + testFileName
fs := newTestFs(t, func(fs afero.Fs) error {
return afero.WriteFile(fs, testFilePath, []byte("hello"), 0o644)
return afero.WriteFile(fs, testFilePath, []byte("012"), 0o644)
})
runtime.VU.InitEnvField.FileSystems["file"] = fs

_, err = runtime.RunOnEventLoop(wrapInAsyncLambda(fmt.Sprintf(`
const file = await fs.open(%q);
let buffer = new Uint8Array(5);
let buffer = new Uint8Array(3);
// Reading the whole file should return 5.
// Reading the whole file should return 3.
let bytesRead = await file.read(buffer);
if (bytesRead !== 5) {
throw 'expected read to return 5, got ' + bytesRead + ' instead';
if (bytesRead !== 3) {
throw 'expected read to return 3, got ' + bytesRead + ' instead';
}
// Reading from the end of the file should return null.
// Reading from the end of the file should return null and
// leave the buffer untouched.
bytesRead = await file.read(buffer);
if (bytesRead !== null) {
throw 'expected read to return null got ' + bytesRead + ' instead';
}
if (buffer[0] !== 48 || buffer[1] !== 49 || buffer[2] !== 50) {
throw 'expected buffer to be [48, 49, 50], got ' + buffer + ' instead';
}
`, testFilePath)))

assert.NoError(t, err)
Expand Down Expand Up @@ -414,7 +437,7 @@ func TestFile(t *testing.T) {

testFilePath := fsext.FilePathSeparator + testFileName
fs := newTestFs(t, func(fs afero.Fs) error {
return afero.WriteFile(fs, testFilePath, []byte("hello"), 0o644)
return afero.WriteFile(fs, testFilePath, []byte("012"), 0o644)
})
runtime.VU.InitEnvField.FileSystems["file"] = fs

Expand All @@ -439,7 +462,7 @@ func TestFile(t *testing.T) {

testFilePath := fsext.FilePathSeparator + testFileName
fs := newTestFs(t, func(fs afero.Fs) error {
return afero.WriteFile(fs, testFilePath, []byte("hello"), 0o644)
return afero.WriteFile(fs, testFilePath, []byte("012"), 0o644)
})
runtime.VU.InitEnvField.FileSystems["file"] = fs

Expand Down

0 comments on commit 922bbfa

Please sign in to comment.