Skip to content

Commit

Permalink
fix(sdk-logs): await async resources in log processors (#4349)
Browse files Browse the repository at this point in the history
  • Loading branch information
hectorhdzg authored Dec 15, 2023
1 parent d3c311a commit d828041
Show file tree
Hide file tree
Showing 8 changed files with 298 additions and 36 deletions.
1 change: 1 addition & 0 deletions experimental/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ All notable changes to experimental packages in this project will be documented

### :bug: (Bug Fix)

* fix(sdk-logs): await async resources in log processors
* fix(sdk-logs): avoid map attribute set when count limit exceeded
* fix(instrumentation-fetch): only access navigator if it is defined [#4063](https://github.com/open-telemetry/opentelemetry-js/pull/4063)
* allows for experimental usage of this instrumentation with non-browser runtimes
Expand Down
1 change: 1 addition & 0 deletions experimental/packages/sdk-logs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
"@babel/core": "7.23.6",
"@opentelemetry/api": ">=1.4.0 <1.8.0",
"@opentelemetry/api-logs": "0.46.0",
"@opentelemetry/resources_1.9.0": "npm:@opentelemetry/[email protected]",
"@types/mocha": "10.0.6",
"@types/node": "18.6.5",
"@types/sinon": "10.0.20",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import {
getEnv,
globalErrorHandler,
unrefTimer,
callWithTimeout,
BindOnceFuture,
internal,
callWithTimeout,
} from '@opentelemetry/core';

import type { BufferConfig } from '../types';
Expand Down Expand Up @@ -163,21 +164,34 @@ export abstract class BatchLogRecordProcessorBase<T extends BufferConfig>
}
}

