Skip to content

Commit

Permalink
Add a fixture for setting up neon_env and metrics server
Browse files Browse the repository at this point in the history
Signed-off-by: Rahul Modpur <[email protected]>
  • Loading branch information
rmodpur committed Oct 6, 2023
1 parent a3c82f1 commit 4538d62
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 113 deletions.
68 changes: 68 additions & 0 deletions test_runner/fixtures/neon_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from functools import cached_property
from itertools import chain, product
from pathlib import Path
from queue import SimpleQueue
from types import TracebackType
from typing import Any, Dict, Iterator, List, Optional, Tuple, Type, cast
from urllib.parse import urlparse
Expand All @@ -36,8 +37,11 @@
from psycopg2.extensions import connection as PgConnection
from psycopg2.extensions import cursor as PgCursor
from psycopg2.extensions import make_dsn, parse_dsn
from pytest_httpserver import HTTPServer
from typing_extensions import Literal
from urllib3.util.retry import Retry
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response

from fixtures.broker import NeonBroker
from fixtures.log_helper import log
Expand Down Expand Up @@ -997,6 +1001,70 @@ def neon_env_builder(
yield builder


@pytest.fixture(scope="function")
def neon_env_and_metrics_server(
httpserver: HTTPServer,
neon_env_builder: NeonEnvBuilder,
httpserver_listen_address,
) -> Iterator[Tuple[NeonEnv, SimpleQueue[Any]]]:
"""
Fixture to create a Neon environment and metrics server.
"""

(host, port) = httpserver_listen_address
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"

# this should be Union[str, Tuple[List[Any], bool]], but it will make unpacking much more verbose
uploads: SimpleQueue[Any] = SimpleQueue()

def metrics_handler(request: Request) -> Response:
if request.json is None:
return Response(status=400)

events = request.json["events"]
is_last = request.headers["pageserver-metrics-last-upload-in-batch"]
assert is_last in ["true", "false"]
uploads.put((events, is_last == "true"))
return Response(status=200)

# Require collecting metrics frequently, since we change
# the timeline and want something to be logged about it.
#
# Disable time-based pitr, we will use the manual GC calls
# to trigger remote storage operations in a controlled way
neon_env_builder.pageserver_config_override = f"""
metric_collection_interval="1s"
metric_collection_endpoint="{metric_collection_endpoint}"
cached_metric_collection_interval="0s"
synthetic_size_calculation_interval="3s"
"""

neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)

log.info(f"test_metric_collection endpoint is {metric_collection_endpoint}")

# mock http server that returns OK for the metrics
httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_handler(
metrics_handler
)

# spin up neon, after http server is ready
env = neon_env_builder.init_start(initial_tenant_conf={"pitr_interval": "0 sec"})
# httpserver is shut down before pageserver during passing run
env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
# we have a fast rate of calculation, these can happen at shutdown
env.pageserver.allowed_errors.append(
".*synthetic_size_worker:calculate_synthetic_size.*:gather_size_inputs.*: failed to calculate logical size at .*: cancelled.*"
)
env.pageserver.allowed_errors.append(
".*synthetic_size_worker: failed to calculate synthetic size for tenant .*: failed to calculate some logical_sizes"
)

yield (env, uploads)

httpserver.check()


@dataclass
class PageserverPort:
pg: int
Expand Down
119 changes: 6 additions & 113 deletions test_runner/regress/test_pageserver_metric_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,75 +3,18 @@
from dataclasses import dataclass
from pathlib import Path
from queue import SimpleQueue
from typing import Any, Dict, Set
from typing import Any, Dict, Set, Tuple

from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
NeonEnv,
wait_for_last_flush_lsn,
)
from fixtures.remote_storage import RemoteStorageKind
from fixtures.types import TenantId, TimelineId
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response

# TODO: collect all of the env setup *AFTER* removal of RemoteStorageKind.NOOP


def test_metric_collection(
httpserver: HTTPServer,
neon_env_builder: NeonEnvBuilder,
httpserver_listen_address,
):
(host, port) = httpserver_listen_address
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"

