Skip to content

Commit

Permalink
feat: add backend capability for marking conversation as unread
Browse files Browse the repository at this point in the history
  • Loading branch information
Jesse Hallett committed May 16, 2019
1 parent 12401e9 commit bd0bbd6
Show file tree
Hide file tree
Showing 8 changed files with 201 additions and 20 deletions.
31 changes: 30 additions & 1 deletion packages/main/src/cache/persist.ts
Original file line number Diff line number Diff line change
Expand Up @@ -273,12 +273,41 @@ export function addFlag({
join boxes on box_id = boxes.id
where
messages.account_id = @accountId
and messages.uid in (${uids.join(", ")})
and boxes.name = @boxName
and messages.uid in (${uids.join(", ")})
`
).run({ accountId, boxName: box.name, flag })
}

// TODO: What is the proper way to provide a list of values in a query?
export function delFlags({
accountId,
box,
uids,
flags
}: {
accountId: ID
box: { name: string }
uids: number[]
flags: string[]
}) {
db.prepare(
`
delete from message_flags
where
message_id in (
select messages.id from messages
join boxes on box_id = boxes.id
where
messages.account_id = @accountId
and boxes.name = @boxName
and uid in (${uids.join(", ")})
)
and flag in (${flags.map(f => `'${f}'`).join(", ")})
`
).run({ accountId, boxName: box.name })
}

function insertInto(table: string, values: Record<string, unknown>): ID {
const keys = Object.keys(values)
const { lastInsertRowid } = db
Expand Down
2 changes: 1 addition & 1 deletion packages/main/src/queue/better-queue-better-sqlite3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ export default class SqliteStore<T> implements BetterQueue.Store<T> {
)
.get({ taskId, lock: "" })
if (row == null) {
return null
return undefined // Failure result must be `undefined`, not `null`!
}
return JSON.parse(row.task)
})
Expand Down
75 changes: 74 additions & 1 deletion packages/main/src/queue/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,80 @@ it("marks a conversation as read", async () => {
)
})

it.skip("replaces pending read status change when a new change is queued", async () => {})
it("marks a conversation as unread", async () => {
const connectionManager = mockConnection()
await sync(accountId, connectionManager)

const promise = queue
.enqueue(
queue.actions.unmarkAsRead({
accountId: String(accountId),
box: inbox,
uids: [7687]
})
)
.toPromise()
const flags = db
.prepare(
`
select flag from message_flags
join messages on message_id = messages.id
where
uid = @uid
`
)
.all({ uid: 7687 })
expect(flags).toEqual([])
await promise
expect(Connection.prototype.delFlags).toHaveBeenCalledWith(
[7687],
["\\Seen"],
expect.any(Function)
)
})

it.skip("replaces pending read status change when a new change is queued", async () => {
// pause queue
const connectionManager = AccountManager.connectionManagers[String(accountId)]
delete AccountManager.connectionManagers[String(accountId)]

const promise1 = queue
.enqueue(
queue.actions.markAsRead({
accountId: String(accountId),
box: inbox,
uids: [7687]
})
)
.toPromise()
const promise2 = queue
.enqueue(
queue.actions.unmarkAsRead({
accountId: String(accountId),
box: inbox,
uids: [7687]
})
)
.toPromise()

// resume queue
AccountManager.connectionManagers[String(accountId)] = connectionManager

queue.queue.resume()
const flags = db
.prepare(
`
select flag from message_flags
join messages on message_id = messages.id
where
uid = @uid
`
)
.all({ uid: 7687 })
expect(flags).toEqual([])
await Promise.all([promise1, promise2])
expect(Connection.prototype.addFlags).not.toHaveBeenCalled()
})

afterEach(() => {
db.prepare("delete from accounts").run()
Expand Down
41 changes: 33 additions & 8 deletions packages/main/src/queue/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,37 @@ export const { actions, actionTypes, perform } = combineHandlers({
])
)
)
},

unmarkAsRead(
_context: unknown,
{
accountId,
box,
uids
}: { accountId: ID; box: { name: string }; uids: number[] }
): R<void> {
return withConnectionManager(accountId, connectionManager =>
connectionManager.request(
request.actions.delFlags({ name: box.name, readonly: false }, uids, [
"\\Seen"
])
)
)
}
})

type Task = ActionTypes<typeof actions>

export function enqueue<T extends Task>(action: T): R<ActionResult<T>> {
export function enqueue(action: Task): R<ActionResult<typeof action>> {
if (action.type === actionTypes.markAsRead) {
cache.addFlag({ ...payload(action), flag: "\\Seen" })
} else if (action.type === actionTypes.unmarkAsRead) {
cache.delFlags({ ...payload(action), flags: ["\\Seen"] })
}
const result = promises.lift1<ActionResult<T>>(cb => queue.push(action, cb))
const result = promises.lift1<ActionResult<typeof action>>(cb =>
queue.push(action, cb)
)
return kefir.fromPromise(result)
}

Expand All @@ -66,7 +87,7 @@ function processTask(
cb: ((error: Error) => void) &
((error: null, result: ActionResult<typeof task>) => void)
) {
perform(undefined, task).observe({
perform(1, task).observe({
value(v) {
cb(null, v)
},
Expand All @@ -86,11 +107,15 @@ const store =
path: getDbPath()
})

const queue = new BetterQueue<Task>(processTask, {
maxRetries: 3,
maxTimeout: 5000,
retryDelay: 5000,
store
export const queue = new BetterQueue<Task>(processTask, {
maxRetries: Infinity, // keep retrying until we come online
maxTimeout: process.env.NODE_ENV === "test" ? 150 : 10000,
retryDelay: process.env.NODE_ENV === "test" ? 1 : 10000,
store,
merge(_oldTask, newTask, cb) {
console.log("merge", _oldTask, newTask)
cb(null, newTask)
}
})

export function getQueuedTasks(): Array<Action<unknown>> {
Expand Down
14 changes: 14 additions & 0 deletions packages/main/src/request/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,20 @@ export const { actions, perform } = combineHandlers({
})
},

delFlags(
connection: Connection,
box: BoxSpecifier,
source: imap.MessageSource,
flags: string[]
): R<void> {
return withBox(connection, box, () => {
const result = promises.lift0(cb =>
connection.delFlags(source, flags, cb)
)
return kefir.fromPromise(result)
})
},

end(connection: Connection): R<undefined> {
connection.end()
return kefir.constant(undefined)
Expand Down
5 changes: 5 additions & 0 deletions packages/main/src/request/testHelpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ export function mockConnection({
cb(null as any)
}
)
mock(Connection.prototype.delFlags).mockImplementation(
(_source, _flags, cb) => {
cb(null as any)
}
)
mock(Connection.prototype.getBoxes).mockImplementation((cb: any) => {
cb(null, boxes)
})
Expand Down
29 changes: 29 additions & 0 deletions packages/main/src/resolvers/conversation.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,35 @@ it("marks a conversation as read", async () => {
})
})

it("marks a conversation as unread", async () => {
const result = await graphql(
schema,
`
mutation setIsRead($conversationId: ID!, $isRead: Boolean!) {
conversations {
setIsRead(id: $conversationId, isRead: $isRead) {
id
isRead
}
}
}
`,
null,
null,
{ conversationId, isRead: false }
)
expect(result).toEqual({
data: {
conversations: {
setIsRead: {
id: conversationId,
isRead: false
}
}
}
})
})

afterEach(() => {
db.prepare("delete from accounts").run()
})
24 changes: 15 additions & 9 deletions packages/main/src/resolvers/conversation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,23 +55,21 @@ export const Conversation: ConversationResolvers = {

export const ConversationMutations: ConversationMutationsResolvers = {
async setIsRead(_parent, { id, isRead }) {
if (!isRead) {
throw new Error(
"Marking a conversation as unread is not yet implemented."
)
}
const thread = getConversation(id)
if (!thread) {
throw new Error(`Cannot find conversation with ID, ${id}`)
}
setIsRead(thread.messages)
setIsRead(thread.messages, isRead)
return thread
}
}

function setIsRead(messages: cache.Message[]) {
function setIsRead(messages: cache.Message[], isRead: boolean) {
const grouped = Seq(messages)
.filter(message => !cache.getFlags(message.id).includes("\\Seen"))
.filter(message => {
const seen = cache.getFlags(message.id).includes("\\Seen")
return isRead ? !seen : seen
})
.groupBy(message => List([message.account_id, message.box_id] as const))
for (const [grouping, msgs] of grouped) {
const accountId = grouping.get(0)
Expand All @@ -81,14 +79,22 @@ function setIsRead(messages: cache.Message[]) {
continue
}
const uids = msgs.map(message => message.uid).filter(nonNull)
if (!uids.isEmpty()) {
if (!uids.isEmpty() && isRead) {
queue.enqueue(
queue.actions.markAsRead({
accountId: String(accountId),
box,
uids: uids.valueSeq().toArray()
})
)
} else if (!uids.isEmpty() && !isRead) {
queue.enqueue(
queue.actions.unmarkAsRead({
accountId: String(accountId),
box,
uids: uids.valueSeq().toArray()
})
)
}
}
}
Expand Down

0 comments on commit bd0bbd6

Please sign in to comment.