-
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathbluesky.ts
149 lines (142 loc) · 5.34 KB
/
bluesky.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import { AppBskyFeedGetAuthorFeed, BskyAgent } from "@atproto/api";
import { PostView } from "@atproto/api/dist/client/types/app/bsky/feed/defs";
import { ServiceItem } from "../common/ServiceItem.js";
import { NotionEnv } from "../notion/Notion.js";
import { createLogger } from "../common/logger.js";
const logger = createLogger("Bluesky");
export type BlueSkyEnv = {
bluesky_identifier: string;
bluesky_app_password: string;
} & NotionEnv;
export const BlueskyType = "Bluesky";
export const isBlueSkyEnv = (env: unknown): env is BlueSkyEnv => {
return (env as BlueSkyEnv).bluesky_identifier !== undefined && (env as BlueSkyEnv).bluesky_app_password !== undefined;
}
const convertHttpUrlFromAtProto = (url: string): string => {
const match = url.match(/at:\/\/(did:plc:.*?)\/app.bsky.feed.post\/(.*)/);
if (match === null) {
throw new Error(`post.uri is invalid: ${url}`);
}
const did = match[1];
const contentId = match[2];
return `https://bsky.app/profile/${did}/post/${contentId}`
}
const getRootPost = (post: PostView): { url: string; } | undefined => {
// @ts-expect-error no reply type
if (!post.record?.reply?.root) {
return undefined;
}
// @ts-expect-error no reply type
const url = convertHttpUrlFromAtProto(post.record.reply.root.uri);
return {
url,
}
}
// Issue: https://github.com/bluesky-social/atproto/issues/910
export const convertPostToServiceIr = (post: PostView, identifier: string): ServiceItem => {
const record = post.record as { text?: string };
if (typeof record.text !== "string") {
throw new Error("post.record.text is not string");
}
// if post is reply, get root post
const rootPost = getRootPost(post);
return {
type: BlueskyType,
// at://did:plc:niluiwex7fsnjak2wxs4j47y/app.bsky.feed.post/3jz3xglxhzu27@@azu.bsky.social
title: record.text,
url: convertHttpUrlFromAtProto(post.uri),
unixTimeMs: new Date(post.indexedAt).getTime(),
// if the reply is self post
...(rootPost ? {
parent: {
url: rootPost.url
},
} : {})
};
};
type Feed = AppBskyFeedGetAuthorFeed.Response["data"]["feed"];
export const collectTweetsUntil = async (timeline: ServiceItem[], lastTweet: ServiceItem): Promise<ServiceItem[]> => {
const results: ServiceItem[] = [];
try {
for (const tweet of timeline) {
if (lastTweet.url === tweet.url) {
return results;
}
if (lastTweet.unixTimeMs < tweet.unixTimeMs) {
results.push(tweet);
} else {
return results;
}
}
} catch (error) {
logger.error(new Error("collect error", {
cause: error,
}));
throw new Error("collect error at bluesky");
}
return results;
};
export async function fetchBluesky(env: BlueSkyEnv, lastServiceItem: ServiceItem | null): Promise<ServiceItem[]> {
const agent = new BskyAgent({
service: "https://bsky.social"
});
if (!env.bluesky_identifier || !env.bluesky_app_password) {
throw new Error("bluesky_identifier or bluesky_app_password is not set");
}
await agent.login({
identifier: env.bluesky_identifier,
password: env.bluesky_app_password
});
type FetchAuthorFeedParams = {
actor: string;
feed: Feed;
cursor?: string;
};
const fetchAuthorFeed = async ({ actor, feed, cursor }: FetchAuthorFeedParams): Promise<Feed> => {
try {
const timeline = await agent.getAuthorFeed({
actor,
limit: 50,
cursor
});
if (timeline.success) {
// if found older tweet than lasttweet , stop fetching
const latestPost = timeline.data.feed.at(-1);
if (lastServiceItem && latestPost) {
const lastItemDate = new Date(latestPost?.post?.indexedAt ?? "");
if (lastItemDate.getTime() < lastServiceItem.unixTimeMs) {
return [...feed, ...timeline.data.feed];
}
}
return [...feed, ...timeline.data.feed];
} else {
throw new Error("timeline fetch error:" + JSON.stringify(timeline.data));
}
} catch (error) {
logger.debug("fetch error", {
// @ts-ignore
status: error.status,
// @ts-ignore
code: error.code,
});
throw error;
}
};
logger.info("fetching from bluesky since %s", lastServiceItem?.unixTimeMs !== undefined
? new Date(lastServiceItem.unixTimeMs).toISOString()
: "first");
const feed = await fetchAuthorFeed({
actor: env.bluesky_identifier,
feed: []
});
const convertedPosts = feed.map((post) => {
return convertPostToServiceIr(post.post, env.bluesky_identifier);
})
const sortedPosts = convertedPosts.sort((a, b) => {
return a.unixTimeMs > b.unixTimeMs ? -1 : 1;
});
logger.info("fetched item count", sortedPosts.length);
const postItems = lastServiceItem ? await collectTweetsUntil(sortedPosts, lastServiceItem) : sortedPosts;
logger.info("post-able items count", postItems.length);
return postItems;
}