From 8da5299621bd6b2de185a1e06c93289a96b1af25 Mon Sep 17 00:00:00 2001 From: Kacper Stasik Date: Wed, 17 Feb 2021 21:19:23 +0100 Subject: [PATCH] fix: grpc reconnection (#1521) * fix: grpc reconnection fixed * chore: changelog update * fix: grpc reconnection issue - red test * fix: grpc reconnection #1524 * fix: grpc reconnection issue cleanup --- CHANGELOG.md | 4 ++ exporters/otlp/otlpgrpc/connection.go | 4 +- .../otlp/otlpgrpc/otlp_integration_test.go | 66 +++++++++++++++++++ 3 files changed, 73 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e0b56d1c05d..077abba159d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Create new modules: otel/metric, otel/trace, otel/oteltest, otel/sdk/export/metric, otel/sdk/metric (#1528) - Move metric-related public global APIs from otel to otel/metric/global. (#1528) +### Fixed + +- Fixed otlpgrpc reconnection issue. (#1521) + ## [0.16.0] - 2020-01-13 ### Added diff --git a/exporters/otlp/otlpgrpc/connection.go b/exporters/otlp/otlpgrpc/connection.go index c469d099c3e..5fbd68ffe42 100644 --- a/exporters/otlp/otlpgrpc/connection.go +++ b/exporters/otlp/otlpgrpc/connection.go @@ -66,7 +66,7 @@ func newConnection(cfg config, handler func(cc *grpc.ClientConn)) *connection { func (c *connection) startConnection(ctx context.Context) { c.stopCh = make(chan struct{}) - c.disconnectedCh = make(chan bool) + c.disconnectedCh = make(chan bool, 1) c.backgroundConnectionDoneCh = make(chan struct{}) if err := c.connect(ctx); err == nil { @@ -155,6 +155,8 @@ func (c *connection) indefiniteBackgroundConnection() { if err := c.connect(context.Background()); err == nil { c.setStateConnected() } else { + // this code is unreachable in most cases + // c.connect does not establish connection c.setStateDisconnected(err) } diff --git a/exporters/otlp/otlpgrpc/otlp_integration_test.go b/exporters/otlp/otlpgrpc/otlp_integration_test.go 
index fb6272aa5e5..3566fbe4184 100644 --- a/exporters/otlp/otlpgrpc/otlp_integration_test.go +++ b/exporters/otlp/otlpgrpc/otlp_integration_test.go @@ -142,6 +142,72 @@ func TestNewExporter_invokeStartThenStopManyTimes(t *testing.T) { } } +func TestNewExporter_collectorConnectionDiesThenReconnectsWhenInRestMode(t *testing.T) { + mc := runMockCollector(t) + + reconnectionPeriod := 2 * time.Second // 2 seconds + jitter rest time after reconnection + ctx := context.Background() + exp := newGRPCExporter(t, ctx, mc.endpoint, + otlpgrpc.WithReconnectionPeriod(reconnectionPeriod)) + defer func() { + _ = exp.Shutdown(ctx) + }() + + // We'll now stop the collector right away to simulate a connection + // dying in the midst of communication or even not existing before. + _ = mc.stop() + + // first export, it will send disconnected message to the channel on export failure, + // trigger almost immediate reconnection + require.Error( + t, + exp.ExportSpans(ctx, []*exporttrace.SpanSnapshot{{Name: "in the midst"}}), + "transport: Error while dialing dial tcp %s: connect: connection refused", + mc.endpoint, + ) + + // give it time for first reconnection + <-time.After(time.Millisecond * 20) + + // second export, it will detect connection issue, change state of exporter to disconnected and + // send message to disconnected channel but this time reconnection goroutine will be in rest mode (not listening to the disconnected channel) + require.Error( + t, + exp.ExportSpans(ctx, []*exporttrace.SpanSnapshot{{Name: "in the midst"}}), + "transport: Error while dialing dial tcp %s: connect: connection refused2", + mc.endpoint, + ) + + // as a result we have exporter in disconnected state waiting for disconnection message to reconnect + + // resurrect collector + nmc := runMockCollectorAtEndpoint(t, mc.endpoint) + + // make sure reconnection loop hits beginning and goes back to waiting mode + // after hitting beginning of the loop it should reconnect + <-time.After(time.Second * 4) + + n 
:= 10 + for i := 0; i < n; i++ { + // when disconnected exp.ExportSpans doesn't send disconnected messages again + // it just quits and returns the last connection error + require.NoError(t, exp.ExportSpans(ctx, []*exporttrace.SpanSnapshot{{Name: "Resurrected"}})) + } + + nmaSpans := nmc.getSpans() + + // Expecting all 10 exported SpanSnapshots to have reached the resurrected collector + if g, w := len(nmaSpans), n; g != w { + t.Fatalf("Connected collector: spans: got %d want %d", g, w) + } + + dSpans := mc.getSpans() + // Expecting 0 spans to have been received by the original but now dead collector + if g, w := len(dSpans), 0; g != w { + t.Fatalf("Disconnected collector: spans: got %d want %d", g, w) + } +} + func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) { mc := runMockCollector(t)