Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

switch to pubsub implementation of e2e tests #137

Merged
merged 13 commits into from
May 18, 2021
17 changes: 14 additions & 3 deletions cloudbuild.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,28 @@ steps:
id: build-test-server
args:
- build
- --tag=gcr.io/$PROJECT_ID/opentelemetry-operations-python-e2e-test-server
- --tag=$_TEST_SERVER_IMAGE
- --file=e2e-test-server/Dockerfile
- .
- name: docker
id: push-test-server-image
args:
- push
- $_TEST_SERVER_IMAGE

- name: gcr.io/opentelemetry-ops-e2e/opentelemetry-operations-e2e-testing:0.5.5
- name: $_TEST_RUNNER_IMAGE
id: run-tests
dir: /
env:
- "PROJECT_ID=$PROJECT_ID"
args:
- local
- --image=gcr.io/$PROJECT_ID/opentelemetry-operations-python-e2e-test-server
- --image=$_TEST_SERVER_IMAGE
- --network=cloudbuild

logsBucket: gs://opentelemetry-ops-e2e-cloud-build-logs
substitutions:
_TEST_RUNNER_IMAGE: gcr.io/opentelemetry-ops-e2e/opentelemetry-operations-e2e-testing:0.5.22
_TEST_SERVER_IMAGE: gcr.io/${PROJECT_ID}/opentelemetry-operations-python-e2e-test-server:${SHORT_SHA}
images:
- $_TEST_SERVER_IMAGE
2 changes: 1 addition & 1 deletion e2e-test-server/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,4 @@ WORKDIR $SRC/e2e-test-server
COPY --from=build-base $SRC/e2e-test-server/venv venv/
COPY e2e-test-server/ ./

ENTRYPOINT ["./venv/bin/gunicorn"]
ENTRYPOINT ["./venv/bin/python", "main.py"]
11 changes: 10 additions & 1 deletion e2e-test-server/constraints.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,34 @@ Flask==1.1.2
google-api-core==1.26.3
google-auth==1.30.0
google-cloud-core==1.6.0
google-cloud-pubsub==2.4.1
google-cloud-trace==0.24.0
googleapis-common-protos==1.53.0
grpc-google-iam-v1==0.12.3
grpcio==1.37.0
gunicorn==20.1.0
idna==2.10
itsdangerous==1.1.0
Jinja2==2.11.3
libcst==0.3.18
MarkupSafe==1.1.1
mypy-extensions==0.4.3
opentelemetry-api==1.1.0
opentelemetry-sdk==1.1.0
opentelemetry-semantic-conventions==0.20b0
packaging==20.9
pkg-resources==0.0.0
proto-plus==1.18.1
protobuf==3.15.8
pyasn1==0.4.8
pyasn1-modules==0.2.8
pydantic==1.8.2
pyparsing==2.4.7
pytz==2021.1
PyYAML==5.4.1
requests==2.25.1
rsa==4.7.2
six==1.15.0
typing-extensions==3.10.0.0
typing-inspect==0.6.0
urllib3==1.26.4
Werkzeug==1.0.1
33 changes: 33 additions & 0 deletions e2e-test-server/e2e_test_server/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Copyright 2021 Google
aabmass marked this conversation as resolved.
Show resolved Hide resolved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import enum
import os


class SubscriptionMode(enum.Enum):
PULL = "pull"
PUSH = "push"


INSTRUMENTING_MODULE_NAME = "opentelemetry-ops-e2e-test-server"
SCENARIO = "scenario"
STATUS_CODE = "status_code"
TEST_ID = "test_id"
SUBSCRIPTION_MODE: SubscriptionMode = SubscriptionMode(
os.environ["SUBSCRIPTION_MODE"]
)
PROJECT_ID = os.environ["PROJECT_ID"]
REQUEST_SUBSCRIPTION_NAME = os.environ["REQUEST_SUBSCRIPTION_NAME"]
RESPONSE_TOPIC_NAME = os.environ["RESPONSE_TOPIC_NAME"]
Original file line number Diff line number Diff line change
Expand Up @@ -14,34 +14,41 @@

import contextlib
import os
from typing import Iterator
from dataclasses import dataclass
from typing import Callable, Iterator, Mapping

from flask import Flask, Response, request
import pydantic
from google.rpc import code_pb2
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.trace.sampling import ALWAYS_ON
from opentelemetry.trace import Tracer

TEST_ID = "test-id"
INSTRUMENTING_MODULE_NAME = "opentelemetry-ops-e2e-test-server"
from .constants import INSTRUMENTING_MODULE_NAME, TEST_ID

app = Flask(__name__)

class Request(pydantic.BaseModel):
test_id: str
headers: Mapping[str, str]
data: bytes


@dataclass
class Response:
status_code: code_pb2.Code
data: bytes = bytes()


@contextlib.contextmanager
def common_setup() -> Iterator[tuple[str, Tracer]]:
def _tracer_setup() -> Iterator[Tracer]:
"""\
Context manager with common setup for test endpoints
Context manager with common setup for tracing endpoints

It extracts the test-id header, creates a tracer, and finally flushes
spans created during the test
Yields a tracer (from a fresh SDK with new exporter) then finally flushes
spans created during the test after.
"""

