Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

switch to pubsub implementation of e2e tests #137

Merged
merged 13 commits into from
May 18, 2021
17 changes: 14 additions & 3 deletions cloudbuild.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,28 @@ steps:
id: build-test-server
args:
- build
- --tag=gcr.io/$PROJECT_ID/opentelemetry-operations-python-e2e-test-server
- --tag=$_TEST_SERVER_IMAGE
- --file=e2e-test-server/Dockerfile
- .
- name: docker
id: push-test-server-image
args:
- push
- $_TEST_SERVER_IMAGE

- name: gcr.io/opentelemetry-ops-e2e/opentelemetry-operations-e2e-testing:0.5.5
- name: $_TEST_RUNNER_IMAGE
id: run-tests
dir: /
env:
- "PROJECT_ID=$PROJECT_ID"
args:
- local
- --image=gcr.io/$PROJECT_ID/opentelemetry-operations-python-e2e-test-server
- --image=$_TEST_SERVER_IMAGE
- --network=cloudbuild

logsBucket: gs://opentelemetry-ops-e2e-cloud-build-logs
substitutions:
_TEST_RUNNER_IMAGE: gcr.io/opentelemetry-ops-e2e/opentelemetry-operations-e2e-testing:0.5.12
_TEST_SERVER_IMAGE: gcr.io/${PROJECT_ID}/opentelemetry-operations-python-e2e-test-server:${SHORT_SHA}
images:
- $_TEST_SERVER_IMAGE
2 changes: 1 addition & 1 deletion e2e-test-server/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,4 @@ WORKDIR $SRC/e2e-test-server
COPY --from=build-base $SRC/e2e-test-server/venv venv/
COPY e2e-test-server/ ./

ENTRYPOINT ["./venv/bin/gunicorn"]
ENTRYPOINT ["./venv/bin/python", "main.py"]
10 changes: 9 additions & 1 deletion e2e-test-server/constraints.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,33 @@ Flask==1.1.2
google-api-core==1.26.3
google-auth==1.30.0
google-cloud-core==1.6.0
google-cloud-pubsub==2.4.1
google-cloud-trace==0.24.0
googleapis-common-protos==1.53.0
grpc-google-iam-v1==0.12.3
grpcio==1.37.0
gunicorn==20.1.0
idna==2.10
itsdangerous==1.1.0
Jinja2==2.11.3
libcst==0.3.18
MarkupSafe==1.1.1
mypy-extensions==0.4.3
opentelemetry-api==1.1.0
opentelemetry-sdk==1.1.0
opentelemetry-semantic-conventions==0.20b0
packaging==20.9
pkg-resources==0.0.0
proto-plus==1.18.1
protobuf==3.15.8
pyasn1==0.4.8
pyasn1-modules==0.2.8
pyparsing==2.4.7
pytz==2021.1
PyYAML==5.4.1
requests==2.25.1
rsa==4.7.2
six==1.15.0
typing-extensions==3.10.0.0
typing-inspect==0.6.0
urllib3==1.26.4
Werkzeug==1.0.1
18 changes: 18 additions & 0 deletions e2e-test-server/e2e_test_server/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Copyright 2021 Google
aabmass marked this conversation as resolved.
Show resolved Hide resolved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

INSTRUMENTING_MODULE_NAME = "opentelemetry-ops-e2e-test-server"
SCENARIO = "scenario"
STATUS_CODE = "status_code"
TEST_ID = "test_id"
Original file line number Diff line number Diff line change
Expand Up @@ -14,34 +14,34 @@

import contextlib
import os
from typing import Iterator
from dataclasses import dataclass
from typing import Any, Iterator, Mapping

from google.rpc import code_pb2

from flask import Flask, Response, request
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.trace.sampling import ALWAYS_ON
from opentelemetry.trace import Tracer

TEST_ID = "test-id"
INSTRUMENTING_MODULE_NAME = "opentelemetry-ops-e2e-test-server"
from .constants import INSTRUMENTING_MODULE_NAME, TEST_ID


app = Flask(__name__)
@dataclass
class Response:
status_code: code_pb2.Code


@contextlib.contextmanager
def common_setup() -> Iterator[tuple[str, Tracer]]:
def _tracer_setup(carrier: Mapping[str, str]) -> Iterator[Tracer]:
"""\
Context manager with common setup for test endpoints
Context manager with common setup for tracing endpoints

It extracts the test-id header, creates a tracer, and finally flushes
spans created during the test
Yields a tracer (from a fresh SDK with new exporter) then finally flushes
spans created during the test after.
"""

