Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: move some functions to a new appsec/_capabilities.py file to avoid cyclic imports #6700

Merged
merged 9 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions ddtrace/appsec/_capabilities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import base64
import os
import sys
from typing import Optional

from ddtrace import Tracer
from ddtrace.appsec.utils import _appsec_rc_features_is_enabled
from ddtrace.internal.compat import to_bytes_py2
from ddtrace.internal.utils.formats import asbool


def _appsec_rc_file_is_not_static():
return "DD_APPSEC_RULES" not in os.environ


def _appsec_rc_capabilities(test_tracer=None):
# type: (Optional[Tracer]) -> str
r"""return the bit representation of the composed capabilities in base64
bit 0: Reserved
bit 1: ASM 1-click Activation
bit 2: ASM Ip blocking

Int Number -> binary number -> bytes representation -> base64 representation
ASM Activation:
2 -> 10 -> b'\x02' -> "Ag=="
ASM Ip blocking:
4 -> 100 -> b'\x04' -> "BA=="
ASM Activation and ASM Ip blocking:
6 -> 110 -> b'\x06' -> "Bg=="
...
256 -> 100000000 -> b'\x01\x00' -> b'AQA='
"""
if test_tracer is None:
from ddtrace import tracer
else:
tracer = test_tracer

value = 0b0
result = ""
if asbool(os.environ.get("DD_REMOTE_CONFIGURATION_ENABLED", "true")):
if _appsec_rc_features_is_enabled():
value |= 1 << 1 # Enable ASM_ACTIVATION
if tracer._appsec_processor and _appsec_rc_file_is_not_static():
value |= 1 << 2 # Enable ASM_IP_BLOCKING
value |= 1 << 3 # Enable ASM_DD_RULES
value |= 1 << 4 # Enable ASM_EXCLUSIONS
value |= 1 << 5 # Enable ASM_REQUEST_BLOCKING
value |= 1 << 6 # Enable ASM_ASM_RESPONSE_BLOCKING
value |= 1 << 7 # Enable ASM_USER_BLOCKING
value |= 1 << 8 # Enable ASM_CUSTOM_RULES
value |= 1 << 9 # Enable ASM_CUSTOM_BLOCKING_RESPONSE

