Skip to content

Commit

Permalink
xrpc_sync.subscribeRepos: add support for non-commit events
Browse files Browse the repository at this point in the history
...starting with account tombstones. for snarfed/bridgy-fed#783
  • Loading branch information
snarfed committed May 22, 2024
1 parent cbf0737 commit aad0d9d
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 22 deletions.
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ Configure arroba with these environment variables:
Optional, only used in [com.atproto.repo](https://arroba.readthedocs.io/en/stable/source/arroba.html#module-arroba.xrpc_repo), [.server](https://arroba.readthedocs.io/en/stable/source/arroba.html#module-arroba.xrpc_server), and [.sync](https://arroba.readthedocs.io/en/stable/source/arroba.html#module-arroba.xrpc_sync) XRPC handlers:

* `REPO_TOKEN`, static token to use as both `accessJwt` and `refreshJwt`, defaults to contents of `repo_token` file. Not required to be an actual JWT. If not set, XRPC methods that require auth will return HTTP 501 Not Implemented.
* `ROLLBACK_WINDOW`, number of commits to serve in the [`subscribeRepos` rollback window](https://atproto.com/specs/event-stream#sequence-numbers). Defaults to no limit.
* `ROLLBACK_WINDOW`, number of events to serve in the [`subscribeRepos` rollback window](https://atproto.com/specs/event-stream#sequence-numbers). Defaults to no limit.

<!-- Only used in app.py:
* `REPO_DID`, repo user's DID, defaults to contents of `repo_did` file
Expand All @@ -107,7 +107,8 @@ _Breaking changes:_
* `AtpRemoteBlob`: if the blob URL doesn't return the `Content-Type` header, infer type from the URL, or fall back to `application/octet-stream` ([bridgy-fed#1073](https://github.com/snarfed/bridgy-fed/issues/1073)).
* `did`:
* Cache `resolve_plc`, `resolve_web`, and `resolve_handle` for 6h, up to 5000 total results per call.
* `xrpc_sync`: rename `send_new_commits` to `send_events` due to adding account tombstone support.
* `storage`: rename `Storage.read_commits_by_seq` to `read_events_by_seq` for new account tombstone support.
* `xrpc_sync`: rename `send_new_commits` to `send_events`, also for new account tombstone support.

_Non-breaking changes:_

Expand All @@ -119,6 +120,7 @@ _Non-breaking changes:_
* `service_jwt`: add optional `aud` kwarg.
* `xrpc_sync`:
* `subscribeRepos`:
* Add support for non-commit events, starting with account tombstones.
* Add `ROLLBACK_WINDOW` environment variable to limit size of [rollback window](https://atproto.com/specs/event-stream#sequence-numbers). Defaults to no limit.
* For commits with create or update operations, always include the record block, even if it already existed in the repo beforehand ([snarfed/bridgy-fed#1016](https://github.com/snarfed/bridgy-fed/issues/1016)).
* Bug fix, populate the time each commit was created in `time` instead of the current time ([snarfed/bridgy-fed#1015](https://github.com/snarfed/bridgy-fed/issues/1015)).
Expand Down
2 changes: 1 addition & 1 deletion app.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ runtime_config:
operating_system: ubuntu18
runtime_version: 3.9

# need only one instance so that new commits can be delivered to subscribeRepos
# need only one instance so that new events can be delivered to subscribeRepos
# subscribers in memory
manual_scaling:
instances: 1
Expand Down
17 changes: 11 additions & 6 deletions arroba/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class Action(Enum):
DELETE = auto()

# TODO: Should this be a subclass of Block?
# TODO: generalize to handle other events
CommitData = namedtuple('CommitData', [
'commit', # Block
'blocks', # dict of CID to Block
Expand Down Expand Up @@ -119,7 +120,7 @@ def __hash__(self):


class Storage:
"""Abstract base class for storing nodes: records, MST entries, and commits.
"""Abstract base class for storing nodes: records, MST entries, commits, etc.
Concrete subclasses should implement this on top of physical storage,
eg database, filesystem, in memory.
Expand Down Expand Up @@ -234,16 +235,17 @@ def read_blocks_by_seq(self, start=0):
"""
raise NotImplementedError()

def read_commits_by_seq(self, start=0):
"""Batch read commits from storage by ``subscribeRepos`` sequence number.
def read_events_by_seq(self, start=0):
"""Batch read commits and other events by ``subscribeRepos`` sequence number.
Args:
start (int): optional ``subscribeRepos`` sequence number to start from,
inclusive. Defaults to 0.
Returns:
generator: generator of :class:`CommitData`, starting from ``seq``,
inclusive, in ascending ``seq`` order
generator: generator of :class:`CommitData` for commits and dict
messages for other events, starting from ``seq``, inclusive, in
ascending ``seq`` order
"""
assert start >= 0

Expand All @@ -263,7 +265,10 @@ def make_commit():
for block in self.read_blocks_by_seq(start=start):
assert block.seq
if block.seq != seq: # switching to a new commit's blocks
if commit_block:
if block.decoded.get('$type', '').startswith(
'com.atproto.sync.subscribeRepos#'):
yield block.decoded # non-commit message
elif commit_block:
yield make_commit()
else:
assert blocks is None # only the first commit
Expand Down
10 changes: 5 additions & 5 deletions arroba/tests/test_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def test_block_eq(self):
def test_block_hash(self):
    """``Block(decoded=DECODED)`` and ``Block(encoded=ENCODED)`` must hash equally."""
    # Compare hash(), not id(): id() identifies the object, not its hash, and
    # two temporaries with non-overlapping lifetimes can share an id purely by
    # allocator coincidence, so the old assertion never exercised
    # Block.__hash__.
    self.assertEqual(hash(Block(decoded=DECODED)), hash(Block(encoded=ENCODED)))

def test_read_commits_by_seq(self):
def test_read_events_by_seq(self):
commit_cids = []

storage = MemoryStorage()
Expand All @@ -49,11 +49,11 @@ def test_read_commits_by_seq(self):
commit_cids.append(repo.head.cid)

self.assertEqual(commit_cids, [cd.commit.cid for cd in
storage.read_commits_by_seq()])
storage.read_events_by_seq()])
self.assertEqual(commit_cids[1:], [cd.commit.cid for cd in
storage.read_commits_by_seq(start=2)])
storage.read_events_by_seq(start=2)])

def test_read_commits_by_seq_include_record_block_even_if_preexisting(self):
def test_read_events_by_seq_include_record_block_even_if_preexisting(self):
# https://github.com/snarfed/bridgy-fed/issues/1016#issuecomment-2109276344
commit_cids = []

Expand All @@ -69,7 +69,7 @@ def test_read_commits_by_seq_include_record_block_even_if_preexisting(self):
second = Write(Action.CREATE, 'co.ll', next_tid(), {'foo': 'bar'})
commit_cid = repo.apply_writes([second])

commits = list(storage.read_commits_by_seq(start=3))
commits = list(storage.read_events_by_seq(start=3))
self.assertEqual(1, len(commits))
self.assertEqual(repo.head.cid, commits[0].commit.cid)
self.assertEqual(prev, commits[0].prev)
Expand Down
13 changes: 13 additions & 0 deletions arroba/tests/test_xrpc_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ def subscribe(self, received, delivered=None, limit=None, cursor=None):
xrpc_sync.subscribe_repos(cursor=cursor)):
self.assertIn(header, [
{'op': 1, 't': '#commit'},
{'op': 1, 't': '#tombstone'},
{'op': -1},
])
received.append(payload)
Expand Down Expand Up @@ -648,6 +649,18 @@ def test_include_preexisting_record_block(self, *_):

subscriber.join()

def test_tombstone(self, *_):
    """After tombstoning the repo, ``subscribe_repos`` yields a ``#tombstone`` event.

    Subscribes with the cursor set to the latest ``subscribeRepos`` sequence
    number and checks the header and payload of the first event returned.
    """
    server.storage.tombstone_repo(self.repo)

    latest_seq = server.storage.last_seq(SUBSCRIBE_REPOS_NSID)
    events = iter(xrpc_sync.subscribe_repos(cursor=latest_seq))
    header, payload = next(events)

    expected_header = {'op': 1, 't': '#tombstone'}
    self.assertEqual(expected_header, header)

    expected_payload = {
        'seq': latest_seq,
        'did': self.repo.did,
        'time': testutil.NOW.isoformat(),
    }
    self.assertEqual(expected_payload, payload)


class DatastoreXrpcSyncTest(XrpcSyncTest, testutil.DatastoreTest):
STORAGE_CLS = DatastoreStorage
Expand Down
25 changes: 17 additions & 8 deletions arroba/xrpc_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,18 +151,27 @@ def header_payload(commit_data):
yield ({'op': 1, 't': '#info'}, {'name': 'OutdatedCursor'})
cursor = rollback_start

logger.info(f'fetching existing commits from seq {cursor}')
for commit_data in server.storage.read_commits_by_seq(start=cursor):
yield header_payload(commit_data)
last_seq = commit_data.commit.seq

# serve new commits as they happen
logger.info(f'serving new commits')
logger.info(f'fetching existing events from seq {cursor}')
for event in server.storage.read_events_by_seq(start=cursor):
if isinstance(event, CommitData):
yield header_payload(event)
last_seq = event.commit.seq
elif isinstance(event, dict):
type = event.pop('$type')
type_fragment = type.removeprefix('com.atproto.sync.subscribeRepos')
assert type_fragment != type, type
yield {'op': 1, 't': type_fragment}, event
last_seq = event['seq']
else:
raise RuntimeError(f'unexpected event type {event.__class__} {event}')

# serve new events as they happen
logger.info(f'serving new events')
while True:
with new_commits:
new_commits.wait(NEW_COMMITS_TIMEOUT.total_seconds())

for commit_data in server.storage.read_commits_by_seq(start=last_seq + 1):
for commit_data in server.storage.read_events_by_seq(start=last_seq + 1):
yield header_payload(commit_data)
last_seq = commit_data.commit.seq

Expand Down

0 comments on commit aad0d9d

Please sign in to comment.