Skip to content

Commit

Permalink
Repo.create[_from_commit]: emit new #identity and #account events to …
Browse files Browse the repository at this point in the history
…subscribeRepos

for #1119
  • Loading branch information
snarfed committed Jul 17, 2024
1 parent 61f5a8e commit f9463b6
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 57 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ _Non-breaking changes:_

* `mst`:
* Add new optional `start` kwarg to `load_all`.
* `repo`:
* [Emit new `#identity` and `#account` events](https://github.com/snarfed/bridgy-fed/issues/1119) to `subscribeRepos` when creating new repos.
* `storage`:
* Add new `write_event` method.
* Add new optional `repo` kwarg to `read_blocks_by_seq` and `read_events_by_seq` to limit returned results to a single repo.
Expand Down
5 changes: 5 additions & 0 deletions arroba/repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,10 @@ def create_from_commit(cls, storage, commit_data, *,
Returns:
Repo:
"""
did = commit_data.commit.repo
storage.write_event(repo_did=did, type='identity', handle=kwargs.get('handle'))
storage.write_event(repo_did=did, type='account', active=True)

storage.apply_commit(commit_data)

# avoid reading from storage, since if we're in a transaction, those
Expand All @@ -183,6 +187,7 @@ def create_from_commit(cls, storage, commit_data, *,
storage.create_repo(repo, signing_key=signing_key, rotation_key=rotation_key)
if repo.callback:
repo.callback(commit_data)

return repo

@classmethod
Expand Down
6 changes: 5 additions & 1 deletion arroba/tests/test_datastore_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,11 @@ def test_apply_commit(self):

# new repo with initial commit
repo = Repo.create(self.storage, 'did:web:user.com', signing_key=self.key)
self.assert_same_seq(b.key.id() for b in AtpBlock.query())
self.assert_same_seq(b.key.id() for b in AtpBlock.query()
if b.decoded.get('$type') not in (
'com.atproto.sync.subscribeRepos#account',
'com.atproto.sync.subscribeRepos#identity'
))

# new commit
writes = [Write(Action.CREATE, 'coll', next_tid(), obj) for obj in objs]
Expand Down
25 changes: 21 additions & 4 deletions arroba/tests/test_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class RepoTest(TestCase):
def setUp(self):
super().setUp()
self.storage = self.STORAGE_CLS()
self.repo = Repo.create(self.storage, 'did:web:user.com',
self.repo = Repo.create(self.storage, 'did:web:user.com', handle='user.com',
signing_key=self.key)

def assertCommitIs(self, commit_data, write, seq):
Expand Down Expand Up @@ -61,6 +61,23 @@ def test_metadata(self):
self.assertEqual(3, self.repo.version)
self.assertEqual('did:web:user.com', self.repo.did)

def test_create(self):
# setUp called Repo.create
events = list(self.storage.read_blocks_by_seq())
self.assertEqual([{
'$type': 'com.atproto.sync.subscribeRepos#identity',
'seq': 2,
'did': 'did:web:user.com',
'time': NOW.isoformat(),
'handle': 'user.com',
}, {
'$type': 'com.atproto.sync.subscribeRepos#account',
'seq': 3,
'did': 'did:web:user.com',
'time': NOW.isoformat(),
'active': True,
}], [e.decoded for e in events[2:]])

def test_does_basic_operations(self):
profile = {
'$type': 'app.bsky.actor.profile',
Expand Down Expand Up @@ -166,13 +183,13 @@ def test_apply_writes_callback(self):
self.repo.apply_writes([create])

self.assertEqual(1, len(seen))
self.assertCommitIs(seen[0], create, 2)
self.assertCommitIs(seen[0], create, 4)

# update object
update = Write(Action.UPDATE, 'co.ll', tid, {'foo': 'baz'})
self.repo.apply_writes([update])
self.assertEqual(2, len(seen))
self.assertCommitIs(seen[1], update, 3)
self.assertCommitIs(seen[1], update, 5)

# unset callback, update again
self.repo.callback = None
Expand All @@ -189,7 +206,7 @@ def test_apply_commit_callback(self):
self.repo.apply_commit(Repo.format_commit(repo=self.repo, writes=[create]))

self.assertEqual(1, len(seen))
self.assertCommitIs(seen[0], create, 2)
self.assertCommitIs(seen[0], create, 4)


class DatastoreRepoTest(RepoTest, DatastoreTest):
Expand Down
65 changes: 37 additions & 28 deletions arroba/tests/test_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,47 +33,56 @@ def test_block_hash(self):
self.assertEqual(id(Block(decoded=DECODED)), id(Block(encoded=ENCODED)))

def test_read_events_by_seq(self):
commit_cids = []

storage = MemoryStorage()
repo = Repo.create(storage, 'did:web:user.com', signing_key=self.key)
commit_cids.append(repo.head.cid)
init = repo.head.cid

tid = next_tid()
create = Write(Action.CREATE, 'co.ll', tid, {'foo': 'bar'})
commit_cid = repo.apply_writes([create])
commit_cids.append(repo.head.cid)
repo.apply_writes([create])
create = repo.head.cid

delete = Write(Action.DELETE, 'co.ll', tid)
commit_cid = repo.apply_writes([delete])
commit_cids.append(repo.head.cid)
repo.apply_writes([delete])
delete = repo.head.cid

self.assertEqual(commit_cids, [cd.commit.cid for cd in
storage.read_events_by_seq()])
self.assertEqual(commit_cids[1:], [cd.commit.cid for cd in
storage.read_events_by_seq(start=2)])
events = list(storage.read_events_by_seq())
self.assertEqual(5, len(events))
self.assertEqual(init, events[0].commit.cid)
self.assertEqual('com.atproto.sync.subscribeRepos#identity',
events[1]['$type'])
self.assertEqual('com.atproto.sync.subscribeRepos#account',
events[2]['$type'])
self.assertEqual(create, events[3].commit.cid)
self.assertEqual(delete, events[4].commit.cid)

events = storage.read_events_by_seq(start=4)
self.assertEqual([create, delete], [cd.commit.cid for cd in events])

def test_read_events_by_seq_repo(self):
commit_cids = []

storage = MemoryStorage()
alice = Repo.create(storage, 'did:alice', signing_key=self.key)
commit_cids.append(alice.head.cid)
alice_init = alice.head.cid

bob = Repo.create(storage, 'did:bob', signing_key=self.key)

create = Write(Action.CREATE, 'co.ll', next_tid(), {'foo': 'bar'})
alice.apply_writes([create])
commit_cids.append(alice.head.cid)

create = Write(Action.CREATE, 'co.ll', next_tid(), {'baz': 'biff'})
bob.apply_writes([create])

self.assertEqual(commit_cids, [cd.commit.cid for cd in
storage.read_events_by_seq(repo='did:alice')])
self.assertEqual(commit_cids[1:], [cd.commit.cid for cd in
storage.read_events_by_seq(
repo='did:alice', start=3)])
events = list(storage.read_events_by_seq(repo='did:alice'))
self.assertEqual(4, len(events))
self.assertEqual(alice_init, events[0].commit.cid)
self.assertEqual('com.atproto.sync.subscribeRepos#identity',
events[1]['$type'])
self.assertEqual('com.atproto.sync.subscribeRepos#account',
events[2]['$type'])
self.assertEqual(alice.head.cid, events[3].commit.cid)

events = storage.read_events_by_seq(repo='did:alice', start=4)
self.assertEqual([alice.head.cid], [cd.commit.cid for cd in events])

def test_read_events_by_seq_include_record_block_even_if_preexisting(self):
# https://github.com/snarfed/bridgy-fed/issues/1016#issuecomment-2109276344
Expand All @@ -91,7 +100,7 @@ def test_read_events_by_seq_include_record_block_even_if_preexisting(self):
second = Write(Action.CREATE, 'co.ll', next_tid(), {'foo': 'bar'})
commit_cid = repo.apply_writes([second])

commits = list(storage.read_events_by_seq(start=3))
commits = list(storage.read_events_by_seq(start=5))
self.assertEqual(1, len(commits))
self.assertEqual(repo.head.cid, commits[0].commit.cid)
self.assertEqual(prev, commits[0].prev)
Expand All @@ -113,13 +122,13 @@ def test_read_events_tombstone_then_commit(self):

self.assertEqual({
'$type': 'com.atproto.sync.subscribeRepos#tombstone',
'seq': 2,
'seq': 4,
'did': 'did:alice',
'time': NOW.isoformat(),
}, events[1])
}, events[3])

self.assertEqual(bob.head.cid, events[2].commit.cid)
self.assertEqual(3, events[2].commit.seq)
self.assertEqual(bob.head.cid, events[4].commit.cid)
self.assertEqual(5, events[4].commit.seq)

def test_load_repo(self):
storage = MemoryStorage()
Expand Down Expand Up @@ -177,17 +186,17 @@ def test_tombstone_repo(self):
seen = []
storage = MemoryStorage()
repo = Repo.create(storage, 'did:user', signing_key=self.key)
self.assertEqual(1, storage.last_seq(SUBSCRIBE_REPOS_NSID))
self.assertEqual(3, storage.last_seq(SUBSCRIBE_REPOS_NSID))

repo.callback = lambda event: seen.append(event)
storage.tombstone_repo(repo)

self.assertEqual(TOMBSTONED, repo.status)

self.assertEqual(2, storage.last_seq(SUBSCRIBE_REPOS_NSID))
self.assertEqual(4, storage.last_seq(SUBSCRIBE_REPOS_NSID))
expected = {
'$type': 'com.atproto.sync.subscribeRepos#tombstone',
'seq': 2,
'seq': 4,
'did': 'did:user',
'time': NOW.isoformat(),
}
Expand Down
53 changes: 29 additions & 24 deletions arroba/tests/test_xrpc_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def test_get_repo(self):
self.assertEqual({
'version': 3,
'did': 'did:web:user.com',
'rev': '2222222222422',
'rev': '2222222222622',
}, commit)

self.assertEqual(self.data, load(blocks))
Expand Down Expand Up @@ -173,7 +173,7 @@ def test_get_latest_commit(self):
resp = xrpc_sync.get_latest_commit({}, did='did:web:user.com')
self.assertEqual({
'cid': self.repo.head.cid.encode('base32'),
'rev': '2222222222422',
'rev': '2222222222622',
}, resp)

def test_get_record(self):
Expand Down Expand Up @@ -549,7 +549,9 @@ def subscribe(self, received, delivered=None, limit=None, cursor=None):
for i, (header, payload) in enumerate(
xrpc_sync.subscribe_repos(cursor=cursor)):
self.assertIn(header, [
{'op': 1, 't': '#account'},
{'op': 1, 't': '#commit'},
{'op': 1, 't': '#identity'},
{'op': 1, 't': '#tombstone'},
{'op': -1},
])
Expand Down Expand Up @@ -644,7 +646,7 @@ def test_subscribe_repos(self, *_):

self.assertEqual(1, len(received_a))
self.assertCommitMessage(received_a[0], {'foo': 'bar'}, write=create,
prev=prev, seq=2)
prev=prev, seq=4)

# update, subscriber_a and subscriber_b
received_b = []
Expand All @@ -661,10 +663,10 @@ def test_subscribe_repos(self, *_):

self.assertEqual(2, len(received_a))
self.assertCommitMessage(received_a[1], {'foo': 'baz'}, write=update,
prev=prev, seq=3)
prev=prev, seq=5)
self.assertEqual(1, len(received_b))
self.assertCommitMessage(received_b[0], {'foo': 'baz'}, write=update,
prev=prev, seq=3)
prev=prev, seq=5)

subscriber_a.join()

Expand All @@ -676,7 +678,7 @@ def test_subscribe_repos(self, *_):

self.assertEqual(2, len(received_a))
self.assertEqual(2, len(received_b))
self.assertCommitMessage(received_b[1], write=delete, prev=prev, seq=4)
self.assertCommitMessage(received_b[1], write=delete, prev=prev, seq=6)

subscriber_b.join()

Expand All @@ -688,21 +690,21 @@ def test_subscribe_repos_cursor_zero(self, *_):
write = Write(Action.CREATE if val == 'bar' else Action.UPDATE,
'co.ll', tid, {'foo': val})
writes.append(write)
commit_cid = self.repo.apply_writes([write])
self.repo.apply_writes([write])
commit_cids.append(self.repo.head.cid)

received = []
self.subscribe(received, limit=4, cursor=0)
self.subscribe(received, limit=6, cursor=0)

self.assertEqual(5, server.storage.allocate_seq(SUBSCRIBE_REPOS_NSID))
self.assertEqual(7, server.storage.allocate_seq(SUBSCRIBE_REPOS_NSID))

self.assertCommitMessage(
received[0], record=None, cur=commit_cids[0], prev=None, seq=1)

for i, val in enumerate(['bar', 'baz', 'biff'], start=1):
self.assertCommitMessage(
received[i], {'foo': val}, cur=commit_cids[i], write=writes[i],
prev=commit_cids[i - 1], seq=i + 1)
received[i + 2], {'foo': val}, cur=commit_cids[i], write=writes[i],
prev=commit_cids[i - 1], seq=i + 3)

def test_subscribe_repos_cursor_past_current_seq(self, *_):
received = []
Expand All @@ -711,7 +713,7 @@ def test_subscribe_repos_cursor_past_current_seq(self, *_):
({'op': -1},
{
'error': 'FutureCursor',
'message': 'Cursor 999 is past our current sequence number 1',
'message': 'Cursor 999 is past our current sequence number 3',
}),
], received)

Expand All @@ -732,6 +734,9 @@ def test_subscribe_repos_cursor_before_rollback_window(self, *_):
self.assertEqual({'op': 1, 't': '#info'}, header)
self.assertEqual({'name': 'OutdatedCursor'}, payload)

header, _ = next(sub)
self.assertEqual({'op': 1, 't': '#account'}, header)

self.assertCommitMessage(next(sub), {'foo': 'bar'}, write=write,
seq=6, cur=self.repo.head.cid, prev=prev)

Expand All @@ -757,7 +762,7 @@ def test_include_preexisting_record_block(self, *_):

self.assertEqual(1, len(received))
self.assertCommitMessage(received[0], {'foo': 'bar'}, write=second,
prev=prev, seq=3)
prev=prev, seq=5)

subscriber.join()

Expand All @@ -780,27 +785,27 @@ def test_tombstone(self, *_):
received = []
delivered = Semaphore(value=0)
subscriber = Thread(target=self.subscribe, args=[received, delivered],
kwargs={'limit': 6, 'cursor': 0})
kwargs={'limit': 10, 'cursor': 0})
subscriber.start()

# first two events are initial commits for each repo
delivered.acquire()
delivered.acquire()
# first six events are initial commits and events for each repo
for i in range(6):
delivered.acquire()

# tombstone
delivered.acquire()
header, payload = received[2]
header, payload = received[6]
self.assertEqual({'op': 1, 't': '#tombstone'}, header)
self.assertEqual({
'seq': 3,
'seq': 7,
'did': 'did:web:user.com',
'time': testutil.NOW.isoformat(),
}, payload)

# bob's write, now from streaming
delivered.acquire()
self.assertCommitMessage(received[3], {'foo': 'bar'}, write=write,
repo=bob_repo, prev=prev, seq=4)
self.assertCommitMessage(received[7], {'foo': 'bar'}, write=write,
repo=bob_repo, prev=prev, seq=8)

# another write to bob
prev = bob_repo.head.cid
Expand All @@ -812,11 +817,11 @@ def test_tombstone(self, *_):
server.storage.tombstone_repo(bob_repo)
delivered.acquire()

self.assertEqual(6, len(received))
header, payload = received[5]
self.assertEqual(10, len(received))
header, payload = received[9]
self.assertEqual({'op': 1, 't': '#tombstone'}, header)
self.assertEqual({
'seq': 6,
'seq': 10,
'did': 'did:bob',
'time': testutil.NOW.isoformat(),
}, payload)
Expand Down

0 comments on commit f9463b6

Please sign in to comment.