What’s included?
- Enable a few logs for the Ad service and the Recommendation service.
- Add OTLP exporters for logs.
- Add the filter processor to prevent an error from the Prometheus exporter caused by the duplicate queueSize metric; see
open-telemetry/opentelemetry-collector-contrib#18194. The filter processor can be removed once that issue is fixed.

This PR doesn’t introduce a logs backend. Instead, logs are output only to the Logging exporter and can be seen in the collector (otelcol) console:
otel-col  | 2023-03-17T11:40:22.662Z	info	LogsExporter	{"kind": "exporter", "data_type": "logs", "name": "logging", "#logs": 2}

After this PR, different logging backends can be easily tested by configuring an additional exporter.
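
For example, a minimal sketch of wiring in an extra logs exporter through otelcol-config-extras.yml (already loaded by the collector's --config flags) might look like the following; the otlphttp/logs exporter name and the http://logs-backend:4318 endpoint are placeholders for whatever backend is being tried, not part of this PR:

exporters:
  otlphttp/logs:
    # Placeholder endpoint for a hypothetical OTLP/HTTP-capable logs backend.
    endpoint: http://logs-backend:4318

service:
  pipelines:
    logs:
      receivers: [otlp]
      processors: [batch]
      # Keep the logging exporter for console output and add the new backend.
      exporters: [logging, otlphttp/logs]

The logs pipeline is repeated in full here because a later --config file generally replaces list values such as the exporter list rather than appending to them.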
mviitane committed Mar 17, 2023
1 parent 3680be7 commit 504a6f8
Showing 6 changed files with 66 additions and 58 deletions.
1 change: 1 addition & 0 deletions .env
@@ -25,6 +25,7 @@ OTEL_COLLECTOR_PORT=4317
OTEL_EXPORTER_OTLP_ENDPOINT=http://${OTEL_COLLECTOR_HOST}:${OTEL_COLLECTOR_PORT}
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=${OTEL_EXPORTER_OTLP_ENDPOINT}
OTEL_EXPORTER_OTLP_METRICS_ENDPOINT=${OTEL_EXPORTER_OTLP_ENDPOINT}
OTEL_EXPORTER_OTLP_LOGS_ENDPOINT=${OTEL_EXPORTER_OTLP_ENDPOINT}
PUBLIC_OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://localhost:4318/v1/traces

# OpenTelemetry Resource Definitions
37 changes: 13 additions & 24 deletions docker-compose.yml
@@ -78,8 +78,10 @@ services:
- OTEL_EXPORTER_OTLP_TRACES_ENDPOINT
- OTEL_EXPORTER_OTLP_METRICS_ENDPOINT
- OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE
- OTEL_EXPORTER_OTLP_LOGS_ENDPOINT
- OTEL_RESOURCE_ATTRIBUTES
- OTEL_SERVICE_NAME=adservice
- OTEL_LOGS_EXPORTER=otlp
depends_on:
- otelcol
logging: *logging
@@ -224,7 +226,7 @@ services:
deploy:
resources:
limits:
memory: 175M
memory: 200M
restart: unless-stopped
ports:
- "${FEATURE_FLAG_SERVICE_PORT}" # Feature Flag Service UI
@@ -322,14 +324,10 @@ services:
# Frontend Proxy (Envoy)
frontendproxy:
image: ${IMAGE_NAME}:${IMAGE_VERSION}-frontendproxy
container_name: frontend-proxy
build:
context: ./
dockerfile: src/frontendproxy/Dockerfile
deploy:
resources:
limits:
memory: 50M
container_name: frontend-proxy
ports:
- "${ENVOY_PORT}:${ENVOY_PORT}"
- 10000:10000
@@ -395,7 +393,7 @@ services:
deploy:
resources:
limits:
memory: 120M
memory: 70M
restart: unless-stopped
ports:
- "${PAYMENT_SERVICE_PORT}"
@@ -450,12 +448,12 @@ services:
deploy:
resources:
limits:
memory: 40M
memory: 30M
restart: unless-stopped
ports:
- "${QUOTE_SERVICE_PORT}"
environment:
- OTEL_EXPORTER_OTLP_ENDPOINT=http://${OTEL_COLLECTOR_HOST}:4318
- OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://${OTEL_COLLECTOR_HOST}:4318/v1/traces
- OTEL_PHP_AUTOLOAD_ENABLED=true
- QUOTE_SERVICE_PORT
- OTEL_RESOURCE_ATTRIBUTES
@@ -535,18 +533,18 @@ services:
deploy:
resources:
limits:
memory: 120M
memory: 200M
restart: unless-stopped
environment:
- POSTGRES_USER=ffs
- POSTGRES_DB=ffs
- POSTGRES_PASSWORD=ffs
logging: *logging
healthcheck:
test: ["CMD-SHELL", "pg_isready -d ffs -U ffs"]
interval: 10s
timeout: 5s
retries: 5
logging: *logging

# Kafka used by Checkout, Accounting, and Fraud Detection services
kafka:
@@ -560,7 +558,7 @@ services:
deploy:
resources:
limits:
memory: 750M
memory: 800M
restart: unless-stopped
environment:
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
@@ -569,14 +567,13 @@ services:
- OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE
- OTEL_RESOURCE_ATTRIBUTES
- OTEL_SERVICE_NAME=kafka
- KAFKA_HEAP_OPTS=-Xmx400m -Xms400m
logging: *logging
healthcheck:
test: nc -z kafka 9092
start_period: 10s
interval: 5s
timeout: 10s
retries: 10
logging: *logging

# Redis used by Cart service
redis-cart:
@@ -610,7 +607,7 @@ services:
deploy:
resources:
limits:
memory: 300M
memory: 275M
restart: unless-stopped
ports:
- "${JAEGER_SERVICE_PORT}" # Jaeger UI
@@ -624,10 +621,6 @@ services:
grafana:
image: grafana/grafana:9.1.0
container_name: grafana
deploy:
resources:
limits:
memory: 75M
volumes:
- ./src/grafana/grafana.ini:/etc/grafana/grafana.ini
- ./src/grafana/provisioning/:/etc/grafana/provisioning/
@@ -642,7 +635,7 @@ services:
deploy:
resources:
limits:
memory: 125M
memory: 100M
restart: unless-stopped
command: [ "--config=/etc/otelcol-config.yml", "--config=/etc/otelcol-config-extras.yml" ]
volumes:
@@ -672,10 +665,6 @@ services:
- --enable-feature=exemplar-storage
volumes:
- ./src/prometheus/prometheus-config.yaml:/etc/prometheus/prometheus-config.yaml
deploy:
resources:
limits:
memory: 300M
ports:
- "${PROMETHEUS_SERVICE_PORT}:${PROMETHEUS_SERVICE_PORT}"
logging: *logging
2 changes: 1 addition & 1 deletion src/adservice/Dockerfile
@@ -30,7 +30,7 @@ RUN ./gradlew installDist -PprotoSourceDir=./proto

FROM eclipse-temurin:17-jre

ARG version=1.23.0
ARG version=1.24.0
WORKDIR /usr/src/app/

COPY --from=builder /usr/src/app/ ./
7 changes: 4 additions & 3 deletions src/adservice/src/main/java/oteldemo/AdService.java
@@ -99,7 +99,7 @@ private void start() throws IOException {
.addService(healthMgr.getHealthService())
.build()
.start();
logger.info("Ad Service started, listening on " + port);
logger.info("Ad service started, listening on " + port);
Runtime.getRuntime()
.addShutdownHook(
new Thread(
@@ -160,15 +160,16 @@ public void getAds(AdRequest req, StreamObserver<AdResponse> responseObserver) {

span.setAttribute("app.ads.contextKeys", req.getContextKeysList().toString());
span.setAttribute("app.ads.contextKeys.count", req.getContextKeysCount());
logger.info("received ad request (context_words=" + req.getContextKeysList() + ")");
if (req.getContextKeysCount() > 0) {
logger.info("Targeted ad request received for " + req.getContextKeysList());
for (int i = 0; i < req.getContextKeysCount(); i++) {
Collection<Ad> ads = service.getAdsByCategory(req.getContextKeys(i));
allAds.addAll(ads);
}
adRequestType = AdRequestType.TARGETED;
adResponseType = AdResponseType.TARGETED;
} else {
logger.info("Non-targeted ad request received, preparing random response.");
allAds = service.getRandomAds();
adRequestType = AdRequestType.NOT_TARGETED;
adResponseType = AdResponseType.RANDOM;
@@ -314,7 +315,7 @@ private static ImmutableListMultimap<String, Ad> createAdsMap() {
/** Main launches the server from the command line. */
public static void main(String[] args) throws IOException, InterruptedException {
// Start the RPC server. You shouldn't see any output from gRPC before this.
logger.info("AdService starting.");
logger.info("Ad service starting.");
final AdService service = AdService.getInstance();
service.start();
service.blockUntilShutdown();
28 changes: 12 additions & 16 deletions src/otelcollector/otelcol-config.yml
@@ -1,17 +1,3 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

receivers:
otlp:
protocols:
@@ -43,14 +29,24 @@ processors:
- context: metric
statements:
- set(description, "Measures the duration of inbound HTTP requests") where name == "http.server.duration"
filter:
metrics:
exclude:
match_type: strict
metric_names:
- queueSize

service:
pipelines:
traces:
receivers: [otlp]
processors: [spanmetrics, batch]
exporters: [logging, otlp]
exporters: [otlp, logging]
metrics:
receivers: [otlp]
processors: [transform, batch]
processors: [filter, transform, batch]
exporters: [prometheus, logging]
logs:
receivers: [otlp]
processors: [batch]
exporters: [logging]
49 changes: 35 additions & 14 deletions src/recommendationservice/recommendation_server.py
@@ -22,14 +22,20 @@
# Pip
import grpc
from opentelemetry import trace, metrics

from opentelemetry._logs import set_logger_provider
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import (
OTLPLogExporter,
)
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.resources import Resource

# Local
import logging
import demo_pb2
import demo_pb2_grpc
from grpc_health.v1 import health_pb2
from grpc_health.v1 import health_pb2_grpc
from logger import getJSONLogger

from metrics import (
init_metrics
@@ -43,7 +49,8 @@ def ListRecommendations(self, request, context):
prod_list = get_product_list(request.product_ids)
span = trace.get_current_span()
span.set_attribute("app.products_recommended.count", len(prod_list))
logger.info(f"[Recv ListRecommendations] product_ids={prod_list}")
logger.info(f"Receive ListRecommendations for product ids:{prod_list}")

# build and return response
response = demo_pb2.ListRecommendationsResponse()
response.product_ids.extend(prod_list)
@@ -78,15 +85,15 @@ def get_product_list(request_product_ids):
if random.random() < 0.5 or first_run:
first_run = False
span.set_attribute("app.cache_hit", False)
logger.info("cache miss")
logger.info("get_product_list: cache miss")
cat_response = product_catalog_stub.ListProducts(demo_pb2.Empty())
response_ids = [x.id for x in cat_response.products]
cached_ids = cached_ids + response_ids
cached_ids = cached_ids + cached_ids[:len(cached_ids) // 4]
product_ids = cached_ids
else:
span.set_attribute("app.cache_hit", True)
logger.info("cache hit")
logger.info("get_product_list: cache hit")
product_ids = cached_ids
else:
span.set_attribute("app.recommendation.cache_enabled", False)
@@ -119,19 +126,35 @@ def must_map_env(key: str):

def check_feature_flag(flag_name: str):
flag = feature_flag_stub.GetFlag(demo_pb2.GetFlagRequest(name=flag_name)).flag
logger.info(flag)
return flag.enabled

if __name__ == "__main__":
service_name = must_map_env('OTEL_SERVICE_NAME')

# Initialize Traces and Metrics
tracer = trace.get_tracer_provider().get_tracer("recommendationservice")
meter = metrics.get_meter_provider().get_meter("recommendationservice")
tracer = trace.get_tracer_provider().get_tracer(service_name)
meter = metrics.get_meter_provider().get_meter(service_name)
rec_svc_metrics = init_metrics(meter)

port = must_map_env('RECOMMENDATION_SERVICE_PORT')
# Initialize Logs
logger_provider = LoggerProvider(
resource=Resource.create(
{
'service.name': service_name,
}
),
)
set_logger_provider(logger_provider)
log_exporter = OTLPLogExporter(insecure=True)
logger_provider.add_log_record_processor(BatchLogRecordProcessor(log_exporter))
handler = LoggingHandler(level=logging.NOTSET, logger_provider=logger_provider)

# Attach OTLP handler to logger
logger = logging.getLogger('main')
logger.addHandler(handler)

catalog_addr = must_map_env('PRODUCT_CATALOG_SERVICE_ADDR')
ff_addr = must_map_env('FEATURE_FLAG_GRPC_SERVICE_ADDR')

pc_channel = grpc.insecure_channel(catalog_addr)
ff_channel = grpc.insecure_channel(ff_addr)
product_catalog_stub = demo_pb2_grpc.ProductCatalogServiceStub(pc_channel)
@@ -145,11 +168,9 @@ def check_feature_flag(flag_name: str):
demo_pb2_grpc.add_RecommendationServiceServicer_to_server(service, server)
health_pb2_grpc.add_HealthServicer_to_server(service, server)

# Start logger
logger = getJSONLogger('recommendationservice-server')
logger.info(f"RecommendationService listening on port: {port}")

# Start server
port = must_map_env('RECOMMENDATION_SERVICE_PORT')
server.add_insecure_port(f'[::]:{port}')
server.start()
logger.info(f'Recommendation service started, listening on port {port}')
server.wait_for_termination()
