Skip to content

Commit

Permalink
feat: overhaul stream store API to better fit actual use in kernel
Browse files Browse the repository at this point in the history
  • Loading branch information
FUDCo committed May 27, 2021
1 parent ba95e34 commit c5cc00a
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 268 deletions.
82 changes: 33 additions & 49 deletions packages/swing-store-lmdb/src/lmdbSwingStore.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ const encoder = new util.TextEncoder();
* @typedef { import('@agoric/swing-store-simple').KVStore } KVStore
* @typedef { import('@agoric/swing-store-simple').StreamPosition } StreamPosition
* @typedef { import('@agoric/swing-store-simple').StreamStore } StreamStore
* @typedef { import('@agoric/swing-store-simple').StreamWriter } StreamWriter
* @typedef { import('@agoric/swing-store-simple').SwingStore } SwingStore
*/

Expand Down Expand Up @@ -227,11 +226,10 @@ function makeSwingStore(dirPath, forceReset = false) {
*
* @returns {Iterable<string>} an iterator for the items in the named stream
*/
function openReadStream(streamName, startPosition, endPosition) {
function readStream(streamName, startPosition, endPosition) {
insistStreamName(streamName);
const status = streamStatus.get(streamName);
assert(
!status,
!streamStatus.get(streamName),
X`can't read stream ${q(streamName)} because it's already in use`,
);
insistStreamPosition(startPosition);
Expand Down Expand Up @@ -277,9 +275,8 @@ function makeSwingStore(dirPath, forceReset = false) {
function* reader() {
try {
while (true) {
const statusInner = streamStatus.get(streamName);
assert(
statusInner === readStatus,
streamStatus.get(streamName) === readStatus,
X`can't read stream ${q(streamName)}, it's been closed`,
);
const line = /** @type {string|false} */ (innerReader.next());
Expand All @@ -299,9 +296,8 @@ function makeSwingStore(dirPath, forceReset = false) {
} catch (e) {
console.log(e);
} finally {
const statusEnd = streamStatus.get(streamName);
assert(
statusEnd === readStatus,
streamStatus.get(streamName) === readStatus,
X`can't read stream ${q(streamName)}, it's been closed`,
);
closeStream(streamName);
Expand All @@ -313,61 +309,49 @@ function makeSwingStore(dirPath, forceReset = false) {
}

/**
* Obtain a writer for a stream.
* Write to a stream.
*
* @param {string} streamName The stream to be written
* @param {string} item The item to write
* @param {Object} position The position to write the item
*
* @returns {StreamWriter} a writer for the named stream
* @returns {Object} the new position after writing
*/
function openWriteStream(streamName) {
function writeStreamItem(streamName, item, position) {
insistStreamName(streamName);
const status = streamStatus.get(streamName);
assert(
!status,
X`can't write stream ${q(streamName)} because it's already in use`,
);
streamStatus.set(streamName, 'write');
insistStreamPosition(position);

// XXX fdTemp is a workaround for a flaw in TypeScript's type inference
// It should be fd, which it should be changed to when and if they fix tsc.
let fdTemp = streamFds.get(streamName);
if (!fdTemp) {
let fd = streamFds.get(streamName);
if (!fd) {
const filePath = `${dirPath}/${streamName}`;
const mode = fs.existsSync(filePath) ? 'r+' : 'w';
fdTemp = fs.openSync(filePath, mode);
streamFds.set(streamName, fdTemp);
fd = fs.openSync(filePath, mode);
streamFds.set(streamName, fd);
streamStatus.set(streamName, 'write');
} else {
const status = streamStatus.get(streamName);
if (!status) {
streamStatus.set(streamName, 'write');
} else {
assert(
status === 'write',
X`can't write stream ${q(streamName)} because it's already in use`,
);
}
}
const fd = fdTemp;
activeStreamFds.add(fd);

/**
* Write to a stream.
*
* @param {string} item The item to write
* @param {Object} position The position to write the item
*
* @returns {Object} the new position after writing
*/
function write(item, position) {
assert.typeof(item, 'string');
assert(
streamFds.get(streamName) === fd,
X`can't write to closed stream ${q(streamName)}`,
);
insistStreamPosition(position);
const buf = encoder.encode(`${item}\n`);
fs.writeSync(fd, buf, 0, buf.length, position.offset);
return harden({
offset: position.offset + buf.length,
itemCount: position.itemCount + 1,
});
}
return write;
const buf = encoder.encode(`${item}\n`);
fs.writeSync(fd, buf, 0, buf.length, position.offset);
return harden({
offset: position.offset + buf.length,
itemCount: position.itemCount + 1,
});
}

const streamStore = harden({
openReadStream,
openWriteStream,
readStream,
writeStreamItem,
closeStream,
STREAM_START,
});
Expand Down
118 changes: 34 additions & 84 deletions packages/swing-store-lmdb/test/test-state.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,72 +71,37 @@ test('streamStore read/write', t => {
t.is(isSwingStore(dbDir), false);
const { streamStore, commit, close } = initSwingStore(dbDir);

let s1pos = streamStore.STREAM_START;
const writer1 = streamStore.openWriteStream('st1');
s1pos = writer1('first', s1pos);
s1pos = writer1('second', s1pos);
const start = streamStore.STREAM_START;
let s1pos = start;
s1pos = streamStore.writeStreamItem('st1', 'first', s1pos);
s1pos = streamStore.writeStreamItem('st1', 'second', s1pos);
const s1posAlt = { ...s1pos };
const writer2 = streamStore.openWriteStream('st2');
s1pos = writer1('third', s1pos);
s1pos = streamStore.writeStreamItem('st1', 'third', s1pos);
let s2pos = streamStore.STREAM_START;
s2pos = writer2('oneth', s2pos);
s1pos = writer1('fourth', s1pos);
s2pos = writer2('twoth', s2pos);
s2pos = streamStore.writeStreamItem('st2', 'oneth', s2pos);
s1pos = streamStore.writeStreamItem('st1', 'fourth', s1pos);
s2pos = streamStore.writeStreamItem('st2', 'twoth', s2pos);
const s2posAlt = { ...s2pos };
s2pos = writer2('threeth', s2pos);
s2pos = writer2('fourst', s2pos);
s2pos = streamStore.writeStreamItem('st2', 'threeth', s2pos);
s2pos = streamStore.writeStreamItem('st2', 'fourst', s2pos);
streamStore.closeStream('st1');
streamStore.closeStream('st2');
const reader1 = streamStore.openReadStream(
'st1',
streamStore.STREAM_START,
s1pos,
);
const reads1 = [];
for (const item of reader1) {
reads1.push(item);
}
t.deepEqual(reads1, ['first', 'second', 'third', 'fourth']);
const writer2alt = streamStore.openWriteStream('st2');
s2pos = writer2alt('re3', s2posAlt);
const reader1 = streamStore.readStream('st1', start, s1pos);
t.deepEqual(Array.from(reader1), ['first', 'second', 'third', 'fourth']);
s2pos = streamStore.writeStreamItem('st2', 're3', s2posAlt);
streamStore.closeStream('st2');
const reader2 = streamStore.openReadStream(
'st2',
streamStore.STREAM_START,
s2pos,
);
const reads2 = [];
for (const item of reader2) {
reads2.push(item);
}
t.deepEqual(reads2, ['oneth', 'twoth', 're3']);

const reader1alt = streamStore.openReadStream('st1', s1posAlt, s1pos);
const reads1alt = [];
for (const item of reader1alt) {
reads1alt.push(item);
}
t.deepEqual(reads1alt, ['third', 'fourth']);

const writerEmpty = streamStore.openWriteStream('empty');
const emptyPos = writerEmpty('filler', streamStore.STREAM_START);
const reader2 = streamStore.readStream('st2', start, s2pos);
t.deepEqual(Array.from(reader2), ['oneth', 'twoth', 're3']);

const reader1alt = streamStore.readStream('st1', s1posAlt, s1pos);
t.deepEqual(Array.from(reader1alt), ['third', 'fourth']);

const emptyPos = streamStore.writeStreamItem('empty', 'filler', start);
streamStore.closeStream('empty');
const readerEmpty = streamStore.openReadStream('empty', emptyPos, emptyPos);
const readsEmpty = [];
for (const item of readerEmpty) {
readsEmpty.push(item);
}
t.deepEqual(readsEmpty, []);
const readerEmpty2 = streamStore.openReadStream(
'empty',
streamStore.STREAM_START,
streamStore.STREAM_START,
);
const readsEmpty2 = [];
for (const item of readerEmpty2) {
readsEmpty2.push(item);
}
t.deepEqual(readsEmpty2, []);
const readerEmpty = streamStore.readStream('empty', emptyPos, emptyPos);
t.deepEqual(Array.from(readerEmpty), []);
const readerEmpty2 = streamStore.readStream('empty', start, start);
t.deepEqual(Array.from(readerEmpty2), []);

commit();
close();
Expand All @@ -148,35 +113,20 @@ test('streamStore mode interlock', t => {
fs.rmdirSync(dbDir, { recursive: true });
t.is(isSwingStore(dbDir), false);
const { streamStore, commit, close } = initSwingStore(dbDir);
const start = streamStore.STREAM_START;

const writer = streamStore.openWriteStream('st1');
const s1pos = writer('first', streamStore.STREAM_START);
t.throws(
() => streamStore.openReadStream('st1', streamStore.STREAM_START, s1pos),
{
message: `can't read stream "st1" because it's already in use`,
},
);
t.throws(() => streamStore.openWriteStream('st1', s1pos), {
message: `can't write stream "st1" because it's already in use`,
const s1pos = streamStore.writeStreamItem('st1', 'first', start);

t.throws(() => streamStore.readStream('st1', start, s1pos), {
message: `can't read stream "st1" because it's already in use`,
});
streamStore.closeStream('st1');
t.throws(() => writer('second', streamStore.STREAM_START), {
message: `can't write to closed stream "st1"`,
});

const reader = streamStore.openReadStream(
'st1',
streamStore.STREAM_START,
s1pos,
);
t.throws(
() => streamStore.openReadStream('st1', streamStore.STREAM_START, s1pos),
{
message: `can't read stream "st1" because it's already in use`,
},
);
t.throws(() => streamStore.openWriteStream('st1'), {
const reader = streamStore.readStream('st1', start, s1pos);
t.throws(() => streamStore.readStream('st1', start, s1pos), {
message: `can't read stream "st1" because it's already in use`,
});
t.throws(() => streamStore.writeStreamItem('st1', start, s1pos), {
message: `can't write stream "st1" because it's already in use`,
});
streamStore.closeStream('st1');
Expand Down
Loading

0 comments on commit c5cc00a

Please sign in to comment.