# this should be Union[str, Tuple[List[Any], bool]], but it will make unpacking much more verbose
uploads: SimpleQueue[Any] = SimpleQueue()

def metrics_handler(request: Request) -> Response:
if request.json is None:
return Response(status=400)

events = request.json["events"]
is_last = request.headers["pageserver-metrics-last-upload-in-batch"]
assert is_last in ["true", "false"]
uploads.put((events, is_last == "true"))
return Response(status=200)

# Require collecting metrics frequently, since we change
# the timeline and want something to be logged about it.
#
# Disable time-based pitr, we will use the manual GC calls
# to trigger remote storage operations in a controlled way
neon_env_builder.pageserver_config_override = f"""
metric_collection_interval="1s"
metric_collection_endpoint="{metric_collection_endpoint}"
cached_metric_collection_interval="0s"
synthetic_size_calculation_interval="3s"
"""

neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)

log.info(f"test_metric_collection endpoint is {metric_collection_endpoint}")

# mock http server that returns OK for the metrics
httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_handler(
metrics_handler
)

# spin up neon, after http server is ready
env = neon_env_builder.init_start(initial_tenant_conf={"pitr_interval": "0 sec"})
# httpserver is shut down before pageserver during passing run
env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
# we have a fast rate of calculation, these can happen at shutdown
env.pageserver.allowed_errors.append(
".*synthetic_size_worker:calculate_synthetic_size.*:gather_size_inputs.*: failed to calculate logical size at .*: cancelled.*"
)
env.pageserver.allowed_errors.append(
".*synthetic_size_worker: failed to calculate synthetic size for tenant .*: failed to calculate some logical_sizes"
)
def test_metric_collection(neon_env_and_metrics_server: Tuple[NeonEnv, SimpleQueue[Any]]):
(env, uploads) = neon_env_and_metrics_server

tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
Expand Down Expand Up @@ -164,63 +107,13 @@ def get_num_remote_ops(file_kind: str, op_kind: str) -> int:
(events, is_last) = events
v.ingest(events, is_last)

httpserver.check()


def test_metric_collection_cleans_up_tempfile(
httpserver: HTTPServer,
neon_env_builder: NeonEnvBuilder,
httpserver_listen_address,
neon_env_and_metrics_server: Tuple[NeonEnv, SimpleQueue[Any]]
):
(host, port) = httpserver_listen_address
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"

# this should be Union[str, Tuple[List[Any], bool]], but it will make unpacking much more verbose
uploads: SimpleQueue[Any] = SimpleQueue()

def metrics_handler(request: Request) -> Response:
if request.json is None:
return Response(status=400)

events = request.json["events"]
is_last = request.headers["pageserver-metrics-last-upload-in-batch"]
assert is_last in ["true", "false"]
uploads.put((events, is_last == "true"))
return Response(status=200)

# Require collecting metrics frequently, since we change
# the timeline and want something to be logged about it.
#
# Disable time-based pitr, we will use the manual GC calls
# to trigger remote storage operations in a controlled way
neon_env_builder.pageserver_config_override = f"""
metric_collection_interval="1s"
metric_collection_endpoint="{metric_collection_endpoint}"
cached_metric_collection_interval="0s"
synthetic_size_calculation_interval="3s"
"""

neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)

# mock http server that returns OK for the metrics
httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_handler(
metrics_handler
)

# spin up neon, after http server is ready
env = neon_env_builder.init_start(initial_tenant_conf={"pitr_interval": "0 sec"})
(env, uploads) = neon_env_and_metrics_server
pageserver_http = env.pageserver.http_client()

# httpserver is shut down before pageserver during passing run
env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
# we have a fast rate of calculation, these can happen at shutdown
env.pageserver.allowed_errors.append(
".*synthetic_size_worker:calculate_synthetic_size.*:gather_size_inputs.*: failed to calculate logical size at .*: cancelled.*"
)
env.pageserver.allowed_errors.append(
".*synthetic_size_worker: failed to calculate synthetic size for tenant .*: failed to calculate some logical_sizes"
)

tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
Expand Down

0 comments on commit 4538d62

Please sign in to comment.