RPC Websocket block subscribe #15

Merged · 8 commits · Feb 5, 2023
14 changes: 10 additions & 4 deletions COINGECKO.py
@@ -5,6 +5,8 @@

import CONFIG
from CONFIG import REDIS_DB
from HELPERS import ttl_block_only
from HELPERS_TYPES import Mode


class Coingecko:
@@ -39,7 +41,9 @@ def get_price(self):
ids = CONFIG.COINGECKO_IDS
vs_currencies = CONFIG.COINGECKO_FIAT

key = f"coingecko;{ids};{vs_currencies}"
cache_seconds = int(CONFIG.COINGECKO_CACHE.get("seconds", 7))
key = f"coingecko;{ttl_block_only(cache_seconds)};{ids};{vs_currencies}"

value = REDIS_DB.get(key)
if value is not None:
return json.loads(value)
@@ -57,9 +61,11 @@ def get_price(self):
"coins": updated_coins,
"last_update": int(time()),
}
REDIS_DB.set(
key, json.dumps(data), ex=int(CONFIG.COINGECKO_CACHE.get("seconds", 7))
)

if cache_seconds == Mode.FOR_BLOCK_TIME.value: # -2
cache_seconds = int(CONFIG.DEFAULT_CACHE_SECONDS)

REDIS_DB.set(key, json.dumps(data), ex=int(cache_seconds))
return data


17 changes: 12 additions & 5 deletions CONFIG.py
@@ -6,6 +6,8 @@
import redis
from dotenv import load_dotenv

from HELPERS_TYPES import Mode

HEADERS = {
"accept": "application/json",
"Content-Type": "application/json",
@@ -66,8 +68,7 @@ def get_config_file(filename: str):
RPC_URL = getenv("RPC_URL", "https://juno-rpc.reece.sh:443")
BACKUP_RPC_URL = getenv("BACKUP_RPC_URL", "https://rpc.juno.strange.love:443")

# DISABLED CURRENTLY, Future TODO
# RPC_WEBSOCKET = f'ws://{getenv("WEBSOCKET_ADDR", "15.204.143.232:26657")}/websocket'
RPC_WEBSOCKET = getenv("WEBSOCKET_ADDR", "ws://15.204.143.232:26657/websocket")

# ============
# === REST ===
@@ -88,8 +89,9 @@ def get_config_file(filename: str):
NODE_TM_VERSION = getenv("NODE_TM_VERSION", "")

# === Cache Times ===
cache_times: dict = {}
DEFAULT_CACHE_SECONDS: int = 6

cache_times: dict = {}
RPC_ENDPOINTS: dict = {}
REST_ENDPOINTS: dict = {}
COINGECKO_CACHE: dict = {}
@@ -112,10 +114,15 @@ def update_cache_times():


def get_cache_time_seconds(path: str, is_rpc: bool) -> int:
"""
Returns an endpoints time to cache in seconds
"""
endpoints = RPC_ENDPOINTS if is_rpc else REST_ENDPOINTS

cache_seconds = DEFAULT_CACHE_SECONDS
for k, seconds in endpoints.items():
if re.match(k, path):
return seconds
cache_seconds = seconds
break

return DEFAULT_CACHE_SECONDS
return cache_seconds
99 changes: 99 additions & 0 deletions CONNECT_WEBSOCKET.py
@@ -0,0 +1,99 @@
import json
import logging

import rel
import websocket

from CONFIG import REDIS_DB, RPC_WEBSOCKET

SUBSCRIBE_MSG = '{"jsonrpc": "2.0", "method": "subscribe", "params": ["tm.event=\'NewBlock\'"], "id": 1}'

logger = logging.getLogger(__name__)


# on a new block message, we will clear redis of any values which the config set to -2
# Use this for an indexer in the future?? :D
def on_message(ws, message):

msg = json.loads(message)

if msg.get("result") == {}:
logger.info("Subscribed to New Block with TendermintRPC...")
return

# block_height = msg["result"]["data"]["value"]["block"]["header"]["height"]
block_height = (
msg.get("result", {})
.get("data", {})
.get("value", {})
.get("block", {})
.get("header", {})
.get("height", -1)
)

if block_height == -1:
logger.error("Error: block height not found")
return

logger.debug(f"""New Block: {block_height}""")

# resets all blockOnly keys (balances for example)
del_keys = REDIS_DB.keys("*;IsBlockOnly;*")
if len(del_keys) > 0:
logger.debug(f"Deleting {len(del_keys)} keys...")
REDIS_DB.delete(*del_keys)


def on_error(ws, error):
logger.error(error)


def on_close(ws, close_status_code, close_msg):
logger.info("Closed connection")


def on_open(ws):
logger.info("Opened connection")
ws.send(SUBSCRIBE_MSG)
logger.info("Sent subscribe request")


class TendermintRPCWebSocket:
def __init__(
self,
enableSignal: bool = False,
enableTrace: bool = False,
logLevel: int = logging.DEBUG,
):
self.enableSignal = enableSignal

websocket.enableTrace(enableTrace) # toggle to show or hide output
self.ws = websocket.WebSocketApp(
f"{RPC_WEBSOCKET}",
on_open=on_open,
on_message=on_message,
on_error=on_error,
on_close=on_close,
)

logger.setLevel(logLevel)
logger.addHandler(logging.StreamHandler())

def start(self):
if self.enableSignal:
self.ws.run_forever(dispatcher=rel, reconnect=5)
self.signal(2, rel.abort)
self.dispatch()
else:
self.ws.run_forever()

def signal(self, sig, func):
rel.signal(sig, func)

def dispatch(self):
rel.dispatch()


if __name__ == "__main__":
tmrpc = TendermintRPCWebSocket(enableSignal=True) # so we can ctrl+c
tmrpc.start()
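
For reference, this is roughly the event shape the handler above consumes — a standalone sketch, not part of the diff. Only the fields `on_message` actually reads are filled in, and the height value is made up; a real Tendermint NewBlock payload carries far more data.

```python
import json

# Hypothetical, trimmed-down NewBlock event (illustrative height).
raw = json.dumps(
    {
        "jsonrpc": "2.0",
        "id": 1,
        "result": {
            "data": {"value": {"block": {"header": {"height": "1234567"}}}}
        },
    }
)

msg = json.loads(raw)

# The same defensive .get() chain used in on_message above.
block_height = (
    msg.get("result", {})
    .get("data", {})
    .get("value", {})
    .get("block", {})
    .get("header", {})
    .get("height", -1)
)

print(block_height)  # "1234567" -> time to flush the *;IsBlockOnly;* keys
```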
11 changes: 11 additions & 0 deletions HELPERS.py
@@ -5,6 +5,17 @@

import CONFIG
from CONFIG import REDIS_DB
from HELPERS_TYPES import Mode


def ttl_block_only(cache_seconds: int = 0):
# this way on a new block, we delete all *;IsBlockOnly;* keys
return (
"IsBlockOnly"
if cache_seconds == Mode.FOR_BLOCK_TIME.value
else f"{cache_seconds}s"
)


total_calls = {
# RPC:
7 changes: 7 additions & 0 deletions HELPERS_TYPES.py
@@ -0,0 +1,7 @@
from enum import Enum


class Mode(Enum):
NO_CACHE = 0
DISABLED = -1
FOR_BLOCK_TIME = -2
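
To see how the new `Mode` enum and `ttl_block_only` tie into the websocket flush, here is a small standalone sketch (not part of the diff). It re-states the two helpers so it runs on its own; the coin/fiat parts of the keys are illustrative, and `fnmatchcase` only approximates the Redis `KEYS *;IsBlockOnly;*` glob used in CONNECT_WEBSOCKET.py.

```python
from enum import Enum
from fnmatch import fnmatchcase


class Mode(Enum):
    NO_CACHE = 0
    DISABLED = -1
    FOR_BLOCK_TIME = -2


def ttl_block_only(cache_seconds: int = 0):
    # -2 keys get a literal marker instead of a TTL label, so the websocket
    # handler can find and delete them when a new block arrives.
    return (
        "IsBlockOnly"
        if cache_seconds == Mode.FOR_BLOCK_TIME.value
        else f"{cache_seconds}s"
    )


# A per-block key (config value -2) vs. a plain timed key (6 seconds):
block_key = f"coingecko;{ttl_block_only(-2)};juno-network;usd"  # coingecko;IsBlockOnly;juno-network;usd
timed_key = f"coingecko;{ttl_block_only(6)};juno-network;usd"   # coingecko;6s;juno-network;usd

# Approximation of the glob passed to REDIS_DB.keys() on each NewBlock event:
print(fnmatchcase(block_key, "*;IsBlockOnly;*"))  # True  -> wiped on the next block
print(fnmatchcase(timed_key, "*;IsBlockOnly;*"))  # False -> expires via its own TTL
```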
38 changes: 28 additions & 10 deletions RequestsHandler.py
@@ -5,10 +5,26 @@
import CONFIG
from CONFIG import REDIS_DB
from HELPERS import hide_rest_data, hide_rpc_data, increment_call_value
from HELPERS_TYPES import Mode

timeout = httpx.Timeout(5.0, connect=5.0, read=4.0)


def set_cache_for_time_if_valid(
status_code: int, call_key: str, cache_seconds: int, redis_key: str, res: dict
):

if status_code == 200:
increment_call_value(call_key)

if cache_seconds == Mode.FOR_BLOCK_TIME.value: # -2
cache_seconds = 6 # avg block time. So if the websocket stops for some reason, still 6sec TTL

if cache_seconds > 0:
# is a cache
REDIS_DB.setex(redis_key, cache_seconds, json.dumps(res))


class RestApiHandler:
def handle_single_rest_get_requests(
self, path, key, cache_seconds: int, param_args
@@ -19,9 +35,10 @@ def handle_single_rest_get_requests(
req = httpx.get(f"{CONFIG.BACKUP_REST_URL}/{path}", params=param_args)

res = hide_rest_data(req.json(), path)
if req.status_code == 200:
REDIS_DB.setex(key, cache_seconds, json.dumps(res))
increment_call_value("total_outbound;get_all_rest")

set_cache_for_time_if_valid(
req.status_code, "total_outbound;get_all_rest", cache_seconds, key, res
)

return res

@@ -57,7 +74,6 @@ def handle_batch_http_request(self, REQ_DATA: list) -> dict:
)

if req.status_code == 200:
# REDIS_DB.setex(key, cache_seconds, json.dumps(req.json()))
increment_call_value("total_outbound;batch_http", len(REQ_DATA))

return req.json()
@@ -71,9 +87,10 @@ def handle_single_rpc_post_request(self, data, key, method, cache_seconds) -> dict:

# only saves to cache if the request was successful
res = hide_rpc_data(req.json(), method)
if req.status_code == 200:
REDIS_DB.setex(key, cache_seconds, json.dumps(res))
increment_call_value("total_outbound;post_endpoint")

set_cache_for_time_if_valid(
req.status_code, "total_outbound;post_endpoint", cache_seconds, key, res
)

return res

@@ -90,8 +107,9 @@ def handle_single_rpc_get_requests(
)

res = hide_rpc_data(req.json(), path)
if req.status_code == 200:
REDIS_DB.setex(key, cache_seconds, json.dumps(res))
increment_call_value("total_outbound;get_rpc_endpoint")

set_cache_for_time_if_valid(
req.status_code, "total_outbound;get_rpc_endpoint", cache_seconds, key, res
)

return res
7 changes: 3 additions & 4 deletions configs/.env
@@ -19,6 +19,8 @@ STATS_PASSWORD="" # blank = no password for https://network.rest.website.com/sta
RPC_URL="http://127.0.0.1:26657"
BACKUP_RPC_URL="https://rpc.juno.strange.love"

RPC_WEBSOCKET="ws://15.204.143.232:26657/websocket"

# REST API
REST_URL="http://127.0.0.1:1317"
BACKUP_REST_URL="https://api.juno.strange.love"
@@ -49,7 +51,4 @@ RPC_CUSTOM_TEXT='<a href="https://twitter.com/Reecepbcups_/status/16173965711881
# === TESTING APPLICATION PORTS ===
# Only local with `python3 rpc.py`. Systemd services use ports defined in .service files.
REST_PORT=5000
RPC_PORT=5001

# === WEBSOCKET (DISABLED )===
# RPC_WEBSOCKET="15.204.143.232:26657"
RPC_PORT=5001
12 changes: 8 additions & 4 deletions configs/cache_times.json
@@ -1,14 +1,18 @@
{
"DEFAULT": 6,
"DEFAULT": -2,
"coingecko": {
"seconds": 6
},
"rpc": {
"genesis": 259200,
"health.*": -2,
"abci_info.*": -2,
"status.*": -2,

"unconfirmed_txs": 3,

"genesis.*": 259200,
"block?height=.*": 3600,
"block_results.*": 3600,
"unconfirmed_txs": 3
"block_results.*": 3600
},
"rest": {
"cosmos\/auth\/v1beta1\/accounts": -1,
14 changes: 9 additions & 5 deletions docs/CONFIG_VALUES.md
@@ -17,15 +17,19 @@ ENABLE_COUNTER=false
INCREASE_COUNTER_EVERY=10000
```

---

## Variable Length Cache

In the `cache_times.json` file, you can specify individual endpoints and how long their responses should persist in the cache.
This is useful for large queries such as /validators, which may return 100+ validators. That data does not change often, making it a good candidate for longer cache times.

If you wish to disable the cache, you can set the value to 0 for the said endpoint. If you wish to disable the endpoint query entirely, set it to a value less than 0 (such as -1).
By default, the cosmos/auth/v1beta1/accounts endpoint is disabled, as it temporarily halts the node.
There are 4 options:

This file uses regex pattern matching as keys, with values as the number of seconds to cache once it has been called.
For python strings, you must prefix any `*` you find with a `.`. So to match "random" in "my 8 random11 string", you would do `.*random.*` to match all before and after.
- > -2: Cache for the duration of the block (Subscribes to RPC_WEBSOCKET in .env file)
- > -1: Disable this query entirely (prevent DoS attacks on the node)
- > 0: No cache
- > 1+: Cache for the specified number of seconds

This is ONLY the path, which means it does not start with a `/`.
This file uses regex pattern matching as keys, with values as the number of seconds to cache once it has been called.
For python strings, you must prefix any `*` you find with a `.` (period). So to match "random" in "my 8 random11 string", you would do `.*random.*` to match all before and after.
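
A short standalone sketch of the lookup described above (simplified from `get_cache_time_seconds` in CONFIG.py, ignoring the RPC/REST switch; the patterns and seconds mirror the sample `cache_times.json` and are otherwise illustrative):

```python
import re

DEFAULT_CACHE_SECONDS = 6

# Regex pattern -> seconds to cache (negative values are the special modes above).
RPC_ENDPOINTS = {
    "health.*": -2,        # cache for the duration of the block
    "genesis.*": 259200,   # 3 days
    "unconfirmed_txs": 3,
}


def get_cache_time_seconds(path: str) -> int:
    # The first pattern that matches the start of the path wins, else the default applies.
    for pattern, seconds in RPC_ENDPOINTS.items():
        if re.match(pattern, path):
            return seconds
    return DEFAULT_CACHE_SECONDS


print(get_cache_time_seconds("genesis"))           # 259200
print(get_cache_time_seconds("health?"))           # -2 -> cached until the next block
print(get_cache_time_seconds("tx_search?query="))  # 6 (default)
```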
7 changes: 7 additions & 0 deletions docs/SUPPORT.md
@@ -0,0 +1,7 @@
# Contact & Support

> If something is confusing, I am happy to create further documentation / feature requests within reason.

- Github Issues / PRs accepted.

- Twitter: [https://twitter.com/Reecepbcups_](https://twitter.com/Reecepbcups_)
4 changes: 3 additions & 1 deletion requirements/requirements.txt
@@ -1,7 +1,9 @@
# python3 -m pip install -r requirements/requirements.txt
# sudo python3 -m pip install -r requirements/requirements.txt
python-dotenv
websockets

websocket-client # import websocket
rel

redis
httpx