Skip to content

Commit

Permalink
Use only relay to index online status, and update nodestatushistory to include any type of node rather than linked to provider
Browse files Browse the repository at this point in the history
  • Loading branch information
cryptobench committed Sep 19, 2024
1 parent 41652f4 commit 8e5a760
Show file tree
Hide file tree
Showing 12 changed files with 283 additions and 143 deletions.
5 changes: 5 additions & 0 deletions docker-stack/golem-reputation-backend/p2p-ping.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ async def ping_provider(provider_id):
results.append(result)
else:
print("ERROR pinging", stderr.decode())
# If you detect the critical error related to `yagna.sock`, exit the script
if "No such file or directory" in stderr.decode():
print("Critical error: yagna.sock is unavailable, exiting...")
os._exit(1) # This will exit the script and stop the container
return False

if len(results) == 2:
Expand All @@ -129,6 +133,7 @@ async def ping_provider(provider_id):
print("Timeout reached while checking node status", e)
return False


# Process each provider ID and send ping results in chunks


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from django.db import migrations, models

def update_nodestatushistory(apps, schema_editor):
    """Backfill the new NodeStatusHistory.node_id column from the provider FK.

    Runs inside migration 0052 after node_id is added (nullable) and before
    the provider FK is removed.  Rows with a NULL provider are left untouched.

    The original implementation iterated every row and issued one lazy FK
    fetch plus one UPDATE per row (classic N+1); this version prefetches the
    FK with select_related and writes in bulk_update batches.  It also drops
    an unused `Provider = apps.get_model(...)` lookup.
    """
    NodeStatusHistory = apps.get_model('api', 'NodeStatusHistory')

    rows = NodeStatusHistory.objects.filter(
        provider__isnull=False).select_related('provider')

    batch = []
    # iterator() streams rows instead of caching the whole table in memory.
    for history in rows.iterator():
        history.node_id = history.provider.node_id
        batch.append(history)
        if len(batch) >= 1000:
            NodeStatusHistory.objects.bulk_update(batch, ['node_id'])
            batch = []
    if batch:
        NodeStatusHistory.objects.bulk_update(batch, ['node_id'])

class Migration(migrations.Migration):
    """Replace NodeStatusHistory's Provider FK with a free-form node_id column.

    node_id is added as nullable first so existing rows can be backfilled
    from the old provider FK, then tightened to NOT NULL once the data
    migration has run.
    """

    dependencies = [
        ('api', '0051_update_gputask_structure'),
    ]

    operations = [
        # Nullable first: existing rows have no node_id yet.
        migrations.AddField(
            model_name='nodestatushistory',
            name='node_id',
            field=models.CharField(max_length=42, null=True),
        ),
        # Backfill node_id from the provider FK.  The noop reverse makes the
        # migration reversible: unapplying simply drops the column again, so
        # no reverse data copy is needed.  (The original omitted reverse_code,
        # which made the whole migration irreversible.)
        migrations.RunPython(update_nodestatushistory,
                             migrations.RunPython.noop),
        migrations.RemoveField(
            model_name='nodestatushistory',
            name='provider',
        ),
        # Every row is backfilled now; enforce NOT NULL.
        migrations.AlterField(
            model_name='nodestatushistory',
            name='node_id',
            field=models.CharField(max_length=42),
        ),
        # Swap the old (provider, timestamp) index for (node_id, timestamp).
        migrations.RemoveIndex(
            model_name='nodestatushistory',
            name='api_nodesta_provide_94f647_idx',
        ),
        migrations.AddIndex(
            model_name='nodestatushistory',
            index=models.Index(fields=['node_id', 'timestamp'], name='node_status_history_idx'),
        ),
    ]
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 4.1.7 on 2024-09-19 10:26

from django.db import migrations


class Migration(migrations.Migration):
    """Rename the node-status index created in 0052 to Django's auto-derived name.

    0052 added the (node_id, timestamp) index with an explicit name
    ('node_status_history_idx'); this renames it to what the migration
    autodetector derives for the model, presumably so later autodetected
    migrations don't flag a spurious index change.
    """

    dependencies = [
        ('api', '0052_update_nodestatushistory'),
    ]

    operations = [
        # RenameIndex is automatically reversible.
        migrations.RenameIndex(
            model_name='nodestatushistory',
            new_name='api_nodesta_node_id_acbc40_idx',
            old_name='node_status_history_idx',
        ),
    ]
Original file line number Diff line number Diff line change
Expand Up @@ -263,14 +263,14 @@ class Meta:


