Skip to content

Commit

Permalink
add argument white_list to masking function (#146)
Browse files Browse the repository at this point in the history
* add argument white_list to masking function

* add masking_white_list to SmartAppToMessage

* fix annotations for SmartAppToMessage
  • Loading branch information
Dan1lD authored and Dmitrii Proskurin committed Oct 16, 2023
1 parent 60cbf3c commit 012cf42
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 16 deletions.
25 changes: 15 additions & 10 deletions core/utils/masking_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,12 @@ def card_sub_func(x: Match[str]) -> str:


def masking(data: Union[Dict, List], masking_fields: Optional[Union[Dict, List]] = None,
depth_level: int = 2, mask_available_depth: int = -1) -> Union[Dict, List]:
depth_level: int = 2, mask_available_depth: int = -1,
white_list: Optional[List[str]] = None) -> Union[Dict, List]:
"""
:param data: коллекция для маскирования приватных данных
:param masking_fields: поля для обязательной маскировки независимо от уровня
:param white_list: поля, которые не должны маскироваться
:param depth_level: глубина сохранения структуры маскируемого поля
:param mask_available_depth: глубина глубокой маскировки полей без сохранения структуры (см ниже)
"""
Expand All @@ -60,12 +62,13 @@ def masking(data: Union[Dict, List], masking_fields: Optional[Union[Dict, List]]
if masking_fields is None:
masking_fields = DEFAULT_MASKING_FIELDS

return _masking(data, masking_fields, depth_level, mask_available_depth, masking_on=False, card_masking_on=False)
return _masking(data, masking_fields, depth_level, mask_available_depth,
masking_on=False, card_masking_on=False, white_list=white_list)


def _masking(data: Union[Dict, List], masking_fields: Union[Dict, List],
depth_level: int = 2, mask_available_depth: int = -1, masking_on: bool = False,
card_masking_on: bool = False) -> Union[Dict, List]:
card_masking_on: bool = False, white_list: Optional[List[str]] = None) -> Union[Dict, List]:

