Skip to content

Commit

Permalink
feat(lb): Introduce the ability to load balance on composite keys in lb
Browse files Browse the repository at this point in the history
Right now, there's a problem at high throughput using the load balancer
and the `service.name` resource attribute: The load balancers themself
get slow. While it's possible to vertically scale them to a point (e.g.
about 100k req/sec), as they get slow they star tot back up traffic and
block on requests. Applications then can't write as many spans out, and
start dropping spans.

This commit seeks to address that by extending the load balancing
collector to allow create a composite from attributes that can still
keep the load balancing decision "consistent enough" to reduce
cardinallity, but still spread the load across ${N} collectors.

It doesn't make too many assumptions about how the operators will use
this, except that the underlying data (the spans) are unlikely to be
complete in all cases, and the key generation is "best effort". This is
a deviation from the existing design, in which hard-requires
"span.name".

== Design Notes
=== Contributor Skill

As a contributor, I'm very much new to the opentelemetry collector, and
do not anticipate I will be contributing much except for as needs
require to tune the collectors that I am responsible for. Given this,
the code may violate certain assumptions that are otherwise "well
known".

=== Required Knowledge

The biggest surprise in this code was that despite accepting a slice,
the routingIdentifierFromTraces function assumes spans have been
processed with the batchpersignal.SplitTraces() function, which appears
to ensure taht each "trace" contains only a single span (thus allowing
them to be multiplexed effectively)

This allows the function to be simplified quite substantially.

=== Use case

The primary use case I am thinking about when writing this work is
calculating metrics in the spanmetricsconnector component. Essentially,
services drive far too much traffic for a single collector instance to
handle, so we need to multiplex it in a way that still allows them to be
calculated in a single place (limiting cardinality) but also, spreads
the load across ${N} collectors.

=== Traces only implementation

This commit addreses this only for traces, as I only care about traces.
The logic can likely be extended easily, however.
  • Loading branch information
andrewhowdencom committed Sep 11, 2024
1 parent 0fb57cb commit 9fb7317
Show file tree
Hide file tree
Showing 4 changed files with 283 additions and 21 deletions.
4 changes: 3 additions & 1 deletion exporter/loadbalancingexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,12 @@ Refer to [config.yaml](./testdata/config.yaml) for detailed examples on using th
* This resolver currently returns a maximum of 100 hosts.
* `TODO`: Feature request [29771](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/29771) aims to cover the pagination for this scenario
* The `routing_key` property is used to specify how to route values (spans or metrics) to exporters based on different parameters. This functionality is currently enabled only for `trace` and `metric` pipeline types. It supports one of the following values:
* `service`: Routes values based on their service name. This is useful when using processors like the span metrics, so all spans for each service are sent to consistent collector instances for metric collection. Otherwise, metrics for the same services are sent to different collectors, making aggregations inaccurate.
* `service`: Routes values based on their service name. This is useful when using processors like the span metrics, so all spans for each service are sent to consistent collector instances for metric collection. Otherwise, metrics for the same services are sent to different collectors, making aggregations inaccurate. In addition to resource / span attributes, `span.kind`, `span.name` (the top level properties of a span) are also supported.
* `attributes`: Routes based on values in the attributes of the traces. This is similar to service, but useful for situations in which a single service overwhelms any given instance of the collector, and should be split over multiple collectors.
* `traceID`: Routes spans based on their `traceID`. Invalid for metrics.
* `metric`: Routes metrics based on their metric name. Invalid for spans.
* `streamID`: Routes metrics based on their datapoint streamID. That's the unique hash of all it's attributes, plus the attributes and identifying information of its resource, scope, and metric data
* The `routing_attributes` property is used to list the attributes that should be used if the `routing_key` is `attributes`.

Simple example

Expand Down
16 changes: 13 additions & 3 deletions exporter/loadbalancingexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const (
metricNameRouting
resourceRouting
streamIDRouting
attrRouting
)

const (
Expand All @@ -26,13 +27,22 @@ const (
metricNameRoutingStr = "metric"
resourceRoutingStr = "resource"
streamIDRoutingStr = "streamID"
attrRoutingStr = "attributes"
)

// Config defines configuration for the exporter.
type Config struct {
Protocol Protocol `mapstructure:"protocol"`
Resolver ResolverSettings `mapstructure:"resolver"`
RoutingKey string `mapstructure:"routing_key"`
Protocol Protocol `mapstructure:"protocol"`
Resolver ResolverSettings `mapstructure:"resolver"`

// RoutingKey is a single routing key value
RoutingKey string `mapstructure:"routing_key"`

// RoutingAttributes creates a composite routing key, based on several resource attributes of the application.
//
// Supports all attributes available (both resource and span), as well as the pseudo attributes "span.kind" and
// "span.name".
RoutingAttributes []string `mapstructure:"routing_attributes"`
}

// Protocol holds the individual protocol-specific settings. Only OTLP is supported at the moment.
Expand Down
95 changes: 84 additions & 11 deletions exporter/loadbalancingexporter/trace_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"

Expand All @@ -22,13 +23,19 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal"
)

