diff --git a/packages/kbn-optimizer/src/common/index.ts b/packages/kbn-optimizer/src/common/index.ts index c51905be04565..376b9ed350908 100644 --- a/packages/kbn-optimizer/src/common/index.ts +++ b/packages/kbn-optimizer/src/common/index.ts @@ -21,6 +21,7 @@ export * from './bundle'; export * from './bundle_cache'; export * from './worker_config'; export * from './worker_messages'; +export * from './parent_messages'; export * from './compiler_messages'; export * from './ts_helpers'; export * from './rxjs_helpers'; diff --git a/packages/kbn-optimizer/src/common/parent_messages.ts b/packages/kbn-optimizer/src/common/parent_messages.ts new file mode 100644 index 0000000000000..c27bcb96f4a89 --- /dev/null +++ b/packages/kbn-optimizer/src/common/parent_messages.ts @@ -0,0 +1,33 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +export interface ParentPongMsg { + type: 'pong'; +} + +export const isParentPong = (value: any): value is ParentPongMsg => + typeof value === 'object' && value && value.type === 'pong'; + +export class ParentMsgs { + pong(): ParentPongMsg { + return { + type: 'pong', + }; + } +} diff --git a/packages/kbn-optimizer/src/common/worker_messages.ts b/packages/kbn-optimizer/src/common/worker_messages.ts index d3c03f483d7e8..0435f5b4c4011 100644 --- a/packages/kbn-optimizer/src/common/worker_messages.ts +++ b/packages/kbn-optimizer/src/common/worker_messages.ts @@ -24,13 +24,17 @@ import { CompilerErrorMsg, } from './compiler_messages'; -export type WorkerMsg = +export type InternalWorkerMsg = + | WorkerPingMsg | CompilerRunningMsg | CompilerIssueMsg | CompilerSuccessMsg | CompilerErrorMsg | WorkerErrorMsg; +// ping messages are internal, they don't apper in public message streams +export type WorkerMsg = Exclude; + /** * Message sent when the worker encounters an error that it can't * recover from, no more messages will be sent and the worker @@ -42,6 +46,10 @@ export interface WorkerErrorMsg { errorStack?: string; } +export interface WorkerPingMsg { + type: 'ping'; +} + const WORKER_STATE_TYPES: ReadonlyArray = [ 'running', 'compiler issue', @@ -50,10 +58,19 @@ const WORKER_STATE_TYPES: ReadonlyArray = [ 'worker error', ]; +export const isWorkerPing = (value: any): value is WorkerPingMsg => + typeof value === 'object' && value && value.type === 'ping'; + export const isWorkerMsg = (value: any): value is WorkerMsg => typeof value === 'object' && value && WORKER_STATE_TYPES.includes(value.type); export class WorkerMsgs { + ping(): WorkerPingMsg { + return { + type: 'ping', + }; + } + error(error: Error): WorkerErrorMsg { return { type: 'worker error', diff --git a/packages/kbn-optimizer/src/optimizer/observe_worker.ts b/packages/kbn-optimizer/src/optimizer/observe_worker.ts index bfc853e5a6b75..90c53f1ef9e87 100644 --- a/packages/kbn-optimizer/src/optimizer/observe_worker.ts +++ b/packages/kbn-optimizer/src/optimizer/observe_worker.ts @@ -22,12 +22,14 @@ import { Readable } from 'stream'; import { inspect } from 'util'; import * as Rx from 'rxjs'; -import { map, takeUntil } from 'rxjs/operators'; +import { map, filter, takeUntil } from 'rxjs/operators'; -import { isWorkerMsg, WorkerConfig, WorkerMsg, Bundle } from '../common'; +import { isWorkerMsg, isWorkerPing, WorkerConfig, WorkerMsg, Bundle, ParentMsgs } from '../common'; import { OptimizerConfig } from './optimizer_config'; +const parentMsgs = new ParentMsgs(); + export interface WorkerStdio { type: 'worker stdio'; stream: 'stdout' | 'stderr'; @@ -146,6 +148,16 @@ export function observeWorker( observeStdio$(proc.stderr, 'stderr'), Rx.fromEvent<[unknown]>(proc, 'message') .pipe( + // filter out ping messages so they don't end up in the general message stream + filter(([msg]) => { + if (!isWorkerPing(msg)) { + return true; + } + + proc.send(parentMsgs.pong()); + return false; + }), + // validate the messages from the process map(([msg]) => { if (!isWorkerMsg(msg)) { diff --git a/packages/kbn-optimizer/src/worker/observe_parent_offline.test.ts b/packages/kbn-optimizer/src/worker/observe_parent_offline.test.ts new file mode 100644 index 0000000000000..353f570e2cacc --- /dev/null +++ b/packages/kbn-optimizer/src/worker/observe_parent_offline.test.ts @@ -0,0 +1,178 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { EventEmitter } from 'events'; +import { inspect } from 'util'; + +import * as Rx from 'rxjs'; +import { tap, takeUntil } from 'rxjs/operators'; + +import { observeParentOffline, Process } from './observe_parent_offline'; +import { WorkerMsgs, ParentMsgs, isWorkerPing } from '../common'; + +jest.useFakeTimers(); + +beforeEach(() => { + jest.clearAllTimers(); +}); + +const workerMsgs = new WorkerMsgs(); +const parentMsgs = new ParentMsgs(); +class MockProcess extends EventEmitter implements Process { + connected?: boolean; + send?: jest.Mock; + + constructor(options: { connected?: boolean; send?: jest.Mock | false } = {}) { + super(); + + this.connected = options.connected ?? true; + this.send = options.send === false ? undefined : options.send ?? jest.fn(); + } +} + +async function record(observable: Rx.Observable): Promise { + const notes: string[] = []; + + await observable + .pipe( + tap({ + next(value) { + notes.push(`next: ${inspect(value)}`); + }, + error(error) { + notes.push(`error: ${inspect(error)}`); + }, + complete() { + notes.push(`complete`); + }, + }) + ) + .toPromise(); + + return notes; +} + +async function waitForTick() { + await new Promise(resolve => { + process.nextTick(resolve); + }); +} + +describe('emits and completes when parent exists because:', () => { + test('"disconnect" event', async () => { + const mockProc = new MockProcess(); + const promise = record(observeParentOffline(mockProc, workerMsgs)); + mockProc.emit('disconnect'); + expect(await promise).toMatchInlineSnapshot(` + Array [ + "next: 'parent offline (disconnect event)'", + "complete", + ] + `); + }); + + test('process.connected is false', async () => { + const mockProc = new MockProcess({ + connected: false, + }); + + const promise = record(observeParentOffline(mockProc, workerMsgs)); + jest.advanceTimersToNextTimer(); + expect(await promise).toMatchInlineSnapshot(` + Array [ + "next: 'parent offline (disconnected)'", + "complete", + ] + `); + }); + + test('process.send is falsey', async () => { + const mockProc = new MockProcess({ + send: false, + }); + + const promise = record(observeParentOffline(mockProc, workerMsgs)); + jest.advanceTimersToNextTimer(); + expect(await promise).toMatchInlineSnapshot(` + Array [ + "next: 'parent offline (disconnected)'", + "complete", + ] + `); + }); + + test('process.send throws "ERR_IPC_CHANNEL_CLOSED"', async () => { + const mockProc = new MockProcess({ + send: jest.fn(() => { + const error = new Error(); + (error as any).code = 'ERR_IPC_CHANNEL_CLOSED'; + throw error; + }), + }); + + const promise = record(observeParentOffline(mockProc, workerMsgs)); + jest.advanceTimersToNextTimer(); + expect(await promise).toMatchInlineSnapshot(` + Array [ + "next: 'parent offline (ipc channel exception)'", + "complete", + ] + `); + }); + + test('ping timeout', async () => { + const mockProc = new MockProcess({}); + + const promise = record(observeParentOffline(mockProc, workerMsgs)); + jest.advanceTimersByTime(10000); + expect(await promise).toMatchInlineSnapshot(` + Array [ + "next: 'parent offline (ping timeout)'", + "complete", + ] + `); + }); +}); + +test('it emits nothing if parent responds with pongs', async () => { + const send = jest.fn((msg: any) => { + if (isWorkerPing(msg)) { + process.nextTick(() => { + mockProc.emit('message', parentMsgs.pong(), undefined); + }); + } + }); + + const mockProc = new MockProcess({ send }); + const unsub$ = new Rx.Subject(); + const promise = record(observeParentOffline(mockProc, workerMsgs).pipe(takeUntil(unsub$))); + + jest.advanceTimersByTime(5000); + await waitForTick(); + jest.advanceTimersByTime(5000); + await waitForTick(); + unsub$.next(); + + expect(await promise).toMatchInlineSnapshot(` + Array [ + "complete", + ] + `); + expect(send).toHaveBeenCalledTimes(2); +}); diff --git a/packages/kbn-optimizer/src/worker/observe_parent_offline.ts b/packages/kbn-optimizer/src/worker/observe_parent_offline.ts new file mode 100644 index 0000000000000..94ec34b409dd4 --- /dev/null +++ b/packages/kbn-optimizer/src/worker/observe_parent_offline.ts @@ -0,0 +1,97 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { EventEmitter } from 'events'; + +import * as Rx from 'rxjs'; +import { mergeMap, take, first, map, catchError } from 'rxjs/operators'; + +import { isParentPong, WorkerMsgs } from '../common'; + +const sleep = (ms: number) => Rx.timer(ms).pipe(take(1)); + +export interface Process extends EventEmitter { + connected?: boolean; + send?: (msg: any) => void; +} + +/** + * Returns an observable that will emit a value when the parent + * process goes offline. It accomplishes this by merging several + * signals: + * + * 1. process "disconnect" event + * 2. process.connected or process.send are falsy + * 3. a ping was sent to the parent process but it didn't respond + * with a pong within 5 seconds + * 4. a ping was sent to the parent process but the process.send + * call errored with an 'ERR_IPC_CHANNEL_CLOSED' exception + */ +export function observeParentOffline(process: Process, workerMsgs: WorkerMsgs) { + return Rx.race( + Rx.fromEvent(process, 'disconnect').pipe( + take(1), + map(() => 'parent offline (disconnect event)') + ), + + sleep(5000).pipe( + mergeMap(() => { + if (!process.connected || !process.send) { + return Rx.of('parent offline (disconnected)'); + } + + process.send(workerMsgs.ping()); + + const pong$ = Rx.fromEvent<[any]>(process, 'message').pipe( + first(([msg]) => isParentPong(msg)), + map(() => { + throw new Error('parent still online'); + }) + ); + + // give the parent some time to respond, if the ping + // wins the race the parent is considered online + const timeout$ = sleep(5000).pipe(map(() => 'parent offline (ping timeout)')); + + return Rx.race(pong$, timeout$); + }) + ) + ).pipe( + /** + * resubscribe to the source observable (triggering the timer, + * ping, wait for response) if the source observable does not + * observe the parent being offline yet. + * + * Scheduling the interval this way prevents the ping timeout + * from overlaping with the interval by only scheduling the + * next ping once the previous ping has completed + */ + catchError((error, resubscribe) => { + if (error.code === 'ERR_IPC_CHANNEL_CLOSED') { + return Rx.of('parent offline (ipc channel exception)'); + } + + if (error.message === 'parent still online') { + return resubscribe; + } + + throw error; + }) + ); +} diff --git a/packages/kbn-optimizer/src/worker/run_worker.ts b/packages/kbn-optimizer/src/worker/run_worker.ts index cbec4c3f44c7d..0a9adc2a3db55 100644 --- a/packages/kbn-optimizer/src/worker/run_worker.ts +++ b/packages/kbn-optimizer/src/worker/run_worker.ts @@ -18,10 +18,12 @@ */ import * as Rx from 'rxjs'; +import { takeUntil } from 'rxjs/operators'; import { parseBundles, parseWorkerConfig, WorkerMsg, isWorkerMsg, WorkerMsgs } from '../common'; import { runCompilers } from './run_compilers'; +import { observeParentOffline } from './observe_parent_offline'; /** ** @@ -64,15 +66,6 @@ const exit = (code: number) => { }, 5000).unref(); }; -// check for connected parent on an unref'd timer rather than listening -// to "disconnect" since that listner prevents the process from exiting -setInterval(() => { - if (!process.connected) { - // parent is gone - process.exit(0); - } -}, 1000).unref(); - Rx.defer(() => { const workerConfig = parseWorkerConfig(process.argv[2]); const bundles = parseBundles(process.argv[3]); @@ -81,20 +74,22 @@ Rx.defer(() => { process.env.BROWSERSLIST_ENV = workerConfig.browserslistEnv; return runCompilers(workerConfig, bundles); -}).subscribe( - msg => { - send(msg); - }, - error => { - if (isWorkerMsg(error)) { - send(error); - } else { - send(workerMsgs.error(error)); - } +}) + .pipe(takeUntil(observeParentOffline(process, workerMsgs))) + .subscribe( + msg => { + send(msg); + }, + error => { + if (isWorkerMsg(error)) { + send(error); + } else { + send(workerMsgs.error(error)); + } - exit(1); - }, - () => { - exit(0); - } -); + exit(1); + }, + () => { + exit(0); + } + );