Skip to content

Commit

Permalink
Merge pull request #308 from golemfactory/az/find-offers-cancellation
Browse files Browse the repository at this point in the history
Rewrite `find_offers()` so that `CancelledError` is not caught
  • Loading branch information
azawlocki authored Apr 2, 2021
2 parents 7a9bab3 + eff6468 commit ceba412
Showing 1 changed file with 29 additions and 39 deletions.
68 changes: 29 additions & 39 deletions yapapi/executor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,42 +368,30 @@ async def accept_payment_for_agreement(agreement_id: str, *, partial: bool = Fal
)

async def find_offers() -> None:
async def reject_proposal(proposal, reason):
try:
await proposal.reject(reason=reason)
emit(events.ProposalRejected(prop_id=proposal.id, reason=reason))
except Exception:
emit(
events.ProposalFailed(
prop_id=proposal.id, exc_info=sys.exc_info() # type: ignore
)
)
"""Subscribe to offers and process them continuously."""

async def respond_to_proposal(proposal, builder):
try:
await proposal.respond(builder.properties, builder.constraints)
emit(events.ProposalResponded(prop_id=proposal.id))
except Exception:
emit(
events.ProposalFailed(
prop_id=proposal.id, exc_info=sys.exc_info() # type: ignore
)
)
async def reject_proposal(proposal, reason):
await proposal.reject(reason=reason)
emit(events.ProposalRejected(prop_id=proposal.id, reason=reason))

nonlocal offers_collected, proposals_confirmed
try:
subscription = await builder.subscribe(market_api)
except Exception as ex:
emit(events.SubscriptionFailed(reason=str(ex)))
raise

async with subscription:

emit(events.SubscriptionCreated(sub_id=subscription.id))
try:
proposals = subscription.events()
except Exception as ex:
emit(events.CollectFailed(sub_id=subscription.id, reason=str(ex)))
raise

async for proposal in proposals:

emit(events.ProposalReceived(prop_id=proposal.id, provider_id=proposal.issuer))
offers_collected += 1
try:
Expand All @@ -415,13 +403,11 @@ async def respond_to_proposal(proposal, builder):
type(strategy).__name__,
score,
)
except InvalidPropertiesError as err:
await reject_proposal(proposal, "Malformed offer")
continue
if score < SCORE_NEUTRAL:
await reject_proposal(proposal, "Score too low")
elif not proposal.is_draft:
try:

if score < SCORE_NEUTRAL:
await reject_proposal(proposal, "Score too low")

elif not proposal.is_draft:
common_platforms = self._get_common_payment_platforms(proposal)
if common_platforms:
builder.properties["golem.com.payment.chosen-platform"] = next(
Expand All @@ -440,19 +426,23 @@ async def respond_to_proposal(proposal, builder):
continue
else:
builder.properties[DEBIT_NOTE_ACCEPTANCE_TIMEOUT_PROP] = timeout
await respond_to_proposal(proposal, builder)
except CancelledError:
raise
except Exception:
emit(
events.ProposalFailed(
prop_id=proposal.id, exc_info=sys.exc_info() # type: ignore
)

await proposal.respond(builder.properties, builder.constraints)
emit(events.ProposalResponded(prop_id=proposal.id))

else:
emit(events.ProposalConfirmed(prop_id=proposal.id))
await agreements_pool.add_proposal(score, proposal)
proposals_confirmed += 1

except CancelledError:
raise
except Exception:
emit(
events.ProposalFailed(
prop_id=proposal.id, exc_info=sys.exc_info() # type: ignore
)
else:
emit(events.ProposalConfirmed(prop_id=proposal.id))
await agreements_pool.add_proposal(score, proposal)
proposals_confirmed += 1
)

# aio_session = await self._stack.enter_async_context(aiohttp.ClientSession())
# storage_manager = await DavStorageProvider.for_directory(
Expand Down

0 comments on commit ceba412

Please sign in to comment.