const (
pseudoAttrSpanName = "span.name"
pseudoAttrSpanKind = "span.kind"
)

var _ exporter.Traces = (*traceExporterImp)(nil)

type exporterTraces map[*wrappedExporter]ptrace.Traces

type traceExporterImp struct {
loadBalancer *loadBalancer
routingKey routingKey
routingAttrs []string

stopped bool
shutdownWg sync.WaitGroup
Expand Down Expand Up @@ -62,6 +69,9 @@ func newTracesExporter(params exporter.Settings, cfg component.Config) (*traceEx
switch cfg.(*Config).RoutingKey {
case svcRoutingStr:
traceExporter.routingKey = svcRouting
case attrRoutingStr:
traceExporter.routingKey = attrRouting
traceExporter.routingAttrs = cfg.(*Config).RoutingAttributes
case traceIDRoutingStr, "":
default:
return nil, fmt.Errorf("unsupported routing_key: %s", cfg.(*Config).RoutingKey)
Expand Down Expand Up @@ -96,7 +106,7 @@ func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces)
exporterSegregatedTraces := make(exporterTraces)
endpoints := make(map[*wrappedExporter]string)
for _, batch := range batches {
routingID, err := routingIdentifiersFromTraces(batch, e.routingKey)
routingID, err := routingIdentifiersFromTraces(batch, e.routingKey, e.routingAttrs)
if err != nil {
return err
}
Expand Down Expand Up @@ -137,7 +147,15 @@ func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces)
return errs
}

func routingIdentifiersFromTraces(td ptrace.Traces, key routingKey) (map[string]bool, error) {
// routingIdentifiersFromTraces reads the traces and determines an identifier that can be used to define a position on the
// ring hash. It takes the routingKey, defining what type of routing should be used, and a series of attributes
// (optionally) used if the routingKey is attrRouting.
//
// only svcRouting and attrRouting are supported. For attrRouting, any attribute, as well the "psuedo" attributes span.name

Check failure on line 154 in exporter/loadbalancingexporter/trace_exporter.go

View workflow job for this annotation

GitHub Actions / lint-matrix (windows, exporter-2)

"psuedo" is a misspelling of "pseudo"

Check failure on line 154 in exporter/loadbalancingexporter/trace_exporter.go

View workflow job for this annotation

GitHub Actions / lint-matrix (linux, exporter-2)

"psuedo" is a misspelling of "pseudo"
// and span.kind are supported.
//
// In practice, makes the assumption that ptrace.Traces includes only one trace of each kind, in the "trace tree".
func routingIdentifiersFromTraces(td ptrace.Traces, rType routingKey, attrs []string) (map[string]bool, error) {
ids := make(map[string]bool)
rs := td.ResourceSpans()
if rs.Len() == 0 {
Expand All @@ -153,18 +171,73 @@ func routingIdentifiersFromTraces(td ptrace.Traces, key routingKey) (map[string]
if spans.Len() == 0 {
return nil, errors.New("empty spans")
}
// Determine how the key should be populated.
switch rType {
case traceIDRouting:
// The simple case is the TraceID routing. In this case, we just use the string representation of the Trace ID.
tid := spans.At(0).TraceID()
ids[string(tid[:])] = true

if key == svcRouting {
for i := 0; i < rs.Len(); i++ {
svc, ok := rs.At(i).Resource().Attributes().Get("service.name")
if !ok {
return nil, errors.New("unable to get service name")
return ids, nil
case svcRouting:
// Service Name is still handled as an "attribute router", it's just expressed as a "pseudo attribute"
attrs = []string{"service.name"}
case attrRouting:
// By default, we'll just use the input provided.
break
default:
return nil, fmt.Errorf("unsupported routing_key: %d", rType)
}

// Composite the attributes together as a key.
for i := 0; i < rs.Len(); i++ {
// rKey will never return an error. See
// 1. https://pkg.go.dev/bytes#Buffer.Write
// 2. https://stackoverflow.com/a/70388629
var rKey strings.Builder

for _, a := range attrs {

// resource spans
rAttr, ok := rs.At(i).Resource().Attributes().Get(a)
if ok {
rKey.WriteString(rAttr.Str())
continue
}

// ils or "instrumentation library spans"
ils := rs.At(0).ScopeSpans()
iAttr, ok := ils.At(0).Scope().Attributes().Get(a)
if ok {
rKey.WriteString(iAttr.Str())
continue
}

// the lowest level span (or what engineers regularly interact with)
spans := ils.At(0).Spans()

if a == pseudoAttrSpanKind {
rKey.WriteString(spans.At(0).Kind().String())

continue
}

if a == pseudoAttrSpanName {
rKey.WriteString(spans.At(0).Name())

continue
}

sAttr, ok := spans.At(0).Attributes().Get(a)
if ok {
rKey.WriteString(sAttr.Str())
continue
}
ids[svc.Str()] = true
}
return ids, nil

// No matter what, there will be a key here (even if that key is "").
ids[rKey.String()] = true
}
tid := spans.At(0).TraceID()
ids[string(tid[:])] = true

return ids, nil
}
Loading

0 comments on commit 9fb7317

Please sign in to comment.