From 9152450341a9155b79024cb2c84d4189ab56aca2 Mon Sep 17 00:00:00 2001 From: AVVS Date: Mon, 15 May 2023 21:52:46 -0700 Subject: [PATCH 1/6] feat: handle buffer or utf8 --- index.js | 72 ++++++++++++++++++++++++++++++++-------------- test/retry.test.js | 18 ++++++------ test/write.test.js | 16 +++++------ 3 files changed, 68 insertions(+), 38 deletions(-) diff --git a/index.js b/index.js index 40a6a00..782ba77 100644 --- a/index.js +++ b/index.js @@ -7,6 +7,7 @@ const path = require('path') const sleep = require('atomic-sleep') const BUSY_WRITE_TIMEOUT = 100 +const kEmptyBuffer = Buffer.allocUnsafe(0) // 16 KB. Don't write more than docker buffer size. // https://github.com/moby/moby/blob/513ec73831269947d38a644c278ce3cac36783b2/daemon/logger/copier.go#L13 @@ -95,7 +96,7 @@ function SonicBoom (opts) { this._len = 0 this.fd = -1 this._writing = false - this._writingBuf = '' + this._writingBuf = kEmptyBuffer this._ending = false this._reopening = false this._asyncDrainScheduled = false @@ -106,6 +107,7 @@ function SonicBoom (opts) { this.maxLength = maxLength || 0 this.maxWrite = maxWrite || MAX_WRITE this.sync = sync || false + this.writable = true this._fsync = fsync || false this.append = append || false this.mode = mode @@ -141,7 +143,7 @@ function SonicBoom (opts) { } else { // Let's give the destination some time to process the chunk. setTimeout(() => { - fs.write(this.fd, this._writingBuf, 'utf8', this.release) + fs.write(this.fd, this._writingBuf, this.release) }, BUSY_WRITE_TIMEOUT) } } else { @@ -167,20 +169,20 @@ function SonicBoom (opts) { // TODO if we have a multi-byte character in the buffer, we need to // n might not be the same as this._writingBuf.length, so we might loose // characters here. The solution to this problem is to use a Buffer for _writingBuf. - this._writingBuf = this._writingBuf.slice(n) + this._writingBuf = this._writingBuf.subarray(n) if (this._writingBuf.length) { if (!this.sync) { - fs.write(this.fd, this._writingBuf, 'utf8', this.release) + fs.write(this.fd, this._writingBuf, this.release) return } try { do { - const n = fs.writeSync(this.fd, this._writingBuf, 'utf8') + const n = fs.writeSync(this.fd, this._writingBuf) this._len -= n - this._writingBuf = this._writingBuf.slice(n) - } while (this._writingBuf) + this._writingBuf = this._writingBuf.subarray(n) + } while (this._writingBuf.length) } catch (err) { this.release(err) return @@ -234,11 +236,35 @@ function emitDrain (sonic) { inherits(SonicBoom, EventEmitter) -SonicBoom.prototype.write = function (data) { +function bufLength (bufs) { + let idx = 0 + const l = bufs.length + let size = 0 + while (idx < l) { + size += bufs[idx].length + idx += 1 + } + return size +} + +function mergeBuf (bufs) { + if (bufs.length === 0) { + return null + } + + if (bufs.length === 1) { + return bufs[0] + } + + return Buffer.concat(bufs) +} + +SonicBoom.prototype.write = function (_data) { if (this.destroyed) { throw new Error('SonicBoom destroyed') } + const data = Buffer.isBuffer(_data) ? _data : Buffer.from(_data, 'utf8') const len = this._len + data.length const bufs = this._bufs @@ -249,11 +275,11 @@ SonicBoom.prototype.write = function (data) { if ( bufs.length === 0 || - bufs[bufs.length - 1].length + data.length > this.maxWrite + bufLength(bufs[bufs.length - 1]) + data.length > this.maxWrite ) { - bufs.push('' + data) + bufs.push([data]) } else { - bufs[bufs.length - 1] += data + bufs[bufs.length - 1].push(data) } this._len = len @@ -275,7 +301,7 @@ SonicBoom.prototype.flush = function () { } if (this._bufs.length === 0) { - this._bufs.push('') + this._bufs.push([]) } actualWrite(this) @@ -360,18 +386,22 @@ SonicBoom.prototype.flushSync = function () { } if (!this._writing && this._writingBuf.length > 0) { - this._bufs.unshift(this._writingBuf) - this._writingBuf = '' + this._bufs.unshift([this._writingBuf]) + this._writingBuf = kEmptyBuffer } - let buf = '' + let buf = kEmptyBuffer while (this._bufs.length || buf.length) { if (buf.length <= 0) { - buf = this._bufs[0] + buf = mergeBuf(this._bufs[0]) + if (buf === null) { + this._bufs.shift() + continue + } } try { - const n = fs.writeSync(this.fd, buf, 'utf8') - buf = buf.slice(n) + const n = fs.writeSync(this.fd, buf) + buf = buf.subarray(n) this._len = Math.max(this._len - n, 0) if (buf.length <= 0) { this._bufs.shift() @@ -397,17 +427,17 @@ SonicBoom.prototype.destroy = function () { function actualWrite (sonic) { const release = sonic.release sonic._writing = true - sonic._writingBuf = sonic._writingBuf || sonic._bufs.shift() || '' + sonic._writingBuf = (sonic._writingBuf.length && sonic._writingBuf) || mergeBuf(sonic._bufs.shift()) || kEmptyBuffer if (sonic.sync) { try { - const written = fs.writeSync(sonic.fd, sonic._writingBuf, 'utf8') + const written = fs.writeSync(sonic.fd, sonic._writingBuf) release(null, written) } catch (err) { release(err) } } else { - fs.write(sonic.fd, sonic._writingBuf, 'utf8', release) + fs.write(sonic.fd, sonic._writingBuf, release) } } diff --git a/test/retry.test.js b/test/retry.test.js index a9a86ed..2b35d89 100644 --- a/test/retry.test.js +++ b/test/retry.test.js @@ -16,7 +16,7 @@ function buildTests (test, sync) { t.plan(7) const fakeFs = Object.create(fs) - fakeFs.write = function (fd, buf, enc, cb) { + fakeFs.write = function (fd, buf, cb) { t.pass('fake fs.write called') fakeFs.write = fs.write const err = new Error('EAGAIN') @@ -56,7 +56,7 @@ test('emit error on async EAGAIN', (t) => { t.plan(11) const fakeFs = Object.create(fs) - fakeFs.write = function (fd, buf, enc, cb) { + fakeFs.write = function (fd, buf, cb) { t.pass('fake fs.write called') fakeFs.write = fs.write const err = new Error('EAGAIN') @@ -109,7 +109,7 @@ test('retry on EAGAIN (sync)', (t) => { t.plan(7) const fakeFs = Object.create(fs) - fakeFs.writeSync = function (fd, buf, enc, cb) { + fakeFs.writeSync = function (fd, buf, cb) { t.pass('fake fs.writeSync called') fakeFs.writeSync = fs.writeSync const err = new Error('EAGAIN') @@ -148,7 +148,7 @@ test('emit error on EAGAIN (sync)', (t) => { t.plan(11) const fakeFs = Object.create(fs) - fakeFs.writeSync = function (fd, buf, enc, cb) { + fakeFs.writeSync = function (fd, buf, cb) { t.pass('fake fs.writeSync called') fakeFs.writeSync = fs.writeSync const err = new Error('EAGAIN') @@ -228,7 +228,7 @@ test('retryEAGAIN receives remaining buffer on async if write fails', (t) => { t.ok(stream.write('done')) }) - fakeFs.write = function (fd, buf, enc, cb) { + fakeFs.write = function (fd, buf, cb) { t.pass('fake fs.write called') fakeFs.write = fs.write const err = new Error('EAGAIN') @@ -279,14 +279,14 @@ test('retryEAGAIN receives remaining buffer if exceeds maxWrite', (t) => { t.pass('ready emitted') }) - fakeFs.write = function (fd, buf, enc, cb) { + fakeFs.write = function (fd, buf, cb) { t.pass('fake fs.write called') const err = new Error('EAGAIN') err.code = 'EAGAIN' process.nextTick(cb, err) } - fakeFs.writeSync = function (fd, buf, enc, cb) { + fakeFs.writeSync = function (fd, buf, cb) { t.pass('fake fs.write called') const err = new Error('EAGAIN') err.code = 'EAGAIN' @@ -325,7 +325,7 @@ test('retry on EBUSY', (t) => { t.plan(7) const fakeFs = Object.create(fs) - fakeFs.write = function (fd, buf, enc, cb) { + fakeFs.write = function (fd, buf, cb) { t.pass('fake fs.write called') fakeFs.write = fs.write const err = new Error('EBUSY') @@ -364,7 +364,7 @@ test('emit error on async EBUSY', (t) => { t.plan(11) const fakeFs = Object.create(fs) - fakeFs.write = function (fd, buf, enc, cb) { + fakeFs.write = function (fd, buf, cb) { t.pass('fake fs.write called') fakeFs.write = fs.write const err = new Error('EBUSY') diff --git a/test/write.test.js b/test/write.test.js index f911561..f6d53bf 100644 --- a/test/write.test.js +++ b/test/write.test.js @@ -181,12 +181,12 @@ function buildTests (test, sync) { }) if (sync) { - fakeFs.writeSync = function (fd, buf, enc) { + fakeFs.writeSync = function (fd, buf) { t.pass('fake fs.writeSync called') throw new Error('recoverable error') } } else { - fakeFs.write = function (fd, buf, enc, cb) { + fakeFs.write = function (fd, buf, cb) { t.pass('fake fs.write called') setTimeout(() => cb(new Error('recoverable error')), 0) } @@ -253,11 +253,11 @@ test('write buffers that are not totally written', (t) => { t.plan(9) const fakeFs = Object.create(fs) - fakeFs.write = function (fd, buf, enc, cb) { + fakeFs.write = function (fd, buf, cb) { t.pass('fake fs.write called') - fakeFs.write = function (fd, buf, enc, cb) { + fakeFs.write = function (fd, buf, cb) { t.pass('calling real fs.write, ' + buf) - fs.write(fd, buf, enc, cb) + fs.write(fd, buf, cb) } process.nextTick(cb, null, 0) } @@ -336,7 +336,7 @@ test('write enormously large buffers async atomicly', (t) => { const buf = Buffer.alloc(1023).fill('x').toString() - fakeFs.write = function (fd, _buf, enc, cb) { + fakeFs.write = function (fd, _buf, cb) { if (_buf.length % buf.length !== 0) { t.fail('write called with wrong buffer size') } @@ -375,7 +375,7 @@ test('write should not drop new data if buffer is not full', (t) => { const buf = Buffer.alloc(100).fill('x').toString() - fakeFs.write = function (fd, _buf, enc, cb) { + fakeFs.write = function (fd, _buf, cb) { t.equal(_buf.length, buf.length + 2) setImmediate(cb, null, _buf.length) fakeFs.write = () => t.error('shouldnt call write again') @@ -407,7 +407,7 @@ test('write should drop new data if buffer is full', (t) => { const buf = Buffer.alloc(100).fill('x').toString() - fakeFs.write = function (fd, _buf, enc, cb) { + fakeFs.write = function (fd, _buf, cb) { t.equal(_buf.length, buf.length) setImmediate(cb, null, _buf.length) fakeFs.write = () => t.error('shouldnt call write more than once') From 1ed75f02a33585cc512c550280bf80d77ba9308b Mon Sep 17 00:00:00 2001 From: AVVS Date: Mon, 15 May 2023 22:14:36 -0700 Subject: [PATCH 2/6] chore: adjust bench to show raw buffer perf --- bench.js | 37 ++++++++++++++++++++++++++++++++----- index.js | 31 ++++++++++--------------------- 2 files changed, 42 insertions(+), 26 deletions(-) diff --git a/bench.js b/bench.js index a08682f..54b5ae6 100644 --- a/bench.js +++ b/bench.js @@ -15,11 +15,8 @@ const dummyConsole = new Console(fs.createWriteStream('/dev/null')) const MAX = 10000 -let str = '' - -for (let i = 0; i < 10; i++) { - str += 'hello' -} +const buf = Buffer.alloc(50, 'hello', 'utf8') +const str = buf.toString() setTimeout(doBench, 100) @@ -59,6 +56,36 @@ const run = bench([ dummyConsole.log(str) } setImmediate(cb) + }, + function benchSonicBuf (cb) { + sonic.once('drain', cb) + for (let i = 0; i < MAX; i++) { + sonic.write(buf) + } + }, + function benchSonicSyncBuf (cb) { + sonicSync.once('drain', cb) + for (let i = 0; i < MAX; i++) { + sonicSync.write(buf) + } + }, + function benchSonic4kBuf (cb) { + sonic4k.once('drain', cb) + for (let i = 0; i < MAX; i++) { + sonic4k.write(buf) + } + }, + function benchSonicSync4kBuf (cb) { + sonicSync4k.once('drain', cb) + for (let i = 0; i < MAX; i++) { + sonicSync4k.write(buf) + } + }, + function benchCoreBuf (cb) { + core.once('drain', cb) + for (let i = 0; i < MAX; i++) { + core.write(buf) + } } ], 1000) diff --git a/index.js b/index.js index 782ba77..a27df79 100644 --- a/index.js +++ b/index.js @@ -93,6 +93,7 @@ function SonicBoom (opts) { fd = fd || dest this._bufs = [] + this._lens = [] this._len = 0 this.fd = -1 this._writing = false @@ -236,27 +237,16 @@ function emitDrain (sonic) { inherits(SonicBoom, EventEmitter) -function bufLength (bufs) { - let idx = 0 - const l = bufs.length - let size = 0 - while (idx < l) { - size += bufs[idx].length - idx += 1 - } - return size -} - -function mergeBuf (bufs) { +function mergeBuf (bufs, len) { if (bufs.length === 0) { - return null + return kEmptyBuffer } if (bufs.length === 1) { return bufs[0] } - return Buffer.concat(bufs) + return Buffer.concat(bufs, len) } SonicBoom.prototype.write = function (_data) { @@ -267,6 +257,7 @@ SonicBoom.prototype.write = function (_data) { const data = Buffer.isBuffer(_data) ? _data : Buffer.from(_data, 'utf8') const len = this._len + data.length const bufs = this._bufs + const lens = this._lens if (this.maxLength && len > this.maxLength) { this.emit('drop', data) @@ -275,11 +266,13 @@ SonicBoom.prototype.write = function (_data) { if ( bufs.length === 0 || - bufLength(bufs[bufs.length - 1]) + data.length > this.maxWrite + lens[lens.length - 1] + data.length > this.maxWrite ) { bufs.push([data]) + lens.push(data.length) } else { bufs[bufs.length - 1].push(data) + lens[lens.length - 1] += data.length } this._len = len @@ -393,11 +386,7 @@ SonicBoom.prototype.flushSync = function () { let buf = kEmptyBuffer while (this._bufs.length || buf.length) { if (buf.length <= 0) { - buf = mergeBuf(this._bufs[0]) - if (buf === null) { - this._bufs.shift() - continue - } + buf = mergeBuf(this._bufs[0], this._lens[0]) } try { const n = fs.writeSync(this.fd, buf) @@ -427,7 +416,7 @@ SonicBoom.prototype.destroy = function () { function actualWrite (sonic) { const release = sonic.release sonic._writing = true - sonic._writingBuf = (sonic._writingBuf.length && sonic._writingBuf) || mergeBuf(sonic._bufs.shift()) || kEmptyBuffer + sonic._writingBuf = sonic._writingBuf.length ? sonic._writingBuf : mergeBuf(sonic._bufs.shift(), sonic._lens.shift()) if (sonic.sync) { try { From 99b04707d83014854624d8fb14b4e075d751023e Mon Sep 17 00:00:00 2001 From: AVVS Date: Mon, 15 May 2023 22:19:37 -0700 Subject: [PATCH 3/6] chore: empty out lengths array --- index.js | 3 +++ 1 file changed, 3 insertions(+) diff --git a/index.js b/index.js index a27df79..60c8b29 100644 --- a/index.js +++ b/index.js @@ -295,6 +295,7 @@ SonicBoom.prototype.flush = function () { if (this._bufs.length === 0) { this._bufs.push([]) + this._lens.push(0) } actualWrite(this) @@ -394,6 +395,7 @@ SonicBoom.prototype.flushSync = function () { this._len = Math.max(this._len - n, 0) if (buf.length <= 0) { this._bufs.shift() + this._lens.shift() } } catch (err) { const shouldRetry = err.code === 'EAGAIN' || err.code === 'EBUSY' @@ -438,6 +440,7 @@ function actualClose (sonic) { sonic.destroyed = true sonic._bufs = [] + sonic._lens = [] if (sonic.fd !== 1 && sonic.fd !== 2) { fs.close(sonic.fd, done) From a694b87fc1eb7a36b92fdcb598699f9cac8ee68b Mon Sep 17 00:00:00 2001 From: AVVS Date: Tue, 16 May 2023 12:54:54 -0700 Subject: [PATCH 4/6] feat: contentMode setting --- bench.js | 20 +++--- index.js | 170 +++++++++++++++++++++++++++++++++++++++------ test/retry.test.js | 30 ++++---- test/write.test.js | 26 +++---- 4 files changed, 189 insertions(+), 57 deletions(-) diff --git a/bench.js b/bench.js index 54b5ae6..26cd526 100644 --- a/bench.js +++ b/bench.js @@ -11,6 +11,10 @@ const sonic = new SonicBoom({ fd }) const sonic4k = new SonicBoom({ fd, minLength: 4096 }) const sonicSync = new SonicBoom({ fd, sync: true }) const sonicSync4k = new SonicBoom({ fd, minLength: 4096, sync: true }) +const sonicBuffer = new SonicBoom({ fd, contentMode: 'buffer' }) +const sonic4kBuffer = new SonicBoom({ fd, contentMode: 'buffer', minLength: 4096 }) +const sonicSyncBuffer = new SonicBoom({ fd, contentMode: 'buffer', sync: true }) +const sonicSync4kBuffer = new SonicBoom({ fd, contentMode: 'buffer', minLength: 4096, sync: true }) const dummyConsole = new Console(fs.createWriteStream('/dev/null')) const MAX = 10000 @@ -58,27 +62,27 @@ const run = bench([ setImmediate(cb) }, function benchSonicBuf (cb) { - sonic.once('drain', cb) + sonicBuffer.once('drain', cb) for (let i = 0; i < MAX; i++) { - sonic.write(buf) + sonicBuffer.write(buf) } }, function benchSonicSyncBuf (cb) { - sonicSync.once('drain', cb) + sonicSyncBuffer.once('drain', cb) for (let i = 0; i < MAX; i++) { - sonicSync.write(buf) + sonicSyncBuffer.write(buf) } }, function benchSonic4kBuf (cb) { - sonic4k.once('drain', cb) + sonic4kBuffer.once('drain', cb) for (let i = 0; i < MAX; i++) { - sonic4k.write(buf) + sonic4kBuffer.write(buf) } }, function benchSonicSync4kBuf (cb) { - sonicSync4k.once('drain', cb) + sonicSync4kBuffer.once('drain', cb) for (let i = 0; i < MAX; i++) { - sonicSync4k.write(buf) + sonicSync4kBuffer.write(buf) } }, function benchCoreBuf (cb) { diff --git a/index.js b/index.js index 60c8b29..4ed326c 100644 --- a/index.js +++ b/index.js @@ -57,7 +57,11 @@ function openFile (file, sonic) { // start if (!sonic._writing && sonic._len > sonic.minLength && !sonic.destroyed) { - actualWrite(sonic) + if (sonic.contentMode === 'buffer') { + actualWriteBuffer(sonic) + } else { + actualWrite(sonic) + } } } @@ -88,16 +92,15 @@ function SonicBoom (opts) { return new SonicBoom(opts) } - let { fd, dest, minLength, maxLength, maxWrite, sync, append = true, mode, mkdir, retryEAGAIN, fsync } = opts || {} + let { fd, dest, minLength, maxLength, maxWrite, sync, append = true, mkdir, retryEAGAIN, fsync, contentMode, mode } = opts || {} fd = fd || dest - this._bufs = [] - this._lens = [] this._len = 0 this.fd = -1 + this._bufs = [] + this._lens = [] this._writing = false - this._writingBuf = kEmptyBuffer this._ending = false this._reopening = false this._asyncDrainScheduled = false @@ -115,6 +118,29 @@ function SonicBoom (opts) { this.retryEAGAIN = retryEAGAIN || (() => true) this.mkdir = mkdir || false + let fsWriteSync + let fsWrite + let _actualWrite + if (contentMode === 'buffer') { + this.contentMode = 'buffer' + this._writingBuf = kEmptyBuffer + this.write = writeBuffer + this.flush = flushBuffer + this.flushSync = flushBufferSync + fsWriteSync = () => fs.writeSync(this.fd, this._writingBuf) + fsWrite = () => fs.write(this.fd, this._writingBuf, this.release) + _actualWrite = actualWriteBuffer + } else { + this.contentMode = 'utf8' + this._writingBuf = '' + this.write = write + this.flush = flush + this.flushSync = flushSync + fsWriteSync = () => fs.writeSync(this.fd, this._writingBuf, 'utf8') + fsWrite = () => fs.write(this.fd, this._writingBuf, 'utf8', this.release) + _actualWrite = actualWrite + } + if (typeof fd === 'number') { this.fd = fd process.nextTick(() => this.emit('ready')) @@ -143,9 +169,7 @@ function SonicBoom (opts) { } } else { // Let's give the destination some time to process the chunk. - setTimeout(() => { - fs.write(this.fd, this._writingBuf, this.release) - }, BUSY_WRITE_TIMEOUT) + setTimeout(fsWrite, BUSY_WRITE_TIMEOUT) } } else { this._writing = false @@ -170,19 +194,19 @@ function SonicBoom (opts) { // TODO if we have a multi-byte character in the buffer, we need to // n might not be the same as this._writingBuf.length, so we might loose // characters here. The solution to this problem is to use a Buffer for _writingBuf. - this._writingBuf = this._writingBuf.subarray(n) + this._writingBuf = this._writingBuf.slice(n) if (this._writingBuf.length) { if (!this.sync) { - fs.write(this.fd, this._writingBuf, this.release) + fsWrite() return } try { do { - const n = fs.writeSync(this.fd, this._writingBuf) + const n = fsWriteSync() this._len -= n - this._writingBuf = this._writingBuf.subarray(n) + this._writingBuf = this._writingBuf.slice(n) } while (this._writingBuf.length) } catch (err) { this.release(err) @@ -200,10 +224,10 @@ function SonicBoom (opts) { this._reopening = false this.reopen() } else if (len > this.minLength) { - actualWrite(this) + _actualWrite(this) } else if (this._ending) { if (len > 0) { - actualWrite(this) + _actualWrite(this) } else { this._writing = false actualClose(this) @@ -249,12 +273,42 @@ function mergeBuf (bufs, len) { return Buffer.concat(bufs, len) } -SonicBoom.prototype.write = function (_data) { +function write (data) { + if (this.destroyed) { + throw new Error('SonicBoom destroyed') + } + + const len = this._len + data.length + const bufs = this._bufs + + if (this.maxLength && len > this.maxLength) { + this.emit('drop', data) + return this._len < this._hwm + } + + if ( + bufs.length === 0 || + bufs[bufs.length - 1].length + data.length > this.maxWrite + ) { + bufs.push('' + data) + } else { + bufs[bufs.length - 1] += data + } + + this._len = len + + if (!this._writing && this._len >= this.minLength) { + actualWrite(this) + } + + return this._len < this._hwm +} + +function writeBuffer (data) { if (this.destroyed) { throw new Error('SonicBoom destroyed') } - const data = Buffer.isBuffer(_data) ? _data : Buffer.from(_data, 'utf8') const len = this._len + data.length const bufs = this._bufs const lens = this._lens @@ -278,13 +332,29 @@ SonicBoom.prototype.write = function (_data) { this._len = len if (!this._writing && this._len >= this.minLength) { - actualWrite(this) + actualWriteBuffer(this) } return this._len < this._hwm } -SonicBoom.prototype.flush = function () { +function flush () { + if (this.destroyed) { + throw new Error('SonicBoom destroyed') + } + + if (this._writing || this.minLength <= 0) { + return + } + + if (this._bufs.length === 0) { + this._bufs.push('') + } + + actualWrite(this) +} + +function flushBuffer () { if (this.destroyed) { throw new Error('SonicBoom destroyed') } @@ -298,7 +368,7 @@ SonicBoom.prototype.flush = function () { this._lens.push(0) } - actualWrite(this) + actualWriteBuffer(this) } SonicBoom.prototype.reopen = function (file) { @@ -364,13 +434,54 @@ SonicBoom.prototype.end = function () { } if (this._len > 0 && this.fd >= 0) { - actualWrite(this) + if (this.contentMode === 'buffer') { + actualWriteBuffer(this) + } else { + actualWrite(this) + } } else { actualClose(this) } } -SonicBoom.prototype.flushSync = function () { +function flushSync () { + if (this.destroyed) { + throw new Error('SonicBoom destroyed') + } + + if (this.fd < 0) { + throw new Error('sonic boom is not ready yet') + } + + if (!this._writing && this._writingBuf.length > 0) { + this._bufs.unshift(this._writingBuf) + this._writingBuf = '' + } + + let buf = '' + while (this._bufs.length || buf) { + if (buf.length <= 0) { + buf = this._bufs[0] + } + try { + const n = fs.writeSync(this.fd, buf, 'utf8') + buf = buf.slice(n) + this._len = Math.max(this._len - n, 0) + if (buf.length <= 0) { + this._bufs.shift() + } + } catch (err) { + const shouldRetry = err.code === 'EAGAIN' || err.code === 'EBUSY' + if (shouldRetry && !this.retryEAGAIN(err, buf.length, this._len - buf.length)) { + throw err + } + + sleep(BUSY_WRITE_TIMEOUT) + } + } +} + +function flushBufferSync () { if (this.destroyed) { throw new Error('SonicBoom destroyed') } @@ -416,6 +527,23 @@ SonicBoom.prototype.destroy = function () { } function actualWrite (sonic) { + const release = sonic.release + sonic._writing = true + sonic._writingBuf = sonic._writingBuf || sonic._bufs.shift() || '' + + if (sonic.sync) { + try { + const written = fs.writeSync(sonic.fd, sonic._writingBuf, 'utf8') + release(null, written) + } catch (err) { + release(err) + } + } else { + fs.write(sonic.fd, sonic._writingBuf, 'utf8', release) + } +} + +function actualWriteBuffer (sonic) { const release = sonic.release sonic._writing = true sonic._writingBuf = sonic._writingBuf.length ? sonic._writingBuf : mergeBuf(sonic._bufs.shift(), sonic._lens.shift()) diff --git a/test/retry.test.js b/test/retry.test.js index 2b35d89..6b62ee7 100644 --- a/test/retry.test.js +++ b/test/retry.test.js @@ -16,12 +16,12 @@ function buildTests (test, sync) { t.plan(7) const fakeFs = Object.create(fs) - fakeFs.write = function (fd, buf, cb) { + fakeFs.write = function (fd, buf, ...args) { t.pass('fake fs.write called') fakeFs.write = fs.write const err = new Error('EAGAIN') err.code = 'EAGAIN' - process.nextTick(cb, err) + process.nextTick(args.pop(), err) } const SonicBoom = proxyquire('../', { fs: fakeFs @@ -56,12 +56,12 @@ test('emit error on async EAGAIN', (t) => { t.plan(11) const fakeFs = Object.create(fs) - fakeFs.write = function (fd, buf, cb) { + fakeFs.write = function (fd, buf, ...args) { t.pass('fake fs.write called') fakeFs.write = fs.write const err = new Error('EAGAIN') err.code = 'EAGAIN' - process.nextTick(cb, err) + process.nextTick(args[args.length - 1], err) } const SonicBoom = proxyquire('../', { fs: fakeFs @@ -109,7 +109,7 @@ test('retry on EAGAIN (sync)', (t) => { t.plan(7) const fakeFs = Object.create(fs) - fakeFs.writeSync = function (fd, buf, cb) { + fakeFs.writeSync = function (fd, buf, enc) { t.pass('fake fs.writeSync called') fakeFs.writeSync = fs.writeSync const err = new Error('EAGAIN') @@ -148,7 +148,7 @@ test('emit error on EAGAIN (sync)', (t) => { t.plan(11) const fakeFs = Object.create(fs) - fakeFs.writeSync = function (fd, buf, cb) { + fakeFs.writeSync = function (fd, buf, enc) { t.pass('fake fs.writeSync called') fakeFs.writeSync = fs.writeSync const err = new Error('EAGAIN') @@ -228,13 +228,13 @@ test('retryEAGAIN receives remaining buffer on async if write fails', (t) => { t.ok(stream.write('done')) }) - fakeFs.write = function (fd, buf, cb) { + fakeFs.write = function (fd, buf, ...args) { t.pass('fake fs.write called') fakeFs.write = fs.write const err = new Error('EAGAIN') err.code = 'EAGAIN' t.ok(stream.write('sonic boom\n')) - process.nextTick(cb, err) + process.nextTick(args[args.length - 1], err) } t.ok(stream.write('hello world\n')) @@ -279,14 +279,14 @@ test('retryEAGAIN receives remaining buffer if exceeds maxWrite', (t) => { t.pass('ready emitted') }) - fakeFs.write = function (fd, buf, cb) { + fakeFs.write = function (fd, buf, ...args) { t.pass('fake fs.write called') const err = new Error('EAGAIN') err.code = 'EAGAIN' - process.nextTick(cb, err) + process.nextTick(args.pop(), err) } - fakeFs.writeSync = function (fd, buf, cb) { + fakeFs.writeSync = function (fd, buf, enc) { t.pass('fake fs.write called') const err = new Error('EAGAIN') err.code = 'EAGAIN' @@ -325,12 +325,12 @@ test('retry on EBUSY', (t) => { t.plan(7) const fakeFs = Object.create(fs) - fakeFs.write = function (fd, buf, cb) { + fakeFs.write = function (fd, buf, ...args) { t.pass('fake fs.write called') fakeFs.write = fs.write const err = new Error('EBUSY') err.code = 'EBUSY' - process.nextTick(cb, err) + process.nextTick(args.pop(), err) } const SonicBoom = proxyquire('..', { fs: fakeFs @@ -364,12 +364,12 @@ test('emit error on async EBUSY', (t) => { t.plan(11) const fakeFs = Object.create(fs) - fakeFs.write = function (fd, buf, cb) { + fakeFs.write = function (fd, buf, ...args) { t.pass('fake fs.write called') fakeFs.write = fs.write const err = new Error('EBUSY') err.code = 'EBUSY' - process.nextTick(cb, err) + process.nextTick(args.pop(), err) } const SonicBoom = proxyquire('..', { fs: fakeFs diff --git a/test/write.test.js b/test/write.test.js index f6d53bf..1c0913b 100644 --- a/test/write.test.js +++ b/test/write.test.js @@ -181,14 +181,14 @@ function buildTests (test, sync) { }) if (sync) { - fakeFs.writeSync = function (fd, buf) { + fakeFs.writeSync = function (fd, buf, enc) { t.pass('fake fs.writeSync called') throw new Error('recoverable error') } } else { - fakeFs.write = function (fd, buf, cb) { + fakeFs.write = function (fd, buf, ...args) { t.pass('fake fs.write called') - setTimeout(() => cb(new Error('recoverable error')), 0) + setTimeout(() => args.pop()(new Error('recoverable error')), 0) } } @@ -253,13 +253,13 @@ test('write buffers that are not totally written', (t) => { t.plan(9) const fakeFs = Object.create(fs) - fakeFs.write = function (fd, buf, cb) { + fakeFs.write = function (fd, buf, ...args) { t.pass('fake fs.write called') - fakeFs.write = function (fd, buf, cb) { + fakeFs.write = function (fd, buf, ...args) { t.pass('calling real fs.write, ' + buf) - fs.write(fd, buf, cb) + fs.write(fd, buf, ...args) } - process.nextTick(cb, null, 0) + process.nextTick(args[args.length - 1], null, 0) } const SonicBoom = proxyquire('../', { fs: fakeFs @@ -336,12 +336,12 @@ test('write enormously large buffers async atomicly', (t) => { const buf = Buffer.alloc(1023).fill('x').toString() - fakeFs.write = function (fd, _buf, cb) { + fakeFs.write = function (fd, _buf, ...args) { if (_buf.length % buf.length !== 0) { t.fail('write called with wrong buffer size') } - setImmediate(cb, null, _buf.length) + setImmediate(args[args.length - 1], null, _buf.length) } for (let i = 0; i < 1024 * 512; i++) { @@ -375,9 +375,9 @@ test('write should not drop new data if buffer is not full', (t) => { const buf = Buffer.alloc(100).fill('x').toString() - fakeFs.write = function (fd, _buf, cb) { + fakeFs.write = function (fd, _buf, ...args) { t.equal(_buf.length, buf.length + 2) - setImmediate(cb, null, _buf.length) + setImmediate(args[args.length - 1], null, _buf.length) fakeFs.write = () => t.error('shouldnt call write again') stream.end() } @@ -407,9 +407,9 @@ test('write should drop new data if buffer is full', (t) => { const buf = Buffer.alloc(100).fill('x').toString() - fakeFs.write = function (fd, _buf, cb) { + fakeFs.write = function (fd, _buf, ...args) { t.equal(_buf.length, buf.length) - setImmediate(cb, null, _buf.length) + setImmediate(args[args.length - 1], null, _buf.length) fakeFs.write = () => t.error('shouldnt call write more than once') } From ad82abce79c29c95bfe1a9d36c1e6f0bba4978d0 Mon Sep 17 00:00:00 2001 From: AVVS Date: Thu, 29 Jun 2023 09:40:28 -0700 Subject: [PATCH 5/6] chore: contentMode in .d.ts, save _actualWrite --- index.js | 68 ++++++++++++++++++++----------------------- types/index.d.ts | 1 + types/index.test-d.ts | 2 ++ 3 files changed, 34 insertions(+), 37 deletions(-) diff --git a/index.js b/index.js index 4ed326c..5936137 100644 --- a/index.js +++ b/index.js @@ -13,6 +13,9 @@ const kEmptyBuffer = Buffer.allocUnsafe(0) // https://github.com/moby/moby/blob/513ec73831269947d38a644c278ce3cac36783b2/daemon/logger/copier.go#L13 const MAX_WRITE = 16 * 1024 +const kContentModeBuffer = 'buffer' +const kContentModeUtf8 = 'utf8' + function openFile (file, sonic) { sonic._opening = true sonic._writing = true @@ -57,11 +60,7 @@ function openFile (file, sonic) { // start if (!sonic._writing && sonic._len > sonic.minLength && !sonic.destroyed) { - if (sonic.contentMode === 'buffer') { - actualWriteBuffer(sonic) - } else { - actualWrite(sonic) - } + sonic._actualWrite() } } @@ -120,25 +119,24 @@ function SonicBoom (opts) { let fsWriteSync let fsWrite - let _actualWrite - if (contentMode === 'buffer') { - this.contentMode = 'buffer' + if (contentMode === kContentModeBuffer) { this._writingBuf = kEmptyBuffer this.write = writeBuffer this.flush = flushBuffer this.flushSync = flushBufferSync + this._actualWrite = actualWriteBuffer fsWriteSync = () => fs.writeSync(this.fd, this._writingBuf) fsWrite = () => fs.write(this.fd, this._writingBuf, this.release) - _actualWrite = actualWriteBuffer - } else { - this.contentMode = 'utf8' + } else if (contentMode === undefined || contentMode === kContentModeUtf8) { this._writingBuf = '' this.write = write this.flush = flush this.flushSync = flushSync + this._actualWrite = actualWrite fsWriteSync = () => fs.writeSync(this.fd, this._writingBuf, 'utf8') fsWrite = () => fs.write(this.fd, this._writingBuf, 'utf8', this.release) - _actualWrite = actualWrite + } else { + throw new Error(`SonicBoom supports "${kContentModeUtf8}" and "${kContentModeBuffer}", but passed ${contentMode}`) } if (typeof fd === 'number') { @@ -224,10 +222,10 @@ function SonicBoom (opts) { this._reopening = false this.reopen() } else if (len > this.minLength) { - _actualWrite(this) + this._actualWrite() } else if (this._ending) { if (len > 0) { - _actualWrite(this) + this._actualWrite() } else { this._writing = false actualClose(this) @@ -298,7 +296,7 @@ function write (data) { this._len = len if (!this._writing && this._len >= this.minLength) { - actualWrite(this) + this._actualWrite() } return this._len < this._hwm @@ -332,7 +330,7 @@ function writeBuffer (data) { this._len = len if (!this._writing && this._len >= this.minLength) { - actualWriteBuffer(this) + this._actualWrite() } return this._len < this._hwm @@ -351,7 +349,7 @@ function flush () { this._bufs.push('') } - actualWrite(this) + this._actualWrite() } function flushBuffer () { @@ -368,7 +366,7 @@ function flushBuffer () { this._lens.push(0) } - actualWriteBuffer(this) + this._actualWrite() } SonicBoom.prototype.reopen = function (file) { @@ -434,11 +432,7 @@ SonicBoom.prototype.end = function () { } if (this._len > 0 && this.fd >= 0) { - if (this.contentMode === 'buffer') { - actualWriteBuffer(this) - } else { - actualWrite(this) - } + this._actualWrite() } else { actualClose(this) } @@ -526,37 +520,37 @@ SonicBoom.prototype.destroy = function () { actualClose(this) } -function actualWrite (sonic) { - const release = sonic.release - sonic._writing = true - sonic._writingBuf = sonic._writingBuf || sonic._bufs.shift() || '' +function actualWrite () { + const release = this.release + this._writing = true + this._writingBuf = this._writingBuf || this._bufs.shift() || '' - if (sonic.sync) { + if (this.sync) { try { - const written = fs.writeSync(sonic.fd, sonic._writingBuf, 'utf8') + const written = fs.writeSync(this.fd, this._writingBuf, 'utf8') release(null, written) } catch (err) { release(err) } } else { - fs.write(sonic.fd, sonic._writingBuf, 'utf8', release) + fs.write(this.fd, this._writingBuf, 'utf8', release) } } -function actualWriteBuffer (sonic) { - const release = sonic.release - sonic._writing = true - sonic._writingBuf = sonic._writingBuf.length ? sonic._writingBuf : mergeBuf(sonic._bufs.shift(), sonic._lens.shift()) +function actualWriteBuffer () { + const release = this.release + this._writing = true + this._writingBuf = this._writingBuf.length ? this._writingBuf : mergeBuf(this._bufs.shift(), this._lens.shift()) - if (sonic.sync) { + if (this.sync) { try { - const written = fs.writeSync(sonic.fd, sonic._writingBuf) + const written = fs.writeSync(this.fd, this._writingBuf) release(null, written) } catch (err) { release(err) } } else { - fs.write(sonic.fd, sonic._writingBuf, release) + fs.write(this.fd, this._writingBuf, release) } } diff --git a/types/index.d.ts b/types/index.d.ts index a808737..5408636 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -17,6 +17,7 @@ export type SonicBoomOpts = { append?: boolean mode?: string | number mkdir?: boolean + contentMode?: 'buffer' | 'utf8' retryEAGAIN?: (err: Error, writeBufferLen: number, remainingBufferLen: number) => boolean } diff --git a/types/index.test-d.ts b/types/index.test-d.ts index 30dd1a2..39687bf 100644 --- a/types/index.test-d.ts +++ b/types/index.test-d.ts @@ -17,6 +17,7 @@ expectType( new SonicBoomCjsImport.default({ fd: 1})); expectType( new SonicBoomCjs({ fd: 1})); expectType( new SonicBoomCjsNamed({ fd: 1})); + expectType(sonic.write('hello sonic\n')); sonic.flush(); @@ -33,5 +34,6 @@ const extraSonic = new SonicBoom({fd: 1, minLength: 0, maxWrite: 16384, sync: tr new SonicBoom({fd: 1, minLength: 0, maxWrite: 16384, sync: true, append: true, mode: 0o644, mkdir: true, dest: '/dev/null'}); new SonicBoom({fd: 1, minLength: 0, maxWrite: 16384, sync: true, append: true, mode: 0o644, mkdir: true, dest: 1}); new SonicBoom({fd: 1, minLength: 0, maxWrite: 16384, sync: true, fsync: true, append: true, mode: 0o644, mkdir: true}); +new SonicBoom({fd: 1, minLength: 0, maxWrite: 16384, sync: true, fsync: true, append: true, mode: 0o644, mkdir: true, contentMode: 'buffer' }); extraSonic.write('extra sonic\n'); From 7b1079f220b31222376052369a9b16d984039399 Mon Sep 17 00:00:00 2001 From: AVVS Date: Thu, 29 Jun 2023 11:53:56 -0700 Subject: [PATCH 6/6] chore: exclude win+node14 --- .github/workflows/ci.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a36d608..a105985 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -40,6 +40,9 @@ jobs: matrix: node-version: [14, 16, 18] os: [macos-latest, ubuntu-latest, windows-latest] + exclude: + - os: windows-latest + node-version: 14 steps: - name: Check out repo uses: actions/checkout@v3