Skip to content

Commit

Permalink
Storage.read_events_by_seq bug fixes for tombstone events
Browse files Browse the repository at this point in the history
  • Loading branch information
snarfed committed May 23, 2024
1 parent 16f9f38 commit e0fe770
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 14 deletions.
16 changes: 10 additions & 6 deletions arroba/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,25 +265,29 @@ def make_commit():
for block in self.read_blocks_by_seq(start=start):
assert block.seq
if block.seq != seq: # switching to a new commit's blocks
if block.decoded.get('$type', '').startswith(
'com.atproto.sync.subscribeRepos#'):
yield block.decoded # non-commit message
elif commit_block:
if commit_block:
yield make_commit()
else:
assert blocks is None # only the first commit
# we shouldn't have any dangling blocks that we don't serve
assert not blocks
seq = block.seq
blocks = {} # maps CID to Block
commit_block = None

if block.decoded.get('$type', '').startswith(
'com.atproto.sync.subscribeRepos#'): # non-commit message
yield block.decoded
continue

blocks[block.cid] = block
commit_fields = ['version', 'did', 'rev', 'prev', 'data', 'sig']
if block.decoded.keys() == set(commit_fields):
commit_block = block

# final commit
if blocks:
assert blocks and commit_block
assert blocks
assert commit_block
yield make_commit()

def has(self, cid):
Expand Down
22 changes: 22 additions & 0 deletions arroba/tests/test_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,28 @@ def test_read_events_by_seq_include_record_block_even_if_preexisting(self):
record = Block(decoded={'foo': 'bar'})
self.assertEqual(record, commits[0].blocks[record.cid])

def test_read_events_tombstone_then_commit(self):
storage = MemoryStorage()
alice = Repo.create(storage, 'did:alice', signing_key=self.key)

storage.tombstone_repo(alice)

bob = Repo.create(storage, 'did:alice', signing_key=self.key)

events = list(storage.read_events_by_seq())
self.assertEqual(alice.head.cid, events[0].commit.cid)
self.assertEqual(1, events[0].commit.seq)

self.assertEqual({
'$type': 'com.atproto.sync.subscribeRepos#tombstone',
'seq': 2,
'did': 'did:alice',
'time': NOW.isoformat(),
}, events[1])

self.assertEqual(bob.head.cid, events[2].commit.cid)
self.assertEqual(3, events[2].commit.seq)

def test_tombstone_repo(self):
seen = []
storage = MemoryStorage()
Expand Down
35 changes: 27 additions & 8 deletions arroba/tests/test_xrpc_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -455,15 +455,18 @@ def subscribe(self, received, delivered=None, limit=None, cursor=None):
return

def assertCommitMessage(self, commit_msg, record=None, write=None,
cur=None, prev=None, seq=None):
cur = cur or self.repo.head.cid
repo=None, cur=None, prev=None, seq=None):
if not repo:
repo = self.repo
if not cur:
cur = repo.head.cid

blocks = commit_msg.pop('blocks')
msg_roots, msg_blocks = read_car(blocks)
self.assertEqual([cur], msg_roots)

self.assertEqual({
'repo': 'did:web:user.com',
'repo': repo.did,
'commit': cur,
'ops': [{
'action': op.action.name.lower(),
Expand Down Expand Up @@ -502,7 +505,7 @@ def assertCommitMessage(self, commit_msg, record=None, write=None,

commit_record = {
'version': 3,
'did': 'did:web:user.com',
'did': repo.did,
'data': dag_cbor_cid(mst_entry),
'rev': int_to_tid(seq, clock_id=0),
'prev': prev,
Expand Down Expand Up @@ -650,17 +653,33 @@ def test_include_preexisting_record_block(self, *_):
subscriber.join()

def test_tombstone(self, *_):
# second repo
bob_repo = Repo.create(server.storage, 'did:web:bob',
handle='bo.bb', signing_key=self.key)

# tombstone first repo
server.storage.tombstone_repo(self.repo)

seq = server.storage.last_seq(SUBSCRIBE_REPOS_NSID)
header, payload = next(iter(xrpc_sync.subscribe_repos(cursor=seq)))
# write to second repo
prev = bob_repo.head.cid
write = Write(Action.CREATE, 'co.ll', next_tid(), {'foo': 'bar'})
bob_repo.apply_writes([write])

# subscribe should serve both
subscribe = iter(xrpc_sync.subscribe_repos(cursor=3))
header, payload = next(subscribe)
self.assertEqual({'op': 1, 't': '#tombstone'}, header)
self.assertEqual({
'seq': seq,
'did': self.repo.did,
'seq': 3,
'did': 'did:web:user.com',
'time': testutil.NOW.isoformat(),
}, payload)

header, payload = next(subscribe)
self.assertEqual({'op': 1, 't': '#commit'}, header)
self.assertCommitMessage(payload, {'foo': 'bar'}, write=write,
repo=bob_repo, prev=prev, seq=4)


class DatastoreXrpcSyncTest(XrpcSyncTest, testutil.DatastoreTest):
STORAGE_CLS = DatastoreStorage
Expand Down

0 comments on commit e0fe770

Please sign in to comment.