Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ProductCatalogService Cleanup #317

Merged
merged 3 commits into from
Aug 18, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,5 @@ significant modifications will be credited to OpenTelemetry Authors.
([#245](https://github.com/open-telemetry/opentelemetry-demo/pull/245))
* Added Frontend Instrumentation
([#293](https://github.com/open-telemetry/opentelemetry-demo/pull/293))
* Simplified and cleaned up ProductCatalogService
([#317](https://github.com/open-telemetry/opentelemetry-demo/pull/317))
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,11 @@ package main

import (
"context"
"flag"
"fmt"
"google.golang.org/grpc/credentials/insecure"
"io/ioutil"
"net"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"

pb "github.com/opentelemetry/opentelemetry-demo/src/productcatalogservice/genproto/hipstershop"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
Expand All @@ -48,47 +42,27 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/grpc/credentials/insecure"
)

var (
cat pb.ListProductsResponse
catalogMutex *sync.Mutex
log *logrus.Logger
extraLatency time.Duration

port = "3550"

reloadCatalog bool
log *logrus.Logger
catalog []*pb.Product
)

func init() {
log = logrus.New()
log.Formatter = &logrus.JSONFormatter{
FieldMap: logrus.FieldMap{
logrus.FieldKeyTime: "timestamp",
logrus.FieldKeyLevel: "severity",
logrus.FieldKeyMsg: "message",
},
TimestampFormat: time.RFC3339Nano,
}
log.Out = os.Stdout
catalogMutex = &sync.Mutex{}
err := readCatalogFile(&cat)
if err != nil {
log.Warnf("could not parse product catalog")
}
catalog = readCatalogFile()
}

func InitTracerProvider() *sdktrace.TracerProvider {
ctx := context.Background()

exporter, err := otlptracegrpc.New(ctx)
if err != nil {
log.Fatal(err)
log.Fatalf("OTLP Trace gRPC Creation: %v", err)
}
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exporter),
)
tp := sdktrace.NewTracerProvider(sdktrace.WithBatcher(exporter))
otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
return tp
Expand All @@ -98,104 +72,57 @@ func main() {
tp := InitTracerProvider()
defer func() {
if err := tp.Shutdown(context.Background()); err != nil {
log.Printf("Error shutting down tracer provider: %v", err)
}
}()

flag.Parse()

// set injected latency
if s := os.Getenv("EXTRA_LATENCY"); s != "" {
v, err := time.ParseDuration(s)
if err != nil {
log.Fatalf("failed to parse EXTRA_LATENCY (%s) as time.Duration: %+v", v, err)
}
extraLatency = v
log.Infof("extra latency enabled (duration: %v)", extraLatency)
} else {
extraLatency = time.Duration(0)
}

sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGUSR1, syscall.SIGUSR2)
go func() {
for {
sig := <-sigs
log.Printf("Received signal: %s", sig)
if sig == syscall.SIGUSR1 {
reloadCatalog = true
log.Infof("Enable catalog reloading")
} else {
reloadCatalog = false
log.Infof("Disable catalog reloading")
}
log.Fatalf("Tracer Provider Shutdown: %v", err)
}
}()

svc := &productCatalog{}
var port string
mustMapEnv(&port, "PRODUCT_CATALOG_SERVICE_PORT")
mustMapEnv(&svc.featureFlagSvcAddr, "FEATURE_FLAG_GRPC_SERVICE_ADDR")

log.Infof("starting grpc server at :%s", port)
run(port)
select {}
}
log.Infof("ProductCatalogService gRPC server started on port: %s", port)

func run(port string) string {
l, err := net.Listen("tcp", fmt.Sprintf(":%s", port))
ln, err := net.Listen("tcp", fmt.Sprintf(":%s", port))
if err != nil {
log.Fatal(err)
log.Fatalf("TCP Listen: %v", err)
}

var srv *grpc.Server = grpc.NewServer(
grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()),
grpc.StreamInterceptor(otelgrpc.StreamServerInterceptor()),
)

svc := &productCatalog{}
mustMapEnv(&svc.featureFlagSvcAddr, "FEATURE_FLAG_GRPC_SERVICE_ADDR")

pb.RegisterProductCatalogServiceServer(srv, svc)
healthpb.RegisterHealthServer(srv, svc)
go srv.Serve(l)
return l.Addr().String()
srv.Serve(ln)
}

type productCatalog struct {
featureFlagSvcAddr string
pb.UnimplementedProductCatalogServiceServer
}

func readCatalogFile(catalog *pb.ListProductsResponse) error {
catalogMutex.Lock()
defer catalogMutex.Unlock()
func readCatalogFile() []*pb.Product {
catalogJSON, err := ioutil.ReadFile("products.json")
if err != nil {
log.Fatalf("failed to open product catalog json file: %v", err)
return err
}
if err := protojson.Unmarshal(catalogJSON, catalog); err != nil {
log.Warnf("failed to parse the catalog JSON: %v", err)
return err
log.Fatalf("Reading Catalog File: %v", err)
}
log.Info("successfully parsed product catalog json")
return nil
}

func parseCatalog() []*pb.Product {
if reloadCatalog || len(cat.Products) == 0 {
err := readCatalogFile(&cat)
if err != nil {
return []*pb.Product{}
}
var res pb.ListProductsResponse
if err := protojson.Unmarshal(catalogJSON, &res); err != nil {
log.Fatalf("Parsing Catalog JSON: %v", err)
}
return cat.Products

return res.Products
}

func mustMapEnv(target *string, envKey string) {
v := os.Getenv(envKey)
if v == "" {
panic(fmt.Sprintf("environment variable %q not set", envKey))
func mustMapEnv(target *string, key string) {
value, present := os.LookupEnv(key)
if !present {
log.Fatalf("Environment Variable Not Set: %q", key)
}
*target = v
*target = value
}

func (p *productCatalog) Check(ctx context.Context, req *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) {
Expand All @@ -209,14 +136,10 @@ func (p *productCatalog) Watch(req *healthpb.HealthCheckRequest, ws healthpb.Hea
func (p *productCatalog) ListProducts(ctx context.Context, req *pb.Empty) (*pb.ListProductsResponse, error) {
span := trace.SpanFromContext(ctx)

time.Sleep(extraLatency)
var ps []*pb.Product
ps = parseCatalog()

span.SetAttributes(
attribute.Int("app.products.count", len(ps)),
attribute.Int("app.products.count", len(catalog)),
)
return &pb.ListProductsResponse{Products: ps}, nil
return &pb.ListProductsResponse{Products: catalog}, nil
}

func (p *productCatalog) GetProduct(ctx context.Context, req *pb.GetProductRequest) (*pb.Product, error) {
Expand All @@ -225,82 +148,77 @@ func (p *productCatalog) GetProduct(ctx context.Context, req *pb.GetProductReque
attribute.String("app.product.id", req.Id),
)

time.Sleep(extraLatency)

// conditional break if feature flag is enabled on a specific product
// GetProduct will fail on a specific product when feature flag is enabled
if p.checkProductFailure(ctx, req.Id) {
msg := fmt.Sprintf("interal error: product catalog feature flag for failure is enabled")
msg := fmt.Sprintf("Error: ProductCatalogService Fail Feature Flag Enabled")
span.SetStatus(otelcodes.Error, msg)
span.AddEvent(msg)
return nil, status.Errorf(codes.Internal, msg)
}

var found *pb.Product
for i := 0; i < len(parseCatalog()); i++ {
if req.Id == parseCatalog()[i].Id {
found = parseCatalog()[i]
for _, product := range catalog {
if req.Id == product.Id {
found = product
break
}
}

if found == nil {
msg := fmt.Sprintf("no product with ID %s", req.Id)
msg := fmt.Sprintf("Product Not Found: %s", req.Id)
span.SetStatus(otelcodes.Error, msg)
span.AddEvent(msg)
return nil, status.Errorf(codes.NotFound, msg)
} else {
msg := fmt.Sprintf("found product with ID %s, name %s", req.Id, found.Name)
span.AddEvent(msg)
span.SetAttributes(
attribute.String("app.product.name", found.Name),
)
}

msg := fmt.Sprintf("Product Found - ID: %s, Name: %s", req.Id, found.Name)
span.AddEvent(msg)
span.SetAttributes(
attribute.String("app.product.name", found.Name),
)
return found, nil
}

func (p *productCatalog) SearchProducts(ctx context.Context, req *pb.SearchProductsRequest) (*pb.SearchProductsResponse, error) {
span := trace.SpanFromContext(ctx)
time.Sleep(extraLatency)
// Intepret query as a substring match in name or description.
var ps []*pb.Product
for _, p := range parseCatalog() {
if strings.Contains(strings.ToLower(p.Name), strings.ToLower(req.Query)) ||
strings.Contains(strings.ToLower(p.Description), strings.ToLower(req.Query)) {
ps = append(ps, p)

var result []*pb.Product
for _, product := range catalog {
if strings.Contains(strings.ToLower(product.Name), strings.ToLower(req.Query)) ||
strings.Contains(strings.ToLower(product.Description), strings.ToLower(req.Query)) {
result = append(result, product)
}
}
span.SetAttributes(
attribute.Int("app.products.count", len(ps)),
attribute.Int("app.products.count", len(result)),
)
return &pb.SearchProductsResponse{Results: ps}, nil
return &pb.SearchProductsResponse{Results: result}, nil
}

func (p *productCatalog) checkProductFailure(ctx context.Context, id string) bool {
if id != "OLJCESPC7Z" {
return false
}

if id == "OLJCESPC7Z" {
conn, err := createClient(ctx, p.featureFlagSvcAddr)
if err != nil {
//report the error but don't fail
span := trace.SpanFromContext(ctx)
span.AddEvent("error", trace.WithAttributes(attribute.String("message", "failed to connect to feature flag service")))
return false
}
defer conn.Close()

ffResponse, err := pb.NewFeatureFlagServiceClient(conn).GetFlag(ctx, &pb.GetFlagRequest{
Name: "productCatalogFailure",
})
if err != nil {
span := trace.SpanFromContext(ctx)
span.AddEvent("error", trace.WithAttributes(attribute.String("message", "failed to retrieve product catalog feature flag")))
return false
}

if ffResponse.GetFlag().Enabled {
return true
}
conn, err := createClient(ctx, p.featureFlagSvcAddr)
if err != nil {
span := trace.SpanFromContext(ctx)
span.AddEvent("error", trace.WithAttributes(attribute.String("message", "Feature Flag Connection Failed")))
return false
}
defer conn.Close()

flagName := "productCatalogFailure"
ffResponse, err := pb.NewFeatureFlagServiceClient(conn).GetFlag(ctx, &pb.GetFlagRequest{
Name: flagName,
})
if err != nil {
span := trace.SpanFromContext(ctx)
span.AddEvent("error", trace.WithAttributes(attribute.String("message", fmt.Sprintf("GetFlag Failed: %s", flagName))))
return false
}
return false

return ffResponse.GetFlag().Enabled
}

func createClient(ctx context.Context, svcAddr string) (*grpc.ClientConn, error) {
Expand Down