Skip to content

Commit

Permalink
feat: add BatchSpanProcessor (open-telemetry#238)
Browse files Browse the repository at this point in the history
* feat: add BatchSpanProcessor

* fix: remove default value, already done Ctor

* fix: append the MS unit

* fix: move unrefTimer function in core

* fix: use SetInterval instead of SetTimer

* fix: rename TraceOptions -> TraceFlags

* Update packages/opentelemetry-basic-tracer/src/export/BatchSpanProcessor.ts

Co-Authored-By: Olivier Albertini <[email protected]>

* fix: remove _MS

* fix: rename _lastSpanWrite -> _lastSpanFlush
  • Loading branch information
mayurkale22 authored Sep 25, 2019
1 parent 13ff3bd commit cb035ce
Show file tree
Hide file tree
Showing 8 changed files with 260 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*!
* Copyright 2019, OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { TraceFlags } from '@opentelemetry/types';
import { unrefTimer } from '@opentelemetry/core';
import { SpanProcessor } from '../SpanProcessor';
import { SpanExporter } from './SpanExporter';
import { Span } from '../Span';
import { ReadableSpan } from './ReadableSpan';
import { BufferConfig } from '../types';

const DEFAULT_BUFFER_SIZE = 100;
const DEFAULT_BUFFER_TIMEOUT_MS = 20_000;

/**
* Implementation of the {@link SpanProcessor} that batches spans exported by
* the SDK then pushes them to the exporter pipeline.
*/
export class BatchSpanProcessor implements SpanProcessor {
private readonly _bufferSize: number;
private readonly _bufferTimeout: number;
private _finishedSpans: ReadableSpan[] = [];
private _lastSpanFlush = Date.now();
private _timer: NodeJS.Timeout;

constructor(private readonly _exporter: SpanExporter, config?: BufferConfig) {
this._bufferSize =
config && config.bufferSize ? config.bufferSize : DEFAULT_BUFFER_SIZE;
this._bufferTimeout =
config && config.bufferTimeout
? config.bufferTimeout
: DEFAULT_BUFFER_TIMEOUT_MS;

this._timer = setInterval(() => {
if (Date.now() - this._lastSpanFlush >= this._bufferTimeout) {
this._flush();
}
}, this._bufferTimeout);
unrefTimer(this._timer);
}

// does nothing.
onStart(span: Span): void {}

onEnd(span: Span): void {
if (span.context().traceFlags !== TraceFlags.SAMPLED) return;
this._addToBuffer(span.toReadableSpan());
}

shutdown(): void {
clearInterval(this._timer);
this._exporter.shutdown();
}

/** Add a span in the buffer. */
private _addToBuffer(span: ReadableSpan) {
this._finishedSpans.push(span);
if (this._finishedSpans.length > this._bufferSize) {
this._flush();
}
}

/** Send the span data list to exporter */
private _flush() {
this._exporter.export(this._finishedSpans, () => {});
this._finishedSpans = [];
this._lastSpanFlush = Date.now();
}
}
1 change: 1 addition & 0 deletions packages/opentelemetry-basic-tracer/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

export * from './BasicTracer';
export * from './export/ConsoleSpanExporter';
export * from './export/BatchSpanProcessor';
export * from './export/ExportResult';
export * from './export/InMemorySpanExporter';
export * from './export/ReadableSpan';
Expand Down
8 changes: 8 additions & 0 deletions packages/opentelemetry-basic-tracer/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,11 @@ export interface TraceParams {
/** numberOfEventsPerSpan is number of message events per span */
numberOfEventsPerSpan?: number;
}

/** Interface configuration for a buffer. */
export interface BufferConfig {
/** Maximum size of a buffer. */
bufferSize?: number;
/** Max time for a buffer can wait before being sent */
bufferTimeout?: number;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*!
* Copyright 2019, OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import * as assert from 'assert';
import { BatchSpanProcessor } from '../../src/export/BatchSpanProcessor';
import { Span, BasicTracer } from '../../src';
import { SpanExporter } from '../../src/export/SpanExporter';
import { ReadableSpan } from '../../src/export/ReadableSpan';
import { NoopScopeManager } from '@opentelemetry/scope-base';
import { NEVER_SAMPLER, ALWAYS_SAMPLER, NoopLogger } from '@opentelemetry/core';

// @todo: replace TestExporter with InMemorySpanExporter (pull/234)
class TestExporter implements SpanExporter {
spansDataList: ReadableSpan[] = [];
export(spans: ReadableSpan[]): void {
this.spansDataList.push(...spans);
}

shutdown(): void {
this.spansDataList = [];
}
}

function createSampledSpan(spanName: string): Span {
const tracer = new BasicTracer({
scopeManager: new NoopScopeManager(),
sampler: ALWAYS_SAMPLER,
});
const span = tracer.startSpan(spanName);
span.end();
return span as Span;
}

function createUnSampledSpan(spanName: string): Span {
const tracer = new BasicTracer({
scopeManager: new NoopScopeManager(),
sampler: NEVER_SAMPLER,
logger: new NoopLogger(),
});
const span = tracer.startSpan(spanName, { isRecordingEvents: false });
span.end();
return span as Span;
}

describe('BatchSpanProcessor', () => {
const name = 'span-name';
const defaultBufferConfig = {
bufferSize: 5,
bufferTimeout: 2000,
};
const exporter = new TestExporter();

describe('constructor', () => {
it('should create a BatchSpanProcessor instance', () => {
const processor = new BatchSpanProcessor(exporter);
assert.ok(processor instanceof BatchSpanProcessor);
});

it('should create a BatchSpanProcessor instance with config', () => {
const processor = new BatchSpanProcessor(exporter, defaultBufferConfig);
assert.ok(processor instanceof BatchSpanProcessor);
});

it('should create a BatchSpanProcessor instance with empty config', () => {
const processor = new BatchSpanProcessor(exporter, {});
assert.ok(processor instanceof BatchSpanProcessor);
});
});

describe('.onStart/.onEnd/.shutdown', () => {
it('should export the sampled spans with buffer size reached', () => {
const processor = new BatchSpanProcessor(exporter, defaultBufferConfig);
for (let i = 0; i < defaultBufferConfig.bufferSize; i++) {
const span = createSampledSpan(`${name}_${i}`);
processor.onStart(span);
assert.strictEqual(exporter.spansDataList.length, 0);

processor.onEnd(span);
assert.strictEqual(exporter.spansDataList.length, 0);
}
// Now we should start seeing the spans in exporter
const span = createSampledSpan(`${name}_6`);
processor.onEnd(span);
assert.strictEqual(exporter.spansDataList.length, 6);

processor.shutdown();
assert.strictEqual(exporter.spansDataList.length, 0);
});

it('should not export the unsampled spans', () => {
const processor = new BatchSpanProcessor(exporter, defaultBufferConfig);
for (let i = 0; i < defaultBufferConfig.bufferSize * 2; i++) {
const span = createUnSampledSpan(`${name}_${i}`);
processor.onEnd(span);
assert.strictEqual(exporter.spansDataList.length, 0);
}
});

it('should force flush when timeout exceeded', done => {
const processor = new BatchSpanProcessor(exporter, defaultBufferConfig);
for (let i = 0; i < defaultBufferConfig.bufferSize; i++) {
const span = createSampledSpan(`${name}_${i}`);
processor.onEnd(span);
assert.strictEqual(exporter.spansDataList.length, 0);
}

setTimeout(() => {
assert.strictEqual(exporter.spansDataList.length, 5);
done();
}, defaultBufferConfig.bufferTimeout + 100);
}).timeout(defaultBufferConfig.bufferTimeout * 2);
});
});
1 change: 1 addition & 0 deletions packages/opentelemetry-core/src/platform/browser/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@

export * from './id';
export * from './performance';
export * from './timer-util';
18 changes: 18 additions & 0 deletions packages/opentelemetry-core/src/platform/browser/timer-util.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/**
* Copyright 2019, OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/** This is Node specific, does nothing in case of browser */
export function unrefTimer(timer: number): void {}
1 change: 1 addition & 0 deletions packages/opentelemetry-core/src/platform/node/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@

export * from './id';
export * from './performance';
export * from './timer-util';
23 changes: 23 additions & 0 deletions packages/opentelemetry-core/src/platform/node/timer-util.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/**
* Copyright 2019, OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* When called, the active Timeout object will not require the Node.js event
* loop to remain active.
*/
export function unrefTimer(timer: NodeJS.Timeout): void {
timer.unref();
}

0 comments on commit cb035ce

Please sign in to comment.