Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: rotate use cache. #1458

Merged
merged 7 commits into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dongtai_common/models/heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class IastHeartbeat(models.Model):
memory = models.CharField(max_length=1000, blank=True, null=True)
cpu = models.CharField(max_length=1000, blank=True, null=True)
disk = models.CharField(max_length=1000, blank=True, null=True)
req_count = models.IntegerField(blank=True, null=True)
req_count = models.IntegerField(default=0, blank=True, null=True)
dt = models.IntegerField(blank=True, null=True)
report_queue = models.PositiveIntegerField(default=0,
null=False,
Expand Down
44 changes: 19 additions & 25 deletions dongtai_engine/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,10 +365,8 @@ def is_alive(agent_id: int, timestamp: int) -> bool:
"""
Whether the probe is alive or not, the judgment condition: there is a heartbeat log within 2 minutes
"""
return IastHeartbeat.objects.values('id').filter(agent__id=agent_id,
dt__gt=(timestamp -
60 * 2)).exists()

heartbeat_key = f"heartbeat-{agent_id}"
return True if cache.get(heartbeat_key) is not None else False

@shared_task(queue='dongtai-periodic-task')
def update_agent_status():
Expand All @@ -380,27 +378,23 @@ def update_agent_status():
before_agent_status_update()
logger.info(f'检测引擎状态更新开始')
timestamp = int(time.time())
try:
running_agents = IastAgent.objects.values("id").filter(online=1)
is_stopped_agents = list()
for agent in running_agents:
agent_id = agent['id']
if is_alive(agent_id=agent_id, timestamp=timestamp):
continue
else:
is_stopped_agents.append(agent_id)
if is_stopped_agents:
IastAgent.objects.filter(id__in=is_stopped_agents).update(is_running=0, is_core_running=0, online=0)

vul_id_qs = IastReplayQueue.objects.filter(
update_time__lte=timestamp - 60 * 5,
verify_time__isnull=True,
replay_type=1).values('relation_id').distinct()
IastVulnerabilityModel.objects.filter(pk__in=vul_id_qs).update(
status_id=7)
logger.info(f'检测引擎状态更新成功')
except Exception as e:
logger.error(f'检测引擎状态更新出错,错误详情:{e}', exc_info=e)
running_agents_ids = list(
IastAgent.objects.values("id").filter(online=1).values_list(
'pk', flat=True).all())
heartbeat_keys = set(map(lambda x: f"heartbeat-{x}", running_agents_ids))
exists_keys = set(cache.get_many(heartbeat_keys).keys())
keys_missing = heartbeat_keys - exists_keys
stop_agent_ids = list(
map(lambda x: int(x.replace("heartbeat-", "")), keys_missing))
IastAgent.objects.filter(id__in=stop_agent_ids).update(
is_running=0, is_core_running=0, online=0)
vul_id_qs = IastReplayQueue.objects.filter(
update_time__lte=timestamp - 60 * 5,
verify_time__isnull=True,
replay_type=1).values('relation_id').distinct()
IastVulnerabilityModel.objects.filter(pk__in=vul_id_qs).update(status_id=7)
logger.info("update offline agent: %s", stop_agent_ids)
logger.info(f'检测引擎状态更新成功')
after_agent_status_update()

@shared_task(queue='dongtai-periodic-task')
Expand Down
100 changes: 53 additions & 47 deletions dongtai_protocol/report/handler/heartbeat_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,22 @@
from dongtai_common.models.project import IastProject, VulValidation
from dongtai_common.utils.systemsettings import get_vul_validate
from dongtai_common.models.agent import IastAgent
from django.core.cache import cache

logger = logging.getLogger('dongtai.openapi')


def update_agent_cache(agent_id, data):
cache.set(f"heartbeat-{agent_id}", data, timeout=521)


def check_agent_incache(agent_id):
return True if cache.get(f"heartbeat-{agent_id}") else False


@ReportHandler.register(const.REPORT_HEART_BEAT)
class HeartBeatHandler(IReportHandler):

def __init__(self):
super().__init__()
self.req_count = None
Expand All @@ -41,59 +51,50 @@ def parse(self):
self.cpu = self.detail.get('cpu')
self.memory = self.detail.get('memory')
self.disk = self.detail.get('disk')
self.req_count = self.detail.get('reqCount')
self.req_count = self.detail.get('reqCount', None)
self.report_queue = self.detail.get('reportQueue', 0)
self.method_queue = self.detail.get('methodQueue', 0)
self.replay_queue = self.detail.get('replayQueue', 0)
self.return_queue = self.detail.get('returnQueue', None)

def has_permission(self):
self.agent = IastAgent.objects.filter(id=self.agent_id, user=self.user_id).first()
self.agent = IastAgent.objects.filter(id=self.agent_id,
user=self.user_id).first()
return self.agent

def save_heartbeat(self):
self.agent.is_running = 1
self.agent.online = 1
self.agent.save(update_fields=['is_running', 'online'])
queryset = IastHeartbeat.objects.filter(agent=self.agent)
heartbeat = queryset.order_by('-id').first()
if heartbeat:
queryset.exclude(pk=heartbeat.id).delete()
heartbeat.dt = int(time.time())
if self.return_queue == 1:
heartbeat.req_count = self.req_count
heartbeat.report_queue = self.report_queue
heartbeat.method_queue = self.method_queue
heartbeat.replay_queue = self.replay_queue
heartbeat.save(update_fields=[
'req_count', 'dt', 'report_queue', 'method_queue', 'replay_queue'
])
elif self.return_queue == 0:
heartbeat.memory = self.memory
heartbeat.cpu = self.cpu
heartbeat.disk = self.disk
heartbeat.save(update_fields=['disk', 'memory', 'cpu', 'dt'])
else:
heartbeat.memory = self.memory
heartbeat.cpu = self.cpu
heartbeat.req_count = self.req_count
heartbeat.report_queue = self.report_queue
heartbeat.method_queue = self.method_queue
heartbeat.replay_queue = self.replay_queue
heartbeat.disk = self.disk
heartbeat.save(update_fields=[
'disk', 'memory', 'cpu', 'req_count', 'dt', 'report_queue',
'method_queue', 'replay_queue'
])
default_dict = {"dt": int(time.time())}
if not check_agent_incache(self.agent_id):
self.agent.is_running = 1
self.agent.online = 1
IastHeartbeat.objects.update_or_create(agent_id=self.agent_id,
defaults={
"dt": int(time.time()),
})
if self.return_queue == 1:
default_dict['req_count'] = self.req_count
default_dict['report_queue'] = self.report_queue
default_dict['method_queue'] = self.method_queue
default_dict['replay_queue'] = self.replay_queue
IastHeartbeat.objects.update_or_create(agent_id=self.agent_id,
defaults=default_dict)
elif self.return_queue == 0:
if self.req_count is not None:
default_dict['req_count'] = self.req_count
default_dict['memory'] = self.memory
default_dict['cpu'] = self.cpu
default_dict['disk'] = self.disk
else:
IastHeartbeat.objects.create(memory=self.memory,
cpu=self.cpu,
req_count=self.req_count,
report_queue=self.replay_queue,
method_queue=self.method_queue,
replay_queue=self.replay_queue,
dt=int(time.time()),
agent=self.agent)
default_dict['memory'] = self.memory
default_dict['cpu'] = self.cpu
default_dict['req_count'] = self.req_count
default_dict['report_queue'] = self.report_queue
default_dict['method_queue'] = self.method_queue
default_dict['replay_queue'] = self.replay_queue
default_dict['disk'] = self.disk
IastHeartbeat.objects.update_or_create(agent_id=self.agent_id,
defaults=default_dict)
update_agent_cache(self.agent_id, default_dict)

def get_result(self, msg=None):
logger.info('return_queue: {}'.format(self.return_queue))
Expand Down Expand Up @@ -148,10 +149,15 @@ def get_result(self, msg=None):
state=const.WAITING).update(
update_time=timestamp,
state=const.SOLVING)
IastReplayQueue.objects.filter(id__in=failure_ids).update(update_time=timestamp, state=const.SOLVED)

IastVulnerabilityModel.objects.filter(id__in=success_vul_ids).update(latest_time=timestamp, status_id=2)
IastVulnerabilityModel.objects.filter(id__in=failure_vul_ids).update(latest_time=timestamp, status_id=1)
IastReplayQueue.objects.filter(id__in=failure_ids).update(
update_time=timestamp, state=const.SOLVED)

IastVulnerabilityModel.objects.filter(
id__in=success_vul_ids).update(latest_time=timestamp,
status_id=2)
IastVulnerabilityModel.objects.filter(
id__in=failure_vul_ids).update(latest_time=timestamp,
status_id=1)
logger.info(_('Reproduction request issued successfully'))
logger.debug([i['id'] for i in replay_requests])
return replay_requests
Expand Down
3 changes: 2 additions & 1 deletion test/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
# datetime: 2021/7/13 下午10:21
# project: dongtai-engine
from django.test.runner import DiscoverRunner
from django.core.cache import cache
import os
import unittest

import django


Expand All @@ -15,6 +15,7 @@ def __init__(self, methodName='runTest'):
super().__init__(methodName)
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "dongtai_conf.settings")
os.environ.setdefault("debug", "true")
cache.clear()
django.setup()


Expand Down