From cdb6e004853dd243860bcdeea70253213e09f27d Mon Sep 17 00:00:00 2001
From: Dariusz Rybi <dariusz@golem.network>
Date: Tue, 20 Jul 2021 14:19:30 +0200
Subject: [PATCH 1/4] [GOTH] Renegotiate proposal

---
 .../test_renegotiate_proposal/requestor.py    | 115 ++++++++++++++++++
 .../test_renegotiate_proposal.py              |  46 +++++++
 2 files changed, 161 insertions(+)
 create mode 100755 tests/goth_tests/test_renegotiate_proposal/requestor.py
 create mode 100644 tests/goth_tests/test_renegotiate_proposal/test_renegotiate_proposal.py

diff --git a/tests/goth_tests/test_renegotiate_proposal/requestor.py b/tests/goth_tests/test_renegotiate_proposal/requestor.py
new file mode 100755
index 000000000..629e5bef5
--- /dev/null
+++ b/tests/goth_tests/test_renegotiate_proposal/requestor.py
@@ -0,0 +1,115 @@
+#!/usr/bin/env python3
+import asyncio
+from asyncio import TimeoutError
+import datetime
+import sys
+from typing_extensions import Final
+import ya_market
+from yapapi import props as yp
+from yapapi.log import enable_default_logger
+from yapapi.props.builder import DemandBuilder
+from yapapi.rest import Configuration, Market
+from yapapi.rest.market import OfferProposal
+
+from examples import utils
+
+DEBIT_NOTE_ACCEPTANCE_TIMEOUT_PROP: Final[str] = "golem.com.payment.debit-notes.accept-timeout?"
+
+
+async def _respond(proposal: OfferProposal, dbuild) -> str:
+    dbuild.properties["golem.com.payment.chosen-platform"] = "NGNT"
+    timeout = proposal.props.get(DEBIT_NOTE_ACCEPTANCE_TIMEOUT_PROP)
+    dbuild.properties[DEBIT_NOTE_ACCEPTANCE_TIMEOUT_PROP] = timeout
+    return await proposal.respond(dbuild.properties, dbuild.constraints)
+
+
+async def renegotiate_offers(conf: Configuration, subnet_tag: str):
+    """Rejects every proposal & then renegotiates it"""
+    async with conf.market() as client:
+        market_api = Market(client)
+        dbuild = DemandBuilder()
+        dbuild.add(yp.NodeInfo(name="some renegotiating node", subnet_tag=subnet_tag))
+        dbuild.add(
+            yp.Activity(
+                expiration=datetime.datetime.now(datetime.timezone.utc)
+                + datetime.timedelta(minutes=30)
+            )
+        )
+
+        async with market_api.subscribe(dbuild.properties, dbuild.constraints) as subscription:
+            issuers = set()
+            proposals = 0
+            rejected_proposals = set()  # Already rejected, don't reject again
+            async for event in subscription.events():
+                node_name = event.props.get("golem.node.id.name")
+                proposal_id = event._proposal.proposal.proposal_id
+                print(f"\n[{node_name}] {'*'*15} {proposal_id}")
+                prev_proposal_id = event._proposal.proposal.prev_proposal_id
+                print(f"[{node_name}] prev_proposal_id: {prev_proposal_id}")
+                if not event.is_draft:
+                    if proposals > 4:
+                        print(f"[node_name] Skipping additional proposal")
+                        break
+                    try:
+                        await _respond(event, dbuild)
+                    except ya_market.exceptions.ApiException as e:
+                        print(f"[{node_name}] respond error: {e}")
+                        continue
+                    proposals += 1
+                    issuers.add(event.issuer)
+                    print(f"[{node_name}] Responded {proposals} {len(issuers)}")
+                    continue
+
+                # print(f"props {json.dumps(event.props, indent=4)}")
+                print(
+                    f"[{node_name}] Offer: {proposal_id} from {event.issuer} is_draft: {event.is_draft}"
+                )
+                if prev_proposal_id not in rejected_proposals:
+                    await event.reject()
+                    print(f"[{node_name}] Rejected {len(rejected_proposals)}. id: {proposal_id}")
+                    await asyncio.sleep(1)
+                    print(f"[{node_name}] Renegotiating. id: {proposal_id}")
+                    try:
+                        new_offer_id = await _respond(event, dbuild)
+                        print(f"[{node_name}] new_offer_id: {new_offer_id}")
+                        rejected_proposals.add(new_offer_id)
+                    except ya_market.exceptions.ApiException as e:
+                        print(f"[{node_name}] respond error: {e}")
+                        continue
+                    continue
+                print(".create_agreement()")
+                agreement = await event.create_agreement()
+                print(".confirm()")
+                confirm_result = await agreement.confirm()
+                print(f"[{node_name}] agreement.confirm(): {confirm_result}")
+                if confirm_result:
+                    terminate_reason = {
+                        "message": "Work cancelled",
+                        "golem.requestor.code": "Cancelled",
+                    }
+                    terminate_result = await agreement.terminate(terminate_reason)
+                    print(f"agreement.terminate(): {terminate_result}")
+        print("All done")
+
+
+def main():
+    subnet = "goth"
+    sys.stderr.write(f"Using subnet: {utils.TEXT_COLOR_YELLOW}{subnet}{utils.TEXT_COLOR_DEFAULT}\n")
+
+    enable_default_logger()
+    try:
+        asyncio.get_event_loop().run_until_complete(
+            asyncio.wait_for(
+                renegotiate_offers(
+                    Configuration(),
+                    subnet_tag=subnet,
+                ),
+                timeout=140,
+            )
+        )
+    except TimeoutError:
+        print("Main timeout triggered :(")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/tests/goth_tests/test_renegotiate_proposal/test_renegotiate_proposal.py b/tests/goth_tests/test_renegotiate_proposal/test_renegotiate_proposal.py
new file mode 100644
index 000000000..fe84ffa70
--- /dev/null
+++ b/tests/goth_tests/test_renegotiate_proposal/test_renegotiate_proposal.py
@@ -0,0 +1,46 @@
+import logging
+import os
+from pathlib import Path
+import pytest
+from typing import List
+
+from goth.configuration import load_yaml, Override
+from goth.runner.log import configure_logging
+from goth.runner import Runner
+from goth.runner.probe import RequestorProbe
+
+
+logger = logging.getLogger("goth.test.renegotiate_proposal")
+
+
+@pytest.mark.asyncio
+async def test_renegotiation(
+    log_dir: Path,
+    goth_config_path: Path,
+    config_overrides: List[Override],
+) -> None:
+
+    # This is the default configuration with 2 wasm/VM providers
+    goth_config = load_yaml(goth_config_path, config_overrides)
+    test_script_path = str(Path(__file__).parent / "requestor.py")
+
+    configure_logging(log_dir)
+
+    runner = Runner(
+        base_log_dir=log_dir,
+        compose_config=goth_config.compose_config,
+    )
+
+    async with runner(goth_config.containers):
+
+        requestor = runner.get_probes(probe_type=RequestorProbe)[0]
+
+        async with requestor.run_command_on_host(test_script_path, env=os.environ) as (
+            _cmd_task,
+            cmd_monitor,
+        ):
+
+            await cmd_monitor.wait_for_pattern(r"\[.+\] Renegotiating", timeout=20)
+            await cmd_monitor.wait_for_pattern(r"agreement.terminate\(\): True", timeout=20)
+            # assert not "Main timeout triggered :("
+            await cmd_monitor.wait_for_pattern(r"All done", timeout=20)

