Skip to content

Commit

Permalink
fix(@opentelemetry/exporter-collector): remove fulfilled promises cor… (
Browse files Browse the repository at this point in the history
#1775)

Co-authored-by: Daniel Dyla <[email protected]>
Co-authored-by: Bartlomiej Obecny <[email protected]>
  • Loading branch information
3 people authored Aug 11, 2021
1 parent 4553b29 commit 90ea0fe
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,25 +61,17 @@ export abstract class CollectorExporterNodeBase<
onSuccess: () => void,
onError: (error: collectorTypes.CollectorExporterError) => void
): void {
const promise = new Promise<void>(resolve => {
const _onSuccess = (): void => {
onSuccess();
_onFinish();
};
const _onError = (error: collectorTypes.CollectorExporterError): void => {
onError(error);
_onFinish();
};
const _onFinish = () => {
resolve();
const index = this._sendingPromises.indexOf(promise);
this._sendingPromises.splice(index, 1);
};

this._send(this, objects, _onSuccess, _onError);
});
const promise = new Promise<void>((resolve, reject) => {
this._send(this, objects, resolve, reject);
})
.then(onSuccess, onError);

this._sendingPromises.push(promise);
const popPromise = () => {
const index = this._sendingPromises.indexOf(promise);
this._sendingPromises.splice(index, 1);
}
promise.then(popPromise, popPromise);
}

