diff --git a/packages/main/src/cache/persist.ts b/packages/main/src/cache/persist.ts index 8f5f5a7e..f874b295 100644 --- a/packages/main/src/cache/persist.ts +++ b/packages/main/src/cache/persist.ts @@ -273,12 +273,41 @@ export function addFlag({ join boxes on box_id = boxes.id where messages.account_id = @accountId - and messages.uid in (${uids.join(", ")}) and boxes.name = @boxName + and messages.uid in (${uids.join(", ")}) ` ).run({ accountId, boxName: box.name, flag }) } +// TODO: What is the proper way to provide a list of values in a query? +export function delFlags({ + accountId, + box, + uids, + flags +}: { + accountId: ID + box: { name: string } + uids: number[] + flags: string[] +}) { + db.prepare( + ` + delete from message_flags + where + message_id in ( + select messages.id from messages + join boxes on box_id = boxes.id + where + messages.account_id = @accountId + and boxes.name = @boxName + and uid in (${uids.join(", ")}) + ) + and flag in (${flags.map(f => `'${f}'`).join(", ")}) + ` + ).run({ accountId, boxName: box.name }) +} + function insertInto(table: string, values: Record): ID { const keys = Object.keys(values) const { lastInsertRowid } = db diff --git a/packages/main/src/queue/better-queue-better-sqlite3.ts b/packages/main/src/queue/better-queue-better-sqlite3.ts index 53aa8276..ca475817 100644 --- a/packages/main/src/queue/better-queue-better-sqlite3.ts +++ b/packages/main/src/queue/better-queue-better-sqlite3.ts @@ -58,7 +58,7 @@ export default class SqliteStore implements BetterQueue.Store { ) .get({ taskId, lock: "" }) if (row == null) { - return null + return undefined // Failure result must be `undefined`, not `null`! } return JSON.parse(row.task) }) diff --git a/packages/main/src/queue/index.test.ts b/packages/main/src/queue/index.test.ts index c50a5fb0..226af753 100644 --- a/packages/main/src/queue/index.test.ts +++ b/packages/main/src/queue/index.test.ts @@ -64,7 +64,80 @@ it("marks a conversation as read", async () => { ) }) -it.skip("replaces pending read status change when a new change is queued", async () => {}) +it("marks a conversation as unread", async () => { + const connectionManager = mockConnection() + await sync(accountId, connectionManager) + + const promise = queue + .enqueue( + queue.actions.unmarkAsRead({ + accountId: String(accountId), + box: inbox, + uids: [7687] + }) + ) + .toPromise() + const flags = db + .prepare( + ` + select flag from message_flags + join messages on message_id = messages.id + where + uid = @uid + ` + ) + .all({ uid: 7687 }) + expect(flags).toEqual([]) + await promise + expect(Connection.prototype.delFlags).toHaveBeenCalledWith( + [7687], + ["\\Seen"], + expect.any(Function) + ) +}) + +it.skip("replaces pending read status change when a new change is queued", async () => { + // pause queue + const connectionManager = AccountManager.connectionManagers[String(accountId)] + delete AccountManager.connectionManagers[String(accountId)] + + const promise1 = queue + .enqueue( + queue.actions.markAsRead({ + accountId: String(accountId), + box: inbox, + uids: [7687] + }) + ) + .toPromise() + const promise2 = queue + .enqueue( + queue.actions.unmarkAsRead({ + accountId: String(accountId), + box: inbox, + uids: [7687] + }) + ) + .toPromise() + + // resume queue + AccountManager.connectionManagers[String(accountId)] = connectionManager + + queue.queue.resume() + const flags = db + .prepare( + ` + select flag from message_flags + join messages on message_id = messages.id + where + uid = @uid + ` + ) + .all({ uid: 7687 }) + expect(flags).toEqual([]) + await Promise.all([promise1, promise2]) + expect(Connection.prototype.addFlags).not.toHaveBeenCalled() +}) afterEach(() => { db.prepare("delete from accounts").run() diff --git a/packages/main/src/queue/index.ts b/packages/main/src/queue/index.ts index d0eb485d..8f16b48d 100644 --- a/packages/main/src/queue/index.ts +++ b/packages/main/src/queue/index.ts @@ -34,16 +34,37 @@ export const { actions, actionTypes, perform } = combineHandlers({ ]) ) ) + }, + + unmarkAsRead( + _context: unknown, + { + accountId, + box, + uids + }: { accountId: ID; box: { name: string }; uids: number[] } + ): R { + return withConnectionManager(accountId, connectionManager => + connectionManager.request( + request.actions.delFlags({ name: box.name, readonly: false }, uids, [ + "\\Seen" + ]) + ) + ) } }) type Task = ActionTypes -export function enqueue(action: T): R> { +export function enqueue(action: Task): R> { if (action.type === actionTypes.markAsRead) { cache.addFlag({ ...payload(action), flag: "\\Seen" }) + } else if (action.type === actionTypes.unmarkAsRead) { + cache.delFlags({ ...payload(action), flags: ["\\Seen"] }) } - const result = promises.lift1>(cb => queue.push(action, cb)) + const result = promises.lift1>(cb => + queue.push(action, cb) + ) return kefir.fromPromise(result) } @@ -66,7 +87,7 @@ function processTask( cb: ((error: Error) => void) & ((error: null, result: ActionResult) => void) ) { - perform(undefined, task).observe({ + perform(1, task).observe({ value(v) { cb(null, v) }, @@ -86,11 +107,15 @@ const store = path: getDbPath() }) -const queue = new BetterQueue(processTask, { - maxRetries: 3, - maxTimeout: 5000, - retryDelay: 5000, - store +export const queue = new BetterQueue(processTask, { + maxRetries: Infinity, // keep retrying until we come online + maxTimeout: process.env.NODE_ENV === "test" ? 150 : 10000, + retryDelay: process.env.NODE_ENV === "test" ? 1 : 10000, + store, + merge(_oldTask, newTask, cb) { + console.log("merge", _oldTask, newTask) + cb(null, newTask) + } }) export function getQueuedTasks(): Array> { diff --git a/packages/main/src/request/index.ts b/packages/main/src/request/index.ts index b50007d8..77d6116f 100644 --- a/packages/main/src/request/index.ts +++ b/packages/main/src/request/index.ts @@ -64,6 +64,20 @@ export const { actions, perform } = combineHandlers({ }) }, + delFlags( + connection: Connection, + box: BoxSpecifier, + source: imap.MessageSource, + flags: string[] + ): R { + return withBox(connection, box, () => { + const result = promises.lift0(cb => + connection.delFlags(source, flags, cb) + ) + return kefir.fromPromise(result) + }) + }, + end(connection: Connection): R { connection.end() return kefir.constant(undefined) diff --git a/packages/main/src/request/testHelpers.ts b/packages/main/src/request/testHelpers.ts index ebcc315f..bd68e908 100644 --- a/packages/main/src/request/testHelpers.ts +++ b/packages/main/src/request/testHelpers.ts @@ -29,6 +29,11 @@ export function mockConnection({ cb(null as any) } ) + mock(Connection.prototype.delFlags).mockImplementation( + (_source, _flags, cb) => { + cb(null as any) + } + ) mock(Connection.prototype.getBoxes).mockImplementation((cb: any) => { cb(null, boxes) }) diff --git a/packages/main/src/resolvers/conversation.test.ts b/packages/main/src/resolvers/conversation.test.ts index 182085f0..d2a7baaa 100644 --- a/packages/main/src/resolvers/conversation.test.ts +++ b/packages/main/src/resolvers/conversation.test.ts @@ -194,6 +194,35 @@ it("marks a conversation as read", async () => { }) }) +it("marks a conversation as unread", async () => { + const result = await graphql( + schema, + ` + mutation setIsRead($conversationId: ID!, $isRead: Boolean!) { + conversations { + setIsRead(id: $conversationId, isRead: $isRead) { + id + isRead + } + } + } + `, + null, + null, + { conversationId, isRead: false } + ) + expect(result).toEqual({ + data: { + conversations: { + setIsRead: { + id: conversationId, + isRead: false + } + } + } + }) +}) + afterEach(() => { db.prepare("delete from accounts").run() }) diff --git a/packages/main/src/resolvers/conversation.ts b/packages/main/src/resolvers/conversation.ts index 072864ab..60c0d8c0 100644 --- a/packages/main/src/resolvers/conversation.ts +++ b/packages/main/src/resolvers/conversation.ts @@ -55,23 +55,21 @@ export const Conversation: ConversationResolvers = { export const ConversationMutations: ConversationMutationsResolvers = { async setIsRead(_parent, { id, isRead }) { - if (!isRead) { - throw new Error( - "Marking a conversation as unread is not yet implemented." - ) - } const thread = getConversation(id) if (!thread) { throw new Error(`Cannot find conversation with ID, ${id}`) } - setIsRead(thread.messages) + setIsRead(thread.messages, isRead) return thread } } -function setIsRead(messages: cache.Message[]) { +function setIsRead(messages: cache.Message[], isRead: boolean) { const grouped = Seq(messages) - .filter(message => !cache.getFlags(message.id).includes("\\Seen")) + .filter(message => { + const seen = cache.getFlags(message.id).includes("\\Seen") + return isRead ? !seen : seen + }) .groupBy(message => List([message.account_id, message.box_id] as const)) for (const [grouping, msgs] of grouped) { const accountId = grouping.get(0) @@ -81,7 +79,7 @@ function setIsRead(messages: cache.Message[]) { continue } const uids = msgs.map(message => message.uid).filter(nonNull) - if (!uids.isEmpty()) { + if (!uids.isEmpty() && isRead) { queue.enqueue( queue.actions.markAsRead({ accountId: String(accountId), @@ -89,6 +87,14 @@ function setIsRead(messages: cache.Message[]) { uids: uids.valueSeq().toArray() }) ) + } else if (!uids.isEmpty() && !isRead) { + queue.enqueue( + queue.actions.unmarkAsRead({ + accountId: String(accountId), + box, + uids: uids.valueSeq().toArray() + }) + ) } } }