diff --git a/apps/backend/agent/manager.py b/apps/backend/agent/manager.py index 287f55052..4f04b19d8 100644 --- a/apps/backend/agent/manager.py +++ b/apps/backend/agent/manager.py @@ -44,7 +44,15 @@ def register_host(cls): ) return act - def query_tjj_password(self): + @classmethod + def add_or_update_hosts(cls): + """新增或更新主机""" + act = AgentServiceActivity( + component_code=components.AddOrUpdateHostsComponent.code, name=components.AddOrUpdateHostsComponent.name + ) + return act + + def query_password(self): """查询铁将军密码""" act = AgentServiceActivity( component_code=components.QueryPasswordComponent.code, name=components.QueryPasswordComponent.name @@ -138,14 +146,6 @@ def get_agent_status(cls, expect_status: str, name=components.GetAgentStatusComp act.component.inputs.expect_status = Var(type=Var.PLAIN, value=expect_status) return act - @classmethod - def get_agent_id(cls, name=components.GetAgentIDComponent.name): - """查询 Agent ID""" - if settings.GSE_VERSION == "V1": - return None - act = AgentServiceActivity(component_code=components.GetAgentIDComponent.code, name=name) - return act - @classmethod def check_agent_status(cls, name=components.CheckAgentStatusComponent.name): """查询Agent状态是否正常""" diff --git a/apps/backend/components/collections/agent_new/add_or_update_hosts.py b/apps/backend/components/collections/agent_new/add_or_update_hosts.py new file mode 100644 index 000000000..100211500 --- /dev/null +++ b/apps/backend/components/collections/agent_new/add_or_update_hosts.py @@ -0,0 +1,532 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available. +Copyright (C) 2017-2022 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +import json +from collections import defaultdict +from typing import Any, Dict, List, Optional, Set + +from django.conf import settings +from django.db import transaction +from django.utils import timezone +from django.utils.translation import ugettext_lazy as _ + +from apps.core.concurrent import controller +from apps.node_man import constants, models +from apps.node_man.models import ProcessStatus +from apps.utils import batch_request, concurrent, exc +from common.api import CCApi + +from .. import core +from ..base import CommonData +from .base import AgentBaseService + + +class SelectorResult: + is_add: bool = None + is_skip: bool = None + sub_inst: models.SubscriptionInstanceRecord = None + + def __init__(self, is_add: bool, is_skip: bool, sub_inst: models.SubscriptionInstanceRecord): + self.is_add = is_add + self.is_skip = is_skip + self.sub_inst = sub_inst + + +class AddOrUpdateHostsService(AgentBaseService): + @staticmethod + def host_infos_deduplication(host_infos: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """ + 根据 bk_host_id 对主机信息列表进行取重 + :param host_infos: 主机信息列表 + :return: + """ + recorded_host_ids: Set[int] = set() + host_infos_after_deduplication: List[Dict[str, Any]] = [] + for host_info in host_infos: + bk_host_id: int = host_info["bk_host_id"] + if bk_host_id in recorded_host_ids: + continue + recorded_host_ids.add(bk_host_id) + host_infos_after_deduplication.append(host_info) + return host_infos_after_deduplication + + @staticmethod + def get_host_infos_gby_ip_key(host_infos: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]: + """ + 获取 主机信息根据 IP 等关键信息聚合的结果 + :param host_infos: 主机信息列表 + :return: 主机信息根据 IP 等关键信息聚合的结果 + """ + host_infos_gby_ip_key: Dict[str, List[Dict[str, Any]]] = defaultdict(list) + for host_info in host_infos: + bk_cloud_id: int = host_info["bk_cloud_id"] + bk_addressing: str = host_info.get("bk_addressing", constants.CmdbAddressingType.STATIC.value) + optional_ips: List[Optional[str]] = [host_info.get("bk_host_innerip"), host_info.get("bk_host_innerip_v6")] + + for ip in optional_ips: + if not ip: + continue + ip_key: str = f"{bk_addressing}:{bk_cloud_id}:{ip}" + host_infos_gby_ip_key[ip_key].append(host_info) + return host_infos_gby_ip_key + + @staticmethod + def search_business(bk_biz_id: int) -> Dict[str, Any]: + """ + 查询相应业务的业务运维 + :param bk_biz_id: 业务ID + :return: 列表 ,包含 业务ID、名字、业务运维 + """ + if bk_biz_id == settings.BK_CMDB_RESOURCE_POOL_BIZ_ID: + return { + "bk_biz_maintainer": "", + "bk_biz_id": bk_biz_id, + "bk_biz_name": settings.BK_CMDB_RESOURCE_POOL_BIZ_NAME, + } + search_business_params = { + "fields": ["bk_biz_id", "bk_biz_name", "bk_biz_maintainer"], + "condition": {"bk_biz_id": bk_biz_id}, + } + return CCApi.search_business(search_business_params)["info"][0] + + def static_ip_selector( + self, sub_inst: models.SubscriptionInstanceRecord, cmdb_hosts_with_the_same_ip_key: List[Dict[str, Any]] + ) -> SelectorResult: + """ + 静态 IP 处理器 + :param sub_inst: + :param cmdb_hosts_with_the_same_ip_key: + :return: + """ + # 不存在则新增 + if not cmdb_hosts_with_the_same_ip_key: + return SelectorResult(is_add=True, is_skip=False, sub_inst=sub_inst) + + # 静态IP情况下,只会存在一台机器 + cmdb_host: Dict[str, Any] = cmdb_hosts_with_the_same_ip_key[0] + except_bk_biz_id: int = sub_inst.instance_info["host"]["bk_biz_id"] + if except_bk_biz_id != cmdb_host["bk_biz_id"]: + self.move_insts_to_failed( + sub_inst.id, + log_content=_("主机期望注册到业务【ID:{except_bk_biz_id}】,但实际存在于业务【ID: {actual_biz_id}】,请前往该业务进行安装").format( + except_bk_biz_id=except_bk_biz_id, actual_biz_id=cmdb_host["bk_biz_id"] + ), + ) + return SelectorResult(is_add=False, is_skip=True, sub_inst=sub_inst) + else: + # 同业务下视为更新 + sub_inst.instance_info["host"]["bk_host_id"] = cmdb_host["bk_host_id"] + return SelectorResult(is_add=False, is_skip=False, sub_inst=sub_inst) + + def dynamic_ip_selector( + self, sub_inst: models.SubscriptionInstanceRecord, cmdb_hosts_with_the_same_ip_key: List[Dict[str, Any]] + ) -> SelectorResult: + """ + 动态 IP 处理器 + :param sub_inst: + :param cmdb_hosts_with_the_same_ip_key: + :return: + """ + bk_host_id: Optional[int] = sub_inst.instance_info["host"].get("bk_host_id") + # 安装场景下主机 ID 不存在,不考虑收敛规则,直接新增 + if bk_host_id is None: + return SelectorResult(is_add=True, is_skip=False, sub_inst=sub_inst) + + # 查找 bk_host_id 及 所在业务均匹配的主机信息 + cmdb_host_with_the_same_id: Optional[Dict[str, Any]] = None + except_bk_biz_id: int = sub_inst.instance_info["host"]["bk_biz_id"] + for cmdb_host in cmdb_hosts_with_the_same_ip_key: + if bk_host_id == cmdb_host["bk_host_id"]: + if except_bk_biz_id == cmdb_host["bk_biz_id"]: + cmdb_host_with_the_same_id = cmdb_host + break + else: + self.move_insts_to_failed( + sub_inst.id, + log_content=_( + "主机期望注册到业务【ID:{except_bk_biz_id}】,但实际存在于业务【ID: {actual_biz_id}】,请前往该业务进行安装" + ).format(except_bk_biz_id=except_bk_biz_id, actual_biz_id=cmdb_host["bk_biz_id"]), + ) + return SelectorResult(is_add=False, is_skip=True, sub_inst=sub_inst) + + if cmdb_host_with_the_same_id: + # 存在则执行更新逻辑,面向重装场景 + return SelectorResult(is_add=False, is_skip=False, sub_inst=sub_inst) + return SelectorResult(is_add=True, is_skip=False, sub_inst=sub_inst) + + @controller.ConcurrentController( + data_list_name="bk_host_ids", + batch_call_func=concurrent.batch_call, + get_config_dict_func=lambda: {"limit": constants.QUERY_CMDB_LIMIT}, + ) + def find_host_biz_relations(self, bk_host_ids: List[int]) -> List[Dict[str, Any]]: + """ + 查询主机业务关系信息 + :param bk_host_ids: 主机列表 + :return: 主机业务关系列表 + """ + return CCApi.find_host_biz_relations({"bk_host_id": bk_host_ids}) + + def query_hosts_by_addressing(self, host_infos: List[Dict[str, Any]], bk_addressing: str) -> List[Dict[str, Any]]: + """ + 按寻址方式查询主机 + :param host_infos: 主机信息列表 + :param bk_addressing: 寻址方式 + :return: 主机信息列表 + """ + bk_cloud_id_set: Set[int] = set() + bk_host_innerip_set: Set[str] = set() + bk_host_innerip_v6_set: Set[str] = set() + + for host_info in host_infos: + bk_cloud_id: Optional[int] = host_info.get("bk_cloud_id") + # IPv6 / IPv6 可能存在其中一个为空的现象 + bk_host_innerip: Optional[str] = host_info.get("bk_host_innerip") + bk_host_innerip_v6: Optional[str] = host_info.get("bk_host_innerip_v6") + + if bk_cloud_id is not None: + bk_cloud_id_set.add(bk_cloud_id) + if bk_host_innerip: + bk_host_innerip_set.add(bk_host_innerip) + if bk_host_innerip_v6: + bk_host_innerip_v6_set.add(bk_host_innerip_v6) + + query_hosts_params: Dict[str, Any] = { + "fields": constants.CC_HOST_FIELDS, + "host_property_filter": { + "condition": "AND", + "rules": [ + {"field": "bk_addressing", "operator": "equal", "value": bk_addressing}, + {"field": "bk_cloud_id", "operator": "in", "value": list(bk_cloud_id_set)}, + { + "condition": "OR", + "rules": [ + {"field": "bk_host_innerip", "operator": "in", "value": list(bk_host_innerip_set)}, + {"field": "bk_host_innerip_v6", "operator": "in", "value": list(bk_host_innerip_v6_set)}, + ], + }, + ], + }, + } + cmdb_host_infos: List[Dict[str, Any]] = batch_request.batch_request( + func=CCApi.list_hosts_without_biz, params=query_hosts_params + ) + + processed_cmdb_host_infos: List[Dict[str, Any]] = [] + host_infos_gby_ip_key: Dict[str, List[Dict[str, Any]]] = self.get_host_infos_gby_ip_key(host_infos) + cmdb_host_infos_gby_ip_key: Dict[str, List[Dict[str, Any]]] = self.get_host_infos_gby_ip_key(cmdb_host_infos) + # 模糊查询所得的主机信息列表可能出现:同 IP + 不同云区域的冗余主机 + # 仅选择原数据中存在的 IP + 云区域组合 + for ip_key, partial_cmdb_host_infos in cmdb_host_infos_gby_ip_key.items(): + if not host_infos_gby_ip_key.get(ip_key): + continue + processed_cmdb_host_infos.extend(partial_cmdb_host_infos) + # 数据按 IP 协议版本进行再聚合,同时存在 v4/v6 的情况下会生成重复项,需要按 主机ID 去重 + processed_cmdb_host_infos = self.host_infos_deduplication(processed_cmdb_host_infos) + return processed_cmdb_host_infos + + def query_hosts(self, sub_insts: List[models.SubscriptionInstanceRecord]) -> List[Dict[str, Any]]: + """ + 查询主机信息 + :param sub_insts: 订阅实例 + :return: 主机信息列表 + """ + host_infos_gby_addressing: Dict[str, List[Dict[str, Any]]] = defaultdict(list) + for sub_inst in sub_insts: + host_info: Dict[str, Any] = sub_inst.instance_info["host"] + bk_addressing: str = host_info.get("bk_addressing", constants.CmdbAddressingType.STATIC.value) + host_infos_gby_addressing[bk_addressing].append(host_info) + + query_hosts_params_list: List[Dict[str, Any]] = [] + for addressing, host_infos in host_infos_gby_addressing.items(): + query_hosts_params_list.append({"host_infos": host_infos, "bk_addressing": addressing}) + + cmdb_hosts: List[Dict] = concurrent.batch_call( + func=self.query_hosts_by_addressing, params_list=query_hosts_params_list, extend_result=True + ) + + bk_host_ids: List[int] = [cmdb_host["bk_host_id"] for cmdb_host in cmdb_hosts] + host_biz_relations: List[Dict[str, Any]] = self.find_host_biz_relations(bk_host_ids=bk_host_ids) + host_id__relation_map: Dict[int, Dict[str, Any]] = { + host_biz_relation["bk_host_id"]: host_biz_relation for host_biz_relation in host_biz_relations + } + + # 主机信息填充业务关系 + for cmdb_host in cmdb_hosts: + cmdb_host.update(host_id__relation_map.get(cmdb_host["bk_host_id"], {})) + + return cmdb_hosts + + @controller.ConcurrentController( + data_list_name="sub_insts", + batch_call_func=concurrent.batch_call, + get_config_dict_func=core.get_config_dict, + get_config_dict_kwargs={"config_name": core.ServiceCCConfigName.HOST_WRITE.value}, + ) + @exc.ExceptionHandler(exc_handler=core.default_sub_insts_task_exc_handler) + def handle_update_cmdb_hosts_case(self, sub_insts: List[models.SubscriptionInstanceRecord]) -> List[int]: + """ + 批量更新 CMDB 主机 + :param sub_insts: 订阅实例列表 + :return: 返回成功更新的订阅实例 ID 列表 + """ + if not sub_insts: + return [] + + sub_inst_ids: List[int] = [] + update_list: List[Dict[str, Any]] = [] + for sub_inst in sub_insts: + sub_inst_ids.append(sub_inst.id) + sub_inst.update_time = timezone.now() + host_info: Dict[str, Any] = sub_inst.instance_info["host"] + update_params: Dict[str, Any] = { + "bk_host_id": host_info["bk_host_id"], + "properties": { + "bk_cloud_id": host_info["bk_cloud_id"], + "bk_addressing": host_info.get("bk_addressing", constants.CmdbAddressingType.STATIC.value), + "bk_host_innerip": host_info.get("bk_host_innerip", ""), + "bk_host_innerip_v6": host_info.get("bk_host_innerip_v6", ""), + "bk_host_outerip": host_info.get("bk_host_outerip", ""), + "bk_host_outerip_v6": host_info.get("bk_host_outerip_v6", ""), + "bk_os_type": constants.BK_OS_TYPE[host_info["os_type"]], + }, + } + self.log_info( + sub_inst_ids=sub_inst.id, + log_content=_("更新 CMDB 主机信息:\n {params}").format(params=json.dumps(update_params, indent=2)), + ) + update_list.append(update_params) + CCApi.batch_update_host({"update": update_list}) + models.SubscriptionInstanceRecord.objects.bulk_update( + sub_insts, fields=["instance_info", "update_time"], batch_size=self.batch_size + ) + return sub_inst_ids + + @controller.ConcurrentController( + data_list_name="sub_insts", + batch_call_func=concurrent.batch_call, + get_config_dict_func=core.get_config_dict, + get_config_dict_kwargs={"config_name": core.ServiceCCConfigName.HOST_WRITE.value}, + ) + @exc.ExceptionHandler(exc_handler=core.default_sub_insts_task_exc_handler) + def add_host_to_business_idle(self, biz_info: Dict[str, Any], sub_insts: List[models.SubscriptionInstanceRecord]): + sub_inst_ids: List[int] = [] + bk_host_list: List[Dict[str, Any]] = [] + for sub_inst in sub_insts: + sub_inst_ids.append(sub_inst.id) + host_info: Dict[str, Any] = sub_inst.instance_info["host"] + register_params: Dict[str, Any] = { + # "3" 表示API导入 + "import_from": "3", + "bk_cloud_id": host_info["bk_cloud_id"], + "bk_addressing": host_info.get("bk_addressing", constants.CmdbAddressingType.STATIC.value), + "bk_host_innerip": host_info.get("bk_host_innerip", ""), + "bk_host_innerip_v6": host_info.get("bk_host_innerip_v6", ""), + "bk_host_outerip": host_info.get("bk_host_outerip", ""), + "bk_host_outerip_v6": host_info.get("bk_host_outerip_v6", ""), + "bk_os_type": constants.BK_OS_TYPE[host_info["os_type"]], + "bk_bak_operator": biz_info.get("bk_biz_maintainer") or host_info.get("username"), + "operator": biz_info.get("bk_biz_maintainer") or host_info.get("username"), + } + self.log_info( + sub_inst_ids=sub_inst.id, + log_content=_("添加主机到业务 {bk_biz_name}[{bk_biz_id}]:\n {params}").format( + bk_biz_name=biz_info["bk_biz_name"], + bk_biz_id=biz_info["bk_biz_id"], + params=json.dumps(register_params, indent=2), + ), + ) + bk_host_list.append(register_params) + bk_host_ids: List[int] = CCApi.add_host_to_business_idle( + {"bk_biz_id": biz_info["bk_biz_id"], "bk_host_list": bk_host_list} + )["bk_host_ids"] + # 新增主机是一个原子性操作,返回的 ID 列表会按索引顺序对应 + for bk_host_id, sub_inst in zip(bk_host_ids, sub_insts): + sub_inst.update_time = timezone.now() + sub_inst.instance_info["host"]["bk_host_id"] = bk_host_id + # 更新订阅实例中的实例信息 + models.SubscriptionInstanceRecord.objects.bulk_update( + sub_insts, fields=["instance_info", "update_time"], batch_size=self.batch_size + ) + return sub_inst_ids + + def handle_add_cmdb_hosts_case(self, sub_insts: List[models.SubscriptionInstanceRecord]) -> List[int]: + """ + 批量添加主机到 CMDB + :param sub_insts: 订阅实例列表 + :return: 返回成功的订阅实例 ID 列表 + """ + if not sub_insts: + return [] + + sub_insts_gby_biz_id: Dict[int, List[models.SubscriptionInstanceRecord]] = defaultdict(list) + for sub_inst in sub_insts: + bk_biz_id: int = sub_inst.instance_info["host"]["bk_biz_id"] + sub_insts_gby_biz_id[bk_biz_id].append(sub_inst) + + params_list: List[Dict[str, Any]] = [] + for bk_biz_id, partial_sub_insts in sub_insts_gby_biz_id.items(): + biz_info: Dict[str, Any] = self.search_business(bk_biz_id) + params_list.append({"biz_info": biz_info, "sub_insts": partial_sub_insts}) + + return concurrent.batch_call(func=self.add_host_to_business_idle, params_list=params_list, extend_result=True) + + @transaction.atomic + def handle_update_db(self, sub_insts: List[models.SubscriptionInstanceRecord]): + """ + :param sub_insts: 订阅实例列表 + :return: + """ + # 为空时无需更新,减少无效IO + if not sub_insts: + return + bk_host_ids: List[int] = [] + for sub_inst in sub_insts: + bk_host_ids.append(sub_inst.instance_info["host"]["bk_host_id"]) + + host_ids_in_exist_hosts: Set[int] = set( + models.Host.objects.filter(bk_host_id__in=bk_host_ids).values_list("bk_host_id", flat=True) + ) + host_ids_in_exist_proc_statuses: Set[int] = set( + models.ProcessStatus.objects.filter( + bk_host_id__in=bk_host_ids, + name=models.ProcessStatus.GSE_AGENT_PROCESS_NAME, + source_type=models.ProcessStatus.SourceType.DEFAULT, + ).values_list("bk_host_id", flat=True) + ) + host_objs_to_be_created: List[models.Host] = [] + host_objs_to_be_updated: List[models.Host] = [] + proc_status_objs_to_be_created: List[models.ProcessStatus] = [] + for sub_inst in sub_insts: + + host_info: Dict[str, Any] = sub_inst.instance_info["host"] + + bk_host_id: int = host_info["bk_host_id"] + inner_ip: str = host_info.get("bk_host_innerip") or "" + inner_ipv6: str = host_info.get("bk_host_innerip_v6") or "" + outer_ip: str = host_info.get("bk_host_outerip") or "" + outer_ipv6: str = host_info.get("bk_host_outerip_v6") or "" + login_ip: str = host_info.get("login_ip") or "" + + if host_info["host_node_type"] == constants.NodeType.PROXY: + # Proxy login_ip 为空的情况取值顺序:外网 IP(v4 -> v6)-> 内网 IP(v4 -> v6) + login_ip = login_ip or outer_ip or outer_ipv6 or inner_ip or inner_ipv6 + else: + # 其他情况下:内网 IP(v4 -> v6) + login_ip = login_ip or inner_ip or inner_ipv6 + + extra_data = { + "peer_exchange_switch_for_agent": host_info.get("peer_exchange_switch_for_agent", True), + "bt_speed_limit": host_info.get("bt_speed_limit", 0), + } + if host_info.get("data_path"): + extra_data.update({"data_path": host_info.get("data_path")}) + host_obj = models.Host( + bk_host_id=bk_host_id, + bk_cloud_id=host_info["bk_cloud_id"], + bk_addressing=host_info.get("bk_addressing", constants.CmdbAddressingType.STATIC.value), + bk_biz_id=host_info["bk_biz_id"], + inner_ip=inner_ip, + inner_ipv6=inner_ipv6, + outer_ip=outer_ip, + outer_ipv6=outer_ipv6, + login_ip=login_ip, + data_ip=host_info.get("data_ip") or "", + is_manual=host_info.get("is_manual", False), + os_type=host_info["os_type"], + node_type=host_info["host_node_type"], + ap_id=host_info["ap_id"], + install_channel_id=host_info.get("install_channel_id"), + upstream_nodes=host_info.get("upstream_nodes", []), + updated_at=timezone.now(), + extra_data=extra_data, + ) + if bk_host_id in host_ids_in_exist_hosts: + host_objs_to_be_updated.append(host_obj) + else: + # 初次创建主机时,初始化CPU架构,根据操作系统设置默认值,后续通过安装上报日志修正 + host_obj.cpu_arch = (constants.CpuType.x86_64, constants.CpuType.powerpc)[ + host_obj.cpu_arch == constants.OsType.AIX + ] + host_objs_to_be_created.append(host_obj) + + if bk_host_id not in host_ids_in_exist_proc_statuses: + proc_status_obj = models.ProcessStatus( + bk_host_id=bk_host_id, + source_type=ProcessStatus.SourceType.DEFAULT, + status=constants.ProcStateType.NOT_INSTALLED, + name=models.ProcessStatus.GSE_AGENT_PROCESS_NAME, + ) + proc_status_objs_to_be_created.append(proc_status_obj) + + models.Host.objects.bulk_create(host_objs_to_be_created, batch_size=self.batch_size) + models.Host.objects.bulk_update( + host_objs_to_be_updated, + fields=["bk_biz_id", "bk_cloud_id", "bk_addressing", "os_type", "node_type"] + + ["inner_ip", "inner_ipv6", "outer_ip", "outer_ipv6", "login_ip", "data_ip"] + + ["is_manual", "ap_id", "install_channel_id", "upstream_nodes", "updated_at", "extra_data"], + batch_size=self.batch_size, + ) + models.ProcessStatus.objects.bulk_create(proc_status_objs_to_be_created, batch_size=self.batch_size) + + def _execute(self, data, parent_data, common_data: CommonData): + subscription_instances: List[models.SubscriptionInstanceRecord] = common_data.subscription_instances + + sub_insts_to_be_added: List[models.SubscriptionInstanceRecord] = [] + sub_insts_to_be_updated: List[models.SubscriptionInstanceRecord] = [] + id__sub_inst_obj_map: Dict[int, models.SubscriptionInstanceRecord] = {} + # 获取已存在于 CMDB 的主机信息 + exist_cmdb_hosts: List[Dict[str, Any]] = self.query_hosts(subscription_instances) + # 按 IpKey 聚合主机信息 + # IpKey:ip(v4 or v6)+ bk_addressing(寻值方式)+ bk_cloud_id(云区域) + cmdb_host_infos_gby_ip_key: Dict[str, List[Dict[str, Any]]] = self.get_host_infos_gby_ip_key(exist_cmdb_hosts) + for sub_inst in subscription_instances: + id__sub_inst_obj_map[sub_inst.id] = sub_inst + host_info: Dict[str, Any] = sub_inst.instance_info["host"] + bk_cloud_id: int = host_info["bk_cloud_id"] + bk_addressing: str = host_info.get("bk_addressing", constants.CmdbAddressingType.STATIC.value) + # 构造 IpKey 的 ip 字段支持 v4 或 v6 + optional_ips: List[Optional[str]] = [host_info.get("bk_host_innerip"), host_info.get("bk_host_innerip_v6")] + + # 获取已存在于 CMDB,且 IpKey 相同的主机信息 + cmdb_hosts_with_the_same_ip_key: List[Dict[str, Any]] = [] + for ip in optional_ips: + if not ip: + continue + ip_key: str = f"{bk_addressing}:{bk_cloud_id}:{ip}" + cmdb_hosts_with_the_same_ip_key.extend(cmdb_host_infos_gby_ip_key.get(ip_key, [])) + + # 若 v4 / v6 字段同时存在,聚合再合并会产生重复信息,需要去重 + cmdb_hosts_with_the_same_ip_key = self.host_infos_deduplication(cmdb_hosts_with_the_same_ip_key) + # 按照寻址方式通过不同的选择器,选择更新或新增主机到 CMDB + if bk_addressing == constants.CmdbAddressingType.DYNAMIC.value: + selector_result: SelectorResult = self.dynamic_ip_selector(sub_inst, cmdb_hosts_with_the_same_ip_key) + else: + selector_result: SelectorResult = self.static_ip_selector(sub_inst, cmdb_hosts_with_the_same_ip_key) + + if selector_result.is_skip: + # 选择器已处理,跳过 + continue + elif selector_result.is_add: + sub_insts_to_be_added.append(selector_result.sub_inst) + else: + sub_insts_to_be_updated.append(selector_result.sub_inst) + + # 1 - 新增或更新 CMDB 主机 + successfully_added_sub_inst_ids: List[int] = self.handle_add_cmdb_hosts_case(sub_insts=sub_insts_to_be_added) + successfully_updated_sub_inst_ids: List[int] = self.handle_update_cmdb_hosts_case( + sub_insts=sub_insts_to_be_updated + ) + + # 2 - 对操作成功的实例更新本地数据 + succeed_sub_insts: List[models.SubscriptionInstanceRecord] = [] + for sub_inst_id in successfully_added_sub_inst_ids + successfully_updated_sub_inst_ids: + succeed_sub_insts.append(id__sub_inst_obj_map[sub_inst_id]) + self.handle_update_db(succeed_sub_insts) diff --git a/apps/backend/components/collections/agent_new/components.py b/apps/backend/components/collections/agent_new/components.py index e59dc7910..8ce82fa4f 100644 --- a/apps/backend/components/collections/agent_new/components.py +++ b/apps/backend/components/collections/agent_new/components.py @@ -14,12 +14,12 @@ from pipeline.component_framework.component import Component from . import base +from .add_or_update_hosts import AddOrUpdateHostsService from .bind_host_agent import BindHostAgentService from .check_agent_status import CheckAgentStatusService from .check_policy_gse_to_proxy import CheckPolicyGseToProxyService from .choose_access_point import ChooseAccessPointService from .configure_policy import ConfigurePolicyService -from .get_agent_id import GetAgentIDService from .get_agent_status import GetAgentStatusService from .install import InstallService from .install_plugins import InstallPluginsService @@ -79,12 +79,6 @@ class GetAgentStatusComponent(Component): bound_service = GetAgentStatusService -class GetAgentIDComponent(Component): - name = _("查询AgentID") - code = "get_agent_id" - bound_service = GetAgentIDService - - class ReloadAgentConfigComponent(Component): name = _("重载Agent配置") code = "reload_agent_config" @@ -179,3 +173,9 @@ class UnBindHostAgentComponent(Component): name = _("主机解绑 Agent 信息") code = "unbind_host_agent" bound_service = UnBindHostAgentService + + +class AddOrUpdateHostsComponent(Component): + name = _("新增或更新主机信息") + code = "add_or_update_hosts" + bound_service = AddOrUpdateHostsService diff --git a/apps/backend/components/collections/agent_new/get_agent_id.py b/apps/backend/components/collections/agent_new/get_agent_id.py deleted file mode 100644 index 90e8f332e..000000000 --- a/apps/backend/components/collections/agent_new/get_agent_id.py +++ /dev/null @@ -1,82 +0,0 @@ -# -*- coding: utf-8 -*- -""" -TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available. -Copyright (C) 2017-2022 THL A29 Limited, a Tencent company. All rights reserved. -Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. -You may obtain a copy of the License at https://opensource.org/licenses/MIT -Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on -an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the -specific language governing permissions and limitations under the License. -""" -from typing import Any, Dict, List - -from django.utils.translation import ugettext_lazy as _ - -from apps.backend.api.constants import POLLING_INTERVAL, POLLING_TIMEOUT -from apps.component.esbclient import client_v2 -from apps.node_man import models -from apps.utils import batch_request -from pipeline.core.flow import Service, StaticIntervalGenerator - -from .base import AgentBaseService, AgentCommonData - - -class GetAgentIDService(AgentBaseService): - """安装后AgentID同步到CMDB后才认为是可用的""" - - __need_schedule__ = True - interval = StaticIntervalGenerator(POLLING_INTERVAL) - - def outputs_format(self): - return super().outputs_format() + [ - Service.InputItem(name="polling_time", key="polling_time", type="int", required=True), - ] - - @staticmethod - def update_host_agent_id(cmdb_host_infos: List[Dict[str, Any]]): - need_update_hosts = [ - models.Host(bk_host_id=host_info["bk_host_id"], bk_agent_id=host_info.get("bk_agent_id")) - for host_info in cmdb_host_infos - ] - models.Host.objects.bulk_update(need_update_hosts, fields=["bk_agent_id"]) - - def _execute(self, data, parent_data, common_data: AgentCommonData): - self.log_info(sub_inst_ids=common_data.subscription_instance_ids, log_content=_("开始查询从 CMDB 查询主机的 Agent ID")) - data.outputs.polling_time = 0 - - def _schedule(self, data, parent_data, callback_data=None): - common_data: AgentCommonData = self.get_common_data(data) - list_cmdb_hosts_params = { - "fields": ["bk_host_id", "bk_agent_id"], - "host_property_filter": { - "condition": "AND", - "rules": [ - {"field": "bk_host_id", "operator": "in", "value": common_data.bk_host_ids}, - ], - }, - } - cmdb_host_infos: List[Dict[str, Any]] = batch_request.batch_request( - func=client_v2.cc.list_hosts_without_biz, params=list_cmdb_hosts_params - ) - - # CMDB 中 AgentID 为空的主机ID列表 - no_agent_id_host_ids = [ - host_info["bk_host_id"] for host_info in cmdb_host_infos if not host_info.get("bk_agent_id") - ] - if not no_agent_id_host_ids: - self.update_host_agent_id(cmdb_host_infos) - self.finish_schedule() - return - - polling_time = data.get_one_of_outputs("polling_time") - if polling_time + POLLING_INTERVAL > POLLING_TIMEOUT: - sub_inst_ids = [common_data.host_id__sub_inst_id_map[host_id] for host_id in no_agent_id_host_ids] - self.move_insts_to_failed( - sub_inst_ids=sub_inst_ids, - log_content=_("此主机在 CMDB 中未查到对应 Agent ID,请联系 GSE 及 CMDB 团队排查 Agent ID 上报处理是否异常!"), - ) - self.update_host_agent_id(cmdb_host_infos) - self.finish_schedule() - return - - data.outputs.polling_time = polling_time + POLLING_INTERVAL diff --git a/apps/backend/components/collections/agent_new/query_password.py b/apps/backend/components/collections/agent_new/query_password.py index a7f8048b2..cd2709854 100644 --- a/apps/backend/components/collections/agent_new/query_password.py +++ b/apps/backend/components/collections/agent_new/query_password.py @@ -9,8 +9,10 @@ specific language governing permissions and limitations under the License. """ -from typing import Any, Dict, List, Union +import base64 +from typing import Any, Dict, List, Optional, Union +from django.utils import timezone from django.utils.translation import ugettext_lazy as _ from apps.core.concurrent import controller @@ -59,15 +61,111 @@ def query_password( return {"is_ok": is_ok, "success_ips": success_ips, "failed_ips": failed_ips, "err_msg": err_msg} + def check_and_update_identity_data( + self, sub_insts: List[models.SubscriptionInstanceRecord] + ) -> List[models.SubscriptionInstanceRecord]: + + bk_host_ids: List[int] = [sub_inst.instance_info["host"]["bk_host_id"] for sub_inst in sub_insts] + exist_identity_data_objs: List[models.IdentityData] = models.IdentityData.objects.filter( + bk_host_id__in=bk_host_ids + ) + host_id__identity_data_obj_map: Dict[int, models.IdentityData] = { + identity_data_obj.bk_host_id: identity_data_obj for identity_data_obj in exist_identity_data_objs + } + + def _is_auth_info_empty(_sub_inst: models.SubscriptionInstanceRecord, _host_info: Dict[str, Any]) -> bool: + # 「第三方拉取密码」或者「手动安装」场景下无需校验是否存在认证信息 + if not (_host_info.get("auth_type") == constants.AuthType.TJJ_PASSWORD or is_manual): + # 记录不存在认证信息的订阅实例ID,跳过记录 + if not (_host_info.get("password") or _host_info.get("key")): + return True + return False + + empty_auth_info_sub_inst_ids: List[int] = [] + identity_data_objs_to_be_created: List[models.IdentityData] = [] + identity_data_objs_to_be_updated: List[models.IdentityData] = [] + sub_insts_with_auth_info: List[models.SubscriptionInstanceRecord] = [] + for sub_inst in sub_insts: + host_info: Dict[str, Any] = sub_inst.instance_info["host"] + bk_host_id: int = host_info["bk_host_id"] + is_manual: bool = host_info.get("is_manual", False) + sub_insts_with_auth_info.append(sub_inst) + + # 先校验再更新,防止存量历史任务重试(认证信息已失效)的情况下,重置最新的认证信息快照 + identity_data_obj: Optional[models.IdentityData] = host_id__identity_data_obj_map.get(bk_host_id) + + if not identity_data_obj: + if _is_auth_info_empty(sub_inst, host_info): + empty_auth_info_sub_inst_ids.append(sub_inst.id) + continue + + # 新建认证信息对象 + identity_data_obj: models.IdentityData = models.IdentityData( + bk_host_id=bk_host_id, + auth_type=host_info.get("auth_type"), + account=host_info.get("account"), + password=base64.b64decode(host_info.get("password", "")).decode(), + port=host_info.get("port"), + key=base64.b64decode(host_info.get("key", "")).decode(), + retention=host_info.get("retention", 1), + extra_data=host_info.get("extra_data", {}), + updated_at=timezone.now(), + ) + identity_data_objs_to_be_created.append(identity_data_obj) + continue + + # 更新场景下,传入 `auth_type` 需要提供完整认证信息用于更新,否则使用快照 + auth_type: Optional[str] = host_info.get("auth_type") + if not auth_type: + identity_data_objs_to_be_updated.append(identity_data_obj) + continue + + # 处理认证信息不完整的情况 + if _is_auth_info_empty(sub_inst, host_info): + empty_auth_info_sub_inst_ids.append(sub_inst.id) + continue + + # 允许缺省时使用快照 + identity_data_obj.port = host_info.get("port") or identity_data_obj.port + identity_data_obj.account = host_info.get("account") or identity_data_obj.account + + # 更新认证信息 + identity_data_obj.auth_type = auth_type + identity_data_obj.password = base64.b64decode(host_info.get("password", "")).decode() + identity_data_obj.key = base64.b64decode(host_info.get("key", "")).decode() + identity_data_obj.retention = host_info.get("retention", 1) + identity_data_obj.extra_data = host_info.get("extra_data", {}) + identity_data_obj.updated_at = timezone.now() + + identity_data_objs_to_be_updated.append(identity_data_obj) + + models.IdentityData.objects.bulk_create(identity_data_objs_to_be_created, batch_size=self.batch_size) + models.IdentityData.objects.bulk_update( + identity_data_objs_to_be_updated, + fields=["auth_type", "account", "password", "port", "key", "retention", "extra_data", "updated_at"], + batch_size=self.batch_size, + ) + + # 移除不存在认证信息的实例 + self.move_insts_to_failed( + sub_inst_ids=empty_auth_info_sub_inst_ids, log_content=_("登录认证信息已被清空\n" "- 若为重试操作,请新建任务重新发起") + ) + + return sub_insts_with_auth_info + def _execute(self, data, parent_data, common_data: AgentCommonData): creator = data.get_one_of_inputs("creator") host_id_obj_map = common_data.host_id_obj_map + subscription_instances: List[models.SubscriptionInstanceRecord] = self.check_and_update_identity_data( + common_data.subscription_instances + ) + no_need_query_inst_ids = [] # 这里暂不支持多 cloud_ip_map = {} oa_ticket = "" - for sub_inst in common_data.subscription_instances: + for sub_inst in subscription_instances: bk_host_id = sub_inst.instance_info["host"]["bk_host_id"] host = host_id_obj_map[bk_host_id] diff --git a/apps/backend/components/collections/agent_new/register_host.py b/apps/backend/components/collections/agent_new/register_host.py index a07482a20..1f7c5431d 100644 --- a/apps/backend/components/collections/agent_new/register_host.py +++ b/apps/backend/components/collections/agent_new/register_host.py @@ -8,7 +8,6 @@ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ -import base64 import json from collections import ChainMap, defaultdict from typing import Any, Callable, Dict, Iterable, List, Optional, Set @@ -263,35 +262,6 @@ def add_hosts_to_biz_resource( for success_host_key in success_host_keys } - def handle_empty_auth_info_case( - self, subscription_instances: List[models.SubscriptionInstanceRecord] - ) -> Dict[str, models.SubscriptionInstanceRecord]: - """ - 处理不存在认证信息的情况 - :param subscription_instances: 订阅实例对象列表 - :return: host_key__sub_inst_map 校验通过的「host_key - 订阅实例映射」 - """ - empty_auth_info_sub_inst_ids: List[int] = [] - host_key__sub_inst_map: Dict[str, models.SubscriptionInstanceRecord] = {} - for subscription_instance in subscription_instances: - host_info = subscription_instance.instance_info["host"] - is_manual = host_info.get("is_manual", False) - - # 「第三方拉取密码」或者「手动安装」的场景下无需校验是否存在认证信息 - if not (host_info.get("auth_type") == constants.AuthType.TJJ_PASSWORD or is_manual): - # 记录不存在认证信息的订阅实例ID,跳过记录 - if not (host_info.get("password") or host_info.get("key")): - empty_auth_info_sub_inst_ids.append(subscription_instance.id) - continue - # 通过校验的订阅实例构建host_key - 订阅实例 映射 - host_key__sub_inst_map[f"{host_info['bk_cloud_id']}-{host_info['bk_host_innerip']}"] = subscription_instance - - # 移除不存在认证信息的实例 - self.move_insts_to_failed( - sub_inst_ids=empty_auth_info_sub_inst_ids, log_content=_("该主机的登录认证信息已被清空,无法重试,请重新发起安装任务") - ) - return host_key__sub_inst_map - def handle_add_hosts_to_cmdb_case( self, host_key__sub_inst_map: Dict[str, models.SubscriptionInstanceRecord] ) -> Dict[str, int]: @@ -340,9 +310,6 @@ def handle_update_db( host_ids_in_exist_hosts: Set[int] = set( models.Host.objects.filter(bk_host_id__in=bk_host_ids).values_list("bk_host_id", flat=True) ) - host_ids_in_exist_identity_data: Set[int] = set( - models.IdentityData.objects.filter(bk_host_id__in=bk_host_ids).values_list("bk_host_id", flat=True) - ) host_ids_in_exist_proc_statuses: Set[int] = set( models.ProcessStatus.objects.filter( bk_host_id__in=bk_host_ids, @@ -353,8 +320,6 @@ def handle_update_db( host_objs_to_be_created: List[models.Host] = [] host_objs_to_be_updated: List[models.Host] = [] proc_status_objs_to_be_created: List[models.ProcessStatus] = [] - identity_data_objs_to_be_created: List[models.IdentityData] = [] - identity_data_objs_to_be_updated: List[models.IdentityData] = [] sub_inst_objs_to_be_updated: List[models.SubscriptionInstanceRecord] = [] for host_key, bk_host_id in host_key__bk_host_id_map.items(): @@ -402,22 +367,6 @@ def handle_update_db( ] host_objs_to_be_created.append(host_obj) - identity_data_obj = models.IdentityData( - bk_host_id=bk_host_id, - auth_type=host_info.get("auth_type"), - account=host_info.get("account"), - password=base64.b64decode(host_info.get("password", "")).decode(), - port=host_info.get("port"), - key=base64.b64decode(host_info.get("key", "")).decode(), - retention=host_info.get("retention", 1), - extra_data=host_info.get("extra_data", {}), - updated_at=timezone.now(), - ) - if bk_host_id in host_ids_in_exist_identity_data: - identity_data_objs_to_be_updated.append(identity_data_obj) - else: - identity_data_objs_to_be_created.append(identity_data_obj) - if bk_host_id not in host_ids_in_exist_proc_statuses: proc_status_obj = models.ProcessStatus( bk_host_id=bk_host_id, @@ -439,12 +388,6 @@ def handle_update_db( batch_size=self.batch_size, ) models.ProcessStatus.objects.bulk_create(proc_status_objs_to_be_created, batch_size=self.batch_size) - models.IdentityData.objects.bulk_create(identity_data_objs_to_be_created, batch_size=self.batch_size) - models.IdentityData.objects.bulk_update( - identity_data_objs_to_be_updated, - fields=["auth_type", "account", "password", "port", "key", "retention", "extra_data", "updated_at"], - batch_size=self.batch_size, - ) models.SubscriptionInstanceRecord.objects.bulk_update( sub_inst_objs_to_be_updated, fields=["instance_info"], batch_size=self.batch_size ) @@ -458,8 +401,11 @@ def _execute(self, data, parent_data, common_data: AgentCommonData): subscription_instances: List[models.SubscriptionInstanceRecord] = common_data.subscription_instances - # key / password expired, failed - host_key__sub_inst_map = self.handle_empty_auth_info_case(subscription_instances) + host_key__sub_inst_map: Dict[str, models.SubscriptionInstanceRecord] = {} + for subscription_instance in subscription_instances: + host_info = subscription_instance.instance_info["host"] + # 通过校验的订阅实例构建host_key - 订阅实例 映射 + host_key__sub_inst_map[f"{host_info['bk_cloud_id']}-{host_info['bk_host_innerip']}"] = subscription_instance # host infos group by bk_biz_id and register add_host_to_resource host_key__bk_host_id_map = self.handle_add_hosts_to_cmdb_case(host_key__sub_inst_map) diff --git a/apps/backend/components/collections/core.py b/apps/backend/components/collections/core.py index 8d14bf3ea..1fb7bde11 100644 --- a/apps/backend/components/collections/core.py +++ b/apps/backend/components/collections/core.py @@ -9,9 +9,10 @@ specific language governing permissions and limitations under the License. """ import copy +import functools import traceback from enum import Enum -from typing import Any, Callable, Dict, Tuple +from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple, Union from django.utils.translation import ugettext_lazy as _ @@ -28,6 +29,7 @@ class ServiceCCConfigName(enum.EnhanceEnum): WMIEXE = "SERVICE_WMIEXE" JOB_CMD = "SERVICE_JOB_CMD" QUERY_PASSWORD = "SERVICE_QUERY_PASSWORD" + HOST_WRITE = "HOST_WRITE" @classmethod def _get_member__alias_map(cls) -> Dict[Enum, str]: @@ -36,6 +38,7 @@ def _get_member__alias_map(cls) -> Dict[Enum, str]: cls.WMIEXE: _("使用 wmiexe 会话执行命令"), cls.JOB_CMD: _("使用 job 执行命令"), cls.QUERY_PASSWORD: _("查询密码"), + cls.HOST_WRITE: _("主机写入操作"), } @@ -50,6 +53,9 @@ def get_config_dict(config_name: str) -> Dict[str, Any]: elif config_name == ServiceCCConfigName.JOB_CMD.value: # 通过作业平台执行时,批次间串行防止触发接口限频 default_concurrent_control_config.update(is_concurrent_between_batches=False, interval=2) + elif config_name == ServiceCCConfigName.HOST_WRITE: + # 每一批的数量不能过多,不然可能会出现单台主机校验失败导致整个安装任务失败 + default_concurrent_control_config.update(limit=100) current_controller_settings = models.GlobalSettings.get_config( key=models.GlobalSettings.KeyEnum.CONCURRENT_CONTROLLER_SETTINGS.value, default={} @@ -70,11 +76,33 @@ def default_sub_inst_id_extractor(args: Tuple[Any], kwargs: Dict[str, Any]): return kwargs["sub_inst_id"] -def default_sub_inst_task_exc_handler( - wrapped: Callable, instance: base.BaseService, args: Tuple[Any], kwargs: Dict[str, Any], exc: Exception -) -> Any: +def default_sub_inst_ids_extractor(args: Tuple[Any], kwargs: Dict[str, Any]) -> List[int]: + """ + 默认订阅实例ID列表提取器 + :param args: 位置参数 + :param kwargs: 关键字参数 + :return: 订阅实例ID列表 + """ + if "sub_inst_ids" in kwargs: + return kwargs["sub_inst_ids"] + if "sub_insts" in kwargs: + sub_insts: List[models.SubscriptionInstanceRecord] = kwargs["sub_insts"] + else: + sub_insts: List[models.SubscriptionInstanceRecord] = args[0] + return [sub_inst.id for sub_inst in sub_insts] + + +def default_task_exc_handler( + sub_inst_id_extractor: Callable[[Tuple, Dict], Union[int, Set[int], List[int]]], + wrapped: Callable, + instance: base.BaseService, + args: Tuple[Any], + kwargs: Dict[str, Any], + exc: Exception, +) -> Optional[List]: """ 默认的单订阅实例任务异常处理,用于批量调用时规避单任务异常导致整体执行失败的情况 + :param sub_inst_id_extractor: 订阅实例ID提取器 :param wrapped: 被装饰的函数或类方法 :param instance: 基础Pipeline服务 :param exc: 捕获到异常 @@ -82,8 +110,13 @@ def default_sub_inst_task_exc_handler( :param kwargs: 关键字参数 :return: """ - sub_inst_id = default_sub_inst_id_extractor(args, kwargs) - instance.move_insts_to_failed([sub_inst_id], str(exc)) + sub_inst_id = sub_inst_id_extractor(args, kwargs) + instance.move_insts_to_failed(sub_inst_id if isinstance(sub_inst_id, Iterable) else [sub_inst_id], str(exc)) # 打印 DEBUG 日志 instance.log_debug(sub_inst_id, log_content=traceback.format_exc(), fold=True) - return None + return [] if isinstance(sub_inst_id, Iterable) else None + + +default_sub_inst_task_exc_handler = functools.partial(default_task_exc_handler, default_sub_inst_id_extractor) + +default_sub_insts_task_exc_handler = functools.partial(default_task_exc_handler, default_sub_inst_ids_extractor) diff --git a/apps/backend/subscription/steps/agent.py b/apps/backend/subscription/steps/agent.py index c04b9f82f..7175e8b00 100644 --- a/apps/backend/subscription/steps/agent.py +++ b/apps/backend/subscription/steps/agent.py @@ -154,12 +154,11 @@ class InstallAgent(AgentAction): def _generate_activities(self, agent_manager: AgentManager): activities = [ - agent_manager.register_host(), - agent_manager.query_tjj_password() if settings.USE_TJJ else None, + agent_manager.add_or_update_hosts() if settings.BKAPP_ENABLE_DHCP else agent_manager.register_host(), + agent_manager.query_password(), agent_manager.choose_ap(), agent_manager.install(), agent_manager.get_agent_status(expect_status=constants.ProcStateType.RUNNING), - agent_manager.get_agent_id(), agent_manager.install_plugins() if self.is_install_latest_plugins else None, ] @@ -177,11 +176,10 @@ class ReinstallAgent(AgentAction): def _generate_activities(self, agent_manager: AgentManager): activities = [ - agent_manager.query_tjj_password() if settings.USE_TJJ else None, + agent_manager.query_password(), agent_manager.choose_ap(), agent_manager.install(), agent_manager.get_agent_status(expect_status=constants.ProcStateType.RUNNING), - agent_manager.get_agent_id(), agent_manager.install_plugins() if self.is_install_latest_plugins else None, ] @@ -201,7 +199,6 @@ def _generate_activities(self, agent_manager: AgentManager): agent_manager.push_upgrade_package(), agent_manager.run_upgrade_command(), agent_manager.get_agent_status(expect_status=constants.ProcStateType.RUNNING), - agent_manager.get_agent_id(), ] return activities, None @@ -219,7 +216,6 @@ def _generate_activities(self, agent_manager: AgentManager): agent_manager.restart(skip_polling_result=True), agent_manager.wait(5), agent_manager.get_agent_status(expect_status=constants.ProcStateType.RUNNING), - agent_manager.get_agent_id(), ] return activities, None @@ -254,7 +250,7 @@ def _generate_activities(self, agent_manager: AgentManager): register_host = agent_manager.register_host() activities = [ register_host, - agent_manager.query_tjj_password() if settings.USE_TJJ else None, + agent_manager.query_password(), agent_manager.configure_policy(), agent_manager.choose_ap(), agent_manager.install(), @@ -281,7 +277,7 @@ class ReinstallProxy(AgentAction): def _generate_activities(self, agent_manager: AgentManager): activities = [ - agent_manager.query_tjj_password() if settings.USE_TJJ else None, + agent_manager.query_password(), agent_manager.configure_policy(), agent_manager.choose_ap(), agent_manager.install(), @@ -314,7 +310,6 @@ def _generate_activities(self, agent_manager: AgentManager): agent_manager.run_upgrade_command(), agent_manager.wait(30), agent_manager.get_agent_status(expect_status=constants.ProcStateType.RUNNING), - agent_manager.get_agent_id(), ] # 推送文件到proxy @@ -343,7 +338,7 @@ class UninstallAgent(AgentAction): def _generate_activities(self, agent_manager: AgentManager): activities = [ - agent_manager.query_tjj_password() if settings.USE_TJJ else None, + agent_manager.query_password(), agent_manager.uninstall_agent(), agent_manager.get_agent_status(expect_status=constants.ProcStateType.UNKNOWN), agent_manager.update_process_status(status=constants.ProcStateType.NOT_INSTALLED), @@ -386,7 +381,6 @@ def _generate_activities(self, agent_manager: AgentManager): agent_manager.reload_agent(skip_polling_result=True), agent_manager.wait(5), agent_manager.get_agent_status(expect_status=constants.ProcStateType.RUNNING), - agent_manager.get_agent_id(), ] return activities, None diff --git a/apps/backend/tests/components/collections/agent_new/test_add_or_update_hosts.py b/apps/backend/tests/components/collections/agent_new/test_add_or_update_hosts.py new file mode 100644 index 000000000..653153a29 --- /dev/null +++ b/apps/backend/tests/components/collections/agent_new/test_add_or_update_hosts.py @@ -0,0 +1,227 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available. +Copyright (C) 2017-2022 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" + +import copy +import importlib +import random +from typing import Dict, List, Optional, Set + +import mock + +from apps.backend.components.collections.agent_new import add_or_update_hosts +from apps.backend.components.collections.agent_new.components import ( + AddOrUpdateHostsComponent, +) +from apps.mock_data import api_mkd +from apps.mock_data import utils as mock_data_utils +from apps.node_man import constants, models +from pipeline.component_framework.test import ComponentTestCase, ExecuteAssertion + +from . import utils + + +class AddOrUpdateHostsTestCase(utils.AgentServiceBaseTestCase): + + CC_API_MOCK_PATHS: List[str] = ["apps.backend.components.collections.agent_new.add_or_update_hosts.CCApi"] + + current_host_id: Optional[int] = None + host_ids_with_dynamic_ip: Set[int] = None + to_be_added_host_ids: Set[int] = None + to_be_updated_host_ids: Set[int] = None + search_business_result: Optional[Dict] = None + list_hosts_without_biz_result: Optional[Dict] = None + add_host_to_business_idle_result: Optional[Dict] = None + cmdb_mock_client: Optional[api_mkd.cmdb.utils.CCApiMockClient] = None + + def add_host_to_business_idle_func(self, query_params): + host_num = len(query_params["bk_host_list"]) + if self.current_host_id is None: + current_host_id = max(models.Host.objects.values_list("bk_host_id", flat=True)) + else: + current_host_id = self.current_host_id + self.current_host_id = current_host_id + host_num + return {"bk_host_ids": list(range(current_host_id + 1, self.current_host_id + 1))} + + @staticmethod + def find_host_biz_relations_func(query_params): + bk_host_ids = query_params["bk_host_id"] + host_biz_relations = [] + for bk_host_id in bk_host_ids: + host_biz_relation = copy.deepcopy(api_mkd.cmdb.unit.CMDB_HOST_BIZ_RELATION) + host_biz_relation.update({"bk_host_id": bk_host_id}) + host_biz_relations.append(host_biz_relation) + return host_biz_relations + + @classmethod + def setup_obj_factory(cls): + """设置 obj_factory""" + cls.obj_factory.init_host_num = 255 + + @classmethod + def structure_cmdb_mock_data(cls): + """ + 构造CMDB接口返回数据 + :return: + """ + cls.to_be_updated_host_ids = set( + random.sample(cls.obj_factory.bk_host_ids, k=random.randint(10, cls.obj_factory.init_host_num)) + ) + cls.to_be_added_host_ids = set(cls.obj_factory.bk_host_ids) - cls.to_be_updated_host_ids + + host_id__obj_map: Dict[int, models.Host] = { + host_obj.bk_host_id: host_obj for host_obj in cls.obj_factory.host_objs + } + + host_infos: List[Dict] = [] + cls.host_ids_with_dynamic_ip = set() + for bk_host_id in cls.to_be_updated_host_ids: + bk_addressing = random.choice(constants.CmdbAddressingType.list_member_values()) + host_obj = host_id__obj_map[bk_host_id] + host_info: Dict = copy.deepcopy(api_mkd.cmdb.unit.CMDB_HOST_INFO) + host_info.update( + { + "bk_host_id": bk_host_id, + "bk_addressing": bk_addressing, + "bk_host_innerip": host_obj.inner_ip, + "bk_host_outerip": host_obj.outer_ipv6, + "bk_host_innerip_v6": host_obj.inner_ipv6, + "bk_host_outerip_v6": host_obj.outer_ipv6, + } + ) + host_infos.append(host_info) + # 如果是动态寻址,模拟主机不存在的情况,此时会走新增策略 + if bk_addressing == constants.CmdbAddressingType.DYNAMIC.value: + cls.host_ids_with_dynamic_ip.add(bk_host_id) + + cls.search_business_result = copy.deepcopy(api_mkd.cmdb.unit.SEARCH_BUSINESS_DATA) + cls.list_hosts_without_biz_result = {"count": len(host_infos), "info": host_infos} + + @classmethod + def adjust_test_data_in_db(cls): + """ + 调整DB中的测试数据 + :return: + """ + models.Host.objects.filter(bk_host_id__in=cls.to_be_added_host_ids).delete() + + models.Host.objects.filter(bk_host_id__in=cls.host_ids_with_dynamic_ip).update( + bk_addressing=constants.CmdbAddressingType.DYNAMIC.value + ) + + for sub_inst_obj in cls.obj_factory.sub_inst_record_objs: + bk_host_id = sub_inst_obj.instance_info["host"]["bk_host_id"] + if bk_host_id in cls.to_be_added_host_ids: + sub_inst_obj.instance_info["host"].pop("bk_host_id") + if bk_host_id in cls.host_ids_with_dynamic_ip: + sub_inst_obj.instance_info["host"].pop("bk_host_id") + sub_inst_obj.instance_info["host"]["bk_addressing"] = constants.CmdbAddressingType.DYNAMIC.value + + models.SubscriptionInstanceRecord.objects.bulk_update( + cls.obj_factory.sub_inst_record_objs, fields=["instance_info"] + ) + + @classmethod + def get_default_case_name(cls) -> str: + return AddOrUpdateHostsComponent.name + + def init_mock_clients(self): + self.cmdb_mock_client = api_mkd.cmdb.utils.CCApiMockClient( + search_business_return=mock_data_utils.MockReturn( + return_type=mock_data_utils.MockReturnType.RETURN_VALUE.value, return_obj=self.search_business_result + ), + list_hosts_without_biz_return=mock_data_utils.MockReturn( + return_type=mock_data_utils.MockReturnType.RETURN_VALUE.value, + return_obj=self.list_hosts_without_biz_result, + ), + find_host_biz_relations_return=mock_data_utils.MockReturn( + return_type=mock_data_utils.MockReturnType.SIDE_EFFECT.value, + return_obj=self.find_host_biz_relations_func, + ), + add_host_to_business_idle_return=mock_data_utils.MockReturn( + return_type=mock_data_utils.MockReturnType.SIDE_EFFECT.value, + return_obj=self.add_host_to_business_idle_func, + ), + ) + + def fetch_succeeded_sub_inst_ids(self) -> List[int]: + return self.common_inputs["subscription_instance_ids"] + + def start_patch(self): + for client_v2_mock_path in self.CC_API_MOCK_PATHS: + mock.patch(client_v2_mock_path, self.cmdb_mock_client).start() + + @classmethod + def setUpTestData(cls): + super().setUpTestData() + # 初始化DB数据后再修改 + cls.structure_cmdb_mock_data() + cls.adjust_test_data_in_db() + + def setUp(self) -> None: + self.init_mock_clients() + super().setUp() + self.start_patch() + + def cases(self): + return [ + ComponentTestCase( + name=self.get_default_case_name(), + inputs=self.common_inputs, + parent_data={}, + execute_assertion=ExecuteAssertion( + success=bool(self.fetch_succeeded_sub_inst_ids()), + outputs={"succeeded_subscription_instance_ids": self.fetch_succeeded_sub_inst_ids()}, + ), + schedule_assertion=None, + execute_call_assertion=None, + ) + ] + + def component_cls(self): + importlib.reload(add_or_update_hosts) + AddOrUpdateHostsComponent.bound_service = add_or_update_hosts.AddOrUpdateHostsService + self.start_patch() + return AddOrUpdateHostsComponent + + def assert_in_teardown(self): + + sub_insts = models.SubscriptionInstanceRecord.objects.filter(id__in=self.obj_factory.sub_inst_record_ids) + all_host_ids = list( + set([sub_inst.instance_info["host"]["bk_host_id"] for sub_inst in sub_insts] + self.obj_factory.bk_host_ids) + ) + # 动态 IP 都采用新增策略,最后同步到节点管理的主机会增加 + expect_host_num = len(self.obj_factory.bk_host_ids) + len(self.host_ids_with_dynamic_ip) + host_objs = models.Host.objects.filter(bk_host_id__in=all_host_ids) + + self.assertEqual(len(host_objs), expect_host_num) + # 由于原先没有创建相应的进程状态,此处进程状态记录 + self.assertEqual( + models.ProcessStatus.objects.filter( + bk_host_id__in=all_host_ids, + name=models.ProcessStatus.GSE_AGENT_PROCESS_NAME, + source_type=models.ProcessStatus.SourceType.DEFAULT, + ).count(), + self.obj_factory.init_host_num, + ) + + host_id__host_obj_map: Dict[int, models.Host] = {host_obj.bk_host_id: host_obj for host_obj in host_objs} + for sub_inst in sub_insts: + host_info = sub_inst.instance_info["host"] + bk_host_id = host_info["bk_host_id"] + host_obj = host_id__host_obj_map[bk_host_id] + # 动态 IP 新增主机后,原来的 bk_host_id 会被更新 + self.assertFalse(bk_host_id in self.host_ids_with_dynamic_ip) + self.assertEqual(host_info["bk_biz_id"], host_obj.bk_biz_id) + self.assertEqual(bk_host_id, host_obj.bk_host_id) + + def tearDown(self) -> None: + self.assert_in_teardown() + super().tearDown() diff --git a/apps/backend/tests/components/collections/agent_new/test_get_agent_id.py b/apps/backend/tests/components/collections/agent_new/test_get_agent_id.py deleted file mode 100644 index bebaeb321..000000000 --- a/apps/backend/tests/components/collections/agent_new/test_get_agent_id.py +++ /dev/null @@ -1,173 +0,0 @@ -# -*- coding: utf-8 -*- -""" -TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available. -Copyright (C) 2017-2022 THL A29 Limited, a Tencent company. All rights reserved. -Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. -You may obtain a copy of the License at https://opensource.org/licenses/MIT -Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on -an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the -specific language governing permissions and limitations under the License. -""" -from collections import ChainMap -from typing import Any, Callable, Dict, List, Optional - -import mock - -from apps.backend.api.constants import POLLING_INTERVAL -from apps.backend.components.collections.agent_new import components -from apps.mock_data import api_mkd -from apps.mock_data import utils as mock_data_utils -from apps.mock_data.api_mkd.cmdb import unit -from pipeline.component_framework.test import ( - ComponentTestCase, - ExecuteAssertion, - ScheduleAssertion, -) - -from . import utils - - -class GetAgentIDTestCaseMixin: - CLIENT_V2_MOCK_PATHS: List[str] = [ - "apps.backend.components.collections.agent_new.get_agent_id.client_v2", - ] - list_hosts_without_biz_func: Optional[Callable] = None - cmdb_mock_client: Optional[api_mkd.cmdb.utils.CMDBMockClient] = None - - def init_mock_clients(self): - self.cmdb_mock_client = api_mkd.cmdb.utils.CMDBMockClient( - list_hosts_without_biz_return=mock_data_utils.MockReturn( - return_type=mock_data_utils.MockReturnType.SIDE_EFFECT.value, - return_obj=self.list_hosts_without_biz_func, - ) - ) - - @classmethod - def setup_obj_factory(cls): - """设置 obj_factory""" - cls.obj_factory.init_host_num = 10 - - def fetch_succeeded_sub_inst_ids(self) -> List[int]: - return self.common_inputs["subscription_instance_ids"] - - def structure_common_inputs(self) -> Dict[str, Any]: - common_inputs = super().structure_common_inputs() - return {**common_inputs} - - def structure_common_outputs(self, polling_time: Optional[int] = None, **extra_kw) -> Dict[str, Any]: - """ - 构造原子返回数据 - :param polling_time: 轮询时间 - :param extra_kw: 额外需要添加的数据 - :return: - """ - base_common_outputs = { - "succeeded_subscription_instance_ids": self.fetch_succeeded_sub_inst_ids(), - } - if polling_time is not None: - base_common_outputs["polling_time"] = polling_time - - return dict(ChainMap(extra_kw, base_common_outputs)) - - def component_cls(self): - return components.GetAgentIDComponent - - def setUp(self) -> None: - self.init_mock_clients() - for client_v2_mock_path in self.CLIENT_V2_MOCK_PATHS: - mock.patch(client_v2_mock_path, self.cmdb_mock_client).start() - super().setUp() - - -class GetAgentIDFailedTestCase(GetAgentIDTestCaseMixin, utils.AgentServiceBaseTestCase): - list_hosts_without_biz_func = unit.mock_list_host_without_biz_result - - @classmethod - def get_default_case_name(cls) -> str: - return "查询 Agent ID 失败" - - def cases(self): - return [ - ComponentTestCase( - name=self.get_default_case_name(), - inputs=self.common_inputs, - parent_data={}, - execute_assertion=ExecuteAssertion( - success=bool(self.fetch_succeeded_sub_inst_ids()), - outputs=self.structure_common_outputs(polling_time=0), - ), - schedule_assertion=[ - ScheduleAssertion( - success=True, - schedule_finished=False, - outputs=self.structure_common_outputs(polling_time=POLLING_INTERVAL), - ) - ], - execute_call_assertion=None, - ) - ] - - -class GetAgentIDSucceededTestCase(GetAgentIDTestCaseMixin, utils.AgentServiceBaseTestCase): - list_hosts_without_biz_func = unit.mock_list_host_without_biz_with_agent_id_result - - @classmethod - def get_default_case_name(cls) -> str: - return "查询 Agent ID 成功" - - def cases(self): - return [ - ComponentTestCase( - name=self.get_default_case_name(), - inputs=self.common_inputs, - parent_data={}, - execute_assertion=ExecuteAssertion( - success=bool(self.fetch_succeeded_sub_inst_ids()), - outputs=self.structure_common_outputs(polling_time=0), - ), - schedule_assertion=[ - ScheduleAssertion( - success=True, - schedule_finished=True, - outputs=self.structure_common_outputs(polling_time=0), - ) - ], - execute_call_assertion=None, - ) - ] - - -class GetAgentIDTimeoutTestCase(GetAgentIDTestCaseMixin, utils.AgentServiceBaseTestCase): - list_hosts_without_biz_func = unit.mock_list_host_without_biz_result - - @classmethod - def get_default_case_name(cls) -> str: - return "查询 Agent ID 超时" - - def setUp(self) -> None: - super().setUp() - mock.patch( - "apps.backend.components.collections.agent_new.get_agent_id.POLLING_TIMEOUT", - POLLING_INTERVAL - 1, - ).start() - - def cases(self): - return [ - ComponentTestCase( - name=self.get_default_case_name(), - inputs=self.common_inputs, - parent_data={}, - execute_assertion=ExecuteAssertion( - success=bool(self.fetch_succeeded_sub_inst_ids()), - outputs=self.structure_common_outputs(polling_time=0), - ), - schedule_assertion=[ - ScheduleAssertion( - success=False, - schedule_finished=True, - outputs=self.structure_common_outputs(polling_time=0, succeeded_subscription_instance_ids=[]), - ) - ], - execute_call_assertion=None, - ) - ] diff --git a/apps/backend/tests/components/collections/agent_new/test_query_password.py b/apps/backend/tests/components/collections/agent_new/test_query_password.py index 95e716e6f..a87f53d93 100644 --- a/apps/backend/tests/components/collections/agent_new/test_query_password.py +++ b/apps/backend/tests/components/collections/agent_new/test_query_password.py @@ -8,8 +8,9 @@ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ - +import base64 import importlib +import random import mock @@ -24,48 +25,55 @@ class QueryPasswordTestCase(utils.AgentServiceBaseTestCase): + GET_PASSWORD_MOCK_PATH = "apps.node_man.handlers.password.TjjPasswordHandler.get_password" - def component_cls(self): - importlib.reload(query_password) - QueryPasswordComponent.bound_service = query_password.QueryPasswordService - return QueryPasswordComponent + @staticmethod + def get_password(*args, **kwargs): + cloud_ip_list = args[1] + cloud_ip__pwd_map = {cloud_ip: "passwordSuccessExample" for cloud_ip in cloud_ip_list} + return True, cloud_ip__pwd_map, {}, "success" - def cases(self): - return [ - ComponentTestCase( - name="测试主机无需查询密码", - inputs=self.common_inputs, - parent_data={}, - execute_assertion=ExecuteAssertion( - success=True, - outputs={"succeeded_subscription_instance_ids": self.common_inputs["subscription_instance_ids"]}, - ), - schedule_assertion=None, - execute_call_assertion=None, - patchers=[], - ) - ] + @classmethod + def setup_obj_factory(cls): + """设置 obj_factory""" + cls.obj_factory.init_host_num = 255 + @classmethod + def adjust_test_data_in_db(cls): + without_identity_data_host_ids = random.sample( + cls.obj_factory.bk_host_ids, random.randint(10, cls.obj_factory.init_host_num) + ) + models.IdentityData.objects.filter(bk_host_id__in=without_identity_data_host_ids).delete() -class QueryPasswordSucceededTestCase(QueryPasswordTestCase): - def get_password(*args, **kwargs): - return True, {"0-127.0.0.1": "passwordSuccessExample"}, {}, "success" + for sub_inst in cls.obj_factory.sub_inst_record_objs: + host_info = sub_inst.instance_info["host"] + host_info["auth_type"] = random.choice(constants.AUTH_TUPLE) + host_info["port"] = random.randint(3306, 65535) + host_info["password"] = base64.b64encode(f"test-{host_info['bk_host_id']}".encode()).decode() - def set_identity_to_query_password(self): - models.IdentityData.objects.filter(bk_host_id__in=self.obj_factory.bk_host_ids).update( - auth_type=constants.AuthType.TJJ_PASSWORD + models.SubscriptionInstanceRecord.objects.bulk_update( + cls.obj_factory.sub_inst_record_objs, fields=["instance_info"] ) + @classmethod + def setUpTestData(cls): + super().setUpTestData() + cls.adjust_test_data_in_db() + def setUp(self) -> None: - self.set_identity_to_query_password() mock.patch(target=self.GET_PASSWORD_MOCK_PATH, side_effect=self.get_password).start() super().setUp() + def component_cls(self): + importlib.reload(query_password) + QueryPasswordComponent.bound_service = query_password.QueryPasswordService + return QueryPasswordComponent + def cases(self): return [ ComponentTestCase( - name="测试查询密码成功", + name=QueryPasswordComponent.name, inputs=self.common_inputs, parent_data={}, execute_assertion=ExecuteAssertion( @@ -78,23 +86,48 @@ def cases(self): ) ] - def tearDown(self): - # 状态检查 - self.assertTrue(models.IdentityData.objects.filter(retention=1, password="passwordSuccessExample").exists()) + def assert_in_teardown(self): + identity_data_objs = models.IdentityData.objects.filter(bk_host_id__in=self.obj_factory.bk_host_ids) + self.assertEqual(self.obj_factory.init_host_num, len(identity_data_objs)) + host_id__identity_data_objs_map = { + identity_data_obj.bk_host_id: identity_data_obj for identity_data_obj in identity_data_objs + } + for sub_inst in self.obj_factory.sub_inst_record_objs: + host_info = sub_inst.instance_info["host"] + identity_data_obj = host_id__identity_data_objs_map.get(host_info["bk_host_id"]) + auth_type = host_info["auth_type"] + self.assertEqual(identity_data_obj.auth_type, auth_type) + self.assertEqual(identity_data_obj.port, host_info["port"]) + self.assertEqual(identity_data_obj.auth_type, host_info["auth_type"]) + if auth_type == constants.AuthType.TJJ_PASSWORD: + self.assertEqual(identity_data_obj.password, "passwordSuccessExample") + else: + self.assertEqual(identity_data_obj.password, base64.b64decode(host_info.get("password", "")).decode()) + + def tearDown(self) -> None: + self.assert_in_teardown() + super().tearDown() class QueryPasswordFailedTestCase(QueryPasswordTestCase): - @classmethod - def get_password(cls, *args, **kwargs): + @staticmethod + def get_password(*args, **kwargs): return False, {}, {}, "{'10': 'ticket is expired'}" - def set_identity_to_query_password(self): - models.IdentityData.objects.filter(bk_host_id__in=self.obj_factory.bk_host_ids).update( - auth_type=constants.AuthType.TJJ_PASSWORD + @classmethod + def adjust_test_data_in_db(cls): + super().adjust_test_data_in_db() + + # 全量更新为需要查询密码的认证类型 + for sub_inst in cls.obj_factory.sub_inst_record_objs: + sub_inst.instance_info["host"]["auth_type"] = constants.AuthType.TJJ_PASSWORD + models.SubscriptionInstanceRecord.objects.bulk_update( + cls.obj_factory.sub_inst_record_objs, fields=["instance_info"] ) + # 密码置空 + models.IdentityData.objects.filter(bk_host_id__in=cls.obj_factory.bk_host_ids).update(password=None) def setUp(self) -> None: - self.set_identity_to_query_password() mock.patch(target=self.GET_PASSWORD_MOCK_PATH, side_effect=self.get_password).start() super().setUp() @@ -113,3 +146,11 @@ def cases(self): patchers=[], ) ] + + def assert_in_teardown(self): + self.assertEqual( + models.IdentityData.objects.filter( + bk_host_id__in=self.obj_factory.bk_host_ids, auth_type=constants.AuthType.TJJ_PASSWORD + ).count(), + len(self.obj_factory.bk_host_ids), + ) diff --git a/apps/backend/tests/components/collections/agent_new/test_register_host.py b/apps/backend/tests/components/collections/agent_new/test_register_host.py index df9fe3c41..b8fda4d78 100644 --- a/apps/backend/tests/components/collections/agent_new/test_register_host.py +++ b/apps/backend/tests/components/collections/agent_new/test_register_host.py @@ -8,7 +8,6 @@ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ -import base64 import random from typing import Dict, List, Optional @@ -78,7 +77,6 @@ def adjust_test_data_in_db(cls): :return: """ models.Host.objects.filter(bk_host_id__in=cls.obj_factory.bk_host_ids).delete() - models.IdentityData.objects.filter(bk_host_id__in=cls.obj_factory.bk_host_ids).delete() for sub_inst_obj in cls.obj_factory.sub_inst_record_objs: sub_inst_obj.instance_info["host"].pop("bk_host_id") @@ -150,7 +148,6 @@ def component_cls(self): def assert_in_teardown(self): bk_host_ids = self.obj_factory.bk_host_ids host_objs = models.Host.objects.filter(bk_host_id__in=bk_host_ids) - identity_data_objs = models.IdentityData.objects.filter(bk_host_id__in=bk_host_ids) self.assertEqual( models.ProcessStatus.objects.filter( @@ -161,28 +158,19 @@ def assert_in_teardown(self): len(bk_host_ids), ) self.assertEqual(len(host_objs), len(bk_host_ids)) - self.assertEqual(len(identity_data_objs), len(bk_host_ids)) - host_id__identity_data_obj_map: Dict[int, models.IdentityData] = { - identity_data_obj.bk_host_id: identity_data_obj for identity_data_obj in identity_data_objs - } host_key__host_obj_map: Dict[str, models.Host] = { f"{host_obj.bk_cloud_id}-{host_obj.inner_ip}": host_obj for host_obj in host_objs } for sub_inst in models.SubscriptionInstanceRecord.objects.filter(id__in=self.obj_factory.sub_inst_record_ids): host_info = sub_inst.instance_info["host"] - identity_data_obj = host_id__identity_data_obj_map[host_info["bk_host_id"]] host_obj = host_key__host_obj_map[f"{host_info['bk_cloud_id']}-{host_info['bk_host_innerip']}"] self.assertEqual(host_info["bk_biz_id"], host_obj.bk_biz_id) self.assertEqual(host_info["bk_host_id"], host_obj.bk_host_id) - self.assertEqual(identity_data_obj.port, host_info["port"]) - self.assertEqual(identity_data_obj.auth_type, host_info["auth_type"]) - self.assertEqual(identity_data_obj.password, base64.b64decode(host_info.get("password", "")).decode()) def assert_in_teardown__empty_db(self): bk_host_ids = self.obj_factory.bk_host_ids self.assertFalse(models.Host.objects.filter(bk_host_id__in=bk_host_ids).exists()) - self.assertFalse(models.IdentityData.objects.filter(bk_host_id__in=bk_host_ids).exists()) self.assertFalse( models.ProcessStatus.objects.filter( bk_host_id__in=bk_host_ids, @@ -212,18 +200,11 @@ def adjust_test_data_in_db(cls): host_obj.inner_ip = "" # 数据库中的host为AGENT,重装为PROXY,OBJ_FACTORY_CLASS设为PROXY host_obj.node_type = constants.NodeType.AGENT - for identity_data_obj in cls.obj_factory.identity_data_objs: - identity_data_obj.auth_type = random.choice(constants.AUTH_TUPLE) - identity_data_obj.port = random.randint(3306, 65535) - identity_data_obj.password = "test" for sub_inst_obj in cls.obj_factory.sub_inst_record_objs: sub_inst_obj.instance_info["host"].pop("bk_host_id") models.Host.objects.bulk_update( cls.obj_factory.host_objs, fields=["bk_biz_id", "bk_cloud_id", "inner_ip", "node_type"] ) - models.IdentityData.objects.bulk_update( - cls.obj_factory.identity_data_objs, fields=["auth_type", "port", "password"] - ) models.SubscriptionInstanceRecord.objects.bulk_update( cls.obj_factory.sub_inst_record_objs, fields=["instance_info"] ) @@ -237,31 +218,6 @@ def assert_in_teardown(self): self.assertEqual(host.node_type, constants.NodeType.PROXY) -class EmptyAuthInfoTestCase(RegisterHostTestCase): - @classmethod - def get_default_case_name(cls) -> str: - return "主机登录认证信息被清空" - - def fetch_succeeded_sub_inst_ids(self) -> List[int]: - return [] - - @classmethod - def adjust_test_data_in_db(cls): - """ - 调整DB中的测试数据 - :return: - """ - super().adjust_test_data_in_db() - for sub_inst_obj in cls.obj_factory.sub_inst_record_objs: - sub_inst_obj.instance_info["auth_type"] = constants.AuthType.PASSWORD - sub_inst_obj.instance_info["host"].pop("password") - sub_inst_obj.instance_info["host"].pop("key") - sub_inst_obj.save() - - def assert_in_teardown(self): - self.assert_in_teardown__empty_db() - - class CannotFindInCMDB(RegisterHostTestCase): @classmethod def get_default_case_name(cls) -> str: diff --git a/apps/backend/tests/components/collections/agent_new/utils.py b/apps/backend/tests/components/collections/agent_new/utils.py index fc5e26f37..33feb0658 100644 --- a/apps/backend/tests/components/collections/agent_new/utils.py +++ b/apps/backend/tests/components/collections/agent_new/utils.py @@ -45,7 +45,9 @@ "host_node_type": constants.NodeType.AGENT, "login_ip": utils.DEFAULT_IP, "bk_host_innerip": utils.DEFAULT_IP, + "bk_host_innerip_v6": common_unit.host.DEFAULT_IP, "bk_host_outerip": utils.DEFAULT_IP, + "bk_host_outerip_v6": common_unit.host.DEFAULT_IPV6, "port": 22, "password": base64.b64encode("password".encode()).decode(), "key": base64.b64encode("password:::key".encode()).decode(), @@ -110,13 +112,25 @@ def bulk_create_model(cls, model: Type[Model], create_data_list: List[Dict]): @classmethod def fill_mock_ip(cls, host_related_data_list: List[Dict[str, Any]]) -> List[Dict[str, Any]]: - ip_field_names = ["inner_ip", "outer_ip", "login_ip", "data_ip", "bk_host_innerip", "bk_host_outerip"] + ipv6_fields_names = ["inner_ipv6", "outer_ipv6", "bk_host_innerip_v6", "bk_host_outerip_v6"] + ip_field_names = [ + "inner_ip", + "outer_ip", + "login_ip", + "data_ip", + "bk_host_innerip", + "bk_host_outerip", + ] + ipv6_fields_names default_ip_tmpl = "127.0.0.{index}" + default_ipv6_tmpl = "::{index_hex}" for index, host_data in enumerate(host_related_data_list, 1): - ip = default_ip_tmpl.format(index=index) for ip_field_name in ip_field_names: if ip_field_name not in host_data: continue + if ip_field_name in ipv6_fields_names: + ip = default_ipv6_tmpl.format(index_hex=hex(index)[2:]) + else: + ip = default_ip_tmpl.format(index=index) host_data[ip_field_name] = ip return host_related_data_list diff --git a/apps/mock_data/api_mkd/cmdb/unit.py b/apps/mock_data/api_mkd/cmdb/unit.py index 61859e63d..b87264855 100644 --- a/apps/mock_data/api_mkd/cmdb/unit.py +++ b/apps/mock_data/api_mkd/cmdb/unit.py @@ -8,58 +8,55 @@ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ -import copy +import random from apps.mock_data.common_unit import host from apps.node_man import constants +from ... import utils + CMDB_HOST_INFO = { "bk_host_id": host.DEFAULT_HOST_ID, + "bk_agent_id": "", + "bk_host_name": "", + "bk_supplier_account": "0", "bk_cloud_id": constants.DEFAULT_CLOUD, + "bk_addressing": constants.CmdbAddressingType.STATIC.value, "bk_host_innerip": host.DEFAULT_IP, - "bk_host_outerip": "", -} -CMDB_HOST_INFO_WITH_AGENT_ID = { - "bk_host_id": host.DEFAULT_HOST_ID, - "bk_cloud_id": constants.DEFAULT_CLOUD, - "bk_host_innerip": host.DEFAULT_IP, + "bk_host_innerip_v6": host.DEFAULT_IPV6, "bk_host_outerip": host.DEFAULT_IP, - "bk_host_innerip_v6": host.DEFAULT_IPv6, - "bk_host_outerip_v6": host.DEFAULT_IPv6, - "bk_agent_id": host.BK_AGENT_ID, + "bk_host_outerip_v6": host.DEFAULT_IPV6, + "operator": utils.DEFAULT_USERNAME, + "bk_bak_operator": utils.DEFAULT_USERNAME, + "bk_cpu_module": "", + "bk_isp_name": None, + "bk_os_bit": "", + "bk_os_name": "", + "bk_os_type": constants.BK_OS_TYPE[constants.OsType.LINUX], + "bk_os_version": "", + "bk_province_name": None, + "bk_state": None, + "bk_state_name": None, } -LIST_HOSTS_WITHOUT_BIZ_RESULT = {"count": 1, "info": [CMDB_HOST_INFO]} - -LIST_HOSTS_WITHOUT_BIZ_WITH_AGENT_ID_RESULT = {"count": 1, "info": [CMDB_HOST_INFO_WITH_AGENT_ID]} - +CMDB_BIZ_INFO = { + "bk_biz_id": utils.DEFAULT_BK_BIZ_ID, + "bk_biz_name": utils.DEFAULT_BK_BIZ_NAME, + "bk_biz_maintainer": utils.DEFAULT_USERNAME, +} -def mock_list_host_without_biz_result(*args, **kwargs): - has_agent_id = kwargs.get("has_agent_id") - if has_agent_id: - host_info_template = CMDB_HOST_INFO_WITH_AGENT_ID - else: - host_info_template = CMDB_HOST_INFO - host_infos = [] - check_by_bk_host_id = False - bk_host_ids = [] - for rule in kwargs["host_property_filter"]["rules"]: - if rule["field"] == "bk_host_id": - bk_host_ids = rule["value"] - check_by_bk_host_id = True - if check_by_bk_host_id: - for bk_host_id in bk_host_ids: - host_info = copy.deepcopy(host_info_template) - host_info["bk_host_id"] = bk_host_id - host_infos.append(host_info) - else: - host_infos = [host_info_template] - return {"count": len(host_infos), "info": host_infos} +CMDB_HOST_BIZ_RELATION = { + "bk_host_id": host.DEFAULT_HOST_ID, + "bk_biz_id": utils.DEFAULT_BK_BIZ_ID, + "bk_module_id": random.randint(1, 100), + "bk_set_id": random.randint(1, 100), + "bk_supplier_account": constants.DEFAULT_SUPPLIER_ID, +} +LIST_HOSTS_WITHOUT_BIZ_DATA = {"count": 1, "info": [CMDB_HOST_INFO]} -def mock_list_host_without_biz_without_agent_id_result(*args, **kwargs): - return mock_list_host_without_biz_result(has_agent_id=False, *args, **kwargs) +SEARCH_BUSINESS_DATA = {"count": 1, "info": [CMDB_BIZ_INFO]} +ADD_HOST_TO_BUSINESS_IDLE_DATA = {"bk_host_ids": [CMDB_HOST_INFO["bk_host_id"]]} -def mock_list_host_without_biz_with_agent_id_result(*args, **kwargs): - return mock_list_host_without_biz_result(has_agent_id=True, *args, **kwargs) +BATCH_UPDATE_HOST_DATA = None diff --git a/apps/mock_data/api_mkd/cmdb/utils.py b/apps/mock_data/api_mkd/cmdb/utils.py index e87d3afba..e4092a3b9 100644 --- a/apps/mock_data/api_mkd/cmdb/utils.py +++ b/apps/mock_data/api_mkd/cmdb/utils.py @@ -39,10 +39,24 @@ def __init__( class CCApiMockClient(utils.BaseMockClient): - def __init__(self, bind_host_agent_return=None, unbind_host_agent_return=None): + def __init__( + self, + search_business_return=None, + bind_host_agent_return=None, + unbind_host_agent_return=None, + list_hosts_without_biz_return=None, + batch_update_host=None, + add_host_to_business_idle_return=None, + find_host_biz_relations_return=None, + ): super(CCApiMockClient, self).__init__() + self.search_business = self.generate_magic_mock(mock_return_obj=search_business_return) self.bind_host_agent = self.generate_magic_mock(mock_return_obj=bind_host_agent_return) self.unbind_host_agent = self.generate_magic_mock(mock_return_obj=unbind_host_agent_return) + self.list_hosts_without_biz = self.generate_magic_mock(mock_return_obj=list_hosts_without_biz_return) + self.batch_update_host = self.generate_magic_mock(mock_return_obj=batch_update_host) + self.add_host_to_business_idle = self.generate_magic_mock(mock_return_obj=add_host_to_business_idle_return) + self.find_host_biz_relations = self.generate_magic_mock(mock_return_obj=find_host_biz_relations_return) # 记录接口调用 self.bind_host_agent = self.call_recorder.start(self.bind_host_agent, key=CCApi.bind_host_agent) diff --git a/apps/mock_data/common_unit/host.py b/apps/mock_data/common_unit/host.py index 3fced9a14..a011db5fb 100644 --- a/apps/mock_data/common_unit/host.py +++ b/apps/mock_data/common_unit/host.py @@ -20,7 +20,7 @@ DEFAULT_IP = "127.0.0.1" -DEFAULT_IPv6 = "ABCD:EF01:2345:6789:ABCD:EF01:2345:6789" +DEFAULT_IPV6 = "ABCD:EF01:2345:6789:ABCD:EF01:2345:6789" PROXY_INNER_IP = "1.1.1.1" @@ -102,8 +102,8 @@ HOST_MODEL_DATA_WITH_AGENT_ID.update( **{ "bk_agent_id": BK_AGENT_ID, - "inner_ipv6": DEFAULT_IPv6, - "outer_ipv6": DEFAULT_IPv6, + "inner_ipv6": DEFAULT_IPV6, + "outer_ipv6": DEFAULT_IPV6, } ) diff --git a/apps/mock_data/views_mkd/job.py b/apps/mock_data/views_mkd/job.py index 472303a3d..aa54ce870 100644 --- a/apps/mock_data/views_mkd/job.py +++ b/apps/mock_data/views_mkd/job.py @@ -25,9 +25,9 @@ "bk_biz_id": utils.DEFAULT_BK_BIZ_ID, "os_type": constants.OsType.LINUX, "inner_ip": host.DEFAULT_IP, - "inner_ipv6": host.DEFAULT_IPv6, + "inner_ipv6": host.DEFAULT_IPV6, "outer_ip": host.DEFAULT_IP, - "outer_ipv6": host.DEFAULT_IPv6, + "outer_ipv6": host.DEFAULT_IPV6, "login_ip": host.DEFAULT_IP, "data_ip": host.DEFAULT_IP, "account": constants.LINUX_ACCOUNT, diff --git a/apps/node_man/constants.py b/apps/node_man/constants.py index d5c8292ba..d33822f8a 100644 --- a/apps/node_man/constants.py +++ b/apps/node_man/constants.py @@ -520,6 +520,7 @@ def _get_member__alias_map(cls) -> Dict[Enum, str]: QUERY_AGENT_STATUS_HOST_LENS = 500 QUERY_PROC_STATUS_HOST_LENS = 2000 QUERY_CMDB_LIMIT = 500 +WRITE_CMDB_LIMIT = 500 QUERY_CMDB_MODULE_LIMIT = 500 QUERY_CLOUD_LIMIT = 200 QUERY_HOST_SERVICE_TEMPLATE_LIMIT = 200 @@ -589,6 +590,7 @@ class BkappRunEnvType(Enum): "bk_host_id", "bk_agent_id", "bk_cloud_id", + "bk_addressing", "bk_host_innerip", "bk_host_outerip", "bk_host_innerip_v6", @@ -757,6 +759,11 @@ def _get_member__alias_map(cls) -> Dict[Enum, str]: } +######################################################################################################## +# GSE +######################################################################################################## + + class GseOpType(object): """ GSE进程操作类型 @@ -835,6 +842,11 @@ def _get_member__alias_map(cls) -> Dict[Enum, str]: } +######################################################################################################## +# CMDB +######################################################################################################## + + class CmdbObjectId: BIZ = "biz" SET = "set" @@ -848,6 +860,17 @@ class CmdbObjectId: OBJ_ID_ALIAS_MAP = {BIZ: _("业务"), SET: _("集群"), MODULE: _("模块"), HOST: _("主机"), CUSTOM: _("自定义")} +class CmdbAddressingType(EnhanceEnum): + """寻址方式""" + + STATIC = "0" + DYNAMIC = "1" + + @classmethod + def _get_member__alias_map(cls) -> Dict[Enum, str]: + return {cls.STATIC: _("静态"), cls.DYNAMIC: _("动态")} + + class PolicyRollBackType: SUPPRESSED = "SUPPRESSED" LOSE_CONTROL = "LOSE_CONTROL" diff --git a/apps/node_man/handlers/job.py b/apps/node_man/handlers/job.py index b4d52ef5e..c6c0b6a6b 100644 --- a/apps/node_man/handlers/job.py +++ b/apps/node_man/handlers/job.py @@ -508,6 +508,11 @@ def subscription_install(self, accept_list: list, node_type: str, cloud_info: di "bk_biz_name": biz_info.get(host["bk_biz_id"]), "bk_cloud_id": host["bk_cloud_id"], "bk_cloud_name": str(cloud_info.get(host["bk_cloud_id"], {}).get("bk_cloud_name")), + # 开启动态主机配置协议适配后,增量主机走动态IP方案 + "bk_addressing": ( + constants.CmdbAddressingType.STATIC.value, + constants.CmdbAddressingType.DYNAMIC.value, + )[settings.BKAPP_ENABLE_DHCP], "bk_supplier_account": settings.DEFAULT_SUPPLIER_ACCOUNT, "host_node_type": host_node_type, "os_type": host["os_type"], diff --git a/apps/node_man/migrations/0062_auto_20220620_1150.py b/apps/node_man/migrations/0062_auto_20220620_1150.py new file mode 100644 index 000000000..0e83d18d1 --- /dev/null +++ b/apps/node_man/migrations/0062_auto_20220620_1150.py @@ -0,0 +1,28 @@ +# Generated by Django 3.2.4 on 2022-06-20 03:50 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("node_man", "0061_merge_0059_auto_20220415_2150_0060_auto_20220315_2044"), + ] + + operations = [ + migrations.AddField( + model_name="host", + name="bk_addressing", + field=models.CharField(default="0", max_length=16, verbose_name="寻地方式"), + ), + migrations.AlterField( + model_name="host", + name="data_ip", + field=models.CharField(blank=True, default="", max_length=45, null=True, verbose_name="数据IP"), + ), + migrations.AlterField( + model_name="host", + name="login_ip", + field=models.CharField(blank=True, default="", max_length=45, null=True, verbose_name="登录IP"), + ), + ] diff --git a/apps/node_man/models.py b/apps/node_man/models.py index 6bf98263f..7035f6898 100644 --- a/apps/node_man/models.py +++ b/apps/node_man/models.py @@ -311,11 +311,12 @@ class Host(models.Model): bk_agent_id = models.CharField(_("AgentID"), max_length=64, db_index=True, blank=True, null=True) bk_biz_id = models.IntegerField(_("业务ID"), db_index=True) bk_cloud_id = models.IntegerField(_("云区域ID"), db_index=True) + bk_addressing = models.CharField(_("寻地方式"), max_length=16, default=constants.CmdbAddressingType.STATIC.value) inner_ip = models.CharField(_("内网IP"), max_length=15, db_index=True) outer_ip = models.CharField(_("外网IP"), max_length=15, blank=True, null=True, default="") - login_ip = models.CharField(_("登录IP"), max_length=15, blank=True, null=True, default="") - data_ip = models.CharField(_("数据IP"), max_length=15, blank=True, null=True, default="") + login_ip = models.CharField(_("登录IP"), max_length=45, blank=True, null=True, default="") + data_ip = models.CharField(_("数据IP"), max_length=45, blank=True, null=True, default="") inner_ipv6 = models.CharField(_("内网IPv6"), max_length=45, blank=True, null=True, default="") outer_ipv6 = models.CharField(_("外网IPv6"), max_length=45, blank=True, null=True, default="") diff --git a/apps/node_man/periodic_tasks/sync_cmdb_host.py b/apps/node_man/periodic_tasks/sync_cmdb_host.py index f77e9ea68..d4750a621 100644 --- a/apps/node_man/periodic_tasks/sync_cmdb_host.py +++ b/apps/node_man/periodic_tasks/sync_cmdb_host.py @@ -95,7 +95,15 @@ def _list_resource_pool_hosts(start): def _bulk_update_host(hosts, extra_fields): - update_fields = ["bk_cloud_id", "inner_ip", "outer_ip", "inner_ipv6", "outer_ipv6", "bk_agent_id"] + extra_fields + update_fields = [ + "bk_cloud_id", + "bk_addressing", + "inner_ip", + "outer_ip", + "inner_ipv6", + "outer_ipv6", + "bk_agent_id", + ] + extra_fields if hosts: models.Host.objects.bulk_update(hosts, fields=update_fields) @@ -108,10 +116,11 @@ def _generate_host(biz_id, host, ap_id): bk_agent_id=host.get("bk_agent_id"), bk_biz_id=biz_id, bk_cloud_id=host["bk_cloud_id"], - inner_ip=host["bk_host_innerip"].split(",")[0], - outer_ip=host["bk_host_outerip"].split(",")[0], - inner_ipv6=host.get("bk_host_innerip_v6"), - outer_ipv6=host.get("bk_host_outerip_v6"), + bk_addressing=host.get("bk_addressing") or constants.CmdbAddressingType.STATIC.value, + inner_ip=(host.get("bk_host_innerip") or "").split(",")[0], + outer_ip=(host.get("bk_host_outerip") or "").split(",")[0], + inner_ipv6=(host.get("bk_host_innerip_v6") or "").split(",")[0], + outer_ipv6=(host.get("bk_host_outerip_v6") or "").split(",")[0], node_from=constants.NodeFrom.CMDB, os_type=os_type, cpu_arch=cpu_arch, @@ -205,10 +214,11 @@ def update_or_create_host_base(biz_id, task_id, cmdb_host_data): "bk_host_id": host["bk_host_id"], "bk_agent_id": host.get("bk_agent_id"), "bk_cloud_id": host["bk_cloud_id"], - "inner_ip": host["bk_host_innerip"].split(",")[0], - "outer_ip": host["bk_host_outerip"].split(",")[0], - "inner_ipv6": host.get("bk_host_innerip_v6"), - "outer_ipv6": host.get("bk_host_outerip_v6"), + "bk_addressing": host.get("bk_addressing") or constants.CmdbAddressingType.STATIC.value, + "inner_ip": (host.get("bk_host_innerip") or "").split(",")[0], + "outer_ip": (host.get("bk_host_outerip") or "").split(",")[0], + "inner_ipv6": (host.get("bk_host_innerip_v6") or "").split(",")[0], + "outer_ipv6": (host.get("bk_host_outerip_v6") or "").split(",")[0], } if host["bk_host_id"] in exist_agent_host_ids: diff --git a/apps/utils/batch_request.py b/apps/utils/batch_request.py index 47354f3f5..ed2629386 100644 --- a/apps/utils/batch_request.py +++ b/apps/utils/batch_request.py @@ -69,7 +69,9 @@ def batch_request( return sync_batch_request(func, params, get_data, limit) if not split_params: - final_request_params = [{"count": get_count(func(page={"start": 0, "limit": 1}, **params)), "params": params}] + final_request_params = [ + {"count": get_count(func(dict(page={"start": 0, "limit": 1}, **params))), "params": params} + ] else: final_request_params = format_params(params, get_count, func) @@ -86,7 +88,7 @@ def batch_request( if sort: request_params["page"]["sort"] = sort request_params.update(req["params"]) - futures.append(pool.apply_async(func, kwds=request_params)) + futures.append(pool.apply_async(func, args=(request_params,))) start += limit diff --git a/apps/utils/exc.py b/apps/utils/exc.py index 4353ad23e..31f0b09dc 100644 --- a/apps/utils/exc.py +++ b/apps/utils/exc.py @@ -71,6 +71,7 @@ def wrapped_executor(self, wrapped: Callable, instance: Any, args: Tuple[Any], k try: return wrapped(*args, **kwargs) except Exception as exc: + print(self.exc_handler) return self.exc_handler(wrapped, instance, args, kwargs, exc) def __init__( diff --git a/common/api/modules/cc.py b/common/api/modules/cc.py index cf5e75e18..0ae086071 100644 --- a/common/api/modules/cc.py +++ b/common/api/modules/cc.py @@ -48,6 +48,13 @@ def __init__(self): description="批量获取模块详情", before_request=add_esb_info_before_request, ) + self.list_hosts_without_biz = DataAPI( + method="POST", + url=CC_APIGATEWAY_ROOT_V2 + "list_hosts_without_biz/", + module=self.MODULE, + description="没有业务ID的主机查询", + before_request=add_esb_info_before_request, + ) self.list_service_template = DataAPI( method="POST", url=CC_APIGATEWAY_ROOT_V2 + "list_service_template/", @@ -111,6 +118,20 @@ def __init__(self): description="获取主机与拓扑的关系", before_request=add_esb_info_before_request, ) + self.find_host_biz_relations = DataAPI( + method="POST", + url=CC_APIGATEWAY_ROOT_V2 + "find_host_biz_relations/", + module=self.MODULE, + description="查询主机业务关系信息", + before_request=add_esb_info_before_request, + ) + self.batch_update_host = DataAPI( + method="POST", + url=CC_APIGATEWAY_ROOT_V2 + "batch_update_host/", + module=self.MODULE, + description="批量更新主机属性", + before_request=add_esb_info_before_request, + ) self.resource_watch = DataAPI( method="POST", url=CC_APIGATEWAY_ROOT_V2 + "resource_watch/", diff --git a/config/default.py b/config/default.py index 6b6266c3b..66f7eb8c3 100644 --- a/config/default.py +++ b/config/default.py @@ -41,6 +41,7 @@ def redirect_func(request): # =============================================================================== BKAPP_RUN_ENV = env.BKAPP_RUN_ENV BKAPP_IS_PAAS_DEPLOY = env.BKAPP_IS_PAAS_DEPLOY +BKAPP_ENABLE_DHCP = env.BKAPP_ENABLE_DHCP BK_BACKEND_CONFIG = env.BK_BACKEND_CONFIG diff --git a/env/__init__.py b/env/__init__.py index bd1cb0b43..22d64c841 100644 --- a/env/__init__.py +++ b/env/__init__.py @@ -16,6 +16,7 @@ __all__ = [ "BKAPP_RUN_ENV", + "BKAPP_ENABLE_DHCP", "BKAPP_IS_PAAS_DEPLOY", "BK_BACKEND_CONFIG", "LOG_TYPE", @@ -42,6 +43,8 @@ BKAPP_RUN_ENV = get_type_env(key="BKAPP_RUN_ENV", default="ee", _type=str) # 后台是否为 PaaS 部署 BKAPP_IS_PAAS_DEPLOY = BKAPP_IS_PAAS_DEPLOY +# 是否开启动态主机配置协议适配 +BKAPP_ENABLE_DHCP = get_type_env(key="BKAPP_ENABLE_DHCP", default=False, _type=bool) # # 是否为后台配置 BK_BACKEND_CONFIG = BK_BACKEND_CONFIG diff --git a/support-files/kubernetes/helm/bk-nodeman/README.md b/support-files/kubernetes/helm/bk-nodeman/README.md index 2a99d996a..07bceb8ce 100644 --- a/support-files/kubernetes/helm/bk-nodeman/README.md +++ b/support-files/kubernetes/helm/bk-nodeman/README.md @@ -277,6 +277,7 @@ externalRabbitMQ: | `config.appCode` | app code | `bk_nodeman` | | `config.appSecret` | app secret | `""` | | `config.bkAppRunEnv` | 运行环境,ce / ee / ieod,影响 gse 端口等配置 | `ce` | +| `config.bkAppEnableDHCP` | 是否开启动态主机配置协议适配 | `false` | | `config.bkPaasMajorVersion` | 开发框架 PaaS 版本适配,目前仅支持 `3` | `3` | | `config.bkPaaSEnvironment` | 开发框架 PaaS 环境适配,目前仅支持 `prod` | `prod` | | `config.logType` | 日志类别,`DEFAULT`- `STDOUT` | `STDOUT` | diff --git a/support-files/kubernetes/helm/bk-nodeman/templates/configmaps/env-configmap.yaml b/support-files/kubernetes/helm/bk-nodeman/templates/configmaps/env-configmap.yaml index 61a365304..327091b19 100644 --- a/support-files/kubernetes/helm/bk-nodeman/templates/configmaps/env-configmap.yaml +++ b/support-files/kubernetes/helm/bk-nodeman/templates/configmaps/env-configmap.yaml @@ -20,6 +20,7 @@ data: BKPAAS_APP_SECRET: "{{ .Values.config.appSecret }}" BKAPP_RUN_ENV: "{{ .Values.config.bkAppRunEnv }}" + BKAPP_ENABLE_DHCP: "{{ .Values.config.bkAppEnableDHCP }}" BKPAAS_MAJOR_VERSION: "{{ .Values.config.bkPaasMajorVersion }}" BKPAAS_ENVIRONMENT: "{{ .Values.config.bkPaaSEnvironment }}" diff --git a/support-files/kubernetes/helm/bk-nodeman/values.yaml b/support-files/kubernetes/helm/bk-nodeman/values.yaml index 53ab2e299..dfe52ef32 100644 --- a/support-files/kubernetes/helm/bk-nodeman/values.yaml +++ b/support-files/kubernetes/helm/bk-nodeman/values.yaml @@ -365,6 +365,7 @@ config: ## ## 线程最大并发数 concurrentNumber: 50 + bkAppEnableDHCP: false ## -------------------------------------- ## 节点管理后台环境变量配置