From a76bc423294d5e87a45717497b89158b4b5d7e8f Mon Sep 17 00:00:00 2001 From: Dylan Pulver Date: Fri, 28 Jun 2024 15:44:12 -0400 Subject: [PATCH] parent f15d7908d27fd887dcc6b31237b8e3df79a9359b author Dylan Pulver 1719603852 -0400 committer Dylan Pulver 1720552637 -0400 gpgsig -----BEGIN PGP SIGNATURE----- iHUEABYKAB0WIQQVa+zaLSZxm3/LxmooEm7VNVc3KwUCZo2MvQAKCRAoEm7VNVc3 Kw3EAP0acAlkZ3XYxvP5P8H4rAx4XXhAOp1Sv7OnQu/nbuPhIgEAitluAgNBdtDU jrt9pNq7Y1I76KOuKS+8VRewmuYJUQI= =bYzZ -----END PGP SIGNATURE----- fix(debug): fix --debug flag and associated tests - Fix --debug flag functionality - Add tests using runner.invoke - Fix and modify fetch_database function - Ensure all tests pass with appropriate patchings Fix --debug flag remove duplicate block undo linting remove test_debug file modify fetch_database fix(cache): handle get_from_cache=None and ensure directory exists (#544) Signed-off-by: Dylan Pulver chore: release 3.2.4 (#545) refactor: Jeff refactor suggestions Switch to use main() function refactor: adjust to cli() and cli_internal() refactor: fix click group to be on new cli() func fix: fix function order fix: fix function order fix: fix tests for debug flag changes remove uneeded files don't call dummy funcs fix(debug): fix --debug flag and associated tests - Fix --debug flag functionality - Add tests using runner.invoke - Fix and modify fetch_database function - Ensure all tests pass with appropriate patchings Fix --debug flag undo linting --- CHANGELOG.md | 25 ++++--- safety/VERSION | 2 +- safety/cli.py | 56 +++++++++------ safety/safety.py | 98 +++++++++++++++------------ safety/scan/ecosystems/python/main.py | 91 ++++++++++++++----------- setup.cfg | 1 + test_requirements.txt | 1 + tests/test_cli.py | 55 ++++++++++++++- tests/test_debug.py | 19 ++++++ tests/test_safety.py | 3 + 10 files changed, 230 insertions(+), 121 deletions(-) create mode 100644 tests/test_debug.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 2d435402..2404f6ae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,11 @@ All notable changes to this project will be documented in this file. The format is partly based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html) and [PEP 440](https://peps.python.org/pep-0440/) +## [3.2.4] - 2024-07-04 +- Handle `get_from_cache=None` and ensure directory exists (#538) +- Switch filelock package to compatible release clause (#538) +- Add filelock to `install_requires` (#538) + ## [3.2.3] - 2024-06-10 - Increase request timeout to 30 seconds (#535) - fix: fail on none severities (#534) @@ -38,22 +43,22 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Main updates - Added scan command, which scans a project’s directory for all Python dependencies and includes many improvements over the `check` command, including automatic Python project scanning, native support for Poetry and Pipenv files, Python virtual environment folders, and more granular configuration options. -- Added auth commands, enabling new browser-based authentication of Safety CLI. +- Added auth commands, enabling new browser-based authentication of Safety CLI. - An updated safety policy file schema to support new scan and system-scan commands. This policy file schema is a breaking change from the policy schema used for `safety check`. To migrate a Safety 2.x policy, see [Migrating from Safety 2.x to Safety CLI 3](https://docs.safetycli.com/safety-docs/safety-cli-3/migrating-from-safety-cli-2.x-to-safety-cli-3.x). - Updated screen output to modern interactive interface, with new help interfaces. -- Updated to new JSON output structure to support new scan command, other ecosystems, and other security findings. +- Updated to new JSON output structure to support new scan command, other ecosystems, and other security findings. - Added a supporting [safety-schemas project dependency](https://pypi.org/project/safety-schemas/), also published and maintained by Safety, which defines Safety vulnerability database file, Safety CLI policy file, and Safety CLI JSON output schemas as pydantic models, formalizing these into testable and versioned schemas. ### New scan command: - New scan command: scans a Python project directory for Python dependencies and security vulnerabilities. Safety scan replaces `safety check` with a more powerful and easier to use command. The scan command: - Finds and scans Python dependency files and virtual environments inside the target directory without needing to specify file or environment locations. - Adds native scanning and reporting for Poetry and Pipenv manifest files, and Python virtual environment folders. -- Adds configuration of scanning rules to; +- Adds configuration of scanning rules to; - exclude files and folders from the scan using Unix shell-style wildcards only - Include files to be scanned - Max folder depth setting - Reporting configuration rules - - Reporting rules defining which types and specific vulnerabilities to include or ignore stay the same as safety 2.x, although now in a slightly different structure. + - Reporting rules defining which types and specific vulnerabilities to include or ignore stay the same as safety 2.x, although now in a slightly different structure. - Failing rules - Adds ability to configure rules for when safety should return a non-zero (failing) exit code, which can be different from reporting rules under the `report` field. - Auto-updating rules @@ -63,9 +68,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added auth command: manages Safety CLI’s authentication in development environments, allowing easy authentication via the browser. - auth login - adds ability to authenticate safety cli via the browser - auth register - adds ability to register for a Safety account via the CLI, and get scanning within minutes - - auth status - - - auth logout - - - `safety check` command can still be used with the API key --key argument, and scan and system-scan commands should also be + - auth status - + - auth logout - + - `safety check` command can still be used with the API key --key argument, and scan and system-scan commands should also be - Added configure command: configures safety cli using a config.ini file, either saved to the user settings or system settings. This can be used to configure safety’s authentication methods and global proxy details. - Added system-scan command (beta): Adds the system-scan command, which scans a machine for Python files and environments, reporting these to screen output. system-scan is an experimental beta feature that can scan an entire drive or machine for Python dependency files and Python virtual environments, reporting on packages found and their associated security vulnerabilities. - Added check-updates command: Check for version updates to Safety CLI, and supports screen and JSON format outputs. Can be used in organizations to test and rollout new version updates as recommended by Safety Cybersecurity. @@ -73,13 +78,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### New policy file schema for scan and system-scan commands - New policy file schema to support safety scan and safety system-scan. Adds scanning-settings root property, which contains settings to configure rules and settings for how safety traverses the directory and subdirectories being scanned, including “exclude” rules, “include” rules, the max directory depth to scan and which root directories safety system-scan should start from. -- Adds report root property, which defines which vulnerability findings safety should auto-ignore (exclude) in its reporting. Supports excluding vulnerability IDs manually, as well as vulnerability groups to ignore based on CVSS severity score. +- Adds report root property, which defines which vulnerability findings safety should auto-ignore (exclude) in its reporting. Supports excluding vulnerability IDs manually, as well as vulnerability groups to ignore based on CVSS severity score. - Adds new fail-scan-with-exit-code root property, which defines when safety should exit with a failing exit code. This separates safety’s reporting rules from its failing exit code rules, which is a departure from Safety 2.x which had combined rulesets for these. Failing exit codes can be configured based on CVSS severity score. - Note that the old `safety check` command still supports and relies on the policy schema from safety 2.3.5 and below, meaning no changes are required when migrating to safety 2.x to Safety 3.0.0 when only using the `safety check` command. ### New global options and configurations -- Added global --stage option, to set the development lifecycle stage for the `scan` and `system-scan` commands. -- Added global --key option, to set a Safety API key for any command, including scan, system-scan and check. +- Added global --stage option, to set the development lifecycle stage for the `scan` and `system-scan` commands. +- Added global --key option, to set a Safety API key for any command, including scan, system-scan and check. ### Other - Safety now requires Python>=3.7. Python 3.7 doesn't have active security support from the Python foundation, and we recommend upgrading to at least Python >= 3.8 whenever possible. Safety’s 3.0.0 Docker image can still be used to scan and secure all Python projects, regardless of Python version. Refer to our [Documentation](https://docs.safetycli.com) for details. diff --git a/safety/VERSION b/safety/VERSION index b347b11e..351227fc 100644 --- a/safety/VERSION +++ b/safety/VERSION @@ -1 +1 @@ -3.2.3 +3.2.4 diff --git a/safety/cli.py b/safety/cli.py index 6edef4dc..19018479 100644 --- a/safety/cli.py +++ b/safety/cli.py @@ -47,29 +47,41 @@ from typing_extensions import Annotated -LOG = logging.getLogger(__name__) -def configure_logger(ctx, param, debug): + +LOG = logging.getLogger(__name__) + +def configure_logger(ctx, param, value): level = logging.CRITICAL - - if debug: + if value: level = logging.DEBUG + logging.basicConfig(format='%(asctime)s %(name)s => %(message)s', level=level) + return value - logging.basicConfig(format='%(asctime)s %(name)s => %(message)s', level=level) +def preprocess_args(f): + if '--debug' in sys.argv: + index = sys.argv.index('--debug') + if len(sys.argv) > index + 1: + next_arg = sys.argv[index + 1] + if next_arg in ('1', 'true'): + sys.argv.pop(index + 1) # Remove the next argument (1 or true) + return f @click.group(cls=SafetyCLILegacyGroup, help=CLI_MAIN_INTRODUCTION, epilog=DEFAULT_EPILOG) @auth_options() @proxy_options @click.option('--disable-optional-telemetry', default=False, is_flag=True, show_default=True, help=CLI_DISABLE_OPTIONAL_TELEMETRY_DATA_HELP) -@click.option('--debug', default=False, help=CLI_DEBUG_HELP, callback=configure_logger) +@click.option('--debug', is_flag=True, help=CLI_DEBUG_HELP, callback=configure_logger) @click.version_option(version=get_safety_version()) @click.pass_context @inject_session +@preprocess_args def cli(ctx, debug, disable_optional_telemetry): """ Scan and secure Python projects against package vulnerabilities. To get started navigate to a Python project and run `safety scan`. """ + SafetyContext().safety_source = 'cli' telemetry = not disable_optional_telemetry ctx.obj.config = ConfigModel(telemetry_enabled=telemetry) @@ -107,7 +119,7 @@ def inner(ctx, *args, **kwargs): kwargs.pop('proxy_protocol', None) kwargs.pop('proxy_host', None) kwargs.pop('proxy_port', None) - + if ctx.get_parameter_source("json_version") != click.core.ParameterSource.DEFAULT and not ( save_json or json or output == 'json'): raise click.UsageError( @@ -128,8 +140,8 @@ def inner(ctx, *args, **kwargs): proxy_dictionary=None) audit_and_monitor = (audit_and_monitor and server_audit_and_monitor) - kwargs.update({"auto_remediation_limit": auto_remediation_limit, - "policy_file":policy_file, + kwargs.update({"auto_remediation_limit": auto_remediation_limit, + "policy_file":policy_file, "audit_and_monitor": audit_and_monitor}) except SafetyError as e: @@ -441,18 +453,18 @@ def validate(ctx, name, version, path): if not os.path.exists(path): click.secho(f'The path "{path}" does not exist.', fg='red', file=sys.stderr) sys.exit(EXIT_CODE_FAILURE) - + if version not in ["3.0", "2.0", None]: click.secho(f'Version "{version}" is not a valid value, allowed values are 3.0 and 2.0. Use --path to specify the target file.', fg='red', file=sys.stderr) sys.exit(EXIT_CODE_FAILURE) - + def fail_validation(e): click.secho(str(e).lstrip(), fg='red', file=sys.stderr) sys.exit(EXIT_CODE_FAILURE) if not version: version = "3.0" - + result = "" if version == "3.0": @@ -463,7 +475,7 @@ def fail_validation(e): policy = load_policy_file(Path(path)) except Exception as e: fail_validation(e) - + click.secho(f"The Safety policy ({version}) file " \ "(Used for scan and system-scan commands) " \ "was successfully parsed " \ @@ -478,18 +490,18 @@ def fail_validation(e): sys.exit(EXIT_CODE_FAILURE) del values['raw'] - + result = json.dumps(values, indent=4, default=str) click.secho("The Safety policy file " \ "(Valid only for the check command) " \ "was successfully parsed with the " \ "following values:", fg="green") - + console.print_json(result) -@cli.command(cls=SafetyCLILegacyCommand, +@cli.command(cls=SafetyCLILegacyCommand, help=CLI_CONFIGURE_HELP, utility_command=True) @click.option("--proxy-protocol", "-pr", type=click.Choice(['http', 'https']), default='https', cls=DependentOption, @@ -519,8 +531,8 @@ def fail_validation(e): @click.option("--save-to-system/--save-to-user", default=False, is_flag=True, help=CLI_CONFIGURE_SAVE_TO_SYSTEM) @click.pass_context -def configure(ctx, proxy_protocol, proxy_host, proxy_port, proxy_timeout, - proxy_required, organization_id, organization_name, stage, +def configure(ctx, proxy_protocol, proxy_host, proxy_port, proxy_timeout, + proxy_required, organization_id, organization_name, stage, save_to_system): """ Configure global settings, like proxy settings and organization details @@ -565,7 +577,7 @@ def configure(ctx, proxy_protocol, proxy_host, proxy_port, proxy_timeout, 'host': proxy_host, 'port': str(proxy_port) }) - + if not config.has_section(PROXY_SECTION_NAME): config.add_section(PROXY_SECTION_NAME) @@ -669,7 +681,7 @@ def check_updates(ctx: typer.Context, if not data: raise SafetyException("No data found.") - + console.print("[green]Safety CLI is authenticated:[/green]") from rich.padding import Padding @@ -696,7 +708,7 @@ def check_updates(ctx: typer.Context, f"If Safety was installed from a requirements file, update Safety to version {latest_available_version} in that requirements file." ) console.print() - # `pip -i install safety=={latest_available_version}` OR + # `pip -i install safety=={latest_available_version}` OR console.print(f"Pip: To install the updated version of Safety directly via pip, run: `pip install safety=={latest_available_version}`") if console.quiet: @@ -717,5 +729,5 @@ def check_updates(ctx: typer.Context, cli.add_command(alert) -if __name__ == "__main__": +if __name__ == "__main__": cli() diff --git a/safety/safety.py b/safety/safety.py index a368561b..6544bbe3 100644 --- a/safety/safety.py +++ b/safety/safety.py @@ -12,7 +12,7 @@ import time from collections import defaultdict from datetime import datetime -from typing import Dict, Optional, List +from typing import Dict, Optional, List, Any import click import requests @@ -21,6 +21,7 @@ from packaging.utils import canonicalize_name from packaging.version import parse as parse_version, Version from pydantic.json import pydantic_encoder +from filelock import FileLock from safety_schemas.models import Ecosystem, FileType @@ -41,18 +42,23 @@ LOG = logging.getLogger(__name__) -def get_from_cache(db_name, cache_valid_seconds=0, skip_time_verification=False): - if os.path.exists(DB_CACHE_FILE): - with open(DB_CACHE_FILE) as f: - try: - data = json.loads(f.read()) +def get_from_cache(db_name: str, cache_valid_seconds: int = 0, skip_time_verification: bool = False) -> Optional[Dict[str, Any]]: + cache_file_lock = f"{DB_CACHE_FILE}.lock" + os.makedirs(os.path.dirname(cache_file_lock), exist_ok=True) + with FileLock(cache_file_lock, timeout=10) as lock: + if os.path.exists(DB_CACHE_FILE): + with open(DB_CACHE_FILE) as f: + try: + data = json.loads(f.read()) + except json.JSONDecodeError: + LOG.debug('JSONDecodeError trying to get the cached database.') if db_name in data: if "cached_at" in data[db_name]: if data[db_name]["cached_at"] + cache_valid_seconds > time.time() or skip_time_verification: LOG.debug('Getting the database from cache at %s, cache setting: %s', - data[db_name]["cached_at"], cache_valid_seconds) - + data[db_name]["cached_at"], cache_valid_seconds) + try: data[db_name]["db"]["meta"]["base_domain"] = "https://data.safetycli.com" except KeyError as e: @@ -63,12 +69,9 @@ def get_from_cache(db_name, cache_valid_seconds=0, skip_time_verification=False) LOG.debug('Cached file is too old, it was cached at %s', data[db_name]["cached_at"]) else: LOG.debug('There is not the cached_at key in %s database', data[db_name]) - - except json.JSONDecodeError: - LOG.debug('JSONDecodeError trying to get the cached database.') - else: - LOG.debug("Cache file doesn't exist...") - return False + else: + LOG.debug("Cache file doesn't exist...") + return None def write_to_cache(db_name, data): @@ -95,25 +98,30 @@ def write_to_cache(db_name, data): if exc.errno != errno.EEXIST: raise - with open(DB_CACHE_FILE, "r") as f: - try: - cache = json.loads(f.read()) - except json.JSONDecodeError: - LOG.debug('JSONDecodeError in the local cache, dumping the full cache file.') + cache_file_lock = f"{DB_CACHE_FILE}.lock" + with FileLock(cache_file_lock, timeout=10) as lock: + if os.path.exists(DB_CACHE_FILE): + with open(DB_CACHE_FILE, "r") as f: + try: + cache = json.loads(f.read()) + except json.JSONDecodeError: + LOG.debug('JSONDecodeError in the local cache, dumping the full cache file.') + cache = {} + else: cache = {} - with open(DB_CACHE_FILE, "w") as f: - cache[db_name] = { - "cached_at": time.time(), - "db": data - } - f.write(json.dumps(cache)) - LOG.debug('Safety updated the cache file for %s database.', db_name) + with open(DB_CACHE_FILE, "w") as f: + cache[db_name] = { + "cached_at": time.time(), + "db": data + } + f.write(json.dumps(cache)) + LOG.debug('Safety updated the cache file for %s database.', db_name) def fetch_database_url(session, mirror, db_name, cached, telemetry=True, ecosystem: Ecosystem = Ecosystem.PYTHON, from_cache=True): - headers = {'schema-version': JSON_SCHEMA_VERSION, 'ecosystem': ecosystem.value} + headers = {'schema-version': JSON_SCHEMA_VERSION, 'ecosystem': ecosystem.value} if cached and from_cache: cached_data = get_from_cache(db_name=db_name, cache_valid_seconds=cached) @@ -122,13 +130,13 @@ def fetch_database_url(session, mirror, db_name, cached, telemetry=True, return cached_data url = mirror + db_name - + telemetry_data = { - 'telemetry': json.dumps(build_telemetry_data(telemetry=telemetry), + 'telemetry': json.dumps(build_telemetry_data(telemetry=telemetry), default=pydantic_encoder)} try: - r = session.get(url=url, timeout=REQUEST_TIMEOUT, + r = session.get(url=url, timeout=REQUEST_TIMEOUT, headers=headers, params=telemetry_data) except requests.exceptions.ConnectionError: raise NetworkConnectionError() @@ -205,10 +213,10 @@ def fetch_database_file(path: str, db_name: str, cached = 0, if not full_path.exists(): raise DatabaseFileNotFoundError(db=path) - + with open(full_path) as f: data = json.loads(f.read()) - + if cached: LOG.info('Writing %s to cache because cached value was %s', db_name, cached) write_to_cache(db_name, data) @@ -226,7 +234,7 @@ def is_valid_database(db) -> bool: return False -def fetch_database(session, full=False, db=False, cached=0, telemetry=True, +def fetch_database(session, full=False, db=False, cached=0, telemetry=True, ecosystem: Optional[Ecosystem] = None, from_cache=True): if session.is_using_auth_credentials(): @@ -242,7 +250,7 @@ def fetch_database(session, full=False, db=False, cached=0, telemetry=True, if is_a_remote_mirror(mirror): if ecosystem is None: ecosystem = Ecosystem.PYTHON - data = fetch_database_url(session, mirror, db_name=db_name, cached=cached, + data = fetch_database_url(session, mirror, db_name=db_name, cached=cached, telemetry=telemetry, ecosystem=ecosystem, from_cache=from_cache) else: data = fetch_database_file(mirror, db_name=db_name, cached=cached, @@ -562,16 +570,16 @@ def compute_sec_ver(remediations, packages: Dict[str, Package], secure_vulns_by_ secure_v = compute_sec_ver_for_user(package=pkg, secure_vulns_by_user=secure_vulns_by_user, db_full=db_full) rem['closest_secure_version'] = get_closest_ver(secure_v, version, spec) - + upgrade = rem['closest_secure_version'].get('upper', None) downgrade = rem['closest_secure_version'].get('lower', None) recommended_version = None - + if upgrade: recommended_version = upgrade elif downgrade: recommended_version = downgrade - + rem['recommended_version'] = recommended_version rem['other_recommended_versions'] = [other_v for other_v in secure_v if other_v != str(recommended_version)] @@ -645,12 +653,12 @@ def process_fixes(files, remediations, auto_remediation_limit, output, no_output def process_fixes_scan(file_to_fix, to_fix_spec, auto_remediation_limit, output, no_output=True, prompt=False): to_fix_remediations = [] - + def get_remmediation_from(spec): upper = None lower = None recommended = None - + try: upper = Version(spec.remediation.closest_secure.upper) if spec.remediation.closest_secure.upper else None except Exception as e: @@ -664,7 +672,7 @@ def get_remmediation_from(spec): try: recommended = Version(spec.remediation.recommended) except Exception as e: - LOG.error(f'Error getting recommended version for remediation, ignoring', exc_info=True) + LOG.error(f'Error getting recommended version for remediation, ignoring', exc_info=True) return { "vulnerabilities_found": spec.remediation.vulnerabilities_found, @@ -672,7 +680,7 @@ def get_remmediation_from(spec): "requirement": spec, "more_info_url": spec.remediation.more_info_url, "closest_secure_version": { - 'upper': upper, + 'upper': upper, 'lower': lower }, "recommended_version": recommended, @@ -690,7 +698,7 @@ def get_remmediation_from(spec): 'files': {str(file_to_fix.location): {'content': None, 'fixes': {'TO_SKIP': [], 'TO_APPLY': [], 'TO_CONFIRM': []}, 'supported': False, 'filename': file_to_fix.location.name}}, 'dependencies': defaultdict(dict), } - + fixes = apply_fixes(requirements, output, no_output, prompt, scan_flow=True, auto_remediation_limit=auto_remediation_limit) return fixes @@ -822,7 +830,7 @@ def apply_fixes(requirements, out_type, no_output, prompt, scan_flow=False, auto for name, data in requirements['files'].items(): output = [('', {}), (f"Analyzing {name}... [{get_fix_opt_used_msg(auto_remediation_limit)} limit]", {'styling': {'bold': True}, 'start_line_decorator': '->', 'indent': ' '})] - + r_skip = data['fixes']['TO_SKIP'] r_apply = data['fixes']['TO_APPLY'] r_confirm = data['fixes']['TO_CONFIRM'] @@ -901,7 +909,7 @@ def apply_fixes(requirements, out_type, no_output, prompt, scan_flow=False, auto else: not_supported_filename = data.get('filename', name) output.append( - (f"{not_supported_filename} updates not supported: Please update these dependencies using your package manager.", + (f"{not_supported_filename} updates not supported: Please update these dependencies using your package manager.", {'start_line_decorator': ' -', 'indent': ' '})) output.append(('', {})) @@ -999,7 +1007,7 @@ def review(*, report=None, params=None): @sync_safety_context def get_licenses(*, session=None, db_mirror=False, cached=0, telemetry=True): - + if db_mirror: mirrors = [db_mirror] else: diff --git a/safety/scan/ecosystems/python/main.py b/safety/scan/ecosystems/python/main.py index 275b089d..bd9353bf 100644 --- a/safety/scan/ecosystems/python/main.py +++ b/safety/scan/ecosystems/python/main.py @@ -29,19 +29,19 @@ LOG = logging.getLogger(__name__) -def ignore_vuln_if_needed(dependency: PythonDependency, file_type: FileType, +def ignore_vuln_if_needed(dependency: PythonDependency, file_type: FileType, vuln_id: str, cve, ignore_vulns, ignore_unpinned: bool, ignore_environment: bool, specification: PythonSpecification, ignore_severity: List[VulnerabilitySeverityLabels] = []): - - vuln_ignored: bool = vuln_id in ignore_vulns + + vuln_ignored: bool = vuln_id in ignore_vulns if vuln_ignored and ignore_vulns[vuln_id].code is IgnoreCodes.manual: - if (not ignore_vulns[vuln_id].expires + if (not ignore_vulns[vuln_id].expires or ignore_vulns[vuln_id].expires > datetime.utcnow().date()): return - + del ignore_vulns[vuln_id] if ignore_environment and file_type is FileType.VIRTUAL_ENVIRONMENT: @@ -56,7 +56,7 @@ def ignore_vuln_if_needed(dependency: PythonDependency, file_type: FileType, if cve.cvssv3 and cve.cvssv3.get("base_severity", None): severity_label = VulnerabilitySeverityLabels( cve.cvssv3["base_severity"].lower()) - + if severity_label in ignore_severity: reason = f"{severity_label.value.capitalize()} severity ignored by rule in policy file." ignore_vulns[vuln_id] = IgnoredItemDetail( @@ -75,7 +75,7 @@ def ignore_vuln_if_needed(dependency: PythonDependency, file_type: FileType, specifications = set() specifications.add(str(specification.specifier)) ignore_vulns[vuln_id] = IgnoredItemDetail( - code=IgnoreCodes.unpinned_specification, reason=reason, + code=IgnoreCodes.unpinned_specification, reason=reason, specifications=specifications) @@ -84,7 +84,7 @@ def should_fail(config: ConfigModel, vulnerability: Vulnerability) -> bool: return False # If Severity is None type, it will be considered as UNKNOWN and NONE - # They are not the same, but we are handling like the same when a + # They are not the same, but we are handling like the same when a # vulnerability does not have a severity value. severities = [VulnerabilitySeverityLabels.NONE, VulnerabilitySeverityLabels.UNKNOWN] @@ -127,7 +127,7 @@ def get_vulnerability(vuln_id: str, cve, unpinned_ignored = ignore_vulns[vuln_id].specifications \ if vuln_id in ignore_vulns.keys() else None should_ignore = not unpinned_ignored or str(affected.specifier) in unpinned_ignored - ignored: bool = bool(ignore_vulns and + ignored: bool = bool(ignore_vulns and vuln_id in ignore_vulns and should_ignore) more_info_url = f"{base_domain}{data.get('more_info_path', '')}" @@ -175,13 +175,13 @@ def get_vulnerability(vuln_id: str, cve, ) class PythonFile(InspectableFile, Remediable): - + def __init__(self, file_type: FileType, file: FileTextWrite) -> None: super().__init__(file=file) self.ecosystem = file_type.ecosystem self.file_type = file_type - def __find_dependency_vulnerabilities__(self, dependencies: List[PythonDependency], + def __find_dependency_vulnerabilities__(self, dependencies: List[PythonDependency], config: ConfigModel): ignored_vulns_data = {} ignore_vulns = {} \ @@ -191,8 +191,11 @@ def __find_dependency_vulnerabilities__(self, dependencies: List[PythonDependenc ignore_severity = config.depedendency_vulnerability.ignore_cvss_severity ignore_unpinned = config.depedendency_vulnerability.python_ignore.unpinned_specifications ignore_environment = config.depedendency_vulnerability.python_ignore.environment_results - + db = get_from_cache(db_name="insecure.json", skip_time_verification=True) + if not db: + LOG.debug("Cache data for insecure.json is not available or is invalid.") + return db_full = None vulnerable_packages = frozenset(db.get('vulnerable_packages', [])) found_dependencies = {} @@ -214,8 +217,11 @@ def __find_dependency_vulnerabilities__(self, dependencies: List[PythonDependenc if not dependency.version: if not db_full: - db_full = get_from_cache(db_name="insecure_full.json", + db_full = get_from_cache(db_name="insecure_full.json", skip_time_verification=True) + if not db_full: + LOG.debug("Cache data for insecure_full.json is not available or is invalid.") + return dependency.refresh_from(db_full) if name in vulnerable_packages: @@ -225,8 +231,11 @@ def __find_dependency_vulnerabilities__(self, dependencies: List[PythonDependenc if spec.is_vulnerable(spec_set, dependency.insecure_versions): if not db_full: - db_full = get_from_cache(db_name="insecure_full.json", + db_full = get_from_cache(db_name="insecure_full.json", skip_time_verification=True) + if not db_full: + LOG.debug("Cache data for insecure_full.json is not available or is invalid.") + return if not dependency.latest_version: dependency.refresh_from(db_full) @@ -247,23 +256,23 @@ def __find_dependency_vulnerabilities__(self, dependencies: List[PythonDependenc vuln_id=vuln_id, cve=cve, ignore_vulns=ignore_vulns, ignore_severity=ignore_severity, - ignore_unpinned=ignore_unpinned, - ignore_environment=ignore_environment, + ignore_unpinned=ignore_unpinned, + ignore_environment=ignore_environment, specification=spec) include_ignored = True - vulnerability = get_vulnerability(vuln_id, cve, data, + vulnerability = get_vulnerability(vuln_id, cve, data, specifier, db_full, name, ignore_vulns, spec) - should_add_vuln = not (vulnerability.is_transitive and - dependency.found and + should_add_vuln = not (vulnerability.is_transitive and + dependency.found and dependency.found.parts[-1] == FileType.VIRTUAL_ENVIRONMENT.value) - + if vulnerability.ignored: ignored_vulns_data[ vulnerability.vulnerability_id] = vulnerability - + if not self.dependency_results.failed and not vulnerability.ignored: self.dependency_results.failed = should_fail(config, vulnerability) @@ -277,16 +286,16 @@ def __find_dependency_vulnerabilities__(self, dependencies: List[PythonDependenc self.dependency_results.dependencies = [dep for _, dep in found_dependencies.items()] self.dependency_results.ignored_vulns = ignore_vulns self.dependency_results.ignored_vulns_data = ignored_vulns_data - + def inspect(self, config: ConfigModel): - + # We only support vulnerability checking for now dependencies = get_dependencies(self) if not dependencies: self.results = [] - - self.__find_dependency_vulnerabilities__(dependencies=dependencies, + + self.__find_dependency_vulnerabilities__(dependencies=dependencies, config=config) def __get_secure_specifications_for_user__(self, dependency: PythonDependency, db_full, @@ -309,26 +318,26 @@ def __get_secure_specifications_for_user__(self, dependency: PythonDependency, d sec_ver_for_user = list(versions.difference(affected_v)) return sorted(sec_ver_for_user, key=lambda ver: parse_version(ver), reverse=True) - + def remediate(self): - db_full = get_from_cache(db_name="insecure_full.json", + db_full = get_from_cache(db_name="insecure_full.json", skip_time_verification=True) if not db_full: return for dependency in self.dependency_results.get_affected_dependencies(): secure_versions = dependency.secure_versions - + if not secure_versions: secure_versions = [] secure_vulns_by_user = set(self.dependency_results.ignored_vulns.keys()) if not secure_vulns_by_user: - secure_v = sorted(secure_versions, key=lambda ver: parse_version(ver), + secure_v = sorted(secure_versions, key=lambda ver: parse_version(ver), reverse=True) else: secure_v = self.__get_secure_specifications_for_user__( - dependency=dependency, db_full=db_full, + dependency=dependency, db_full=db_full, secure_vulns_by_user=secure_vulns_by_user) for specification in dependency.specifications: @@ -338,35 +347,35 @@ def remediate(self): version = None if is_pinned_requirement(specification.specifier): version = next(iter(specification.specifier)).version - closest_secure = {key: str(value) if value else None for key, value in - get_closest_ver(secure_v, - version, + closest_secure = {key: str(value) if value else None for key, value in + get_closest_ver(secure_v, + version, specification.specifier).items()} closest_secure = ClosestSecureVersion(**closest_secure) recommended = None - + if closest_secure.upper: recommended = closest_secure.upper elif closest_secure.lower: recommended = closest_secure.lower - + other_recommended = [other_v for other_v in secure_v if other_v != str(recommended)] remed_more_info_url = dependency.more_info_url if remed_more_info_url: remed_more_info_url = build_remediation_info_url( - base_url=remed_more_info_url, version=version, + base_url=remed_more_info_url, version=version, spec=str(specification.specifier), target_version=recommended) - + if not remed_more_info_url: remed_more_info_url = "-" vulns_found = sum(1 for vuln in specification.vulnerabilities if not vuln.ignored) - specification.remediation = RemediationModel(vulnerabilities_found=vulns_found, - more_info_url=remed_more_info_url, - closest_secure=closest_secure if recommended else None, - recommended=recommended, + specification.remediation = RemediationModel(vulnerabilities_found=vulns_found, + more_info_url=remed_more_info_url, + closest_secure=closest_secure if recommended else None, + recommended=recommended, other_recommended=other_recommended) diff --git a/setup.cfg b/setup.cfg index 97a72546..3d8ab9b3 100644 --- a/setup.cfg +++ b/setup.cfg @@ -52,6 +52,7 @@ install_requires = pydantic>=1.10.12 safety_schemas>=0.0.2 typing-extensions>=4.7.1 + filelock~=3.12.2 [options.entry_points] console_scripts = diff --git a/test_requirements.txt b/test_requirements.txt index f1cc7af2..b465c4a9 100644 --- a/test_requirements.txt +++ b/test_requirements.txt @@ -19,3 +19,4 @@ typer pydantic>=1.10.12 safety_schemas>=0.0.2 typing-extensions>=4.7.1 +filelock~=3.12.2 \ No newline at end of file diff --git a/tests/test_cli.py b/tests/test_cli.py index 03e25f23..31e1749a 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,5 +1,7 @@ import json +import logging import os +import sys import shutil import tempfile import unittest @@ -15,7 +17,8 @@ from safety import cli from safety.models import CVE, SafetyRequirement, Severity, Vulnerability from safety.util import Package, SafetyContext - +from safety.auth.models import Auth +from safety_schemas.models.base import AuthenticationType def get_vulnerability(vuln_kwargs=None, cve_kwargs=None, pkg_kwargs=None): vuln_kwargs = {} if vuln_kwargs is None else vuln_kwargs @@ -69,6 +72,8 @@ def setUp(self): self.runner = CliRunner(mix_stderr=False) self.output_options = ['screen', 'text', 'json', 'bare'] self.dirname = os.path.dirname(__file__) + # Set up logging to capture debug output for tests + logging.basicConfig(level=logging.DEBUG, format='%(name)s - %(levelname)s - %(message)s') def test_command_line_interface(self): runner = CliRunner() @@ -513,4 +518,50 @@ def test_license_with_file(self, fetch_database_url): test_filename = os.path.join(dirname, "reqs_4.txt") result = self.runner.invoke(cli.cli, ['license', '--key', 'foo', '--file', test_filename]) print(result.stdout) - self.assertEqual(result.exit_code, 0) \ No newline at end of file + self.assertEqual(result.exit_code, 0) + + @patch('safety.auth.cli.get_auth_info', return_value={'email': 'test@test.com'}) + @patch.object(Auth, 'is_valid', return_value=True) + @patch('safety.auth.utils.SafetyAuthSession.get_authentication_type', return_value=AuthenticationType.TOKEN) + @patch('builtins.input', lambda *args: '') + @patch('safety.safety.fetch_database', return_value={'vulnerable_packages': []}) + def test_debug_flag(self, mock_get_auth_info, mock_is_valid, mock_get_auth_type, mock_fetch_database): + result = self.runner.invoke(cli.cli, ['--debug', 'scan']) + assert result.exit_code == 0, f"CLI exited with code {result.exit_code} and output: {result.output} and error: {result.stderr}" + assert "for known security issues using default" in result.output + + @patch('safety.auth.cli.get_auth_info', return_value={'email': 'test@test.com'}) + @patch.object(Auth, 'is_valid', return_value=True) + @patch('safety.auth.utils.SafetyAuthSession.get_authentication_type', return_value=AuthenticationType.TOKEN) + @patch('builtins.input', lambda *args: '') + @patch('safety.safety.fetch_database', return_value={'vulnerable_packages': []}) + def test_debug_flag_with_value_1(self, mock_get_auth_info, mock_is_valid, mock_get_auth_type, mock_fetch_database): + sys.argv = ['safety', '--debug', '1', 'scan'] + + @cli.preprocess_args + def dummy_function(): + pass + + # Extract the preprocessed arguments from sys.argv + preprocessed_args = sys.argv[1:] # Exclude the script name 'safety' + + # Assert the preprocessed arguments + assert preprocessed_args == ['--debug', 'scan'], f"Preprocessed args: {preprocessed_args}" + + @patch('safety.auth.cli.get_auth_info', return_value={'email': 'test@test.com'}) + @patch.object(Auth, 'is_valid', return_value=True) + @patch('safety.auth.utils.SafetyAuthSession.get_authentication_type', return_value=AuthenticationType.TOKEN) + @patch('builtins.input', lambda *args: '') + @patch('safety.safety.fetch_database', return_value={'vulnerable_packages': []}) + def test_debug_flag_with_value_true(self, mock_get_auth_info, mock_is_valid, mock_get_auth_type, mock_fetch_database): + sys.argv = ['safety', '--debug', 'true', 'scan'] + + @cli.preprocess_args + def dummy_function(): + pass + + # Extract the preprocessed arguments from sys.argv + preprocessed_args = sys.argv[1:] # Exclude the script name 'safety' + + # Assert the preprocessed arguments + assert preprocessed_args == ['--debug', 'scan'], f"Preprocessed args: {preprocessed_args}" diff --git a/tests/test_debug.py b/tests/test_debug.py new file mode 100644 index 00000000..ed6b4954 --- /dev/null +++ b/tests/test_debug.py @@ -0,0 +1,19 @@ +import subprocess + +import pytest + + +def test_debug_flag(): + result = subprocess.run(['safety', '--debug', 'scan'], capture_output=True, text=True) + assert "safety.auth" in result.stderr or "DEBUG" in result.stderr + +def test_debug_flag_with_value_1(): + result = subprocess.run(['safety', '--debug', '1', 'scan'], capture_output=True, text=True) + assert "safety.auth" in result.stderr or "DEBUG" in result.stderr + +def test_debug_flag_with_value_true(): + result = subprocess.run(['safety', '--debug', 'true', 'scan'], capture_output=True, text=True) + assert "safety.auth" in result.stderr or "DEBUG" in result.stderr + +if __name__ == '__main__': + pytest.main() diff --git a/tests/test_safety.py b/tests/test_safety.py index d829d6b2..630704e6 100644 --- a/tests/test_safety.py +++ b/tests/test_safety.py @@ -171,6 +171,9 @@ def test_check_live(self): def test_check_live_cached(self): from safety.constants import DB_CACHE_FILE + # Ensure the cache directory and file exist + os.makedirs(os.path.dirname(DB_CACHE_FILE), exist_ok=True) + # lets clear the cache first try: with open(DB_CACHE_FILE, 'w') as f: