From 67f7af965978be3e1d763368ef544a6959e397f2 Mon Sep 17 00:00:00 2001 From: Lukas Fittl Date: Wed, 4 Oct 2023 14:06:04 -0700 Subject: [PATCH 1/3] OpenTelemetry integration: Initialize once at startup, flush with snapshot This refactors the logic to only initialize the configured tracing provider once for each server, instead of doing so when tracing spans are being sent. Additionally this adds a new verbose log message that indicates when spans were exported successfully. --- config/config.go | 7 +++ config/read.go | 59 ++++++++++++++++++++++++ logs/querysample/tracing.go | 92 +++++-------------------------------- main.go | 33 ++++++++++--- runner/logs.go | 4 +- 5 files changed, 107 insertions(+), 88 deletions(-) diff --git a/config/config.go b/config/config.go index 0c60a7d32..97b58e6bf 100644 --- a/config/config.go +++ b/config/config.go @@ -1,11 +1,14 @@ package config import ( + "context" "fmt" "net/http" "net/url" "strconv" "strings" + + sdktrace "go.opentelemetry.io/otel/sdk/trace" ) type Config struct { @@ -193,6 +196,10 @@ type ServerConfig struct { // HTTP clients to be used for API connections HTTPClient *http.Client HTTPClientWithRetry *http.Client + + // OpenTelemetry tracing provider, if enabled + OTelTracingProvider *sdktrace.TracerProvider + OTelTracingProviderShutdownFunc func(context.Context) error } // SupportsLogDownload - Determines whether the specified config can download logs diff --git a/config/read.go b/config/read.go index afe67a143..d65c3ac78 100644 --- a/config/read.go +++ b/config/read.go @@ -15,6 +15,12 @@ import ( "time" "github.com/go-ini/ini" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + sdkresource "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.21.0" "golang.org/x/net/http/httpproxy" "github.com/pganalyze/collector/util" @@ -351,6 +357,59 @@ func CreateEC2IMDSHTTPClient(conf ServerConfig) *http.Client { } } +const otelServiceName = "Postgres (pganalyze)" + +func CreateOTelTracingProvider(ctx context.Context, conf ServerConfig) (*sdktrace.TracerProvider, func(context.Context) error, error) { + res, err := sdkresource.New(ctx, + sdkresource.WithAttributes( + semconv.ServiceName(otelServiceName), + ), + ) + if err != nil { + return nil, nil, fmt.Errorf("failed to create resource: %w", err) + } + + u, err := url.Parse(conf.OtelExporterOtlpEndpoint) + if err != nil { + return nil, nil, fmt.Errorf("failed to parse endpoint URL: %w", err) + } + scheme := strings.ToLower(u.Scheme) + + var traceExporter *otlptrace.Exporter + ctx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + + switch scheme { + case "http", "https": + opts := []otlptracehttp.Option{otlptracehttp.WithEndpoint(u.Host)} + if scheme == "http" { + opts = append(opts, otlptracehttp.WithInsecure()) + } + traceExporter, err = otlptracehttp.New(ctx, opts...) + if err != nil { + return nil, nil, fmt.Errorf("failed to create HTTP trace exporter: %w", err) + } + case "grpc": + // For now we always require TLS for gRPC connections + opts := []otlptracegrpc.Option{otlptracegrpc.WithEndpoint(u.Host)} + traceExporter, err = otlptracegrpc.New(ctx, opts...) 
+ if err != nil { + return nil, nil, fmt.Errorf("failed to create GRPC trace exporter: %w", err) + } + default: + return nil, nil, fmt.Errorf("unsupported protocol: %s", u.Scheme) + } + + bsp := sdktrace.NewBatchSpanProcessor(traceExporter) + tracerProvider := sdktrace.NewTracerProvider( + sdktrace.WithSampler(sdktrace.AlwaysSample()), + sdktrace.WithResource(res), + sdktrace.WithSpanProcessor(bsp), + ) + + return tracerProvider, tracerProvider.Shutdown, nil +} + func writeValueToTempfile(value string) (string, error) { file, err := ioutil.TempFile("", "") if err != nil { diff --git a/logs/querysample/tracing.go b/logs/querysample/tracing.go index 2b2212205..b491db873 100644 --- a/logs/querysample/tracing.go +++ b/logs/querysample/tracing.go @@ -5,24 +5,16 @@ import ( "encoding/binary" "encoding/hex" "fmt" - "net/url" - "strings" "time" "github.com/pganalyze/collector/state" "github.com/pganalyze/collector/util" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/exporters/otlp/otlptrace" - "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" - "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" "go.opentelemetry.io/otel/propagation" - sdkresource "go.opentelemetry.io/otel/sdk/resource" - sdktrace "go.opentelemetry.io/otel/sdk/trace" semconv "go.opentelemetry.io/otel/semconv/v1.21.0" "go.opentelemetry.io/otel/trace" ) -const otelServiceName = "Postgres (pganalyze)" const otelSpanName = "EXPLAIN Plan" func urlToSample(server *state.Server, grant state.GrantLogs, sample state.PostgresQuerySample) string { @@ -40,77 +32,8 @@ func urlToSample(server *state.Server, grant state.GrantLogs, sample state.Postg ) } -func initProvider(ctx context.Context, endpoint string) (*sdktrace.TracerProvider, func(context.Context) error, error) { - res, err := sdkresource.New(ctx, - sdkresource.WithAttributes( - semconv.ServiceName(otelServiceName), - ), - ) - if err != nil { - return nil, nil, fmt.Errorf("failed to create resource: %w", err) - } - - url, err := url.Parse(endpoint) - if err != nil { - return nil, nil, fmt.Errorf("failed to parse endpoint URL: %w", err) - } - scheme := strings.ToLower(url.Scheme) - - var traceExporter *otlptrace.Exporter - ctx, cancel := context.WithTimeout(ctx, time.Second) - defer cancel() - - switch scheme { - case "http", "https": - opts := []otlptracehttp.Option{otlptracehttp.WithEndpoint(url.Host)} - if scheme == "http" { - opts = append(opts, otlptracehttp.WithInsecure()) - } - traceExporter, err = otlptracehttp.New(ctx, opts...) - if err != nil { - return nil, nil, fmt.Errorf("failed to create HTTP trace exporter: %w", err) - } - case "grpc": - // For now we always require TLS for gRPC connections - opts := []otlptracegrpc.Option{otlptracegrpc.WithEndpoint(url.Host)} - traceExporter, err = otlptracegrpc.New(ctx, opts...) 
- if err != nil { - return nil, nil, fmt.Errorf("failed to create HTTP trace exporter: %w", err) - } - default: - return nil, nil, fmt.Errorf("unsupported protocol: %s", url.Scheme) - } - - bsp := sdktrace.NewBatchSpanProcessor(traceExporter) - tracerProvider := sdktrace.NewTracerProvider( - sdktrace.WithSampler(sdktrace.AlwaysSample()), - sdktrace.WithResource(res), - sdktrace.WithSpanProcessor(bsp), - ) - - return tracerProvider, tracerProvider.Shutdown, nil -} - -func ReportQuerySamplesAsTraceSpans(ctx context.Context, server *state.Server, logger *util.Logger, grant state.GrantLogs, samples []state.PostgresQuerySample) { - endpoint := server.Config.OtelExporterOtlpEndpoint - if endpoint == "" { - return - } - - // TODO: Initialize the provider once instead of each time we need to send. - // When we fix this we likely need to explicitly flush all spans at the end - // of this function (currently done by shutdown). - tracerProvider, shutdown, err := initProvider(ctx, server.Config.OtelExporterOtlpEndpoint) - if err != nil { - logger.PrintError("Failed to initialize OpenTelemetry tracing provider: %s", err) - return - } - defer func() { - if err := shutdown(ctx); err != nil { - logger.PrintError("Failed to shutdown OpenTelemetry tracing provider: %s", err) - } - }() - +func ExportQuerySamplesAsTraceSpans(ctx context.Context, server *state.Server, logger *util.Logger, grant state.GrantLogs, samples []state.PostgresQuerySample) { + exportCount := 0 for _, sample := range samples { if !sample.HasExplain { // Skip samples without an EXPLAIN plan for now @@ -121,7 +44,7 @@ func ReportQuerySamplesAsTraceSpans(ctx context.Context, server *state.Server, l prop := propagation.TraceContext{} ctx := prop.Extract(context.Background(), propagation.MapCarrier(queryTags)) - tracer := tracerProvider.Tracer( + tracer := server.Config.OTelTracingProvider.Tracer( "", trace.WithInstrumentationVersion(util.CollectorVersion), trace.WithSchemaURL(semconv.SchemaURL), @@ -132,6 +55,15 @@ func ReportQuerySamplesAsTraceSpans(ctx context.Context, server *state.Server, l _, span := tracer.Start(ctx, otelSpanName, trace.WithTimestamp(startTime)) span.SetAttributes(attribute.String("url.full", urlToSample(server, grant, sample))) span.End(trace.WithTimestamp(endTime)) + exportCount += 1 + } + } + + if exportCount > 0 { + err := server.Config.OTelTracingProvider.ForceFlush(ctx) + if err != nil { + logger.PrintError("Failed to export OpenTelemetry data: %s", err) } + logger.PrintVerbose("Exported %d tracing spans to OpenTelemetry endpoint at %s", exportCount, server.Config.OtelExporterOtlpEndpoint) } } diff --git a/main.go b/main.go index 11a44b2e1..e351adbcc 100644 --- a/main.go +++ b/main.go @@ -35,7 +35,7 @@ import ( _ "github.com/lib/pq" // Enable database package to use Postgres ) -func run(ctx context.Context, wg *sync.WaitGroup, globalCollectionOpts state.CollectionOpts, logger *util.Logger, configFilename string) (keepRunning bool, testRunSuccess chan bool, writeStateFile func()) { +func run(ctx context.Context, wg *sync.WaitGroup, globalCollectionOpts state.CollectionOpts, logger *util.Logger, configFilename string) (keepRunning bool, testRunSuccess chan bool, writeStateFile func(), shutdown func()) { var servers []*state.Server keepRunning = false @@ -54,12 +54,29 @@ func run(ctx context.Context, wg *sync.WaitGroup, globalCollectionOpts state.Col return } - for idx, server := range conf.Servers { - prefixedLogger := logger.WithPrefix(server.SectionName) - prefixedLogger.PrintVerbose("Identified as 
api_system_type: %s, api_system_scope: %s, api_system_id: %s", server.SystemType, server.SystemScope, server.SystemID) + for idx, cfg := range conf.Servers { + prefixedLogger := logger.WithPrefix(cfg.SectionName) + prefixedLogger.PrintVerbose("Identified as api_system_type: %s, api_system_scope: %s, api_system_id: %s", cfg.SystemType, cfg.SystemScope, cfg.SystemID) - conf.Servers[idx].HTTPClient = config.CreateHTTPClient(server, prefixedLogger, false) - conf.Servers[idx].HTTPClientWithRetry = config.CreateHTTPClient(server, prefixedLogger, true) + conf.Servers[idx].HTTPClient = config.CreateHTTPClient(cfg, prefixedLogger, false) + conf.Servers[idx].HTTPClientWithRetry = config.CreateHTTPClient(cfg, prefixedLogger, true) + if cfg.OtelExporterOtlpEndpoint != "" { + conf.Servers[idx].OTelTracingProvider, conf.Servers[idx].OTelTracingProviderShutdownFunc, err = config.CreateOTelTracingProvider(ctx, cfg) + if err != nil { + logger.PrintError("Failed to initialize OpenTelemetry tracing provider, disabling exports: %s", err) + } + } + } + + shutdown = func() { + for _, cfg := range conf.Servers { + if cfg.OTelTracingProviderShutdownFunc == nil { + continue + } + if err := cfg.OTelTracingProviderShutdownFunc(ctx); err != nil { + logger.PrintError("Failed to shutdown OpenTelemetry tracing provider: %s", err) + } + } } // Avoid even running the scheduler when we already know its not needed @@ -464,7 +481,7 @@ ReadConfigAndRun: ctx, cancel := context.WithCancel(context.Background()) wg := sync.WaitGroup{} exitCode := 0 - keepRunning, testRunSuccess, writeStateFile := run(ctx, &wg, globalCollectionOpts, logger, configFilename) + keepRunning, testRunSuccess, writeStateFile, shutdown := run(ctx, &wg, globalCollectionOpts, logger, configFilename) if keepRunning { // Block here until we get any of the registered signals @@ -484,6 +501,7 @@ ReadConfigAndRun: } } logger.PrintInfo("Reloading configuration...") + shutdown() cancel() wg.Wait() writeStateFile() @@ -522,6 +540,7 @@ ReadConfigAndRun: } } + shutdown() cancel() wg.Wait() diff --git a/runner/logs.go b/runner/logs.go index 6f9418b5b..c80ab2e9e 100644 --- a/runner/logs.go +++ b/runner/logs.go @@ -319,7 +319,9 @@ func postprocessAndSendLogs(ctx context.Context, server *state.Server, globalCol } // Export query samples as traces, if OpenTelemetry endpoint is configured - querysample.ReportQuerySamplesAsTraceSpans(ctx, server, logger, grant, transientLogState.QuerySamples) + if server.Config.OTelTracingProvider != nil && len(transientLogState.QuerySamples) != 0 { + querysample.ExportQuerySamplesAsTraceSpans(ctx, server, logger, grant, transientLogState.QuerySamples) + } for idx := range transientLogState.LogFiles { // The actual filtering (aka masking of secrets) is done later in From 629e03dcd743616bcc1291627484dee9b10ed88a Mon Sep 17 00:00:00 2001 From: Lukas Fittl Date: Wed, 4 Oct 2023 14:08:48 -0700 Subject: [PATCH 2/3] OpenTelemetry integration: Use "db.postgresql.plan" attribute to link plan This changes the span attribute "url.full" to "db.postgresql.plan" to fit better in existing attribute semantics. 
It also adds the "db.system" attribute set to "postgresql", per the semantic convention for databases: https://opentelemetry.io/docs/specs/otel/trace/semantic_conventions/database/ --- logs/querysample/tracing.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/logs/querysample/tracing.go b/logs/querysample/tracing.go index b491db873..00374f9e7 100644 --- a/logs/querysample/tracing.go +++ b/logs/querysample/tracing.go @@ -53,7 +53,10 @@ func ExportQuerySamplesAsTraceSpans(ctx context.Context, server *state.Server, l startTime := sample.OccurredAt.Add(duration) endTime := sample.OccurredAt _, span := tracer.Start(ctx, otelSpanName, trace.WithTimestamp(startTime)) - span.SetAttributes(attribute.String("url.full", urlToSample(server, grant, sample))) + // See https://opentelemetry.io/docs/specs/otel/trace/semantic_conventions/database/ + // however note that "db.postgresql.plan" is non-standard. + span.SetAttributes(attribute.String("db.system", "postgresql")) + span.SetAttributes(attribute.String("db.postgresql.plan", urlToSample(server, grant, sample))) span.End(trace.WithTimestamp(endTime)) exportCount += 1 } From da3b04d41c62b508d6f5d600e6c8e913e7884aa6 Mon Sep 17 00:00:00 2001 From: Lukas Fittl Date: Wed, 4 Oct 2023 14:10:26 -0700 Subject: [PATCH 3/3] OpenTelemetry integration: Add support for setting OTLP headers This is necessary for using remote OTLP endpoints provided by providers such as Honeycomb and New Relic, which require the API key to be set through the headers mechanism. Adds the otel_exporter_otlp_headers / OTEL_EXPORTER_OTLP_HEADERS config setting, which follows the regular OpenTelemetry settings format: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.20.0/specification/protocol/exporter.md --- config/config.go | 5 ++++- config/read.go | 30 ++++++++++++++++++++++++++++++ 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/config/config.go b/config/config.go index 97b58e6bf..666c21d06 100644 --- a/config/config.go +++ b/config/config.go @@ -186,7 +186,10 @@ type ServerConfig struct { FilterQueryText string `ini:"filter_query_text"` // none/unparsable (defaults to "unparsable") // Configuration for OpenTelemetry trace exports - OtelExporterOtlpEndpoint string `ini:"otel_exporter_otlp_endpoint"` // See https://github.com/open-telemetry/opentelemetry-specification/blob/v1.20.0/specification/protocol/exporter.md + // + // See https://github.com/open-telemetry/opentelemetry-specification/blob/v1.20.0/specification/protocol/exporter.md + OtelExporterOtlpEndpoint string `ini:"otel_exporter_otlp_endpoint"` + OtelExporterOtlpHeaders string `ini:"otel_exporter_otlp_headers"` // HTTP proxy overrides HTTPProxy string `ini:"http_proxy"` diff --git a/config/read.go b/config/read.go index d65c3ac78..ec34b82e4 100644 --- a/config/read.go +++ b/config/read.go @@ -263,6 +263,9 @@ func getDefaultConfig() *ServerConfig { if otelExporterOtlpEndpoint := os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT"); otelExporterOtlpEndpoint != "" { config.OtelExporterOtlpEndpoint = otelExporterOtlpEndpoint } + if otelExporterOtlpHeaders := os.Getenv("OTEL_EXPORTER_OTLP_HEADERS"); otelExporterOtlpHeaders != "" { + config.OtelExporterOtlpHeaders = otelExporterOtlpHeaders + } if httpProxy := os.Getenv("HTTP_PROXY"); httpProxy != "" { config.HTTPProxy = httpProxy } @@ -379,12 +382,36 @@ func CreateOTelTracingProvider(ctx context.Context, conf ServerConfig) (*sdktrac ctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() + var headers map[string]string + + 
if conf.OtelExporterOtlpHeaders != "" { + headers = make(map[string]string) + for _, h := range strings.Split(conf.OtelExporterOtlpHeaders, ",") { + nameEscaped, valueEscaped, found := strings.Cut(h, "=") + if !found { + return nil, nil, fmt.Errorf("unsupported header setting: missing '='") + } + name, err := url.QueryUnescape(nameEscaped) + if err != nil { + return nil, nil, fmt.Errorf("unsupported header setting, could not unescape header name: %s", err) + } + value, err := url.QueryUnescape(valueEscaped) + if err != nil { + return nil, nil, fmt.Errorf("unsupported header setting, could not unescape header value: %s", err) + } + headers[strings.TrimSpace(name)] = strings.TrimSpace(value) + } + } + switch scheme { case "http", "https": opts := []otlptracehttp.Option{otlptracehttp.WithEndpoint(u.Host)} if scheme == "http" { opts = append(opts, otlptracehttp.WithInsecure()) } + if headers != nil { + opts = append(opts, otlptracehttp.WithHeaders(headers)) + } traceExporter, err = otlptracehttp.New(ctx, opts...) if err != nil { return nil, nil, fmt.Errorf("failed to create HTTP trace exporter: %w", err) @@ -392,6 +419,9 @@ func CreateOTelTracingProvider(ctx context.Context, conf ServerConfig) (*sdktrac case "grpc": // For now we always require TLS for gRPC connections opts := []otlptracegrpc.Option{otlptracegrpc.WithEndpoint(u.Host)} + if headers != nil { + opts = append(opts, otlptracegrpc.WithHeaders(headers)) + } traceExporter, err = otlptracegrpc.New(ctx, opts...) if err != nil { return nil, nil, fmt.Errorf("failed to create GRPC trace exporter: %w", err)