diff --git a/CHANGELOG.md b/CHANGELOG.md index 02e80aa9fb..420a87436a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -118,5 +118,7 @@ significant modifications will be credited to OpenTelemetry Authors. [#432](https://github.com/open-telemetry/opentelemetry-demo/pull/432) * Replaced the Jaeger exporter to the OTLP exporter in the OTel Collector ([#435](https://github.com/open-telemetry/opentelemetry-demo/pull/435)) +* Added cache scenario to recommendation service +([#455](https://github.com/open-telemetry/opentelemetry-demo/pull/455)) * Update cartservice Dockerfile to support ARM64 ([#439](https://github.com/open-telemetry/opentelemetry-demo/pull/439)) diff --git a/docker-compose.yml b/docker-compose.yml index eeacfde224..4ae3e60f29 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -330,9 +330,11 @@ services: depends_on: - productcatalogservice - otelcol + - featureflagservice environment: - RECOMMENDATION_SERVICE_PORT - PRODUCT_CATALOG_SERVICE_ADDR + - FEATURE_FLAG_GRPC_SERVICE_ADDR - OTEL_PYTHON_LOG_CORRELATION=true - OTEL_TRACES_EXPORTER=otlp - OTEL_METRICS_EXPORTER=otlp @@ -341,6 +343,11 @@ services: - OTEL_SERVICE_NAME=recommendationservice - PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python logging: *logging + restart: on-failure + deploy: + resources: + limits: + memory: 512M # ShippingService shippingservice: diff --git a/docs/manual_span_attributes.md b/docs/manual_span_attributes.md index 72648960b1..4f79dac724 100644 --- a/docs/manual_span_attributes.md +++ b/docs/manual_span_attributes.md @@ -107,11 +107,12 @@ This document contains the list of manual Span Attributes used throughout the de ## RecommendationService -| Name | Type | Description | -|----------------------------------|--------|-----------------------------------------| -| `app.filtered_products.count` | number | Number of filtered products returned | -| `app.products.count` | number | Number of products in catalog | -| `app.products_recommended.count` | number | Number of recommended products returned | +| Name | Type | Description | +|----------------------------------|---------|-----------------------------------------| +| `app.filtered_products.count` | number | Number of filtered products returned | +| `app.products.count` | number | Number of products in catalog | +| `app.products_recommended.count` | number | Number of recommended products returned | +| `app.cache_hit` | boolean | If cache was accessed or not | ## ShippingService diff --git a/src/featureflagservice/priv/repo/migrations/20220524172636_create_featureflags.exs b/src/featureflagservice/priv/repo/migrations/20220524172636_create_featureflags.exs index 61dee0c5ed..c70ff7b765 100644 --- a/src/featureflagservice/priv/repo/migrations/20220524172636_create_featureflags.exs +++ b/src/featureflagservice/priv/repo/migrations/20220524172636_create_featureflags.exs @@ -25,10 +25,16 @@ defmodule Featureflagservice.Repo.Migrations.CreateFeatureflags do name: "shippingFailure", description: "Fail shipping service when shipping a product to a non-USA address", enabled: false}) + + repo().insert(%Featureflagservice.FeatureFlags.FeatureFlag{ + name: "recommendationCache", + description: "Cache recommendations", + enabled: false}) end defp execute_down do repo().delete(%Featureflagservice.FeatureFlags.FeatureFlag{name: "productCatalogFailure"}) repo().delete(%Featureflagservice.FeatureFlags.FeatureFlag{name: "shippingFailure"}) + repo().delete(%Featureflagservice.FeatureFlags.FeatureFlag{name: "recommendationCache"}) end end diff --git a/src/recommendationservice/Dockerfile b/src/recommendationservice/Dockerfile index a178ae549d..3e542988a7 100644 --- a/src/recommendationservice/Dockerfile +++ b/src/recommendationservice/Dockerfile @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -FROM python:3.10-slim +FROM python:3.10 WORKDIR /usr/src/app/ diff --git a/src/recommendationservice/recommendation_server.py b/src/recommendationservice/recommendation_server.py index 87c87a6286..883c7cb823 100644 --- a/src/recommendationservice/recommendation_server.py +++ b/src/recommendationservice/recommendation_server.py @@ -35,6 +35,9 @@ init_metrics ) +cached_ids = [] +first_run = True + class RecommendationService(demo_pb2_grpc.RecommendationServiceServicer): def ListRecommendations(self, request, context): prod_list = get_product_list(request.product_ids) @@ -60,6 +63,8 @@ def Watch(self, request, context): def get_product_list(request_product_ids): + global first_run + global cached_ids with tracer.start_as_current_span("get_product_list") as span: max_responses = 5 @@ -67,9 +72,27 @@ def get_product_list(request_product_ids): request_product_ids_str = ''.join(request_product_ids) request_product_ids = request_product_ids_str.split(',') - # Fetch list of products from product catalog stub - cat_response = product_catalog_stub.ListProducts(demo_pb2.Empty()) - product_ids = [x.id for x in cat_response.products] + # Feature flag scenario - Cache Leak + if check_feature_flag("recommendationCache"): + span.set_attribute("app.recommendation.cache_enabled", True) + if random.random() < 0.5 or first_run: + first_run = False + span.set_attribute("app.cache_hit", False) + logger.info("cache miss") + cat_response = product_catalog_stub.ListProducts(demo_pb2.Empty()) + response_ids = [x.id for x in cat_response.products] + cached_ids = cached_ids + response_ids + cached_ids = cached_ids + cached_ids[:len(cached_ids) // 4] + product_ids = cached_ids + else: + span.set_attribute("app.cache_hit", True) + logger.info("cache hit") + product_ids = cached_ids + else: + span.set_attribute("app.recommendation.cache_enabled", False) + cat_response = product_catalog_stub.ListProducts(demo_pb2.Empty()) + product_ids = [x.id for x in cat_response.products] + span.set_attribute("app.products.count", len(product_ids)) # Create a filtered list of products excluding the products received as input @@ -94,6 +117,11 @@ def must_map_env(key: str): raise Exception(f'{key} environment variable must be set') return value +def check_feature_flag(flag_name: str): + flag = feature_flag_stub.GetFlag(demo_pb2.GetFlagRequest(name=flag_name)).flag + logger.info(flag) + return flag.enabled + if __name__ == "__main__": # Initialize Traces and Metrics tracer = trace.get_tracer_provider().get_tracer("recommendationservice") @@ -102,9 +130,12 @@ def must_map_env(key: str): port = must_map_env('RECOMMENDATION_SERVICE_PORT') catalog_addr = must_map_env('PRODUCT_CATALOG_SERVICE_ADDR') + ff_addr = must_map_env('FEATURE_FLAG_GRPC_SERVICE_ADDR') - channel = grpc.insecure_channel(catalog_addr) - product_catalog_stub = demo_pb2_grpc.ProductCatalogServiceStub(channel) + pc_channel = grpc.insecure_channel(catalog_addr) + ff_channel = grpc.insecure_channel(ff_addr) + product_catalog_stub = demo_pb2_grpc.ProductCatalogServiceStub(pc_channel) + feature_flag_stub = demo_pb2_grpc.FeatureFlagServiceStub(ff_channel) # Create gRPC server server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))