From fea3cdeaccd8b91479173271a3c93867da73e3ac Mon Sep 17 00:00:00 2001 From: TobiWo Date: Sun, 26 Feb 2023 12:47:31 +0100 Subject: [PATCH] feat(all): WIP --- duties/constants/json.py | 4 + duties/constants/logging.py | 8 +- duties/constants/program.py | 5 + duties/fetcher/fetch.py | 88 +++++++---------- duties/fetcher/parser/validators.py | 71 +++++--------- duties/main.py | 9 +- duties/protocol/ethereum.py | 10 +- duties/protocol/request.py | 144 +++++++++++++++++++++++----- 8 files changed, 205 insertions(+), 134 deletions(-) diff --git a/duties/constants/json.py b/duties/constants/json.py index 5223cd2e..b2987921 100644 --- a/duties/constants/json.py +++ b/duties/constants/json.py @@ -3,3 +3,7 @@ RESPONSE_JSON_DATA_FIELD_NAME = "data" RESPONSE_JSON_DATA_GENESIS_TIME_FIELD_NAME = "genesis_time" +RESPONSE_JSON_STATUS_FIELD_NAME = "status" +RESPONSE_JSON_INDEX_FIELD_NAME = "index" +RESPONSE_JSON_VALIDATOR_FIELD_NAME = "validator" +RESPONSE_JSON_PUBKEY_FIELD_NAME = "pubkey" diff --git a/duties/constants/logging.py b/duties/constants/logging.py index 9997e62b..11ef8504 100644 --- a/duties/constants/logging.py +++ b/duties/constants/logging.py @@ -10,10 +10,10 @@ SYSTEM_EXIT_MESSAGE = "Detected user intervention (SIGINT). Shutting down." NEXT_INTERVAL_MESSAGE = "Logging next interval..." NO_UPCOMING_DUTIES_MESSAGE = "No upcoming duties detected!" -TOO_MANY_PROVIDED_VALIDATORS_MESSAGE = ( - "Provided number of validator indices is higher than 300. " - "This surpasses the current maximum for fetching attestation and sync committee duties. " - "Checking for those duties will be skipped!" +TOO_MANY_PROVIDED_VALIDATORS_FOR_FETCHING_ATTESTATION_DUTIES_MESSAGE = ( + "Provided number of validator indices for fetching attestion duties is high (> 100). " + "This pollutes the console output and prevents checking important duties. " + "Checking attestion duties will be skipped!" ) HIGHER_PROCESSING_TIME_INFO_MESSAGE = ( "You provided %s validators. Fetching all necessary data may take some time." diff --git a/duties/constants/program.py b/duties/constants/program.py index 6c1e8440..201e78d7 100644 --- a/duties/constants/program.py +++ b/duties/constants/program.py @@ -12,3 +12,8 @@ GRACEFUL_KILLER = GracefulKiller() THRESHOLD_TO_INFORM_USER_FOR_WAITING_PERIOD = 5000 NOT_ALLOWED_CHARACTERS_FOR_VALIDATOR_PARSING = [".", ","] +NUMBER_OF_VALIDATORS_PER_REST_CALL = 300 +MAX_NUMBER_OF_VALIDATORS_FOR_FETCHING_ATTESTATION_DUTIES = 100 +ALIAS_SEPARATOR = ";" +PUBKEY_PREFIX = "0x" +PUBKEY_LENGTH = 48 diff --git a/duties/fetcher/fetch.py b/duties/fetcher/fetch.py index 4e033138..b19675b1 100644 --- a/duties/fetcher/fetch.py +++ b/duties/fetcher/fetch.py @@ -1,32 +1,31 @@ """Module which holds all logic for fetching validator duties """ -from logging import getLogger from math import ceil from typing import List from cli.arguments import ARGUMENTS -from constants import endpoints, json +from constants import endpoints, program from fetcher.data_types import DutyType, ValidatorDuty from fetcher.parser.validators import get_active_validator_indices from protocol import ethereum -from protocol.request import send_beacon_api_request -from requests import Response - -__LOGGER = getLogger(__name__) - +from protocol.request import CalldataType, send_beacon_api_request __VALIDATORS = get_active_validator_indices() -def is_provided_validator_count_too_high() -> bool: - """Checks whether the number of provided validators is too high for - upcoming api calls +def is_provided_validator_count_too_high_for_fetching_attestation_duties() -> bool: + """Checks whether the number of provided validators is too high + for fetching attestation duties and therefore will not be displayed. Returns: - bool: is number of provided validators > 300 + bool: is number of provided validators > + MAX_NUMBER_OF_VALIDATORS_FOR_FETCHING_ATTESTATION_DUTIES """ - if len(__VALIDATORS) > 300: + if ( + len(__VALIDATORS) + > program.MAX_NUMBER_OF_VALIDATORS_FOR_FETCHING_ATTESTATION_DUTIES + ): return True return False @@ -40,15 +39,16 @@ def get_next_attestation_duties() -> dict[str, ValidatorDuty]: for all provided validators """ current_epoch = ethereum.get_current_epoch() - request_data = f"[{','.join(__VALIDATORS)}]" is_any_duty_outdated: List[bool] = [True] validator_duties: dict[str, ValidatorDuty] = {} - if ARGUMENTS.omit_attestation_duties: + if ( + ARGUMENTS.omit_attestation_duties + or len(__VALIDATORS) + > program.MAX_NUMBER_OF_VALIDATORS_FOR_FETCHING_ATTESTATION_DUTIES + ): return validator_duties while is_any_duty_outdated: - response_data = __get_raw_response_data( - current_epoch, DutyType.ATTESTATION, request_data - ) + response_data = __fetch_duty_responses(current_epoch, DutyType.ATTESTATION) validator_duties = { data.validator_index: __get_next_attestation_duty(data, validator_duties) for data in response_data @@ -73,12 +73,9 @@ def get_next_sync_committee_duties() -> dict[str, ValidatorDuty]: ceil(current_epoch / ethereum.EPOCHS_PER_SYNC_COMMITTEE) * ethereum.EPOCHS_PER_SYNC_COMMITTEE ) - request_data = f"[{','.join(__VALIDATORS)}]" validator_duties: dict[str, ValidatorDuty] = {} for epoch in [current_epoch, next_sync_committee_starting_epoch]: - response_data = __get_raw_response_data( - epoch, DutyType.SYNC_COMMITTEE, request_data - ) + response_data = __fetch_duty_responses(epoch, DutyType.SYNC_COMMITTEE) for data in response_data: if data.validator_index not in validator_duties: validator_duties[data.validator_index] = ValidatorDuty( @@ -103,7 +100,7 @@ def get_next_proposing_duties() -> dict[str, ValidatorDuty]: current_epoch = ethereum.get_current_epoch() validator_duties: dict[str, ValidatorDuty] = {} for index in [1, 1]: - response_data = __get_raw_response_data(current_epoch, DutyType.PROPOSING) + response_data = __fetch_duty_responses(current_epoch, DutyType.PROPOSING) for data in response_data: if ( str(data.validator_index) in __VALIDATORS @@ -121,24 +118,6 @@ def get_next_proposing_duties() -> dict[str, ValidatorDuty]: return __filter_proposing_duties(validator_duties) -def __get_raw_response_data( - target_epoch: int, duty_type: DutyType, request_data: str = "" -) -> List[ValidatorDuty]: - """Fetches raw responses for provided duties - - Args: - target_epoch (int): Epoch to check for duties - duty_type (DutyType): Type of the duty - request_data (str, optional): Request data if any. Defaults to "". - - Returns: - List[ValidatorDuty]: List of all fetched validator duties for a specific epoch - """ - response = __fetch_duty_response(target_epoch, duty_type, request_data) - response_data = response.json()[json.RESPONSE_JSON_DATA_FIELD_NAME] - return [ValidatorDuty.from_dict(data) for data in response_data] - - def __get_next_attestation_duty( data: ValidatorDuty, present_duties: dict[str, ValidatorDuty] ) -> ValidatorDuty: @@ -186,33 +165,36 @@ def __filter_proposing_duties( return filtered_proposing_duties -def __fetch_duty_response( - target_epoch: int, duty_type: DutyType, request_data: str = "" -) -> Response: +def __fetch_duty_responses( + target_epoch: int, duty_type: DutyType +) -> List[ValidatorDuty]: """Fetches validator duties in dependence of the duty type from the beacon client Args: target_epoch (int): Epoch to fetch duties for duty_type (DutyType): Type of the duty - request_data (str, optional): Request data if any. Defaults to "". Returns: - Response: Raw response from the sent api request + List[ValidatorDuty]: List of fetched validator duties """ match duty_type: case DutyType.ATTESTATION: - response = send_beacon_api_request( - f"{endpoints.ATTESTATION_DUTY_ENDPOINT}{target_epoch}", request_data + responses = send_beacon_api_request( + f"{endpoints.ATTESTATION_DUTY_ENDPOINT}{target_epoch}", + CalldataType.REQUEST_DATA, + __VALIDATORS, ) case DutyType.SYNC_COMMITTEE: - response = send_beacon_api_request( + responses = send_beacon_api_request( f"{endpoints.SYNC_COMMITTEE_DUTY_ENDPOINT}{target_epoch}", - request_data, + CalldataType.REQUEST_DATA, + __VALIDATORS, ) case DutyType.PROPOSING: - response = send_beacon_api_request( - f"{endpoints.BLOCK_PROPOSING_DUTY_ENDPOINT}{target_epoch}" + responses = send_beacon_api_request( + f"{endpoints.BLOCK_PROPOSING_DUTY_ENDPOINT}{target_epoch}", + CalldataType.NONE, ) case _: - response = Response() - return response + responses = [] + return [ValidatorDuty.from_dict(data) for data in responses] diff --git a/duties/fetcher/parser/validators.py b/duties/fetcher/parser/validators.py index 4cc7b1e5..8fbff878 100644 --- a/duties/fetcher/parser/validators.py +++ b/duties/fetcher/parser/validators.py @@ -9,7 +9,7 @@ from eth_typing import BLSPubkey from fetcher.data_types import ValidatorData, ValidatorIdentifier from protocol import ethereum -from protocol.request import send_beacon_api_request +from protocol.request import CalldataType, send_beacon_api_request __LOGGER = getLogger(__name__) @@ -43,7 +43,11 @@ def __create_active_validator_identifiers( __get_validator_index_or_pubkey(None, validator) for validator in __RAW_PARSED_VALIDATOR_IDENTIFIERS.values() ] - validator_infos = __fetch_validator_infos_from_beacon_chain(provided_validators) + validator_infos = send_beacon_api_request( + endpoint=endpoints.VALIDATOR_STATUS_ENDPOINT, + calldata_type=CalldataType.PARAMETERS, + provided_validators=provided_validators, + ) return __create_complete_active_validator_identifiers( validator_infos, provided_validators ) @@ -70,36 +74,6 @@ def __get_validator_index_or_pubkey( return raw_validator_identifier.validator.pubkey -def __fetch_validator_infos_from_beacon_chain( - provided_validators: List[str], -) -> List[Any]: - """Temporary function to fetch all validators with it's status - from the beacon chain. Temporary because chunking will be added - in general to the request functionality - - Args: - provided_validators (List[str]): Provided validators by the user - - Returns: - List[Any]: Fetched validator infos from the beacon chain - """ - chunked_validators = [ - provided_validators[index : index + 300] - for index in range(0, len(provided_validators), 300) - ] - fetched_validator_infos: List[Any] = [] - for chunk in chunked_validators: - parameter_value = f"{','.join(chunk)}" - raw_response = send_beacon_api_request( - endpoint=endpoints.VALIDATOR_STATUS_ENDPOINT, - parameters={"id": parameter_value}, - ) - fetched_validator_infos.extend( - raw_response.json()[json.RESPONSE_JSON_DATA_FIELD_NAME] - ) - return fetched_validator_infos - - def __create_complete_active_validator_identifiers( fetched_validator_infos: List[Any], provided_validators: List[str] ) -> Dict[str, ValidatorIdentifier]: @@ -119,10 +93,13 @@ def __create_complete_active_validator_identifiers( raw_identifier = __get_raw_validator_identifier(validator_info) if ( raw_identifier - and validator_info["status"] in ethereum.ACTIVE_VALIDATOR_STATUS + and validator_info[json.RESPONSE_JSON_STATUS_FIELD_NAME] + in ethereum.ACTIVE_VALIDATOR_STATUS ): - raw_identifier.index = validator_info["index"] - raw_identifier.validator.pubkey = validator_info["validator"]["pubkey"] + raw_identifier.index = validator_info[json.RESPONSE_JSON_INDEX_FIELD_NAME] + raw_identifier.validator.pubkey = validator_info[ + json.RESPONSE_JSON_STATUS_FIELD_NAME + ][json.RESPONSE_JSON_PUBKEY_FIELD_NAME] complete_validator_identifiers[raw_identifier.index] = raw_identifier __log_inactive_and_duplicated_validators( provided_validators, @@ -143,9 +120,13 @@ def __get_raw_validator_identifier( Returns: ValidatorIdentifier | None: Raw validator identifier """ - identifier_index = __RAW_PARSED_VALIDATOR_IDENTIFIERS.get(validator_info["index"]) + identifier_index = __RAW_PARSED_VALIDATOR_IDENTIFIERS.get( + validator_info[json.RESPONSE_JSON_INDEX_FIELD_NAME] + ) identifier_pubkey = __RAW_PARSED_VALIDATOR_IDENTIFIERS.get( - validator_info["validator"]["pubkey"] + validator_info[json.RESPONSE_JSON_STATUS_FIELD_NAME][ + json.RESPONSE_JSON_PUBKEY_FIELD_NAME + ] ) if identifier_index and identifier_pubkey: if identifier_index.alias: @@ -190,7 +171,7 @@ def __get_duplicates_with_different_identifiers( provided_valdiators: List[str], complete_validator_identifiers: Dict[str, ValidatorIdentifier], ) -> List[str]: - """Filters for duplicated validators which were provided with different identifiers + """Filters for duplicated validators which where provided with different identifiers Args: provided_valdiators (List[str]): Provided validators by the user @@ -261,17 +242,17 @@ def __create_raw_validator_identifier(validator: str) -> ValidatorIdentifier: validator, ) raise SystemExit() - if ";" in validator: + if program.ALIAS_SEPARATOR in validator: validator = validator.replace(" ", "") - alias_split = validator.split(";") + alias_split = validator.split(program.ALIAS_SEPARATOR) index_or_pubkey = alias_split[0] alias = alias_split[1] - if index_or_pubkey.startswith("0x"): - if __is_valid_pubkey(index_or_pubkey[2:]): + if index_or_pubkey.startswith(program.PUBKEY_PREFIX): + if __is_valid_pubkey(index_or_pubkey[len(program.PUBKEY_PREFIX) :]): return ValidatorIdentifier("", ValidatorData(index_or_pubkey), alias) return ValidatorIdentifier(index_or_pubkey, ValidatorData(""), alias) - if validator.startswith("0x"): - if __is_valid_pubkey(validator[2:]): + if validator.startswith(program.PUBKEY_PREFIX): + if __is_valid_pubkey(validator[len(program.PUBKEY_PREFIX) :]): return ValidatorIdentifier("", ValidatorData(validator), None) return ValidatorIdentifier(validator, ValidatorData(""), None) @@ -307,7 +288,7 @@ def __is_valid_pubkey(pubkey: str) -> bool: """ try: parsed_pubkey = BLSPubkey(bytes.fromhex(pubkey)) - if len(parsed_pubkey) != 48: + if len(parsed_pubkey) != program.PUBKEY_LENGTH: __LOGGER.error(logging.WRONG_OR_INCOMPLETE_PUBKEY_MESSAGE, pubkey) raise SystemExit() except ValueError as error: diff --git a/duties/main.py b/duties/main.py index 640d930b..ede2a163 100644 --- a/duties/main.py +++ b/duties/main.py @@ -30,12 +30,13 @@ def __fetch_validator_duties( if not __is_current_data_outdated(duties): return duties next_attestation_duties: dict[str, ValidatorDuty] = {} - next_sync_committee_duties: dict[str, ValidatorDuty] = {} - if fetch.is_provided_validator_count_too_high(): - logger.warning(logging.TOO_MANY_PROVIDED_VALIDATORS_MESSAGE) + if fetch.is_provided_validator_count_too_high_for_fetching_attestation_duties(): + logger.warning( + logging.TOO_MANY_PROVIDED_VALIDATORS_FOR_FETCHING_ATTESTATION_DUTIES_MESSAGE + ) else: next_attestation_duties = fetch.get_next_attestation_duties() - next_sync_committee_duties = fetch.get_next_sync_committee_duties() + next_sync_committee_duties = fetch.get_next_sync_committee_duties() next_proposing_duties = fetch.get_next_proposing_duties() duties = [ duty diff --git a/duties/protocol/ethereum.py b/duties/protocol/ethereum.py index 26ccd612..0ac0026e 100644 --- a/duties/protocol/ethereum.py +++ b/duties/protocol/ethereum.py @@ -5,7 +5,7 @@ from time import time from constants import endpoints, json -from protocol.request import send_beacon_api_request +from protocol.request import CalldataType, send_beacon_api_request def __fetch_genesis_time() -> int: @@ -14,12 +14,10 @@ def __fetch_genesis_time() -> int: Returns: int: Genesis time as unix timestamp in seconds """ - response = send_beacon_api_request(endpoints.BEACON_GENESIS_ENDPOINT) - return int( - response.json()[json.RESPONSE_JSON_DATA_FIELD_NAME][ - json.RESPONSE_JSON_DATA_GENESIS_TIME_FIELD_NAME - ] + response = send_beacon_api_request( + endpoints.BEACON_GENESIS_ENDPOINT, CalldataType.NONE, flatten=False ) + return int(response[0][json.RESPONSE_JSON_DATA_GENESIS_TIME_FIELD_NAME]) GENESIS_TIME = __fetch_genesis_time() diff --git a/duties/protocol/request.py b/duties/protocol/request.py index 2109ab35..c15feb52 100644 --- a/duties/protocol/request.py +++ b/duties/protocol/request.py @@ -1,9 +1,11 @@ """Module for fetching data from a beacon client """ +from enum import Enum +from itertools import chain from logging import getLogger from time import sleep -from typing import Dict +from typing import Any, List from cli.arguments import ARGUMENTS from constants import json, logging, program @@ -13,16 +15,61 @@ __LOGGER = getLogger(__name__) +class CalldataType(Enum): + """Defines the type of the calldata for the rest call""" + + NONE = 0 + REQUEST_DATA = 1 + PARAMETERS = 2 + + def send_beacon_api_request( endpoint: str, - request_data: str | None = None, - parameters: Dict[str, str] | None = None, + calldata_type: CalldataType, + provided_validators: List[str] | None = None, + flatten: bool = True, +) -> List[Any]: + """Sends api requests to the beacon client and returns the subsequent data objects + from the responses + + Args: + endpoint (str): The endpoint which will be called + calldata_type (CalldataType): The type of calldata submitted with the request + provided_validators (List[str]): The validator indices or pubkey to get information for + flatten (bool): If True the returned list will be flattened + + Returns: + List[Any]: List with data objects from responses + """ + + responses: List[Response] = [] + if provided_validators: + chunked_validators = [ + provided_validators[ + index : index + program.NUMBER_OF_VALIDATORS_PER_REST_CALL + ] + for index in range( + 0, len(provided_validators), program.NUMBER_OF_VALIDATORS_PER_REST_CALL + ) + ] + for chunk in chunked_validators: + responses.append(__send_request(endpoint, calldata_type, chunk)) + else: + responses.append(__send_request(endpoint, calldata_type, [])) + return __convert_to_raw_data_responses(responses, flatten) + + +def __send_request( + endpoint: str, + calldata_type: CalldataType, + provided_validators: List[str], ) -> Response: - """Sends an api request to the beacon client + """Sends a single request to the beacon client Args: endpoint (str): The endpoint which will be called - request_data (str, optional): Request data if any. Defaults to "". + calldata_type (CalldataType): The type of calldata submitted with the request + provided_validators (List[str]): The validator indices or pubkey to get information for Raises: SystemExit: Program exit if the response is not present at all @@ -34,25 +81,27 @@ def send_beacon_api_request( """ is_request_successful = False response = None + calldata = __get_processed_calldata(provided_validators, calldata_type) while not is_request_successful and not program.GRACEFUL_KILLER.kill_now: try: - if not request_data and not parameters: - response = get( - url=f"{ARGUMENTS.beacon_node}{endpoint}", - timeout=program.REQUEST_TIMEOUT, - ) - elif request_data and not parameters: - response = post( - url=f"{ARGUMENTS.beacon_node}{endpoint}", - data=request_data, - timeout=program.REQUEST_TIMEOUT, - ) - else: - response = get( - url=f"{ARGUMENTS.beacon_node}{endpoint}", - params=parameters, - timeout=program.REQUEST_TIMEOUT, - ) + match calldata_type: + case CalldataType.REQUEST_DATA: + response = post( + url=f"{ARGUMENTS.beacon_node}{endpoint}", + data=calldata, + timeout=program.REQUEST_TIMEOUT, + ) + case CalldataType.PARAMETERS: + response = get( + url=f"{ARGUMENTS.beacon_node}{endpoint}", + params={"id": calldata}, + timeout=program.REQUEST_TIMEOUT, + ) + case _: + response = get( + url=f"{ARGUMENTS.beacon_node}{endpoint}", + timeout=program.REQUEST_TIMEOUT, + ) response.close() is_request_successful = __is_request_successful(response) except RequestsConnectionError: @@ -67,6 +116,57 @@ def send_beacon_api_request( raise SystemExit() +def __convert_to_raw_data_responses( + raw_responses: List[Response], flatten: bool +) -> List[Any]: + """Creates a list with raw data response objects + + Args: + raw_responses (List[Response]): List of fetched responses + flatten (bool): Should a possible list of lists be flattend. This assumes + some knowledge about the handled data strucutes. + + Returns: + List[Any]: List of raw data objects from raw response objects + """ + if flatten: + return list( + chain( + *[ + raw_response.json()[json.RESPONSE_JSON_DATA_FIELD_NAME] + for raw_response in raw_responses + ] + ) + ) + return [ + raw_response.json()[json.RESPONSE_JSON_DATA_FIELD_NAME] + for raw_response in raw_responses + ] + + +def __get_processed_calldata( + validator_chunk: List[str], calldata_type: CalldataType +) -> str: + """Processes calldata in dependence of calldata type + + Args: + validator_chunk (List[str]): List of validators + calldata_type (CalldataType): Calldata type + + Returns: + str: Calldata as specific formatted string + """ + calldata: str = "" + match calldata_type: + case CalldataType.REQUEST_DATA: + calldata = f"[{','.join(validator_chunk)}]" + case CalldataType.PARAMETERS: + calldata = f"{','.join(validator_chunk)}" + case _: + calldata = "" + return calldata + + def __is_request_successful(response: Response) -> bool: """Helper to check if a request was successful