Skip to content

Commit

Permalink
fix(express): listener leak (#278)
Browse files Browse the repository at this point in the history
* fix(express): listener leak

* chore(express): more middleware tests
  • Loading branch information
johanneswuerbach authored Jan 12, 2021
1 parent 7770109 commit db464de
Show file tree
Hide file tree
Showing 2 changed files with 171 additions and 11 deletions.
4 changes: 3 additions & 1 deletion plugins/node/opentelemetry-plugin-express/src/express.ts
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,9 @@ export class ExpressPlugin extends BasePlugin<typeof express> {
* the layer directly end the http response, so we'll hook into the "finish"
* event to handle the later case.
*/
req.res?.once('finish', onResponseFinish);
if (!spanHasEnded) {
req.res?.once('finish', onResponseFinish);
}
return result;
};
});
Expand Down
178 changes: 168 additions & 10 deletions plugins/node/opentelemetry-plugin-express/test/express.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

import { context } from '@opentelemetry/api';
import { context, Span, Tracer } from '@opentelemetry/api';
import { NoopLogger } from '@opentelemetry/core';
import { NodeTracerProvider } from '@opentelemetry/node';
import { AsyncHooksContextManager } from '@opentelemetry/context-async-hooks';
Expand Down Expand Up @@ -53,6 +53,37 @@ const httpRequest = {
},
};

const serverWithMiddleware = async (
tracer: Tracer,
rootSpan: Span,
addMiddlewares: (app: express.Express) => void
): Promise<http.Server> => {
const app = express();
if (tracer) {
app.use((req, res, next) => tracer.withSpan(rootSpan, next));
}

app.use(express.json());

addMiddlewares(app);

const router = express.Router();
app.use('/toto', router);
router.get('/:id', (req, res) => {
setImmediate(() => {
res.status(200).end(req.params.id);
});
});
const server = http.createServer(app);
await new Promise<void>(resolve =>
server.listen(0, () => {
resolve();
})
);

return server;
};

describe('Express Plugin', () => {
const logger = new NoopLogger();
const provider = new NodeTracerProvider();
Expand Down Expand Up @@ -82,28 +113,37 @@ describe('Express Plugin', () => {
const app = express();
app.use((req, res, next) => tracer.withSpan(rootSpan, next));
app.use(express.json());
// eslint-disable-next-line prefer-arrow-callback
app.use(function customMiddleware(req, res, next) {
const customMiddleware: express.RequestHandler = (req, res, next) => {
for (let i = 0; i < 1000000; i++) {
continue;
}
return next();
});
};
app.use(customMiddleware);
const router = express.Router();
app.use('/toto', router);
router.get('/:id', (req, res) => {
setImmediate(() => {
res.status(200).end();
let finishListenerCount: number | undefined;
const server = await serverWithMiddleware(tracer, rootSpan, app => {
app.use((req, res, next) => {
res.on('finish', () => {
finishListenerCount = res.listenerCount('finish');
});
next();
});
for (let index = 0; index < 15; index++) {
app.use(customMiddleware);
}
});
const server = http.createServer(app);
await new Promise(resolve => server.listen(0, resolve));
const port = (server.address() as AddressInfo).port;
assert.strictEqual(memoryExporter.getFinishedSpans().length, 0);
await tracer.withSpan(rootSpan, async () => {
await httpRequest.get(`http://localhost:${port}/toto/tata`);
const response = await httpRequest.get(
`http://localhost:${port}/toto/tata`
);
assert.strictEqual(response, 'tata');
rootSpan.end();
assert.strictEqual(rootSpan.name, 'GET /toto/:id');
assert.strictEqual(finishListenerCount, 2);
assert.notStrictEqual(
memoryExporter
.getFinishedSpans()
Expand Down Expand Up @@ -140,6 +180,124 @@ describe('Express Plugin', () => {
server.close();
});

it('supports sync middlewares directly responding', async () => {
const rootSpan = tracer.startSpan('rootSpan');
let finishListenerCount: number | undefined;
const server = await serverWithMiddleware(tracer, rootSpan, app => {
app.use((req, res, next) => {
res.on('finish', () => {
finishListenerCount = res.listenerCount('finish');
});
next();
});
const syncMiddleware: express.RequestHandler = (req, res, next) => {
for (let i = 0; i < 1000000; i++) {
continue;
}
res.status(200).end('middleware');
};
for (let index = 0; index < 15; index++) {
app.use(syncMiddleware);
}
});
const port = (server.address() as AddressInfo).port;
assert.strictEqual(memoryExporter.getFinishedSpans().length, 0);
await tracer.withSpan(rootSpan, async () => {
const response = await httpRequest.get(
`http://localhost:${port}/toto/tata`
);
assert.strictEqual(response, 'middleware');
rootSpan.end();
assert.strictEqual(finishListenerCount, 3);
assert.notStrictEqual(
memoryExporter
.getFinishedSpans()
.find(span => span.name.includes('syncMiddleware')),
undefined,
'no syncMiddleware span'
);
});
server.close();
});

it('supports async middlewares', async () => {
const rootSpan = tracer.startSpan('rootSpan');
let finishListenerCount: number | undefined;
const server = await serverWithMiddleware(tracer, rootSpan, app => {
app.use((req, res, next) => {
res.on('finish', () => {
finishListenerCount = res.listenerCount('finish');
});
next();
});
const asyncMiddleware: express.RequestHandler = (req, res, next) => {
setTimeout(() => {
next();
}, 50);
};
for (let index = 0; index < 15; index++) {
app.use(asyncMiddleware);
}
});
const port = (server.address() as AddressInfo).port;
assert.strictEqual(memoryExporter.getFinishedSpans().length, 0);
await tracer.withSpan(rootSpan, async () => {
const response = await httpRequest.get(
`http://localhost:${port}/toto/tata`
);
assert.strictEqual(response, 'tata');
rootSpan.end();
assert.strictEqual(finishListenerCount, 2);
assert.notStrictEqual(
memoryExporter
.getFinishedSpans()
.find(span => span.name.includes('asyncMiddleware')),
undefined,
'no asyncMiddleware span'
);
});
server.close();
});

it('supports async middlewares directly responding', async () => {
const rootSpan = tracer.startSpan('rootSpan');
let finishListenerCount: number | undefined;
const server = await serverWithMiddleware(tracer, rootSpan, app => {
app.use((req, res, next) => {
res.on('finish', () => {
finishListenerCount = res.listenerCount('finish');
});
next();
});
const asyncMiddleware: express.RequestHandler = (req, res, next) => {
setTimeout(() => {
res.status(200).end('middleware');
}, 50);
};
for (let index = 0; index < 15; index++) {
app.use(asyncMiddleware);
}
});
const port = (server.address() as AddressInfo).port;
assert.strictEqual(memoryExporter.getFinishedSpans().length, 0);
await tracer.withSpan(rootSpan, async () => {
const response = await httpRequest.get(
`http://localhost:${port}/toto/tata`
);
assert.strictEqual(response, 'middleware');
rootSpan.end();
assert.strictEqual(finishListenerCount, 3);
assert.notStrictEqual(
memoryExporter
.getFinishedSpans()
.find(span => span.name.includes('asyncMiddleware')),
undefined,
'no asyncMiddleware span'
);
});
server.close();
});

it('should not create span because there are no parent', async () => {
const app = express();
app.use(express.json());
Expand Down

0 comments on commit db464de

Please sign in to comment.