Skip to content

Commit

Permalink
switch to pubsub implementation
Browse files Browse the repository at this point in the history
- remove gunicorn remnants and flask app (for now)
- make a package for code
- update docker file and cloudbuild config
  • Loading branch information
aabmass committed May 10, 2021
1 parent e57ca09 commit f573120
Show file tree
Hide file tree
Showing 8 changed files with 142 additions and 38 deletions.
4 changes: 2 additions & 2 deletions cloudbuild.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ steps:
- --file=e2e-test-server/Dockerfile
- .

- name: gcr.io/opentelemetry-ops-e2e/opentelemetry-operations-e2e-testing:0.5.5
- name: gcr.io/opentelemetry-ops-e2e/opentelemetry-operations-e2e-testing:0.5.9
id: run-tests
dir: /
env:
- "PROJECT_ID=$PROJECT_ID"
args:
- local
- --image=gcr.io/$PROJECT_ID/opentelemetry-operations-python-e2e-test-server
- --network=cloudbuild

logsBucket: gs://opentelemetry-ops-e2e-cloud-build-logs
2 changes: 1 addition & 1 deletion e2e-test-server/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,4 @@ WORKDIR $SRC/e2e-test-server
COPY --from=build-base $SRC/e2e-test-server/venv venv/
COPY e2e-test-server/ ./

ENTRYPOINT ["./venv/bin/gunicorn"]
ENTRYPOINT ["./venv/bin/python", "main.py"]
10 changes: 9 additions & 1 deletion e2e-test-server/constraints.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,33 @@ Flask==1.1.2
google-api-core==1.26.3
google-auth==1.30.0
google-cloud-core==1.6.0
google-cloud-pubsub==2.4.1
google-cloud-trace==0.24.0
googleapis-common-protos==1.53.0
grpc-google-iam-v1==0.12.3
grpcio==1.37.0
gunicorn==20.1.0
idna==2.10
itsdangerous==1.1.0
Jinja2==2.11.3
libcst==0.3.18
MarkupSafe==1.1.1
mypy-extensions==0.4.3
opentelemetry-api==1.1.0
opentelemetry-sdk==1.1.0
opentelemetry-semantic-conventions==0.20b0
packaging==20.9
pkg-resources==0.0.0
proto-plus==1.18.1
protobuf==3.15.8
pyasn1==0.4.8
pyasn1-modules==0.2.8
pyparsing==2.4.7
pytz==2021.1
PyYAML==5.4.1
requests==2.25.1
rsa==4.7.2
six==1.15.0
typing-extensions==3.10.0.0
typing-inspect==0.6.0
urllib3==1.26.4
Werkzeug==1.0.1
18 changes: 18 additions & 0 deletions e2e-test-server/e2e_test_server/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Copyright 2021 Google
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

INSTRUMENTING_MODULE_NAME = "opentelemetry-ops-e2e-test-server"
SCENARIO = "scenario"
STATUS = "status"
TEST_ID = "test-id"
Original file line number Diff line number Diff line change
Expand Up @@ -14,34 +14,33 @@

import contextlib
import os
from typing import Iterator
from dataclasses import dataclass
from typing import Any, Iterator, Mapping

from flask import Flask, Response, request
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.trace.sampling import ALWAYS_ON
from opentelemetry.trace import Tracer

TEST_ID = "test-id"
INSTRUMENTING_MODULE_NAME = "opentelemetry-ops-e2e-test-server"
from .constants import INSTRUMENTING_MODULE_NAME, TEST_ID

app = Flask(__name__)

@dataclass
class Response:
# HTTP style status code, even for pubsub
status: int


@contextlib.contextmanager
def common_setup() -> Iterator[tuple[str, Tracer]]:
def _tracer_setup(carrier: Mapping[str, str]) -> Iterator[tuple[str, Tracer]]:
"""\
Context manager with common setup for test endpoints
Context manager with common setup for tracing endpoints
It extracts the test-id header, creates a tracer, and finally flushes
spans created during the test
Yields a tracer (from a fresh SDK with new exporter) then finally flushes
spans created during the test after.
"""