class NodeStatusHistory(models.Model):
    """One online/offline observation for a node at a point in time.

    Keyed by the raw node_id string (42 chars, 0x-prefixed address) instead
    of a Provider FK, so status can be tracked for any node on the network,
    not only registered providers.

    NOTE: this span of the scraped diff contained both the removed lines
    (provider FK, old __str__) and the added ones; this is the coherent
    post-commit definition.
    """
    node_id = models.CharField(max_length=42)
    is_online = models.BooleanField()
    # Set once on insert; rows are append-only observations.
    timestamp = models.DateTimeField(auto_now_add=True)

    def __str__(self):
        return f"{self.node_id} - {'Online' if self.is_online else 'Offline'} at {self.timestamp}"

    class Meta:
        indexes = [
            # Supports "latest status per node" queries.
            models.Index(fields=["node_id", "timestamp"]),
        ]
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@
async def async_fetch_node_ids():
# Define the synchronous part as an inner function
def get_node_ids():
# Fetch the latest status for each provider and filter those that are online
# Fetch the latest status for each node_id and filter those that are online
latest_statuses = NodeStatusHistory.objects.filter(
provider_id__in=NodeStatusHistory.objects.order_by(
'provider', '-timestamp').distinct('provider').values_list('provider_id', flat=True)
).order_by('provider', '-timestamp').distinct('provider')
node_id__in=NodeStatusHistory.objects.order_by(
'node_id', '-timestamp').distinct('node_id').values_list('node_id', flat=True)
).order_by('node_id', '-timestamp').distinct('node_id')

# Return provider IDs where the latest status is online
return [status.provider.node_id for status in latest_statuses if status.is_online]
# Return node_ids where the latest status is online
return [status.node_id for status in latest_statuses if status.is_online]

# Use sync_to_async to convert it and immediately invoke
node_ids = await sync_to_async(get_node_ids, thread_sensitive=True)()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#!/usr/bin/env python3
import redis
import requests
import asyncio
import csv
import json
Expand All @@ -24,18 +25,12 @@

@app.task(queue='default', options={'queue': 'default', 'routing_key': 'default'})
def update_providers_info(node_props):
node_ids = [prop['node_id'] for prop in node_props]
existing_providers = Provider.objects.filter(node_id__in=node_ids)
existing_providers_dict = {
provider.node_id: provider for provider in existing_providers}

create_batch = []
update_batch = []

provider_data = []
for props in node_props:
prop_data = {key: value for key, value in props.items() if key.startswith(
"golem.com.payment.platform.") and key.endswith(".address")}
provider_data = {
provider_data.append({
"node_id": props['node_id'],
"payment_addresses": prop_data,
"network": 'testnet' if any(key in TESTNET_KEYS for key in props.keys()) else 'mainnet',
"cores": props.get("golem.inf.cpu.cores"),
Expand All @@ -46,20 +41,32 @@ def update_providers_info(node_props):
"threads": props.get("golem.inf.cpu.threads"),
"storage": props.get("golem.inf.storage.gib"),
"name": props.get("golem.node.id.name"),
}

issuer_id = props['node_id']
if issuer_id in existing_providers_dict:
provider_instance = existing_providers_dict[issuer_id]
for key, value in provider_data.items():
setattr(provider_instance, key, value)
update_batch.append(provider_instance)
})

node_ids = [data['node_id'] for data in provider_data]
existing_providers = {
provider.node_id: provider
for provider in Provider.objects.filter(node_id__in=node_ids)
}

providers_to_create = []
providers_to_update = []

for data in provider_data:
if data['node_id'] in existing_providers:
provider = existing_providers[data['node_id']]
for key, value in data.items():
setattr(provider, key, value)
providers_to_update.append(provider)
else:
create_batch.append(Provider(node_id=issuer_id, **provider_data))
providers_to_create.append(Provider(**data))

Provider.objects.bulk_create(create_batch, ignore_conflicts=True)
Provider.objects.bulk_create(providers_to_create, ignore_conflicts=True)
Provider.objects.bulk_update(
update_batch, [field for field in provider_data.keys() if field != 'node_id'])
providers_to_update,
fields=[field for field in provider_data[0].keys() if field !=
'node_id']
)


