Skip to content

Commit

Permalink
feat: add revert missing members to pending on sync
Browse files Browse the repository at this point in the history
  • Loading branch information
peterferguson committed Apr 10, 2024
1 parent 14863f0 commit 9d3924a
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 87 deletions.
8 changes: 7 additions & 1 deletion src/actions/retry-pending-members.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import * as R from "remeda";
import { and, eq } from "drizzle-orm";
import * as schema from "../db/schema";
import { db } from "../db";
Expand All @@ -13,7 +14,12 @@ export async function retryPendingMembers(groupId?: string) {
: eq(fields.status, "pending"),
});

await Promise.allSettled(pendingMembers.map(retryAddMember));
const batchedPromises = R.chunk(pendingMembers.map(retryAddMember), 10);

for (const batch of batchedPromises) {
// TODO: maybe we should also have a delay?
await Promise.allSettled(batch);
}
}

/**
Expand Down
228 changes: 147 additions & 81 deletions src/actions/sync-stored-members-with-xmtp.ts
Original file line number Diff line number Diff line change
@@ -1,117 +1,183 @@
import { and, eq } from "drizzle-orm";
import * as R from "remeda";
import { and, eq, inArray } from "drizzle-orm";
import * as schema from "../db/schema";
import { db } from "../db";
import { bot } from "../lib/xmtp/client";
import type { ChainAwareAddress } from "../db/schema";
import { getWalletClient } from "../lib/eth/clients";

const { walletClient } = getWalletClient();

/**
* Syncs the database member state with the on network XMTP group chat member state
*
* This function will:
* - Ensure that the xmtp group chats exist in the database if not found
* - Ensure that all chat members are in the database
* - Ensure database members with approved status who are not in the group chat are reverted to pending status
* - Ensure database member status is approved if the user is in the group on XMTP
*
* @param groupId Optionally provide a groupId to only sync members for that group
*/
export async function syncStoredMembersWithXmtp(groupId?: string) {
const groups = await bot.listGroups();
const groupChats = await bot.listGroups().catch((e) => {
console.error("Failed to list groups", e);
});

if (!groups) {
console.log("groupChats", groupChats);
if (!groupChats) {
return;
}

// - ensure that the database member status is approved if the user is in the group on XMTP
const members = await db.query.groupMembers.findMany({
const membersFromDatabase = await db.query.groupMembers.findMany({
...(groupId && {
where: (fields, { eq }) => eq(fields.groupId, groupId),
}),
});

for (const group of groups) {
// - only sync the members for the provided group if defined
if (groupId && group.group_id !== groupId) {
continue;
}

const storedMembers = members.filter((m) => m.groupId === group.group_id);

if (storedMembers.length === 0) {
// - ensure the group exists in the database
const storedGroup = await db.query.groups.findFirst({
where: (fields, { eq }) => eq(fields.id, group.group_id),
});

// - store the group is not found
if (!storedGroup) {
const { walletClient } = getWalletClient();
if (
group.metdata.creator_account_address.toLowerCase() !==
walletClient.account.address.toLowerCase() ||
group.metdata.policy !== "GroupCreatorIsAdmin"
) {
// - this is not a group that we don't manage so we do nothing
// ? maybe we remove this in the future
return;
}

await db.insert(schema.groups).values({
id: group.group_id,
});
// - first separate the groups into missing, stored and unsupported
// - missing groups are groups that are not in the database
// - stored groups are groups that are in the database
// - unsupported groups are groups that are not supported by this function
// - then map the values down onto each of the members so we end up with
// - an array of members with the group id & metadata attached
const { missing: missingGroups, stored: storedGroups } = R.pipe(
groupChats,
R.groupBy((group) => {
console.log(
"groupChats -> ",
group,
"unsupported",
group.metadata.creator_account_address.toLowerCase() !==
walletClient.account.address.toLowerCase() ||
group.metadata.policy !== "GroupCreatorIsAdmin",
);
if (
group.metadata.creator_account_address.toLowerCase() !==
walletClient.account.address.toLowerCase() ||
group.metadata.policy !== "GroupCreatorIsAdmin"
) {
// - this is not a group that we don't manage so we do nothing
// ? maybe we remove this in the future
return "unsupported";
}

// - the user has been added to the chat but is not in the database ... probably only possible if there is a bug
// - but best to add them to the database to be safe

// ? for now we assume that they are an EOA
return !membersFromDatabase.some((m) => m.groupId === group.group_id)
? "missing"
: "stored";
}),
);

await db.insert(schema.groupMembers).values(
group.members.map((address) => ({
status: "approved" as const,
chainAwareAddress: `eth:${address}` satisfies ChainAwareAddress,
groupId: group.group_id,
})),
);
console.log("missingGroups", missingGroups);
console.log("storedGroups", storedGroups);

continue;
}
// - find the set of missing groups and store them in the database
for (const missingGroupId of R.unique(
(missingGroups ?? []).map((group) => group.group_id),
)) {
await db.insert(schema.groups).values({ id: missingGroupId });
}

// - check if the user is in the group chat & has the correct status
for (const memberAddress of group.members) {
const storedMember = storedMembers.find((m) =>
m.chainAwareAddress.endsWith(memberAddress),
);
const missingMembers = R.pipe(
missingGroups ?? [],
R.flatMap((group) =>
group.members.map((address) => ({
groupId: group.group_id,
address,
})),
),
);

if (missingMembers.length !== 0) {
// - store the missing groups and their members in the database with no further checks needed on these groups
// ? for now we assume that they are an EOA
await db.insert(schema.groupMembers).values(
missingMembers.map((member) => ({
status: "approved" as const,
chainAwareAddress: `eth:${member.address}` satisfies ChainAwareAddress,
groupId: member.groupId,
})),
);
}

// - if the user is not in the database then add them
if (!storedMember) {
await db.insert(schema.groupMembers).values(
group.members.map((address) => ({
status: "approved" as const,
chainAwareAddress: `eth:${address}` satisfies ChainAwareAddress,
groupId: group.group_id,
})),
const storedMembersInGroupChat = R.pipe(
storedGroups ?? [],
R.flatMap((group) =>
group.members.map((address) => {
const member = membersFromDatabase.find(
(m) =>
m.groupId === group.group_id &&
m.chainAwareAddress.toLowerCase().endsWith(address.toLowerCase()),
);
const { status, chainAwareAddress } = member ?? {};
return {
groupId: group.group_id,
address,
// biome-ignore lint/style/noNonNullAssertion: we filtered on these members to get here
chainAwareAddress: chainAwareAddress!,
// biome-ignore lint/style/noNonNullAssertion: we filtered on these members to get here
status: status!,
};
}),
),
);

for (const storedMember of storedMembersInGroupChat) {
// - only sync the already stored members for the provided group if defined
if (groupId && storedMember.groupId !== groupId) {
continue;
}

// - if the user is in the group chat but is not approved then approve them
switch (storedMember.status) {
case "approved": {
// - do nothing they are already approved
continue;
}

// - if the user is in the group chat but is not approved then approve them
switch (storedMember.status) {
case "approved": {
// - do nothing they are already approved
continue;
}
default:
await db
.update(schema.groupMembers)
.set({ status: "approved" as const })
.where(
and(
eq(schema.groupMembers.groupId, group.group_id),
eq(
schema.groupMembers.chainAwareAddress,
storedMember.chainAwareAddress,
),
default:
await db
.update(schema.groupMembers)
.set({ status: "approved" as const })
.where(
and(
eq(schema.groupMembers.groupId, storedMember.groupId),
eq(
schema.groupMembers.chainAwareAddress,
storedMember.chainAwareAddress,
),
);
break;
}
),
);
break;
}

// - for members that are in the database but not in the group chat we should revert them to pending status

const storedMembersThatAreNotInGroupChat = R.pipe(
membersFromDatabase,
R.filter((m) => {
// - if the user is in the chat then we don't want to revert them to pending
const groupChat = groupChats.find(
({ group_id: id, members }) =>
id === m.groupId &&
members.some((address) =>
m.chainAwareAddress.toLowerCase().endsWith(address.toLowerCase()),
),
);
if (groupChat) return false;
return true;
}),
);

await db
.update(schema.groupMembers)
.set({ status: "pending" as const })
.where(
inArray(
schema.groupMembers.id,
// biome-ignore lint/style/noNonNullAssertion: these are assigned on insert and so shouldn't be null
storedMembersThatAreNotInGroupChat.map((m) => m.id!),
),
);
}
}
3 changes: 0 additions & 3 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,6 @@ export default new Elysia()
.get("/", async () => {
if (process.env.NODE_ENV === "development") {
console.log("groups", await bot.listGroups());
await bot
.send("5eb5b1fa27adc585a75cdedd6a1d4d5d", "Hello")
.catch(console.error);
}

return "Onit XMTP bot 🤖";
Expand Down
2 changes: 1 addition & 1 deletion src/lib/xmtp/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ export async function createClient(dbPath: string) {
groups: {
group_id: string;
members: Address[];
metdata: {
metadata: {
creator_account_address: Address;
policy: "GroupCreatorIsAdmin" | "EveryoneIsAdmin";
};
Expand Down
2 changes: 1 addition & 1 deletion test/local-api-test.http
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ GET http://localhost:8080/group/4cf971dcb124e9e57fad653cdc6242f1/wallets HTTP/1.


###
GET http://localhost:8080/0x524bF2086D4b5BBdA06f4c16Ec36f06AAd4E1Cad HTTP/1.1
GET http://localhost:8080/wallet/0x524bF2086D4b5BBdA06f4c16Ec36f06AAd4E1Cad HTTP/1.1

###
GET http://localhost:8080/group/ed4ac987e6556dc8802bb7d0fce39d1d/link-wallet/basesep:0xaC03aD602D6786e7E87566192b48e30666e327Ad HTTP/1.1
Expand Down

0 comments on commit 9d3924a

Please sign in to comment.