if TEST_ID not in request.headers:
raise Exception(f"{TEST_ID} header is required")
test_id = request.headers[TEST_ID]

tracer_provider = TracerProvider(
sampler=ALWAYS_ON,
active_span_processor=BatchSpanProcessor(
Expand All @@ -51,21 +50,21 @@ def common_setup() -> Iterator[tuple[str, Tracer]]:
tracer = tracer_provider.get_tracer(INSTRUMENTING_MODULE_NAME)

try:
yield test_id, tracer
yield tracer
finally:
tracer_provider.shutdown()


@app.route("/health")
def health():
return "OK", 200
def health(test_id: str, headers: Mapping[str, str], body: bytes) -> Response:
return Response(status=200)


@app.route("/basicTrace", methods=["POST"])
def basicTrace():
def basicTrace(
test_id: str, headers: Mapping[str, str], body: bytes
) -> Response:
"""Create a basic trace"""

with common_setup() as (test_id, tracer):
with _tracer_setup(headers) as tracer:
with tracer.start_span("basicTrace", attributes={TEST_ID: test_id}):
pass

Expand Down
89 changes: 89 additions & 0 deletions e2e-test-server/e2e_test_server/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Copyright 2021 Google
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.message import Message

from . import scenarios
from .constants import SCENARIO, STATUS, TEST_ID

PROJECT_ID = os.environ["PROJECT_ID"]
REQUEST_SUBSCRIPTION_NAME = os.environ["REQUEST_SUBSCRIPTION_NAME"]
RESPONSE_TOPIC_NAME = os.environ["RESPONSE_TOPIC_NAME"]


def pubsub_pull() -> None:
publisher = pubsub_v1.PublisherClient(
# disable buffering
batch_settings=pubsub_v1.types.BatchSettings(max_messages=1)
)
response_topic = publisher.topic_path(PROJECT_ID, RESPONSE_TOPIC_NAME)

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
PROJECT_ID, REQUEST_SUBSCRIPTION_NAME,
)

def respond(test_id: str, res: scenarios.Response) -> None:
"""Respond to the test runner that we finished executing the scenario"""
data = bytes()
attributes = {TEST_ID: test_id, STATUS: str(res.status)}
print(f"publishing {data=} and {attributes=}")
publisher.publish(
response_topic, bytes(), **attributes,
)

def pubsub_callback(message: Message) -> None:
"""Execute a scenario based on the incoming message from the test test runner"""
if TEST_ID not in message.attributes:
# don't even know how to write back to the publisher that the
# message is invalid, so nack()
message.nack()
test_id = message.attributes[TEST_ID]

if SCENARIO not in message.attributes:
publisher.publish(
response_topic,
f'Expected attribute "{SCENARIO}" is missing',
**{TEST_ID: test_id, "status": 500},
)
scenario = message.attributes[SCENARIO]

if scenario == "/health":
scenarioFunc = scenarios.health
elif scenario == "/basicTrace":
scenarioFunc = scenarios.basicTrace
else:
scenarioFunc = lambda *args, **kwargs: scenarios.Response(
status=404
)

res = scenarioFunc(test_id, message.attributes, message.data)

respond(test_id, res)
message.ack()

streaming_pull_future = subscriber.subscribe(
subscription_path, callback=pubsub_callback
)

print(
"Listening on subscription {} for pub/sub messages".format(
REQUEST_SUBSCRIPTION_NAME
)
)
with subscriber:
streaming_pull_future.result()
16 changes: 3 additions & 13 deletions e2e-test-server/gunicorn.conf.py → e2e-test-server/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os
from e2e_test_server import server

if "PORT" not in os.environ:
raise Exception("Must supply environment variable PORT")

bind = "0.0.0.0:{}".format(os.environ["PORT"])

# Needed to prevent forking for OTel
workers = 1

wsgi_app = "server:app"

# log requests to stdout
accesslog = "-"
if __name__ == "__main__":
server.pubsub_pull()
2 changes: 1 addition & 1 deletion e2e-test-server/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
opentelemetry-sdk
opentelemetry-api
Flask
gunicorn
google-cloud-pubsub

0 comments on commit f573120

Please sign in to comment.