Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: adding force flush to span processors #802

Merged
merged 5 commits into from
Mar 3, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,5 @@ export class CollectorExporter implements SpanExporter {

// platform dependent
onShutdown(this.shutdown);

// @TODO get spans from span processor (batch)
this._exportSpans([])
.then(() => {
this.logger.debug('shutdown completed');
})
.catch(() => {});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,17 +152,6 @@ describe('CollectorExporter - common', () => {
onShutdownSpy.restore();
});

it('should export spans once only', done => {
collectorExporter.shutdown();
collectorExporter.shutdown();
collectorExporter.shutdown();

setTimeout(() => {
assert.strictEqual(onShutdownSpy.callCount, 1);
done();
});
});

it('should call onShutdown', done => {
collectorExporter.shutdown();
setTimeout(() => {
Expand Down
2 changes: 0 additions & 2 deletions packages/opentelemetry-exporter-zipkin/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ export interface ExporterConfig {
logger?: types.Logger;
serviceName: string;
url?: string;
// Initiates a request with spans in memory to the backend.
forceFlush?: boolean;
// Optional mapping overrides for OpenTelemetry status code and description.
statusCodeTagName?: string;
statusDescriptionTagName?: string;
Expand Down
7 changes: 0 additions & 7 deletions packages/opentelemetry-exporter-zipkin/src/zipkin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import { OT_REQUEST_HEADER } from './utils';
*/
export class ZipkinExporter implements SpanExporter {
static readonly DEFAULT_URL = 'http://localhost:9411/api/v2/spans';
private readonly _forceFlush: boolean;
private readonly _logger: types.Logger;
private readonly _serviceName: string;
private readonly _statusCodeTagName: string;
Expand All @@ -45,7 +44,6 @@ export class ZipkinExporter implements SpanExporter {
const urlStr = config.url || ZipkinExporter.DEFAULT_URL;
const urlOpts = url.parse(urlStr);

this._forceFlush = config.forceFlush || true;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should remove _forceFlushOnShutdown from Jaeger exporter also.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In zipkin this functionality was in fact "dead" but for Jaeger it does something - there is a timeout which waits for the Jaeger to close the sender. Are you ok to make it in separate PR then ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure 👍

this._logger = config.logger || new NoopLogger();
this._reqOpts = Object.assign(
{
Expand Down Expand Up @@ -88,11 +86,6 @@ export class ZipkinExporter implements SpanExporter {
return;
}
this._isShutdown = true;
// Make an optimistic flush.
if (this._forceFlush) {
// @todo get spans from span processor (batch)
this._sendSpans([]);
}
}

/**
Expand Down
16 changes: 0 additions & 16 deletions packages/opentelemetry-exporter-zipkin/test/zipkin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,6 @@ describe('ZipkinExporter', () => {
assert.ok(typeof exporter.export === 'function');
assert.ok(typeof exporter.shutdown === 'function');
});
it('should construct an exporter with forceFlush', () => {
const exporter = new ZipkinExporter({
serviceName: 'my-service',
forceFlush: false,
});
assert.ok(typeof exporter.export === 'function');
assert.ok(typeof exporter.shutdown === 'function');
});
it('should construct an exporter with statusCodeTagName', () => {
const exporter = new ZipkinExporter({
serviceName: 'my-service',
Expand Down Expand Up @@ -338,13 +330,5 @@ describe('ZipkinExporter', () => {

// @todo: implement
it('should send by default');
it('should not send with forceFlush=false', () => {
const exporter = new ZipkinExporter({
serviceName: 'my-service',
forceFlush: false,
});

exporter.shutdown();
});
});
});
2 changes: 1 addition & 1 deletion packages/opentelemetry-plugin-express/src/version.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@
*/

// this is autogenerated file, see scripts/version-update.js
export const VERSION = '0.3.2';
export const VERSION = '0.4.0';
4 changes: 4 additions & 0 deletions packages/opentelemetry-tracing/src/MultiSpanProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ import { SpanProcessor } from './SpanProcessor';
export class MultiSpanProcessor implements SpanProcessor {
constructor(private readonly _spanProcessors: SpanProcessor[]) {}

forceFlush(): void {
// do nothing as all spans are being exported without waiting
}

onStart(span: Span): void {
for (const spanProcessor of this._spanProcessors) {
spanProcessor.onStart(span);
Expand Down
1 change: 1 addition & 0 deletions packages/opentelemetry-tracing/src/NoopSpanProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ export class NoopSpanProcessor implements SpanProcessor {
onStart(span: Span): void {}
onEnd(span: Span): void {}
shutdown(): void {}
forceFlush(): void {}
}
5 changes: 5 additions & 0 deletions packages/opentelemetry-tracing/src/SpanProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ import { Span } from '@opentelemetry/api';
* for when a {@link Span} is started or when a {@link Span} is ended.
*/
export interface SpanProcessor {
/**
* Forces to export all finished spans
*/
forceFlush(): void;

/**
* Called when a {@link Span} is started, if the `span.isRecording()`
* returns true.
Expand Down
18 changes: 17 additions & 1 deletion packages/opentelemetry-tracing/src/export/BatchSpanProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@ export class BatchSpanProcessor implements SpanProcessor {
private _finishedSpans: ReadableSpan[] = [];
private _lastSpanFlush = Date.now();
private _timer: NodeJS.Timeout;
private _isShutdown = false;

constructor(private readonly _exporter: SpanExporter, config?: BufferConfig) {
this._bufferSize =
config && config.bufferSize ? config.bufferSize : DEFAULT_BUFFER_SIZE;
this._bufferTimeout =
config && config.bufferTimeout
config && typeof config.bufferTimeout === 'number'
? config.bufferTimeout
: DEFAULT_BUFFER_TIMEOUT_MS;

Expand All @@ -52,16 +53,31 @@ export class BatchSpanProcessor implements SpanProcessor {
unrefTimer(this._timer);
}

forceFlush(): void {
if (this._isShutdown) {
return;
}
this._flush();
}

// does nothing.
onStart(span: Span): void {}

onEnd(span: Span): void {
if (this._isShutdown) {
return;
}
if (span.context().traceFlags !== TraceFlags.SAMPLED) return;
this._addToBuffer(span.toReadableSpan());
}

shutdown(): void {
if (this._isShutdown) {
return;
}
clearInterval(this._timer);
this.forceFlush();
this._isShutdown = true;
this._exporter.shutdown();
}

Expand Down
13 changes: 13 additions & 0 deletions packages/opentelemetry-tracing/src/export/SimpleSpanProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,29 @@ import { Span } from '../Span';
*/
export class SimpleSpanProcessor implements SpanProcessor {
constructor(private readonly _exporter: SpanExporter) {}
private _isShutdown = false;

forceFlush(): void {
// do nothing as all spans are being exported without waiting
}

// does nothing.
onStart(span: Span): void {}

onEnd(span: Span): void {
if (this._isShutdown) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks redundant to me, we already have the same logic in Zipkin and Collector exporter. Maybe bug in Stackdriver exporter where we don't do anything on shutdown and in Jaeger exporter, we juse close the socket connection.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually agree. if the span processor shuts down its exporter, then it should be safe for it to continue sending spans even after shutdown and the exporter will just no-op

Copy link
Member Author

@obecny obecny Feb 21, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but this is what spec says for span processors
https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/sdk-tracing.md#shutdown
so they should have the same functionality ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one additional if check isn't hurting anything. I would say just leave it in

return;
}
if (span.context().traceFlags !== TraceFlags.SAMPLED) return;
this._exporter.export([span.toReadableSpan()], () => {});
}

shutdown(): void {
if (this._isShutdown) {
return;
}
this._isShutdown = true;

this._exporter.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class TestProcessor implements SpanProcessor {
shutdown(): void {
this.spans = [];
}
forceFlush(): void {}
}

describe('MultiSpanProcessor', () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,32 @@ describe('BatchSpanProcessor', () => {
});

describe('.onStart/.onEnd/.shutdown', () => {
it('should do nothing after processor is shutdown', () => {
const processor = new BatchSpanProcessor(exporter, defaultBufferConfig);
const spy: sinon.SinonSpy = sinon.spy(exporter, 'export') as any;

const span = createSampledSpan(`${name}_0`);

processor.onEnd(span);
assert.strictEqual(processor['_finishedSpans'].length, 1);

processor.forceFlush();
assert.strictEqual(exporter.getFinishedSpans().length, 1);

processor.onEnd(span);
assert.strictEqual(processor['_finishedSpans'].length, 1);

assert.strictEqual(spy.args.length, 1);
processor.shutdown();
assert.strictEqual(spy.args.length, 2);
assert.strictEqual(exporter.getFinishedSpans().length, 0);

processor.onEnd(span);
assert.strictEqual(spy.args.length, 2);
assert.strictEqual(processor['_finishedSpans'].length, 0);
assert.strictEqual(exporter.getFinishedSpans().length, 0);
});

it('should export the sampled spans with buffer size reached', () => {
const processor = new BatchSpanProcessor(exporter, defaultBufferConfig);
for (let i = 0; i < defaultBufferConfig.bufferSize; i++) {
Expand Down Expand Up @@ -125,5 +151,16 @@ describe('BatchSpanProcessor', () => {

clock.restore();
});

it('should force flush on demand', () => {
const processor = new BatchSpanProcessor(exporter, defaultBufferConfig);
for (let i = 0; i < defaultBufferConfig.bufferSize; i++) {
const span = createSampledSpan(`${name}_${i}`);
processor.onEnd(span);
}
assert.strictEqual(exporter.getFinishedSpans().length, 0);
processor.forceFlush();
assert.strictEqual(exporter.getFinishedSpans().length, 5);
});
});
});