Skip to content

Commit

Permalink
refactor: retry pending members -> concurrent
Browse files Browse the repository at this point in the history
fix: only store groups that the bot can manage
chore: add types to the schema
fix: readd batch member add since they will be checked regularly (having an approved member that isn't for a small period of time is fine)
  • Loading branch information
peterferguson committed Apr 10, 2024
1 parent 098595a commit 14863f0
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 78 deletions.
81 changes: 51 additions & 30 deletions src/actions/add-members.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,32 @@ export async function addMembers(
// ! we have to do this because atm there is no good way to check if the user is on the network &
// ! xmtp doesn't return a list of failed members on creation
try {
// tODO: there is an issue where XMTP will add members who aren't on the network
// TODO: see https://github.com/xmtp/libxmtp/issues/613
const addedMembers = await bot.addMembers(groupId, members as string[]);
approvedMembers.push(...members);
console.log(
`Group ID is ${groupId} -> Added members ${JSON.stringify(addedMembers)}`,
`Created Group with id ${groupId} -> Added members ${members} -> full response ${JSON.stringify(
addedMembers,
null,
2,
)}`,
);
} catch (e) {
// ! if a adding members fails we need to try each of them individually to see which of the members failed

// - add members in parallel (this is slower for small numbers of members but SIGNIFICANTLY faster for large numbers of members)
const addPromises = await Promise.allSettled(
members.map((member) =>
bot.addMembers(groupId, [member]).catch((e) => {
members.map(async (member) => {
try {
await bot.addMembers(groupId, [member]);

console.log("added member", member);
approvedMembers.push(member);
} catch (e) {
console.log("Failed to add member", member, e);
if (
// @ts-ignore
e.info.stderr
.toString()
.includes(
Expand All @@ -55,8 +68,8 @@ export async function addMembers(
throw new MemberAddFailure(member, "existing");
}
throw new MemberAddFailure(member, "pending");
}),
),
}
}),
);

for (const result of addPromises) {
Expand All @@ -71,30 +84,38 @@ export async function addMembers(
}
}

const successfullyAddedMembers = members.filter(
(member) =>
![pendingMembers, approvedMembers].flat().includes(member as Address),
);

if (successfullyAddedMembers.length !== 0)
await db.insert(schema.groupMembers).values(
successfullyAddedMembers.map((memberAddress) => ({
status: "approved" as const,
groupId,
// TODO: once XMTP supports contract wallets update this
chainAwareAddress: `eth:${memberAddress}` satisfies ChainAwareAddress,
})),
);

if (pendingMembers.length !== 0)
await db.insert(schema.groupMembers).values(
pendingMembers.map((memberAddress) => ({
status: "pending" as const,
groupId,
// TODO: once XMTP supports contract wallets update this
chainAwareAddress: `eth:${memberAddress}` satisfies ChainAwareAddress,
})),
);
if (approvedMembers.length !== 0 || pendingMembers.length !== 0)
await db
.insert(schema.groupMembers)
.values([
...approvedMembers.map((memberAddress) => ({
status: "approved" as const,
groupId,
// TODO: once XMTP supports contract wallets update this
chainAwareAddress: `eth:${memberAddress}` satisfies ChainAwareAddress,
})),
...pendingMembers.map((memberAddress) => ({
status: "pending" as const,
groupId,
// TODO: once XMTP supports contract wallets update this
chainAwareAddress: `eth:${memberAddress}` satisfies ChainAwareAddress,
})),
])
.catch((e) => {
console.error(
"Failed to insert members into database",
e,
JSON.stringify(
{
members,
pendingMembers,
approvedMembers,
},
null,
2,
),
);
});

return { pendingMembers, members: successfullyAddedMembers };
return { pendingMembers, members: approvedMembers };
}
40 changes: 0 additions & 40 deletions src/actions/retry-add-pending-members.ts

This file was deleted.

48 changes: 48 additions & 0 deletions src/actions/retry-pending-members.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import { and, eq } from "drizzle-orm";
import * as schema from "../db/schema";
import { db } from "../db";
import { bot } from "../lib/xmtp/client";
import type { GroupMember, GroupMemberStatus } from "../db/schema";

export async function retryPendingMembers(groupId?: string) {
// - get the pending members
const pendingMembers = await db.query.groupMembers.findMany({
where: (fields, { eq }) =>
groupId
? and(eq(fields.groupId, groupId), eq(fields.status, "pending"))
: eq(fields.status, "pending"),
});

await Promise.allSettled(pendingMembers.map(retryAddMember));
}

/**
* Retry adding a status `pending` member to the group & update the status if successful
* */
async function retryAddMember({
id,
groupId,
chainAwareAddress,
}: GroupMember): Promise<GroupMemberStatus | undefined> {
const address = chainAwareAddress.split(":").at(-1);
if (!id || !groupId || !address) return undefined;

try {
console.log(`adding ${address} to group ${groupId}`);
await bot.addMembers(groupId, [address]);
await db
.update(schema.groupMembers)
.set({ status: "approved" as const })
.where(
and(
eq(schema.groupMembers.id, id),
eq(schema.groupMembers.chainAwareAddress, chainAwareAddress),
),
);
return "approved";
} catch (e) {
console.error(`failed to add ${address} to group ${groupId}`);
// - no need to update the status as we will retry this on the next run
return "pending";
}
}
17 changes: 17 additions & 0 deletions src/actions/sync-stored-members-with-xmtp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import * as schema from "../db/schema";
import { db } from "../db";
import { bot } from "../lib/xmtp/client";
import type { ChainAwareAddress } from "../db/schema";
import { getWalletClient } from "../lib/eth/clients";

/**
* Syncs the database member state with the on network XMTP group chat member state
Expand All @@ -24,6 +25,11 @@ export async function syncStoredMembersWithXmtp(groupId?: string) {
});

for (const group of groups) {
// - only sync the members for the provided group if defined
if (groupId && group.group_id !== groupId) {
continue;
}

const storedMembers = members.filter((m) => m.groupId === group.group_id);

if (storedMembers.length === 0) {
Expand All @@ -34,6 +40,17 @@ export async function syncStoredMembersWithXmtp(groupId?: string) {

// - store the group is not found
if (!storedGroup) {
const { walletClient } = getWalletClient();
if (
group.metdata.creator_account_address.toLowerCase() !==
walletClient.account.address.toLowerCase() ||
group.metdata.policy !== "GroupCreatorIsAdmin"
) {
// - this is not a group that we don't manage so we do nothing
// ? maybe we remove this in the future
return;
}

await db.insert(schema.groups).values({
id: group.group_id,
});
Expand Down
22 changes: 20 additions & 2 deletions src/db/schema.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import { sqliteTable, text, unique } from "drizzle-orm/sqlite-core";
import { relations } from "drizzle-orm";
import {
relations,
type InferInsertModel,
type InferSelectModel,
} from "drizzle-orm";
import { generateUuid7 } from "../lib/uuid";
import type { Uuidv7 } from "../lib/validators";
import type { ChainShortName } from "../lib/eth/eip3770-shortnames";
Expand All @@ -16,7 +20,7 @@ const idField = {
};

/**
* - Views
* - Tables
*/

export const groups = sqliteTable("groups", {
Expand Down Expand Up @@ -67,3 +71,17 @@ export const groupMembersRelations = relations(groupMembers, ({ one }) => ({
references: [groups.id],
}),
}));

/**
* - Types
*/

export type Group = InferSelectModel<typeof groups>;
export type InsertGroup = InferInsertModel<typeof groups>;

export type GroupMember = InferSelectModel<typeof groupMembers>;
export type GroupMemberStatus = InferSelectModel<typeof groupMembers>["status"];
export type InsertGroupMember = InferInsertModel<typeof groupMembers>;

export type GroupWallet = InferSelectModel<typeof groupWallets>;
export type InsertGroupWallet = InferInsertModel<typeof groupWallets>;
10 changes: 5 additions & 5 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
} from "./actions/create-xmtp-group";
import { getGroup } from "./actions/get-group";
import { syncStoredMembersWithXmtp } from "./actions/sync-stored-members-with-xmtp";
import { retryAddPendingMembers } from "./actions/retry-add-pending-members";
import { retryPendingMembers } from "./actions/retry-pending-members";
import {
AddressLiteral,
ChainAwareAddressLiteral,
Expand Down Expand Up @@ -76,7 +76,7 @@ export default new Elysia()
pattern: Patterns.EVERY_5_MINUTES,
async run() {
console.log("retry add pending members");
await retryAddPendingMembers().catch((e) => console.error(e));
await retryPendingMembers().catch((e) => console.error(e));
},
}),
)
Expand Down Expand Up @@ -212,13 +212,13 @@ export default new Elysia()
},
)
.get(
"/retry-add-pending-members",
"/retry-pending-members",
async ({ query: { groupId } }) => {
const pendingMembers = await retryAddPendingMembers(groupId);
const pendingMembers = await retryPendingMembers(groupId);
return JSON.stringify(pendingMembers, null, 4);
},
{
query: t.Object({ groupId: t.String() }),
query: t.Object({ groupId: t.Optional(t.String()) }),
},
)
.post(
Expand Down
2 changes: 1 addition & 1 deletion test/local-api-test.http
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ content-type: application/json
GET http://localhost:8080/bot/sync-members HTTP/1.1

###
GET http://localhost:8080/bot/sync-pending-members HTTP/1.1
GET http://localhost:8080/bot/retry-pending-members HTTP/1.1

###
GET http://localhost:8080/bot/sync-pending-members?groupId=c6894e8dae94456ef19fb2fcf1bbbdec HTTP/1.1
Expand Down

0 comments on commit 14863f0

Please sign in to comment.