TESTNET_KEYS = [
Expand All @@ -76,81 +83,33 @@ def update_providers_info(node_props):
from .utils import build_parser, print_env_info, format_usage # noqa: E402


def update_nodes_status(provider_id, is_online_now):
    """Record a provider's current online state, appending a NodeStatusHistory
    row only when the state differs from the last known one.

    Uses Redis (module-level client `r` — assumed to be defined at import
    time; confirm against the full module) as a cache of the last reported
    state to avoid a DB read on every check, falling back to the database
    when the Redis key is absent (e.g. first sighting or cache flush).

    provider_id: node_id string; the Provider row is created if missing.
    is_online_now: bool result of the reachability check.
    """
    provider, created = Provider.objects.get_or_create(node_id=provider_id)

    # Get the latest status from Redis
    latest_status = r.get(f"provider:{provider_id}:status")

    if latest_status is None:
        # Status not found in Redis, fetch the latest status from the database
        last_status = NodeStatusHistory.objects.filter(
            provider=provider).last()
        if not last_status or last_status.is_online != is_online_now:
            # Create a new status entry if there's a change in status
            NodeStatusHistory.objects.create(
                provider=provider, is_online=is_online_now)
            provider.online = is_online_now
            provider.save()
    else:
        # Compare the latest status from Redis with the current status.
        # Redis returns bytes; the stored value is str(bool), i.e. "True"/"False".
        if latest_status.decode() != str(is_online_now):
            # Status has changed, update the database and Node.online field
            NodeStatusHistory.objects.create(
                provider=provider, is_online=is_online_now)
            provider.online = is_online_now
            provider.save()

    # Store the current status in Redis for future lookups
    r.set(f"provider:{provider_id}:status", str(is_online_now))


@app.task(queue='uptime', options={'queue': 'uptime', 'routing_key': 'uptime'})
def update_nodes_data(node_props):
    """Refresh online/offline status for every node in the latest scan, plus
    any node that was previously online but absent from this scan.

    node_props: list of property dicts, each containing at least 'node_id'.

    Per-node failures are logged and skipped so one bad node cannot abort
    the whole batch.  (The original also created a local
    `r = redis.Redis(...)` that was never used — `update_nodes_status`
    reads the module-level client — so it has been removed.)
    """
    # First pass: every node seen in the current scan.
    for props in node_props:
        issuer_id = props['node_id']
        is_online_now = check_node_status(issuer_id)
        print(f"Updating NodeStatus for {issuer_id} with is_online_now={is_online_now}")
        try:
            update_nodes_status(issuer_id, is_online_now)
        except Exception as e:
            print(f"Error updating NodeStatus for {issuer_id}: {e}")

    # Second pass: nodes with an online history entry that did not appear in
    # this scan — re-check them so stale "online" records get closed out.
    provider_ids_in_props = {props['node_id'] for props in node_props}
    previously_online_providers_ids = Provider.objects.filter(
        nodestatushistory__is_online=True
    ).distinct().values_list('node_id', flat=True)

    provider_ids_not_in_scan = set(
        previously_online_providers_ids) - provider_ids_in_props

    for issuer_id in provider_ids_not_in_scan:
        is_online_now = check_node_status(issuer_id)
        print(f"Verifying NodeStatus for {issuer_id} with is_online_now={is_online_now}")
        try:
            update_nodes_status(issuer_id, is_online_now)
        except Exception as e:
            print(f"Error verifying/updating NodeStatus for {issuer_id}: {e}")


def check_node_status(issuer_id):
    """Return True if the relay reports the node as currently seen, else False.

    Queries the yacn2 relay's /nodes/<id> endpoint (node id without the '0x'
    prefix in the URL; response keyed by the lowercased full id).  The relay
    returns a list of session entries for a known node; an empty list or
    [None] means the node is not currently connected.

    Network errors and malformed responses are logged and treated as offline.

    NOTE: this span of the scraped diff still contained the removed
    subprocess-based implementation interleaved with the new HTTP one; this
    is the coherent post-commit version, with the equivalent nested
    conditional flattened (`any()` over an empty list is already False, so
    the explicit empty-list branch folds away).
    """
    node_id_no_prefix = issuer_id[2:] if issuer_id.startswith(
        '0x') else issuer_id
    url = f"http://yacn2.dev.golem.network:9000/nodes/{node_id_no_prefix}"
    try:
        response = requests.get(url, timeout=5)
        response.raise_for_status()
        data = response.json()
        node_info = data.get(issuer_id.lower())

        # Online only when the relay returns a non-[None] list containing a
        # 'seen' entry; any other shape (missing key, dict, None) is offline.
        if isinstance(node_info, list) and node_info != [None]:
            return any('seen' in item for item in node_info if item)
        return False
    except requests.exceptions.RequestException as e:
        print(
            f"HTTP request exception when checking node status for {issuer_id}: {e}")
        return False
    except Exception as e:
        print(f"Unexpected error checking node status for {issuer_id}: {e}")
        return False


Expand Down Expand Up @@ -200,5 +159,4 @@ async def monitor_nodes_status(subnet_tag: str = "public"):
print("Scan timeout reached")

# Delay update_nodes_data call using Celery
update_nodes_data.delay(node_props)
update_providers_info.delay(node_props)
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def penalty_weight(deviation):
def calculate_uptime(node_id, node=None):
if node is None:
node = Provider.objects.get(node_id=node_id)
statuses = NodeStatusHistory.objects.filter(provider=node).order_by("timestamp")
statuses = NodeStatusHistory.objects.filter(node_id=node_id).order_by("timestamp")

online_duration = timedelta(0)
last_online_time = None
Expand Down
Loading

0 comments on commit 8e5a760

Please sign in to comment.