# тут в зависимости от листа или словаря создаем итератор
if isinstance(data, dict):
Expand All @@ -80,15 +83,17 @@ def _masking(data: Union[Dict, List], masking_fields: Union[Dict, List],
if isinstance(data[key], (set, tuple)):
data[key] = list(data[key])
value_is_collection = True
if masking_on or key in masking_fields:
if white_list is not None and key in white_list:
masked_data[key] = data[key]
elif masking_on or key in masking_fields:
if value_is_collection:
# если глубина не превышена, идем внутрь с включенным флагом и уменьшаем глубину
if masking_on and depth_level > 0:
masked_data[key] = _masking(data[key], masking_fields, depth_level - 1,
mask_available_depth, masking_on=True)
mask_available_depth, masking_on=True, white_list=white_list)
elif key in masking_fields and masking_fields[key] > 0:
masked_data[key] = _masking(data[key], masking_fields, masking_fields[key] - 1,
mask_available_depth, masking_on=True)
mask_available_depth, masking_on=True, white_list=white_list)
else:
counter = structure_mask(data[key], depth=1, available_depth=mask_available_depth)
masked_data[key] = f'*items-{counter.items}*collections-{counter.collections}*maxdepth-{counter.max_depth}*' # noqa
Expand All @@ -98,8 +103,8 @@ def _masking(data: Union[Dict, List], masking_fields: Union[Dict, List],
masked_data[key] = data[key]
elif key in CARD_MASKING_FIELDS or card_masking_on: # проверка на реквизиты карты
if value_is_collection:
masked_data[key] = _masking(data[key], masking_fields, depth_level,
mask_available_depth, masking_on, card_masking_on=True)
masked_data[key] = _masking(data[key], masking_fields, depth_level, mask_available_depth,
masking_on, card_masking_on=True, white_list=white_list)
elif isinstance(data[key], str):
masked_data[key] = card_regular.sub(card_sub_func, data[key])
elif isinstance(data[key], int):
Expand All @@ -114,7 +119,7 @@ def _masking(data: Union[Dict, List], masking_fields: Union[Dict, List],
elif value_is_collection:
# если маскировка не нужна уходим глубже без включенного флага
masked_data[key] = _masking(data[key], masking_fields, depth_level, mask_available_depth,
masking_on=False, card_masking_on=card_masking_on)
masking_on=False, card_masking_on=card_masking_on, white_list=white_list)
else:
masked_data[key] = data[key]
return masked_data
Expand All @@ -123,7 +128,7 @@ def _masking(data: Union[Dict, List], masking_fields: Union[Dict, List],
def structure_mask(data: Union[Dict, List], depth: int, available_depth: int = -1,
counter: Optional[Counter] = None):
"""
Функция максировки для сложной структуры
Функция маскирования для сложной структуры
:param data: структура маскируемая без сохранения структуры
:param depth: текущая глубина вложенности
:param available_depth: максимальная глубина прохода, при -1 глубина не ограничена
Expand Down
17 changes: 12 additions & 5 deletions smart_kit/message/smartapp_to_message.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,34 @@
from __future__ import annotations

from functools import cached_property
from typing import Iterable
from typing import TYPE_CHECKING, Iterable, Optional, List
import json
from copy import copy

from core.utils.masking_message import masking
from core.message.msg_validator import MessageValidator
from smart_kit.request.kafka_request import SmartKitKafkaRequest
from smart_kit.utils import SmartAppToMessage_pb2

if TYPE_CHECKING:
from core.basic_models.actions.command import Command
from core.message.msg_validator import MessageValidator
from smart_kit.request.kafka_request import SmartKitKafkaRequest


class SmartAppToMessage:
ROOT_NODES_KEY = "root_nodes"
PAYLOAD = "payload"

def __init__(self, command, message, request: SmartKitKafkaRequest, forward_fields=None, masking_fields=None,
validators: Iterable[MessageValidator] = (), **kwargs):
def __init__(self, command: Command, message, request: SmartKitKafkaRequest,
forward_fields=None, masking_fields=None, validators: Iterable[MessageValidator] = (),
masking_white_list: Optional[List[str]] = None, **kwargs):
root_nodes = command.payload.pop(self.ROOT_NODES_KEY, None)
self.command = command
self.root_nodes = root_nodes or {}
self.incoming_message = message
self.request = request
self.forward_fields = forward_fields or ()
self.masking_fields = masking_fields
self.masking_white_list = masking_white_list
self.validators = validators
self._kwargs = kwargs

Expand Down
10 changes: 9 additions & 1 deletion tests/core_tests/masking_test/test_masking.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,20 @@ def test_masking(self):
masked_message = masking(input_msg, masking_fields)
self.assertEqual(expected, masked_message)

# если маскируемое поле окажется внутри банковского поля - то оно маскируется с заданной вложеностью
# если маскируемое поле окажется внутри банковского поля - то оно маскируется с заданной вложенностью
input_msg = {'message': {'spec_token': ['12', ['12', {'data': {'key': '12'}}]]}}
expected = {'message': {'spec_token': ['***', ['***', '*items-1*collections-1*maxdepth-2*']]}}
masked_message = masking(input_msg, masking_fields)
self.assertEqual(expected, masked_message)

def test_white_list(self):
masking_fields = ["spec_token"]
input_msg = {"spec_token": {"a": "a", "b": "b"}}
white_list = ["b"]
expected = {"spec_token": {"a": "***", "b": "b"}}
masked_message = masking(input_msg, masking_fields, white_list=white_list)
self.assertEqual(expected, masked_message)

def test_depth(self):
# вложенность любой длины не маскируется пока не встретим ключ для маскировки
masking_fields = ['spec_token']
Expand Down

0 comments on commit 012cf42

Please sign in to comment.