if TEST_ID not in request.headers:
raise Exception(f"{TEST_ID} header is required")
test_id = request.headers[TEST_ID]

tracer_provider = TracerProvider(
sampler=ALWAYS_ON,
active_span_processor=BatchSpanProcessor(
Expand All @@ -51,22 +51,22 @@ def common_setup() -> Iterator[tuple[str, Tracer]]:
tracer = tracer_provider.get_tracer(INSTRUMENTING_MODULE_NAME)

try:
yield test_id, tracer
yield tracer
finally:
tracer_provider.shutdown()


@app.route("/health")
def health():
return "OK", 200
def health(test_id: str, headers: Mapping[str, str], body: bytes) -> Response:
return Response(status_code=code_pb2.OK)


@app.route("/basicTrace", methods=["POST"])
def basicTrace():
def basicTrace(
test_id: str, headers: Mapping[str, str], body: bytes
) -> Response:
"""Create a basic trace"""

with common_setup() as (test_id, tracer):
with _tracer_setup(headers) as tracer:
with tracer.start_span("basicTrace", attributes={TEST_ID: test_id}):
pass

return Response(status=200)
return Response(status_code=code_pb2.OK)
96 changes: 96 additions & 0 deletions e2e-test-server/e2e_test_server/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# Copyright 2021 Google
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.message import Message

from . import scenarios
from .constants import SCENARIO, STATUS_CODE, TEST_ID
from google.rpc import code_pb2

PROJECT_ID = os.environ["PROJECT_ID"]
REQUEST_SUBSCRIPTION_NAME = os.environ["REQUEST_SUBSCRIPTION_NAME"]
RESPONSE_TOPIC_NAME = os.environ["RESPONSE_TOPIC_NAME"]


def pubsub_pull() -> None:
publisher = pubsub_v1.PublisherClient(
# disable buffering
batch_settings=pubsub_v1.types.BatchSettings(max_messages=1)
)
response_topic = publisher.topic_path(PROJECT_ID, RESPONSE_TOPIC_NAME)

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
PROJECT_ID,
REQUEST_SUBSCRIPTION_NAME,
)

def respond(test_id: str, res: scenarios.Response) -> None:
"""Respond to the test runner that we finished executing the scenario"""
data = bytes()
attributes = {TEST_ID: test_id, STATUS_CODE: str(res.status_code)}
print(f"publishing {data=} and {attributes=}")
publisher.publish(
response_topic,
bytes(),
**attributes,
)

def pubsub_callback(message: Message) -> None:
"""Execute a scenario based on the incoming message from the test test runner"""
aabmass marked this conversation as resolved.
Show resolved Hide resolved
if TEST_ID not in message.attributes:
# don't even know how to write back to the publisher that the
# message is invalid, so nack()
message.nack()
test_id = message.attributes[TEST_ID]

if SCENARIO not in message.attributes:
publisher.publish(
response_topic,
f'Expected attribute "{SCENARIO}" is missing',
**{
TEST_ID: test_id,
STATUS_CODE: str(code_pb2.INVALID_ARGUMENT),
},
)
scenario = message.attributes[SCENARIO]

if scenario == "/health":
scenarioFunc = scenarios.health
elif scenario == "/basicTrace":
scenarioFunc = scenarios.basicTrace
else:
scenarioFunc = lambda *args, **kwargs: scenarios.Response(
status_code=str(code_pb2.UNIMPLEMENTED)
)

res = scenarioFunc(test_id, message.attributes, message.data)

respond(test_id, res)
message.ack()

streaming_pull_future = subscriber.subscribe(
subscription_path, callback=pubsub_callback
)

print(
"Listening on subscription {} for pub/sub messages".format(
REQUEST_SUBSCRIPTION_NAME
)
)
with subscriber:
streaming_pull_future.result()
16 changes: 3 additions & 13 deletions e2e-test-server/gunicorn.conf.py → e2e-test-server/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os
from e2e_test_server import server

if "PORT" not in os.environ:
raise Exception("Must supply environment variable PORT")

bind = "0.0.0.0:{}".format(os.environ["PORT"])

# Needed to prevent forking for OTel
workers = 1

wsgi_app = "server:app"

# log requests to stdout
accesslog = "-"
if __name__ == "__main__":
server.pubsub_pull()
3 changes: 2 additions & 1 deletion e2e-test-server/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@
opentelemetry-sdk
opentelemetry-api
Flask
gunicorn
google-cloud-pubsub
googleapis-common-protos