if TEST_ID not in request.headers:
raise Exception(f"{TEST_ID} header is required")
test_id = request.headers[TEST_ID]

tracer_provider = TracerProvider(
sampler=ALWAYS_ON,
active_span_processor=BatchSpanProcessor(
Expand All @@ -51,22 +58,32 @@ def common_setup() -> Iterator[tuple[str, Tracer]]:
tracer = tracer_provider.get_tracer(INSTRUMENTING_MODULE_NAME)

try:
yield test_id, tracer
yield tracer
finally:
tracer_provider.shutdown()


@app.route("/health")
def health():
return "OK", 200
def health(request: Request) -> Response:
return Response(status_code=code_pb2.OK)


@app.route("/basicTrace", methods=["POST"])
def basicTrace():
def basic_trace(request: Request) -> Response:
"""Create a basic trace"""

with common_setup() as (test_id, tracer):
with tracer.start_span("basicTrace", attributes={TEST_ID: test_id}):
with _tracer_setup() as tracer:
with tracer.start_span(
"basicTrace", attributes={TEST_ID: request.test_id}
):
pass

return Response(status=200)
return Response(status_code=code_pb2.OK)


def not_implemented_handler(_: Request) -> Response:
return Response(status_code=str(code_pb2.UNIMPLEMENTED))


SCENARIO_TO_HANDLER: dict[str, Callable[[Request], Response]] = {
"/health": health,
"/basicTrace": basic_trace,
}
106 changes: 106 additions & 0 deletions e2e-test-server/e2e_test_server/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# Copyright 2021 Google
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.message import Message
from google.rpc import code_pb2

from . import scenarios
from .constants import (
PROJECT_ID,
REQUEST_SUBSCRIPTION_NAME,
RESPONSE_TOPIC_NAME,
SCENARIO,
STATUS_CODE,
TEST_ID,
)

logger = logging.getLogger(__name__)


def pubsub_pull() -> None:
publisher = pubsub_v1.PublisherClient(
# disable buffering
batch_settings=pubsub_v1.types.BatchSettings(max_messages=1)
)
response_topic = publisher.topic_path(PROJECT_ID, RESPONSE_TOPIC_NAME)

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
PROJECT_ID,
REQUEST_SUBSCRIPTION_NAME,
)

def respond(test_id: str, res: scenarios.Response) -> None:
"""Respond to the test runner that we finished executing the scenario"""
data = res.data
attributes = {TEST_ID: test_id, STATUS_CODE: str(res.status_code)}
logger.info(f"publishing {data=} and {attributes=}")
publisher.publish(
response_topic,
data,
**attributes,
)

def pubsub_callback(message: Message) -> None:
"""Execute a scenario based on the incoming message from the test runner"""
if TEST_ID not in message.attributes:
# don't even know how to write back to the publisher that the
# message is invalid, so nack()
message.nack()
test_id: str = message.attributes[TEST_ID]

if SCENARIO not in message.attributes:
respond(
test_id,
scenarios.Response(
status_code=code_pb2.INVALID_ARGUMENT,
data=f'Expected attribute "{SCENARIO}" is missing'.encode(),
),
)
scenario = message.attributes[SCENARIO]
handler = scenarios.SCENARIO_TO_HANDLER.get(
scenario, scenarios.not_implemented_handler
)

try:
res = handler(
scenarios.Request(
test_id=test_id,
headers=dict(message.attributes),
data=message.data,
)
)
except Exception as e:
logger.exception("exception trying to handle request")
res = scenarios.Response(
status_code=code_pb2.INTERNAL, data=str(e).encode()
)
finally:
respond(test_id, res)
message.ack()

streaming_pull_future = subscriber.subscribe(
subscription_path, callback=pubsub_callback
)

logger.info(
"Listening on subscription {} for pub/sub messages".format(
REQUEST_SUBSCRIPTION_NAME
)
)
with subscriber:
streaming_pull_future.result()
24 changes: 13 additions & 11 deletions e2e-test-server/gunicorn.conf.py → e2e-test-server/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import logging

if "PORT" not in os.environ:
raise Exception("Must supply environment variable PORT")
from e2e_test_server import server
from e2e_test_server.constants import SUBSCRIPTION_MODE, SubscriptionMode

bind = "0.0.0.0:{}".format(os.environ["PORT"])
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)

# Needed to prevent forking for OTel
workers = 1

wsgi_app = "server:app"

# log requests to stdout
accesslog = "-"
if SUBSCRIPTION_MODE is SubscriptionMode.PULL:
server.pubsub_pull()
else:
raise RuntimeError(
"server does not support subscription mode {}".format(
SUBSCRIPTION_MODE
)
)
4 changes: 3 additions & 1 deletion e2e-test-server/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,6 @@
opentelemetry-sdk
opentelemetry-api
Flask
gunicorn
google-cloud-pubsub
googleapis-common-protos
pydantic