Skip to content

Commit

Permalink
Merge pull request #122 from golemfactory/blue/blacklist-failing-nodes
Browse files Browse the repository at this point in the history
temporarily blacklist failing providers
  • Loading branch information
shadeofblue authored Aug 22, 2023
2 parents b1a15ac + 80eb101 commit a722890
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 4 deletions.
33 changes: 32 additions & 1 deletion dapp_runner/runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import asyncio
import logging
from datetime import datetime
from decimal import Decimal
from typing import Dict, Final, List, Optional

import uvicorn
Expand All @@ -11,10 +12,12 @@
from yapapi.config import ApiConfig
from yapapi.contrib.service.http_proxy import LocalHttpProxy
from yapapi.contrib.service.socket_proxy import SocketProxy
from yapapi.events import CommandExecuted
from yapapi.events import CommandExecuted, Event, ServiceStateChanged
from yapapi.network import Network
from yapapi.payload import Payload
from yapapi.props import com
from yapapi.services import Cluster, Service, ServiceSerialization, ServiceState
from yapapi.strategy import LeastExpensiveLinearPayuMS

from dapp_runner._util import FreePortProvider, cancel_and_await_tasks, utcnow, utcnow_iso_str
from dapp_runner.descriptor import Config, DappDescriptor
Expand All @@ -31,6 +34,7 @@
from .error import RunnerError
from .payload import get_payload
from .service import DappService, get_service
from .strategy import BlacklistOnFailure

LOCAL_HTTP_PROXY_DATA_KEY: Final[str] = "local_proxy_address"
LOCAL_HTTP_PROXY_URI: Final[str] = "http://localhost"
Expand Down Expand Up @@ -77,14 +81,23 @@ def __init__(self, config: Config, dapp: DappDescriptor):
self.config = config
self.dapp = dapp

self._base_strategy = LeastExpensiveLinearPayuMS(
max_fixed_price=Decimal("1.0"),
max_price_for={com.Counter.CPU: Decimal("0.2"), com.Counter.TIME: Decimal("0.1")},
)
self._blacklist: BlacklistOnFailure = BlacklistOnFailure(self._base_strategy)

self.golem = Golem(
budget=config.payment.budget,
subnet_tag=config.yagna.subnet_tag,
payment_driver=config.payment.driver,
payment_network=config.payment.network,
api_config=ApiConfig(app_key=config.yagna.app_key), # type: ignore
strategy=self._blacklist,
)

self.golem.add_event_consumer(self._detect_failures, [ServiceStateChanged])

self.clusters = {}
self._payloads = {}
self._http_proxies = {}
Expand Down Expand Up @@ -350,6 +363,24 @@ def dapp_terminated(self) -> bool:
]
)

def _detect_failures(self, event: Event) -> None:
# just a sanity check
if not isinstance(event, ServiceStateChanged):
return

service = event.service
if (
self._desired_app_state == ServiceState.running
and event.new == ServiceState.terminated
and service._ctx
):
self._blacklist.blacklist_node(service._ctx.provider_id)
logger.info(
"Blacklisting %s (%s) after a failure",
service._ctx.provider_name,
service._ctx.provider_id,
)

def _update_node_gaom(self, service: DappService, service_descriptor: ServiceDescriptor):
# update the state in the GAOM
service_descriptor.state = service.state.identifier
Expand Down
37 changes: 37 additions & 0 deletions dapp_runner/runner/strategy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"""Dapp runner's market strategy implementation."""
import logging
from typing import Set

from yapapi import rest
from yapapi.strategy.wrapping_strategy import WrappingMarketStrategy

BLACKLISTED_SCORE = -1.0


class BlacklistOnFailure(WrappingMarketStrategy):
"""A market strategy wrapper that blacklists providers when they fail an activity."""

def __init__(self, base_strategy):
"""Initialize instance.
:param base_strategy: the base strategy around which this strategy is wrapped
"""
super().__init__(base_strategy)
self._logger = logging.getLogger(f"{__name__}.{type(self).__name__}")
self._blacklist: Set[str] = set()

def blacklist_node(self, node_id: str):
"""Add the given node id to the blacklist."""
self._blacklist.add(node_id)

async def score_offer(self, offer: rest.market.OfferProposal) -> float:
"""Reject the node if blacklisted, otherwise score the offer using the base strategy."""

if offer.issuer in self._blacklist:
self._logger.debug(
"Rejecting offer %s from a blacklisted node '%s'", offer.id, offer.issuer
)
return BLACKLISTED_SCORE

score = await self.base_strategy.score_offer(offer)
return score
5 changes: 3 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ python = "^3.8"
appdirs = "^1.4"
click = "^7.0" # requires bump to goth's dependencies https://github.com/golemfactory/goth/issues/605
dpath = "~2.0.8"
pyyaml = "^5.0" # requires bump to goth's dependencies https://github.com/golemfactory/goth/issues/605
pyyaml = "^6.0"
shortuuid = "^1.0"
ansicolors = "^1.1.8"
networkx = "^2.8"
yapapi = { git = "https://github.com/golemfactory/yapapi.git", branch = "master" }
yapapi = "0.12.0-alpha.1"
pydantic = "^1.10.5"
fastapi = "^0.93.0"
uvicorn = {extras = ["standard"], version = "^0.21.0"}
Expand Down Expand Up @@ -103,6 +103,7 @@ authorized_licenses = [
"MPL-2.0",
"Mozilla Public License 2.0 (MPL 2.0)",
"MIT",
"MIT License",
"LGPL",
"LGPL-3.0-or-later",
"GNU Lesser General Public License v3 or later (LGPLv3+)",
Expand Down
1 change: 1 addition & 0 deletions tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ def _mock_runner(**kwargs):
mocker.patch("yapapi.golem.Golem._get_new_engine", mock.Mock())
mocker.patch("yapapi.golem.Golem.start", mock.AsyncMock())
mocker.patch("yapapi.golem.Golem.stop", mock.AsyncMock())
mocker.patch("yapapi.event_dispatcher.AsyncEventDispatcher.add_event_consumer", mock.Mock())

return RunnerFactory(**kwargs)

Expand Down
2 changes: 1 addition & 1 deletion tests/unit/descriptor/test_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

@pytest.fixture
@mock.patch(
"yapapi.payload.manifest.vm.resolve_package_repo_url",
"yapapi.payload.manifest.vm.resolve_package_url",
mock.AsyncMock(return_value="hash:sha3:some_image_hash:some_image_url"),
)
async def minimal_manifest() -> Manifest:
Expand Down

0 comments on commit a722890

Please sign in to comment.