Skip to content

Commit

Permalink
fix(FETCH): throw on failing FETCH
Browse files Browse the repository at this point in the history
  • Loading branch information
andris9 committed Mar 5, 2024
1 parent 513b480 commit 12f9a45
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 74 deletions.
2 changes: 1 addition & 1 deletion lib/commands/fetch.js
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ module.exports = async (connection, range, query, options) => {
}

connection.log.warn({ err, cid: connection.id });
return false;
throw err;
}
}
};
147 changes: 74 additions & 73 deletions lib/imap-flow.js
Original file line number Diff line number Diff line change
Expand Up @@ -2254,93 +2254,94 @@ class ImapFlow extends EventEmitter {
*/
async *fetch(range, query, options) {
options = options || {};
try {
if (!this.mailbox) {
// no mailbox selected, nothing to do
return;
}

range = await this.resolveRange(range, options);
if (!range) {
return false;
}
if (!this.mailbox) {
// no mailbox selected, nothing to do
return;
}

let finished = false;
let push = false;
let rowQueue = [];
let getNext = () =>
new Promise((resolve, reject) => {
let check = () => {
if (rowQueue.length) {
let entry = rowQueue.shift();
if (entry.err) {
return reject(entry.err);
} else {
return resolve(entry.value);
}
}
if (finished) {
return resolve(null);
}
range = await this.resolveRange(range, options);
if (!range) {
return false;
}

// wait until data is pushed to queue and try again
push = () => {
push = false;
check();
};
};
check();
});
let finished = false;
let push = false;
let rowQueue = [];

this.run('FETCH', range, query, {
uid: !!options.uid,
binary: options.binary,
changedSince: options.changedSince,
onUntaggedFetch: (untagged, next) => {
rowQueue.push({
value: {
response: untagged,
next
let getNext = () =>
new Promise((resolve, reject) => {
let check = () => {
if (rowQueue.length) {
let entry = rowQueue.shift();
if (entry.err) {
return reject(entry.err);
} else {
return resolve(entry.value);
}
});
if (typeof push === 'function') {
push();
}
}
})
.then(() => {
finished = true;
if (typeof push === 'function') {
push();

if (finished) {
return resolve(null);
}
})
.catch(err => {
rowQueue.push({ err });
});

let res;
while ((res = await getNext())) {
if (this.isClosed || this.socket.destroyed) {
let error = new Error('Connection closed');
error.code = 'EConnectionClosed';
throw error;
}
// wait until data is pushed to queue and try again
push = () => {
push = false;
check();
};
};
check();
});

if (res !== null) {
yield res.response;
res.next();
this.run('FETCH', range, query, {
uid: !!options.uid,
binary: options.binary,
changedSince: options.changedSince,
onUntaggedFetch: (untagged, next) => {
rowQueue.push({
value: {
response: untagged,
next
}
});
if (typeof push === 'function') {
push();
}
}
})
.then(() => {
finished = true;
if (typeof push === 'function') {
push();
}
})
.catch(err => {
rowQueue.push({ err });
if (typeof push === 'function') {
push();
}
});

if (!finished) {
// FETCH never finished!
let error = new Error('FETCH did not finish');
error.code = 'ENotFinished';
let res;
while ((res = await getNext())) {
if (this.isClosed || this.socket.destroyed) {
let error = new Error('Connection closed');
error.code = 'EConnectionClosed';
throw error;
}
} catch (err) {
setImmediate(() => this.close());
throw err;

if (res !== null) {
yield res.response;
res.next();
}
}

if (!finished) {
// FETCH never finished!
let error = new Error('FETCH did not finish');
error.code = 'ENotFinished';
throw error;
}
}

Expand Down

0 comments on commit 12f9a45

Please sign in to comment.