diff --git a/dongtai_conf/celery.py b/dongtai_conf/celery.py index a70be8587..9232a67a0 100644 --- a/dongtai_conf/celery.py +++ b/dongtai_conf/celery.py @@ -83,7 +83,6 @@ "dongtai_engine.tasks.clear_error_log": {'exchange': 'dongtai-periodic-task', 'routing_key': 'dongtai-periodic-task'}, "dongtai_engine.tasks.vul_recheck": {'exchange': 'dongtai-periodic-task', 'routing_key': 'dongtai-periodic-task'}, "dongtai_engine.preheat.function_preheat": {'exchange': 'dongtai-periodic-task', 'routing_key': 'dongtai-periodic-task'}, - "dongtai_engine.plugins.data_clean": {'exchange': 'dongtai-periodic-task', 'routing_key': 'dongtai-periodic-task'}, "dongtai_engine.plugins.project_status": {'exchange': 'dongtai-periodic-task', 'routing_key': 'dongtai-periodic-task'}, } configs["CELERY_ENABLE_UTC"] = False diff --git a/dongtai_engine/plugins/data_clean.py b/dongtai_engine/plugins/data_clean.py deleted file mode 100644 index d370bda6d..000000000 --- a/dongtai_engine/plugins/data_clean.py +++ /dev/null @@ -1,97 +0,0 @@ -from dongtai_common.models.agent import IastAgent -from celery import shared_task -from dongtai_common.models.agent_method_pool import MethodPool -from time import time -from celery.apps.worker import logger -from dongtai_conf.settings import ELASTICSEARCH_STATE -from asgiref.sync import sync_to_async -import asyncio -from typing import List, Tuple -#import asyncio_gevent - -DELETE_BATCH_SIZE = 10000 - - -def chunked_queryset(queryset, chunk_size): - """ Slice a queryset into chunks. """ - - start_pk = 0 - queryset = queryset.order_by('pk') - - while True: - # No entry left - if not queryset.filter(pk__gt=start_pk).exists(): - break - - try: - # Fetch chunk_size entries if possible - end_pk = queryset.filter(pk__gt=start_pk).values_list( - 'pk', flat=True)[chunk_size - 1] - - # Fetch rest entries if less than chunk_size left - except IndexError: - end_pk = queryset.values_list('pk', flat=True).last() - - yield queryset.filter(pk__gt=start_pk).filter(pk__lte=end_pk) - - start_pk = end_pk - - -@shared_task(queue='dongtai-periodic-task', - time_limit=60 * 60 * 2, - soft_time_limit=60 * 60 * 4) -def data_cleanup(days: int): - delete_time_stamp = int(time()) - 60 * 60 * 24 * days - if ELASTICSEARCH_STATE: - # use delete to trigger the signal to delete related elasticsearch doc - qs = MethodPool.objects.filter(update_time__lte=delete_time_stamp) - for i in chunked_queryset(qs, DELETE_BATCH_SIZE): - i.delete() - else: - # use _raw_delete to reduce the delete time and memory usage. - # it could aviod to load every instance into memory. - latest_id = MethodPool.objects.filter( - update_time__lte=delete_time_stamp).order_by('-id').values_list( - 'id', flat=True).first() - first_id = MethodPool.objects.filter( - update_time__lte=delete_time_stamp).order_by('id').values_list( - 'id', flat=True).first() - if not any((latest_id, first_id)) or not isinstance(latest_id, int) or not isinstance(first_id, int): - logger.info("no data for clean up") - return - if all((latest_id, first_id)): - batch_clean(latest_id, first_id, 10000) - # qs = MethodPool.objects.filter(pk__lte=latest_id) - # qs._raw_delete(qs.db) - - logger.info(f'Delete offline agent 30 days ago') - agent_latest_id = IastAgent.objects.filter( - latest_time__lte=delete_time_stamp).order_by('-id').values_list( - 'id', flat=True).first() - if any([agent_latest_id]): - IastAgent.objects.filter(pk__lte=agent_latest_id, - online=0).delete() - - -@sync_to_async(thread_sensitive=False) -def data_clean_batch(upper_id: int, lower_id: int): - qs = MethodPool.objects.filter(pk__lt=upper_id, pk__gte=lower_id) - logger.info(f"data cleaning {upper_id}-{lower_id} ") - qs._raw_delete(qs.db) - - -#@asyncio_gevent.async_to_sync -async def loop_main(range_list: List[Tuple[int, int]]): - coros = [ - data_clean_batch(upper_id, lower_id) - for upper_id, lower_id in range_list - ] - await asyncio.gather(*coros) - - -def batch_clean(upper_id: int, lower_id: int, batch_size: int): - chunk_range = list( - zip(range(lower_id + batch_size, upper_id + batch_size, batch_size), - range(lower_id, upper_id, batch_size))) - loop = asyncio.get_event_loop() - loop.run_until_complete(loop_main(chunk_range)) diff --git a/dongtai_web/projecttemplate/base.py b/dongtai_web/projecttemplate/base.py deleted file mode 100644 index 926c0f61e..000000000 --- a/dongtai_web/projecttemplate/base.py +++ /dev/null @@ -1,163 +0,0 @@ -from dongtai_common.models.project import ( - IastProject, - IastProjectTemplate, - VulValidation, -) -from rest_framework import serializers -from dongtai_common.endpoint import UserEndPoint, R, TalentAdminEndPoint -from rest_framework import viewsets -from django.utils.translation import gettext_lazy as _ -from dongtai_web.utils import extend_schema_with_envcheck, get_response_serializer -from dongtai_common.permissions import UserPermission, ScopedPermission, SystemAdminPermission, TalentAdminPermission - - -class PaginationSerializer(serializers.Serializer): - page_size = serializers.IntegerField(default=20, - help_text=_('Number per page')) - page = serializers.IntegerField(default=1, help_text=_('Page index')) - - -class ProjectTemplateCreateArgsSerializer(serializers.Serializer): - id = serializers.IntegerField( - min_value=1, - help_text=_("The id corresponding to the scanning strategy."), - required=False) - template_name = serializers.CharField(help_text=_('The name of project')) - scan_id = serializers.IntegerField( - min_value=1, - max_value=2**32, - help_text=_("The id corresponding to the scanning strategy.")) - vul_validation = serializers.IntegerField( - min_value=0, - max_value=2, - help_text="vul validation switch, 0-FOLLOW_GLOBAL, 1-ENABLE,2-DISABLE") - data_gather = serializers.JSONField(help_text="data gather settings", - required=False) - data_gather_is_followglobal = serializers.IntegerField(required=False, - min_value=0, - max_value=2, - default=0) - blacklist_is_followglobal = serializers.IntegerField(required=False, - min_value=0, - max_value=2, - default=0) - blacklist = serializers.SerializerMethodField(required=False) - is_system = serializers.IntegerField(required=False, default=0) - - def get_blacklist(self, obj): - return [] - - class Meta: - model = IastProjectTemplate - fields = [ - 'id', - 'template_name', - 'scan_id', - 'vul_validation', - 'data_gather', - 'data_gather_is_followglobal', - 'blacklist_is_followglobal', - 'blacklist', - ] - - -def template_create(data, user): - data['user_id'] = user.id - for field in ["blacklist"]: - if field in data: - del data[field] - project_template = IastProjectTemplate.objects.create(**data) - pk = project_template.id - return pk - - -def template_update(pk, data, user): - data['user_id'] = user.id - for field in ["blacklist"]: - if field in data: - del data[field] - IastProjectTemplate.objects.filter(pk=pk).update(**data) - - -class IastProjectTemplateView(TalentAdminEndPoint, viewsets.ViewSet): - name = "api-v1-agent-project-template" - description = _("project_template") - - permission_classes_by_action = {'list': (UserPermission, )} - - def get_permissions(self): - try: - return [ - permission() for permission in - self.permission_classes_by_action[self.action] - ] - except KeyError: - return [permission() for permission in self.permission_classes] - - @extend_schema_with_envcheck(request=ProjectTemplateCreateArgsSerializer, - summary=_('Create project template'), - description=_("Create project template"), - tags=[_('projectemplate')]) - def create(self, request): - ser = ProjectTemplateCreateArgsSerializer(data=request.data) - ser.is_valid() - if ser.errors: - return R.failure(data=ser.errors) - data = {} - for field in ProjectTemplateCreateArgsSerializer.Meta.fields: - if field in request.data and field != 'id': - data[field] = request.data[field] - template_create(data, request.user) - return R.success() - - @extend_schema_with_envcheck(request=ProjectTemplateCreateArgsSerializer, - summary=_('Update project template'), - description=_("Update project template"), - tags=[_('projectemplate')]) - def update(self, request, pk): - ser = ProjectTemplateCreateArgsSerializer(data=request.data) - ser.is_valid() - if ser.errors: - return R.failure(data=ser.errors) - data = {} - for field in ProjectTemplateCreateArgsSerializer.Meta.fields: - if field in request.data and field != 'id': - data[field] = request.data[field] - template_update(pk, data, request.user) - return R.success() - - @extend_schema_with_envcheck([PaginationSerializer], - summary=_('List project template'), - description=_("List project template"), - tags=[_('projectemplate')]) - def list(self, request): - ser = PaginationSerializer(data=request.GET) - ser.is_valid() - if ser.errors: - return R.failure(data=ser.errors) - page_size = ser.validated_data['page_size'] - page = ser.validated_data['page'] - summary, templates = self.get_paginator( - IastProjectTemplate.objects.values().order_by( - '-latest_time').all(), page, page_size) - return R.success( - data=ProjectTemplateCreateArgsSerializer(templates, - many=True).data, - page=summary, - ) - - @extend_schema_with_envcheck(summary=_('delete project template'), - description=_("delete project template"), - tags=[_('projectemplate')]) - def delete(self, request, pk): - IastProjectTemplate.objects.filter(pk=pk).delete() - return R.success() - - @extend_schema_with_envcheck(summary=_('get project template'), - description=_("get project template"), - tags=[_('projectemplate')]) - def retrieve(self, request, pk): - obj = IastProjectTemplate.objects.filter(pk=pk).values().first() - if not obj: - return R.failure() - return R.success(data=ProjectTemplateCreateArgsSerializer(obj).data) diff --git a/dongtai_web/systemmonitor/data_clean.py b/dongtai_web/systemmonitor/data_clean.py deleted file mode 100644 index ff7c978b3..000000000 --- a/dongtai_web/systemmonitor/data_clean.py +++ /dev/null @@ -1,106 +0,0 @@ -from dongtai_common.endpoint import R -from dongtai_common.endpoint import UserEndPoint -from dongtai_conf.settings import config -from dongtai_common.models.profile import IastProfile -from django.utils.translation import gettext_lazy as _ -from dongtai_web.utils import extend_schema_with_envcheck, get_response_serializer -from django.utils.translation import gettext_lazy as _ -from rest_framework import serializers -from rest_framework.serializers import ValidationError -from django.forms.models import model_to_dict -import json -from datetime import datetime -from django_celery_beat.models import ( - CrontabSchedule, - PeriodicTask, -) -from dongtai_engine.plugins.data_clean import data_cleanup - - -class DataCleanSettingsSer(serializers.Serializer): - clean_time = serializers.TimeField(format="%H:%M:%S") - days_before = serializers.IntegerField() - enable = serializers.BooleanField() - - -class DataCleanDoItNowArgsSer(serializers.Serializer): - days_before = serializers.IntegerField() - - -class DataCleanEndpoint(UserEndPoint): - - @extend_schema_with_envcheck(summary=_('Get Profile'), - description=_("Get Profile with key"), - tags=[_('Profile')]) - def get(self, request): - key = 'data_clean' - profile = IastProfile.objects.filter(key=key).values_list( - 'value', flat=True).first() - if profile is None: - return R.failure( - msg=_("Failed to get {} configuration").format(key)) - data = json.loads(profile) - return R.success(data=data) - - @extend_schema_with_envcheck(summary=_('Profile modify'), - request=DataCleanSettingsSer, - description=_("Modifiy Profile with key"), - tags=[_('Profile')]) - def post(self, request): - ser = DataCleanSettingsSer(data=request.data) - try: - if ser.is_valid(True): - pass - except ValidationError as e: - return R.failure(data=e.detail) - key = 'data_clean' - datetime_obj = datetime.strptime(ser.data['clean_time'], '%H:%M:%S') - hour = datetime_obj.hour - minute = datetime_obj.hour - enabled = 1 if ser.data['enable'] else 0 - kwargs = {'days': int(ser.data['days_before'])} - schedule, _ = CrontabSchedule.objects.get_or_create( - minute=minute, - hour=hour, - day_of_week='*', - day_of_month='*', - month_of_year='*', - ) - PeriodicTask.objects.update_or_create( - name='data clean functions', # simply describes this periodic task. - defaults={ - 'crontab': schedule, # we created this above. - 'enabled': enabled, - 'task': - 'dongtai_engine.plugins.data_clean.data_cleanup', # name of task. - 'args': json.dumps([]), - 'kwargs': json.dumps(kwargs), - }) - value = json.dumps(ser.data) - try: - obj, created = IastProfile.objects.update_or_create( - { - 'key': key, - 'value': value - }, key=key) - except Exception as e: - return R.failure(msg=_("Update {} failed").format(key)) - data = json.loads(value) - return R.success(data=data) - - -class DataCleanDoItNowEndpoint(UserEndPoint): - - @extend_schema_with_envcheck(summary=_('Get Profile'), - request=DataCleanDoItNowArgsSer, - description=_("Get Profile with key"), - tags=[_('Profile')]) - def post(self, request): - ser = DataCleanDoItNowArgsSer(data=request.data) - try: - if ser.is_valid(True): - pass - except ValidationError as e: - return R.failure(data=e.detail) - data_cleanup.delay(days=ser.data['days_before']) - return R.success() diff --git a/dongtai_web/systemmonitor/urls.py b/dongtai_web/systemmonitor/urls.py index 756e1c4dd..dedd3ba1b 100644 --- a/dongtai_web/systemmonitor/urls.py +++ b/dongtai_web/systemmonitor/urls.py @@ -1,17 +1,11 @@ from django.urls import include, path from rest_framework import routers -from dongtai_web.systemmonitor.data_clean import ( - DataCleanEndpoint, - DataCleanDoItNowEndpoint, -) from dongtai_web.systemmonitor.project_warning import ProjectWarningEndpoint router = routers.DefaultRouter() base_urlpatterns = [ - path("data_clean", DataCleanEndpoint.as_view()), - path("data_clean/task", DataCleanDoItNowEndpoint.as_view()), path("project_warning", ProjectWarningEndpoint.as_view()), ] diff --git a/dongtai_web/urls.py b/dongtai_web/urls.py index 03eb055dd..4a0351a7c 100644 --- a/dongtai_web/urls.py +++ b/dongtai_web/urls.py @@ -127,11 +127,6 @@ SensitiveInfoRuleBatchView, SensitiveInfoRuleAllView, ) -from dongtai_web.views.scan_strategys import ( - ScanStrategyViewSet, - ScanStrategyRelationProject, - ScanStrategyAllView, -) from dongtai_web.views.details_id import (AgentListWithid, ProjectListWithid, ScaListWithid, VulsListWithid) from dongtai_web.views.vul_recheck_v2 import VulReCheckv2 @@ -152,7 +147,6 @@ from dongtai_web.vul_log.vul_log_view import VulLogViewSet from dongtai_web.vul_recheck_payload.vul_recheck_payload import VulReCheckPayloadViewSet from dongtai_web.header_vul.base import HeaderVulViewSet -from dongtai_web.projecttemplate.base import IastProjectTemplateView from dongtai_web.dast.webhook import DastWebhook from dongtai_web.dast.page import DastVulsEndPoint from dongtai_web.dast.manage import DastManageEndPoint @@ -284,24 +278,9 @@ SensitiveInfoPatternTypeView.as_view()), path('sensitive_info_rule/_validation', SensitiveInfoPatternValidationView.as_view()), - path('scan_strategy', - ScanStrategyViewSet.as_view({ - 'get': 'list', - 'post': 'create' - })), - path( - 'scan_strategy/', - ScanStrategyViewSet.as_view({ - 'get': 'retrieve', - 'put': 'update', - 'delete': 'destory' - })), - path('scan_strategy//relationprojects', - ScanStrategyRelationProject.as_view()), path('sensitive_info_rule/batch_update', SensitiveInfoRuleBatchView.as_view()), path('sensitive_info_rule/all', SensitiveInfoRuleAllView.as_view()), - path('scan_strategy/all', ScanStrategyAllView.as_view()), path('agent/list/ids', AgentListWithid.as_view()), path('vul/list/ids', VulsListWithid.as_view()), path('sca/list/ids', ScaListWithid.as_view()), @@ -378,18 +357,6 @@ path('header_vul/', HeaderVulViewSet.as_view({ 'delete': "delete", })), - path( - 'projecttemplate/', - IastProjectTemplateView.as_view({ - 'get': "retrieve", - 'put': 'update', - 'delete': 'delete', - })), - path('projecttemplate', - IastProjectTemplateView.as_view({ - 'get': "list", - 'post': 'create', - })), path('dast_webhook', DastWebhook.as_view()), path('dastvul/', DastVulsEndPoint.as_view({ 'get': "single", diff --git a/dongtai_web/views/scan_strategys.py b/dongtai_web/views/scan_strategys.py deleted file mode 100644 index 18d0899e7..000000000 --- a/dongtai_web/views/scan_strategys.py +++ /dev/null @@ -1,269 +0,0 @@ -import logging - -from dongtai_common.endpoint import UserEndPoint, R -from dongtai_common.utils import const - -from django.utils.translation import gettext_lazy as _ -from rest_framework import serializers -from dongtai_web.utils import extend_schema_with_envcheck, get_response_serializer -from django.utils.text import format_lazy -from rest_framework.serializers import ValidationError -from rest_framework import viewsets - -from django.db import models -from dongtai_common.models.strategy_user import IastStrategyUser -from dongtai_common.models.user import User -import time -from django.db.models import Q -from dongtai_common.permissions import TalentAdminPermission -from dongtai_common.models.project import IastProject -from dongtai_web.serializers.project import ProjectSerializer -from dongtai_web.views.utils.commonview import ( - BatchStatusUpdateSerializerView, - AllStatusUpdateSerializerView, -) -from dongtai_common.common.utils import disable_cache -from dongtai_engine.common.queryset import load_sink_strategy - - -logger = logging.getLogger('dongtai-webapi') - - -class ScanStrategySerializer(serializers.ModelSerializer): - content = serializers.SerializerMethodField() - - class Meta: - model = IastStrategyUser - fields = ['id', 'name', 'content', 'user', 'status', 'created_at'] - - def get_content(self, obj): - try: - return [int(i) for i in obj.content.split(',')] - except Exception as e: - print(e) - return [] - - -class _ScanStrategyArgsSerializer(serializers.Serializer): - page_size = serializers.IntegerField(default=20, - help_text=_('Number per page')) - page = serializers.IntegerField(default=1, help_text=_('Page index')) - name = serializers.CharField( - required=False, - help_text=_( - "The name of the item to be searched, supports fuzzy search.")) - - -class _ProjectSerializer(ProjectSerializer): - class Meta: - model = IastProject - fields = ['id', 'name'] - - -class ScanCreateSerializer(serializers.Serializer): - name = serializers.CharField(required=True) - status = serializers.ChoiceField((-1, 0, 1), required=True) - content = serializers.ListField(child=serializers.IntegerField(), - required=True) - - -class _ScanStrategyRelationProjectArgsSerializer(serializers.Serializer): - size = serializers.IntegerField(default=5, - max_value=50, - min_value=1, - required=False, - help_text=_('Number per page')) - - -class ScanStrategyRelationProject(UserEndPoint): - @extend_schema_with_envcheck( - request=ScanCreateSerializer, - tags=[_('ScanStrategy')], - summary=_('ScanStrategy Relation Projects'), - description=_("Get scan strategy relation projects" - ), - ) - def get(self, request, pk): - ser = _ScanStrategyRelationProjectArgsSerializer(data=request.GET) - try: - if ser.is_valid(True): - size = ser.validated_data['size'] - except ValidationError as e: - return R.failure(data=e.detail) - scan_strategy = IastStrategyUser.objects.filter(pk=pk).first() - projects = IastProject.objects.filter( - scan=scan_strategy).order_by('-latest_time')[::size] - return R.success(data=_ProjectSerializer(projects, many=True).data) - - -class ScanStrategyViewSet(UserEndPoint, viewsets.ViewSet): - - permission_classes_by_action = {} - - def get_permissions(self): - try: - return [ - permission() for permission in - self.permission_classes_by_action[self.action] - ] - except KeyError: - return [permission() for permission in self.permission_classes] - - @extend_schema_with_envcheck( - [_ScanStrategyArgsSerializer], - tags=[_('ScanStrategy')], - summary=_('ScanStrategy List'), - description=_("Get the item corresponding to the user, support fuzzy search based on name." - ), - ) - def list(self, request): - ser = _ScanStrategyArgsSerializer(data=request.GET) - try: - if ser.is_valid(True): - name = ser.validated_data.get('name', None) - page = ser.validated_data['page'] - page_size = ser.validated_data['page_size'] - except ValidationError as e: - return R.failure(data=e.detail) - q = ~Q(status=-1) - if name: - q = Q(name__icontains=name) & q - queryset = IastStrategyUser.objects.filter(q).order_by('-created_at') - if name: - queryset = queryset.filter(name__icontains=name) - page_summary, page_data = self.get_paginator(queryset, page, page_size) - return R.success(data=ScanStrategySerializer(page_data, - many=True).data, - page=page_summary) - - @extend_schema_with_envcheck( - request=ScanCreateSerializer, - tags=[_('ScanStrategy')], - summary=_('ScanStrategy Create'), - description=_("Create ScanStrategy" - ), - ) - def create(self, request): - ser = ScanCreateSerializer(data=request.data) - try: - if ser.is_valid(True): - name = ser.validated_data['name'] - content = ser.validated_data['content'] - status = ser.validated_data['status'] - except ValidationError as e: - return R.failure(data=e.detail) - try: - ser.validated_data['content'] = ','.join([str(i) for i in content]) - obj = IastStrategyUser.objects.create(**ser.validated_data, - user=request.user) - return R.success(msg=_('create success'), - data=ScanStrategySerializer(obj).data) - except Exception as e: - logger.error(e) - return R.failure() - - @extend_schema_with_envcheck( - request=ScanCreateSerializer, - tags=[_('ScanStrategy')], - summary=_('ScanStrategy Update'), - description=_("Get the item corresponding to the user, support fuzzy search based on name." - ), - ) - def update(self, request, pk): - ser = ScanCreateSerializer(data=request.data, partial=True) - try: - if ser.is_valid(True): - pass - except ValidationError as e: - return R.failure(data=e.detail) - if ser.validated_data.get('content', None): - ser.validated_data['content'] = ','.join( - [str(i) for i in ser.validated_data['content']]) - obj = IastStrategyUser.objects.filter(pk=pk).update( - **ser.validated_data) - disable_cache(load_sink_strategy, (), kwargs={"scan_id": pk}) - return R.success(msg=_('update success')) - - @extend_schema_with_envcheck( - tags=[_('ScanStrategy')], - summary=_('ScanStrategy delete'), - description=_("Get the item corresponding to the user, support fuzzy search based on name." - ), - ) - def destory(self, request, pk): - scan = IastStrategyUser.objects.filter(pk=pk).first() - if not scan: - return R.failure(msg='No scan strategy found') - if checkusing(scan): - return R.failure(msg='someproject is using this scan strategy') - try: - IastStrategyUser.objects.filter(pk=pk, ).update(status=-1) - return R.success(msg=_('delete success')) - except Exception as e: - logger.error(e) - return R.failure() - - @extend_schema_with_envcheck( - tags=[_('ScanStrategy')], - summary=_('ScanStrategy get'), - description=_("Get the item with pk"), - ) - def retrieve(self, request, pk): - obj = IastStrategyUser.objects.filter(pk=pk).first() - return R.success(data=ScanStrategySerializer(obj).data) - - -class ScanStrategyBatchView(BatchStatusUpdateSerializerView): - status_field = 'status' - model = IastStrategyUser - - @extend_schema_with_envcheck( - request=BatchStatusUpdateSerializerView.serializer, - tags=[_('ScanStrategy')], - summary=_('ScanStrategy batch status'), - description=_("batch update status."), - ) - def post(self, request): - data = self.get_params(request.data) - user = request.user - data['ids'] = filter_using(data['ids'], [user]) - self.update_model(request, data) - return R.success(msg=_('update success')) - - -class ScanStrategyAllView(AllStatusUpdateSerializerView): - status_field = 'status' - model = IastStrategyUser - - @extend_schema_with_envcheck( - request=BatchStatusUpdateSerializerView.serializer, - tags=[_('ScanStrategy')], - summary=_('ScanStrategy all status'), - description=_("all update status."), - ) - def post(self, request): - data = self.get_params(request.data) - self.update_model(request, data) - return R.success(msg=_('update success')) - - def update_model(self, request, validated_data): - ids = self.model.objects.values_list('id', flat=True).all() - filter_ids = filter_using(ids, request.user) - self.model.objects.filter(pk__in=filter_ids, - user__in=[request.user]).update(**{ - self.status_field: - validated_data['status'] - }) - - -def filter_using(ids, users): - after_filter_ids = [] - for obj in IastStrategyUser.objects.filter(pk__in=ids, user__in=[users]).all(): - if checkusing(obj): - continue - after_filter_ids.append(obj.id) - return after_filter_ids - - -def checkusing(scanstrategy): - return IastProject.objects.filter(scan=scanstrategy).exists() diff --git a/test/iast/views/test_scan_strategy.py b/test/iast/views/test_scan_strategy.py deleted file mode 100644 index 82383977a..000000000 --- a/test/iast/views/test_scan_strategy.py +++ /dev/null @@ -1,24 +0,0 @@ -###################################################################### -# @author : bidaya0 (bidaya0@$HOSTNAME) -# @file : scan_strategy -# @created : 星期四 12月 02, 2021 19:57:44 CST -# -# @description : -###################################################################### - - -from rest_framework.test import APITestCase -from dongtai_common.models.user import User - - -class ScanStrategyTestCase(APITestCase): - def setUp(self): - pass - - def test_create(self): - self.user = User.objects.filter(pk=1).first() - assert self.user is not None - self.client.force_authenticate(user=self.user) - response = self.client.get('/api/v1/scan_strategy') - print(response.content) - assert response.status_code == 200