Skip to content

Commit

Permalink
[exporter/clickhouseexporter] Sort attribute maps before insertion (o…
Browse files Browse the repository at this point in the history
  • Loading branch information
earwin committed Oct 16, 2024
1 parent 0986213 commit 636bdaf
Show file tree
Hide file tree
Showing 14 changed files with 117 additions and 121 deletions.
27 changes: 27 additions & 0 deletions .chloggen/clickhouseexporter-ordered-attributes.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: clickhouseexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Exporter now sorts attribute maps' keys during INSERT, yielding better compression and predictable aggregates"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33634]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
17 changes: 4 additions & 13 deletions exporter/clickhouseexporter/exporter_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ import (

_ "github.com/ClickHouse/clickhouse-go/v2" // For register database driver.
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
conventions "go.opentelemetry.io/collector/semconv/v1.27.0"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/clickhouseexporter/internal"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil"
)

Expand Down Expand Up @@ -77,7 +77,7 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
logs := ld.ResourceLogs().At(i)
res := logs.Resource()
resURL := logs.SchemaUrl()
resAttr := attributesToMap(res.Attributes())
resAttr := internal.AttributesToMap(res.Attributes())
if v, ok := res.Attributes().Get(conventions.AttributeServiceName); ok {
serviceName = v.Str()
}
Expand All @@ -87,7 +87,7 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
scopeURL := logs.ScopeLogs().At(j).SchemaUrl()
scopeName := logs.ScopeLogs().At(j).Scope().Name()
scopeVersion := logs.ScopeLogs().At(j).Scope().Version()
scopeAttr := attributesToMap(logs.ScopeLogs().At(j).Scope().Attributes())
scopeAttr := internal.AttributesToMap(logs.ScopeLogs().At(j).Scope().Attributes())

for k := 0; k < rs.Len(); k++ {
r := rs.At(k)
Expand All @@ -97,7 +97,7 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
timestamp = r.ObservedTimestamp()
}

logAttr := attributesToMap(r.Attributes())
logAttr := internal.AttributesToMap(r.Attributes())
_, err = statement.ExecContext(ctx,
timestamp.AsTime(),
traceutil.TraceIDToHexOrEmptyString(r.TraceID()),
Expand Down Expand Up @@ -129,15 +129,6 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
return err
}

func attributesToMap(attributes pcommon.Map) map[string]string {
m := make(map[string]string, attributes.Len())
attributes.Range(func(k string, v pcommon.Value) bool {
m[k] = v.AsString()
return true
})
return m
}

const (
// language=ClickHouse SQL
createLogsTableSQL = `
Expand Down
9 changes: 5 additions & 4 deletions exporter/clickhouseexporter/exporter_logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"testing"
"time"

"github.com/ClickHouse/clickhouse-go/v2/lib/column/orderedmap"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
Expand Down Expand Up @@ -94,9 +95,9 @@ func TestExporter_pushLogsData(t *testing.T) {
initClickhouseTestServer(t, func(query string, values []driver.Value) error {
if strings.HasPrefix(query, "INSERT") {
require.Equal(t, "https://opentelemetry.io/schemas/1.4.0", values[8])
require.Equal(t, map[string]string{
require.Equal(t, orderedmap.FromMap(map[string]string{
"service.name": "test-service",
}, values[9])
}), values[9])
}
return nil
})
Expand All @@ -109,9 +110,9 @@ func TestExporter_pushLogsData(t *testing.T) {
require.Equal(t, "https://opentelemetry.io/schemas/1.7.0", values[10])
require.Equal(t, "io.opentelemetry.contrib.clickhouse", values[11])
require.Equal(t, "1.0.0", values[12])
require.Equal(t, map[string]string{
require.Equal(t, orderedmap.FromMap(map[string]string{
"lib": "clickhouse",
}, values[13])
}), values[13])
}
return nil
})
Expand Down
2 changes: 1 addition & 1 deletion exporter/clickhouseexporter/exporter_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (e *metricsExporter) pushMetricsData(ctx context.Context, md pmetric.Metric
metricsMap := internal.NewMetricsModel(e.tablesConfig)
for i := 0; i < md.ResourceMetrics().Len(); i++ {
metrics := md.ResourceMetrics().At(i)
resAttr := attributesToMap(metrics.Resource().Attributes())
resAttr := metrics.Resource().Attributes()
for j := 0; j < metrics.ScopeMetrics().Len(); j++ {
rs := metrics.ScopeMetrics().At(j).Metrics()
scopeInstr := metrics.ScopeMetrics().At(j).Scope()
Expand Down
36 changes: 12 additions & 24 deletions exporter/clickhouseexporter/exporter_traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ import (
"time"

_ "github.com/ClickHouse/clickhouse-go/v2" // For register database driver.
"github.com/ClickHouse/clickhouse-go/v2/lib/column"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/ptrace"
conventions "go.opentelemetry.io/collector/semconv/v1.27.0"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/clickhouseexporter/internal"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil"
)

Expand Down Expand Up @@ -74,18 +76,15 @@ func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) er
for i := 0; i < td.ResourceSpans().Len(); i++ {
spans := td.ResourceSpans().At(i)
res := spans.Resource()
resAttr := attributesToMap(res.Attributes())
var serviceName string
if v, ok := res.Attributes().Get(conventions.AttributeServiceName); ok {
serviceName = v.Str()
}
resAttr := internal.AttributesToMap(res.Attributes())
serviceName, _ := res.Attributes().Get(conventions.AttributeServiceName)
for j := 0; j < spans.ScopeSpans().Len(); j++ {
rs := spans.ScopeSpans().At(j).Spans()
scopeName := spans.ScopeSpans().At(j).Scope().Name()
scopeVersion := spans.ScopeSpans().At(j).Scope().Version()
for k := 0; k < rs.Len(); k++ {
r := rs.At(k)
spanAttr := attributesToMap(r.Attributes())
spanAttr := internal.AttributesToMap(r.Attributes())
status := r.Status()
eventTimes, eventNames, eventAttrs := convertEvents(r.Events())
linksTraceIDs, linksSpanIDs, linksTraceStates, linksAttrs := convertLinks(r.Links())
Expand All @@ -97,7 +96,7 @@ func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) er
r.TraceState().AsRaw(),
r.Name(),
r.Kind().String(),
serviceName,
serviceName.AsString(),
resAttr,
scopeName,
scopeVersion,
Expand Down Expand Up @@ -127,36 +126,25 @@ func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) er
return err
}

func convertEvents(events ptrace.SpanEventSlice) ([]time.Time, []string, []map[string]string) {
var (
times []time.Time
names []string
attrs []map[string]string
)
func convertEvents(events ptrace.SpanEventSlice) (times []time.Time, names []string, attrs []column.IterableOrderedMap) {
for i := 0; i < events.Len(); i++ {
event := events.At(i)
times = append(times, event.Timestamp().AsTime())
names = append(names, event.Name())
attrs = append(attrs, attributesToMap(event.Attributes()))
attrs = append(attrs, internal.AttributesToMap(event.Attributes()))
}
return times, names, attrs
return
}

func convertLinks(links ptrace.SpanLinkSlice) ([]string, []string, []string, []map[string]string) {
var (
traceIDs []string
spanIDs []string
states []string
attrs []map[string]string
)
func convertLinks(links ptrace.SpanLinkSlice) (traceIDs []string, spanIDs []string, states []string, attrs []column.IterableOrderedMap) {
for i := 0; i < links.Len(); i++ {
link := links.At(i)
traceIDs = append(traceIDs, traceutil.TraceIDToHexOrEmptyString(link.TraceID()))
spanIDs = append(spanIDs, traceutil.SpanIDToHexOrEmptyString(link.SpanID()))
states = append(states, link.TraceState().AsRaw())
attrs = append(attrs, attributesToMap(link.Attributes()))
attrs = append(attrs, internal.AttributesToMap(link.Attributes()))
}
return traceIDs, spanIDs, states, attrs
return
}

const (
Expand Down
8 changes: 4 additions & 4 deletions exporter/clickhouseexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/exporter/clickh
go 1.22.0

require (
github.com/ClickHouse/clickhouse-go/v2 v2.29.0
github.com/ClickHouse/clickhouse-go/v2 v2.29.1-0.20241016123550-43d09d87da1f
github.com/cenkalti/backoff/v4 v4.3.0
github.com/jmoiron/sqlx v1.4.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.111.0
Expand All @@ -25,7 +25,7 @@ require (
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/ClickHouse/ch-go v0.61.5 // indirect
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/andybalholm/brotli v1.1.0 // indirect
github.com/andybalholm/brotli v1.1.1 // indirect
github.com/containerd/log v0.1.0 // indirect
github.com/containerd/platforms v0.2.1 // indirect
github.com/cpuguy83/dockercfg v0.3.1 // indirect
Expand Down Expand Up @@ -98,8 +98,8 @@ require (
go.opentelemetry.io/otel/trace v1.31.0 // indirect
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.26.0 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/crypto v0.28.0 // indirect
golang.org/x/net v0.30.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/text v0.19.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd // indirect
Expand Down
22 changes: 12 additions & 10 deletions exporter/clickhouseexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -130,27 +130,24 @@ func (e *expHistogramMetrics) insert(ctx context.Context, db *sql.DB) error {
}()

for _, model := range e.expHistogramModels {
var serviceName string
if v, ok := model.metadata.ResAttr[conventions.AttributeServiceName]; ok {
serviceName = v
}
serviceName, _ := model.metadata.ResAttr.Get(conventions.AttributeServiceName)

for i := 0; i < model.expHistogram.DataPoints().Len(); i++ {
dp := model.expHistogram.DataPoints().At(i)
attrs, times, values, traceIDs, spanIDs := convertExemplars(dp.Exemplars())
_, err = statement.ExecContext(ctx,
model.metadata.ResAttr,
AttributesToMap(model.metadata.ResAttr),
model.metadata.ResURL,
model.metadata.ScopeInstr.Name(),
model.metadata.ScopeInstr.Version(),
attributesToMap(model.metadata.ScopeInstr.Attributes()),
AttributesToMap(model.metadata.ScopeInstr.Attributes()),
model.metadata.ScopeInstr.DroppedAttributesCount(),
model.metadata.ScopeURL,
serviceName,
serviceName.AsString(),
model.metricName,
model.metricDescription,
model.metricUnit,
attributesToMap(dp.Attributes()),
AttributesToMap(dp.Attributes()),
dp.StartTimestamp().AsTime(),
dp.Timestamp().AsTime(),
dp.Count(),
Expand Down Expand Up @@ -190,7 +187,7 @@ func (e *expHistogramMetrics) insert(ctx context.Context, db *sql.DB) error {
return nil
}

func (e *expHistogramMetrics) Add(resAttr map[string]string, resURL string, scopeInstr pcommon.InstrumentationScope, scopeURL string, metrics any, name string, description string, unit string) error {
func (e *expHistogramMetrics) Add(resAttr pcommon.Map, resURL string, scopeInstr pcommon.InstrumentationScope, scopeURL string, metrics any, name string, description string, unit string) error {
expHistogram, ok := metrics.(pmetric.ExponentialHistogram)
if !ok {
return fmt.Errorf("metrics param is not type of ExponentialHistogram")
Expand Down
15 changes: 6 additions & 9 deletions exporter/clickhouseexporter/internal/gauge_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,27 +109,24 @@ func (g *gaugeMetrics) insert(ctx context.Context, db *sql.DB) error {
}()

for _, model := range g.gaugeModels {
var serviceName string
if v, ok := model.metadata.ResAttr[conventions.AttributeServiceName]; ok {
serviceName = v
}
serviceName, _ := model.metadata.ResAttr.Get(conventions.AttributeServiceName)

for i := 0; i < model.gauge.DataPoints().Len(); i++ {
dp := model.gauge.DataPoints().At(i)
attrs, times, values, traceIDs, spanIDs := convertExemplars(dp.Exemplars())
_, err = statement.ExecContext(ctx,
model.metadata.ResAttr,
AttributesToMap(model.metadata.ResAttr),
model.metadata.ResURL,
model.metadata.ScopeInstr.Name(),
model.metadata.ScopeInstr.Version(),
attributesToMap(model.metadata.ScopeInstr.Attributes()),
AttributesToMap(model.metadata.ScopeInstr.Attributes()),
model.metadata.ScopeInstr.DroppedAttributesCount(),
model.metadata.ScopeURL,
serviceName,
serviceName.AsString(),
model.metricName,
model.metricDescription,
model.metricUnit,
attributesToMap(dp.Attributes()),
AttributesToMap(dp.Attributes()),
dp.StartTimestamp().AsTime(),
dp.Timestamp().AsTime(),
getValue(dp.IntValue(), dp.DoubleValue(), dp.ValueType()),
Expand All @@ -155,7 +152,7 @@ func (g *gaugeMetrics) insert(ctx context.Context, db *sql.DB) error {
return nil
}

func (g *gaugeMetrics) Add(resAttr map[string]string, resURL string, scopeInstr pcommon.InstrumentationScope, scopeURL string, metrics any, name string, description string, unit string) error {
func (g *gaugeMetrics) Add(resAttr pcommon.Map, resURL string, scopeInstr pcommon.InstrumentationScope, scopeURL string, metrics any, name string, description string, unit string) error {
gauge, ok := metrics.(pmetric.Gauge)
if !ok {
return fmt.Errorf("metrics param is not type of Gauge")
Expand Down
Loading

0 comments on commit 636bdaf

Please sign in to comment.