Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove internal routing #6541

Merged
merged 4 commits into from
Sep 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 1 addition & 24 deletions raiden/blockchain/decode.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
NettingChannelEndState,
NettingChannelState,
SuccessfulTransactionState,
TokenNetworkGraphState,
TokenNetworkState,
TransactionChannelDeposit,
TransactionExecutionStatus,
Expand All @@ -44,7 +43,6 @@
ContractReceiveChannelSettled,
ContractReceiveChannelWithdraw,
ContractReceiveNewTokenNetwork,
ContractReceiveRouteClosed,
ContractReceiveRouteNew,
ContractReceiveSecretReveal,
ContractReceiveUpdateTransfer,
Expand Down Expand Up @@ -89,9 +87,7 @@ def contractreceivenewtokennetwork_from_event(
return ContractReceiveNewTokenNetwork(
token_network_registry_address=token_network_registry_address,
token_network=TokenNetworkState(
address=token_network_address,
token_address=token_address,
network_graph=TokenNetworkGraphState(token_network_address),
address=token_network_address, token_address=token_address,
),
transaction_hash=event.transaction_hash,
block_number=event.block_number,
Expand Down Expand Up @@ -229,23 +225,6 @@ def contractreceivechannelclosed_from_event(
)


def contractreceiverouteclosed_from_event(event: DecodedEvent) -> ContractReceiveRouteClosed:
data = event.event_data
args = data["args"]
channel_identifier = args["channel_identifier"]

return ContractReceiveRouteClosed(
canonical_identifier=CanonicalIdentifier(
chain_identifier=event.chain_id,
token_network_address=TokenNetworkAddress(event.originating_contract),
channel_identifier=channel_identifier,
),
transaction_hash=event.transaction_hash,
block_number=event.block_number,
block_hash=event.block_hash,
)


def contractreceiveupdatetransfer_from_event(
channel_state: NettingChannelState, event: DecodedEvent
) -> ContractReceiveUpdateTransfer:
Expand Down Expand Up @@ -371,8 +350,6 @@ def blockchainevent_to_statechange(

if canonical_identifier is not None:
return contractreceivechannelclosed_from_event(canonical_identifier, event)
else:
return contractreceiverouteclosed_from_event(event)

elif event_name == ChannelEvent.SETTLED:
channel_settle_state = get_contractreceivechannelsettled_data_from_event(
Expand Down
1 change: 0 additions & 1 deletion raiden/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ class RoutingMode(Enum):
"""Routing mode configuration that can be chosen on the command line"""

PFS = "pfs"
LOCAL = "local"
PRIVATE = "private"


Expand Down
146 changes: 10 additions & 136 deletions raiden/routing.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,20 @@
from heapq import heappop, heappush
from uuid import UUID

import networkx
import structlog
from eth_utils import to_canonical_address

from raiden.exceptions import ServiceRequestFailed
from raiden.messages.metadata import RouteMetadata
from raiden.network.pathfinding import PFSConfig, query_paths
from raiden.settings import INTERNAL_ROUTING_DEFAULT_FEE_PERC
from raiden.transfer import channel, views
from raiden.transfer.state import ChainState, ChannelState, NetworkState, RouteState
from raiden.transfer.state import ChainState, ChannelState, RouteState
from raiden.utils.formatting import to_checksum_address
from raiden.utils.typing import (
Address,
BlockNumber,
ChannelID,
FeeAmount,
InitiatorAddress,
List,
NamedTuple,
OneToNAddress,
Optional,
PaymentAmount,
Expand Down Expand Up @@ -48,36 +43,12 @@ def get_best_routes(
token_network = views.get_token_network_by_address(chain_state, token_network_address)
assert token_network, "The token network must be validated and exist."

try:
# networkx returns a generator, consume the result since it will be
# iterated over multiple times.
all_neighbors = list(
networkx.all_neighbors(token_network.network_graph.network, from_address)
)
except networkx.NetworkXError:
# If `our_address` is not in the graph, no channels opened with the
# address.
log.debug(
"Node does not have a channel in the requested token network.",
source=to_checksum_address(from_address),
target=to_checksum_address(to_address),
amount=amount,
)
return "Node does not have a channel in the requested token network.", list(), None

error_closed = 0
error_no_route = 0
error_no_capacity = 0
error_not_online = 0
error_direct = None
shortest_routes: List[Neighbour] = list()

# Always use a direct channel if available:
# - There are no race conditions and the capacity is guaranteed to be
# available.
# - There will be no mediation fees
# - The transfer will be faster
if to_address in all_neighbors:
if Address(to_address) in token_network.partneraddresses_to_channelidentifiers.keys():
for channel_id in token_network.partneraddresses_to_channelidentifiers[
Address(to_address)
]:
Expand All @@ -95,83 +66,11 @@ def get_best_routes(
)
return None, [direct_route], None

error_direct = is_usable

latest_channel_opened_at = BlockNumber(0)
for partner_address in all_neighbors:
for channel_id in token_network.partneraddresses_to_channelidentifiers[partner_address]:
channel_state = token_network.channelidentifiers_to_channels[channel_id]

if channel.get_status(channel_state) != ChannelState.STATE_OPENED:
error_closed += 1
continue

latest_channel_opened_at = max(
latest_channel_opened_at, channel_state.open_transaction.finished_block_number
)

try:
route = networkx.shortest_path( # pylint: disable=E1121
token_network.network_graph.network, partner_address, to_address
)
except (networkx.NetworkXNoPath, networkx.NodeNotFound):
error_no_route += 1
else:
distributable = channel.get_distributable(
channel_state.our_state, channel_state.partner_state
)

network_status = views.get_node_network_status(
chain_state, channel_state.partner_state.address
)

if distributable < amount:
error_no_capacity += 1
elif network_status != NetworkState.REACHABLE:
error_not_online += 1
else:
nonrefundable = amount > channel.get_distributable(
channel_state.partner_state, channel_state.our_state
)

# The complete route includes the initiator, add it to the beginning
complete_route = [Address(from_address)] + route
neighbour = Neighbour(
length=len(route),
nonrefundable=nonrefundable,
partner_address=partner_address,
channelid=channel_state.identifier,
route=complete_route,
)
heappush(shortest_routes, neighbour)

if not shortest_routes:
qty_channels = sum(
len(token_network.partneraddresses_to_channelidentifiers[partner_address])
for partner_address in all_neighbors
)
error_msg = (
f"None of the existing channels could be used to complete the "
f"transfer. From the {qty_channels} existing channels. "
f"{error_closed} are closed. {error_not_online} are not online. "
f"{error_no_route} don't have a route to the target in the given "
f"token network. {error_no_capacity} don't have enough capacity for "
f"the requested transfer."
)
if error_direct is not None:
error_msg += f"direct channel {error_direct}."

log.warning(
"None of the existing channels could be used to complete the transfer",
from_address=to_checksum_address(from_address),
to_address=to_checksum_address(to_address),
error_closed=error_closed,
error_no_route=error_no_route,
error_no_capacity=error_no_capacity,
error_direct=error_direct,
error_not_online=error_not_online,
latest_channel_opened_at = 0
for channel_state in token_network.channelidentifiers_to_channels.values():
latest_channel_opened_at = max(
latest_channel_opened_at, channel_state.open_transaction.finished_block_number
)
return error_msg, list(), None

if pfs_config is not None and one_to_n_address is not None:
pfs_error_msg, pfs_routes, pfs_feedback_token = get_best_routes_pfs(
Expand All @@ -184,7 +83,7 @@ def get_best_routes(
previous_address=previous_address,
pfs_config=pfs_config,
privkey=privkey,
pfs_wait_for_block=latest_channel_opened_at,
pfs_wait_for_block=BlockNumber(latest_channel_opened_at),
)

if not pfs_error_msg:
Expand All @@ -200,38 +99,13 @@ def get_best_routes(

log.warning(
"Request to Pathfinding Service was not successful. "
"No routes to the target are found.",
"No routes to the target were found.",
pfs_message=pfs_error_msg,
)
return pfs_error_msg, list(), None

else:
available_routes = list()

while shortest_routes:
neighbour = heappop(shortest_routes)

# https://github.com/raiden-network/raiden/issues/4751
# Internal routing doesn't know how much fees the initiator will be charged,
# so it should set a percentage on top of the original amount
# for the whole route.
estimated_fee = FeeAmount(round(INTERNAL_ROUTING_DEFAULT_FEE_PERC * amount))
if neighbour.length == 1: # Target is our direct neighbour, pay no fees.
estimated_fee = FeeAmount(0)

available_routes.append(
RouteState(route=neighbour.route, estimated_fee=estimated_fee,)
)

return (None, available_routes, None)


class Neighbour(NamedTuple):
length: int # first item used for ordering
nonrefundable: bool
partner_address: Address
channelid: ChannelID
route: List[Address]
log.warning("Pathfinding Service could not be used.")
return "Pathfinding Service could not be used.", list(), None


def get_best_routes_pfs(
Expand Down
22 changes: 0 additions & 22 deletions raiden/storage/serialization/schemas.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import json
from random import Random
from typing import Dict, Iterable

import marshmallow
import networkx
from eth_utils import to_bytes, to_canonical_address, to_hex
from marshmallow import EXCLUDE, Schema, post_dump
from marshmallow_dataclass import class_schema
Expand Down Expand Up @@ -213,25 +211,6 @@ def __call__(self, **metadata: Any) -> "CallablePolyField":
return self


class NetworkXGraphField(marshmallow.fields.Field):
""" Converts networkx.Graph objects to a string """

def _serialize(self, value: networkx.Graph, attr: Any, obj: Any, **kwargs: Any) -> str:
return json.dumps(
[(to_hex_address(edge[0]), to_hex_address(edge[1])) for edge in value.edges]
)

def _deserialize(self, value: str, attr: Any, data: Any, **kwargs: Any) -> networkx.Graph:
try:
raw_data = json.loads(value)
canonical_addresses = [
(to_canonical_address(edge[0]), to_canonical_address(edge[1])) for edge in raw_data
]
return networkx.Graph(canonical_addresses)
except (TypeError, ValueError):
raise self.make_error("validator_failed", input=value)


class BaseSchema(marshmallow.Schema):
# We want to ignore unknown fields
class Meta:
Expand Down Expand Up @@ -319,7 +298,6 @@ class Meta:
# QueueIdentifier (Special case)
QueueIdentifier: QueueIdentifierField,
# Other
networkx.Graph: NetworkXGraphField,
Random: PRNGField,
}

Expand Down
5 changes: 1 addition & 4 deletions raiden/tests/fuzz/test_state_changes.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
HashTimeLockState,
NettingChannelState,
NetworkState,
TokenNetworkGraphState,
TokenNetworkRegistryState,
TokenNetworkState,
make_empty_pending_locks_state,
Expand Down Expand Up @@ -350,9 +349,7 @@ def initialize_all(self, block_number, random, random_seed):
self.token_network_address = factories.UNIT_TOKEN_NETWORK_ADDRESS
self.token_id = factories.UNIT_TOKEN_ADDRESS
self.token_network_state = TokenNetworkState(
address=self.token_network_address,
token_address=self.token_id,
network_graph=TokenNetworkGraphState(self.token_network_address),
address=self.token_network_address, token_address=self.token_id,
)

self.token_network_registry_address = factories.make_token_network_registry_address()
Expand Down
2 changes: 2 additions & 0 deletions raiden/tests/integration/long_running/test_token_networks.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ def test_participant_selection(raiden_network: List[RaidenService], token_addres
)


@pytest.mark.skip(reason="Connection manager knowingly broken by removal of internal routing")
@raise_on_failure
@pytest.mark.parametrize("number_of_nodes", [4])
@pytest.mark.parametrize("channels_per_node", [0])
Expand Down Expand Up @@ -354,6 +355,7 @@ def test_connect_does_not_open_channels_with_offline_nodes(
)


@pytest.mark.skip(reason="Connection manager knowingly broken by removal of internal routing")
@raise_on_failure
@pytest.mark.parametrize("number_of_nodes", [3])
@pytest.mark.parametrize("channels_per_node", [0])
Expand Down
11 changes: 0 additions & 11 deletions raiden/tests/integration/network/test_pathfinding.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,6 @@ def test_configure_pfs(service_registry_address, private_keys, web3, contract_ma

response = mocked_json_response(response_data=json_data)

# With local routing configure_pfs should raise assertion
with pytest.raises(AssertionError):
_ = configure_pfs_or_exit(
pfs_url="",
routing_mode=RoutingMode.LOCAL,
service_registry=service_registry,
node_network_id=chain_id,
token_network_registry_address=token_network_registry_address_test_default,
pathfinding_max_fee=DEFAULT_PATHFINDING_MAX_FEE,
)

# With private routing configure_pfs should raise assertion
with pytest.raises(AssertionError):
_ = configure_pfs_or_exit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -720,7 +720,6 @@ def test_monitoring_broadcast_messages_in_production_if_bigger_than_threshold(


@pytest.mark.parametrize("matrix_server_count", [1])
@pytest.mark.parametrize("route_mode", [RoutingMode.LOCAL, RoutingMode.PFS])
@pytest.mark.parametrize(
"broadcast_rooms", [[DISCOVERY_DEFAULT_ROOM, PATH_FINDING_BROADCASTING_ROOM]]
)
Expand All @@ -731,7 +730,6 @@ def test_pfs_broadcast_messages(
retries_before_backoff,
monkeypatch,
broadcast_rooms,
route_mode,
):
"""
Test that RaidenService broadcasts PFSCapacityUpdate messages to
Expand All @@ -752,7 +750,7 @@ def test_pfs_broadcast_messages(
transport._send_raw = MagicMock()
raiden_service = MockRaidenService(None)
raiden_service.config.services.monitoring_enabled = True
raiden_service.routing_mode = route_mode
raiden_service.routing_mode = RoutingMode.PFS

transport.start(raiden_service, [], None)

Expand Down
Loading