private _export(logRecords: LogRecord[]): Promise<ExportResult> {
return new Promise((resolve, reject) => {
this._exporter.export(logRecords, (res: ExportResult) => {
if (res.code !== ExportResultCode.SUCCESS) {
reject(
res.error ??
new Error(
`BatchLogRecordProcessorBase: log record export failed (status ${res})`
)
);
return;
}
resolve(res);
});
});
private _export(logRecords: LogRecord[]): Promise<void> {
const doExport = () =>
internal
._export(this._exporter, logRecords)
.then((result: ExportResult) => {
if (result.code !== ExportResultCode.SUCCESS) {
globalErrorHandler(
result.error ??
new Error(
`BatchLogRecordProcessor: log record export failed (status ${result})`
)
);
}
})
.catch(globalErrorHandler);

const pendingResources = logRecords
.map(logRecord => logRecord.resource)
.filter(resource => resource.asyncAttributesPending);

// Avoid scheduling a promise to make the behavior more predictable and easier to test
if (pendingResources.length === 0) {
return doExport();
} else {
return Promise.all(
pendingResources.map(resource => resource.waitForAsyncAttributes?.())
).then(doExport, globalErrorHandler);
}
}

protected abstract onShutdown(): void;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,40 +19,65 @@ import {
BindOnceFuture,
ExportResultCode,
globalErrorHandler,
internal,
} from '@opentelemetry/core';

import type { LogRecordExporter } from './LogRecordExporter';
import type { LogRecordProcessor } from '../LogRecordProcessor';
import type { LogRecord } from './../LogRecord';

export class SimpleLogRecordProcessor implements LogRecordProcessor {
private _shutdownOnce: BindOnceFuture<void>;
private _unresolvedExports: Set<Promise<void>>;

constructor(private readonly _exporter: LogRecordExporter) {
this._shutdownOnce = new BindOnceFuture(this._shutdown, this);
this._unresolvedExports = new Set<Promise<void>>();
}

public onEmit(logRecord: LogRecord): void {
if (this._shutdownOnce.isCalled) {
return;
}

this._exporter.export([logRecord], (res: ExportResult) => {
if (res.code !== ExportResultCode.SUCCESS) {
globalErrorHandler(
res.error ??
new Error(
`SimpleLogRecordProcessor: log record export failed (status ${res})`
)
);
return;
const doExport = () =>
internal
._export(this._exporter, [logRecord])
.then((result: ExportResult) => {
if (result.code !== ExportResultCode.SUCCESS) {
globalErrorHandler(
result.error ??
new Error(
`SimpleLogRecordProcessor: log record export failed (status ${result})`
)
);
}
})
.catch(globalErrorHandler);

// Avoid scheduling a promise to make the behavior more predictable and easier to test
if (logRecord.resource.asyncAttributesPending) {
const exportPromise = logRecord.resource
.waitForAsyncAttributes?.()
.then(() => {
// Using TS Non-null assertion operator because exportPromise could not be null in here
// if waitForAsyncAttributes is not present this code will never be reached
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this._unresolvedExports.delete(exportPromise!);
return doExport();
}, globalErrorHandler);

// store the unresolved exports
if (exportPromise != null) {
this._unresolvedExports.add(exportPromise);
}
});
} else {
void doExport();
}
}

public forceFlush(): Promise<void> {
// do nothing as all log records are being exported without waiting
return Promise.resolve();
public async forceFlush(): Promise<void> {
// await unresolved resources before resolving
await Promise.all(Array.from(this._unresolvedExports));
}

public shutdown(): Promise<void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,19 @@ import {
import { BatchLogRecordProcessorBase } from '../../../src/export/BatchLogRecordProcessorBase';
import { reconfigureLimits } from '../../../src/config';
import { LoggerProviderSharedState } from '../../../src/internal/LoggerProviderSharedState';
import { Resource } from '@opentelemetry/resources';
import { Resource, ResourceAttributes } from '@opentelemetry/resources';

class BatchLogRecordProcessor extends BatchLogRecordProcessorBase<BufferConfig> {
onInit() {}
onShutdown() {}
}

const createLogRecord = (limits?: LogRecordLimits): LogRecord => {
const createLogRecord = (
limits?: LogRecordLimits,
resource?: Resource
): LogRecord => {
const sharedState = new LoggerProviderSharedState(
Resource.default(),
resource || Resource.default(),
Infinity,
reconfigureLimits(limits ?? {})
);
Expand Down Expand Up @@ -308,6 +311,25 @@ describe('BatchLogRecordProcessorBase', () => {
await processor.forceFlush();
assert.strictEqual(exporter.getFinishedLogRecords().length, 1);
});

it('should wait for pending resource on flush', async () => {
const processor = new BatchLogRecordProcessor(exporter);
const asyncResource = new Resource(
{},
new Promise<ResourceAttributes>(resolve => {
setTimeout(() => resolve({ async: 'fromasync' }), 1);
})
);
const logRecord = createLogRecord(undefined, asyncResource);
processor.onEmit(logRecord);
await processor.forceFlush();
const exportedLogs = exporter.getFinishedLogRecords();
assert.strictEqual(exportedLogs.length, 1);
assert.strictEqual(
exportedLogs[0].resource.attributes['async'],
'fromasync'
);
});
});

describe('shutdown', () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import {
loggingErrorHandler,
setGlobalErrorHandler,
} from '@opentelemetry/core';
import { Resource, ResourceAttributes } from '@opentelemetry/resources';
import { Resource as Resource190 } from '@opentelemetry/resources_1.9.0';

import {
InMemoryLogRecordExporter,
Expand All @@ -29,12 +31,12 @@ import {
LogRecord,
} from './../../../src';
import { LoggerProviderSharedState } from '../../../src/internal/LoggerProviderSharedState';
import { Resource } from '@opentelemetry/resources';
import { reconfigureLimits } from '../../../src/config';
import { TestExporterWithDelay } from './TestExporterWithDelay';

const setup = (exporter: LogRecordExporter) => {
const setup = (exporter: LogRecordExporter, resource?: Resource) => {
const sharedState = new LoggerProviderSharedState(
Resource.default(),
resource || Resource.default(),
Infinity,
reconfigureLimits({})
);
Expand Down Expand Up @@ -113,4 +115,65 @@ describe('SimpleLogRecordProcessor', () => {
assert.ok(shutdownSpy.callCount === 1);
});
});

describe('force flush', () => {
it('should await unresolved resources', async () => {
const exporter = new InMemoryLogRecordExporter();
const asyncResource = new Resource(
{},
new Promise<ResourceAttributes>(resolve => {
setTimeout(() => resolve({ async: 'fromasync' }), 1);
})
);
const { processor, logRecord } = setup(exporter, asyncResource);
assert.strictEqual(exporter.getFinishedLogRecords().length, 0);
processor.onEmit(logRecord);

await processor.forceFlush();

const exportedLogs = exporter.getFinishedLogRecords();
assert.strictEqual(exportedLogs.length, 1);
assert.strictEqual(
exportedLogs[0].resource.attributes['async'],
'fromasync'
);
});

it('should await doExport() and delete from _unresolvedExports', async () => {
const testExporterWithDelay = new TestExporterWithDelay();
const asyncResource = new Resource(
{},
new Promise<ResourceAttributes>(resolve => {
setTimeout(() => resolve({ async: 'fromasync' }), 1);
})
);
const processor = new SimpleLogRecordProcessor(testExporterWithDelay);
const { logRecord } = setup(testExporterWithDelay, asyncResource);

processor.onEmit(logRecord);
assert.strictEqual(processor['_unresolvedExports'].size, 1);
await processor.forceFlush();
assert.strictEqual(processor['_unresolvedExports'].size, 0);
const exportedLogRecords = testExporterWithDelay.getFinishedLogRecords();
assert.strictEqual(exportedLogRecords.length, 1);
});
});

describe('compatibility', () => {
it('should export when using old resource implementation', async () => {
const exporter = new InMemoryLogRecordExporter();
const { processor, logRecord } = setup(
exporter,
new Resource190({ fromold: 'fromold' })
);
assert.strictEqual(exporter.getFinishedLogRecords().length, 0);
processor.onEmit(logRecord);
const exportedLogs = exporter.getFinishedLogRecords();
assert.strictEqual(exportedLogs.length, 1);
assert.strictEqual(
exportedLogs[0].resource.attributes['fromold'],
'fromold'
);
});
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { ExportResult } from '@opentelemetry/core';
import { InMemoryLogRecordExporter, ReadableLogRecord } from '../../../src';

/**
* A test-only exporter that delays during export to mimic a real exporter.
*/
export class TestExporterWithDelay extends InMemoryLogRecordExporter {
private _exporterCreatedLogRecords: ReadableLogRecord[] = [];

constructor() {
super();
}

override export(
logRecords: ReadableLogRecord[],
resultCallback: (result: ExportResult) => void
): void {
super.export(logRecords, () => setTimeout(resultCallback, 1));
}

override shutdown(): Promise<void> {
return super.shutdown().then(() => {
this._exporterCreatedLogRecords = [];
});
}

override reset() {
super.reset();
this._exporterCreatedLogRecords = [];
}

getExporterCreatedLogRecords(): ReadableLogRecord[] {
return this._exporterCreatedLogRecords;
}
}
Loading

0 comments on commit d828041

Please sign in to comment.