Skip to content

Commit

Permalink
Merge pull request #256 from Inist-CNRS/use-memory-store2
Browse files Browse the repository at this point in the history
refactor: 💡 [expand] use memory
  • Loading branch information
touv authored Sep 14, 2022
2 parents 56396ea + 7c24757 commit 4542bc6
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 47 deletions.
27 changes: 13 additions & 14 deletions packages/analytics/src/expand.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { resolve as resolvePath } from 'path';
import { tmpdir } from 'os';
import get from 'lodash.get';
import set from 'lodash.set';
import { createStore } from '@ezs/store';
import cacache from 'cacache';
import each from 'async-each-series';
import makeDir from 'make-dir';
Expand Down Expand Up @@ -39,8 +38,9 @@ async function mergeWith(data, feed) {
const { id, value } = data;
const path = this.getParam('path');
try {
const obj = await store.cut(id);
if (obj === null) {
const obj = store[id];
delete store[id];
if (obj === undefined || obj === null) {
throw new Error('id was corrupted');
}
const source = get(obj, path);
Expand All @@ -51,7 +51,8 @@ async function mergeWith(data, feed) {
set(obj, path, value);
return feed.send(obj);
} catch (e) {
return feed.stop(e);
// avoid to break the pipe
return feed.send(e);
}
}

Expand All @@ -68,12 +69,13 @@ async function drainWith(data, feed) {
async (cur, next) => {
let obj;
try {
obj = await store.get(cur);
obj = store[cur];
delete store[cur];
} catch (e) {
feed.write(e);
}
if (obj === null) {
feed.stop(new Error(`Unable to find ${cur} in the store ${store.id()}`));
if (obj === undefined || obj === null) {
feed.stop(new Error(`Unable to find ${cur} in the store`));
} else {
feed.write(obj);
}
Expand Down Expand Up @@ -138,9 +140,7 @@ export default async function expand(data, feed) {
const cacheName = this.getParam('cacheName');

if (!this.store) {
const location = this.getParam('location');
this.store = createStore(ezs, 'expand', location);
await this.store.reset();
this.store = {}
this.flows = [];
}
if (!this.createStatements) {
Expand Down Expand Up @@ -203,8 +203,7 @@ export default async function expand(data, feed) {
this.flows.push(feed.flow(strm));
}
await Promise.all(this.flows);
await this.store.close();

delete this.store;
return feed.close();
}
const value = get(data, path);
Expand All @@ -221,7 +220,7 @@ export default async function expand(data, feed) {
const id = this.getIndex().toString().padStart(20, '0');
const size = Number(this.getParam('size', 1));

await this.store.put(id, data);
this.store[id] = data;

this.stack[this.bufferID][id] = true;
this.buffer.push(core(id, value));
Expand All @@ -232,7 +231,7 @@ export default async function expand(data, feed) {
return feed.end();
} catch (e) {
if (this.store) {
this.store.close();
delete this.store;
}
return feed.stop(e);
}
Expand Down
34 changes: 1 addition & 33 deletions packages/analytics/test/expand.js
Original file line number Diff line number Diff line change
Expand Up @@ -246,38 +246,6 @@ test('with no script', (done) => {
done(new Error('Error is the right behavior'));
});
});
test('with wrong location ', (done) => {
ezs.use(statements);
const input = [
{ a: 1, b: 'a' },
{ a: 2, b: 'b' },
{ a: 3, b: 'c' },
{ a: 4, b: 'd' },
{ a: 5, b: 'e' },
{ a: 6, b: 'f' },
];
const script = `
[use]
plugin = analytics
[assign]
path = value
value = get('value').toUpper()
`;
from(input)
.pipe(ezs('expand', { path: 'b', script, location: '/no/where' }))
.pipe(ezs.catch())
.on('error', (e) => {
expect(e.message).toEqual(expect.stringContaining('EACCES: permission denied'));
done();
})
.on('data', () => {
done(new Error('Error is the right behavior'));
})
.on('end', () => {
done(new Error('Error is the right behavior'));
});
});

test('with no path', (done) => {
ezs.use(statements);
Expand Down Expand Up @@ -423,7 +391,7 @@ test('with a script that loses the identifier', (done) => {
.pipe(ezs('expand', { path: 'b', script }))
.pipe(ezs.catch())
.on('error', (e) => {
expect(e.message).toEqual(expect.stringContaining('key cannot be `null`'));
expect(e.message).toEqual(expect.stringContaining('id was corrupted'));
done();
})
.on('end', () => {
Expand Down

0 comments on commit 4542bc6

Please sign in to comment.