-
Notifications
You must be signed in to change notification settings - Fork 22
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
2 changed files
with
153 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
#!/usr/bin/env python3 | ||
import asyncio | ||
from asyncio import TimeoutError | ||
import datetime | ||
import json | ||
import sys | ||
from typing_extensions import Final | ||
|
||
import ya_market | ||
from yapapi import props as yp | ||
from yapapi.log import enable_default_logger | ||
from yapapi.props.builder import DemandBuilder | ||
from yapapi.rest import Configuration, Market, Activity, Payment # noqa | ||
|
||
from examples import utils | ||
|
||
DEBIT_NOTE_ACCEPTANCE_TIMEOUT_PROP: Final[str] = "golem.com.payment.debit-notes.accept-timeout?" | ||
|
||
async def _respond(proposal, dbuild): | ||
dbuild.properties["golem.com.payment.chosen-platform"] = "NGNT" | ||
timeout = proposal.props.get(DEBIT_NOTE_ACCEPTANCE_TIMEOUT_PROP) | ||
dbuild.properties[DEBIT_NOTE_ACCEPTANCE_TIMEOUT_PROP] = timeout | ||
return await proposal.respond(dbuild.properties, dbuild.constraints) | ||
|
||
async def renegotiate_offers(conf: Configuration, subnet_tag: str): | ||
"""Rejects every proposal & then renegotiates it""" | ||
async with conf.market() as client: | ||
market_api = Market(client) | ||
dbuild = DemandBuilder() | ||
dbuild.add(yp.NodeInfo(name="some renegotiating node", subnet_tag=subnet_tag)) | ||
dbuild.add(yp.Activity(expiration=datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(minutes=30))) | ||
|
||
async with market_api.subscribe(dbuild.properties, dbuild.constraints) as subscription: | ||
issuers = set() | ||
proposals = 0 | ||
rejected_proposals = set() # Already rejected, don't reject again | ||
async for event in subscription.events(): | ||
node_name = event.props.get("golem.node.id.name") | ||
if not event.is_draft: | ||
try: | ||
await _respond(event, dbuild) | ||
except ya_market.exceptions.ApiException as e: | ||
print(f"[{node_name}] respond error: {e}") | ||
continue | ||
proposals += 1 | ||
issuers.add(event.issuer) | ||
print(f"[{node_name}] Responded {proposals} {len(issuers)}") | ||
continue | ||
|
||
print(f"[{node_name}] Offer: {event.id} from {event.issuer} is_draft: {event.is_draft}") | ||
#print(f"props {json.dumps(event.props, indent=4)}") | ||
prev_proposal_id = event._proposal.proposal.prev_proposal_id | ||
if prev_proposal_id not in rejected_proposals: | ||
rejected_proposals.add(prev_proposal_id) | ||
await event.reject() | ||
print(f"[{node_name}] Rejected {len(rejected_proposals)}") | ||
await asyncio.sleep(2) | ||
print(f"[{node_name}] Renegotiating") | ||
try: | ||
await _respond(event, dbuild) | ||
except ya_market.exceptions.ApiException as e: | ||
print(f"[{node_name}] respond error: {e}") | ||
continue | ||
continue | ||
print(".create_agreement()") | ||
agreement = await event.create_agreement() | ||
print(".confirm()") | ||
confirm_result = await agreement.confirm() | ||
print(f"[{node_name}] agreement.confirm(): {confirm_result}") | ||
if confirm_result: | ||
terminate_reason = {"message": "Work cancelled", "golem.requestor.code": "Cancelled"} | ||
terminate_result = await agreement.terminate(terminate_reason) | ||
print(f"agreement.terminate(): {terminate_result}") | ||
print("All done") | ||
|
||
|
||
def main(): | ||
subnet = "goth" | ||
sys.stderr.write(f"Using subnet: {utils.TEXT_COLOR_YELLOW}{subnet}{utils.TEXT_COLOR_DEFAULT}\n") | ||
|
||
enable_default_logger() | ||
try: | ||
asyncio.get_event_loop().run_until_complete( | ||
asyncio.wait_for( | ||
renegotiate_offers( | ||
Configuration(), | ||
subnet_tag=subnet, | ||
), | ||
timeout=140, | ||
) | ||
) | ||
except TimeoutError: | ||
print("Main timeout triggered :(") | ||
|
||
|
||
if __name__ == "__main__": | ||
main() |
56 changes: 56 additions & 0 deletions
56
tests/goth_tests/test_renegotiate_proposal/test_renegotiate_proposal.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
import functools | ||
import logging | ||
import os | ||
from pathlib import Path | ||
import pytest | ||
import re | ||
from typing import List | ||
|
||
from goth.configuration import load_yaml, Override | ||
from goth.runner.log import configure_logging | ||
from goth.runner import Runner | ||
from goth.runner.probe import RequestorProbe | ||
|
||
|
||
logger = logging.getLogger("goth.test.renegotiate_proposal") | ||
|
||
|
||
async def assert_re(re_: str, stream): | ||
async for line in stream: | ||
print(f"match({re_}, {line})") | ||
if re.match(re_, line): | ||
return | ||
raise AssertionError("No re match") | ||
|
||
|
||
@pytest.mark.asyncio | ||
async def test_renegotiation( | ||
log_dir: Path, | ||
goth_config_path: Path, | ||
config_overrides: List[Override], | ||
) -> None: | ||
|
||
# This is the default configuration with 2 wasm/VM providers | ||
goth_config = load_yaml(goth_config_path, config_overrides) | ||
test_script_path = str(Path(__file__).parent / "requestor.py") | ||
|
||
configure_logging(log_dir) | ||
|
||
runner = Runner( | ||
base_log_dir=log_dir, | ||
compose_config=goth_config.compose_config, | ||
) | ||
|
||
async with runner(goth_config.containers): | ||
|
||
requestor = runner.get_probes(probe_type=RequestorProbe)[0] | ||
|
||
async with requestor.run_command_on_host(test_script_path, env=os.environ) as ( | ||
_cmd_task, | ||
cmd_monitor, | ||
): | ||
|
||
cmd_monitor.add_assertion(functools.partial(assert_re, r"\[.+\] Renegotiating"), name="renego") | ||
cmd_monitor.add_assertion(functools.partial(assert_re, r"agreement.terminate\(\): True"), name="terminate") | ||
# assert not "Main timeout triggered :(" | ||
cmd_monitor.add_assertion(functools.partial(assert_re, r"All done"), name="alldone") |