tests: add a fixture for setting up neon_env and metrics server #5487

Open · wants to merge 2 commits into main
66 changes: 66 additions & 0 deletions test_runner/fixtures/neon_fixtures.py
@@ -18,6 +18,7 @@
from functools import cached_property
from itertools import chain, product
from pathlib import Path
from queue import SimpleQueue
from types import TracebackType
from typing import Any, Dict, Iterator, List, Optional, Tuple, Type, cast
from urllib.parse import urlparse
@@ -36,8 +37,11 @@
from psycopg2.extensions import connection as PgConnection
from psycopg2.extensions import cursor as PgCursor
from psycopg2.extensions import make_dsn, parse_dsn
from pytest_httpserver import HTTPServer
from typing_extensions import Literal
from urllib3.util.retry import Retry
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response

from fixtures.broker import NeonBroker
from fixtures.log_helper import log
@@ -1005,6 +1009,68 @@ def neon_env_builder(
yield builder


@pytest.fixture(scope="function")
def neon_env_and_metrics_server(
httpserver: HTTPServer,
neon_env_builder: NeonEnvBuilder,
httpserver_listen_address,
) -> Tuple[NeonEnv, HTTPServer, SimpleQueue[Any]]:
"""
Fixture to create a Neon environment and metrics server.
"""

(host, port) = httpserver_listen_address
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"

# this should be Union[str, Tuple[List[Any], bool]], but it will make unpacking much more verbose
uploads: SimpleQueue[Any] = SimpleQueue()

def metrics_handler(request: Request) -> Response:
if request.json is None:
return Response(status=400)

events = request.json["events"]
is_last = request.headers["pageserver-metrics-last-upload-in-batch"]
assert is_last in ["true", "false"]
uploads.put((events, is_last == "true"))
return Response(status=200)

# Require collecting metrics frequently, since we change
# the timeline and want something to be logged about it.
#
# Disable time-based pitr, we will use the manual GC calls
# to trigger remote storage operations in a controlled way
neon_env_builder.pageserver_config_override = f"""
metric_collection_interval="1s"
metric_collection_endpoint="{metric_collection_endpoint}"
cached_metric_collection_interval="0s"
synthetic_size_calculation_interval="3s"
"""

neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)

log.info(f"test_metric_collection endpoint is {metric_collection_endpoint}")

# mock http server that returns OK for the metrics
httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_handler(
metrics_handler
)

# spin up neon, after http server is ready
env = neon_env_builder.init_start(initial_tenant_conf={"pitr_interval": "0 sec"})
# httpserver is shut down before pageserver during passing run
env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
# we have a fast rate of calculation, these can happen at shutdown
env.pageserver.allowed_errors.append(
".*synthetic_size_worker:calculate_synthetic_size.*:gather_size_inputs.*: failed to calculate logical size at .*: cancelled.*"
)
env.pageserver.allowed_errors.append(
".*synthetic_size_worker: failed to calculate synthetic size for tenant .*: failed to calculate some logical_sizes"
)

return (env, httpserver, uploads)


@dataclass
class PageserverPort:
pg: int
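For orientation, below is a minimal sketch of how a test could consume the new neon_env_and_metrics_server fixture. The test name, the SQL statement, and the timeout are illustrative assumptions; the unpacking pattern, the fixture's return type, and the (events, is_last_in_batch) shape of the queued items come from the diff above. The second file in this PR applies exactly this pattern to the existing metric-collection tests.

from queue import SimpleQueue
from typing import Any, Tuple

from fixtures.neon_fixtures import NeonEnv
from pytest_httpserver import HTTPServer


def test_consumes_metrics_fixture(  # hypothetical name, for illustration only
    neon_env_and_metrics_server: Tuple[NeonEnv, HTTPServer, SimpleQueue[Any]],
):
    # The fixture has already configured metric collection on the pageserver,
    # registered the mock /billing/api/v1/usage_events handler, and started neon.
    (env, httpserver, uploads) = neon_env_and_metrics_server

    endpoint = env.endpoints.create_start("main", tenant_id=env.initial_tenant)
    endpoint.safe_psql("CREATE TABLE t (i bigint)")

    # Each queued item is (events, is_last_in_batch) as produced by the
    # fixture's metrics_handler; block until the first batch arrives.
    (events, is_last) = uploads.get(timeout=60)
    assert isinstance(is_last, bool)
    assert len(events) > 0

    # Surface any assertion raised inside the handler thread.
    httpserver.check()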
116 changes: 8 additions & 108 deletions test_runner/regress/test_pageserver_metric_collection.py
@@ -3,75 +3,21 @@
from dataclasses import dataclass
from pathlib import Path
from queue import SimpleQueue
from typing import Any, Dict, Set
from typing import Any, Dict, Set, Tuple

from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
NeonEnv,
wait_for_last_flush_lsn,
)
from fixtures.remote_storage import RemoteStorageKind
from fixtures.types import TenantId, TimelineId
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response

# TODO: collect all of the env setup *AFTER* removal of RemoteStorageKind.NOOP


def test_metric_collection(
httpserver: HTTPServer,
neon_env_builder: NeonEnvBuilder,
httpserver_listen_address,
neon_env_and_metrics_server: Tuple[NeonEnv, HTTPServer, SimpleQueue[Any]]
):
(host, port) = httpserver_listen_address
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"

# this should be Union[str, Tuple[List[Any], bool]], but it will make unpacking much more verbose
uploads: SimpleQueue[Any] = SimpleQueue()

def metrics_handler(request: Request) -> Response:
if request.json is None:
return Response(status=400)

events = request.json["events"]
is_last = request.headers["pageserver-metrics-last-upload-in-batch"]
assert is_last in ["true", "false"]
uploads.put((events, is_last == "true"))
return Response(status=200)

# Require collecting metrics frequently, since we change
# the timeline and want something to be logged about it.
#
# Disable time-based pitr, we will use the manual GC calls
# to trigger remote storage operations in a controlled way
neon_env_builder.pageserver_config_override = f"""
metric_collection_interval="1s"
metric_collection_endpoint="{metric_collection_endpoint}"
cached_metric_collection_interval="0s"
synthetic_size_calculation_interval="3s"
"""

neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)

log.info(f"test_metric_collection endpoint is {metric_collection_endpoint}")

# mock http server that returns OK for the metrics
httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_handler(
metrics_handler
)

# spin up neon, after http server is ready
env = neon_env_builder.init_start(initial_tenant_conf={"pitr_interval": "0 sec"})
# httpserver is shut down before pageserver during passing run
env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
# we have a fast rate of calculation, these can happen at shutdown
env.pageserver.allowed_errors.append(
".*synthetic_size_worker:calculate_synthetic_size.*:gather_size_inputs.*: failed to calculate logical size at .*: cancelled.*"
)
env.pageserver.allowed_errors.append(
".*synthetic_size_worker: failed to calculate synthetic size for tenant .*: failed to calculate some logical_sizes"
)
(env, httpserver, uploads) = neon_env_and_metrics_server

tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
@@ -168,59 +114,11 @@ def get_num_remote_ops(file_kind: str, op_kind: str) -> int:


def test_metric_collection_cleans_up_tempfile(
httpserver: HTTPServer,
neon_env_builder: NeonEnvBuilder,
httpserver_listen_address,
neon_env_and_metrics_server: Tuple[NeonEnv, HTTPServer, SimpleQueue[Any]]
):
(host, port) = httpserver_listen_address
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"

# this should be Union[str, Tuple[List[Any], bool]], but it will make unpacking much more verbose
uploads: SimpleQueue[Any] = SimpleQueue()

def metrics_handler(request: Request) -> Response:
if request.json is None:
return Response(status=400)

events = request.json["events"]
is_last = request.headers["pageserver-metrics-last-upload-in-batch"]
assert is_last in ["true", "false"]
uploads.put((events, is_last == "true"))
return Response(status=200)

# Require collecting metrics frequently, since we change
# the timeline and want something to be logged about it.
#
# Disable time-based pitr, we will use the manual GC calls
# to trigger remote storage operations in a controlled way
neon_env_builder.pageserver_config_override = f"""
metric_collection_interval="1s"
metric_collection_endpoint="{metric_collection_endpoint}"
cached_metric_collection_interval="0s"
synthetic_size_calculation_interval="3s"
"""

neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)

# mock http server that returns OK for the metrics
httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_handler(
metrics_handler
)

# spin up neon, after http server is ready
env = neon_env_builder.init_start(initial_tenant_conf={"pitr_interval": "0 sec"})
(env, httpserver, uploads) = neon_env_and_metrics_server
pageserver_http = env.pageserver.http_client()

# httpserver is shut down before pageserver during passing run
env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
# we have a fast rate of calculation, these can happen at shutdown
env.pageserver.allowed_errors.append(
".*synthetic_size_worker:calculate_synthetic_size.*:gather_size_inputs.*: failed to calculate logical size at .*: cancelled.*"
)
env.pageserver.allowed_errors.append(
".*synthetic_size_worker: failed to calculate synthetic size for tenant .*: failed to calculate some logical_sizes"
)

tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
@@ -290,6 +188,8 @@ def metrics_handler(request: Request) -> Response:
), "only initial tempfile should had been removed"
assert initially.other.issuperset(later.other), "no other files should had been removed"

httpserver.check()


@dataclass
class PrefixPartitionedFiles:
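Because the refactored tests now delegate all mock-endpoint wiring to pytest_httpserver, the following self-contained sketch shows that pattern in isolation, without the Neon fixtures. The endpoint path and handler shape mirror the fixture added above; the test name, the example payload, and the use of requests to drive the call are assumptions for illustration.

from queue import SimpleQueue
from typing import Any

import requests
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response


def test_usage_events_mock(httpserver: HTTPServer):  # hypothetical, for illustration
    uploads: SimpleQueue[Any] = SimpleQueue()

    def handler(request: Request) -> Response:
        # Reject non-JSON bodies, mirroring the fixture's metrics_handler.
        if request.json is None:
            return Response(status=400)
        uploads.put(request.json["events"])
        return Response(status=200)

    httpserver.expect_request(
        "/billing/api/v1/usage_events", method="POST"
    ).respond_with_handler(handler)

    # Simulate a single pageserver upload against the mock endpoint.
    url = httpserver.url_for("/billing/api/v1/usage_events")
    resp = requests.post(url, json={"events": [{"metric": "written_size", "value": 1}]})
    assert resp.status_code == 200
    assert uploads.get(timeout=5) == [{"metric": "written_size", "value": 1}]

    # Re-raise any errors recorded by the handler thread.
    httpserver.check()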