diff --git a/COINGECKO.py b/COINGECKO.py index 24f85bf..4b4f433 100644 --- a/COINGECKO.py +++ b/COINGECKO.py @@ -5,6 +5,8 @@ import CONFIG from CONFIG import REDIS_DB +from HELPERS import ttl_block_only +from HELPERS_TYPES import Mode class Coingecko: @@ -39,7 +41,9 @@ def get_price(self): ids = CONFIG.COINGECKO_IDS vs_currencies = CONFIG.COINGECKO_FIAT - key = f"coingecko;{ids};{vs_currencies}" + cache_seconds = int(CONFIG.COINGECKO_CACHE.get("seconds", 7)) + key = f"coingecko;{ttl_block_only(cache_seconds)};{ids};{vs_currencies}" + value = REDIS_DB.get(key) if value is not None: return json.loads(value) @@ -57,9 +61,11 @@ def get_price(self): "coins": updated_coins, "last_update": int(time()), } - REDIS_DB.set( - key, json.dumps(data), ex=int(CONFIG.COINGECKO_CACHE.get("seconds", 7)) - ) + + if cache_seconds == Mode.FOR_BLOCK_TIME.value: # -2 + cache_seconds = int(CONFIG.DEFAULT_CACHE_SECONDS) + + REDIS_DB.set(key, json.dumps(data), ex=int(cache_seconds)) return data diff --git a/CONFIG.py b/CONFIG.py index 04e838b..7dfd508 100644 --- a/CONFIG.py +++ b/CONFIG.py @@ -6,6 +6,8 @@ import redis from dotenv import load_dotenv +from HELPERS_TYPES import Mode + HEADERS = { "accept": "application/json", "Content-Type": "application/json", @@ -66,8 +68,7 @@ def get_config_file(filename: str): RPC_URL = getenv("RPC_URL", "https://juno-rpc.reece.sh:443") BACKUP_RPC_URL = getenv("BACKUP_RPC_URL", "https://rpc.juno.strange.love:443") -# DISABLED CURRENTLY, Future TODO -# RPC_WEBSOCKET = f'ws://{getenv("WEBSOCKET_ADDR", "15.204.143.232:26657")}/websocket' +RPC_WEBSOCKET = getenv("WEBSOCKET_ADDR", "ws://15.204.143.232:26657/websocket") # ============ # === REST === @@ -88,8 +89,9 @@ def get_config_file(filename: str): NODE_TM_VERSION = getenv("NODE_TM_VERSION", "") # === Cache Times === -cache_times: dict = {} DEFAULT_CACHE_SECONDS: int = 6 + +cache_times: dict = {} RPC_ENDPOINTS: dict = {} REST_ENDPOINTS: dict = {} COINGECKO_CACHE: dict = {} @@ -112,10 +114,15 @@ def update_cache_times(): def get_cache_time_seconds(path: str, is_rpc: bool) -> int: + """ + Returns an endpoints time to cache in seconds + """ endpoints = RPC_ENDPOINTS if is_rpc else REST_ENDPOINTS + cache_seconds = DEFAULT_CACHE_SECONDS for k, seconds in endpoints.items(): if re.match(k, path): - return seconds + cache_seconds = seconds + break - return DEFAULT_CACHE_SECONDS + return cache_seconds diff --git a/CONNECT_WEBSOCKET.py b/CONNECT_WEBSOCKET.py new file mode 100644 index 0000000..0963666 --- /dev/null +++ b/CONNECT_WEBSOCKET.py @@ -0,0 +1,99 @@ +import json +import logging + +import rel +import websocket + +from CONFIG import REDIS_DB, RPC_WEBSOCKET + +SUBSCRIBE_MSG = '{"jsonrpc": "2.0", "method": "subscribe", "params": ["tm.event=\'NewBlock\'"], "id": 1}' + +logger = logging.getLogger(__name__) + + +# on a new block message, we will clear redis of any values which the config set to -2 +# Use this for an indexer in the future?? :D +def on_message(ws, message): + + msg = json.loads(message) + + if msg.get("result") == {}: + logger.info("Subscribed to New Block with TendermintRPC...") + return + + # block_height = msg["result"]["data"]["value"]["block"]["header"]["height"] + block_height = ( + msg.get("result", {}) + .get("data", {}) + .get("value", {}) + .get("block", {}) + .get("header", {}) + .get("height", -1) + ) + + if block_height == -1: + logger.error("Error: block height not found") + return + + logger.debug(f"""New Block: {block_height}""") + + # resets all blockOnly keys (balances for example) + del_keys = REDIS_DB.keys("*;IsBlockOnly;*") + if len(del_keys) > 0: + logger.debug(f"Deleting {len(del_keys)} keys...") + REDIS_DB.delete(*del_keys) + + +def on_error(ws, error): + logger.error(error) + + +def on_close(ws, close_status_code, close_msg): + logger.info("Closed connection") + + +def on_open(ws): + logger.info("Opened connection") + ws.send(SUBSCRIBE_MSG) + logger.info("Sent subscribe request") + + +class TendermintRPCWebSocket: + def __init__( + self, + enableSignal: bool = False, + enableTrace: bool = False, + logLevel: int = logging.DEBUG, + ): + self.enableSignal = enableSignal + + websocket.enableTrace(enableTrace) # toggle to show or hide output + self.ws = websocket.WebSocketApp( + f"{RPC_WEBSOCKET}", + on_open=on_open, + on_message=on_message, + on_error=on_error, + on_close=on_close, + ) + + logger.setLevel(logLevel) + logger.addHandler(logging.StreamHandler()) + + def start(self): + if self.enableSignal: + self.ws.run_forever(dispatcher=rel, reconnect=5) + self.signal(2, rel.abort) + self.dispatch() + else: + self.run_forever() + + def signal(self, sig, func): + rel.signal(sig, func) + + def dispatch(self): + rel.dispatch() + + +if __name__ == "__main__": + tmrpc = TendermintRPCWebSocket(enableSignal=True) # so we can ctrl+c + tmrpc.start() diff --git a/HELPERS.py b/HELPERS.py index fabdff2..dbc53c7 100644 --- a/HELPERS.py +++ b/HELPERS.py @@ -5,6 +5,17 @@ import CONFIG from CONFIG import REDIS_DB +from HELPERS_TYPES import Mode + + +def ttl_block_only(cache_seconds: int = 0): + # this way on a new block, we delete all *;IsBlockOnly;* keys + return ( + "IsBlockOnly" + if cache_seconds == Mode.FOR_BLOCK_TIME.value + else f"{cache_seconds}s" + ) + total_calls = { # RPC: diff --git a/HELPERS_TYPES.py b/HELPERS_TYPES.py new file mode 100644 index 0000000..37b50e8 --- /dev/null +++ b/HELPERS_TYPES.py @@ -0,0 +1,7 @@ +from enum import Enum + + +class Mode(Enum): + NO_CACHE = 0 + DISABLED = -1 + FOR_BLOCK_TIME = -2 diff --git a/RequestsHandler.py b/RequestsHandler.py index 25477f4..c7c8d6d 100644 --- a/RequestsHandler.py +++ b/RequestsHandler.py @@ -5,10 +5,26 @@ import CONFIG from CONFIG import REDIS_DB from HELPERS import hide_rest_data, hide_rpc_data, increment_call_value +from HELPERS_TYPES import Mode timeout = httpx.Timeout(5.0, connect=5.0, read=4.0) +def set_cache_for_time_if_valid( + status_code: int, call_key: str, cache_seconds: int, redis_key: str, res: dict +): + + if status_code == 200: + increment_call_value(call_key) + + if cache_seconds == Mode.FOR_BLOCK_TIME.value: # -2 + cache_seconds = 6 # avg block time. So if the websocket stops for some reason, still 6sec TTL + + if cache_seconds > 0: + # is a cache + REDIS_DB.setex(redis_key, cache_seconds, json.dumps(res)) + + class RestApiHandler: def handle_single_rest_get_requests( self, path, key, cache_seconds: int, param_args @@ -19,9 +35,10 @@ def handle_single_rest_get_requests( req = httpx.get(f"{CONFIG.BACKUP_REST_URL}/{path}", params=param_args) res = hide_rest_data(req.json(), path) - if req.status_code == 200: - REDIS_DB.setex(key, cache_seconds, json.dumps(res)) - increment_call_value("total_outbound;get_all_rest") + + set_cache_for_time_if_valid( + req.status_code, "total_outbound;get_all_rest", cache_seconds, key, res + ) return res @@ -57,7 +74,6 @@ def handle_batch_http_request(self, REQ_DATA: list) -> dict: ) if req.status_code == 200: - # REDIS_DB.setex(key, cache_seconds, json.dumps(req.json())) increment_call_value("total_outbound;batch_http", len(REQ_DATA)) return req.json() @@ -71,9 +87,10 @@ def handle_single_rpc_post_request(self, data, key, method, cache_seconds) -> di # only saves to cache if the request was successful res = hide_rpc_data(req.json(), method) - if req.status_code == 200: - REDIS_DB.setex(key, cache_seconds, json.dumps(res)) - increment_call_value("total_outbound;post_endpoint") + + set_cache_for_time_if_valid( + req.status_code, "total_outbound;post_endpoint", cache_seconds, key, res + ) return res @@ -90,8 +107,9 @@ def handle_single_rpc_get_requests( ) res = hide_rpc_data(req.json(), path) - if req.status_code == 200: - REDIS_DB.setex(key, cache_seconds, json.dumps(res)) - increment_call_value("total_outbound;get_rpc_endpoint") + + set_cache_for_time_if_valid( + req.status_code, "total_outbound;get_rpc_endpoint", cache_seconds, key, res + ) return res diff --git a/configs/.env b/configs/.env index c316609..ed096af 100644 --- a/configs/.env +++ b/configs/.env @@ -19,6 +19,8 @@ STATS_PASSWORD="" # blank = no password for https://network.rest.website.com/sta RPC_URL="http://127.0.0.1:26657" BACKUP_RPC_URL="https://rpc.juno.strange.love" +RPC_WEBSOCKET="ws://15.204.143.232:26657/websocket" + # REST API REST_URL="http://127.0.0.1:1317" BACKUP_REST_URL="https://api.juno.strange.love" @@ -49,7 +51,4 @@ RPC_CUSTOM_TEXT=' -2: Cache for the duration of the block (Subscribes to RPC_WEBSOCKET in .env file) +- > -1: Disable this query entirely (prevent DoS attacks on the node) +- > 0: No cache +- > 1+: Cache for the specified number of seconds -This is ONLY the path, which means it does not start with a `/`. +This file uses regex pattern matching as keys, with values as the number of seconds to cache once it has been called. +For python strings, you must prefix any `*` you find with a `.` (period). So to match "random" in "my 8 random11 string", you would do `.*random.*` to match all before and after. diff --git a/docs/SUPPORT.md b/docs/SUPPORT.md new file mode 100644 index 0000000..59718d2 --- /dev/null +++ b/docs/SUPPORT.md @@ -0,0 +1,7 @@ +# Contact & Support + +> If something is confusing, I am happy to create further documentation / feature requests within reason. + +- Github Issues / PRs accepted. + +- Twitter: [https://twitter.com/Reecepbcups_](https://twitter.com/Reecepbcups_) diff --git a/requirements/requirements.txt b/requirements/requirements.txt index d0f0a92..5c56e63 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -1,7 +1,9 @@ # python3 -m pip install -r requirements/requirements.txt # sudo python3 -m pip install -r requirements/requirements.txt python-dotenv -websockets + +websocket-client # import websocket +rel redis httpx diff --git a/rest.py b/rest.py index 29b2c89..c6ce57e 100644 --- a/rest.py +++ b/rest.py @@ -1,17 +1,22 @@ # Reece Williams | https://reece.sh | Jan 2023 import json +import logging +import threading from flask import Flask, jsonify, request from flask_cors import CORS, cross_origin import CONFIG as CONFIG from CONFIG import REDIS_DB +from CONNECT_WEBSOCKET import TendermintRPCWebSocket from HELPERS import ( + Mode, download_openapi_locally, get_stats_html, get_swagger_code_from_source, increment_call_value, + ttl_block_only, ) from RequestsHandler import RestApiHandler @@ -30,6 +35,11 @@ def before_first_request(): download_openapi_locally() REST_HANDLER = RestApiHandler() + tmrpc = TendermintRPCWebSocket(enableSignal=False, logLevel=logging.DEBUG) + t = threading.Thread(target=tmrpc.ws.run_forever) + t.daemon = True + t.start() + @app.route("/", methods=["GET"]) @cross_origin() @@ -43,7 +53,6 @@ def root(): return REST_SWAGGER_HTML -# return all queries @app.route("/", methods=["GET"]) @cross_origin() def get_rest(path): @@ -60,14 +69,14 @@ def get_rest(path): args = request.args cache_seconds = CONFIG.get_cache_time_seconds(path, is_rpc=False) - if cache_seconds < 0: + if cache_seconds == Mode.DISABLED.value: return jsonify( { "error": f"cosmos endpoint cache: The path '{path}' is disabled on this node..." } ) - key = f"{CONFIG.REST_PREFIX};{path};{args}" + key = f"{CONFIG.REST_PREFIX};{ttl_block_only(cache_seconds)};{path};{args}" v = REDIS_DB.get(key) if v: diff --git a/rpc.py b/rpc.py index 1e4d2f7..519f960 100644 --- a/rpc.py +++ b/rpc.py @@ -2,9 +2,10 @@ # import asyncio import json +import logging import re +import threading -# import websockets from flask import Flask, jsonify, request from flask_cors import CORS, cross_origin from flask_sock import Sock @@ -12,12 +13,16 @@ import CONFIG as CONFIG from COINGECKO import Coingecko from CONFIG import REDIS_DB -from HELPERS import hide_rpc_data, increment_call_value, replace_rpc_text +from CONNECT_WEBSOCKET import TendermintRPCWebSocket +from HELPERS import ( + Mode, + hide_rpc_data, + increment_call_value, + replace_rpc_text, + ttl_block_only, +) from RequestsHandler import RPCHandler -# from flask_socketio import emit - - # === FLASK === rpc_app = Flask(__name__) sock = Sock(rpc_app) @@ -37,6 +42,12 @@ def before_first_request(): RPC_HANDLER = RPCHandler() GECKO = Coingecko() + # future: https://stackoverflow.com/questions/24101724/gunicorn-with-multiple-workers-is-there-an-easy-way-to-execute-certain-code-onl + tmrpc = TendermintRPCWebSocket(enableSignal=False, logLevel=logging.DEBUG) + t = threading.Thread(target=tmrpc.ws.run_forever) + t.daemon = True + t.start() + # === ROUTES === @rpc_app.route("/", methods=["GET"]) @@ -95,16 +106,16 @@ def get_rpc_endpoint(path: str): args = request.args - key = f"{CONFIG.RPC_PREFIX};{path};{args}" - cache_seconds = CONFIG.get_cache_time_seconds(path, is_rpc=True) - if cache_seconds < 0: + if cache_seconds == Mode.DISABLED.value: return jsonify( { "error": f"cosmos endpoint cache: The path '{path}' is disabled on this node..." } ) + key = f"{CONFIG.RPC_PREFIX};{ttl_block_only(cache_seconds)};{path};{args}" + v = REDIS_DB.get(key) if v: increment_call_value("total_cache;get_rpc_endpoint") @@ -128,16 +139,17 @@ def post_rpc_endpoint(): # If its a single RPC request, the following is used. method = REQ_DATA.get("method", None) params = REQ_DATA.get("params", None) - key = f"{CONFIG.RPC_PREFIX};{method};{params}" cache_seconds = CONFIG.get_cache_time_seconds(method, is_rpc=True) - if cache_seconds < 0: + if cache_seconds == Mode.DISABLED.value: return jsonify( { "error": f"cosmos endpoint cache: The RPC method '{method}' is disabled on this node..." } ) + key = f"{CONFIG.RPC_PREFIX};{ttl_block_only(cache_seconds)};{method};{params}" + v = REDIS_DB.get(key) if v: increment_call_value("total_cache;post_endpoint") @@ -179,4 +191,6 @@ def post_rpc_endpoint(): if __name__ == "__main__": before_first_request() + + # setting to True runs 2 processes rpc_app.run(debug=True, host="0.0.0.0", port=CONFIG.RPC_PORT)