From d3745876f0e5ab4c5665cd7eb0e378b650482066 Mon Sep 17 00:00:00 2001
From: Dariusz Rybi <dariusz@golem.network>
Date: Mon, 2 Aug 2021 16:36:37 +0200
Subject: [PATCH 2/4] [review] Refactor

---
 tests/goth_tests/test_renegotiate_proposal/requestor.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tests/goth_tests/test_renegotiate_proposal/requestor.py b/tests/goth_tests/test_renegotiate_proposal/requestor.py
index 629e5bef5..fec865b85 100755
--- a/tests/goth_tests/test_renegotiate_proposal/requestor.py
+++ b/tests/goth_tests/test_renegotiate_proposal/requestor.py
@@ -14,6 +14,7 @@
 from examples import utils
 
 DEBIT_NOTE_ACCEPTANCE_TIMEOUT_PROP: Final[str] = "golem.com.payment.debit-notes.accept-timeout?"
+PROPOSALS_LIMIT: Final[int] = 4
 
 
 async def _respond(proposal: OfferProposal, dbuild) -> str:
@@ -47,7 +48,7 @@ async def renegotiate_offers(conf: Configuration, subnet_tag: str):
                 prev_proposal_id = event._proposal.proposal.prev_proposal_id
                 print(f"[{node_name}] prev_proposal_id: {prev_proposal_id}")
                 if not event.is_draft:
