Skip to content

Commit

Permalink
refactor: unifying shutdown once with BindOnceFuture (#2695)
Browse files Browse the repository at this point in the history
Co-authored-by: Valentin Marchaud <[email protected]>
  • Loading branch information
legendecas and vmarchaud authored Jan 6, 2022
1 parent 5f3cbc2 commit d61f7be
Show file tree
Hide file tree
Showing 15 changed files with 323 additions and 111 deletions.
9 changes: 9 additions & 0 deletions packages/exporter-trace-otlp-grpc/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,15 @@
"references": [
{
"path": "../exporter-trace-otlp-http"
},
{
"path": "../opentelemetry-core"
},
{
"path": "../opentelemetry-resources"
},
{
"path": "../opentelemetry-sdk-trace-base"
}
]
}
38 changes: 13 additions & 25 deletions packages/exporter-trace-otlp-http/src/OTLPExporterBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/

import { SpanAttributes, diag } from '@opentelemetry/api';
import { ExportResult, ExportResultCode } from '@opentelemetry/core';
import { ExportResult, ExportResultCode, BindOnceFuture } from '@opentelemetry/core';
import {
OTLPExporterError,
OTLPExporterConfigBase,
Expand All @@ -34,9 +34,8 @@ export abstract class OTLPExporterBase<
public readonly hostname: string | undefined;
public readonly attributes?: SpanAttributes;
protected _concurrencyLimit: number;
protected _isShutdown: boolean = false;
private _shuttingDownPromise: Promise<void> = Promise.resolve();
protected _sendingPromises: Promise<unknown>[] = [];
protected _shutdownOnce: BindOnceFuture<void>;

/**
* @param config
Expand All @@ -50,6 +49,7 @@ export abstract class OTLPExporterBase<
this.attributes = config.attributes;

this.shutdown = this.shutdown.bind(this);
this._shutdownOnce = new BindOnceFuture(this._shutdown, this);

this._concurrencyLimit =
typeof config.concurrencyLimit === 'number'
Expand All @@ -66,7 +66,7 @@ export abstract class OTLPExporterBase<
* @param resultCallback
*/
export(items: ExportItem[], resultCallback: (result: ExportResult) => void): void {
if (this._isShutdown) {
if (this._shutdownOnce.isCalled) {
resultCallback({
code: ExportResultCode.FAILED,
error: new Error('Exporter has been shutdown'),
Expand Down Expand Up @@ -106,28 +106,16 @@ export abstract class OTLPExporterBase<
* Shutdown the exporter.
*/
shutdown(): Promise<void> {
if (this._isShutdown) {
diag.debug('shutdown already started');
return this._shuttingDownPromise;
}
this._isShutdown = true;
return this._shutdownOnce.call();
}

private _shutdown(): Promise<void> {
diag.debug('shutdown started');
this._shuttingDownPromise = new Promise((resolve, reject) => {
Promise.resolve()
.then(() => {
return this.onShutdown();
})
.then(() => {
return Promise.all(this._sendingPromises);
})
.then(() => {
resolve();
})
.catch(e => {
reject(e);
});
});
return this._shuttingDownPromise;
this.onShutdown();
return Promise.all(this._sendingPromises)
.then(() => {
/** ignore resolved values */
});
}

abstract onShutdown(): void;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ export abstract class OTLPExporterBrowserBase<
onSuccess: () => void,
onError: (error: otlpTypes.OTLPExporterError) => void
): void {
if (this._isShutdown) {
if (this._shutdownOnce.isCalled) {
diag.debug('Shutdown already started. Cannot send objects');
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,14 @@ export abstract class OTLPExporterNodeBase<
this.compression = config.compression || CompressionAlgorithm.NONE;
}

onInit(_config: OTLPExporterNodeConfigBase): void {
this._isShutdown = false;
}
onInit(_config: OTLPExporterNodeConfigBase): void {}

send(
objects: ExportItem[],
onSuccess: () => void,
onError: (error: otlpTypes.OTLPExporterError) => void
): void {
if (this._isShutdown) {
if (this._shutdownOnce.isCalled) {
diag.debug('Shutdown already started. Cannot send objects');
return;
}
Expand Down
11 changes: 11 additions & 0 deletions packages/exporter-trace-otlp-http/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,16 @@
"include": [
"src/**/*.ts",
"test/**/*.ts"
],
"references": [
{
"path": "../opentelemetry-core"
},
{
"path": "../opentelemetry-resources"
},
{
"path": "../opentelemetry-sdk-trace-base"
}
]
}
9 changes: 9 additions & 0 deletions packages/exporter-trace-otlp-proto/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,15 @@
"references": [
{
"path": "../exporter-trace-otlp-http"
},
{
"path": "../opentelemetry-core"
},
{
"path": "../opentelemetry-resources"
},
{
"path": "../opentelemetry-sdk-trace-base"
}
]
}
1 change: 1 addition & 0 deletions packages/opentelemetry-core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,5 @@ export * from './utils/merge';
export * from './utils/sampling';
export * from './utils/url';
export * from './utils/wrap';
export * from './utils/callback';
export * from './version';
54 changes: 54 additions & 0 deletions packages/opentelemetry-core/src/utils/callback.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { Deferred } from './promise';

/**
* Bind the callback and only invoke the callback once regardless how many times `BindOnceFuture.call` is invoked.
*/
export class BindOnceFuture<
R,
This = unknown,
T extends (this: This, ...args: unknown[]) => R = () => R
> {
private _isCalled = false;
private _deferred = new Deferred<R>();
constructor(private _callback: T, private _that: This) {}

get isCalled() {
return this._isCalled;
}

get promise() {
return this._deferred.promise;
}

call(...args: Parameters<T>): Promise<R> {
if (!this._isCalled) {
this._isCalled = true;
try {
Promise.resolve(this._callback.call(this._that, ...args))
.then(
val => this._deferred.resolve(val),
err => this._deferred.reject(err)
);
} catch (err) {
this._deferred.reject(err);
}
}
return this._deferred.promise;
}
}
39 changes: 39 additions & 0 deletions packages/opentelemetry-core/src/utils/promise.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

export class Deferred<T> {
private _promise: Promise<T>;
private _resolve!: (val: T) => void;
private _reject!: (error: unknown) => void;
constructor() {
this._promise = new Promise((resolve, reject) => {
this._resolve = resolve;
this._reject = reject;
});
}

get promise() {
return this._promise;
}

resolve(val: T) {
this._resolve(val);
}

reject(err: unknown) {
this._reject(err);
}
}
30 changes: 30 additions & 0 deletions packages/opentelemetry-core/test/test-utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import * as assert from 'assert';

/**
* Node.js v8.x and browser compatible `assert.rejects`.
*/
export async function assertRejects(promise: any, expect: any) {
try {
await promise;
} catch (err) {
assert.throws(() => {
throw err;
}, expect);
}
}
63 changes: 63 additions & 0 deletions packages/opentelemetry-core/test/utils/callback.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import * as assert from 'assert';
import * as sinon from 'sinon';
import { BindOnceFuture } from '../../src';
import { assertRejects } from '../test-utils';

describe('callback', () => {
describe('BindOnceFuture', () => {
it('should call once', async () => {
const stub = sinon.stub();
const that = {};
const future = new BindOnceFuture(stub, that);

await Promise.all([
future.call(1),
future.call(2),
]);
await future.call(3);
await future.promise;

assert.strictEqual(stub.callCount, 1);
assert.deepStrictEqual(stub.firstCall.args, [1]);
assert.deepStrictEqual(stub.firstCall.thisValue, that);

assert(future.isCalled);
});

it('should handle thrown errors', async () => {
const stub = sinon.stub();
stub.throws(new Error('foo'));
const future = new BindOnceFuture(stub, undefined);

await assertRejects(future.call(), /foo/);
await assertRejects(future.call(), /foo/);
await assertRejects(future.promise, /foo/);
});

it('should handle rejections', async () => {
const stub = sinon.stub();
stub.rejects(new Error('foo'));
const future = new BindOnceFuture(stub, undefined);

await assertRejects(future.call(), /foo/);
await assertRejects(future.call(), /foo/);
await assertRejects(future.promise, /foo/);
});
});
});
41 changes: 41 additions & 0 deletions packages/opentelemetry-core/test/utils/promise.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import * as assert from 'assert';
import { Deferred } from '../../src/utils/promise';
import { assertRejects } from '../test-utils';

describe('promise', () => {
describe('Deferred', () => {
it('should resolve', async () => {
const deferred = new Deferred();
deferred.resolve(1);
deferred.resolve(2);
deferred.reject(new Error('foo'));

const ret = await deferred.promise;
assert.strictEqual(ret, 1);
});

it('should reject', async () => {
const deferred = new Deferred();
deferred.reject(new Error('foo'));
deferred.reject(new Error('bar'));

await assertRejects(deferred.promise, /foo/);
});
});
});
Loading

0 comments on commit d61f7be

Please sign in to comment.