diff --git a/ddtrace/appsec/_capabilities.py b/ddtrace/appsec/_capabilities.py new file mode 100644 index 00000000000..698b32a6159 --- /dev/null +++ b/ddtrace/appsec/_capabilities.py @@ -0,0 +1,60 @@ +import base64 +import os +import sys +from typing import Optional + +from ddtrace import Tracer +from ddtrace.appsec.utils import _appsec_rc_features_is_enabled +from ddtrace.internal.compat import to_bytes_py2 +from ddtrace.internal.utils.formats import asbool + + +def _appsec_rc_file_is_not_static(): + return "DD_APPSEC_RULES" not in os.environ + + +def _appsec_rc_capabilities(test_tracer=None): + # type: (Optional[Tracer]) -> str + r"""return the bit representation of the composed capabilities in base64 + bit 0: Reserved + bit 1: ASM 1-click Activation + bit 2: ASM Ip blocking + + Int Number -> binary number -> bytes representation -> base64 representation + ASM Activation: + 2 -> 10 -> b'\x02' -> "Ag==" + ASM Ip blocking: + 4 -> 100 -> b'\x04' -> "BA==" + ASM Activation and ASM Ip blocking: + 6 -> 110 -> b'\x06' -> "Bg==" + ... + 256 -> 100000000 -> b'\x01\x00' -> b'AQA=' + """ + if test_tracer is None: + from ddtrace import tracer + else: + tracer = test_tracer + + value = 0b0 + result = "" + if asbool(os.environ.get("DD_REMOTE_CONFIGURATION_ENABLED", "true")): + if _appsec_rc_features_is_enabled(): + value |= 1 << 1 # Enable ASM_ACTIVATION + if tracer._appsec_processor and _appsec_rc_file_is_not_static(): + value |= 1 << 2 # Enable ASM_IP_BLOCKING + value |= 1 << 3 # Enable ASM_DD_RULES + value |= 1 << 4 # Enable ASM_EXCLUSIONS + value |= 1 << 5 # Enable ASM_REQUEST_BLOCKING + value |= 1 << 6 # Enable ASM_ASM_RESPONSE_BLOCKING + value |= 1 << 7 # Enable ASM_USER_BLOCKING + value |= 1 << 8 # Enable ASM_CUSTOM_RULES + value |= 1 << 9 # Enable ASM_CUSTOM_BLOCKING_RESPONSE + + if sys.version_info.major < 3: + bytes_res = to_bytes_py2(value, (value.bit_length() + 7) // 8, "big") + # "type: ignore" because mypy does not notice this is for Python2 b64encode + result = str(base64.b64encode(bytes_res)) # type: ignore + else: + result = str(base64.b64encode(value.to_bytes((value.bit_length() + 7) // 8, "big")), encoding="utf-8") + + return result diff --git a/ddtrace/appsec/_remoteconfiguration.py b/ddtrace/appsec/_remoteconfiguration.py index da2aefbcb8b..63e3b6aae51 100644 --- a/ddtrace/appsec/_remoteconfiguration.py +++ b/ddtrace/appsec/_remoteconfiguration.py @@ -3,9 +3,9 @@ from typing import TYPE_CHECKING from ddtrace import config +from ddtrace.appsec._capabilities import _appsec_rc_file_is_not_static from ddtrace.appsec._constants import PRODUCTS from ddtrace.appsec.utils import _appsec_rc_features_is_enabled -from ddtrace.appsec.utils import _appsec_rc_file_is_not_static from ddtrace.constants import APPSEC_ENV from ddtrace.internal.logger import get_logger from ddtrace.internal.remoteconfig._connectors import PublisherSubscriberConnector diff --git a/ddtrace/appsec/processor.py b/ddtrace/appsec/processor.py index b05d6a911bb..4e572aeb33d 100644 --- a/ddtrace/appsec/processor.py +++ b/ddtrace/appsec/processor.py @@ -11,6 +11,7 @@ from ddtrace import config from ddtrace.appsec import _asm_request_context +from ddtrace.appsec._capabilities import _appsec_rc_file_is_not_static from ddtrace.appsec._constants import APPSEC from ddtrace.appsec._constants import DEFAULT from ddtrace.appsec._constants import SPAN_DATA_NAMES @@ -24,7 +25,6 @@ from ddtrace.appsec.ddwaf import DDWaf from ddtrace.appsec.ddwaf import version from ddtrace.appsec.trace_utils import _asm_manual_keep -from ddtrace.appsec.utils import _appsec_rc_file_is_not_static from ddtrace.constants import ORIGIN_KEY from ddtrace.constants import RUNTIME_FAMILY from ddtrace.contrib import trace_utils diff --git a/ddtrace/appsec/utils.py b/ddtrace/appsec/utils.py index 3f59fe33f12..8d6895f801c 100644 --- a/ddtrace/appsec/utils.py +++ b/ddtrace/appsec/utils.py @@ -1,6 +1,4 @@ -import base64 import os -import sys import time from typing import TYPE_CHECKING @@ -8,7 +6,6 @@ from ddtrace.appsec._constants import API_SECURITY from ddtrace.constants import APPSEC_ENV from ddtrace.internal.compat import parse -from ddtrace.internal.compat import to_bytes_py2 from ddtrace.internal.logger import get_logger from ddtrace.internal.utils.formats import asbool from ddtrace.internal.utils.http import _get_blocked_template # noqa @@ -17,73 +14,13 @@ if TYPE_CHECKING: # pragma: no cover from typing import Any from typing import Dict - from typing import Optional - from ddtrace import Tracer from ddtrace.internal.compat import text_type as unicode log = get_logger(__name__) -def _appsec_rc_features_is_enabled(): - # type: () -> bool - if asbool(os.environ.get("DD_REMOTE_CONFIGURATION_ENABLED", "true")): - return APPSEC_ENV not in os.environ - return False - - -def _appsec_rc_file_is_not_static(): - return "DD_APPSEC_RULES" not in os.environ - - -def _appsec_rc_capabilities(test_tracer=None): - # type: (Optional[Tracer]) -> str - r"""return the bit representation of the composed capabilities in base64 - bit 0: Reserved - bit 1: ASM 1-click Activation - bit 2: ASM Ip blocking - - Int Number -> binary number -> bytes representation -> base64 representation - ASM Activation: - 2 -> 10 -> b'\x02' -> "Ag==" - ASM Ip blocking: - 4 -> 100 -> b'\x04' -> "BA==" - ASM Activation and ASM Ip blocking: - 6 -> 110 -> b'\x06' -> "Bg==" - ... - 256 -> 100000000 -> b'\x01\x00' -> b'AQA=' - """ - if test_tracer is None: - from ddtrace import tracer - else: - tracer = test_tracer - - value = 0b0 - result = "" - if asbool(os.environ.get("DD_REMOTE_CONFIGURATION_ENABLED", "true")): - if _appsec_rc_features_is_enabled(): - value |= 1 << 1 # Enable ASM_ACTIVATION - if tracer._appsec_processor and _appsec_rc_file_is_not_static(): - value |= 1 << 2 # Enable ASM_IP_BLOCKING - value |= 1 << 3 # Enable ASM_DD_RULES - value |= 1 << 4 # Enable ASM_EXCLUSIONS - value |= 1 << 5 # Enable ASM_REQUEST_BLOCKING - value |= 1 << 6 # Enable ASM_ASM_RESPONSE_BLOCKING - value |= 1 << 7 # Enable ASM_USER_BLOCKING - value |= 1 << 8 # Enable ASM_CUSTOM_RULES - value |= 1 << 9 # Enable ASM_CUSTOM_BLOCKING_RESPONSE - - if sys.version_info.major < 3: - bytes_res = to_bytes_py2(value, (value.bit_length() + 7) // 8, "big") - # "type: ignore" because mypy does not notice this is for Python2 b64encode - result = str(base64.b64encode(bytes_res)) # type: ignore - else: - result = str(base64.b64encode(value.to_bytes((value.bit_length() + 7) // 8, "big")), encoding="utf-8") - - return result - - def parse_form_params(body): # type: (unicode) -> dict[unicode, unicode|list[unicode]] """Return a dict of form data after HTTP form parsing""" @@ -212,3 +149,10 @@ def __call__(self, *args, **kwargs): result = self.func(*args, **kwargs) self.reported_logs[raw_log_hash] = time.time() + self._time_lapse return result + + +def _appsec_rc_features_is_enabled(): + # type: () -> bool + if asbool(os.environ.get("DD_REMOTE_CONFIGURATION_ENABLED", "true")): + return APPSEC_ENV not in os.environ + return False diff --git a/ddtrace/internal/remoteconfig/client.py b/ddtrace/internal/remoteconfig/client.py index 44cca116d24..2fc02392227 100644 --- a/ddtrace/internal/remoteconfig/client.py +++ b/ddtrace/internal/remoteconfig/client.py @@ -19,7 +19,7 @@ import six import ddtrace -from ddtrace.appsec.utils import _appsec_rc_capabilities +from ddtrace.appsec._capabilities import _appsec_rc_capabilities from ddtrace.internal import agent from ddtrace.internal import runtime from ddtrace.internal.hostname import get_hostname diff --git a/tests/appsec/test_remoteconfiguration.py b/tests/appsec/test_remoteconfiguration.py index 52d60cd57d0..21b517ee300 100644 --- a/tests/appsec/test_remoteconfiguration.py +++ b/tests/appsec/test_remoteconfiguration.py @@ -10,6 +10,7 @@ import pytest from ddtrace.appsec import _asm_request_context +from ddtrace.appsec._capabilities import _appsec_rc_capabilities from ddtrace.appsec._constants import APPSEC from ddtrace.appsec._constants import DEFAULT from ddtrace.appsec._constants import PRODUCTS @@ -18,7 +19,6 @@ from ddtrace.appsec._remoteconfiguration import _preprocess_results_appsec_1click_activation from ddtrace.appsec._remoteconfiguration import enable_appsec_rc from ddtrace.appsec.processor import AppSecSpanProcessor -from ddtrace.appsec.utils import _appsec_rc_capabilities from ddtrace.appsec.utils import _appsec_rc_features_is_enabled from ddtrace.contrib.trace_utils import set_http_meta from ddtrace.ext import SpanTypes