From 504a6f84bbf8af20370867b1126429448116d277 Mon Sep 17 00:00:00 2001 From: Mikko Viitanen Date: Fri, 17 Mar 2023 14:41:47 +0200 Subject: [PATCH] =?UTF-8?q?What=E2=80=99s=20included=3F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Enable a few logs for Ad service and Recommendation service. - Add OTLP exporters for logs. - Add the filter processor to prevent an error from the Prometheus exporter for the duplicate queueSize metric, see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/18194. The filter processor can be removed when the fault gets fixed. This PR doesn’t introduce any logs backend. Instead, logs are output only to the Logging exporter and can be seen in the console (otelcol). otel-col | 2023-03-17T11:40:22.662Z info LogsExporter {"kind": "exporter", "data_type": "logs", "name": "logging", "#logs": 2} After this PR, different logging backends can be easily tested by configuring an additional exporter. --- .env | 1 + docker-compose.yml | 37 +++++--------- src/adservice/Dockerfile | 2 +- .../src/main/java/oteldemo/AdService.java | 7 +-- src/otelcollector/otelcol-config.yml | 28 +++++------ .../recommendation_server.py | 49 +++++++++++++------ 6 files changed, 66 insertions(+), 58 deletions(-) diff --git a/.env b/.env index a3ba836924..09ee9f7a4c 100644 --- a/.env +++ b/.env @@ -25,6 +25,7 @@ OTEL_COLLECTOR_PORT=4317 OTEL_EXPORTER_OTLP_ENDPOINT=http://${OTEL_COLLECTOR_HOST}:${OTEL_COLLECTOR_PORT} OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=${OTEL_EXPORTER_OTLP_ENDPOINT} OTEL_EXPORTER_OTLP_METRICS_ENDPOINT=${OTEL_EXPORTER_OTLP_ENDPOINT} +OTEL_EXPORTER_OTLP_LOGS_ENDPOINT=${OTEL_EXPORTER_OTLP_ENDPOINT} PUBLIC_OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://localhost:4318/v1/traces # OpenTelemetry Resource Definitions diff --git a/docker-compose.yml b/docker-compose.yml index 5d335ea656..a3f995eb65 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -78,8 +78,10 @@ services: - 
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT - OTEL_EXPORTER_OTLP_METRICS_ENDPOINT - OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE + - OTEL_EXPORTER_OTLP_LOGS_ENDPOINT - OTEL_RESOURCE_ATTRIBUTES - OTEL_SERVICE_NAME=adservice + - OTEL_LOGS_EXPORTER=otlp depends_on: - otelcol logging: *logging @@ -224,7 +226,7 @@ services: deploy: resources: limits: - memory: 175M + memory: 200M restart: unless-stopped ports: - "${FEATURE_FLAG_SERVICE_PORT}" # Feature Flag Service UI @@ -322,14 +324,10 @@ services: # Frontend Proxy (Envoy) frontendproxy: image: ${IMAGE_NAME}:${IMAGE_VERSION}-frontendproxy - container_name: frontend-proxy build: context: ./ dockerfile: src/frontendproxy/Dockerfile - deploy: - resources: - limits: - memory: 50M + container_name: frontend-proxy ports: - "${ENVOY_PORT}:${ENVOY_PORT}" - 10000:10000 @@ -395,7 +393,7 @@ services: deploy: resources: limits: - memory: 120M + memory: 70M restart: unless-stopped ports: - "${PAYMENT_SERVICE_PORT}" @@ -450,12 +448,12 @@ services: deploy: resources: limits: - memory: 40M + memory: 30M restart: unless-stopped ports: - "${QUOTE_SERVICE_PORT}" environment: - - OTEL_EXPORTER_OTLP_ENDPOINT=http://${OTEL_COLLECTOR_HOST}:4318 + - OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://${OTEL_COLLECTOR_HOST}:4318/v1/traces - OTEL_PHP_AUTOLOAD_ENABLED=true - QUOTE_SERVICE_PORT - OTEL_RESOURCE_ATTRIBUTES @@ -535,18 +533,18 @@ services: deploy: resources: limits: - memory: 120M + memory: 200M restart: unless-stopped environment: - POSTGRES_USER=ffs - POSTGRES_DB=ffs - POSTGRES_PASSWORD=ffs + logging: *logging healthcheck: test: ["CMD-SHELL", "pg_isready -d ffs -U ffs"] interval: 10s timeout: 5s retries: 5 - logging: *logging # Kafka used by Checkout, Accounting, and Fraud Detection services kafka: @@ -560,7 +558,7 @@ services: deploy: resources: limits: - memory: 750M + memory: 800M restart: unless-stopped environment: - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 @@ -569,14 +567,13 @@ services: - 
OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE - OTEL_RESOURCE_ATTRIBUTES - OTEL_SERVICE_NAME=kafka - - KAFKA_HEAP_OPTS=-Xmx400m -Xms400m + logging: *logging healthcheck: test: nc -z kafka 9092 start_period: 10s interval: 5s timeout: 10s retries: 10 - logging: *logging # Redis used by Cart service redis-cart: @@ -610,7 +607,7 @@ services: deploy: resources: limits: - memory: 300M + memory: 275M restart: unless-stopped ports: - "${JAEGER_SERVICE_PORT}" # Jaeger UI @@ -624,10 +621,6 @@ services: grafana: image: grafana/grafana:9.1.0 container_name: grafana - deploy: - resources: - limits: - memory: 75M volumes: - ./src/grafana/grafana.ini:/etc/grafana/grafana.ini - ./src/grafana/provisioning/:/etc/grafana/provisioning/ @@ -642,7 +635,7 @@ services: deploy: resources: limits: - memory: 125M + memory: 100M restart: unless-stopped command: [ "--config=/etc/otelcol-config.yml", "--config=/etc/otelcol-config-extras.yml" ] volumes: @@ -672,10 +665,6 @@ services: - --enable-feature=exemplar-storage volumes: - ./src/prometheus/prometheus-config.yaml:/etc/prometheus/prometheus-config.yaml - deploy: - resources: - limits: - memory: 300M ports: - "${PROMETHEUS_SERVICE_PORT}:${PROMETHEUS_SERVICE_PORT}" logging: *logging diff --git a/src/adservice/Dockerfile b/src/adservice/Dockerfile index 8b6aa1da92..834ddfc6e1 100644 --- a/src/adservice/Dockerfile +++ b/src/adservice/Dockerfile @@ -30,7 +30,7 @@ RUN ./gradlew installDist -PprotoSourceDir=./proto FROM eclipse-temurin:17-jre -ARG version=1.23.0 +ARG version=1.24.0 WORKDIR /usr/src/app/ COPY --from=builder /usr/src/app/ ./ diff --git a/src/adservice/src/main/java/oteldemo/AdService.java b/src/adservice/src/main/java/oteldemo/AdService.java index e8905f7009..d0ec11d3ec 100644 --- a/src/adservice/src/main/java/oteldemo/AdService.java +++ b/src/adservice/src/main/java/oteldemo/AdService.java @@ -99,7 +99,7 @@ private void start() throws IOException { .addService(healthMgr.getHealthService()) .build() .start(); - logger.info("Ad 
Service started, listening on " + port); + logger.info("Ad service started, listening on " + port); Runtime.getRuntime() .addShutdownHook( new Thread( @@ -160,8 +160,8 @@ public void getAds(AdRequest req, StreamObserver responseObserver) { span.setAttribute("app.ads.contextKeys", req.getContextKeysList().toString()); span.setAttribute("app.ads.contextKeys.count", req.getContextKeysCount()); - logger.info("received ad request (context_words=" + req.getContextKeysList() + ")"); if (req.getContextKeysCount() > 0) { + logger.info("Targeted ad request received for " + req.getContextKeysList()); for (int i = 0; i < req.getContextKeysCount(); i++) { Collection ads = service.getAdsByCategory(req.getContextKeys(i)); allAds.addAll(ads); @@ -169,6 +169,7 @@ public void getAds(AdRequest req, StreamObserver responseObserver) { adRequestType = AdRequestType.TARGETED; adResponseType = AdResponseType.TARGETED; } else { + logger.info("Non-targeted ad request received, preparing random response."); allAds = service.getRandomAds(); adRequestType = AdRequestType.NOT_TARGETED; adResponseType = AdResponseType.RANDOM; @@ -314,7 +315,7 @@ private static ImmutableListMultimap createAdsMap() { /** Main launches the server from the command line. */ public static void main(String[] args) throws IOException, InterruptedException { // Start the RPC server. You shouldn't see any output from gRPC before this. - logger.info("AdService starting."); + logger.info("Ad service starting."); final AdService service = AdService.getInstance(); service.start(); service.blockUntilShutdown(); diff --git a/src/otelcollector/otelcol-config.yml b/src/otelcollector/otelcol-config.yml index ea4ab62aa4..c814dc46a1 100644 --- a/src/otelcollector/otelcol-config.yml +++ b/src/otelcollector/otelcol-config.yml @@ -1,17 +1,3 @@ -# Copyright The OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - receivers: otlp: protocols: @@ -43,14 +29,24 @@ processors: - context: metric statements: - set(description, "Measures the duration of inbound HTTP requests") where name == "http.server.duration" + filter: + metrics: + exclude: + match_type: strict + metric_names: + - queueSize service: pipelines: traces: receivers: [otlp] processors: [spanmetrics, batch] - exporters: [logging, otlp] + exporters: [otlp, logging] metrics: receivers: [otlp] - processors: [transform, batch] + processors: [filter, transform, batch] exporters: [prometheus, logging] + logs: + receivers: [otlp] + processors: [batch] + exporters: [logging] \ No newline at end of file diff --git a/src/recommendationservice/recommendation_server.py b/src/recommendationservice/recommendation_server.py index 86fd2b274c..c59e6fbd66 100644 --- a/src/recommendationservice/recommendation_server.py +++ b/src/recommendationservice/recommendation_server.py @@ -22,14 +22,20 @@ # Pip import grpc from opentelemetry import trace, metrics - +from opentelemetry._logs import set_logger_provider +from opentelemetry.exporter.otlp.proto.grpc._log_exporter import ( + OTLPLogExporter, +) +from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler +from opentelemetry.sdk._logs.export import BatchLogRecordProcessor +from opentelemetry.sdk.resources import Resource # Local +import logging import demo_pb2 import demo_pb2_grpc from grpc_health.v1 import health_pb2 from grpc_health.v1 import health_pb2_grpc -from logger import getJSONLogger from metrics import ( init_metrics @@ -43,7 +49,8 @@ def 
ListRecommendations(self, request, context): prod_list = get_product_list(request.product_ids) span = trace.get_current_span() span.set_attribute("app.products_recommended.count", len(prod_list)) - logger.info(f"[Recv ListRecommendations] product_ids={prod_list}") + logger.info(f"Receive ListRecommendations for product ids:{prod_list}") + # build and return response response = demo_pb2.ListRecommendationsResponse() response.product_ids.extend(prod_list) @@ -78,7 +85,7 @@ def get_product_list(request_product_ids): if random.random() < 0.5 or first_run: first_run = False span.set_attribute("app.cache_hit", False) - logger.info("cache miss") + logger.info("get_product_list: cache miss") cat_response = product_catalog_stub.ListProducts(demo_pb2.Empty()) response_ids = [x.id for x in cat_response.products] cached_ids = cached_ids + response_ids @@ -86,7 +93,7 @@ def get_product_list(request_product_ids): product_ids = cached_ids else: span.set_attribute("app.cache_hit", True) - logger.info("cache hit") + logger.info("get_product_list: cache hit") product_ids = cached_ids else: span.set_attribute("app.recommendation.cache_enabled", False) @@ -119,19 +126,35 @@ def must_map_env(key: str): def check_feature_flag(flag_name: str): flag = feature_flag_stub.GetFlag(demo_pb2.GetFlagRequest(name=flag_name)).flag - logger.info(flag) return flag.enabled if __name__ == "__main__": + service_name = must_map_env('OTEL_SERVICE_NAME') + # Initialize Traces and Metrics - tracer = trace.get_tracer_provider().get_tracer("recommendationservice") - meter = metrics.get_meter_provider().get_meter("recommendationservice") + tracer = trace.get_tracer_provider().get_tracer(service_name) + meter = metrics.get_meter_provider().get_meter(service_name) rec_svc_metrics = init_metrics(meter) - port = must_map_env('RECOMMENDATION_SERVICE_PORT') + # Initialize Logs + logger_provider = LoggerProvider( + resource=Resource.create( + { + 'service.name': service_name, + } + ), + ) + 
set_logger_provider(logger_provider) + log_exporter = OTLPLogExporter(insecure=True) + logger_provider.add_log_record_processor(BatchLogRecordProcessor(log_exporter)) + handler = LoggingHandler(level=logging.NOTSET, logger_provider=logger_provider) + + # Attach OTLP handler to logger + logger = logging.getLogger('main') + logger.addHandler(handler) + catalog_addr = must_map_env('PRODUCT_CATALOG_SERVICE_ADDR') ff_addr = must_map_env('FEATURE_FLAG_GRPC_SERVICE_ADDR') - pc_channel = grpc.insecure_channel(catalog_addr) ff_channel = grpc.insecure_channel(ff_addr) product_catalog_stub = demo_pb2_grpc.ProductCatalogServiceStub(pc_channel) @@ -145,11 +168,9 @@ def check_feature_flag(flag_name: str): demo_pb2_grpc.add_RecommendationServiceServicer_to_server(service, server) health_pb2_grpc.add_HealthServicer_to_server(service, server) - # Start logger - logger = getJSONLogger('recommendationservice-server') - logger.info(f"RecommendationService listening on port: {port}") - # Start server + port = must_map_env('RECOMMENDATION_SERVICE_PORT') server.add_insecure_port(f'[::]:{port}') server.start() + logger.info(f'Recommendation service started, listening on port {port}') server.wait_for_termination()