diff --git a/cloudbuild.yaml b/cloudbuild.yaml index 40785823..e70c8c8b 100644 --- a/cloudbuild.yaml +++ b/cloudbuild.yaml @@ -21,13 +21,13 @@ steps: - --file=e2e-test-server/Dockerfile - . - - name: gcr.io/opentelemetry-ops-e2e/opentelemetry-operations-e2e-testing:0.5.5 + - name: gcr.io/opentelemetry-ops-e2e/opentelemetry-operations-e2e-testing:0.5.9 id: run-tests + dir: / env: - "PROJECT_ID=$PROJECT_ID" args: - local - --image=gcr.io/$PROJECT_ID/opentelemetry-operations-python-e2e-test-server - - --network=cloudbuild logsBucket: gs://opentelemetry-ops-e2e-cloud-build-logs diff --git a/e2e-test-server/Dockerfile b/e2e-test-server/Dockerfile index 195fa113..a2ef6fc1 100644 --- a/e2e-test-server/Dockerfile +++ b/e2e-test-server/Dockerfile @@ -37,4 +37,4 @@ WORKDIR $SRC/e2e-test-server COPY --from=build-base $SRC/e2e-test-server/venv venv/ COPY e2e-test-server/ ./ -ENTRYPOINT ["./venv/bin/gunicorn"] +ENTRYPOINT ["./venv/bin/python", "main.py"] diff --git a/e2e-test-server/constraints.txt b/e2e-test-server/constraints.txt index a9cc0ca9..52b7990a 100644 --- a/e2e-test-server/constraints.txt +++ b/e2e-test-server/constraints.txt @@ -6,25 +6,33 @@ Flask==1.1.2 google-api-core==1.26.3 google-auth==1.30.0 google-cloud-core==1.6.0 +google-cloud-pubsub==2.4.1 google-cloud-trace==0.24.0 googleapis-common-protos==1.53.0 +grpc-google-iam-v1==0.12.3 grpcio==1.37.0 -gunicorn==20.1.0 idna==2.10 itsdangerous==1.1.0 Jinja2==2.11.3 +libcst==0.3.18 MarkupSafe==1.1.1 +mypy-extensions==0.4.3 opentelemetry-api==1.1.0 opentelemetry-sdk==1.1.0 opentelemetry-semantic-conventions==0.20b0 packaging==20.9 +pkg-resources==0.0.0 +proto-plus==1.18.1 protobuf==3.15.8 pyasn1==0.4.8 pyasn1-modules==0.2.8 pyparsing==2.4.7 pytz==2021.1 +PyYAML==5.4.1 requests==2.25.1 rsa==4.7.2 six==1.15.0 +typing-extensions==3.10.0.0 +typing-inspect==0.6.0 urllib3==1.26.4 Werkzeug==1.0.1 diff --git a/e2e-test-server/e2e_test_server/constants.py b/e2e-test-server/e2e_test_server/constants.py new file mode 100644 index 00000000..3a043ac1 --- /dev/null +++ b/e2e-test-server/e2e_test_server/constants.py @@ -0,0 +1,18 @@ +# Copyright 2021 Google +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +INSTRUMENTING_MODULE_NAME = "opentelemetry-ops-e2e-test-server" +SCENARIO = "scenario" +STATUS = "status" +TEST_ID = "test-id" diff --git a/e2e-test-server/server.py b/e2e-test-server/e2e_test_server/scenarios.py similarity index 65% rename from e2e-test-server/server.py rename to e2e-test-server/e2e_test_server/scenarios.py index 1b6ad571..2cbc3721 100644 --- a/e2e-test-server/server.py +++ b/e2e-test-server/e2e_test_server/scenarios.py @@ -14,34 +14,33 @@ import contextlib import os -from typing import Iterator +from dataclasses import dataclass +from typing import Any, Iterator, Mapping -from flask import Flask, Response, request from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor from opentelemetry.sdk.trace.sampling import ALWAYS_ON from opentelemetry.trace import Tracer -TEST_ID = "test-id" -INSTRUMENTING_MODULE_NAME = "opentelemetry-ops-e2e-test-server" +from .constants import INSTRUMENTING_MODULE_NAME, TEST_ID -app = Flask(__name__) + +@dataclass +class Response: + # HTTP style status code, even for pubsub + status: int @contextlib.contextmanager -def common_setup() -> Iterator[tuple[str, Tracer]]: +def _tracer_setup(carrier: Mapping[str, str]) -> Iterator[tuple[str, Tracer]]: """\ - Context manager with common setup for test endpoints + Context manager with common setup for tracing endpoints - It extracts the test-id header, creates a tracer, and finally flushes - spans created during the test + Yields a tracer (from a fresh SDK with new exporter) then finally flushes + spans created during the test after. """ - if TEST_ID not in request.headers: - raise Exception(f"{TEST_ID} header is required") - test_id = request.headers[TEST_ID] - tracer_provider = TracerProvider( sampler=ALWAYS_ON, active_span_processor=BatchSpanProcessor( @@ -51,21 +50,21 @@ def common_setup() -> Iterator[tuple[str, Tracer]]: tracer = tracer_provider.get_tracer(INSTRUMENTING_MODULE_NAME) try: - yield test_id, tracer + yield tracer finally: tracer_provider.shutdown() -@app.route("/health") -def health(): - return "OK", 200 +def health(test_id: str, headers: Mapping[str, str], body: bytes) -> Response: + return Response(status=200) -@app.route("/basicTrace", methods=["POST"]) -def basicTrace(): +def basicTrace( + test_id: str, headers: Mapping[str, str], body: bytes +) -> Response: """Create a basic trace""" - with common_setup() as (test_id, tracer): + with _tracer_setup(headers) as tracer: with tracer.start_span("basicTrace", attributes={TEST_ID: test_id}): pass diff --git a/e2e-test-server/e2e_test_server/server.py b/e2e-test-server/e2e_test_server/server.py new file mode 100644 index 00000000..8cddf2bf --- /dev/null +++ b/e2e-test-server/e2e_test_server/server.py @@ -0,0 +1,89 @@ +# Copyright 2021 Google +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +from google.cloud import pubsub_v1 +from google.cloud.pubsub_v1.subscriber.message import Message + +from . import scenarios +from .constants import SCENARIO, STATUS, TEST_ID + +PROJECT_ID = os.environ["PROJECT_ID"] +REQUEST_SUBSCRIPTION_NAME = os.environ["REQUEST_SUBSCRIPTION_NAME"] +RESPONSE_TOPIC_NAME = os.environ["RESPONSE_TOPIC_NAME"] + + +def pubsub_pull() -> None: + publisher = pubsub_v1.PublisherClient( + # disable buffering + batch_settings=pubsub_v1.types.BatchSettings(max_messages=1) + ) + response_topic = publisher.topic_path(PROJECT_ID, RESPONSE_TOPIC_NAME) + + subscriber = pubsub_v1.SubscriberClient() + subscription_path = subscriber.subscription_path( + PROJECT_ID, REQUEST_SUBSCRIPTION_NAME, + ) + + def respond(test_id: str, res: scenarios.Response) -> None: + """Respond to the test runner that we finished executing the scenario""" + data = bytes() + attributes = {TEST_ID: test_id, STATUS: str(res.status)} + print(f"publishing {data=} and {attributes=}") + publisher.publish( + response_topic, bytes(), **attributes, + ) + + def pubsub_callback(message: Message) -> None: + """Execute a scenario based on the incoming message from the test test runner""" + if TEST_ID not in message.attributes: + # don't even know how to write back to the publisher that the + # message is invalid, so nack() + message.nack() + test_id = message.attributes[TEST_ID] + + if SCENARIO not in message.attributes: + publisher.publish( + response_topic, + f'Expected attribute "{SCENARIO}" is missing', + **{TEST_ID: test_id, "status": 500}, + ) + scenario = message.attributes[SCENARIO] + + if scenario == "/health": + scenarioFunc = scenarios.health + elif scenario == "/basicTrace": + scenarioFunc = scenarios.basicTrace + else: + scenarioFunc = lambda *args, **kwargs: scenarios.Response( + status=404 + ) + + res = scenarioFunc(test_id, message.attributes, message.data) + + respond(test_id, res) + message.ack() + + streaming_pull_future = subscriber.subscribe( + subscription_path, callback=pubsub_callback + ) + + print( + "Listening on subscription {} for pub/sub messages".format( + REQUEST_SUBSCRIPTION_NAME + ) + ) + with subscriber: + streaming_pull_future.result() diff --git a/e2e-test-server/gunicorn.conf.py b/e2e-test-server/main.py similarity index 68% rename from e2e-test-server/gunicorn.conf.py rename to e2e-test-server/main.py index 955defd8..c12cae0f 100644 --- a/e2e-test-server/gunicorn.conf.py +++ b/e2e-test-server/main.py @@ -12,17 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os +from e2e_test_server import server -if "PORT" not in os.environ: - raise Exception("Must supply environment variable PORT") - -bind = "0.0.0.0:{}".format(os.environ["PORT"]) - -# Needed to prevent forking for OTel -workers = 1 - -wsgi_app = "server:app" - -# log requests to stdout -accesslog = "-" +if __name__ == "__main__": + server.pubsub_pull() diff --git a/e2e-test-server/requirements.txt b/e2e-test-server/requirements.txt index 946f3f3c..77608e3c 100644 --- a/e2e-test-server/requirements.txt +++ b/e2e-test-server/requirements.txt @@ -3,4 +3,4 @@ opentelemetry-sdk opentelemetry-api Flask -gunicorn +google-cloud-pubsub