Skip to content

Commit

Permalink
fix(store): wrap store.createWriteStream to make callback optional
Browse files Browse the repository at this point in the history
  • Loading branch information
olalonde committed Sep 9, 2016
1 parent 7836f6b commit 98f9bd0
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 30 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ caching larger values like resized/cropped images or transcoded videos.
- [Error Handling](#error-handling)
- [Errors](#errors)
- [How it works](#how-it-works)
- [TODO](#todo)
- [Partial Writes / Durability](#partial-writes--durability)

<!-- END doctoc generated TOC please keep comment here to allow auto update -->

Expand Down Expand Up @@ -224,7 +224,7 @@ the stream API to stream the image from S3 (e.g.
Cloud-cache evicts expired values on read which means that expired
values will remain stored as long as they are not read.

## Partial Writes
## Partial Writes / Durability

Cloud-cache does not guarantee that **set** operations will be atomic
and instead delegates that responsibility to the underlying store
Expand All @@ -234,3 +234,4 @@ a write). For example, `fs-blob-store` will
[happily](https://github.com/mafintosh/fs-blob-store/pull/6) write half
of a stream to the file system. `s3-blob-store`, on the other hand, will
only write a stream which has been fully consumed.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
"randomstring": "^1.1.5",
"request": "^2.74.0",
"rimraf": "^2.5.4",
"s3-blob-store": "^1.2.1",
"s3-blob-store": "^1.2.3",
"semantic-release": "^4.3.5"
},
"release": {
Expand Down
24 changes: 14 additions & 10 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { PassThrough } from 'stream'
import { getType } from './types'
import { readMetadata, writeMetadata, readValue, writeValue } from './metadata'
import * as errors from './errors'
import createWriteStream from './store/create-write-stream'

const { KeyNotExistsError, CloudCacheError } = errors

Expand Down Expand Up @@ -84,16 +85,19 @@ export default (store, {
resolve()
}

const writeStream = store.createWriteStream(fullKey(key), cb)

writeMetadata(metadata, writeStream)
.then(() => {
if (stream) {
return resolve(writeStream)
}
return writeValue(writeStream, value)
})
.catch(reject)
// wrapper around store.createWriteStream that makes callback optional
const writeStream = createWriteStream(store, fullKey(key), cb)

try {
writeMetadata(metadata, writeStream)
if (stream) {
resolve(writeStream)
} else {
writeValue(writeStream, value)
}
} catch (err) {
reject(err)
}
})

const getOrSet = (key, fn, opts = {}) => {
Expand Down
23 changes: 6 additions & 17 deletions src/metadata.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,10 @@ export const readMetadata = (readStream) => new Promise((resolve, reject) => {
})

// Write metadata to the stream as a single newline-delimited JSON header line.
// Synchronous: throws if metadata cannot be serialized (e.g. circular refs,
// BigInt values) — callers are expected to catch (see src/index.js try/catch).
export const writeMetadata = (metadata, writeStream) => {
  const metadataStr = JSON.stringify(metadata)
  writeStream.write(`${metadataStr}\n`, 'utf8')
}

export const readValue = (readStream, type) => new Promise((resolve, reject) => {
readStream.pipe(concat((buf) => {
Expand All @@ -65,12 +59,7 @@ export const readValue = (readStream, type) => new Promise((resolve, reject) =>
}))
})

// Encode `value` with its type's codec (see ./types) and end the stream with
// the encoded bytes. Synchronous: encoding errors propagate to the caller
// (handled by the try/catch around writeValue in src/index.js).
export const writeValue = (writeStream, value) => (
  writeStream.end(getType(value).encode(value))
)

51 changes: 51 additions & 0 deletions src/store/blocking-writable.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import { Writable } from 'stream'

const SIGNAL_LAST_WRITE = new Buffer([0])

/**
* Wraps a write stream and waits until stream.unblock([err]) has been called
* before ending and emitting the 'finish' event.
*/
export default class extends Writable {
constructor(dest) {
super()
this.dest = dest
this.blockingState = {
blocked: true,
err: null,
cb: false,
}
}

_write(chunk, encoding, cb) {
if (chunk === SIGNAL_LAST_WRITE) return this._lastWrite(cb)
this.dest.write(chunk, encoding, cb)
}

_lastWrite(cb) {
const { blockingState } = this
this.dest.end((err) => {
if (err) return cb(err)
if (blockingState.blocked) {
blockingState.cb = cb
} else {
cb(blockingState.err)
}
})
}

unblock(err) {
const { blockingState } = this
if (blockingState.cb) return blockingState.cb(err)
blockingState.blocked = false
blockingState.err = err
}

end(data, enc, cb) {
if (typeof data === 'function') return this.end(null, null, data)
if (typeof enc === 'function') return this.end(data, null, enc)
if (data) this.write(data)
this.write(SIGNAL_LAST_WRITE)
super.end(cb)
}
}
25 changes: 25 additions & 0 deletions src/store/create-write-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { PassThrough } from 'stream'
import BlockingWritable from './blocking-writable'

/**
 * abstract-blob-store's API requires waiting for a callback on writes.
 *
 * Cloud-cache however does not have this requirement and instead uses a pure
 * sync streaming/event API. Since we cannot rely on abstract-blob-store's
 * 'finish' events, we must wait for the callback before our own write stream
 * ends and emits 'finish'.
 *
 * @param {Object} store - an abstract-blob-store compatible store
 * @param {string} key - blob key to write to
 * @param {Function} [cb] - optional completion callback; when provided the
 *   store's own stream is returned unchanged
 * @returns {stream.Writable} stream to write the value to
 */
export default function createWriteStream(store, key, cb) {
  // Normal mode: caller supplied a callback, defer to the store directly.
  if (typeof cb === 'function') return store.createWriteStream(key, cb)

  // No-callback mode: pipe writes through a PassThrough and block our own
  // 'finish' event until the store acknowledges the write.
  const through = new PassThrough()
  const ws = new BlockingWritable(through)

  through.pipe(store.createWriteStream(key, (err) => {
    ws.unblock(err)
  }))

  return ws
}

0 comments on commit 98f9bd0

Please sign in to comment.