[python] Add a transaction management layer #7987
If multiple SDK instances submit txns from the same account, we will see repeated 30s timeouts and sequence number resyncs. The sequence number syncs do not always return the true latest sequence number because of full node sync delay, which would make things worse. I wonder if we should start building the API with a cached state for accounts, so that clients don't need to worry about how to hack around the mempool's limitations as this PR did. cc @bowenyang007. Since the current mempool fails to provide a consistent view of sequence numbers and often swallows errors, adding a global cached state would simplify the programming model a lot.
No one should do that. The ASN (AccountSequenceNumber) code says it is only co-task safe, and the doc accompanying it says the same thing. I will add explicit language saying this is not intended for sharing across clients.
I think there's an opportunity here to generalize into Rust, but ultimately if we can build one in a few hundred lines and put it into our popular languages, it is pretty useful for lightweight efforts. On the other hand, if we can get wasm out the door, that might be an alternative way to write once and deploy everywhere.
Most of my comments are just around the lack of comments. Otherwise the logic is good.
if self.last_uncommitted_number is None or self.current_number is None:
    await self.initialize()
if (
    self.current_number - self.last_uncommitted_number
    >= self.maximum_in_flight
):
    await self.__update()
    start_time = time.time()
    while (
        self.current_number - self.last_uncommitted_number
        >= self.maximum_in_flight
    ):
        if not block:
            return None
        await asyncio.sleep(self.sleep_time)
        if time.time() - start_time > self.maximum_wait_time:
            logging.warn(
                f"Waited over 30 seconds for a transaction to commit, resyncing {self.account.address().hex()}"
            )
            await self.initialize()
        else:
            await self.__update()
Personally, I prefer comments that say what we're checking at each of these steps.
It looks like:
- Check if the queue is initialized
- Check if the number in flight is over the maximum
- Sleep if it's blocking on transactions, otherwise continue to send transactions (need to define block in docs)
- Poll and block to see if the mempool has cleared any transactions
- If it's clear (or block is false), submit the transaction, and let the next one grab the lock.
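For illustration, the steps listed above could be annotated roughly as follows. This is a minimal sketch under stated assumptions, not the code in this PR: `SequenceWindow`, `fetch_committed`, and `_window_full` are hypothetical names, and `fetch_committed` stands in for whatever call returns the account's on-chain sequence number.

```python
import asyncio
import time
from typing import Awaitable, Callable, Optional


class SequenceWindow:
    """Hypothetical stand-in for AccountSequenceNumber; not the SDK class."""

    def __init__(
        self,
        fetch_committed: Callable[[], Awaitable[int]],  # assumed chain lookup
        maximum_in_flight: int = 100,
        maximum_wait_time: float = 30.0,
        sleep_time: float = 0.01,
    ) -> None:
        self._fetch_committed = fetch_committed
        self.maximum_in_flight = maximum_in_flight
        self.maximum_wait_time = maximum_wait_time
        self.sleep_time = sleep_time
        self.lock = asyncio.Lock()
        self.last_uncommitted_number: Optional[int] = None
        self.current_number: Optional[int] = None

    async def initialize(self) -> None:
        # Resync both ends of the window from the chain.
        self.current_number = await self._fetch_committed()
        self.last_uncommitted_number = self.current_number

    async def _update(self) -> None:
        # Refresh only the committed end of the window.
        self.last_uncommitted_number = await self._fetch_committed()

    def _window_full(self) -> bool:
        in_flight = self.current_number - self.last_uncommitted_number
        return in_flight >= self.maximum_in_flight

    async def next_sequence_number(self, block: bool = True) -> Optional[int]:
        async with self.lock:
            # 1. Initialize on first use.
            if self.last_uncommitted_number is None or self.current_number is None:
                await self.initialize()
            # 2. Too many in flight? Refresh once before deciding to wait.
            if self._window_full():
                await self._update()
                start_time = time.monotonic()
                while self._window_full():
                    # 3. Non-blocking callers give up immediately.
                    if not block:
                        return None
                    # 4. Sleep, then poll for newly committed transactions,
                    #    or resync entirely after waiting too long.
                    await asyncio.sleep(self.sleep_time)
                    if time.monotonic() - start_time > self.maximum_wait_time:
                        await self.initialize()
                    else:
                        await self._update()
            # 5. Hand out the number; leaving the block releases the lock.
            number = self.current_number
            self.current_number += 1
            return number
```

With `maximum_in_flight=2` and nothing committing, two calls return 0 and 1, and a third call with `block=False` returns `None`.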
adding a comment.
my concern is that this is duplicating the comments in the doc block and in the document.
accurate summary though
"""
if self.last_uncommitted_number == self.current_number:
    return
This is a shortcut, but you may want to provide an input flag to sync regardless (in case something else is also handling transactions for the account).
good point, just removing altogether
start_time = time.time()
while self.last_uncommitted_number != self.current_number:
    print(f"{self.last_uncommitted_number} {self.current_number}")
    if time.time() - start_time > self.maximum_wait_time:
You check this condition a lot, maybe move it into a function?
while self.last_uncommitted_number != self.current_number:
    print(f"{self.last_uncommitted_number} {self.current_number}")
This is roughly the same logic as the other loop, probably want to combine
Not similar enough for me to want to refactor... I poked at this previously, and the logic was slightly different in how you get in and the order in which you test.
    self.last_uncommitted_number = None
    self.current_number = None

async def next_sequence_number(self, block: bool = True) -> Optional[int]:
maybe call this acquire_next_sequence_number to convey the notion that this might block for a long time.
I don't like long function names 😬
sequence_number = (
    await self._account_sequence_number.next_sequence_number()
)
transaction = await self._transaction_generator(
    self._account, sequence_number
)
txn_hash_awaitable = self._rest_client.submit_bcs_transaction(
    transaction
)
Just a thought (not sure you want to overcomplicate this): one potential fault-tolerance improvement is to have AccountSequenceNumber keep two counters, issued and submitted. next_sequence_number increments issued. After submit_bcs_transaction is called, if the transaction was potentially submitted (we received a success response, or received no response), we notify AccountSequenceNumber to increment the submitted counter.
If we received a failed response for the submission, we notify AccountSequenceNumber to decrement the issued number.
AccountSequenceNumber is also changed so that issued is never more than 1 above submitted.
That way a single failure in submission doesn't lock up the whole stream until expiry.
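As a rough sketch of the two-counter idea described in this comment (illustrative only; `IssuedSubmittedCounter`, `mark_submitted`, and `mark_failed` are hypothetical names, not part of the SDK or this PR), the "issued never more than 1 above submitted" rule could be enforced with an `asyncio.Condition`:

```python
import asyncio


class IssuedSubmittedCounter:
    """Hypothetical illustration of the issued/submitted proposal."""

    def __init__(self, start: int) -> None:
        self.issued = start      # next sequence number to hand out
        self.submitted = start   # numbers known or presumed submitted
        self._condition = asyncio.Condition()

    async def next_sequence_number(self) -> int:
        async with self._condition:
            # Wait until the previously issued number has been resolved,
            # so issued is never more than 1 above submitted.
            await self._condition.wait_for(lambda: self.issued == self.submitted)
            number = self.issued
            self.issued += 1
            return number

    async def mark_submitted(self) -> None:
        # Call after a success response, or when no response was received.
        async with self._condition:
            self.submitted += 1
            self._condition.notify_all()

    async def mark_failed(self) -> None:
        # Call after an explicit failure response: the number is reusable.
        async with self._condition:
            self.issued -= 1
            self._condition.notify_all()
```

The trade-off in this sketch is that only one submission call can be unresolved at a time, so a failed submission always refers to the single outstanding number.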
The issue here is that if this isn't followed carefully, you can end up in a race condition...
I get 3 sequence numbers, submit 3, and only 2 fail. Which 2 failed? What am I decrementing? So you end up adding complexity to figure out whether to synchronize or set -1 and try again.
while True:
    # Always start waiting for one
    (
        txn_awaitable,
haven't used SDK before, is this waiting for txn hash (polling the rest endpoint for txn hash) or what is it doing?
looking at ecosystem/python/sdk/aptos_sdk/async_client.py , I see that signature is
def submit_bcs_transaction(self, signed_transaction: SignedTransaction) -> str:
so return type is str, not something you can wait on?
str is the txn hash, we would benefit by typing that.
maximum_in_flight: int = 100
maximum_wait_time = 30
sleep_time = 0.01
How come type hints in some cases but not others?
brain problems of the author.
Returns the next sequence number available on this account. This leverages a lock to
guarantee first-in, first-out ordering of requests.
"""
await self.lock.acquire()
Nit: I'd recommend using this as a context manager to make it impossible to forget to release.
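To make the suggestion concrete, here is a small sketch contrasting the two styles. `Counter` is a hypothetical class used only to show the locking pattern; `asyncio.Lock` supports `async with`, which releases the lock on every exit path, including early returns and exceptions.

```python
import asyncio


class Counter:
    """Hypothetical example class; only the locking pattern matters here."""

    def __init__(self) -> None:
        self.lock = asyncio.Lock()
        self.current_number = 0

    async def next_manual(self) -> int:
        # Manual style: every exit path must remember to release.
        await self.lock.acquire()
        try:
            number = self.current_number
            self.current_number += 1
            return number
        finally:
            self.lock.release()

    async def next_with_context_manager(self) -> int:
        # Context-manager style: release happens automatically on exit.
        async with self.lock:
            number = self.current_number
            self.current_number += 1
            return number
```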
hmm... I guess since we're not really catching anything....
while self.last_uncommitted_number != self.current_number:
    print(f"{self.last_uncommitted_number} {self.current_number}")
    if time.time() - start_time > self.maximum_wait_time:
        logging.warn(
Could be good to let the user toggle this on or off.
eventually.... logging is still very early in this sdk
def stop(self):
    """Stop the tasks for managing transactions"""
    if not self._started:
        raise Exception("Start not yet called")
Nit: RuntimeError or something instead?
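A minimal sketch of what this nit could look like, assuming a hypothetical `Worker` class with the same `_started` flag as the snippet above. Raising `RuntimeError` lets callers catch the misuse without a blanket `except Exception`.

```python
class Worker:
    """Hypothetical class mirroring only the start/stop guard."""

    def __init__(self) -> None:
        self._started = False

    def start(self) -> None:
        if self._started:
            raise RuntimeError("Already started")
        self._started = True

    def stop(self) -> None:
        """Stop the worker; calling this before start() is a usage error."""
        if not self._started:
            raise RuntimeError("Start not yet called")
        self._started = False
```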
the python sdk needs more love
async def _process_transactions_task(self):
    try:
        while True:
            # Always start waiting for one
I'm not sure I understand why we do this?
lemme add a larger comment, but basically we want to wait for the first one and then we can get into the while loop... where we could have hit a herd of now pending txns....
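The "wait for the first one, then pick up the herd" pattern can be sketched as below. This is an illustration of the idea only, assuming the pending transactions sit in an `asyncio.Queue`; `take_batch` is a hypothetical helper, not a function from this PR.

```python
import asyncio
from typing import Any, List


async def take_batch(queue: asyncio.Queue) -> List[Any]:
    # Block until at least one item is available, so the task does not spin.
    batch = [await queue.get()]
    # Then drain whatever else is already queued, without waiting for more.
    while True:
        try:
            batch.append(queue.get_nowait())
        except asyncio.QueueEmpty:
            break
    return batch
```

The blocking `get()` keeps the worker idle while nothing is pending, and the non-blocking drain lets a single pass handle everything that piled up in the meantime.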
Is HTTP/2 needed to use this layer?
account: AccountAddress
lock = asyncio.Lock

maximum_in_flight: int = 100
Is 100 because of the mempool limitation?
yup
I need to add a config here in the future....
for (output, sequence_number) in zip(outputs, sequence_numbers):
    if isinstance(output, BaseException):
        await self._processed_transactions.put(
            (sequence_number, None, output)
Why not "put" the exception instead of None?
We don't return the awkward type of (exception | string); instead we return (int, Optional[str], Optional[Exception]).
oh I read it wrong, output is the exception
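For readers following this thread, the result shape can be sketched as follows. This assumes the outputs come from `asyncio.gather(..., return_exceptions=True)`, which is a guess based on the `isinstance(output, BaseException)` check; `collect` and `Result` are hypothetical names.

```python
import asyncio
from typing import Awaitable, List, Optional, Sequence, Tuple

# (sequence number, transaction hash or None, exception or None)
Result = Tuple[int, Optional[str], Optional[BaseException]]


async def collect(
    sequence_numbers: Sequence[int],
    awaitables: Sequence[Awaitable[str]],
) -> List[Result]:
    # return_exceptions=True puts raised exceptions in the output list
    # instead of propagating the first one.
    outputs = await asyncio.gather(*awaitables, return_exceptions=True)
    results: List[Result] = []
    for output, sequence_number in zip(outputs, sequence_numbers):
        if isinstance(output, BaseException):
            # Failure: no hash, exception in the third slot.
            results.append((sequence_number, None, output))
        else:
            # Success: hash in the second slot, no exception.
            results.append((sequence_number, output, None))
    return results
```

Keeping the hash and the exception in separate optional slots means the consumer never has to type-check a single mixed field.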
) -> (int, typing.Optional[str], typing.Optional[Exception]):
    return await self._processed_transactions.get()

def stop(self):
Are stop and start meant to be used anywhere other than in tests?
yes, we use it in the actual transaction-batching demo
one need not call stop unless they want a clean shut down though...
beautiful
Amazing!!
One small thing to fix before landing so I disabled auto merge.
_maximum_in_flight: int = 100
_maximum_wait_time: int = 30
_sleep_time: int = 0.01
Suggested change:
- _sleep_time: int = 0.01
+ _sleep_time: float = 0.01
omg
This provides a framework for managing as many transactions as possible from a single account at once.
* The AccountSequenceNumber allocates up to 100 outstanding sequence numbers to maximize the number of concurrent transactions in the happy path.
* The transaction manager provides async workers that push a transaction from submission through to validating completion.
Together they provide the basic harness for scaling transaction submission on the Aptos blockchain from a single account.
This handles all the failures associated with network congestion, meaning it is ready to ship for now. More testing is needed on other failure cases, such as intermittent network connectivity, lost connections, and bad upstreams.
The bulk of this work is in adding a transaction management layer with some other fixes along the way.
This should be reasonably copyable into other languages and provides a starting point to help others build their own transaction management planes.