Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

.write(Buffer) support #174

Merged
merged 7 commits into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 36 additions & 5 deletions bench.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,16 @@ const sonic = new SonicBoom({ fd })
const sonic4k = new SonicBoom({ fd, minLength: 4096 })
const sonicSync = new SonicBoom({ fd, sync: true })
const sonicSync4k = new SonicBoom({ fd, minLength: 4096, sync: true })
const sonicBuffer = new SonicBoom({ fd, contentMode: 'buffer' })
const sonic4kBuffer = new SonicBoom({ fd, contentMode: 'buffer', minLength: 4096 })
const sonicSyncBuffer = new SonicBoom({ fd, contentMode: 'buffer', sync: true })
const sonicSync4kBuffer = new SonicBoom({ fd, contentMode: 'buffer', minLength: 4096, sync: true })
const dummyConsole = new Console(fs.createWriteStream('/dev/null'))

const MAX = 10000

let str = ''

for (let i = 0; i < 10; i++) {
str += 'hello'
}
const buf = Buffer.alloc(50, 'hello', 'utf8')
const str = buf.toString()

setTimeout(doBench, 100)

Expand Down Expand Up @@ -59,6 +60,36 @@ const run = bench([
dummyConsole.log(str)
}
setImmediate(cb)
},
function benchSonicBuf (cb) {
sonicBuffer.once('drain', cb)
for (let i = 0; i < MAX; i++) {
sonicBuffer.write(buf)
}
},
function benchSonicSyncBuf (cb) {
sonicSyncBuffer.once('drain', cb)
for (let i = 0; i < MAX; i++) {
sonicSyncBuffer.write(buf)
}
},
function benchSonic4kBuf (cb) {
sonic4kBuffer.once('drain', cb)
for (let i = 0; i < MAX; i++) {
sonic4kBuffer.write(buf)
}
},
function benchSonicSync4kBuf (cb) {
sonicSync4kBuffer.once('drain', cb)
for (let i = 0; i < MAX; i++) {
sonicSync4kBuffer.write(buf)
}
},
function benchCoreBuf (cb) {
core.once('drain', cb)
for (let i = 0; i < MAX; i++) {
core.write(buf)
}
}
], 1000)

Expand Down
184 changes: 167 additions & 17 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const path = require('path')
const sleep = require('atomic-sleep')

const BUSY_WRITE_TIMEOUT = 100
const kEmptyBuffer = Buffer.allocUnsafe(0)

