Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OpenTelemetry Integration: Refactoring, support configuring OTLP headers #460

Merged
merged 3 commits into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package config

import (
"context"
"fmt"
"net/http"
"net/url"
"strconv"
"strings"

sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

type Config struct {
Expand Down Expand Up @@ -183,7 +186,10 @@ type ServerConfig struct {
FilterQueryText string `ini:"filter_query_text"` // none/unparsable (defaults to "unparsable")

// Configuration for OpenTelemetry trace exports
OtelExporterOtlpEndpoint string `ini:"otel_exporter_otlp_endpoint"` // See https://github.com/open-telemetry/opentelemetry-specification/blob/v1.20.0/specification/protocol/exporter.md
//
// See https://github.com/open-telemetry/opentelemetry-specification/blob/v1.20.0/specification/protocol/exporter.md
OtelExporterOtlpEndpoint string `ini:"otel_exporter_otlp_endpoint"`
OtelExporterOtlpHeaders string `ini:"otel_exporter_otlp_headers"`

// HTTP proxy overrides
HTTPProxy string `ini:"http_proxy"`
Expand All @@ -193,6 +199,10 @@ type ServerConfig struct {
// HTTP clients to be used for API connections
HTTPClient *http.Client
HTTPClientWithRetry *http.Client

// OpenTelemetry tracing provider, if enabled
OTelTracingProvider *sdktrace.TracerProvider
OTelTracingProviderShutdownFunc func(context.Context) error
}

// SupportsLogDownload - Determines whether the specified config can download logs
Expand Down
89 changes: 89 additions & 0 deletions config/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ import (
"time"

"github.com/go-ini/ini"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
sdkresource "go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
"golang.org/x/net/http/httpproxy"

"github.com/pganalyze/collector/util"
Expand Down Expand Up @@ -257,6 +263,9 @@ func getDefaultConfig() *ServerConfig {
if otelExporterOtlpEndpoint := os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT"); otelExporterOtlpEndpoint != "" {
config.OtelExporterOtlpEndpoint = otelExporterOtlpEndpoint
}
if otelExporterOtlpHeaders := os.Getenv("OTEL_EXPORTER_OTLP_HEADERS"); otelExporterOtlpHeaders != "" {
config.OtelExporterOtlpHeaders = otelExporterOtlpHeaders
}
if httpProxy := os.Getenv("HTTP_PROXY"); httpProxy != "" {
config.HTTPProxy = httpProxy
}
Expand Down Expand Up @@ -351,6 +360,86 @@ func CreateEC2IMDSHTTPClient(conf ServerConfig) *http.Client {
}
}

const otelServiceName = "Postgres (pganalyze)"

func CreateOTelTracingProvider(ctx context.Context, conf ServerConfig) (*sdktrace.TracerProvider, func(context.Context) error, error) {
res, err := sdkresource.New(ctx,
sdkresource.WithAttributes(
semconv.ServiceName(otelServiceName),
),
)
if err != nil {
return nil, nil, fmt.Errorf("failed to create resource: %w", err)
}

u, err := url.Parse(conf.OtelExporterOtlpEndpoint)
if err != nil {
return nil, nil, fmt.Errorf("failed to parse endpoint URL: %w", err)
}
scheme := strings.ToLower(u.Scheme)

var traceExporter *otlptrace.Exporter
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()

var headers map[string]string

if conf.OtelExporterOtlpHeaders != "" {
headers = make(map[string]string)
for _, h := range strings.Split(conf.OtelExporterOtlpHeaders, ",") {
nameEscaped, valueEscaped, found := strings.Cut(h, "=")
if !found {
return nil, nil, fmt.Errorf("unsupported header setting: missing '='")
}
name, err := url.QueryUnescape(nameEscaped)
if err != nil {
return nil, nil, fmt.Errorf("unsupported header setting, could not unescape header name: %s", err)
}
value, err := url.QueryUnescape(valueEscaped)
if err != nil {
return nil, nil, fmt.Errorf("unsupported header setting, could not unescape header value: %s", err)
}
headers[strings.TrimSpace(name)] = strings.TrimSpace(value)
}
}

switch scheme {
case "http", "https":
opts := []otlptracehttp.Option{otlptracehttp.WithEndpoint(u.Host)}
if scheme == "http" {
opts = append(opts, otlptracehttp.WithInsecure())
}
if headers != nil {
opts = append(opts, otlptracehttp.WithHeaders(headers))
}
traceExporter, err = otlptracehttp.New(ctx, opts...)
if err != nil {
return nil, nil, fmt.Errorf("failed to create HTTP trace exporter: %w", err)
}
case "grpc":
// For now we always require TLS for gRPC connections
opts := []otlptracegrpc.Option{otlptracegrpc.WithEndpoint(u.Host)}
if headers != nil {
opts = append(opts, otlptracegrpc.WithHeaders(headers))
}
traceExporter, err = otlptracegrpc.New(ctx, opts...)
if err != nil {
return nil, nil, fmt.Errorf("failed to create GRPC trace exporter: %w", err)
}
default:
return nil, nil, fmt.Errorf("unsupported protocol: %s", u.Scheme)
}

bsp := sdktrace.NewBatchSpanProcessor(traceExporter)
tracerProvider := sdktrace.NewTracerProvider(
sdktrace.WithSampler(sdktrace.AlwaysSample()),
sdktrace.WithResource(res),
sdktrace.WithSpanProcessor(bsp),
)

return tracerProvider, tracerProvider.Shutdown, nil
}

func writeValueToTempfile(value string) (string, error) {
file, err := ioutil.TempFile("", "")
if err != nil {
Expand Down
97 changes: 16 additions & 81 deletions logs/querysample/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,16 @@ import (
"encoding/binary"
"encoding/hex"
"fmt"
"net/url"
"strings"
"time"

"github.com/pganalyze/collector/state"
"github.com/pganalyze/collector/util"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/propagation"
sdkresource "go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
"go.opentelemetry.io/otel/trace"
)

const otelServiceName = "Postgres (pganalyze)"
const otelSpanName = "EXPLAIN Plan"

func urlToSample(server *state.Server, grant state.GrantLogs, sample state.PostgresQuerySample) string {
Expand All @@ -40,77 +32,8 @@ func urlToSample(server *state.Server, grant state.GrantLogs, sample state.Postg
)
}

func initProvider(ctx context.Context, endpoint string) (*sdktrace.TracerProvider, func(context.Context) error, error) {
res, err := sdkresource.New(ctx,
sdkresource.WithAttributes(
semconv.ServiceName(otelServiceName),
),
)
if err != nil {
return nil, nil, fmt.Errorf("failed to create resource: %w", err)
}

url, err := url.Parse(endpoint)
if err != nil {
return nil, nil, fmt.Errorf("failed to parse endpoint URL: %w", err)
}
scheme := strings.ToLower(url.Scheme)

var traceExporter *otlptrace.Exporter
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()

switch scheme {
case "http", "https":
opts := []otlptracehttp.Option{otlptracehttp.WithEndpoint(url.Host)}
if scheme == "http" {
opts = append(opts, otlptracehttp.WithInsecure())
}
traceExporter, err = otlptracehttp.New(ctx, opts...)
if err != nil {
return nil, nil, fmt.Errorf("failed to create HTTP trace exporter: %w", err)
}
case "grpc":
// For now we always require TLS for gRPC connections
opts := []otlptracegrpc.Option{otlptracegrpc.WithEndpoint(url.Host)}
traceExporter, err = otlptracegrpc.New(ctx, opts...)
if err != nil {
return nil, nil, fmt.Errorf("failed to create HTTP trace exporter: %w", err)
}
default:
return nil, nil, fmt.Errorf("unsupported protocol: %s", url.Scheme)
}

bsp := sdktrace.NewBatchSpanProcessor(traceExporter)
tracerProvider := sdktrace.NewTracerProvider(
sdktrace.WithSampler(sdktrace.AlwaysSample()),
sdktrace.WithResource(res),
sdktrace.WithSpanProcessor(bsp),
)

return tracerProvider, tracerProvider.Shutdown, nil
}

func ReportQuerySamplesAsTraceSpans(ctx context.Context, server *state.Server, logger *util.Logger, grant state.GrantLogs, samples []state.PostgresQuerySample) {
endpoint := server.Config.OtelExporterOtlpEndpoint
if endpoint == "" {
return
}

// TODO: Initialize the provider once instead of each time we need to send.
// When we fix this we likely need to explicitly flush all spans at the end
// of this function (currently done by shutdown).
tracerProvider, shutdown, err := initProvider(ctx, server.Config.OtelExporterOtlpEndpoint)
if err != nil {
logger.PrintError("Failed to initialize OpenTelemetry tracing provider: %s", err)
return
}
defer func() {
if err := shutdown(ctx); err != nil {
logger.PrintError("Failed to shutdown OpenTelemetry tracing provider: %s", err)
}
}()

func ExportQuerySamplesAsTraceSpans(ctx context.Context, server *state.Server, logger *util.Logger, grant state.GrantLogs, samples []state.PostgresQuerySample) {
exportCount := 0
for _, sample := range samples {
if !sample.HasExplain {
// Skip samples without an EXPLAIN plan for now
Expand All @@ -121,7 +44,7 @@ func ReportQuerySamplesAsTraceSpans(ctx context.Context, server *state.Server, l
prop := propagation.TraceContext{}
ctx := prop.Extract(context.Background(), propagation.MapCarrier(queryTags))

tracer := tracerProvider.Tracer(
tracer := server.Config.OTelTracingProvider.Tracer(
"",
trace.WithInstrumentationVersion(util.CollectorVersion),
trace.WithSchemaURL(semconv.SchemaURL),
Expand All @@ -130,8 +53,20 @@ func ReportQuerySamplesAsTraceSpans(ctx context.Context, server *state.Server, l
startTime := sample.OccurredAt.Add(duration)
endTime := sample.OccurredAt
_, span := tracer.Start(ctx, otelSpanName, trace.WithTimestamp(startTime))
span.SetAttributes(attribute.String("url.full", urlToSample(server, grant, sample)))
// See https://opentelemetry.io/docs/specs/otel/trace/semantic_conventions/database/
// however note that "db.postgresql.plan" is non-standard.
span.SetAttributes(attribute.String("db.system", "postgresql"))
span.SetAttributes(attribute.String("db.postgresql.plan", urlToSample(server, grant, sample)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really follow this change. Neither url.full nor db.postgresql.plan are mentioned in the URL cited from a quick skim, and I don't understand what "plan" means in this context.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is that there isn't really a proper place to put information like this - my initial thinking was that url.full may be a way to get tracing platforms to actually turn the attribute value into a clickable link, but that does not appear to be the case. It appears url.full is intended to represent the URL that an HTTP call from an application goes to, so not really the same as this use case.

I only today discovered that there was the semantic convention for database calls (and how to represent them in spans), and thus the addition of db.system. I then thought "how could we represent a query plan within this structure", and since there is some existing cases where there is database-system specific data (https://opentelemetry.io/docs/specs/otel/trace/semantic_conventions/database/#call-level-attributes-for-specific-technologies) this seemed like the best name.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, thanks for clarifying.

span.End(trace.WithTimestamp(endTime))
exportCount += 1
}
}

if exportCount > 0 {
err := server.Config.OTelTracingProvider.ForceFlush(ctx)
if err != nil {
logger.PrintError("Failed to export OpenTelemetry data: %s", err)
}
logger.PrintVerbose("Exported %d tracing spans to OpenTelemetry endpoint at %s", exportCount, server.Config.OtelExporterOtlpEndpoint)
}
}
33 changes: 26 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
_ "github.com/lib/pq" // Enable database package to use Postgres
)

func run(ctx context.Context, wg *sync.WaitGroup, globalCollectionOpts state.CollectionOpts, logger *util.Logger, configFilename string) (keepRunning bool, testRunSuccess chan bool, writeStateFile func()) {
func run(ctx context.Context, wg *sync.WaitGroup, globalCollectionOpts state.CollectionOpts, logger *util.Logger, configFilename string) (keepRunning bool, testRunSuccess chan bool, writeStateFile func(), shutdown func()) {
var servers []*state.Server

keepRunning = false
Expand All @@ -54,12 +54,29 @@ func run(ctx context.Context, wg *sync.WaitGroup, globalCollectionOpts state.Col
return
}

for idx, server := range conf.Servers {
prefixedLogger := logger.WithPrefix(server.SectionName)
prefixedLogger.PrintVerbose("Identified as api_system_type: %s, api_system_scope: %s, api_system_id: %s", server.SystemType, server.SystemScope, server.SystemID)
for idx, cfg := range conf.Servers {
prefixedLogger := logger.WithPrefix(cfg.SectionName)
prefixedLogger.PrintVerbose("Identified as api_system_type: %s, api_system_scope: %s, api_system_id: %s", cfg.SystemType, cfg.SystemScope, cfg.SystemID)

conf.Servers[idx].HTTPClient = config.CreateHTTPClient(server, prefixedLogger, false)
conf.Servers[idx].HTTPClientWithRetry = config.CreateHTTPClient(server, prefixedLogger, true)
conf.Servers[idx].HTTPClient = config.CreateHTTPClient(cfg, prefixedLogger, false)
conf.Servers[idx].HTTPClientWithRetry = config.CreateHTTPClient(cfg, prefixedLogger, true)
if cfg.OtelExporterOtlpEndpoint != "" {
conf.Servers[idx].OTelTracingProvider, conf.Servers[idx].OTelTracingProviderShutdownFunc, err = config.CreateOTelTracingProvider(ctx, cfg)
if err != nil {
logger.PrintError("Failed to initialize OpenTelemetry tracing provider, disabling exports: %s", err)
}
}
}

shutdown = func() {
for _, cfg := range conf.Servers {
if cfg.OTelTracingProviderShutdownFunc == nil {
continue
}
if err := cfg.OTelTracingProviderShutdownFunc(ctx); err != nil {
logger.PrintError("Failed to shutdown OpenTelemetry tracing provider: %s", err)
}
}
}

// Avoid even running the scheduler when we already know its not needed
Expand Down Expand Up @@ -464,7 +481,7 @@ ReadConfigAndRun:
ctx, cancel := context.WithCancel(context.Background())
wg := sync.WaitGroup{}
exitCode := 0
keepRunning, testRunSuccess, writeStateFile := run(ctx, &wg, globalCollectionOpts, logger, configFilename)
keepRunning, testRunSuccess, writeStateFile, shutdown := run(ctx, &wg, globalCollectionOpts, logger, configFilename)

if keepRunning {
// Block here until we get any of the registered signals
Expand All @@ -484,6 +501,7 @@ ReadConfigAndRun:
}
}
logger.PrintInfo("Reloading configuration...")
shutdown()
cancel()
wg.Wait()
writeStateFile()
Expand Down Expand Up @@ -522,6 +540,7 @@ ReadConfigAndRun:
}
}

shutdown()
cancel()
wg.Wait()

Expand Down
4 changes: 3 additions & 1 deletion runner/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,9 @@ func postprocessAndSendLogs(ctx context.Context, server *state.Server, globalCol
}

// Export query samples as traces, if OpenTelemetry endpoint is configured
querysample.ReportQuerySamplesAsTraceSpans(ctx, server, logger, grant, transientLogState.QuerySamples)
if server.Config.OTelTracingProvider != nil && len(transientLogState.QuerySamples) != 0 {
querysample.ExportQuerySamplesAsTraceSpans(ctx, server, logger, grant, transientLogState.QuerySamples)
}

for idx := range transientLogState.LogFiles {
// The actual filtering (aka masking of secrets) is done later in
Expand Down
Loading