Skip to content

Commit

Permalink
[exporter/loadbalancing] Fix panic on a sub-exporter shutdown
Browse files Browse the repository at this point in the history
Fix panic when a sub-exporter is shut down while still handling requests
  • Loading branch information
dmitryax committed Mar 5, 2024
1 parent 0bd3c27 commit 366b03b
Show file tree
Hide file tree
Showing 10 changed files with 358 additions and 183 deletions.
22 changes: 22 additions & 0 deletions .chloggen/fix-load-balancing-exp.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: exporter/loadbalancing

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix panic when a sub-exporter is shut down while still handling requests.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31410]

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
51 changes: 19 additions & 32 deletions exporter/loadbalancingexporter/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,32 +24,24 @@ var (
errMultipleResolversProvided = errors.New("only one resolver should be specified")
)

var _ loadBalancer = (*loadBalancerImp)(nil)

type componentFactory func(ctx context.Context, endpoint string) (component.Component, error)

type loadBalancer interface {
component.Component
Endpoint(identifier []byte) string
Exporter(endpoint string) (component.Component, error)
}

type loadBalancerImp struct {
type loadBalancer struct {
logger *zap.Logger
host component.Host

res resolver
ring *hashRing

componentFactory componentFactory
exporters map[string]component.Component
exporters map[string]*wrappedExporter

stopped bool
updateLock sync.RWMutex
}

// Create new load balancer
func newLoadBalancer(params exporter.CreateSettings, cfg component.Config, factory componentFactory) (*loadBalancerImp, error) {
func newLoadBalancer(params exporter.CreateSettings, cfg component.Config, factory componentFactory) (*loadBalancer, error) {
oCfg := cfg.(*Config)

if oCfg.Resolver.DNS != nil && oCfg.Resolver.Static != nil {
Expand Down Expand Up @@ -90,21 +82,21 @@ func newLoadBalancer(params exporter.CreateSettings, cfg component.Config, facto
return nil, errNoResolver
}

return &loadBalancerImp{
return &loadBalancer{
logger: params.Logger,
res: res,
componentFactory: factory,
exporters: map[string]component.Component{},
exporters: map[string]*wrappedExporter{},
}, nil
}

func (lb *loadBalancerImp) Start(ctx context.Context, host component.Host) error {
func (lb *loadBalancer) Start(ctx context.Context, host component.Host) error {
lb.res.onChange(lb.onBackendChanges)
lb.host = host
return lb.res.start(ctx)
}

func (lb *loadBalancerImp) onBackendChanges(resolved []string) {
func (lb *loadBalancer) onBackendChanges(resolved []string) {
newRing := newHashRing(resolved)

if !newRing.equal(lb.ring) {
Expand All @@ -122,7 +114,7 @@ func (lb *loadBalancerImp) onBackendChanges(resolved []string) {
}
}

func (lb *loadBalancerImp) addMissingExporters(ctx context.Context, endpoints []string) {
func (lb *loadBalancer) addMissingExporters(ctx context.Context, endpoints []string) {
for _, endpoint := range endpoints {
endpoint = endpointWithPort(endpoint)

Expand All @@ -132,12 +124,12 @@ func (lb *loadBalancerImp) addMissingExporters(ctx context.Context, endpoints []
lb.logger.Error("failed to create new exporter for endpoint", zap.String("endpoint", endpoint), zap.Error(err))
continue
}

if err = exp.Start(ctx, lb.host); err != nil {
we := newWrappedExporter(exp)
if err = we.Start(ctx, lb.host); err != nil {
lb.logger.Error("failed to start new exporter for endpoint", zap.String("endpoint", endpoint), zap.Error(err))
continue
}
lb.exporters[endpoint] = exp
lb.exporters[endpoint] = we
}
}
}
Expand All @@ -149,7 +141,7 @@ func endpointWithPort(endpoint string) string {
return endpoint
}

func (lb *loadBalancerImp) removeExtraExporters(ctx context.Context, endpoints []string) {
func (lb *loadBalancer) removeExtraExporters(ctx context.Context, endpoints []string) {
endpointsWithPort := make([]string, len(endpoints))
for i, e := range endpoints {
endpointsWithPort[i] = endpointWithPort(e)
Expand All @@ -172,29 +164,24 @@ func endpointFound(endpoint string, endpoints []string) bool {
return false
}

func (lb *loadBalancerImp) Shutdown(context.Context) error {
func (lb *loadBalancer) Shutdown(context.Context) error {
lb.stopped = true
return nil
}

func (lb *loadBalancerImp) Endpoint(identifier []byte) string {
lb.updateLock.RLock()
defer lb.updateLock.RUnlock()

return lb.ring.endpointFor(identifier)
}

func (lb *loadBalancerImp) Exporter(endpoint string) (component.Component, error) {
// exporterAndEndpoint returns the exporter and the endpoint for the given identifier.
func (lb *loadBalancer) exporterAndEndpoint(identifier []byte) (*wrappedExporter, string, error) {
// NOTE: make rolling updates of next tier of collectors work. currently, this may cause
// data loss because the latest batches sent to outdated backend will never find their way out.
// for details: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/1690
lb.updateLock.RLock()
defer lb.updateLock.RUnlock()
endpoint := lb.ring.endpointFor(identifier)
exp, found := lb.exporters[endpointWithPort(endpoint)]
lb.updateLock.RUnlock()
if !found {
// something is really wrong... how come we couldn't find the exporter??
return nil, fmt.Errorf("couldn't find the exporter for the endpoint %q", endpoint)
return nil, "", fmt.Errorf("couldn't find the exporter for the endpoint %q", endpoint)
}

return exp, nil
return exp, endpoint, nil
}
10 changes: 5 additions & 5 deletions exporter/loadbalancingexporter/loadbalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func TestWithDNSResolverNoEndpoints(t *testing.T) {
require.NoError(t, err)

// test
e := p.Endpoint([]byte{128, 128, 0, 0})
_, e, _ := p.exporterAndEndpoint([]byte{128, 128, 0, 0})

// verify
assert.Equal(t, "", e)
Expand Down Expand Up @@ -376,19 +376,19 @@ func TestFailedExporterInRing(t *testing.T) {

// test
// this trace ID will reach the endpoint-2 -- see the consistent hashing tests for more info
_, err = p.Exporter(p.Endpoint([]byte{128, 128, 0, 0}))
_, _, err = p.exporterAndEndpoint([]byte{128, 128, 0, 0})

// verify
assert.Error(t, err)

// test
// this service name will reach the endpoint-2 -- see the consistent hashing tests for more info
_, err = p.Exporter(p.Endpoint([]byte("get-recommendations-1")))
_, _, err = p.exporterAndEndpoint([]byte("get-recommendations-1"))

// verify
assert.Error(t, err)
}

func newNopMockExporter() component.Component {
return mockComponent{}
func newNopMockExporter() *wrappedExporter {
return newWrappedExporter(mockComponent{})
}
12 changes: 4 additions & 8 deletions exporter/loadbalancingexporter/log_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package loadbalancingexporter // import "github.com/open-telemetry/opentelemetry

import (
"context"
"fmt"
"math/rand"
"sync"
"time"
Expand All @@ -26,7 +25,7 @@ import (
var _ exporter.Logs = (*logExporterImp)(nil)

type logExporterImp struct {
loadBalancer loadBalancer
loadBalancer *loadBalancer

started bool
shutdownWg sync.WaitGroup
Expand Down Expand Up @@ -87,16 +86,13 @@ func (e *logExporterImp) consumeLog(ctx context.Context, ld plog.Logs) error {
balancingKey = random()
}

endpoint := e.loadBalancer.Endpoint(balancingKey[:])
exp, err := e.loadBalancer.Exporter(endpoint)
le, endpoint, err := e.loadBalancer.exporterAndEndpoint(balancingKey[:])
if err != nil {
return err
}

le, ok := exp.(exporter.Logs)
if !ok {
return fmt.Errorf("unable to export logs, unexpected exporter type: expected exporter.Logs but got %T", exp)
}
le.consumeWG.Add(1)
defer le.consumeWG.Done()

start := time.Now()
err = le.ConsumeLogs(ctx, ld)
Expand Down
79 changes: 65 additions & 14 deletions exporter/loadbalancingexporter/log_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,58 @@ func TestLogsWithoutTraceID(t *testing.T) {
assert.Len(t, sink.AllLogs(), 1)
}

// this test validates that exporter is can concurrently change the endpoints while consuming logs.
func TestConsumeLogs_ConcurrentResolverChange(t *testing.T) {
consumeStarted := make(chan struct{})
consumeDone := make(chan struct{})

// imitate a slow exporter
te := &mockLogsExporter{Component: mockComponent{}}
te.consumelogsfn = func(ctx context.Context, td plog.Logs) error {
close(consumeStarted)
time.Sleep(50 * time.Millisecond)
return te.consumeErr
}
componentFactory := func(ctx context.Context, endpoint string) (component.Component, error) {
return te, nil
}
lb, err := newLoadBalancer(exportertest.NewNopCreateSettings(), simpleConfig(), componentFactory)
require.NotNil(t, lb)
require.NoError(t, err)

p, err := newLogsExporter(exportertest.NewNopCreateSettings(), simpleConfig())
require.NotNil(t, p)
require.NoError(t, err)

endpoints := []string{"endpoint-1"}
lb.res = &mockResolver{
triggerCallbacks: true,
onResolve: func(ctx context.Context) ([]string, error) {
return endpoints, nil
},
}
p.loadBalancer = lb

err = p.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
defer func() {
require.NoError(t, p.Shutdown(context.Background()))
}()

go func() {
assert.NoError(t, p.ConsumeLogs(context.Background(), simpleLogs()))
close(consumeDone)
}()

// update endpoint while consuming logs
<-consumeStarted
endpoints = []string{"endpoint-2"}
endpoint, err := lb.res.resolve(context.Background())
require.NoError(t, err)
require.Equal(t, endpoints, endpoint)
<-consumeDone
}

func TestRollingUpdatesWhenConsumeLogs(t *testing.T) {
t.Skip("Flaky Test - See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/13331")

Expand Down Expand Up @@ -360,19 +412,17 @@ func TestRollingUpdatesWhenConsumeLogs(t *testing.T) {

counter1 := &atomic.Int64{}
counter2 := &atomic.Int64{}
defaultExporters := map[string]component.Component{
"127.0.0.1:4317": newMockLogsExporter(func(ctx context.Context, ld plog.Logs) error {
defaultExporters := map[string]*wrappedExporter{
"127.0.0.1:4317": newWrappedExporter(newMockLogsExporter(func(ctx context.Context, ld plog.Logs) error {
counter1.Add(1)
// simulate an unreachable backend
time.Sleep(10 * time.Second)
return nil
},
),
"127.0.0.2:4317": newMockLogsExporter(func(ctx context.Context, ld plog.Logs) error {
})),
"127.0.0.2:4317": newWrappedExporter(newMockLogsExporter(func(ctx context.Context, ld plog.Logs) error {
counter2.Add(1)
return nil
},
),
})),
}

// test
Expand Down Expand Up @@ -458,15 +508,21 @@ func simpleLogWithoutID() plog.Logs {
type mockLogsExporter struct {
component.Component
consumelogsfn func(ctx context.Context, ld plog.Logs) error
consumeErr error
}

func (e *mockLogsExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

func (e *mockLogsExporter) Shutdown(context.Context) error {
e.consumeErr = errors.New("exporter is shut down")
return nil
}

func (e *mockLogsExporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
if e.consumelogsfn == nil {
return nil
return e.consumeErr
}
return e.consumelogsfn(ctx, ld)
}
Expand All @@ -484,10 +540,5 @@ func newMockLogsExporter(consumelogsfn func(ctx context.Context, ld plog.Logs) e
}

func newNopMockLogsExporter() exporter.Logs {
return &mockLogsExporter{
Component: mockComponent{},
consumelogsfn: func(ctx context.Context, ld plog.Logs) error {
return nil
},
}
return &mockLogsExporter{Component: mockComponent{}}
}
Loading

0 comments on commit 366b03b

Please sign in to comment.