// 16 KB. Don't write more than docker buffer size.
// https://github.com/moby/moby/blob/513ec73831269947d38a644c278ce3cac36783b2/daemon/logger/copier.go#L13
Expand Down Expand Up @@ -56,7 +57,11 @@ function openFile (file, sonic) {

// start
if (!sonic._writing && sonic._len > sonic.minLength && !sonic.destroyed) {
actualWrite(sonic)
if (sonic.contentMode === 'buffer') {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering if contentMode as an enum instead of a string would make more sense. Either way, this should be included in types/index.d.ts and should be documented.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great idea!

actualWriteBuffer(sonic)
} else {
actualWrite(sonic)
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we call _actualWrite instead of having an if here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we assign it as sonic._actualWrite I think it's possible to do so. At the same time these are not hot paths, so not sure if it's worth it

Copy link
Collaborator

@mmarchini mmarchini Jun 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I was thinking of using _actualWrite for maintainability rather than performance (V8 would take care of ensuring these ifs are performant so I'm not worried about that in from a perf standpoint)

}
}

Expand Down Expand Up @@ -87,15 +92,15 @@ function SonicBoom (opts) {
return new SonicBoom(opts)
}

let { fd, dest, minLength, maxLength, maxWrite, sync, append = true, mode, mkdir, retryEAGAIN, fsync } = opts || {}
let { fd, dest, minLength, maxLength, maxWrite, sync, append = true, mkdir, retryEAGAIN, fsync, contentMode, mode } = opts || {}

fd = fd || dest

this._bufs = []
this._len = 0
this.fd = -1
this._bufs = []
this._lens = []
this._writing = false
this._writingBuf = ''
this._ending = false
this._reopening = false
this._asyncDrainScheduled = false
Expand All @@ -106,12 +111,36 @@ function SonicBoom (opts) {
this.maxLength = maxLength || 0
this.maxWrite = maxWrite || MAX_WRITE
this.sync = sync || false
this.writable = true
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

forgot to mention, this is so that node.js stream util compose function works properly with this

this._fsync = fsync || false
this.append = append || false
this.mode = mode
this.retryEAGAIN = retryEAGAIN || (() => true)
this.mkdir = mkdir || false

let fsWriteSync
let fsWrite
let _actualWrite
if (contentMode === 'buffer') {
this.contentMode = 'buffer'
this._writingBuf = kEmptyBuffer
this.write = writeBuffer
this.flush = flushBuffer
this.flushSync = flushBufferSync
fsWriteSync = () => fs.writeSync(this.fd, this._writingBuf)
fsWrite = () => fs.write(this.fd, this._writingBuf, this.release)
_actualWrite = actualWriteBuffer
} else {
this.contentMode = 'utf8'
this._writingBuf = ''
this.write = write
this.flush = flush
this.flushSync = flushSync
fsWriteSync = () => fs.writeSync(this.fd, this._writingBuf, 'utf8')
fsWrite = () => fs.write(this.fd, this._writingBuf, 'utf8', this.release)
_actualWrite = actualWrite
}

if (typeof fd === 'number') {
this.fd = fd
process.nextTick(() => this.emit('ready'))
Expand Down Expand Up @@ -140,9 +169,7 @@ function SonicBoom (opts) {
}
} else {
// Let's give the destination some time to process the chunk.
setTimeout(() => {
fs.write(this.fd, this._writingBuf, 'utf8', this.release)
}, BUSY_WRITE_TIMEOUT)
setTimeout(fsWrite, BUSY_WRITE_TIMEOUT)
}
} else {
this._writing = false
Expand Down Expand Up @@ -171,16 +198,16 @@ function SonicBoom (opts) {

if (this._writingBuf.length) {
if (!this.sync) {
fs.write(this.fd, this._writingBuf, 'utf8', this.release)
fsWrite()
return
}

try {
do {
const n = fs.writeSync(this.fd, this._writingBuf, 'utf8')
const n = fsWriteSync()
this._len -= n
this._writingBuf = this._writingBuf.slice(n)
} while (this._writingBuf)
} while (this._writingBuf.length)
} catch (err) {
this.release(err)
return
Expand All @@ -197,10 +224,10 @@ function SonicBoom (opts) {
this._reopening = false
this.reopen()
} else if (len > this.minLength) {
actualWrite(this)
_actualWrite(this)
} else if (this._ending) {
if (len > 0) {
actualWrite(this)
_actualWrite(this)
} else {
this._writing = false
actualClose(this)
Expand Down Expand Up @@ -234,7 +261,19 @@ function emitDrain (sonic) {

inherits(SonicBoom, EventEmitter)

SonicBoom.prototype.write = function (data) {
function mergeBuf (bufs, len) {
if (bufs.length === 0) {
return kEmptyBuffer
}

if (bufs.length === 1) {
return bufs[0]
}

return Buffer.concat(bufs, len)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the source of the slowdown. We should not merge them, but rather keep them as a list.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've tried having a list here, but tests that expect single flush instead of multiples break. Will work on it further now that there are 2 separate content modes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All in all Buffer.concat seem to be cheaper than multiple fs.write as long as it doesn't have to allocate memory outside of the Buffer.poolSize. Will investigate further how that could be avoided

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tried fs.writev to avoid concat, but that made it slower than writeStream

}

function write (data) {
if (this.destroyed) {
throw new Error('SonicBoom destroyed')
}
Expand Down Expand Up @@ -265,7 +304,41 @@ SonicBoom.prototype.write = function (data) {
return this._len < this._hwm
}

SonicBoom.prototype.flush = function () {
function writeBuffer (data) {
if (this.destroyed) {
throw new Error('SonicBoom destroyed')
}

const len = this._len + data.length
const bufs = this._bufs
const lens = this._lens

if (this.maxLength && len > this.maxLength) {
this.emit('drop', data)
return this._len < this._hwm
}

if (
bufs.length === 0 ||
lens[lens.length - 1] + data.length > this.maxWrite
) {
bufs.push([data])
lens.push(data.length)
} else {
bufs[bufs.length - 1].push(data)
lens[lens.length - 1] += data.length
}

this._len = len

if (!this._writing && this._len >= this.minLength) {
actualWriteBuffer(this)
}

return this._len < this._hwm
}

function flush () {
if (this.destroyed) {
throw new Error('SonicBoom destroyed')
}
Expand All @@ -281,6 +354,23 @@ SonicBoom.prototype.flush = function () {
actualWrite(this)
}

function flushBuffer () {
if (this.destroyed) {
throw new Error('SonicBoom destroyed')
}

if (this._writing || this.minLength <= 0) {
return
}

if (this._bufs.length === 0) {
this._bufs.push([])
this._lens.push(0)
}

actualWriteBuffer(this)
}

SonicBoom.prototype.reopen = function (file) {
if (this.destroyed) {
throw new Error('SonicBoom destroyed')
Expand Down Expand Up @@ -344,13 +434,17 @@ SonicBoom.prototype.end = function () {
}

if (this._len > 0 && this.fd >= 0) {
actualWrite(this)
if (this.contentMode === 'buffer') {
actualWriteBuffer(this)
} else {
actualWrite(this)
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here, can't we call _actualWrite

} else {
actualClose(this)
}
}

SonicBoom.prototype.flushSync = function () {
function flushSync () {
if (this.destroyed) {
throw new Error('SonicBoom destroyed')
}
Expand All @@ -365,7 +459,7 @@ SonicBoom.prototype.flushSync = function () {
}

let buf = ''
while (this._bufs.length || buf.length) {
while (this._bufs.length || buf) {
if (buf.length <= 0) {
buf = this._bufs[0]
}
Expand All @@ -387,6 +481,44 @@ SonicBoom.prototype.flushSync = function () {
}
}

function flushBufferSync () {
if (this.destroyed) {
throw new Error('SonicBoom destroyed')
}

if (this.fd < 0) {
throw new Error('sonic boom is not ready yet')
}

if (!this._writing && this._writingBuf.length > 0) {
this._bufs.unshift([this._writingBuf])
this._writingBuf = kEmptyBuffer
}

let buf = kEmptyBuffer
while (this._bufs.length || buf.length) {
if (buf.length <= 0) {
buf = mergeBuf(this._bufs[0], this._lens[0])
}
try {
const n = fs.writeSync(this.fd, buf)
buf = buf.subarray(n)
this._len = Math.max(this._len - n, 0)
if (buf.length <= 0) {
this._bufs.shift()
this._lens.shift()
}
} catch (err) {
const shouldRetry = err.code === 'EAGAIN' || err.code === 'EBUSY'
if (shouldRetry && !this.retryEAGAIN(err, buf.length, this._len - buf.length)) {
throw err
}

sleep(BUSY_WRITE_TIMEOUT)
}
}
}

SonicBoom.prototype.destroy = function () {
if (this.destroyed) {
return
Expand All @@ -411,6 +543,23 @@ function actualWrite (sonic) {
}
}

function actualWriteBuffer (sonic) {
const release = sonic.release
sonic._writing = true
sonic._writingBuf = sonic._writingBuf.length ? sonic._writingBuf : mergeBuf(sonic._bufs.shift(), sonic._lens.shift())

if (sonic.sync) {
try {
const written = fs.writeSync(sonic.fd, sonic._writingBuf)
release(null, written)
} catch (err) {
release(err)
}
} else {
fs.write(sonic.fd, sonic._writingBuf, release)
}
}

function actualClose (sonic) {
if (sonic.fd === -1) {
sonic.once('ready', actualClose.bind(null, sonic))
Expand All @@ -419,6 +568,7 @@ function actualClose (sonic) {

sonic.destroyed = true
sonic._bufs = []
sonic._lens = []

if (sonic.fd !== 1 && sonic.fd !== 2) {
fs.close(sonic.fd, done)
Expand Down
Loading