Skip to content

Commit

Permalink
fix: indexer websocket "event loop closed"
Browse files Browse the repository at this point in the history
The indexer was broken due to (django/channels_redis#332).

This resulted in the indexer constantly erroring out, and not having a way to recover because it was constantly stuck searching for the object in the database.

While there are still a few nuances with this, the API is accessible, is not blocked by the indexer, and "can" be independently scaled if that need arises; although is not implemented today.

There may still be a few bugs here, but we **cannot** move to version 4 of `channels-redis` otherwise everything will implode and leave you with no idea what is going on wrong (the package handles things on the backend that you don't think about and results in you debugging absolutely your entire project only to realize the library is unstable and usage should be delayed until stable v5 is released.)
  • Loading branch information
nftchance committed Mar 11, 2023
1 parent 9e2195d commit 0ae0584
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 26 deletions.
2 changes: 0 additions & 2 deletions api/indexer/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ def _etl(self, contracts, abi, topics, batch):
)

if len(events) == 0:
time.sleep(settings.LISTENER_POLL_INTERVAL)
return

self.loader.load(events)
Expand Down Expand Up @@ -68,7 +67,6 @@ def etl(self, extracting_obj, abi, topics, temp_from_block=None, temp_to_block=N
print([future._state for future in futures])

if future.cancelled():
print("Cancelled")
continue
try:
n = future.result()
Expand Down
4 changes: 1 addition & 3 deletions api/indexer/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,14 +236,12 @@ def handle_uri(self, event):
return ("Badge uri updated", event['args'])

def load(self, events):
print("in loader")
event_responses = []

for event in events:
if 'event' in event:
if event['event'] in self.loader_mapping:
for handler in self.loader_mapping[event['event']]:
event_responses.append(handler(event))
event_responses.append(self.loader_mapping[event['event']](event))
else:
event_responses.append(("Event not handled", event['event'], event['args']))
else:
Expand Down
4 changes: 3 additions & 1 deletion api/indexer/references.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,6 @@ def logs(self, contract_addresses, from_block, to_block):
key_hash = f"{contract_addresses_key}{from_block}{to_block}"

if not hasattr(self, key_hash):
setattr(self, key_hash, self._logs(contract_addresses, from_block, to_block))
setattr(self, key_hash, self._logs(contract_addresses, from_block, to_block))

return getattr(self, key_hash)
37 changes: 19 additions & 18 deletions api/indexer/scripts.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import django

from apscheduler.events import EVENT_JOB_ERROR
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger

Expand All @@ -13,9 +12,6 @@
backfill = Backfill()
scheduler = BackgroundScheduler()

def listener(event):
print(f'Job {event.job_id} raised {event.exception.__class__.__name__}')

@util.close_old_connections
def backfill_factories():
backfill.backfill_factories()
Expand All @@ -40,6 +36,9 @@ class JobManager:
def ready(self, *args, **options):
scheduler.add_jobstore(DjangoJobStore(), "default")

for job in scheduler.get_jobs():
scheduler.remove_job(job.id, "default")

jobs = [[
delete_old_job_executions,
CronTrigger(hour="*", minute="*/59"),
Expand All @@ -65,28 +64,30 @@ def ready(self, *args, **options):
for job in jobs:
# If job does not exist, add it
try:
if not scheduler.get_job(job[2], "default"):
scheduler.add_job(
job[0],
job[1],
id=job[2],
max_instances=1,
replace_existing=True,
jobstore="default",
coalesce=True,
misfire_grace_time=5,
)

print("Added: `{}`".format(job[2]))
scheduler.add_job(
job[0],
job[1],
id=job[2],
max_instances=1,
replace_existing=True,
jobstore="default",
coalesce=True,
misfire_grace_time=5,
)

print("Added: `{}`".format(job[2]))
except Exception as e:
print("Error adding job: `{}`".format(job[2]))
print(e)

try:
scheduler.add_listener(listener, EVENT_JOB_ERROR)
print("Starting scheduler...")
scheduler.start()
except (Exception, KeyboardInterrupt, SystemExit) as e:
print("Stopping scheduler...")

for job in jobs:
scheduler.remove_job(job[2], "default")

scheduler.shutdown()
print("Scheduler shut down successfully!")
3 changes: 1 addition & 2 deletions api/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
channels-redis==4.0.0
daphne==4.0.0
channels-redis==3.4.0
django-apscheduler==0.6.2
django-cors-headers==3.13.0
django-filter==22.1
Expand Down

0 comments on commit 0ae0584

Please sign in to comment.