if sys.version_info.major < 3:
bytes_res = to_bytes_py2(value, (value.bit_length() + 7) // 8, "big")
# "type: ignore" because mypy does not notice this is for Python2 b64encode
result = str(base64.b64encode(bytes_res)) # type: ignore
else:
result = str(base64.b64encode(value.to_bytes((value.bit_length() + 7) // 8, "big")), encoding="utf-8")

return result
2 changes: 1 addition & 1 deletion ddtrace/appsec/_remoteconfiguration.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
from typing import TYPE_CHECKING

from ddtrace import config
from ddtrace.appsec._capabilities import _appsec_rc_file_is_not_static
from ddtrace.appsec._constants import PRODUCTS
from ddtrace.appsec.utils import _appsec_rc_features_is_enabled
from ddtrace.appsec.utils import _appsec_rc_file_is_not_static
from ddtrace.constants import APPSEC_ENV
from ddtrace.internal.logger import get_logger
from ddtrace.internal.remoteconfig._connectors import PublisherSubscriberConnector
Expand Down
2 changes: 1 addition & 1 deletion ddtrace/appsec/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from ddtrace import config
from ddtrace.appsec import _asm_request_context
from ddtrace.appsec._capabilities import _appsec_rc_file_is_not_static
from ddtrace.appsec._constants import APPSEC
from ddtrace.appsec._constants import DEFAULT
from ddtrace.appsec._constants import SPAN_DATA_NAMES
Expand All @@ -24,7 +25,6 @@
from ddtrace.appsec.ddwaf import DDWaf
from ddtrace.appsec.ddwaf import version
from ddtrace.appsec.trace_utils import _asm_manual_keep
from ddtrace.appsec.utils import _appsec_rc_file_is_not_static
from ddtrace.constants import ORIGIN_KEY
from ddtrace.constants import RUNTIME_FAMILY
from ddtrace.contrib import trace_utils
Expand Down
70 changes: 7 additions & 63 deletions ddtrace/appsec/utils.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
import base64
import os
import sys
import time
from typing import TYPE_CHECKING

from ddtrace.appsec import _asm_request_context
from ddtrace.appsec._constants import API_SECURITY
from ddtrace.constants import APPSEC_ENV
from ddtrace.internal.compat import parse
from ddtrace.internal.compat import to_bytes_py2
from ddtrace.internal.logger import get_logger
from ddtrace.internal.utils.formats import asbool
from ddtrace.internal.utils.http import _get_blocked_template # noqa
Expand All @@ -17,73 +14,13 @@
if TYPE_CHECKING: # pragma: no cover
from typing import Any
from typing import Dict
from typing import Optional

from ddtrace import Tracer
from ddtrace.internal.compat import text_type as unicode


log = get_logger(__name__)


def _appsec_rc_features_is_enabled():
# type: () -> bool
if asbool(os.environ.get("DD_REMOTE_CONFIGURATION_ENABLED", "true")):
return APPSEC_ENV not in os.environ
return False


def _appsec_rc_file_is_not_static():
return "DD_APPSEC_RULES" not in os.environ


def _appsec_rc_capabilities(test_tracer=None):
# type: (Optional[Tracer]) -> str
r"""return the bit representation of the composed capabilities in base64
bit 0: Reserved
bit 1: ASM 1-click Activation
bit 2: ASM Ip blocking

Int Number -> binary number -> bytes representation -> base64 representation
ASM Activation:
2 -> 10 -> b'\x02' -> "Ag=="
ASM Ip blocking:
4 -> 100 -> b'\x04' -> "BA=="
ASM Activation and ASM Ip blocking:
6 -> 110 -> b'\x06' -> "Bg=="
...
256 -> 100000000 -> b'\x01\x00' -> b'AQA='
"""
if test_tracer is None:
from ddtrace import tracer
else:
tracer = test_tracer

value = 0b0
result = ""
if asbool(os.environ.get("DD_REMOTE_CONFIGURATION_ENABLED", "true")):
if _appsec_rc_features_is_enabled():
value |= 1 << 1 # Enable ASM_ACTIVATION
if tracer._appsec_processor and _appsec_rc_file_is_not_static():
value |= 1 << 2 # Enable ASM_IP_BLOCKING
value |= 1 << 3 # Enable ASM_DD_RULES
value |= 1 << 4 # Enable ASM_EXCLUSIONS
value |= 1 << 5 # Enable ASM_REQUEST_BLOCKING
value |= 1 << 6 # Enable ASM_ASM_RESPONSE_BLOCKING
value |= 1 << 7 # Enable ASM_USER_BLOCKING
value |= 1 << 8 # Enable ASM_CUSTOM_RULES
value |= 1 << 9 # Enable ASM_CUSTOM_BLOCKING_RESPONSE

if sys.version_info.major < 3:
bytes_res = to_bytes_py2(value, (value.bit_length() + 7) // 8, "big")
# "type: ignore" because mypy does not notice this is for Python2 b64encode
result = str(base64.b64encode(bytes_res)) # type: ignore
else:
result = str(base64.b64encode(value.to_bytes((value.bit_length() + 7) // 8, "big")), encoding="utf-8")

return result


def parse_form_params(body):
# type: (unicode) -> dict[unicode, unicode|list[unicode]]
"""Return a dict of form data after HTTP form parsing"""
Expand Down Expand Up @@ -212,3 +149,10 @@ def __call__(self, *args, **kwargs):
result = self.func(*args, **kwargs)
self.reported_logs[raw_log_hash] = time.time() + self._time_lapse
return result


def _appsec_rc_features_is_enabled():
# type: () -> bool
if asbool(os.environ.get("DD_REMOTE_CONFIGURATION_ENABLED", "true")):
return APPSEC_ENV not in os.environ
return False
2 changes: 1 addition & 1 deletion ddtrace/internal/remoteconfig/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import six

import ddtrace
from ddtrace.appsec.utils import _appsec_rc_capabilities
from ddtrace.appsec._capabilities import _appsec_rc_capabilities
from ddtrace.internal import agent
from ddtrace.internal import runtime
from ddtrace.internal.hostname import get_hostname
Expand Down
2 changes: 1 addition & 1 deletion tests/appsec/test_remoteconfiguration.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import pytest

from ddtrace.appsec import _asm_request_context
from ddtrace.appsec._capabilities import _appsec_rc_capabilities
from ddtrace.appsec._constants import APPSEC
from ddtrace.appsec._constants import DEFAULT
from ddtrace.appsec._constants import PRODUCTS
Expand All @@ -18,7 +19,6 @@
from ddtrace.appsec._remoteconfiguration import _preprocess_results_appsec_1click_activation
from ddtrace.appsec._remoteconfiguration import enable_appsec_rc
from ddtrace.appsec.processor import AppSecSpanProcessor
from ddtrace.appsec.utils import _appsec_rc_capabilities
from ddtrace.appsec.utils import _appsec_rc_features_is_enabled
from ddtrace.contrib.trace_utils import set_http_meta
from ddtrace.ext import SpanTypes
Expand Down