diff --git a/jobs/metric-store-nozzle/templates/bpm.yml.erb b/jobs/metric-store-nozzle/templates/bpm.yml.erb index 4cc215109..9e0cb06d8 100644 --- a/jobs/metric-store-nozzle/templates/bpm.yml.erb +++ b/jobs/metric-store-nozzle/templates/bpm.yml.erb @@ -31,7 +31,7 @@ processes: LOGS_PROVIDER_KEY_PATH: "<%= "#{cert_dir}/logs_provider.key" %>" # Otel Provider - OTEL_ADDR: "<%= "0.0.0.0:#{p('otel_port')}" %>" + OTEL_ADDR: "<%= "#{p('otel_port')}" %>" OTEL_PROVIDER_CA_PATH: "<%= "#{cert_dir}/otel_provider_ca.crt" %>" OTEL_PROVIDER_CERT_PATH: "<%= "#{cert_dir}/otel_provider.crt" %>" OTEL_PROVIDER_KEY_PATH: "<%= "#{cert_dir}/otel_provider.key" %>" diff --git a/scripts/configure-otel b/scripts/configure-otel deleted file mode 100755 index ea8301f94..000000000 --- a/scripts/configure-otel +++ /dev/null @@ -1,65 +0,0 @@ -#!/usr/bin/env bash - -SCRIPT_DIR=$(cd $(dirname $0) && pwd) - -if [ -z "${OM_TARGET}" ] || [ -z "${OM_USERNAME}" ] || [ -z "${OM_PASSWORD}" ]; then - echo "OM_TARGET, OM_USERNAME, and OM_PASSWORD must be set" - return -fi - -echo "Configuring $OM_TARGET" - -CERTS_DIR=$SCRIPT_DIR/certs -mkdir $CERTS_DIR -CERT_NAME=".properties.otel_provider_mtls" PRODUCT="metric-store" - -function get-certs() { - echo "Getting $PRODUCT cert of $CERT_NAME" - export KEY_FILE=$CERTS_DIR/key.pem - om -k credentials -p ${PRODUCT} -c ${CERT_NAME} -f private_key_pem > "${KEY_FILE}" - - export CERT_FILE=$CERTS_DIR/cert.pem - om -k credentials -p ${PRODUCT} -c ${CERT_NAME} -f cert_pem > "${CERT_FILE}" -} - -export SYS_DOMAIN=$(cat "${ENVIRONMENT_LOCK_METADATA}" | jq -r .sys_domain) -ca_cert="$(om certificate-authorities -f json | jq '.[0].cert_pem')" -export CA_FILE=$CERTS_DIR/ca_cert.pem -cat "${ca_cert}" > $CA_FILE - -PRODUCT="metric-store" CERT_NAME=".properties.otel_provider_mtls" get-certs -ms_private_key_pem="$(awk '{printf "%s\\n", $0}' ${KEY_FILE} )" -ms_cert_pem="$(awk '{printf "%s\\n", $0}' ${CERT_FILE} )" - -PRODUCT="p-healthwatch2" CERT_NAME=".properties.healthwatch_exporter_client_mtls" get-certs -hw_private_key_pem="$(awk '{printf "%s\\n", $0}' ${KEY_FILE} )" -hw_cert_pem="$(awk '{printf "%s\\n", $0}' ${CERT_FILE} )" - -# curl --cert "${CERT_FILE}" --key "${KEY_FILE}" --cacert "${CA_FILE}" -cat <<-HEREDOC > otel-config.yaml ---- -product-name: cf -product-properties: - .properties.otel_collector_metric_exporters_config: - value: |- - prometheus: - endpoint: ":53035" - add_metric_suffixes: false - tls: - ca_pem: ${ca_cert} - cert_pem: "${hw_cert_pem}" - key_pem: "${hw_private_key_pem}" - otlp: - endpoint: dns:metric-store.service.internal:6062 - tls: - insecure_skip_verify: true - ca_pem: ${ca_cert} - cert_pem: "${ms_cert_pem}" - key_pem: "${ms_private_key_pem}" -HEREDOC - -echo "CF Config" -echo "===================================" -cat otel-config.yaml -echo "===================================" -om configure-product -p cf -c otel-config.yaml && om apply-changes --product-name cf diff --git a/scripts/configure-otel-tas10 b/scripts/configure-otel-tas10 new file mode 100755 index 000000000..c2e0904d8 --- /dev/null +++ b/scripts/configure-otel-tas10 @@ -0,0 +1,137 @@ +#!/usr/bin/env bash + +SCRIPT_DIR=$(cd $(dirname $0) && pwd) + +if [ -z "${OM_TARGET}" ] || [ -z "${OM_USERNAME}" ] || [ -z "${OM_PASSWORD}" ]; then + echo "OM_TARGET, OM_USERNAME, and OM_PASSWORD must be set" + return +fi + +echo "Configuring $OM_TARGET" + +CERTS_DIR=$SCRIPT_DIR/certs +mkdir "$CERTS_DIR" + +export AVAILABLE_PRODUCTS=$(om products --deployed -f json) + +function get-certs() { + echo "Getting $PRODUCT cert of $CERT_NAME" + + export HAS_PRODUCT=$( echo "$AVAILABLE_PRODUCTS" | grep "\"$PRODUCT\"") + if [ -n "$HAS_PRODUCT" ]; then + export KEY_FILE=$CERTS_DIR/key.pem + om -k credentials -p "${PRODUCT}" -c "${CERT_NAME}" -f private_key_pem > "${KEY_FILE}" + export private_key_pem="$(awk '{printf "%s\\n", $0}' "${KEY_FILE}" )" + echo "received $private_key_pem" + + export CERT_FILE=$CERTS_DIR/cert.pem + om -k credentials -p "${PRODUCT}" -c "${CERT_NAME}" -f cert_pem > "${CERT_FILE}" + export cert_pem="$(awk '{printf "%s\\n", $0}' "${CERT_FILE}" )" + echo "received $cert_pem" + fi +} + +export SYS_DOMAIN=$(cat "${ENVIRONMENT_LOCK_METADATA}" | jq -r .sys_domain) +export ca_cert="$(om certificate-authorities -f json | jq '.[0].cert_pem')" +export CA_FILE=$CERTS_DIR/ca_cert.pem +cat "${ca_cert}" > "$CA_FILE" + + +cat <<-HEREDOC > otel-config.yaml +--- +product-name: cf +product-properties: + .properties.otel_collector_config: + value: + receivers: + otlp: + protocols: + grpc: + endpoint: 127.0.0.1:9100 + tls: + client_ca_file: "/var/vcap/jobs/otel-collector/config/certs/otel-collector-ca.crt" + cert_file: "/var/vcap/jobs/otel-collector/config/certs/otel-collector.crt" + key_file: "/var/vcap/jobs/otel-collector/config/certs/otel-collector.key" + min_version: '1.3' + exporters: +HEREDOC + + +PRODUCT="metric-store" CERT_NAME=".properties.otel_provider_mtls" get-certs +if [ -n "$HAS_PRODUCT" ]; then +metricStoreMetricExporter="- otlp/metric-store-metrics" +metricStoreTraceExporter="- otlp/metric-store-traces" +cat <<-HEREDOC > metric-store-config.yaml + otlp/metric-store-metrics: + endpoint: metric-store.service.internal:6062 + retry_on_failure: + enabled: true + max_interval: 90s + max_elapsed_time: 600s + balancer_name: round_robin + tls: + insecure_skip_verify: true + ca_pem: ${ca_cert} + cert_pem: "${cert_pem}" + key_pem: "${private_key_pem}" + otlp/metric-store-traces: + endpoint: metric-store.service.internal:6062 + retry_on_failure: + enabled: true + max_interval: 90s + max_elapsed_time: 600s + balancer_name: round_robin + tls: + insecure_skip_verify: true + ca_pem: ${ca_cert} + cert_pem: "${cert_pem}" + key_pem: "${private_key_pem}" +HEREDOC + cat metric-store-config.yaml >> otel-config.yaml +fi + +PRODUCT="p-healthwatch2" CERT_NAME=".properties.healthwatch_exporter_client_mtls" get-certs +if [ -n "$HAS_PRODUCT" ]; then +healthwatchMetricExporter="- prometheus/healthwatch" +cat <<-HEREDOC > prometheus-config.yaml + prometheus/healthwatch: + endpoint: ":65331" + add_metric_suffixes: false + tls: + ca_pem: ${ca_cert} + cert_pem: "${cert_pem}" + key_pem: "${private_key_pem}" +HEREDOC + cat prometheus-config.yaml >> otel-config.yaml +fi + +cat <<-HEREDOC > services-config.yaml + service: + telemetry: + metrics: + level: basic + address: 127.0.0.1:14830 + pipelines: + metrics: + receivers: + - otlp + exporters: + ${metricStoreMetricExporter} + ${healthwatchMetricExporter} + traces: + receivers: + - otlp + exporters: + ${metricStoreTraceExporter} + +HEREDOC + +cat services-config.yaml >> otel-config.yaml + +# curl --cert "${CERT_FILE}" --key "${KEY_FILE}" --cacert "${CA_FILE}" + +echo "CF Config" +echo "===================================" +cat otel-config.yaml +echo "===================================" +om configure-product -p cf -c otel-config.yaml #&& om apply-changes --product-name cf diff --git a/scripts/configure-otel-tas6 b/scripts/configure-otel-tas6 new file mode 100755 index 000000000..54f8177ff --- /dev/null +++ b/scripts/configure-otel-tas6 @@ -0,0 +1,106 @@ +#!/usr/bin/env bash + +SCRIPT_DIR=$(cd $(dirname $0) && pwd) + +if [ -z "${OM_TARGET}" ] || [ -z "${OM_USERNAME}" ] || [ -z "${OM_PASSWORD}" ]; then + echo "OM_TARGET, OM_USERNAME, and OM_PASSWORD must be set" + return +fi + +echo "Configuring $OM_TARGET" + +CERTS_DIR=$SCRIPT_DIR/certs +mkdir "$CERTS_DIR" + +export AVAILABLE_PRODUCTS=$(om products --deployed -f json) + +function get-certs() { + echo "Getting $PRODUCT cert of $CERT_NAME" + + export HAS_PRODUCT=$( echo "$AVAILABLE_PRODUCTS" | grep "\"$PRODUCT\"") + if [ -n "$HAS_PRODUCT" ]; then + export KEY_FILE=$CERTS_DIR/key.pem + om -k credentials -p "${PRODUCT}" -c "${CERT_NAME}" -f private_key_pem > "${KEY_FILE}" + export private_key_pem="$(awk '{printf "%s\\n", $0}' "${KEY_FILE}" )" + echo "received $private_key_pem" + + export CERT_FILE=$CERTS_DIR/cert.pem + om -k credentials -p "${PRODUCT}" -c "${CERT_NAME}" -f cert_pem > "${CERT_FILE}" + export cert_pem="$(awk '{printf "%s\\n", $0}' "${CERT_FILE}" )" + echo "received $cert_pem" + fi +} + +export SYS_DOMAIN=$(cat "${ENVIRONMENT_LOCK_METADATA}" | jq -r .sys_domain) +export ca_cert="$(om certificate-authorities -f json | jq '.[0].cert_pem')" +export CA_FILE=$CERTS_DIR/ca_cert.pem +cat "${ca_cert}" > "$CA_FILE" + +cat <<-HEREDOC > otel-config.yaml +--- +product-name: cf +product-properties: + .properties.otel_collector_metric_exporters_config: + value: |- +HEREDOC + +PRODUCT="p-healthwatch2" CERT_NAME=".properties.healthwatch_exporter_client_mtls" get-certs +if [ -n "$HAS_PRODUCT" ]; then +cat <<-HEREDOC > prometheus-config.yaml + prometheus/healthwatch: + endpoint: ":65331" + add_metric_suffixes: false + tls: + ca_pem: ${ca_cert} + cert_pem: "${cert_pem}" + key_pem: "${private_key_pem}" +HEREDOC + cat prometheus-config.yaml >> otel-config.yaml +fi + +PRODUCT="metric-store" CERT_NAME=".properties.otel_provider_mtls" get-certs +if [ -n "$HAS_PRODUCT" ]; then +cat <<-HEREDOC > metric-store-config.yaml + otlp/metric-store-metrics: + endpoint: dns:metric-store.service.internal:6061 + retry_on_failure: + enabled: true + max_interval: 90s + max_elapsed_time: 600s + balancer_name: round_robin + tls: + insecure_skip_verify: true + ca_pem: ${ca_cert} + cert_pem: "${cert_pem}" + key_pem: "${private_key_pem}" +HEREDOC + +cat metric-store-config.yaml >> otel-config.yaml + +cat <<-HEREDOC > trace-config.yaml + .properties.otel_collector_trace_exporter_config: + value: |- + otlp/metric-store-traces: + endpoint: dns:metric-store.service.internal:6061 + retry_on_failure: + enabled: true + max_interval: 90s + max_elapsed_time: 600s + balancer_name: round_robin + tls: + insecure_skip_verify: true + ca_pem: ${ca_cert} + cert_pem: "${cert_pem}" + key_pem: "${private_key_pem}" +HEREDOC + +cat trace-config.yaml >> otel-config.yaml +fi + +# curl --cert "${CERT_FILE}" --key "${KEY_FILE}" --cacert "${CA_FILE}" + +echo "CF Config" +echo "===================================" +cat otel-config.yaml +echo "===================================" +om configure-product -p cf -c otel-config.yaml #&& om apply-changes --product-name cf diff --git a/sha b/sha index 8ae02cf39..178b13f5c 100644 --- a/sha +++ b/sha @@ -1 +1 @@ -da19787a73e08e848fd3e2b5b3cdbd0d0ced2413 +0df81d43bb5fa458d480984903d84efd8c09c712 diff --git a/src/internal/nozzle/nozzle.go b/src/internal/nozzle/nozzle.go index 8dd8e07d1..6028701f9 100644 --- a/src/internal/nozzle/nozzle.go +++ b/src/internal/nozzle/nozzle.go @@ -52,7 +52,7 @@ type StreamConnector interface { const ( BATCH_FLUSH_INTERVAL = 500 * time.Millisecond - BATCH_CHANNEL_SIZE = 512 + BATCH_CHANNEL_SIZE = 1024 ) func NewNozzle(c StreamConnector, ingressAddr string, tlsConfig *tls.Config, shardId string, nodeIndex int, filterMetrics bool, allowListTags []string, opts ...Option) *Nozzle { @@ -263,7 +263,7 @@ func (n *Nozzle) timerEmitter() { points = append(points, pointsBatch.Points...) size += pointsBatch.Size - if size >= ingressclient.MAX_BATCH_SIZE_IN_BYTES { + if size >= ingressclient.MAX_BATCH_SIZE_IN_BYTES || len(points) >= BATCH_CHANNEL_SIZE { points = n.writeToChannelOrDiscard(points) size = 0 } @@ -273,7 +273,7 @@ func (n *Nozzle) timerEmitter() { points = append(points, pointsBatch.Points...) size += pointsBatch.Size - if size >= ingressclient.MAX_BATCH_SIZE_IN_BYTES { + if size >= ingressclient.MAX_BATCH_SIZE_IN_BYTES || len(points) >= BATCH_CHANNEL_SIZE { points = n.writeToChannelOrDiscard(points) size = 0 } diff --git a/src/internal/nozzle/otel_server.go b/src/internal/nozzle/otel_server.go index 5a1d0f6ff..5caf6e478 100644 --- a/src/internal/nozzle/otel_server.go +++ b/src/internal/nozzle/otel_server.go @@ -3,16 +3,18 @@ package nozzle import ( "context" "crypto/tls" + "fmt" + "net" + metricspb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" tracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1" "go.uber.org/zap" _ "google.golang.org/grpc/encoding/gzip" - "net" - "time" "github.com/cloudfoundry/metric-store-release/src/pkg/logger" "google.golang.org/grpc" + "google.golang.org/grpc/credentials" ) type OtelServer struct { @@ -40,8 +42,7 @@ func (s *OtelServer) Close() error { s.ms.Stop() s.ts.Stop() - // Optionally, reset s.grpcServer to nil to indicate it's closed - // This is useful if you're checking the state of the server elsewhere + // Reset s.grpcServer to nil to indicate it's closed s.grpcServer = nil s.cancel() @@ -51,19 +52,22 @@ func (s *OtelServer) Close() error { } func (s *OtelServer) Start(addr string, otelTlsConfig *tls.Config) { - s.log.Info("OtelServer starting grpc server") - go s.startGRPCServer(addr, otelTlsConfig) + s.log.Info("OtelServer starting grpc server at ", zap.String("address", addr)) + + // Initialize gRPC server with TLS credentials + grpcServer := grpc.NewServer(grpc.Creds(credentials.NewTLS(otelTlsConfig))) + + // Register Metric and Trace services with the gRPC server + metricspb.RegisterMetricsServiceServer(grpcServer, s.ms) + tracepb.RegisterTraceServiceServer(grpcServer, s.ts) - s.log.Info("Registering Metrics and Trace Server") - metricspb.RegisterMetricsServiceServer(s.grpcServer, s.ms) - tracepb.RegisterTraceServiceServer(s.grpcServer, s.ts) + // Start the gRPC server in a separate goroutine + go s.startGRPCServer(addr, grpcServer) - s.log.Info("Starting Metrics Server") + s.log.Info("Starting Metrics and Trace Servers") s.ms.StartListening() - s.log.Info("Starting Trace Server") s.ts.StartListening() s.log.Info("OtelServer started") - } func NewOtelServer( @@ -71,18 +75,13 @@ func NewOtelServer( ms *MetricService, ts *TraceService, ) *OtelServer { - ctx, cancel := context.WithCancel(context.Background()) - // Initialize the gRPC server and register the metric service - grpcServer := grpc.NewServer() // Return a new OtelServer instance containing the gRPC server and other relevant info return &OtelServer{ - grpcServer: grpcServer, - log: log, - ms: ms, - ts: ts, - + log: log, + ms: ms, + ts: ts, ctx: ctx, cancel: cancel, done: make(chan struct{}, 1), @@ -90,33 +89,27 @@ func NewOtelServer( } // StartGRPCServer starts the gRPC server and listens for incoming connections. -func (s *OtelServer) startGRPCServer(addr string, otelTlsConfig *tls.Config) { +func (s *OtelServer) startGRPCServer(addr string, grpcServer *grpc.Server) { defer func() { close(s.done) }() - address, err := net.ResolveTCPAddr("tcp", s.addr) + address := fmt.Sprintf("0.0.0.0:%s", addr) + resolvedAddr, err := net.ResolveTCPAddr("tcp", address) if err != nil { s.log.Panic("Failed to resolve address", zap.Error(err)) } - listener, err := tls.Listen("tcp", address.String(), otelTlsConfig) + listener, err := net.ListenTCP("tcp", resolvedAddr) if err != nil { s.log.Panic("Failed to start listener", zap.Error(err)) } defer listener.Close() - for { - // Accept incoming TCP connection - conn, err := listener.Accept() - if err != nil { - s.log.Error("Error while accepting connection", err) - err := conn.Close() - if err != nil { - return - } - continue - } - time.Sleep(time.Second * 5) + s.log.Info("Listening on", zap.String("address", resolvedAddr.String())) + + // Serve the gRPC server on the TCP listener + if err := grpcServer.Serve(listener); err != nil { + s.log.Panic("gRPC server failed", zap.Error(err)) } } diff --git a/src/pkg/rpc/rpc.go b/src/pkg/rpc/rpc.go index b90051ff4..21098b719 100644 --- a/src/pkg/rpc/rpc.go +++ b/src/pkg/rpc/rpc.go @@ -29,7 +29,7 @@ func (p *Point) EstimatePointSize() (size int) { // add the size of all label keys and values for k, v := range p.Labels { - size += (len(k) + len(v)) + size += len(k) + len(v) } return size