Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add argument white_list to masking function #146

Merged
merged 3 commits into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 15 additions & 10 deletions core/utils/masking_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,12 @@ def card_sub_func(x: Match[str]) -> str:


def masking(data: Union[Dict, List], masking_fields: Optional[Union[Dict, List]] = None,
depth_level: int = 2, mask_available_depth: int = -1) -> Union[Dict, List]:
depth_level: int = 2, mask_available_depth: int = -1,
white_list: Optional[List[str]] = None) -> Union[Dict, List]:
"""
:param data: коллекция для маскирования приватных данных
:param masking_fields: поля для обязательной маскировки независимо от уровня
:param white_list: поля, которые не должны маскироваться
:param depth_level: глубина сохранения структуры маскируемого поля
:param mask_available_depth: глубина глубокой маскировки полей без сохранения структуры (см ниже)
"""
Expand All @@ -60,12 +62,13 @@ def masking(data: Union[Dict, List], masking_fields: Optional[Union[Dict, List]]
if masking_fields is None:
masking_fields = DEFAULT_MASKING_FIELDS

return _masking(data, masking_fields, depth_level, mask_available_depth, masking_on=False, card_masking_on=False)
return _masking(data, masking_fields, depth_level, mask_available_depth,
masking_on=False, card_masking_on=False, white_list=white_list)


def _masking(data: Union[Dict, List], masking_fields: Union[Dict, List],
depth_level: int = 2, mask_available_depth: int = -1, masking_on: bool = False,
card_masking_on: bool = False) -> Union[Dict, List]:
card_masking_on: bool = False, white_list: Optional[List[str]] = None) -> Union[Dict, List]:

# тут в зависимости от листа или словаря создаем итератор
if isinstance(data, dict):
Expand All @@ -80,15 +83,17 @@ def _masking(data: Union[Dict, List], masking_fields: Union[Dict, List],
if isinstance(data[key], (set, tuple)):
data[key] = list(data[key])
value_is_collection = True
if masking_on or key in masking_fields:
if white_list is not None and key in white_list:
masked_data[key] = data[key]
elif masking_on or key in masking_fields:
if value_is_collection:
# если глубина не превышена, идем внутрь с включенным флагом и уменьшаем глубину
if masking_on and depth_level > 0:
masked_data[key] = _masking(data[key], masking_fields, depth_level - 1,
mask_available_depth, masking_on=True)
mask_available_depth, masking_on=True, white_list=white_list)
elif key in masking_fields and masking_fields[key] > 0:
masked_data[key] = _masking(data[key], masking_fields, masking_fields[key] - 1,
mask_available_depth, masking_on=True)
mask_available_depth, masking_on=True, white_list=white_list)
else:
counter = structure_mask(data[key], depth=1, available_depth=mask_available_depth)
masked_data[key] = f'*items-{counter.items}*collections-{counter.collections}*maxdepth-{counter.max_depth}*' # noqa
Expand All @@ -98,8 +103,8 @@ def _masking(data: Union[Dict, List], masking_fields: Union[Dict, List],
masked_data[key] = data[key]
elif key in CARD_MASKING_FIELDS or card_masking_on: # проверка на реквизиты карты
if value_is_collection:
masked_data[key] = _masking(data[key], masking_fields, depth_level,
mask_available_depth, masking_on, card_masking_on=True)
masked_data[key] = _masking(data[key], masking_fields, depth_level, mask_available_depth,
masking_on, card_masking_on=True, white_list=white_list)
elif isinstance(data[key], str):
masked_data[key] = card_regular.sub(card_sub_func, data[key])
elif isinstance(data[key], int):
Expand All @@ -114,7 +119,7 @@ def _masking(data: Union[Dict, List], masking_fields: Union[Dict, List],
elif value_is_collection:
# если маскировка не нужна уходим глубже без включенного флага
masked_data[key] = _masking(data[key], masking_fields, depth_level, mask_available_depth,
masking_on=False, card_masking_on=card_masking_on)
masking_on=False, card_masking_on=card_masking_on, white_list=white_list)
else:
masked_data[key] = data[key]
return masked_data
Expand All @@ -123,7 +128,7 @@ def _masking(data: Union[Dict, List], masking_fields: Union[Dict, List],
def structure_mask(data: Union[Dict, List], depth: int, available_depth: int = -1,
counter: Optional[Counter] = None):
"""
Функция максировки для сложной структуры
Функция маскирования для сложной структуры
:param data: структура маскируемая без сохранения структуры
:param depth: текущая глубина вложенности
:param available_depth: максимальная глубина прохода, при -1 глубина не ограничена
Expand Down
17 changes: 12 additions & 5 deletions smart_kit/message/smartapp_to_message.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,34 @@
from __future__ import annotations

from functools import cached_property
from typing import Iterable
from typing import TYPE_CHECKING, Iterable, Optional, List
import json
from copy import copy

from core.utils.masking_message import masking
from core.message.msg_validator import MessageValidator
from smart_kit.request.kafka_request import SmartKitKafkaRequest
from smart_kit.utils import SmartAppToMessage_pb2

if TYPE_CHECKING:
from core.basic_models.actions.command import Command
from core.message.msg_validator import MessageValidator
from smart_kit.request.kafka_request import SmartKitKafkaRequest


class SmartAppToMessage:
ROOT_NODES_KEY = "root_nodes"
PAYLOAD = "payload"

def __init__(self, command, message, request: SmartKitKafkaRequest, forward_fields=None, masking_fields=None,
validators: Iterable[MessageValidator] = (), **kwargs):
def __init__(self, command: Command, message, request: SmartKitKafkaRequest,
forward_fields=None, masking_fields=None, validators: Iterable[MessageValidator] = (),
masking_white_list: Optional[List[str]] = None, **kwargs):
root_nodes = command.payload.pop(self.ROOT_NODES_KEY, None)
self.command = command
self.root_nodes = root_nodes or {}
self.incoming_message = message
self.request = request
self.forward_fields = forward_fields or ()
self.masking_fields = masking_fields
self.masking_white_list = masking_white_list
self.validators = validators
self._kwargs = kwargs

Expand Down
10 changes: 9 additions & 1 deletion tests/core_tests/masking_test/test_masking.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,20 @@ def test_masking(self):
masked_message = masking(input_msg, masking_fields)
self.assertEqual(expected, masked_message)

# если маскируемое поле окажется внутри банковского поля - то оно маскируется с заданной вложеностью
# если маскируемое поле окажется внутри банковского поля - то оно маскируется с заданной вложенностью
input_msg = {'message': {'spec_token': ['12', ['12', {'data': {'key': '12'}}]]}}
expected = {'message': {'spec_token': ['***', ['***', '*items-1*collections-1*maxdepth-2*']]}}
masked_message = masking(input_msg, masking_fields)
self.assertEqual(expected, masked_message)

def test_white_list(self):
masking_fields = ["spec_token"]
input_msg = {"spec_token": {"a": "a", "b": "b"}}
white_list = ["b"]
expected = {"spec_token": {"a": "***", "b": "b"}}
masked_message = masking(input_msg, masking_fields, white_list=white_list)
self.assertEqual(expected, masked_message)

def test_depth(self):
# вложенность любой длины не маскируется пока не встретим ключ для маскировки
masking_fields = ['spec_token']
Expand Down
Loading