onInit(config: CollectorExporterConfigNode): void {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { collectorTypes } from '@opentelemetry/exporter-collector';
import { ReadableSpan } from '@opentelemetry/sdk-trace-base';

import * as assert from 'assert';
import { CollectorExporterNodeBase } from '../src/CollectorExporterNodeBase';
import { CollectorExporterConfigNode, ServiceClientType } from '../src/types';
import { mockedReadableSpan } from './helper';

class MockCollectorExporter extends CollectorExporterNodeBase<
ReadableSpan,
ReadableSpan[]
> {
/**
* Callbacks passed to _send()
*/
sendCallbacks: {
onSuccess: () => void;
onError: (error: collectorTypes.CollectorExporterError) => void;
}[] = [];

getDefaultUrl(config: CollectorExporterConfigNode): string {
return '';
}

getDefaultServiceName(config: CollectorExporterConfigNode): string {
return '';
}

convert(spans: ReadableSpan[]): ReadableSpan[] {
return spans;
}

getServiceClientType() {
return ServiceClientType.SPANS;
}

getServiceProtoPath(): string {
return 'opentelemetry/proto/collector/trace/v1/trace_service.proto';
}
}

// Mocked _send which just saves the callbacks for later
MockCollectorExporter.prototype['_send'] = function _sendMock(
self: MockCollectorExporter,
objects: ReadableSpan[],
onSuccess: () => void,
onError: (error: collectorTypes.CollectorExporterError) => void
): void {
self.sendCallbacks.push({ onSuccess, onError });
};

describe('CollectorExporterNodeBase', () => {
let exporter: MockCollectorExporter;
const concurrencyLimit = 5;

beforeEach(done => {
exporter = new MockCollectorExporter({ concurrencyLimit });
done();
});

describe('export', () => {
it('should export requests concurrently', async () => {
const spans = [Object.assign({}, mockedReadableSpan)];
const numToExport = concurrencyLimit;

for (let i = 0; i < numToExport; ++i) {
exporter.export(spans, () => {});
}

assert.strictEqual(exporter['_sendingPromises'].length, numToExport);
const promisesAllDone = Promise.all(exporter['_sendingPromises']);
// Mock that all requests finish sending
exporter.sendCallbacks.forEach(({ onSuccess }) => onSuccess());

// All finished promises should be popped off
await promisesAllDone;
assert.strictEqual(exporter['_sendingPromises'].length, 0);
});

it('should drop new export requests when already sending at concurrencyLimit', async () => {
const spans = [Object.assign({}, mockedReadableSpan)];
const numToExport = concurrencyLimit + 5;

for (let i = 0; i < numToExport; ++i) {
exporter.export(spans, () => {});
}

assert.strictEqual(exporter['_sendingPromises'].length, concurrencyLimit);
const promisesAllDone = Promise.all(exporter['_sendingPromises']);
// Mock that all requests finish sending
exporter.sendCallbacks.forEach(({ onSuccess }) => onSuccess());

// All finished promises should be popped off
await promisesAllDone;
assert.strictEqual(exporter['_sendingPromises'].length, 0);
});

it('should pop export request promises even if they failed', async () => {
const spans = [Object.assign({}, mockedReadableSpan)];

exporter.export(spans, () => {});
assert.strictEqual(exporter['_sendingPromises'].length, 1);
const promisesAllDone = Promise.all(exporter['_sendingPromises']);
// Mock that all requests fail sending
exporter.sendCallbacks.forEach(({ onError }) =>
onError(new Error('Failed to send!!'))
);

// All finished promises should be popped off
await promisesAllDone;
assert.strictEqual(exporter['_sendingPromises'].length, 0);
});

it('should pop export request promises even if success callback throws error', async () => {
const spans = [Object.assign({}, mockedReadableSpan)];

exporter['_sendPromise'](
spans,
() => {
throw new Error('Oops');
},
() => {}
);

assert.strictEqual(exporter['_sendingPromises'].length, 1);
const promisesAllDone = Promise.all(exporter['_sendingPromises'])
// catch expected unhandled exception
.catch(() => {});

// Mock that the request finishes sending
exporter.sendCallbacks.forEach(({ onSuccess }) => {
onSuccess();
});

// All finished promises should be popped off
await promisesAllDone;
assert.strictEqual(exporter['_sendingPromises'].length, 0);
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -47,25 +47,17 @@ export abstract class CollectorExporterNodeBase<
onSuccess: () => void,
onError: (error: collectorTypes.CollectorExporterError) => void
): void {
const promise = new Promise<void>(resolve => {
const _onSuccess = (): void => {
onSuccess();
_onFinish();
};
const _onError = (error: collectorTypes.CollectorExporterError): void => {
onError(error);
_onFinish();
};
const _onFinish = () => {
resolve();
const index = this._sendingPromises.indexOf(promise);
this._sendingPromises.splice(index, 1);
};

this._send(this, objects, this.compression, _onSuccess, _onError);
});
const promise = new Promise<void>((resolve, reject) => {
this._send(this, objects, this.compression, resolve, reject);
})
.then(onSuccess, onError);

this._sendingPromises.push(promise);
const popPromise = () => {
const index = this._sendingPromises.indexOf(promise);
this._sendingPromises.splice(index, 1);
}
promise.then(popPromise, popPromise);
}

override onInit(config: CollectorExporterNodeConfigBase): void {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,27 +76,20 @@ export abstract class CollectorExporterBrowserBase<
const serviceRequest = this.convert(items);
const body = JSON.stringify(serviceRequest);

const promise = new Promise<void>(resolve => {
const _onSuccess = (): void => {
onSuccess();
_onFinish();
};
const _onError = (error: collectorTypes.CollectorExporterError): void => {
onError(error);
_onFinish();
};
const _onFinish = () => {
resolve();
const index = this._sendingPromises.indexOf(promise);
this._sendingPromises.splice(index, 1);
};

const promise = new Promise<void>((resolve, reject) => {
if (this._useXHR) {
sendWithXhr(body, this.url, this._headers, _onSuccess, _onError);
sendWithXhr(body, this.url, this._headers, resolve, reject);
} else {
sendWithBeacon(body, this.url, { type: 'application/json' }, _onSuccess, _onError);
sendWithBeacon(body, this.url, { type: 'application/json' }, resolve, reject);
}
});
})
.then(onSuccess, onError);

this._sendingPromises.push(promise);
const popPromise = () => {
const index = this._sendingPromises.indexOf(promise);
this._sendingPromises.splice(index, 1);
}
promise.then(popPromise, popPromise);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,30 +70,23 @@ export abstract class CollectorExporterNodeBase<
}
const serviceRequest = this.convert(objects);

const promise = new Promise<void>(resolve => {
const _onSuccess = (): void => {
onSuccess();
_onFinish();
};
const _onError = (error: collectorTypes.CollectorExporterError): void => {
onError(error);
_onFinish();
};
const _onFinish = () => {
resolve();
const index = this._sendingPromises.indexOf(promise);
this._sendingPromises.splice(index, 1);
};
const promise = new Promise<void>((resolve, reject) => {
sendWithHttp(
this,
JSON.stringify(serviceRequest),
'application/json',
_onSuccess,
_onError
resolve,
reject
);
});
})
.then(onSuccess, onError);

this._sendingPromises.push(promise);
const popPromise = () => {
const index = this._sendingPromises.indexOf(promise);
this._sendingPromises.splice(index, 1);
}
promise.then(popPromise, popPromise);
}

onShutdown(): void {}
Expand Down
9 changes: 7 additions & 2 deletions packages/opentelemetry-exporter-zipkin/src/zipkin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,16 @@ export class ZipkinExporter implements SpanExporter {
this._sendSpans(spans, serviceName, result => {
resolve();
resultCallback(result);
const index = this._sendingPromises.indexOf(promise);
this._sendingPromises.splice(index, 1);
});
});


this._sendingPromises.push(promise);
const popPromise = () => {
const index = this._sendingPromises.indexOf(promise);
this._sendingPromises.splice(index, 1);
}
promise.then(popPromise, popPromise);
}

/**
Expand Down

0 comments on commit 90ea0fe

Please sign in to comment.