From b37fbf6ec4e7436c85f519460956815f23912568 Mon Sep 17 00:00:00 2001 From: Chang Liu Date: Thu, 17 Oct 2024 14:10:11 -0400 Subject: [PATCH 1/6] Add retry --- .../trace/exporters/collector_exporter.dart | 35 +++++++++++++----- .../exporters/collector_exporter_test.dart | 36 +++++++++++++++++-- 2 files changed, 60 insertions(+), 11 deletions(-) diff --git a/lib/src/sdk/trace/exporters/collector_exporter.dart b/lib/src/sdk/trace/exporters/collector_exporter.dart index 69777e70..e2a63321 100644 --- a/lib/src/sdk/trace/exporters/collector_exporter.dart +++ b/lib/src/sdk/trace/exporters/collector_exporter.dart @@ -45,16 +45,33 @@ class CollectorExporter implements sdk.SpanExporter { Uri uri, List spans, ) async { - try { - final body = pb_trace_service.ExportTraceServiceRequest( - resourceSpans: _spansToProtobuf(spans)); - final headers = {'Content-Type': 'application/x-protobuf'} - ..addAll(this.headers); - - await client.post(uri, body: body.writeToBuffer(), headers: headers); - } catch (e) { - _log.warning('Failed to export ${spans.length} spans.', e); + const maxRetries = 3; + const retryDelay = Duration(seconds: 1); + var retries = 0; + while (retries < maxRetries) { + try { + final body = pb_trace_service.ExportTraceServiceRequest( + resourceSpans: _spansToProtobuf(spans)); + final headers = {'Content-Type': 'application/x-protobuf'} + ..addAll(this.headers); + + final response = await client.post(uri, + body: body.writeToBuffer(), headers: headers); + if (response.statusCode == 200) { + return; + } + _log.warning('Failed to export ${spans.length} spans. ' + 'HTTP status code: ${response.statusCode}'); + } catch (e, statckTrace) { + _log.warning('Failed to export ${spans.length} spans.', e, statckTrace); + } + retries++; + if (retries < maxRetries) { + await Future.delayed(retryDelay); + } } + _log.severe( + 'Failed to export ${spans.length} spans after $maxRetries retries'); } /// Group and construct the protobuf equivalent of the given list of [api.Span]s. diff --git a/test/unit/sdk/exporters/collector_exporter_test.dart b/test/unit/sdk/exporters/collector_exporter_test.dart index b118fd2a..242eb98d 100644 --- a/test/unit/sdk/exporters/collector_exporter_test.dart +++ b/test/unit/sdk/exporters/collector_exporter_test.dart @@ -4,6 +4,7 @@ @TestOn('vm') import 'dart:typed_data'; +import 'package:http/http.dart'; import 'package:mocktail/mocktail.dart'; import 'package:logging/logging.dart'; import 'package:opentelemetry/api.dart' as api; @@ -24,6 +25,7 @@ import '../../mocks.dart'; void main() { late MockHttpClient mockClient; + final uri = Uri.parse('https://example.test/s/opentelemetry-collector/v1/traces'); @@ -205,10 +207,11 @@ void main() { verify(() => mockClient.post(uri, body: anything, - headers: {'Content-Type': 'application/x-protobuf'})).called(1); + headers: {'Content-Type': 'application/x-protobuf'})).called(3); - expect(records, hasLength(1)); + expect(records, hasLength(4)); expect(records[0].level, equals(Level.WARNING)); + expect(records[3].level, equals(Level.SEVERE)); }); test('does not send spans when shutdown', () { @@ -294,4 +297,33 @@ void main() { verify(() => mockClient.post(uri, body: anything, headers: expectedHeaders)) .called(1); }); + + test('client not return 200', () { + final span = Span( + 'foo', + api.SpanContext(api.TraceId([1, 2, 3]), api.SpanId([7, 8, 9]), + api.TraceFlags.none, api.TraceState.empty()), + api.SpanId([4, 5, 6]), + [], + sdk.DateTimeTimeProvider(), + sdk.Resource([]), + sdk.InstrumentationScope( + 'library_name', 'library_version', 'url://schema', []), + api.SpanKind.internal, + [], + sdk.SpanLimits(), + sdk.DateTimeTimeProvider().now) + ..end(); + + when(() => mockClient.post(uri, + body: any(named: 'body'), + headers: {'Content-Type': 'application/x-protobuf'})) + .thenAnswer((_) async => Response('Service unAvailable', 403)); + + final expectedHeaders = {'Content-Type': 'application/x-protobuf'}; + sdk.CollectorExporter(uri, httpClient: mockClient).export([span]); + + verify(() => mockClient.post(uri, body: anything, headers: expectedHeaders)) + .called(3); + }); } From d353d0102f306636b6accbd035038b8e7e30d79e Mon Sep 17 00:00:00 2001 From: Chang Liu Date: Thu, 17 Oct 2024 15:19:16 -0400 Subject: [PATCH 2/6] update --- .../trace/exporters/collector_exporter.dart | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/lib/src/sdk/trace/exporters/collector_exporter.dart b/lib/src/sdk/trace/exporters/collector_exporter.dart index e2a63321..b11555d3 100644 --- a/lib/src/sdk/trace/exporters/collector_exporter.dart +++ b/lib/src/sdk/trace/exporters/collector_exporter.dart @@ -29,7 +29,7 @@ class CollectorExporter implements sdk.SpanExporter { : client = httpClient ?? http.Client(); @override - void export(List spans) { + void export(List spans) async { if (_isShutdown) { return; } @@ -38,7 +38,7 @@ class CollectorExporter implements sdk.SpanExporter { return; } - unawaited(_send(uri, spans)); + await _send(uri, spans); } Future _send( @@ -48,13 +48,14 @@ class CollectorExporter implements sdk.SpanExporter { const maxRetries = 3; const retryDelay = Duration(seconds: 1); var retries = 0; - while (retries < maxRetries) { - try { - final body = pb_trace_service.ExportTraceServiceRequest( - resourceSpans: _spansToProtobuf(spans)); - final headers = {'Content-Type': 'application/x-protobuf'} - ..addAll(this.headers); + final body = pb_trace_service.ExportTraceServiceRequest( + resourceSpans: _spansToProtobuf(spans)); + final headers = {'Content-Type': 'application/x-protobuf'} + ..addAll(this.headers); + + while (retries++ < maxRetries) { + try { final response = await client.post(uri, body: body.writeToBuffer(), headers: headers); if (response.statusCode == 200) { @@ -65,10 +66,7 @@ class CollectorExporter implements sdk.SpanExporter { } catch (e, statckTrace) { _log.warning('Failed to export ${spans.length} spans.', e, statckTrace); } - retries++; - if (retries < maxRetries) { - await Future.delayed(retryDelay); - } + await Future.delayed(retryDelay); } _log.severe( 'Failed to export ${spans.length} spans after $maxRetries retries'); From 1ce804ea73bffc49da50345a525efda5832b40b2 Mon Sep 17 00:00:00 2001 From: Chang Liu Date: Mon, 21 Oct 2024 15:33:19 -0400 Subject: [PATCH 3/6] Fix the test units failures --- .../trace/exporters/collector_exporter.dart | 11 +- .../exporters/collector_exporter_test.dart | 609 +++++++++--------- 2 files changed, 323 insertions(+), 297 deletions(-) diff --git a/lib/src/sdk/trace/exporters/collector_exporter.dart b/lib/src/sdk/trace/exporters/collector_exporter.dart index b11555d3..fb6c3cab 100644 --- a/lib/src/sdk/trace/exporters/collector_exporter.dart +++ b/lib/src/sdk/trace/exporters/collector_exporter.dart @@ -29,7 +29,8 @@ class CollectorExporter implements sdk.SpanExporter { : client = httpClient ?? http.Client(); @override - void export(List spans) async { + Future export(List spans) async { + print('export func => _isShutdown: $_isShutdown'); if (_isShutdown) { return; } @@ -46,7 +47,6 @@ class CollectorExporter implements sdk.SpanExporter { List spans, ) async { const maxRetries = 3; - const retryDelay = Duration(seconds: 1); var retries = 0; final body = pb_trace_service.ExportTraceServiceRequest( @@ -63,10 +63,10 @@ class CollectorExporter implements sdk.SpanExporter { } _log.warning('Failed to export ${spans.length} spans. ' 'HTTP status code: ${response.statusCode}'); - } catch (e, statckTrace) { - _log.warning('Failed to export ${spans.length} spans.', e, statckTrace); + } catch (e) { + _log.warning('Failed to export ${spans.length} spans. $e'); } - await Future.delayed(retryDelay); + //await Future.delayed(Duration(seconds: retries)); } _log.severe( 'Failed to export ${spans.length} spans after $maxRetries retries'); @@ -260,5 +260,6 @@ class CollectorExporter implements sdk.SpanExporter { void shutdown() { _isShutdown = true; client.close(); + print('shutdown() func => _isShutdown: $_isShutdown'); } } diff --git a/test/unit/sdk/exporters/collector_exporter_test.dart b/test/unit/sdk/exporters/collector_exporter_test.dart index 242eb98d..f94c1c7e 100644 --- a/test/unit/sdk/exporters/collector_exporter_test.dart +++ b/test/unit/sdk/exporters/collector_exporter_test.dart @@ -24,306 +24,331 @@ import 'package:test/test.dart'; import '../../mocks.dart'; void main() { - late MockHttpClient mockClient; - final uri = Uri.parse('https://example.test/s/opentelemetry-collector/v1/traces'); - setUp(() { - mockClient = MockHttpClient(); - }); - - tearDown(() { - reset(mockClient); - }); - - test('sends spans', () { - final resource = - sdk.Resource([api.Attribute.fromString('service.name', 'bar')]); - final instrumentationLibrary = sdk.InstrumentationScope( - 'library_name', 'library_version', 'url://schema', []); - final limits = sdk.SpanLimits(maxNumAttributeLength: 5); - final span1 = Span( - 'foo', - api.SpanContext(api.TraceId([1, 2, 3]), api.SpanId([7, 8, 9]), - api.TraceFlags.none, api.TraceState.empty()), - api.SpanId([4, 5, 6]), - [], - sdk.DateTimeTimeProvider(), - resource, - instrumentationLibrary, - api.SpanKind.client, - [], - sdk.SpanLimits(), - sdk.DateTimeTimeProvider().now) - ..setAttributes([api.Attribute.fromString('foo', 'bar')]) - ..end(); - final span2 = Span( - 'baz', - api.SpanContext(api.TraceId([1, 2, 3]), api.SpanId([10, 11, 12]), - api.TraceFlags.none, api.TraceState.empty()), - api.SpanId([4, 5, 6]), - [], - sdk.DateTimeTimeProvider(), - resource, - instrumentationLibrary, - api.SpanKind.internal, - applyLinkLimits([ - api.SpanLink(span1.spanContext, attributes: [ - api.Attribute.fromString('longKey', - 'I am very long with maxNumAttributeLength: 5 limitation!') - ]), - ], limits), - limits, - sdk.DateTimeTimeProvider().now) - ..setAttributes([api.Attribute.fromBoolean('bool', true)]) - ..addEvent('testEvent', - timestamp: sdk.DateTimeTimeProvider().now, - attributes: [api.Attribute.fromString('foo', 'bar')]) - ..end(); - - sdk.CollectorExporter(uri, httpClient: mockClient).export([span1, span2]); - - final expectedBody = - pb_trace_service.ExportTraceServiceRequest(resourceSpans: [ - pb.ResourceSpans( - resource: pb_resource.Resource(attributes: [ - pb_common.KeyValue( - key: 'service.name', - value: pb_common.AnyValue(stringValue: 'bar')) - ]), - scopeSpans: [ - pb.ScopeSpans( - spans: [ - pb.Span( - traceId: [1, 2, 3], - spanId: [7, 8, 9], - traceState: '', - parentSpanId: [4, 5, 6], - name: 'foo', - kind: pb.Span_SpanKind.SPAN_KIND_CLIENT, - startTimeUnixNano: span1.startTime, - endTimeUnixNano: span1.endTime, - attributes: [ - pb_common.KeyValue( - key: 'foo', - value: pb_common.AnyValue(stringValue: 'bar')) - ], - droppedAttributesCount: 0, - status: pb.Status( - code: pb.Status_StatusCode.STATUS_CODE_UNSET, - message: ''), - flags: 0), - pb.Span( - traceId: [1, 2, 3], - spanId: [10, 11, 12], - traceState: '', - parentSpanId: [4, 5, 6], - name: 'baz', - kind: pb.Span_SpanKind.SPAN_KIND_INTERNAL, - startTimeUnixNano: span2.startTime, - endTimeUnixNano: span2.endTime, - attributes: [ - pb_common.KeyValue( - key: 'bool', - value: pb_common.AnyValue(boolValue: true)) - ], - droppedAttributesCount: 0, - events: [ - pb.Span_Event( - timeUnixNano: span2.events.first.timestamp, - name: 'testEvent', - attributes: [ - pb_common.KeyValue( - key: 'foo', - value: pb_common.AnyValue(stringValue: 'bar')) - ], - droppedAttributesCount: 0, - ) - ], - droppedEventsCount: 0, - status: pb.Status( - code: pb.Status_StatusCode.STATUS_CODE_UNSET, - message: ''), - links: [ - pb.Span_Link( - traceId: [1, 2, 3], - spanId: [7, 8, 9], - traceState: '', + group('Send spans with success - ', () { + late MockHttpClient mockClient; + setUp(() { + mockClient = MockHttpClient(); + when(() => mockClient.post(uri, + body: any(named: 'body'), headers: any(named: 'headers'))) + .thenAnswer((_) async => Response('', 200)); + }); + + tearDown(() { + reset(mockClient); + }); + + test('sends spans', () async { + final resource = + sdk.Resource([api.Attribute.fromString('service.name', 'bar')]); + final instrumentationLibrary = sdk.InstrumentationScope( + 'library_name', 'library_version', 'url://schema', []); + final limits = sdk.SpanLimits(maxNumAttributeLength: 5); + final span1 = Span( + 'foo', + api.SpanContext(api.TraceId([1, 2, 3]), api.SpanId([7, 8, 9]), + api.TraceFlags.none, api.TraceState.empty()), + api.SpanId([4, 5, 6]), + [], + sdk.DateTimeTimeProvider(), + resource, + instrumentationLibrary, + api.SpanKind.client, + [], + sdk.SpanLimits(), + sdk.DateTimeTimeProvider().now) + ..setAttributes([api.Attribute.fromString('foo', 'bar')]) + ..end(); + final span2 = Span( + 'baz', + api.SpanContext(api.TraceId([1, 2, 3]), api.SpanId([10, 11, 12]), + api.TraceFlags.none, api.TraceState.empty()), + api.SpanId([4, 5, 6]), + [], + sdk.DateTimeTimeProvider(), + resource, + instrumentationLibrary, + api.SpanKind.internal, + applyLinkLimits([ + api.SpanLink(span1.spanContext, attributes: [ + api.Attribute.fromString('longKey', + 'I am very long with maxNumAttributeLength: 5 limitation!') + ]), + ], limits), + limits, + sdk.DateTimeTimeProvider().now) + ..setAttributes([api.Attribute.fromBoolean('bool', true)]) + ..addEvent('testEvent', + timestamp: sdk.DateTimeTimeProvider().now, + attributes: [api.Attribute.fromString('foo', 'bar')]) + ..end(); + + await sdk.CollectorExporter(uri, httpClient: mockClient) + .export([span1, span2]); + + final expectedBody = + pb_trace_service.ExportTraceServiceRequest(resourceSpans: [ + pb.ResourceSpans( + resource: pb_resource.Resource(attributes: [ + pb_common.KeyValue( + key: 'service.name', + value: pb_common.AnyValue(stringValue: 'bar')) + ]), + scopeSpans: [ + pb.ScopeSpans( + spans: [ + pb.Span( + traceId: [1, 2, 3], + spanId: [7, 8, 9], + traceState: '', + parentSpanId: [4, 5, 6], + name: 'foo', + kind: pb.Span_SpanKind.SPAN_KIND_CLIENT, + startTimeUnixNano: span1.startTime, + endTimeUnixNano: span1.endTime, + attributes: [ + pb_common.KeyValue( + key: 'foo', + value: pb_common.AnyValue(stringValue: 'bar')) + ], + droppedAttributesCount: 0, + status: pb.Status( + code: pb.Status_StatusCode.STATUS_CODE_UNSET, + message: ''), + flags: 0), + pb.Span( + traceId: [1, 2, 3], + spanId: [10, 11, 12], + traceState: '', + parentSpanId: [4, 5, 6], + name: 'baz', + kind: pb.Span_SpanKind.SPAN_KIND_INTERNAL, + startTimeUnixNano: span2.startTime, + endTimeUnixNano: span2.endTime, + attributes: [ + pb_common.KeyValue( + key: 'bool', + value: pb_common.AnyValue(boolValue: true)) + ], + droppedAttributesCount: 0, + events: [ + pb.Span_Event( + timeUnixNano: span2.events.first.timestamp, + name: 'testEvent', attributes: [ pb_common.KeyValue( - key: 'longKey', - value: - pb_common.AnyValue(stringValue: 'I am ')) + key: 'foo', + value: pb_common.AnyValue(stringValue: 'bar')) ], droppedAttributesCount: 0, - flags: 0) - ], - droppedLinksCount: 0, - flags: 0) - ], - scope: pb_common.InstrumentationScope( - name: 'library_name', version: 'library_version')) - ]) - ]); - - final verifyResult = verify(() => mockClient.post(uri, - body: captureAny(named: 'body'), - headers: {'Content-Type': 'application/x-protobuf'})) - ..called(1); - final captured = verifyResult.captured; - - final traceRequest = pb_trace_service.ExportTraceServiceRequest.fromBuffer( - captured[0] as Uint8List); - expect(traceRequest, equals(expectedBody)); - }); - - test('shows a warning log when export failed', () { - final span = Span( - 'foo', - api.SpanContext(api.TraceId([1, 2, 3]), api.SpanId([7, 8, 9]), - api.TraceFlags.none, api.TraceState.empty()), - api.SpanId([4, 5, 6]), - [], - sdk.DateTimeTimeProvider(), - sdk.Resource([]), - sdk.InstrumentationScope( - 'library_name', 'library_version', 'url://schema', []), - api.SpanKind.internal, - [], - sdk.SpanLimits(), - sdk.DateTimeTimeProvider().now) - ..end(); - - when(() => mockClient.post(uri, - body: any(named: 'body'), - headers: {'Content-Type': 'application/x-protobuf'})) - .thenThrow(Exception('Failed to connect')); - - final records = []; - final sub = Logger.root.onRecord.listen(records.add); - sdk.CollectorExporter(uri, httpClient: mockClient).export([span]); - sub.cancel(); - - verify(() => mockClient.post(uri, - body: anything, - headers: {'Content-Type': 'application/x-protobuf'})).called(3); - - expect(records, hasLength(4)); - expect(records[0].level, equals(Level.WARNING)); - expect(records[3].level, equals(Level.SEVERE)); - }); - - test('does not send spans when shutdown', () { - final span = Span( - 'foo', - api.SpanContext(api.TraceId([1, 2, 3]), api.SpanId([7, 8, 9]), - api.TraceFlags.none, api.TraceState.empty()), - api.SpanId([4, 5, 6]), - [], - sdk.DateTimeTimeProvider(), - sdk.Resource([]), - sdk.InstrumentationScope( - 'library_name', 'library_version', 'url://schema', []), - api.SpanKind.internal, - [], - sdk.SpanLimits(), - sdk.DateTimeTimeProvider().now) - ..end(); - - sdk.CollectorExporter(uri, httpClient: mockClient) - ..shutdown() - ..export([span]); - - verify(() => mockClient.close()).called(1); - verifyNever(() => mockClient.post(uri, - body: anything, headers: {'Content-Type': 'application/x-protobuf'})); - }); - - test('supplies HTTP headers', () { - final span = Span( - 'foo', - api.SpanContext(api.TraceId([1, 2, 3]), api.SpanId([7, 8, 9]), - api.TraceFlags.none, api.TraceState.empty()), - api.SpanId([4, 5, 6]), - [], - sdk.DateTimeTimeProvider(), - sdk.Resource([]), - sdk.InstrumentationScope( - 'library_name', 'library_version', 'url://schema', []), - api.SpanKind.internal, - [], - sdk.SpanLimits(), - sdk.DateTimeTimeProvider().now) - ..end(); - - final suppliedHeaders = { - 'header-param-key-1': 'header-param-value-1', - 'header-param-key-2': 'header-param-value-2', - }; - final expectedHeaders = { - 'Content-Type': 'application/x-protobuf', - ...suppliedHeaders, - }; - - sdk.CollectorExporter(uri, httpClient: mockClient, headers: suppliedHeaders) - .export([span]); - - verify(() => mockClient.post(uri, body: anything, headers: expectedHeaders)) - .called(1); + ) + ], + droppedEventsCount: 0, + status: pb.Status( + code: pb.Status_StatusCode.STATUS_CODE_UNSET, + message: ''), + links: [ + pb.Span_Link( + traceId: [1, 2, 3], + spanId: [7, 8, 9], + traceState: '', + attributes: [ + pb_common.KeyValue( + key: 'longKey', + value: pb_common.AnyValue( + stringValue: 'I am ')) + ], + droppedAttributesCount: 0, + flags: 0) + ], + droppedLinksCount: 0, + flags: 0) + ], + scope: pb_common.InstrumentationScope( + name: 'library_name', version: 'library_version')) + ]) + ]); + + final verifyResult = verify(() => mockClient.post(uri, + body: captureAny(named: 'body'), + headers: {'Content-Type': 'application/x-protobuf'})) + ..called(1); + final captured = verifyResult.captured; + + final traceRequest = + pb_trace_service.ExportTraceServiceRequest.fromBuffer( + captured[0] as Uint8List); + expect(traceRequest, equals(expectedBody)); + }); + + test('does not send spans when shutdown', () async { + final span = Span( + 'foo', + api.SpanContext(api.TraceId([1, 2, 3]), api.SpanId([7, 8, 9]), + api.TraceFlags.none, api.TraceState.empty()), + api.SpanId([4, 5, 6]), + [], + sdk.DateTimeTimeProvider(), + sdk.Resource([]), + sdk.InstrumentationScope( + 'library_name', 'library_version', 'url://schema', []), + api.SpanKind.internal, + [], + sdk.SpanLimits(), + sdk.DateTimeTimeProvider().now) + ..end(); + final exporter = sdk.CollectorExporter(uri, httpClient: mockClient); + // ignore: cascade_invocations + exporter.shutdown(); + await exporter.export([span]); + + verify(() => mockClient.close()).called(1); + verifyNever(() => mockClient.post(uri, + body: anything, headers: {'Content-Type': 'application/x-protobuf'})); + }); + + test('supplies HTTP headers', () async { + final span = Span( + 'foo', + api.SpanContext(api.TraceId([1, 2, 3]), api.SpanId([7, 8, 9]), + api.TraceFlags.none, api.TraceState.empty()), + api.SpanId([4, 5, 6]), + [], + sdk.DateTimeTimeProvider(), + sdk.Resource([]), + sdk.InstrumentationScope( + 'library_name', 'library_version', 'url://schema', []), + api.SpanKind.internal, + [], + sdk.SpanLimits(), + sdk.DateTimeTimeProvider().now) + ..end(); + + final suppliedHeaders = { + 'header-param-key-1': 'header-param-value-1', + 'header-param-key-2': 'header-param-value-2', + }; + final expectedHeaders = { + 'Content-Type': 'application/x-protobuf', + ...suppliedHeaders, + }; + + await sdk.CollectorExporter(uri, + httpClient: mockClient, headers: suppliedHeaders) + .export([span]); + + verify(() => + mockClient.post(uri, body: anything, headers: expectedHeaders)) + .called(1); + }); + + test('does not supply HTTP headers', () async { + final span = Span( + 'foo', + api.SpanContext(api.TraceId([1, 2, 3]), api.SpanId([7, 8, 9]), + api.TraceFlags.none, api.TraceState.empty()), + api.SpanId([4, 5, 6]), + [], + sdk.DateTimeTimeProvider(), + sdk.Resource([]), + sdk.InstrumentationScope( + 'library_name', 'library_version', 'url://schema', []), + api.SpanKind.internal, + [], + sdk.SpanLimits(), + sdk.DateTimeTimeProvider().now) + ..end(); + + final expectedHeaders = {'Content-Type': 'application/x-protobuf'}; + + await sdk.CollectorExporter(uri, httpClient: mockClient).export([span]); + + verify(() => + mockClient.post(uri, body: anything, headers: expectedHeaders)) + .called(1); + }); }); - test('does not supply HTTP headers', () { - final span = Span( - 'foo', - api.SpanContext(api.TraceId([1, 2, 3]), api.SpanId([7, 8, 9]), - api.TraceFlags.none, api.TraceState.empty()), - api.SpanId([4, 5, 6]), - [], - sdk.DateTimeTimeProvider(), - sdk.Resource([]), - sdk.InstrumentationScope( - 'library_name', 'library_version', 'url://schema', []), - api.SpanKind.internal, - [], - sdk.SpanLimits(), - sdk.DateTimeTimeProvider().now) - ..end(); - - final expectedHeaders = {'Content-Type': 'application/x-protobuf'}; - - sdk.CollectorExporter(uri, httpClient: mockClient).export([span]); - - verify(() => mockClient.post(uri, body: anything, headers: expectedHeaders)) - .called(1); - }); - - test('client not return 200', () { - final span = Span( - 'foo', - api.SpanContext(api.TraceId([1, 2, 3]), api.SpanId([7, 8, 9]), - api.TraceFlags.none, api.TraceState.empty()), - api.SpanId([4, 5, 6]), - [], - sdk.DateTimeTimeProvider(), - sdk.Resource([]), - sdk.InstrumentationScope( - 'library_name', 'library_version', 'url://schema', []), - api.SpanKind.internal, - [], - sdk.SpanLimits(), - sdk.DateTimeTimeProvider().now) - ..end(); - - when(() => mockClient.post(uri, - body: any(named: 'body'), - headers: {'Content-Type': 'application/x-protobuf'})) - .thenAnswer((_) async => Response('Service unAvailable', 403)); - - final expectedHeaders = {'Content-Type': 'application/x-protobuf'}; - sdk.CollectorExporter(uri, httpClient: mockClient).export([span]); - - verify(() => mockClient.post(uri, body: anything, headers: expectedHeaders)) - .called(3); + group('Send spans with failure - ', () { + late MockHttpClient mockClient; + setUp(() { + mockClient = MockHttpClient(); + when(() => mockClient.post(uri, + body: any(named: 'body'), headers: any(named: 'headers'))) + .thenAnswer((_) async => Response('', 403)); + }); + + tearDown(() { + reset(mockClient); + }); + test('shows a warning log when export failed', () async { + final span = Span( + 'foo', + api.SpanContext(api.TraceId([1, 2, 3]), api.SpanId([7, 8, 9]), + api.TraceFlags.none, api.TraceState.empty()), + api.SpanId([4, 5, 6]), + [], + sdk.DateTimeTimeProvider(), + sdk.Resource([]), + sdk.InstrumentationScope( + 'library_name', 'library_version', 'url://schema', []), + api.SpanKind.internal, + [], + sdk.SpanLimits(), + sdk.DateTimeTimeProvider().now) + ..end(); + + when(() => mockClient.post(uri, + body: any(named: 'body'), + headers: {'Content-Type': 'application/x-protobuf'})) + .thenThrow(Exception('Failed to connect')); + + final records = []; + final sub = Logger.root.onRecord.listen(records.add); + await sdk.CollectorExporter(uri, httpClient: mockClient).export([span]); + await sub.cancel(); + + verify(() => mockClient.post(uri, + body: anything, + headers: {'Content-Type': 'application/x-protobuf'})).called(3); + + expect(records, hasLength(4)); + expect(records[0].level, equals(Level.WARNING)); + expect(records[1].level, equals(Level.WARNING)); + expect(records[2].level, equals(Level.WARNING)); + expect(records[3].level, equals(Level.SEVERE)); + }); + + test('client not return 200', () async { + final span = Span( + 'foo', + api.SpanContext(api.TraceId([1, 2, 3]), api.SpanId([7, 8, 9]), + api.TraceFlags.none, api.TraceState.empty()), + api.SpanId([4, 5, 6]), + [], + sdk.DateTimeTimeProvider(), + sdk.Resource([]), + sdk.InstrumentationScope( + 'library_name', 'library_version', 'url://schema', []), + api.SpanKind.internal, + [], + sdk.SpanLimits(), + sdk.DateTimeTimeProvider().now) + ..end(); + + when(() => mockClient.post(uri, + body: any(named: 'body'), + headers: {'Content-Type': 'application/x-protobuf'})) + .thenAnswer((_) async => Response('Service unAvailable', 403)); + + final expectedHeaders = {'Content-Type': 'application/x-protobuf'}; + await sdk.CollectorExporter(uri, httpClient: mockClient).export([span]); + + verify(() => + mockClient.post(uri, body: anything, headers: expectedHeaders)) + .called(3); + }); }); } From 8830ac4ca3e5bd76bcb6ef121ba68ac07b35fbba Mon Sep 17 00:00:00 2001 From: Chang Liu Date: Mon, 21 Oct 2024 15:59:33 -0400 Subject: [PATCH 4/6] update --- .../trace/exporters/collector_exporter.dart | 10 ++++-- .../exporters/collector_exporter_test.dart | 34 +++++++++++++++++-- 2 files changed, 39 insertions(+), 5 deletions(-) diff --git a/lib/src/sdk/trace/exporters/collector_exporter.dart b/lib/src/sdk/trace/exporters/collector_exporter.dart index fb6c3cab..5e8bcb75 100644 --- a/lib/src/sdk/trace/exporters/collector_exporter.dart +++ b/lib/src/sdk/trace/exporters/collector_exporter.dart @@ -30,7 +30,6 @@ class CollectorExporter implements sdk.SpanExporter { @override Future export(List spans) async { - print('export func => _isShutdown: $_isShutdown'); if (_isShutdown) { return; } @@ -48,6 +47,7 @@ class CollectorExporter implements sdk.SpanExporter { ) async { const maxRetries = 3; var retries = 0; + const valid_retry_codes = [429, 408, 500, 502, 503, 504]; final body = pb_trace_service.ExportTraceServiceRequest( resourceSpans: _spansToProtobuf(spans)); @@ -61,12 +61,17 @@ class CollectorExporter implements sdk.SpanExporter { if (response.statusCode == 200) { return; } + // If the response is not 200, log a warning _log.warning('Failed to export ${spans.length} spans. ' 'HTTP status code: ${response.statusCode}'); + // If the response is not a valid retry code, do not retry + if (!valid_retry_codes.contains(response.statusCode)) { + return; + } } catch (e) { _log.warning('Failed to export ${spans.length} spans. $e'); } - //await Future.delayed(Duration(seconds: retries)); + await Future.delayed(Duration(seconds: retries)); } _log.severe( 'Failed to export ${spans.length} spans after $maxRetries retries'); @@ -260,6 +265,5 @@ class CollectorExporter implements sdk.SpanExporter { void shutdown() { _isShutdown = true; client.close(); - print('shutdown() func => _isShutdown: $_isShutdown'); } } diff --git a/test/unit/sdk/exporters/collector_exporter_test.dart b/test/unit/sdk/exporters/collector_exporter_test.dart index f94c1c7e..7f2f9e8f 100644 --- a/test/unit/sdk/exporters/collector_exporter_test.dart +++ b/test/unit/sdk/exporters/collector_exporter_test.dart @@ -321,7 +321,7 @@ void main() { expect(records[3].level, equals(Level.SEVERE)); }); - test('client not return 200', () async { + test('client not return 200, retryable', () async { final span = Span( 'foo', api.SpanContext(api.TraceId([1, 2, 3]), api.SpanId([7, 8, 9]), @@ -341,7 +341,7 @@ void main() { when(() => mockClient.post(uri, body: any(named: 'body'), headers: {'Content-Type': 'application/x-protobuf'})) - .thenAnswer((_) async => Response('Service unAvailable', 403)); + .thenAnswer((_) async => Response('Service unAvailable', 503)); final expectedHeaders = {'Content-Type': 'application/x-protobuf'}; await sdk.CollectorExporter(uri, httpClient: mockClient).export([span]); @@ -350,5 +350,35 @@ void main() { mockClient.post(uri, body: anything, headers: expectedHeaders)) .called(3); }); + + test('client not return 200, nonRetryable', () async { + final span = Span( + 'foo', + api.SpanContext(api.TraceId([1, 2, 3]), api.SpanId([7, 8, 9]), + api.TraceFlags.none, api.TraceState.empty()), + api.SpanId([4, 5, 6]), + [], + sdk.DateTimeTimeProvider(), + sdk.Resource([]), + sdk.InstrumentationScope( + 'library_name', 'library_version', 'url://schema', []), + api.SpanKind.internal, + [], + sdk.SpanLimits(), + sdk.DateTimeTimeProvider().now) + ..end(); + + when(() => mockClient.post(uri, + body: any(named: 'body'), + headers: {'Content-Type': 'application/x-protobuf'})) + .thenAnswer((_) async => Response('Service unAvailable', 400)); + + final expectedHeaders = {'Content-Type': 'application/x-protobuf'}; + await sdk.CollectorExporter(uri, httpClient: mockClient).export([span]); + + verify(() => + mockClient.post(uri, body: anything, headers: expectedHeaders)) + .called(1); + }); }); } From 21d9e916023fba8dfcf1d5126ecdf35897a6f1cd Mon Sep 17 00:00:00 2001 From: Chang Liu Date: Tue, 22 Oct 2024 10:12:31 -0400 Subject: [PATCH 5/6] Update based on comments --- lib/api.dart | 6 ++-- .../trace/exporters/collector_exporter.dart | 20 ++++++++--- .../exporters/collector_exporter_test.dart | 36 +++++++++---------- 3 files changed, 35 insertions(+), 27 deletions(-) diff --git a/lib/api.dart b/lib/api.dart index 4d378def..ff7cdc65 100644 --- a/lib/api.dart +++ b/lib/api.dart @@ -12,10 +12,8 @@ export 'src/api/context/context.dart' contextWithSpanContext, spanContextFromContext, spanFromContext; -export 'src/api/context/context_manager.dart' - show - globalContextManager, - registerGlobalContextManager; +export 'src/api/context/context_manager.dart' + show globalContextManager, registerGlobalContextManager; export 'src/api/exporters/span_exporter.dart' show SpanExporter; export 'src/api/instrumentation_library.dart' show InstrumentationLibrary; export 'src/api/open_telemetry.dart' diff --git a/lib/src/sdk/trace/exporters/collector_exporter.dart b/lib/src/sdk/trace/exporters/collector_exporter.dart index 5e8bcb75..402e63d7 100644 --- a/lib/src/sdk/trace/exporters/collector_exporter.dart +++ b/lib/src/sdk/trace/exporters/collector_exporter.dart @@ -2,6 +2,7 @@ // Licensed under the Apache License, Version 2.0. Please see https://github.com/Workiva/opentelemetry-dart/blob/master/LICENSE for more information import 'dart:async'; +import 'dart:math'; import 'package:fixnum/fixnum.dart'; import 'package:http/http.dart' as http; @@ -29,7 +30,7 @@ class CollectorExporter implements sdk.SpanExporter { : client = httpClient ?? http.Client(); @override - Future export(List spans) async { + void export(List spans) { if (_isShutdown) { return; } @@ -38,7 +39,7 @@ class CollectorExporter implements sdk.SpanExporter { return; } - await _send(uri, spans); + unawaited(_send(uri, spans)); } Future _send( @@ -47,7 +48,8 @@ class CollectorExporter implements sdk.SpanExporter { ) async { const maxRetries = 3; var retries = 0; - const valid_retry_codes = [429, 408, 500, 502, 503, 504]; + // Retryable status from the spec: https://opentelemetry.io/docs/specs/otlp/#failures-1 + const valid_retry_codes = [429, 502, 503, 504]; final body = pb_trace_service.ExportTraceServiceRequest( resourceSpans: _spansToProtobuf(spans)); @@ -70,13 +72,23 @@ class CollectorExporter implements sdk.SpanExporter { } } catch (e) { _log.warning('Failed to export ${spans.length} spans. $e'); + return; } - await Future.delayed(Duration(seconds: retries)); + // Exponential backoff with jitter + final delay = calculateJitteredDelay(retries, Duration(seconds: 1)); + await Future.delayed(delay); } _log.severe( 'Failed to export ${spans.length} spans after $maxRetries retries'); } + Duration calculateJitteredDelay(int retries, Duration baseDelay) { + final random = Random(); + final jitter = random.nextDouble() * baseDelay.inMilliseconds; + return Duration( + milliseconds: baseDelay.inMilliseconds + jitter.toInt() * retries); + } + /// Group and construct the protobuf equivalent of the given list of [api.Span]s. /// Spans are grouped by a trace provider's [sdk.Resource] and a tracer's /// [sdk.InstrumentationScope]. diff --git a/test/unit/sdk/exporters/collector_exporter_test.dart b/test/unit/sdk/exporters/collector_exporter_test.dart index 7f2f9e8f..350ec999 100644 --- a/test/unit/sdk/exporters/collector_exporter_test.dart +++ b/test/unit/sdk/exporters/collector_exporter_test.dart @@ -40,7 +40,7 @@ void main() { reset(mockClient); }); - test('sends spans', () async { + test('sends spans', () { final resource = sdk.Resource([api.Attribute.fromString('service.name', 'bar')]); final instrumentationLibrary = sdk.InstrumentationScope( @@ -85,8 +85,7 @@ void main() { attributes: [api.Attribute.fromString('foo', 'bar')]) ..end(); - await sdk.CollectorExporter(uri, httpClient: mockClient) - .export([span1, span2]); + sdk.CollectorExporter(uri, httpClient: mockClient).export([span1, span2]); final expectedBody = pb_trace_service.ExportTraceServiceRequest(resourceSpans: [ @@ -199,10 +198,9 @@ void main() { sdk.SpanLimits(), sdk.DateTimeTimeProvider().now) ..end(); - final exporter = sdk.CollectorExporter(uri, httpClient: mockClient); - // ignore: cascade_invocations - exporter.shutdown(); - await exporter.export([span]); + sdk.CollectorExporter(uri, httpClient: mockClient) + ..shutdown() + ..export([span]); verify(() => mockClient.close()).called(1); verifyNever(() => mockClient.post(uri, @@ -235,7 +233,7 @@ void main() { ...suppliedHeaders, }; - await sdk.CollectorExporter(uri, + sdk.CollectorExporter(uri, httpClient: mockClient, headers: suppliedHeaders) .export([span]); @@ -263,7 +261,7 @@ void main() { final expectedHeaders = {'Content-Type': 'application/x-protobuf'}; - await sdk.CollectorExporter(uri, httpClient: mockClient).export([span]); + sdk.CollectorExporter(uri, httpClient: mockClient).export([span]); verify(() => mockClient.post(uri, body: anything, headers: expectedHeaders)) @@ -273,6 +271,7 @@ void main() { group('Send spans with failure - ', () { late MockHttpClient mockClient; + final waitSeconds = Duration(seconds: 10); setUp(() { mockClient = MockHttpClient(); when(() => mockClient.post(uri, @@ -283,7 +282,7 @@ void main() { tearDown(() { reset(mockClient); }); - test('shows a warning log when export failed', () async { + test('shows a warning log when export has exceptions', () async { final span = Span( 'foo', api.SpanContext(api.TraceId([1, 2, 3]), api.SpanId([7, 8, 9]), @@ -307,18 +306,15 @@ void main() { final records = []; final sub = Logger.root.onRecord.listen(records.add); - await sdk.CollectorExporter(uri, httpClient: mockClient).export([span]); + sdk.CollectorExporter(uri, httpClient: mockClient).export([span]); + await Future.delayed(Duration(seconds: 5)); await sub.cancel(); - verify(() => mockClient.post(uri, body: anything, - headers: {'Content-Type': 'application/x-protobuf'})).called(3); + headers: {'Content-Type': 'application/x-protobuf'})).called(1); - expect(records, hasLength(4)); + expect(records, hasLength(1)); expect(records[0].level, equals(Level.WARNING)); - expect(records[1].level, equals(Level.WARNING)); - expect(records[2].level, equals(Level.WARNING)); - expect(records[3].level, equals(Level.SEVERE)); }); test('client not return 200, retryable', () async { @@ -344,7 +340,8 @@ void main() { .thenAnswer((_) async => Response('Service unAvailable', 503)); final expectedHeaders = {'Content-Type': 'application/x-protobuf'}; - await sdk.CollectorExporter(uri, httpClient: mockClient).export([span]); + sdk.CollectorExporter(uri, httpClient: mockClient).export([span]); + await Future.delayed(waitSeconds); verify(() => mockClient.post(uri, body: anything, headers: expectedHeaders)) @@ -374,7 +371,8 @@ void main() { .thenAnswer((_) async => Response('Service unAvailable', 400)); final expectedHeaders = {'Content-Type': 'application/x-protobuf'}; - await sdk.CollectorExporter(uri, httpClient: mockClient).export([span]); + sdk.CollectorExporter(uri, httpClient: mockClient).export([span]); + await Future.delayed(waitSeconds); verify(() => mockClient.post(uri, body: anything, headers: expectedHeaders)) From c264ba77ca9ab96a0bdbf7fbeb7d50921a5cbabf Mon Sep 17 00:00:00 2001 From: Chang Liu Date: Wed, 30 Oct 2024 10:22:14 -0400 Subject: [PATCH 6/6] Exponential backoff --- lib/src/sdk/trace/exporters/collector_exporter.dart | 12 ++++++------ test/unit/sdk/exporters/collector_exporter_test.dart | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/lib/src/sdk/trace/exporters/collector_exporter.dart b/lib/src/sdk/trace/exporters/collector_exporter.dart index 402e63d7..f41000a5 100644 --- a/lib/src/sdk/trace/exporters/collector_exporter.dart +++ b/lib/src/sdk/trace/exporters/collector_exporter.dart @@ -56,7 +56,7 @@ class CollectorExporter implements sdk.SpanExporter { final headers = {'Content-Type': 'application/x-protobuf'} ..addAll(this.headers); - while (retries++ < maxRetries) { + while (retries < maxRetries) { try { final response = await client.post(uri, body: body.writeToBuffer(), headers: headers); @@ -75,7 +75,8 @@ class CollectorExporter implements sdk.SpanExporter { return; } // Exponential backoff with jitter - final delay = calculateJitteredDelay(retries, Duration(seconds: 1)); + final delay = + calculateJitteredDelay(retries++, Duration(milliseconds: 100)); await Future.delayed(delay); } _log.severe( @@ -83,10 +84,9 @@ class CollectorExporter implements sdk.SpanExporter { } Duration calculateJitteredDelay(int retries, Duration baseDelay) { - final random = Random(); - final jitter = random.nextDouble() * baseDelay.inMilliseconds; - return Duration( - milliseconds: baseDelay.inMilliseconds + jitter.toInt() * retries); + final delay = baseDelay.inMilliseconds * pow(2, retries); + final jitter = Random().nextDouble() * delay; + return Duration(milliseconds: (delay + jitter).toInt()); } /// Group and construct the protobuf equivalent of the given list of [api.Span]s. diff --git a/test/unit/sdk/exporters/collector_exporter_test.dart b/test/unit/sdk/exporters/collector_exporter_test.dart index 350ec999..2c00b98e 100644 --- a/test/unit/sdk/exporters/collector_exporter_test.dart +++ b/test/unit/sdk/exporters/collector_exporter_test.dart @@ -271,7 +271,7 @@ void main() { group('Send spans with failure - ', () { late MockHttpClient mockClient; - final waitSeconds = Duration(seconds: 10); + final waitSeconds = Duration(seconds: 2); setUp(() { mockClient = MockHttpClient(); when(() => mockClient.post(uri, @@ -307,7 +307,7 @@ void main() { final records = []; final sub = Logger.root.onRecord.listen(records.add); sdk.CollectorExporter(uri, httpClient: mockClient).export([span]); - await Future.delayed(Duration(seconds: 5)); + await Future.delayed(waitSeconds); await sub.cancel(); verify(() => mockClient.post(uri, body: anything,