Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parse hashtags for Bluesky #515

Merged
merged 7 commits into from
Apr 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 55 additions & 3 deletions bc/channel/utils/connectors/bluesky_api/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,35 @@ def get_current_time_iso(self) -> str:
"""Get current time in Server Timezone (UTC) and ISO format."""
return datetime.now(timezone.utc).isoformat()

def _parse_tags(self, text: str) -> list[RegexMatch]:
    """
    Parses hashtags from text.

    Finds every hashtag that sits at the start of the text or directly
    after whitespace. For each one, the reported span covers the leading
    ``#`` plus the tag, while the reported text is the tag without ``#``.

    Offsets are UTF-8 byte offsets, not character indices, because the
    spans are used as the ``byteStart``/``byteEnd`` of a rich text facet.
    The two only differ when non-ASCII characters precede the hashtag.

    Args:
        text (str): The text to parse.

    Returns:
        list[RegexMatch]: List of matches, in order of appearance. Empty
        if no hashtags are found.
    """
    # reference: https://github.com/bluesky-social/atproto/blob/fbc7e75c402e0c268e7e411353968985eeb4bb06/packages/api/src/rich-text/util.ts#L10
    # given that our needs of a hashtag are very simple, we can get away
    # with only parsing alphanumeric characters
    tag_regex = r"(?:^|\s)#(?P<tag>[0-9]*[a-zA-Z][a-zA-Z0-9]*)"
    spans = []
    for m in re.finditer(tag_regex, text):
        # The "tag" group excludes the "#", so step back one character to
        # make the span cover it as well.
        hash_index = m.start("tag") - 1
        # Matching is done on the str (so `\s` keeps its Unicode meaning),
        # then the character index is converted to a byte offset by
        # measuring the encoded length of everything before the "#".
        byte_start = len(text[:hash_index].encode("utf-8"))
        # "#" and the tag are ASCII-only per the regex, so their length in
        # characters equals their length in bytes.
        byte_end = byte_start + (m.end("tag") - hash_index)
        spans.append(
            RegexMatch(
                start=byte_start,
                end=byte_end,
                text=m.group("tag"),
            )
        )
    return spans

def _parse_urls(self, text: str) -> list[RegexMatch]:
"""
Parses a URL from text.
Expand Down Expand Up @@ -229,6 +258,21 @@ def _parse_text_facets(self, text) -> list[TextAnnotation]:
],
}
facets.append(annotation)

for u in self._parse_tags(text):
annotation = {
"index": {
"byteStart": u.start,
"byteEnd": u.end,
},
"features": [
{
"$type": "app.bsky.richtext.facet#tag",
"tag": u.text,
}
],
}
mlissner marked this conversation as resolved.
Show resolved Hide resolved
facets.append(annotation)
return facets

def fetch_embed_url_card(self, url: str) -> SocialCard | None:
Expand Down Expand Up @@ -313,9 +357,17 @@ def post_status(self, text: str, media: list[Thumbnail]) -> dict[str, str]:
"images": media,
}
elif message_object["facets"]:
card = self.fetch_embed_url_card(
message_object["facets"][-1]["features"][0]["uri"]
)
link: str | None = None
card: SocialCard | None = None

for facet in message_object["facets"]:
feature = facet["features"][0]
if feature["$type"] == "app.bsky.richtext.facet#link":
link = feature["uri"]

if link:
card = self.fetch_embed_url_card(link)

if card:
message_object["embed"] = {
"$type": "app.bsky.embed.external",
Expand Down
6 changes: 5 additions & 1 deletion bc/channel/utils/connectors/bluesky_api/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,14 @@ class SocialCard(TypedDict):
"LinkFacet", {"$type": Literal["app.bsky.richtext.facet#link"], "uri": str}
)

# Rich text feature that marks a span of text as a hashtag; "tag" holds the
# hashtag text. The functional TypedDict syntax is needed here because
# "$type" is not a valid Python identifier.
TagFacet = TypedDict(
    "TagFacet", {"$type": Literal["app.bsky.richtext.facet#tag"], "tag": str}
)


class TextAnnotation(TypedDict):
    """A rich text annotation: a slice of the text plus its features."""

    # Location of the annotated span within the text.
    index: ByteSlice
    # What the span represents; either a link or a hashtag.
    features: list[LinkFacet | TagFacet]


Record = TypedDict(
Expand Down