diff --git a/apps/web/app/(app)/automation/BulkRunRules.tsx b/apps/web/app/(app)/automation/BulkRunRules.tsx
index 4ea32cf3a..d373999e5 100644
--- a/apps/web/app/(app)/automation/BulkRunRules.tsx
+++ b/apps/web/app/(app)/automation/BulkRunRules.tsx
@@ -10,7 +10,7 @@ import { SectionDescription } from "@/components/Typography";
import type { ThreadsResponse } from "@/app/api/google/threads/controller";
import type { ThreadsQuery } from "@/app/api/google/threads/validation";
import { LoadingContent } from "@/components/LoadingContent";
-import { runAiRules } from "@/providers/QueueProvider";
+import { runAiRules } from "@/utils/queue/email-actions";
import { aiQueueAtom } from "@/store/queue";
import { sleep } from "@/utils/sleep";
import { PremiumAlertWithData, usePremium } from "@/components/PremiumAlert";
diff --git a/apps/web/app/(app)/bulk-unsubscribe/ArchiveProgress.tsx b/apps/web/app/(app)/bulk-unsubscribe/ArchiveProgress.tsx
new file mode 100644
index 000000000..c33ab6efa
--- /dev/null
+++ b/apps/web/app/(app)/bulk-unsubscribe/ArchiveProgress.tsx
@@ -0,0 +1,58 @@
+import { useEffect } from "react";
+import { AnimatePresence, motion } from "framer-motion";
+import { useAtomValue } from "jotai";
+import { ProgressBar } from "@tremor/react";
+import { queueAtoms, resetTotalThreads } from "@/store/archive-queue";
+import { cn } from "@/utils";
+
+export const ArchiveProgress = () => {
+ const { totalThreads, activeThreadIds } = useAtomValue(queueAtoms.archive);
+
+ const threadsRemaining =
+ Object.values(activeThreadIds).filter(Boolean).length;
+ const totalArchived = totalThreads - threadsRemaining;
+ const progress = (totalArchived / totalThreads) * 100;
+ const isCompleted = progress === 100;
+
+ useEffect(() => {
+ if (isCompleted) {
+ setTimeout(() => {
+ resetTotalThreads("archive");
+ }, 5_000);
+ }
+ }, [isCompleted]);
+
+ if (!totalThreads) return null;
+
+ return (
+
diff --git a/apps/web/app/(app)/bulk-unsubscribe/hooks.ts b/apps/web/app/(app)/bulk-unsubscribe/hooks.ts
index 2b288d3d4..23d13501e 100644
--- a/apps/web/app/(app)/bulk-unsubscribe/hooks.ts
+++ b/apps/web/app/(app)/bulk-unsubscribe/hooks.ts
@@ -12,7 +12,7 @@ import { captureException } from "@/utils/error";
import {
archiveAllSenderEmails,
deleteEmails,
-} from "@/providers/QueueProvider";
+} from "@/utils/queue/email-actions";
import type { Row } from "@/app/(app)/bulk-unsubscribe/types";
import type { GetThreadsResponse } from "@/app/api/google/threads/basic/route";
import { isDefined } from "@/utils/types";
@@ -524,9 +524,9 @@ export function useNewsletterFilter() {
Record<"unhandled" | "unsubscribed" | "autoArchived" | "approved", boolean>
>({
unhandled: true,
- unsubscribed: false,
- autoArchived: false,
- approved: false,
+ unsubscribed: true,
+ autoArchived: true,
+ approved: true,
});
return {
diff --git a/apps/web/app/(app)/simple/SimpleList.tsx b/apps/web/app/(app)/simple/SimpleList.tsx
index e4e01ba81..c79b45c8b 100644
--- a/apps/web/app/(app)/simple/SimpleList.tsx
+++ b/apps/web/app/(app)/simple/SimpleList.tsx
@@ -17,7 +17,7 @@ import { Button as HoverButton } from "@/components/Button";
import { extractNameFromEmail } from "@/utils/email";
import { Tooltip } from "@/components/Tooltip";
import type { ParsedMessage } from "@/utils/types";
-import { archiveEmails } from "@/providers/QueueProvider";
+import { archiveEmails } from "@/utils/queue/email-actions";
import { Summary } from "@/app/(app)/simple/Summary";
import { getGmailUrl } from "@/utils/url";
import {
diff --git a/apps/web/components/CommandK.tsx b/apps/web/components/CommandK.tsx
index 49003216f..cdf67c5a2 100644
--- a/apps/web/components/CommandK.tsx
+++ b/apps/web/components/CommandK.tsx
@@ -16,7 +16,7 @@ import {
import { navigation } from "@/components/SideNav";
import { useComposeModal } from "@/providers/ComposeModalProvider";
import { refetchEmailListAtom, selectedEmailAtom } from "@/store/email";
-import { archiveEmails } from "@/providers/QueueProvider";
+import { archiveEmails } from "@/utils/queue/email-actions";
export function CommandK() {
const [open, setOpen] = React.useState(false);
diff --git a/apps/web/components/email-list/EmailList.tsx b/apps/web/components/email-list/EmailList.tsx
index 1922eceb9..80a096561 100644
--- a/apps/web/components/email-list/EmailList.tsx
+++ b/apps/web/components/email-list/EmailList.tsx
@@ -30,7 +30,7 @@ import {
deleteEmails,
markReadThreads,
runAiRules,
-} from "@/providers/QueueProvider";
+} from "@/utils/queue/email-actions";
import { selectedEmailAtom } from "@/store/email";
import { categorizeAction } from "@/utils/actions/categorize";
diff --git a/apps/web/providers/AppProviders.tsx b/apps/web/providers/AppProviders.tsx
index 9284a9eaa..8938a9b0c 100644
--- a/apps/web/providers/AppProviders.tsx
+++ b/apps/web/providers/AppProviders.tsx
@@ -1,15 +1,12 @@
import type React from "react";
import { Provider } from "jotai";
import { ComposeModalProvider } from "@/providers/ComposeModalProvider";
-import { QueueProvider } from "@/providers/QueueProvider";
import { jotaiStore } from "@/store";
export function AppProviders(props: { children: React.ReactNode }) {
return (
-
- {props.children}
-
+ {props.children}
);
}
diff --git a/apps/web/providers/QueueProvider.tsx b/apps/web/providers/QueueProvider.tsx
deleted file mode 100644
index f604e814e..000000000
--- a/apps/web/providers/QueueProvider.tsx
+++ /dev/null
@@ -1,198 +0,0 @@
-"use client";
-
-import { useEffect } from "react";
-import PQueue from "p-queue";
-import { runRulesAction } from "@/utils/actions/ai-rule";
-import {
- archiveThreadAction,
- markReadThreadAction,
- trashThreadAction,
-} from "@/utils/actions/mail";
-import type { EmailForAction } from "@/utils/ai/actions";
-import { pushToAiQueueAtom, removeFromAiQueueAtom } from "@/store/queue";
-import type { Thread } from "@/components/email-list/types";
-import type { GetThreadsResponse } from "@/app/api/google/threads/basic/route";
-import { isDefined } from "@/utils/types";
-
-const queue = new PQueue({ concurrency: 3 });
-
-type QueueNameLocalStorage =
- | "archiveQueue"
- | "deleteQueue"
- | "markReadQueue"
- | "aiRuleQueue";
-
-export const archiveEmails = async (
- threadIds: string[],
- refetch: () => void,
-) => {
- updateQueueStorage("archiveQueue", threadIds, "pending");
-
- queue.addAll(
- threadIds.map((threadId) => async () => {
- await archiveThreadAction(threadId);
- updateQueueStorage("archiveQueue", [threadId], "complete");
- refetch();
- }),
- );
-};
-
-export const archiveAllSenderEmails = async (
- from: string,
- onComplete: () => void,
-) => {
- // 1. search gmail for messages from sender
- const res = await fetch(
- `/api/google/threads/basic?from=${from}&labelId=INBOX`,
- );
- const data: GetThreadsResponse = await res.json();
-
- // 2. archive messages
- if (data?.length) {
- archiveEmails(data.map((t) => t.id).filter(isDefined), onComplete);
- } else {
- onComplete();
- }
-
- return data;
-};
-
-export const markReadThreads = async (
- threadIds: string[],
- refetch: () => void,
-) => {
- queue.addAll(
- threadIds.map((threadId) => async () => {
- await markReadThreadAction(threadId, true);
- refetch();
- }),
- );
-};
-
-export const deleteEmails = async (
- threadIds: string[],
- refetch: () => void,
-) => {
- queue.addAll(
- threadIds.map((threadId) => async () => {
- await trashThreadAction(threadId);
- refetch();
- }),
- );
-};
-
-export const runAiRules = async (
- threads: Thread[],
- force: boolean,
- // refetch: () => void,
-) => {
- // updateRunAiQueueStorage(threads, "pending");
-
- pushToAiQueueAtom(threads.map((t) => t.id));
-
- queue.addAll(
- threads.map((thread) => async () => {
- const message = threadToRunRulesEmail(thread);
- if (!message) return;
- console.log("runRulesAction", message.threadId);
- const result = await runRulesAction(message, force);
- console.log("result", result);
- removeFromAiQueueAtom(thread.id);
- // updateRunAiQueueStorage([thread], "complete");
- // refetch();
- }),
- );
-};
-
-function threadToRunRulesEmail(thread: Thread): EmailForAction | undefined {
- const message = thread.messages?.[thread.messages.length - 1];
- if (!message) return;
- const email: EmailForAction = {
- from: message.headers.from,
- // to: message.headers.to,
- // date: message.headers.date,
- replyTo: message.headers["reply-to"],
- // cc: message.headers.cc,
- subject: message.headers.subject,
- // textPlain: message.textPlain || null,
- // textHtml: message.textHtml || null,
- // snippet: thread.snippet,
- threadId: message.threadId || "",
- messageId: message.id || "",
- headerMessageId: message.headers["message-id"] || "",
- references: message.headers.references,
- };
-
- return email;
-}
-
-export function QueueProvider({ children }: { children: React.ReactNode }) {
- useEffect(() => {
- const pendingArchive = getPendingEmails("archiveQueue");
- if (pendingArchive) archiveEmails(pendingArchive, () => {});
-
- const pendingMarkRead = getPendingEmails("markReadQueue");
- if (pendingMarkRead) markReadThreads(pendingMarkRead, () => {});
-
- const pendingDelete = getPendingEmails("deleteQueue");
- if (pendingDelete) deleteEmails(pendingDelete, () => {});
-
- // TODO revisit this to make it's working as intended
- // const pendingAi = getPendingEmails("aiRuleQueue");
- // if (pendingAi) runAiRules(pendingAi);
- }, []);
-
- return <>{children}>;
-}
-
-function updateQueueStorage(
- name: QueueNameLocalStorage,
- threadIds: string[],
- state: "pending" | "complete",
-) {
- const currentStateString = localStorage.getItem(name);
-
- if (currentStateString) {
- const currentState: string[] = JSON.parse(currentStateString);
- const updatedState: string[] =
- state === "pending"
- ? Array.from(new Set([...currentState, ...threadIds]))
- : currentState.filter((id: string) => !threadIds.includes(id));
- localStorage.setItem(name, JSON.stringify(updatedState));
- } else {
- return localStorage.setItem(name, JSON.stringify(threadIds));
- }
-}
-
-// Copy and paste of the above. Might be able to refactor to use a generic
-// function updateRunAiQueueStorage(
-// threads: EmailForAction[],
-// state: "pending" | "complete",
-// ) {
-// const name: QueueNameLocalStorage = "aiRuleQueue";
-// const currentStateString = localStorage.getItem(name);
-
-// if (currentStateString) {
-// const currentState: EmailForAction[] =
-// JSON.parse(currentStateString);
-// const updatedState: EmailForAction[] =
-// state === "pending"
-// ? uniqBy([...currentState, ...threads], (t) => t.threadId)
-// : currentState.filter(
-// ({ threadId }) => !threads.find((t) => t.threadId === threadId),
-// );
-// localStorage.setItem(name, JSON.stringify(updatedState));
-// } else {
-// return localStorage.setItem(name, JSON.stringify(threads));
-// }
-// }
-
-function getPendingEmails(name: QueueNameLocalStorage) {
- const currentStateString = localStorage.getItem(name);
- if (!currentStateString) return;
-
- const currentState = JSON.parse(currentStateString);
- if (!currentState.length) return;
-
- return currentState;
-}
diff --git a/apps/web/store/archive-queue.ts b/apps/web/store/archive-queue.ts
new file mode 100644
index 000000000..cffc96d8e
--- /dev/null
+++ b/apps/web/store/archive-queue.ts
@@ -0,0 +1,79 @@
+"use client";
+
+import { atomWithStorage } from "jotai/utils";
+import { jotaiStore } from "@/store";
+import { emailActionQueue } from "@/utils/queue/email-action-queue";
+import {
+ archiveThreadAction,
+ trashThreadAction,
+ markReadThreadAction,
+} from "@/utils/actions/mail";
+
+type QueueType = "archive" | "delete" | "markRead";
+
+type QueueState = {
+ activeThreadIds: Record
;
+ totalThreads: number;
+};
+
+const initialQueueState: QueueState = {
+ activeThreadIds: {},
+ totalThreads: 0,
+};
+
+// Create atoms with localStorage persistence for each queue type
+export const queueAtoms = {
+ archive: atomWithStorage("archiveQueue", initialQueueState),
+ delete: atomWithStorage("deleteQueue", initialQueueState),
+ markRead: atomWithStorage("markReadQueue", initialQueueState),
+};
+
+type ActionFunction = (threadId: string, ...args: any[]) => Promise;
+
+const actionMap: Record = {
+ archive: archiveThreadAction,
+ delete: trashThreadAction,
+ markRead: (threadId: string) => markReadThreadAction(threadId, true),
+};
+
+export const addThreadsToQueue = (
+ queueType: QueueType,
+ threadIds: string[],
+ refetch?: () => void,
+) => {
+ const queueAtom = queueAtoms[queueType];
+ const action = actionMap[queueType];
+
+ jotaiStore.set(queueAtom, (prev) => ({
+ activeThreadIds: {
+ ...prev.activeThreadIds,
+ ...Object.fromEntries(threadIds.map((id) => [id, true])),
+ },
+ totalThreads: prev.totalThreads + threadIds.length,
+ }));
+
+ emailActionQueue.addAll(
+ threadIds.map((threadId) => async () => {
+ await action(threadId);
+
+ // remove completed thread from activeThreadIds
+ jotaiStore.set(queueAtom, (prev) => {
+ const { [threadId]: _, ...remainingThreads } = prev.activeThreadIds;
+ return {
+ ...prev,
+ activeThreadIds: remainingThreads,
+ };
+ });
+
+ refetch?.();
+ }),
+ );
+};
+
+export const resetTotalThreads = (queueType: QueueType) => {
+ const queueAtom = queueAtoms[queueType];
+ jotaiStore.set(queueAtom, (prev) => ({
+ ...prev,
+ totalThreads: 0,
+ }));
+};
diff --git a/apps/web/utils/queue/email-action-queue.ts b/apps/web/utils/queue/email-action-queue.ts
new file mode 100644
index 000000000..5b9471107
--- /dev/null
+++ b/apps/web/utils/queue/email-action-queue.ts
@@ -0,0 +1,6 @@
+"use client";
+
+import PQueue from "p-queue";
+
+// Avoid overwhelming Gmail API
+export const emailActionQueue = new PQueue({ concurrency: 3 });
diff --git a/apps/web/utils/queue/email-actions.ts b/apps/web/utils/queue/email-actions.ts
new file mode 100644
index 000000000..ef290c39c
--- /dev/null
+++ b/apps/web/utils/queue/email-actions.ts
@@ -0,0 +1,88 @@
+"use client";
+
+import { runRulesAction } from "@/utils/actions/ai-rule";
+import type { EmailForAction } from "@/utils/ai/actions";
+import { pushToAiQueueAtom, removeFromAiQueueAtom } from "@/store/queue";
+import { addThreadsToQueue } from "@/store/archive-queue";
+import type { Thread } from "@/components/email-list/types";
+import type { GetThreadsResponse } from "@/app/api/google/threads/basic/route";
+import { isDefined } from "@/utils/types";
+import { emailActionQueue } from "@/utils/queue/email-action-queue";
+
+export const archiveEmails = async (
+ threadIds: string[],
+ refetch?: () => void,
+) => {
+ addThreadsToQueue("archive", threadIds, refetch);
+};
+
+export const markReadThreads = async (
+ threadIds: string[],
+ refetch: () => void,
+) => {
+ addThreadsToQueue("markRead", threadIds, refetch);
+};
+
+export const deleteEmails = async (
+ threadIds: string[],
+ refetch: () => void,
+) => {
+ addThreadsToQueue("delete", threadIds, refetch);
+};
+
+export const archiveAllSenderEmails = async (
+ from: string,
+ onComplete: () => void,
+) => {
+ try {
+ // 1. search gmail for messages from sender
+ const url = `/api/google/threads/basic?from=${from}&labelId=INBOX`;
+ const res = await fetch(url);
+
+ if (!res.ok) throw new Error(`HTTP error! status: ${res.status}`);
+
+ const data: GetThreadsResponse = await res.json();
+
+ // 2. archive messages
+ if (data?.length) {
+ archiveEmails(data.map((t) => t.id).filter(isDefined), onComplete);
+ } else {
+ onComplete();
+ }
+
+ return data;
+ } catch (error) {
+ console.error("Failed to fetch or archive emails:", error);
+ onComplete(); // Call onComplete even if there's an error
+ return []; // Return an empty array in case of error
+ }
+};
+
+export const runAiRules = async (threads: Thread[], force: boolean) => {
+ pushToAiQueueAtom(threads.map((t) => t.id));
+
+ emailActionQueue.addAll(
+ threads.map((thread) => async () => {
+ const message = threadToRunRulesEmail(thread);
+ if (!message) return;
+ await runRulesAction(message, force);
+ removeFromAiQueueAtom(thread.id);
+ }),
+ );
+};
+
+function threadToRunRulesEmail(thread: Thread): EmailForAction | undefined {
+ const message = thread.messages?.[thread.messages.length - 1];
+ if (!message) return;
+ const email: EmailForAction = {
+ from: message.headers.from,
+ replyTo: message.headers["reply-to"],
+ subject: message.headers.subject,
+ threadId: message.threadId || "",
+ messageId: message.id || "",
+ headerMessageId: message.headers["message-id"] || "",
+ references: message.headers.references,
+ };
+
+ return email;
+}