Skip to content

Commit

Permalink
fix: erratic error with store
Browse files Browse the repository at this point in the history
  • Loading branch information
touv committed Apr 1, 2022
1 parent 1b37c25 commit a26febc
Show file tree
Hide file tree
Showing 13 changed files with 2,764 additions and 2,719 deletions.
14 changes: 4 additions & 10 deletions packages/analytics/src/bufferize.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,14 @@ import { createStore } from '@ezs/store';
*/
export default async function bufferize(data, feed) {
const { ezs } = this;
if (this.isFirst()) {
if (!this.store) {
const location = this.getParam('location');
this.store = createStore(ezs, 'bufferize', location);
this.store.reset();
await this.store.reset();
}
if (this.isLast()) {
return this.store.stream()
.pipe(this.ezs('extract', { path: 'value' }))
.on('data', (item) => feed.write(item))
.on('error', (e) => {
this.store.close();
feed.stop(e);
})
.on('end', () => feed.close());
const stream = await this.store.cast();
return feed.flow(stream.pipe(this.ezs('extract', { path: 'value' })));
}
const path = this.getParam('path', 'bufferID');
const key = this.getIndex().toString().padStart(20, '0');
Expand Down
17 changes: 6 additions & 11 deletions packages/analytics/src/buffers.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import { createStoreWithID } from '@ezs/store';
* @param {String} [from] the store id
* @returns {Object}
*/
export default function buffers(data, feed) {
export default async function buffers(data, feed) {
if (this.isLast()) {
feed.close();
return;
Expand All @@ -46,16 +46,11 @@ export default function buffers(data, feed) {
const statement = length === -1 ? 'transit' : 'truncate';
const bufferID = String(data);
const store = createStoreWithID(ezs, bufferID, location);
store.stream()
const stream = await store.cast();
const output = stream
.pipe(ezs('extract', { path: 'value' }))
.pipe(ezs(statement, { length }))
.on('data', (item) => feed.write(item))
.on('error', (e) => {
this.store.close();
feed.stop(e);
})
.on('end', () => {
feed.end();
store.close();
});
.on('error', () => store.close())
.on('end', () => store.close());
feed.flow(output);
}
5 changes: 3 additions & 2 deletions packages/analytics/src/distribute.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ import { createStore } from '@ezs/store';
* @param {Number} [default=0] default value for missing object
* @returns {Object}
*/
export default function distribute(data, feed) {
export default async function distribute(data, feed) {
const id = get(data, this.getParam('id', 'id')) || this.getIndex();
const value = get(data, this.getParam('value', 'value'));

Expand All @@ -83,7 +83,8 @@ export default function distribute(data, feed) {
j += 1;
}
let x = 0;
this.store.empty()
const stream = await this.store.empty();
stream
.on('data', (item) => {
const key = parseInt(item.id, 10);
const idx = ruler.indexOf(key);
Expand Down
5 changes: 3 additions & 2 deletions packages/analytics/src/reducing.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@ import { createStore } from '@ezs/store';
* @param {String} [value=value] path to use for value
* @returns {Object}
*/
export default function reducing(data, feed) {
export default async function reducing(data, feed) {
if (!this.store) {
const location = this.getParam('location');
this.store = createStore(this.ezs, 'reducing', location);
}
if (this.isLast()) {
this.store.empty()
const stream = await this.store.empty();
stream
.on('data', (item) => feed.write(item))
.on('end', () => {
this.store.close();
Expand Down
5 changes: 3 additions & 2 deletions packages/analytics/src/statistics.js
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ const calculating = (values) => {
* @param {String} [target=_statistics] path of statistics in output object
* @returns {Object}
*/
export default function statistics(data, feed) {
export default async function statistics(data, feed) {
const path = this.getParam('path', 'value');
const target = this.getParam('target', '_statistics');
const fields = Array.isArray(path) ? path : [path];
Expand Down Expand Up @@ -155,7 +155,8 @@ export default function statistics(data, feed) {
};
return obj;
}, {});
this.store.empty()
const stream = await this.store.empty();
stream
.on('data', ({ value }) => {
const localValues = value.hashValues.reduce((obj, item) => {
const sample = this.stack[item.key].hash[item.hashValue];
Expand Down
5 changes: 4 additions & 1 deletion packages/basics/test/url-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ httpbin
httpbin
.get('/status/400?a=c')
.reply(400);
httpbin
.get('/status/400')
.reply(400);

describe('URLStream', () => {
let server;
Expand Down Expand Up @@ -186,7 +189,7 @@ describe('URLStream', () => {
{ a: 'c' },
];
from(input)
.pipe(ezs('URLStream', {
.pipe(ezs('URLStream', {
url: 'http://unknow',
retries: 1,
}))
Expand Down
4 changes: 2 additions & 2 deletions packages/loterre/src/skos-hierarchy.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ async function SKOSHierarchy(data, feed) {

if (!this.store) {
this.store = createStore(this.ezs, 'skos_hierarchy_store');
this.store.reset();
await this.store.reset();
}
if (this.isLast()) {
this.store.close();
await this.store.close();
feed.close();
} else {
const paths = Array()
Expand Down
17 changes: 10 additions & 7 deletions packages/loterre/src/skos-pathenum.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,20 +75,23 @@ async function SKOSPathEnum(data, feed) {
try {
if (!this.store) {
this.store = createStore(this.ezs, 'skos_pathenum_store');
this.store.reset();
await this.store.reset();
}
if (this.isLast()) {
const stream = this.store.cast()
const stream = await this.store.cast();
const output = stream
.pipe(this.ezs(getBroaderAndNarrower, {
path: this.getParam('path', 'skos$broader'),
uri: this.getParam('uri', 'rdf$about'),
label: this.getParam('label', 'skos$prefLabel'),
recursion: this.getParam('recursion', false),
}, this.store));
return feed.flow(stream).finally(() => {
this.store.close();
feed.close();
});
}, this.store))
.on('end', () => {
feed.close();
this.store.close();
});
;
return feed.flow(output);
}
await this.store.add(data.rdf$about, data);
feed.end();
Expand Down
2 changes: 1 addition & 1 deletion packages/loterre/test/skos-pathenum.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ ezs.use(ezsLocal);

describe('SKOSPathEnum', () => {
beforeEach(() => {
jest.setTimeout(10000);
// jest.setTimeout(10000);
});

test('from file : data-sample.skos', (done) => {
Expand Down
Loading

0 comments on commit a26febc

Please sign in to comment.