diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index feac846a4c82..09941b85e8ef 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -18,6 +18,7 @@
 from functools import cached_property
 from itertools import chain, product
 from pathlib import Path
+from queue import SimpleQueue
 from types import TracebackType
 from typing import Any, Dict, Iterator, List, Optional, Tuple, Type, cast
 from urllib.parse import urlparse
@@ -36,8 +37,11 @@
 from psycopg2.extensions import connection as PgConnection
 from psycopg2.extensions import cursor as PgCursor
 from psycopg2.extensions import make_dsn, parse_dsn
+from pytest_httpserver import HTTPServer
 from typing_extensions import Literal
 from urllib3.util.retry import Retry
+from werkzeug.wrappers.request import Request
+from werkzeug.wrappers.response import Response
 
 from fixtures.broker import NeonBroker
 from fixtures.log_helper import log
@@ -997,6 +1001,70 @@ def neon_env_builder(
     yield builder
 
 
+@pytest.fixture(scope="function")
+def neon_env_and_metrics_server(
+    httpserver: HTTPServer,
+    neon_env_builder: NeonEnvBuilder,
+    httpserver_listen_address,
+) -> Iterator[Tuple[NeonEnv, SimpleQueue[Any]]]:
+    """
+    Fixture to create a Neon environment and metrics server.
+    """
+
+    (host, port) = httpserver_listen_address
+    metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
+
+    # this should be Union[str, Tuple[List[Any], bool]], but it will make unpacking much more verbose
+    uploads: SimpleQueue[Any] = SimpleQueue()
+
+    def metrics_handler(request: Request) -> Response:
+        if request.json is None:
+            return Response(status=400)
+
+        events = request.json["events"]
+        is_last = request.headers["pageserver-metrics-last-upload-in-batch"]
+        assert is_last in ["true", "false"]
+        uploads.put((events, is_last == "true"))
+        return Response(status=200)
+
+    # Require collecting metrics frequently, since we change
+    # the timeline and want something to be logged about it.
+    #
+    # Disable time-based pitr, we will use the manual GC calls
+    # to trigger remote storage operations in a controlled way
+    neon_env_builder.pageserver_config_override = f"""
+    metric_collection_interval="1s"
+    metric_collection_endpoint="{metric_collection_endpoint}"
+    cached_metric_collection_interval="0s"
+    synthetic_size_calculation_interval="3s"
+    """
+
+    neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
+
+    log.info(f"test_metric_collection endpoint is {metric_collection_endpoint}")
+
+    # mock http server that returns OK for the metrics
+    httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_handler(
+        metrics_handler
+    )
+
+    # spin up neon, after http server is ready
+    env = neon_env_builder.init_start(initial_tenant_conf={"pitr_interval": "0 sec"})
+    # httpserver is shut down before pageserver during passing run
+    env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
+    # we have a fast rate of calculation, these can happen at shutdown
+    env.pageserver.allowed_errors.append(
+        ".*synthetic_size_worker:calculate_synthetic_size.*:gather_size_inputs.*: failed to calculate logical size at .*: cancelled.*"
+    )
+    env.pageserver.allowed_errors.append(
+        ".*synthetic_size_worker: failed to calculate synthetic size for tenant .*: failed to calculate some logical_sizes"
+    )
+
+    yield (env, uploads)
+
+    httpserver.check()
+
+
 @dataclass
 class PageserverPort:
     pg: int
diff --git a/test_runner/regress/test_pageserver_metric_collection.py b/test_runner/regress/test_pageserver_metric_collection.py
index b76dbbee0376..b16f6bfad13e 100644
--- a/test_runner/regress/test_pageserver_metric_collection.py
+++ b/test_runner/regress/test_pageserver_metric_collection.py
@@ -3,75 +3,18 @@
 from dataclasses import dataclass
 from pathlib import Path
 from queue import SimpleQueue
-from typing import Any, Dict, Set
+from typing import Any, Dict, Set, Tuple
 
 from fixtures.log_helper import log
 from fixtures.neon_fixtures import (
-    NeonEnvBuilder,
+    NeonEnv,
     wait_for_last_flush_lsn,
 )
-from fixtures.remote_storage import RemoteStorageKind
 from fixtures.types import TenantId, TimelineId
-from pytest_httpserver import HTTPServer
-from werkzeug.wrappers.request import Request
-from werkzeug.wrappers.response import Response
 
 
-# TODO: collect all of the env setup *AFTER* removal of RemoteStorageKind.NOOP
-
-def test_metric_collection(
-    httpserver: HTTPServer,
-    neon_env_builder: NeonEnvBuilder,
-    httpserver_listen_address,
-):
-    (host, port) = httpserver_listen_address
-    metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
-
-    # this should be Union[str, Tuple[List[Any], bool]], but it will make unpacking much more verbose
-    uploads: SimpleQueue[Any] = SimpleQueue()
-
-    def metrics_handler(request: Request) -> Response:
-        if request.json is None:
-            return Response(status=400)
-
-        events = request.json["events"]
-        is_last = request.headers["pageserver-metrics-last-upload-in-batch"]
-        assert is_last in ["true", "false"]
-        uploads.put((events, is_last == "true"))
-        return Response(status=200)
-
-    # Require collecting metrics frequently, since we change
-    # the timeline and want something to be logged about it.
-    #
-    # Disable time-based pitr, we will use the manual GC calls
-    # to trigger remote storage operations in a controlled way
-    neon_env_builder.pageserver_config_override = f"""
-    metric_collection_interval="1s"
-    metric_collection_endpoint="{metric_collection_endpoint}"
-    cached_metric_collection_interval="0s"
-    synthetic_size_calculation_interval="3s"
-    """
-
-    neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
-
-    log.info(f"test_metric_collection endpoint is {metric_collection_endpoint}")
-
-    # mock http server that returns OK for the metrics
-    httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_handler(
-        metrics_handler
-    )
-
-    # spin up neon, after http server is ready
-    env = neon_env_builder.init_start(initial_tenant_conf={"pitr_interval": "0 sec"})
-    # httpserver is shut down before pageserver during passing run
-    env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
-    # we have a fast rate of calculation, these can happen at shutdown
-    env.pageserver.allowed_errors.append(
-        ".*synthetic_size_worker:calculate_synthetic_size.*:gather_size_inputs.*: failed to calculate logical size at .*: cancelled.*"
-    )
-    env.pageserver.allowed_errors.append(
-        ".*synthetic_size_worker: failed to calculate synthetic size for tenant .*: failed to calculate some logical_sizes"
-    )
+def test_metric_collection(neon_env_and_metrics_server: Tuple[NeonEnv, SimpleQueue[Any]]):
+    (env, uploads) = neon_env_and_metrics_server
 
     tenant_id = env.initial_tenant
     timeline_id = env.initial_timeline
@@ -164,63 +107,13 @@ def get_num_remote_ops(file_kind: str, op_kind: str) -> int:
         (events, is_last) = events
         v.ingest(events, is_last)
 
-    httpserver.check()
-
 
 def test_metric_collection_cleans_up_tempfile(
-    httpserver: HTTPServer,
-    neon_env_builder: NeonEnvBuilder,
-    httpserver_listen_address,
+    neon_env_and_metrics_server: Tuple[NeonEnv, SimpleQueue[Any]]
 ):
-    (host, port) = httpserver_listen_address
-    metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
-
-    # this should be Union[str, Tuple[List[Any], bool]], but it will make unpacking much more verbose
-    uploads: SimpleQueue[Any] = SimpleQueue()
-
-    def metrics_handler(request: Request) -> Response:
-        if request.json is None:
-            return Response(status=400)
-
-        events = request.json["events"]
-        is_last = request.headers["pageserver-metrics-last-upload-in-batch"]
-        assert is_last in ["true", "false"]
-        uploads.put((events, is_last == "true"))
-        return Response(status=200)
-
-    # Require collecting metrics frequently, since we change
-    # the timeline and want something to be logged about it.
-    #
-    # Disable time-based pitr, we will use the manual GC calls
-    # to trigger remote storage operations in a controlled way
-    neon_env_builder.pageserver_config_override = f"""
-    metric_collection_interval="1s"
-    metric_collection_endpoint="{metric_collection_endpoint}"
-    cached_metric_collection_interval="0s"
-    synthetic_size_calculation_interval="3s"
-    """
-
-    neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
-
-    # mock http server that returns OK for the metrics
-    httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_handler(
-        metrics_handler
-    )
-
-    # spin up neon, after http server is ready
-    env = neon_env_builder.init_start(initial_tenant_conf={"pitr_interval": "0 sec"})
+    (env, uploads) = neon_env_and_metrics_server
     pageserver_http = env.pageserver.http_client()
 
-    # httpserver is shut down before pageserver during passing run
-    env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
-    # we have a fast rate of calculation, these can happen at shutdown
-    env.pageserver.allowed_errors.append(
-        ".*synthetic_size_worker:calculate_synthetic_size.*:gather_size_inputs.*: failed to calculate logical size at .*: cancelled.*"
-    )
-    env.pageserver.allowed_errors.append(
-        ".*synthetic_size_worker: failed to calculate synthetic size for tenant .*: failed to calculate some logical_sizes"
-    )
-
     tenant_id = env.initial_tenant
     timeline_id = env.initial_timeline
     endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
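
The diff above moves the mock billing endpoint and the pageserver metric-collection config into the reusable neon_env_and_metrics_server fixture. For reference, the capture pattern the fixture relies on can be exercised on its own: a pytest-httpserver handler checks the pageserver-metrics-last-upload-in-batch header and pushes (events, is_last) pairs into a SimpleQueue, which the test then drains. The sketch below is not part of the diff; the test name, the requests.post call standing in for the pageserver's uploader, and the event payload are all illustrative assumptions.

# Minimal sketch of the capture pattern used by neon_env_and_metrics_server,
# reduced to pytest-httpserver + SimpleQueue. The posted payload is
# illustrative only; the real pageserver sends its own event format.
from queue import SimpleQueue
from typing import Any

import requests
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response


def test_usage_events_capture_sketch(httpserver: HTTPServer):
    uploads: SimpleQueue[Any] = SimpleQueue()

    def metrics_handler(request: Request) -> Response:
        if request.json is None:
            return Response(status=400)
        events = request.json["events"]
        is_last = request.headers["pageserver-metrics-last-upload-in-batch"]
        assert is_last in ["true", "false"]
        # hand the batch to the test body; the handler runs in the server thread
        uploads.put((events, is_last == "true"))
        return Response(status=200)

    httpserver.expect_request(
        "/billing/api/v1/usage_events", method="POST"
    ).respond_with_handler(metrics_handler)

    # stand-in for the pageserver's metric upload (illustrative payload)
    resp = requests.post(
        httpserver.url_for("/billing/api/v1/usage_events"),
        json={"events": [{"metric": "written_size", "value": 42}]},
        headers={"pageserver-metrics-last-upload-in-batch": "true"},
    )
    assert resp.status_code == 200

    # drain the queue the same way the tests above do
    (events, is_last) = uploads.get(timeout=10)
    assert is_last is True
    assert events[0]["metric"] == "written_size"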