-                    if proposals > 4:
+                    if proposals > PROPOSALS_LIMIT:
                         print(f"[node_name] Skipping additional proposal")
                         break
                     try:
@@ -94,7 +95,6 @@ async def renegotiate_offers(conf: Configuration, subnet_tag: str):
 
 def main():
     subnet = "goth"
-    sys.stderr.write(f"Using subnet: {utils.TEXT_COLOR_YELLOW}{subnet}{utils.TEXT_COLOR_DEFAULT}\n")
 
     enable_default_logger()
     try:

From 4b073bbb10f3e7dee0c5a9dd5582e1b8f03b36cf Mon Sep 17 00:00:00 2001
From: Dariusz Rybi <jiivan@users.noreply.github.com>
Date: Mon, 2 Aug 2021 16:37:34 +0200
Subject: [PATCH 3/4] Apply suggestions from code review

Co-authored-by: Kuba Mazurek <jakub.mazurek@golem.network>
---
 tests/goth_tests/test_renegotiate_proposal/requestor.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/tests/goth_tests/test_renegotiate_proposal/requestor.py b/tests/goth_tests/test_renegotiate_proposal/requestor.py
index fec865b85..1aea9e61f 100755
--- a/tests/goth_tests/test_renegotiate_proposal/requestor.py
+++ b/tests/goth_tests/test_renegotiate_proposal/requestor.py
@@ -58,10 +58,9 @@ async def renegotiate_offers(conf: Configuration, subnet_tag: str):
                         continue
                     proposals += 1
                     issuers.add(event.issuer)
-                    print(f"[{node_name}] Responded {proposals} {len(issuers)}")
+                    print(f"[{node_name}] Responded. proposals={proposals}, issuers={len(issuers)}")
                     continue
 
-                # print(f"props {json.dumps(event.props, indent=4)}")
                 print(
                     f"[{node_name}] Offer: {proposal_id} from {event.issuer} is_draft: {event.is_draft}"
                 )

From 66a43c9b6642ce535fbc184952012f0fd3f0b1f1 Mon Sep 17 00:00:00 2001
From: Dariusz Rybi <dariusz@golem.network>
Date: Mon, 2 Aug 2021 17:14:10 +0200
Subject: [PATCH 4/4] [review] Remove unnecessary try..except

---
 .../test_renegotiate_proposal/requestor.py       | 16 ++++------------
 1 file changed, 4 insertions(+), 12 deletions(-)

diff --git a/tests/goth_tests/test_renegotiate_proposal/requestor.py b/tests/goth_tests/test_renegotiate_proposal/requestor.py
index 1aea9e61f..e187754f3 100755
--- a/tests/goth_tests/test_renegotiate_proposal/requestor.py
+++ b/tests/goth_tests/test_renegotiate_proposal/requestor.py
@@ -51,11 +51,7 @@ async def renegotiate_offers(conf: Configuration, subnet_tag: str):
                     if proposals > PROPOSALS_LIMIT:
                         print(f"[node_name] Skipping additional proposal")
                         break
-                    try:
-                        await _respond(event, dbuild)
-                    except ya_market.exceptions.ApiException as e:
-                        print(f"[{node_name}] respond error: {e}")
-                        continue
+                    await _respond(event, dbuild)
                     proposals += 1
                     issuers.add(event.issuer)
                     print(f"[{node_name}] Responded. proposals={proposals}, issuers={len(issuers)}")
@@ -69,13 +65,9 @@ async def renegotiate_offers(conf: Configuration, subnet_tag: str):
                     print(f"[{node_name}] Rejected {len(rejected_proposals)}. id: {proposal_id}")
                     await asyncio.sleep(1)
                     print(f"[{node_name}] Renegotiating. id: {proposal_id}")
-                    try:
-                        new_offer_id = await _respond(event, dbuild)
-                        print(f"[{node_name}] new_offer_id: {new_offer_id}")
-                        rejected_proposals.add(new_offer_id)
-                    except ya_market.exceptions.ApiException as e:
-                        print(f"[{node_name}] respond error: {e}")
-                        continue
+                    new_offer_id = await _respond(event, dbuild)
+                    print(f"[{node_name}] new_offer_id: {new_offer_id}")
+                    rejected_proposals.add(new_offer_id)
                     continue
                 print(".create_agreement()")
                 agreement = await event.create_agreement()