Skip to content

Commit

Permalink
feature: 任务历史支持 ip 过滤(closed TencentBlueKing#1631)
Browse files Browse the repository at this point in the history
  • Loading branch information
neko12583 committed Jul 26, 2023
1 parent 1118d1e commit 4a0134f
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 89 deletions.
115 changes: 65 additions & 50 deletions apps/node_man/handlers/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@
"""
import copy
import logging
import operator
from functools import reduce
from typing import Any, Dict, List, Optional, Set, Union

from django.conf import settings
from django.core.paginator import Paginator
from django.db.models import Q
from django.db import transaction
from django.utils import timezone
from django.utils.translation import get_language
Expand Down Expand Up @@ -142,7 +145,6 @@ def list(self, params: dict, username: str):
:param params: 请求参数的字典
:param username: 用户名
"""

kwargs = {
**tools.JobTools.parse_job_list_filter_kwargs(query_params=params),
"status__in": params.get("status"),
Expand All @@ -156,13 +158,6 @@ def list(self, params: dict, username: str):
all_biz_info = CmdbHandler().biz_id_name_without_permission()
biz_info = CmdbHandler().biz_id_name({"action": constants.IamActionType.task_history_view})
biz_permission = list(biz_info.keys())

# 用户搜索的业务
search_biz = []
if params.get("bk_biz_id"):
# 如果有带筛选条件,则只返回筛选且有权业务的主机
search_biz = [bk_biz_id for bk_biz_id in params["bk_biz_id"] if bk_biz_id in biz_info]

if params.get("job_id"):
job_ids = set()
for job_id_var in params["job_id"]:
Expand All @@ -173,10 +168,58 @@ def list(self, params: dict, username: str):
job_ids.add(job_id)
kwargs["id__in"] = job_ids

# 业务权限
search_biz_ids = params.get("bk_biz_id")
all_biz_ids = set(all_biz_info.keys())

if search_biz_ids:
# 字典的 in 比列表性能更高
search_biz_ids = [bk_biz_id for bk_biz_id in search_biz_ids if bk_biz_id in biz_info]
else:
search_biz_ids = biz_permission

if set(search_biz_ids) & all_biz_ids == all_biz_ids:
biz_scope_query_q = Q()
else:
biz_scope_query_q = reduce(
operator.or_,
[Q(bk_biz_scope__contains=bk_biz_id) for bk_biz_id in search_biz_ids],
Q()
)
# 自身创建的 job 可见
biz_scope_query_q |= Q(created_by=username)

# ip 搜索
inner_ip_query_q = Q()
if params.get("inner_ip_list"):
instance_id_list = tools.JobTools.get_instance_ids_by_ips(params["inner_ip_list"])

# 处理时间范围
instance_record_query_kwargs = {
"instance_id__in": instance_id_list,
"create_time__gte": params.get("start_time"),
"create_time__lte": params.get("end_time"),
}

# subscription_id 查询更快,但使用 subscription_id 不够准确,subscription 的范围有变化的可能
task_id_list = models.SubscriptionInstanceRecord.objects.filter(
**filter_values(instance_record_query_kwargs)
).values_list("task_id", flat=True)

# 带了 ip 查询条件,如果没有该 ip 的任务,应当返回无数据
if not task_id_list:
return {"total": 0, "list": []}

inner_ip_query_q = reduce(operator.or_, [Q(task_id_list__contains=task_id) for task_id in task_id_list])

# 过滤None值并筛选Job
# 此处不过滤空列表(filter_empty=False),job_id, job_type 存在二次解析,若全部值非法得到的是空列表,期望应是查不到数据
job_result = models.Job.objects.filter(**filter_values(kwargs))
job_result = models.Job.objects.filter(biz_scope_query_q, inner_ip_query_q, **filter_values(kwargs))

# 过滤没有业务的Job
job_result = job_result.filter(~Q(bk_biz_scope__isnull=True) & ~Q(bk_biz_scope={}))

# 排序
if params.get("sort"):
sort_head = params["sort"]["head"]
job_result = job_result.extra(select={sort_head: f"JSON_EXTRACT(statistics, '$.{sort_head}')"})
Expand All @@ -185,44 +228,10 @@ def list(self, params: dict, username: str):
else:
job_result = job_result.order_by(sort_head)

# TODO 全量拉取,待优化
job_result = job_result.values()

job_list = []
for job in job_result:
if not job["end_time"]:
job["cost_time"] = f'{(timezone.now() - job["start_time"]).seconds}'
else:
job["cost_time"] = f'{(job["end_time"] - job["start_time"]).seconds}'
job["bk_biz_scope_display"] = [all_biz_info.get(biz) for biz in job["bk_biz_scope"]]
job["job_type_display"] = constants.JOB_TYPE_DICT.get(job["job_type"])

# 如果任务没有业务则不显示
if not job["bk_biz_scope"]:
continue

# 判断权限
if set(job["bk_biz_scope"]) - set(biz_permission) == set():
if set(job["bk_biz_scope"]) - set(search_biz) != set(job["bk_biz_scope"]):
# 此种情况说明job业务范围包括查询业务的其中一个
job_list.append(job)
continue
elif not search_biz:
# 查询所有业务情况
job_list.append(job)
continue
elif search_biz == biz_permission:
job_list.append(job)
continue

# 创建者是自己
if job["created_by"] == username and not params.get("bk_biz_id"):
job_list.append(job)
continue

# 分页
paginator = Paginator(job_list, params["pagesize"])
job_page: List[Dict[str, Any]] = paginator.page(params["page"]).object_list
# 可以接受 queryset 作为参数
paginator = Paginator(job_result.values(), params["pagesize"])
# 分页之后再转换为列表
job_page: List[Dict[str, Any]] = list(paginator.page(params["page"]).object_list)

# 填充策略名称
sub_infos = models.Subscription.objects.filter(
Expand All @@ -231,15 +240,21 @@ def list(self, params: dict, username: str):

# 建立订阅ID和订阅详细信息的映射
sub_id__sub_info_map = {sub_info["id"]: sub_info for sub_info in sub_infos}

# 预处理数据:字段填充,计算等
for job in job_page:
job.update(tools.JobTools.unzip_job_type(job["job_type"]))

# 填充订阅相关信息
job["policy_name"] = sub_id__sub_info_map.get(job["subscription_id"], {}).get("name")

return {"total": len(job_list), "list": job_page}
if not job["end_time"]:
job["cost_time"] = f'{(timezone.now() - job["start_time"]).seconds}'
else:
job["cost_time"] = f'{(job["end_time"] - job["start_time"]).seconds}'
job["bk_biz_scope_display"] = [all_biz_info.get(biz) for biz in job["bk_biz_scope"]]
job["job_type_display"] = constants.JOB_TYPE_DICT.get(job["job_type"])

# 使用分页器的 count,避免重复计算
return {"total": paginator.count, "list": job_page}

def install(
self,
Expand Down
1 change: 1 addition & 0 deletions apps/node_man/handlers/meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ def fetch_job_list_condition(self, job_category):
return self.filter_empty_children(
[
{"name": _("任务ID"), "id": "job_id"},
{"name": _("IP"), "id": "inner_ip_list"},
{"name": _("执行者"), "id": "created_by", "children": created_bys_children},
{"name": _("执行状态"), "id": "status", "children": statuses_children},
{"name": _("任务类型"), "id": "step_type", "children": step_types_children},
Expand Down
1 change: 1 addition & 0 deletions apps/node_man/serializers/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class ListSerializer(serializers.Serializer):
status = serializers.ListField(label=_("状态"), required=False)
created_by = serializers.ListField(label=_("执行者"), required=False)
bk_biz_id = serializers.ListField(label=_("业务ID"), required=False)
inner_ip_list = serializers.ListField(label=_("搜索IP"), required=False)
page = serializers.IntegerField(label=_("当前页数"), required=False, default=1)
pagesize = serializers.IntegerField(label=_("分页大小"), required=False, default=10)
sort = SortSerializer(label=_("排序"), required=False)
Expand Down
94 changes: 93 additions & 1 deletion apps/node_man/tests/test_handlers/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import time
from typing import List
from unittest.mock import patch

from django.test import TestCase
from django.utils import timezone
from django.utils.translation import ugettext_lazy as _

from apps.mock_data import common_unit
from apps.node_man import constants, tools
Expand All @@ -24,7 +26,7 @@
MixedOperationError,
)
from apps.node_man.handlers.job import JobHandler
from apps.node_man.models import Host, Job
from apps.node_man.models import Host, Job, SubscriptionInstanceRecord
from apps.node_man.tests.utils import (
SEARCH_BUSINESS,
MockClient,
Expand Down Expand Up @@ -113,6 +115,96 @@ def test_job_list_no_biz_scope(self):
result = JobHandler().list({"page": 1, "pagesize": 10, "category": "agent"}, "admin")
self.assertEqual(result["total"], 0)

@patch("apps.node_man.handlers.cmdb.client_v2", MockClient)
def test_job_list_without_permission(self):
"""测试 无权限/自身创建 任务"""
number = 10
create_job(
number,
bk_biz_scope=[999],
created_by="blueking"
)
create_job(
1,
id=999,
bk_biz_scope=[999],
created_by="admin"
)
result = JobHandler().list({"page": 1, "pagesize": 10}, "admin")
self.assertEqual(result["total"], 1)
self.assertEqual(result["list"][0]["id"], 999)

def test_job_list_with_ip(self):
"""测试 单/多ip 搜索"""
create_host(1, bk_cloud_id=0, ip="127.0.0.1")
create_host(1, bk_host_id=2, bk_cloud_id=0, ip="127.0.0.2")

create_job(1, task_id_list=[1])
create_job(1, id=2, task_id_list=[1, 2])

sub_inst_record_objs = [
SubscriptionInstanceRecord(
task_id=1,
subscription_id=1,
instance_id="host|instance|host|127.0.0.1-0-0",
is_latest=True,
),
SubscriptionInstanceRecord(
task_id=2,
subscription_id=2,
instance_id="host|instance|host|127.0.0.2-0-0",
is_latest=True,
)
]
SubscriptionInstanceRecord.objects.bulk_create(sub_inst_record_objs)

single_ip_result = JobHandler().list(
{
"page": 1,
"pagesize": 10,
"hide_auto_trigger_job": False,
"inner_ip_list": ["127.0.0.1"]
},
"admin",
)
self.assertEqual(len(single_ip_result["list"]), 2)

multiple_ip_result = JobHandler().list(
{
"page": 1,
"pagesize": 10,
"hide_auto_trigger_job": False,
"inner_ip_list": ["127.0.0.1", "127.0.0.2"]
},
"admin",
)
self.assertEqual(len(multiple_ip_result["list"]), 2)

def test_job_list_spend_time(self):
"""测试查询时间"""
create_host(1, bk_cloud_id=0, ip="127.0.0.1")
create_job(10000, task_id_list=[1])
SubscriptionInstanceRecord.objects.create(
task_id=1,
subscription_id=1,
instance_id="host|instance|host|127.0.0.1-0-0",
is_latest=True,
)

start_time = time.time()
JobHandler().list(
{
"page": 1,
"pagesize": 200,
"hide_auto_trigger_job": False,
"inner_ip_list": ["127.0.0.1"]
},
"admin",
)
spend_time = time.time() - start_time

self.assertLessEqual(spend_time, 1, msg=_("响应时间超过1秒"))

@patch("apps.node_man.handlers.cmdb.client_v2", MockClient)
def test_host_install(self):
# 测试AGENT/P-AGENT/PROXY的job安装任务
Expand Down
15 changes: 9 additions & 6 deletions apps/node_man/tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -666,9 +666,14 @@ def create_ap(number):
AccessPoint.objects.bulk_create(ap_to_create)


def create_job(number, id=None, end_time=None, bk_biz_scope=None):
def create_job(number, id=None, end_time=None, bk_biz_scope=None, task_id_list=None, created_by=None):
job_types = list(chain(*list(constants.JOB_TYPE_MAP.values())))

if bk_biz_scope == {} or bk_biz_scope:
pass
else:
bk_biz_scope = [[biz["bk_biz_id"] for biz in SEARCH_BUSINESS][random.randint(0, 10)]]

jobs = []
for i in range(1, number + 1):
job = Job(
Expand All @@ -679,12 +684,10 @@ def create_job(number, id=None, end_time=None, bk_biz_scope=None):
random.randint(0, len(constants.JobStatusType.get_choices()) - 1)
],
statistics={"success_count": 0, "failed_count": 0, "running_count": 0, "total_count": 0},
bk_biz_scope=bk_biz_scope
if bk_biz_scope == {}
else [[biz["bk_biz_id"] for biz in SEARCH_BUSINESS][random.randint(0, 10)]],
bk_biz_scope=bk_biz_scope,
subscription_id=random.randint(1, 100),
task_id_list=[random.randint(1, 100)],
created_by="admin",
task_id_list=task_id_list or [random.randint(1, 100)],
created_by=created_by or "admin",
end_time=end_time,
)
jobs.append(job)
Expand Down
Loading

0 comments on commit 4a0134f

Please sign in to comment.