From 4a36da37e85e57abb2b529b388b16ef75461af14 Mon Sep 17 00:00:00 2001 From: Kim Joar Bekkelund Date: Wed, 1 Nov 2017 11:56:56 +0100 Subject: [PATCH] k$ -> observable.pipe --- example_plugins/baz/src/BazService.ts | 8 +- package-lock.json | 74 ++++++------ .../__tests__/index.test.js | 34 ++++++ .../kbn-internal-native-observable/index.d.ts | 66 +++++++++++ .../kbn-internal-native-observable/index.js | 8 ++ packages/kbn-observable/package.json | 2 +- .../src/__tests__/Subject.test.ts | 3 +- .../kbn-observable/src/__tests__/k$.test.ts | 75 ------------ packages/kbn-observable/src/index.ts | 2 - packages/kbn-observable/src/k$.ts | 74 ------------ .../src/operators/__tests__/filter.test.ts | 5 +- .../src/operators/__tests__/first.test.ts | 7 +- .../src/operators/__tests__/last.test.ts | 5 +- .../src/operators/__tests__/map.test.ts | 7 +- .../src/operators/__tests__/mergeMap.test.ts | 19 ++-- .../src/operators/__tests__/reduce.test.ts | 7 +- .../src/operators/__tests__/scan.test.ts | 7 +- .../src/operators/__tests__/shareLast.test.ts | 15 ++- .../operators/__tests__/skipRepeats.test.ts | 23 ++-- .../src/operators/__tests__/switchMap.test.ts | 107 ++++++++++-------- .../src/operators/__tests__/toPromise.test.ts | 7 +- platform/cli/cli.ts | 8 +- platform/config/ConfigService.ts | 19 +--- platform/config/RawConfigService.ts | 3 +- .../config/__tests__/ConfigService.test.ts | 12 +- .../config/__tests__/RawConfigService.test.ts | 9 +- platform/legacy/index.ts | 4 +- .../elasticsearch/ElasticsearchFacade.ts | 9 +- .../elasticsearch/ElasticsearchService.ts | 5 +- platform/server/http/HttpService.ts | 4 +- platform/server/plugins/PluginsService.ts | 5 +- 31 files changed, 286 insertions(+), 347 deletions(-) create mode 100644 packages/kbn-internal-native-observable/__tests__/index.test.js delete mode 100644 packages/kbn-observable/src/__tests__/k$.test.ts delete mode 100644 packages/kbn-observable/src/k$.ts diff --git a/example_plugins/baz/src/BazService.ts b/example_plugins/baz/src/BazService.ts index c177f4a2f8635..342a302c5d001 100644 --- a/example_plugins/baz/src/BazService.ts +++ b/example_plugins/baz/src/BazService.ts @@ -1,4 +1,4 @@ -import { k$, Observable, $combineLatest, map, first, toPromise } from 'kbn-observable'; +import { Observable, $combineLatest, map, first, toPromise } from 'kbn-observable'; import { ElasticsearchService, KibanaConfig, KibanaRequest } from 'kbn-types'; @@ -13,7 +13,7 @@ export class BazService { const { page = 1, perPage = 20, type } = options; const [kibanaIndex, adminCluster] = await latestValues( - k$(this.kibanaConfig$)(map(config => config.index)), + this.kibanaConfig$.pipe(map(config => config.index)), this.elasticsearchService.getClusterOfType$('admin') ); @@ -64,7 +64,5 @@ function latestValues( d: Observable ): Promise<[A, B, C, D]>; function latestValues(...values: Observable[]) { - return k$($combineLatest(values))( - first(), - toPromise()); + return $combineLatest(values).pipe(first(), toPromise()); } diff --git a/package-lock.json b/package-lock.json index d34343d89709e..8526d4af7dc3d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -232,6 +232,20 @@ "integrity": "sha1-kfjiWAqAgwSfeDEcBZqlfWlJ32s=", "dev": true }, + "Base64": { + "version": "0.2.1", + "resolved": "https://registry.npmjs.org/Base64/-/Base64-0.2.1.tgz", + "integrity": "sha1-ujpCMHCOGGcFBl5mur3Uw1z2ACg=" + }, + "JSONStream": { + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/JSONStream/-/JSONStream-1.1.1.tgz", + "integrity": "sha1-yYv9iMjx4ehpTlPFuqbIaRVT5Zo=", + "requires": { + "jsonparse": "1.3.1", + "through": "2.3.8" + } + }, "abab": { "version": "1.0.3", "resolved": "https://registry.npmjs.org/abab/-/abab-1.0.3.tgz", @@ -1800,11 +1814,6 @@ "resolved": "https://registry.npmjs.org/balanced-match/-/balanced-match-1.0.0.tgz", "integrity": "sha1-ibTRmasr7kneFk6gK4nORi1xt2c=" }, - "Base64": { - "version": "0.2.1", - "resolved": "https://registry.npmjs.org/Base64/-/Base64-0.2.1.tgz", - "integrity": "sha1-ujpCMHCOGGcFBl5mur3Uw1z2ACg=" - }, "base64-arraybuffer": { "version": "0.1.5", "resolved": "https://registry.npmjs.org/base64-arraybuffer/-/base64-arraybuffer-0.1.5.tgz", @@ -6946,14 +6955,6 @@ } } }, - "string_decoder": { - "version": "1.0.1", - "bundled": true, - "dev": true, - "requires": { - "safe-buffer": "5.0.1" - } - }, "string-width": { "version": "1.0.2", "bundled": true, @@ -6964,6 +6965,14 @@ "strip-ansi": "3.0.1" } }, + "string_decoder": { + "version": "1.0.1", + "bundled": true, + "dev": true, + "requires": { + "safe-buffer": "5.0.1" + } + }, "stringstream": { "version": "0.0.5", "bundled": true, @@ -12407,15 +12416,6 @@ "resolved": "https://registry.npmjs.org/jsonpointer/-/jsonpointer-4.0.1.tgz", "integrity": "sha1-T9kss04OnbPInIYi7PUfm5eMbLk=" }, - "JSONStream": { - "version": "1.1.1", - "resolved": "https://registry.npmjs.org/JSONStream/-/JSONStream-1.1.1.tgz", - "integrity": "sha1-yYv9iMjx4ehpTlPFuqbIaRVT5Zo=", - "requires": { - "jsonparse": "1.3.1", - "through": "2.3.8" - } - }, "jsprim": { "version": "1.4.1", "resolved": "https://registry.npmjs.org/jsprim/-/jsprim-1.4.1.tgz", @@ -18333,14 +18333,6 @@ "resolved": "https://registry.npmjs.org/strict-uri-encode/-/strict-uri-encode-1.1.0.tgz", "integrity": "sha1-J5siXfHVgrH1TmWt3UNS4Y+qBxM=" }, - "string_decoder": { - "version": "1.0.3", - "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.0.3.tgz", - "integrity": "sha512-4AH6Z5fzNNBcH+6XDMfA/BTt87skxqJlO0lAh3Dker5zThcAxG6mKz+iGu308UKoPPQ8Dcqx/4JhujzltRa+hQ==", - "requires": { - "safe-buffer": "5.1.1" - } - }, "string-length": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/string-length/-/string-length-1.0.1.tgz", @@ -18371,6 +18363,14 @@ "resolved": "https://registry.npmjs.org/string.prototype.repeat/-/string.prototype.repeat-0.2.0.tgz", "integrity": "sha1-q6Nt4I3O5qWjN9SbLqHaGyj8Ds8=" }, + "string_decoder": { + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.0.3.tgz", + "integrity": "sha512-4AH6Z5fzNNBcH+6XDMfA/BTt87skxqJlO0lAh3Dker5zThcAxG6mKz+iGu308UKoPPQ8Dcqx/4JhujzltRa+hQ==", + "requires": { + "safe-buffer": "5.1.1" + } + }, "stringstream": { "version": "0.0.5", "resolved": "https://registry.npmjs.org/stringstream/-/stringstream-0.0.5.tgz", @@ -20819,13 +20819,6 @@ } } }, - "string_decoder": { - "version": "1.0.1", - "bundled": true, - "requires": { - "safe-buffer": "5.0.1" - } - }, "string-width": { "version": "1.0.2", "bundled": true, @@ -20835,6 +20828,13 @@ "strip-ansi": "3.0.1" } }, + "string_decoder": { + "version": "1.0.1", + "bundled": true, + "requires": { + "safe-buffer": "5.0.1" + } + }, "stringstream": { "version": "0.0.5", "bundled": true, diff --git a/packages/kbn-internal-native-observable/__tests__/index.test.js b/packages/kbn-internal-native-observable/__tests__/index.test.js new file mode 100644 index 0000000000000..b6f8ee88143fc --- /dev/null +++ b/packages/kbn-internal-native-observable/__tests__/index.test.js @@ -0,0 +1,34 @@ +import { Observable } from '../'; + +const first = () => source => + new Observable(observer => + source.subscribe({ + next(value) { + observer.next(value); + observer.complete(); + } + }) + ); + +const plus = x => source => + new Observable(observer => + source.subscribe({ + next(value) { + observer.next(value + x); + }, + complete() { + observer.complete(); + } + }) + ); + +test('can pipe values', () => { + const observable = Observable.of(1, 2, 3).pipe(plus(10), first()); + + let value; + observable.subscribe(x => { + value = x; + }); + + expect(value).toEqual(11); +}); diff --git a/packages/kbn-internal-native-observable/index.d.ts b/packages/kbn-internal-native-observable/index.d.ts index 9d618e14b249e..da7e08612e01e 100644 --- a/packages/kbn-internal-native-observable/index.d.ts +++ b/packages/kbn-internal-native-observable/index.d.ts @@ -6,6 +6,8 @@ declare global { } } +type UnaryFunction = (source: T) => R; + // These types are based on the Observable proposal readme, see // https://github.com/tc39/proposal-observable#api, with the addition of using // generics to define the type of the `value`. @@ -109,6 +111,70 @@ declare namespace Observable { onComplete?: () => void ): Subscription; + // pipe + pipe(): Observable; + pipe(op1: UnaryFunction, A>): A; + pipe( + op1: UnaryFunction, Observable>, + op2: UnaryFunction, B> + ): B; + pipe( + op1: UnaryFunction, Observable>, + op2: UnaryFunction, Observable>, + op3: UnaryFunction, C> + ): C; + pipe( + op1: UnaryFunction, Observable>, + op2: UnaryFunction, Observable>, + op3: UnaryFunction, Observable>, + op4: UnaryFunction, D> + ): D; + pipe( + op1: UnaryFunction, Observable>, + op2: UnaryFunction, Observable>, + op3: UnaryFunction, Observable>, + op4: UnaryFunction, Observable>, + op5: UnaryFunction, E> + ): E; + pipe( + op1: UnaryFunction, Observable>, + op2: UnaryFunction, Observable>, + op3: UnaryFunction, Observable>, + op4: UnaryFunction, Observable>, + op5: UnaryFunction, Observable>, + op6: UnaryFunction, F> + ): F; + pipe( + op1: UnaryFunction, Observable>, + op2: UnaryFunction, Observable>, + op3: UnaryFunction, Observable>, + op4: UnaryFunction, Observable>, + op5: UnaryFunction, Observable>, + op6: UnaryFunction, Observable>, + op7: UnaryFunction, G> + ): G; + pipe( + op1: UnaryFunction, Observable>, + op2: UnaryFunction, Observable>, + op3: UnaryFunction, Observable>, + op4: UnaryFunction, Observable>, + op5: UnaryFunction, Observable>, + op6: UnaryFunction, Observable>, + op7: UnaryFunction, Observable>, + op8: UnaryFunction, H> + ): H; + pipe( + op1: UnaryFunction, Observable>, + op2: UnaryFunction, Observable>, + op3: UnaryFunction, Observable>, + op4: UnaryFunction, Observable>, + op5: UnaryFunction, Observable>, + op6: UnaryFunction, Observable>, + op7: UnaryFunction, Observable>, + op8: UnaryFunction, Observable>, + op9: UnaryFunction, I> + ): I; + // Returns itself [Symbol.observable](): Observable; diff --git a/packages/kbn-internal-native-observable/index.js b/packages/kbn-internal-native-observable/index.js index d03dce80ac01f..3a999d1b23a17 100644 --- a/packages/kbn-internal-native-observable/index.js +++ b/packages/kbn-internal-native-observable/index.js @@ -255,6 +255,14 @@ export class Observable { return new Subscription(observer, this._subscriber); } + + pipe(...operations) { + if (operations.length === 0) { + return this; + } + + return operations.reduce((prev, fn) => fn(prev), this); + } [symbolObservable]() { return this } diff --git a/packages/kbn-observable/package.json b/packages/kbn-observable/package.json index 54dea3b0f7328..756edd3422003 100644 --- a/packages/kbn-observable/package.json +++ b/packages/kbn-observable/package.json @@ -11,6 +11,6 @@ "rxjs": "5.4.3" }, "devDependencies": { - "typescript": "^2.5.3" + "typescript": "2.5.3" } } diff --git a/packages/kbn-observable/src/__tests__/Subject.test.ts b/packages/kbn-observable/src/__tests__/Subject.test.ts index 4bd67425278f0..488f962a70535 100644 --- a/packages/kbn-observable/src/__tests__/Subject.test.ts +++ b/packages/kbn-observable/src/__tests__/Subject.test.ts @@ -1,6 +1,5 @@ import { Observable } from '../Observable'; import { Subject } from '../Subject'; -import { k$ } from '../k$'; import { first } from '../operators'; const noop = () => {}; @@ -340,7 +339,7 @@ test('can use subject in $k', async () => { const complete = jest.fn(); const error = jest.fn(); - k$(values$)(first()).subscribe({ + values$.pipe(first()).subscribe({ next, error, complete diff --git a/packages/kbn-observable/src/__tests__/k$.test.ts b/packages/kbn-observable/src/__tests__/k$.test.ts deleted file mode 100644 index d581aa499f2d9..0000000000000 --- a/packages/kbn-observable/src/__tests__/k$.test.ts +++ /dev/null @@ -1,75 +0,0 @@ -import { Observable } from '../Observable'; -import { k$ } from '../k$'; -import { - OperatorFunction, - UnaryFunction, - MonoTypeOperatorFunction -} from '../interfaces'; - -const plus1: MonoTypeOperatorFunction = source => - new Observable(observer => { - source.subscribe({ - next(val) { - observer.next(val + 1); - }, - error(err) { - observer.error(err); - }, - complete() { - observer.complete(); - } - }); - }); - -const toString: OperatorFunction = source => - new Observable(observer => { - source.subscribe({ - next(val) { - observer.next(val.toString()); - }, - error(err) { - observer.error(err); - }, - complete() { - observer.complete(); - } - }); - }); - -const toPromise: UnaryFunction, Promise> = source => - new Promise((resolve, reject) => { - let lastValue: number; - - source.subscribe({ - next(value) { - lastValue = value; - }, - error(error) { - reject(error); - }, - complete() { - resolve(lastValue); - } - }); - }); - -test('observable to observable', () => { - const numbers$ = Observable.of(1, 2, 3); - const actual: any[] = []; - - k$(numbers$)(plus1, toString).subscribe({ - next(x) { - actual.push(x); - } - }); - - expect(actual).toEqual(['2', '3', '4']); -}); - -test('observable to promise', async () => { - const numbers$ = Observable.of(1, 2, 3); - - const value = await k$(numbers$)(plus1, toPromise); - - expect(value).toEqual(4); -}); diff --git a/packages/kbn-observable/src/index.ts b/packages/kbn-observable/src/index.ts index 9bb85a159fc97..b2c4437b601d4 100644 --- a/packages/kbn-observable/src/index.ts +++ b/packages/kbn-observable/src/index.ts @@ -1,5 +1,3 @@ -export { k$ } from './k$'; - export * from './Observable'; export { Subject } from './Subject'; export { BehaviorSubject } from './BehaviorSubject'; diff --git a/packages/kbn-observable/src/k$.ts b/packages/kbn-observable/src/k$.ts deleted file mode 100644 index 24e63997a5774..0000000000000 --- a/packages/kbn-observable/src/k$.ts +++ /dev/null @@ -1,74 +0,0 @@ -import { Observable, ObservableInput } from './Observable'; -import { pipeFromArray } from './lib'; -import { UnaryFunction } from './interfaces'; -import { $from } from './factories'; - -export function k$(source: ObservableInput) { - function kOperations(op1: UnaryFunction, A>): A; - function kOperations( - op1: UnaryFunction, A>, - op2: UnaryFunction - ): B; - function kOperations( - op1: UnaryFunction, A>, - op2: UnaryFunction, - op3: UnaryFunction - ): C; - function kOperations( - op1: UnaryFunction, A>, - op2: UnaryFunction, - op3: UnaryFunction, - op4: UnaryFunction - ): D; - function kOperations( - op1: UnaryFunction, A>, - op2: UnaryFunction, - op3: UnaryFunction, - op4: UnaryFunction, - op5: UnaryFunction - ): E; - function kOperations( - op1: UnaryFunction, A>, - op2: UnaryFunction, - op3: UnaryFunction, - op4: UnaryFunction, - op5: UnaryFunction, - op6: UnaryFunction - ): F; - function kOperations( - op1: UnaryFunction, A>, - op2: UnaryFunction, - op3: UnaryFunction, - op4: UnaryFunction, - op5: UnaryFunction, - op6: UnaryFunction, - op7: UnaryFunction - ): G; - function kOperations( - op1: UnaryFunction, A>, - op2: UnaryFunction, - op3: UnaryFunction, - op4: UnaryFunction, - op5: UnaryFunction, - op6: UnaryFunction, - op7: UnaryFunction, - op8: UnaryFunction - ): H; - function kOperations( - op1: UnaryFunction, A>, - op2: UnaryFunction, - op3: UnaryFunction, - op4: UnaryFunction, - op5: UnaryFunction, - op6: UnaryFunction, - op7: UnaryFunction, - op8: UnaryFunction, - op9: UnaryFunction - ): I; - - function kOperations(...operations: UnaryFunction, R>[]) { - return pipeFromArray(operations)($from(source)); - } - - return kOperations; -} diff --git a/packages/kbn-observable/src/operators/__tests__/filter.test.ts b/packages/kbn-observable/src/operators/__tests__/filter.test.ts index 707ddbd438f18..79d4476c93afc 100644 --- a/packages/kbn-observable/src/operators/__tests__/filter.test.ts +++ b/packages/kbn-observable/src/operators/__tests__/filter.test.ts @@ -1,4 +1,3 @@ -import { k$ } from '../../k$'; import { $from } from '../../factories'; import { filter } from '../'; import { collect } from '../../lib/collect'; @@ -6,14 +5,14 @@ import { collect } from '../../lib/collect'; const number$ = $from([1, 2, 3]); test('returns the filtered values', async () => { - const filter$ = k$(number$)(filter(n => n > 1)); + const filter$ = number$.pipe(filter(n => n > 1)); const res = collect(filter$); expect(await res).toEqual([2, 3, 'C']); }); test('sends the index as arg 2', async () => { - const filter$ = k$(number$)(filter((n, i) => i > 1)); + const filter$ = number$.pipe(filter((n, i) => i > 1)); const res = collect(filter$); expect(await res).toEqual([3, 'C']); diff --git a/packages/kbn-observable/src/operators/__tests__/first.test.ts b/packages/kbn-observable/src/operators/__tests__/first.test.ts index 997bb9403edf8..9e74e5e314daf 100644 --- a/packages/kbn-observable/src/operators/__tests__/first.test.ts +++ b/packages/kbn-observable/src/operators/__tests__/first.test.ts @@ -1,4 +1,3 @@ -import { k$ } from '../../k$'; import { first } from '../'; import { Subject } from '../../Subject'; import { collect } from '../../lib/collect'; @@ -6,7 +5,7 @@ import { collect } from '../../lib/collect'; test('returns the first value, then completes', async () => { const values$ = new Subject(); - const observable = k$(values$)(first()); + const observable = values$.pipe(first()); const res = collect(observable); values$.next('foo'); @@ -18,7 +17,7 @@ test('returns the first value, then completes', async () => { test('handles source completing after receiving value', async () => { const values$ = new Subject(); - const observable = k$(values$)(first()); + const observable = values$.pipe(first()); const res = collect(observable); values$.next('foo'); @@ -31,7 +30,7 @@ test('handles source completing after receiving value', async () => { test('returns error if completing without receiving any value', async () => { const values$ = new Subject(); - const observable = k$(values$)(first()); + const observable = values$.pipe(first()); const res = collect(observable); values$.complete(); diff --git a/packages/kbn-observable/src/operators/__tests__/last.test.ts b/packages/kbn-observable/src/operators/__tests__/last.test.ts index 01ba3752727be..cb8c8a4001d8c 100644 --- a/packages/kbn-observable/src/operators/__tests__/last.test.ts +++ b/packages/kbn-observable/src/operators/__tests__/last.test.ts @@ -1,4 +1,3 @@ -import { k$ } from '../../k$'; import { Subject } from '../../Subject'; import { last } from '../'; @@ -9,7 +8,7 @@ test('returns the last value', async () => { const error = jest.fn(); const complete = jest.fn(); - k$(values$)(last()).subscribe(next, error, complete); + values$.pipe(last()).subscribe(next, error, complete); values$.next('foo'); expect(next).not.toHaveBeenCalled(); @@ -30,7 +29,7 @@ test('returns error if completing without receiving any value', async () => { const error = jest.fn(); - k$(values$)(last()).subscribe({ + values$.pipe(last()).subscribe({ error }); diff --git a/packages/kbn-observable/src/operators/__tests__/map.test.ts b/packages/kbn-observable/src/operators/__tests__/map.test.ts index 7f9cefe4f77c8..c66e6919ed210 100644 --- a/packages/kbn-observable/src/operators/__tests__/map.test.ts +++ b/packages/kbn-observable/src/operators/__tests__/map.test.ts @@ -1,20 +1,19 @@ import { Observable } from '../../Observable'; -import { k$ } from '../../k$'; import { $from } from '../../factories'; import { map, toArray, toPromise } from '../'; const number$ = $from([1, 2, 3]); const collect = (source: Observable) => - k$(source)(toArray(), toPromise()); + source.pipe(toArray(), toPromise()); test('returns the modified value', async () => { - const numbers = await collect(k$(number$)(map(n => n * 1000))); + const numbers = await collect(number$.pipe(map(n => n * 1000))); expect(numbers).toEqual([1000, 2000, 3000]); }); test('sends the index as arg 2', async () => { - const numbers = await collect(k$(number$)(map((n, i) => i))); + const numbers = await collect(number$.pipe(map((n, i) => i))); expect(numbers).toEqual([0, 1, 2]); }); diff --git a/packages/kbn-observable/src/operators/__tests__/mergeMap.test.ts b/packages/kbn-observable/src/operators/__tests__/mergeMap.test.ts index c5ac1b32caa9a..8006417397161 100644 --- a/packages/kbn-observable/src/operators/__tests__/mergeMap.test.ts +++ b/packages/kbn-observable/src/operators/__tests__/mergeMap.test.ts @@ -1,5 +1,4 @@ import { Observable } from '../../Observable'; -import { k$ } from '../../k$'; import { Subject } from '../../Subject'; import { mergeMap, map } from '../'; import { $of, $error } from '../../factories'; @@ -11,9 +10,9 @@ test('should mergeMap many outer values to many inner values', async () => { const inner$ = new Subject(); const outer$ = Observable.from([1, 2, 3, 4]); - const project = (value: number) => k$(inner$)(map(x => `${value}-${x}`)); + const project = (value: number) => inner$.pipe(map(x => `${value}-${x}`)); - const observable = k$(outer$)(mergeMap(project)); + const observable = outer$.pipe(mergeMap(project)); const res = collect(observable); await tickMs(10); @@ -48,9 +47,9 @@ test('should mergeMap many outer values to many inner values, early complete', a const outer$ = new Subject(); const inner$ = new Subject(); - const project = (value: number) => k$(inner$)(map(x => `${value}-${x}`)); + const project = (value: number) => inner$.pipe(map(x => `${value}-${x}`)); - const observable = k$(outer$)(mergeMap(project)); + const observable = outer$.pipe(mergeMap(project)); const res = collect(observable); outer$.next(1); @@ -81,7 +80,7 @@ test('should mergeMap many outer to many inner, and inner throws', async () => { const project = (value: number, index: number) => index > 1 ? $error(error) : $of(value); - const observable = k$(source)(mergeMap(project)); + const observable = source.pipe(mergeMap(project)); const res = collect(observable); expect(await res).toEqual([1, 2, error]); @@ -91,9 +90,9 @@ test('should mergeMap many outer to many inner, and outer throws', async () => { const outer$ = new Subject(); const inner$ = new Subject(); - const project = (value: number) => k$(inner$)(map(x => `${value}-${x}`)); + const project = (value: number) => inner$.pipe(map(x => `${value}-${x}`)); - const observable = k$(outer$)(mergeMap(project)); + const observable = outer$.pipe(mergeMap(project)); const res = collect(observable); outer$.next(1); @@ -120,7 +119,7 @@ test('should mergeMap many outer to many inner, and outer throws', async () => { test('should mergeMap many outer to an array for each value', async () => { const source = Observable.from([1, 2, 3]); - const observable = k$(source)(mergeMap(() => $of('a', 'b', 'c'))); + const observable = source.pipe(mergeMap(() => $of('a', 'b', 'c'))); const res = collect(observable); expect(await res).toEqual(['a', 'b', 'c', 'a', 'b', 'c', 'a', 'b', 'c', 'C']); @@ -132,7 +131,7 @@ test('should mergeMap many outer to inner arrays, using resultSelector', async ( const source = Observable.from([1, 2, 3]); const project = (num: number, str: string) => `${num}/${str}`; - const observable = k$(source)(mergeMap(() => $of('a', 'b', 'c'), project)); + const observable = source.pipe(mergeMap(() => $of('a', 'b', 'c'), project)); const res = collect(observable); expect(await res).toEqual([ diff --git a/packages/kbn-observable/src/operators/__tests__/reduce.test.ts b/packages/kbn-observable/src/operators/__tests__/reduce.test.ts index 751e6382d6e10..77db91093c4bb 100644 --- a/packages/kbn-observable/src/operators/__tests__/reduce.test.ts +++ b/packages/kbn-observable/src/operators/__tests__/reduce.test.ts @@ -1,4 +1,3 @@ -import { k$ } from '../../k$'; import { reduce } from '../'; import { Subject } from '../../Subject'; import { collect } from '../../lib/collect'; @@ -6,7 +5,7 @@ import { collect } from '../../lib/collect'; test('completes when source completes', async () => { const subject = new Subject(); - const observable = k$(subject)( + const observable = subject.pipe( reduce((acc, val) => { return acc + val; }, 'foo') @@ -23,7 +22,7 @@ test('completes when source completes', async () => { test('injects index', async () => { const subject = new Subject(); - const observable = k$(subject)( + const observable = subject.pipe( reduce((acc, val, index) => { return acc + index; }, 'foo') @@ -40,7 +39,7 @@ test('injects index', async () => { test('completes with initial value if no values received', async () => { const subject = new Subject(); - const observable = k$(subject)( + const observable = subject.pipe( reduce((acc, val, index) => { return acc + val; }, 'foo') diff --git a/packages/kbn-observable/src/operators/__tests__/scan.test.ts b/packages/kbn-observable/src/operators/__tests__/scan.test.ts index 0044f75136981..89c5e0b7d626c 100644 --- a/packages/kbn-observable/src/operators/__tests__/scan.test.ts +++ b/packages/kbn-observable/src/operators/__tests__/scan.test.ts @@ -1,4 +1,3 @@ -import { k$ } from '../../k$'; import { scan } from '../'; import { Subject } from '../../Subject'; import { collect } from '../../lib/collect'; @@ -6,7 +5,7 @@ import { collect } from '../../lib/collect'; test('completes when source completes', async () => { const subject = new Subject(); - const observable = k$(subject)( + const observable = subject.pipe( scan((acc, val) => { return acc + val; }, 'foo') @@ -23,7 +22,7 @@ test('completes when source completes', async () => { test('injects index', async () => { const subject = new Subject(); - const observable = k$(subject)( + const observable = subject.pipe( scan((acc, val, index) => { return acc + index; }, 'foo') @@ -40,7 +39,7 @@ test('injects index', async () => { test('completes if no values received', async () => { const subject = new Subject(); - const observable = k$(subject)( + const observable = subject.pipe( scan((acc, val, index) => { return acc + val; }, 'foo') diff --git a/packages/kbn-observable/src/operators/__tests__/shareLast.test.ts b/packages/kbn-observable/src/operators/__tests__/shareLast.test.ts index 358be87c952a4..8e3eb61245934 100644 --- a/packages/kbn-observable/src/operators/__tests__/shareLast.test.ts +++ b/packages/kbn-observable/src/operators/__tests__/shareLast.test.ts @@ -1,13 +1,12 @@ import { Observable, SubscriptionObserver } from '../../Observable'; import { BehaviorSubject } from '../../BehaviorSubject'; -import { k$ } from '../../k$'; import { shareLast } from '../'; import { collect } from '../../lib/collect'; test('should mirror a simple source Observable', async () => { const source = Observable.from([4, 3, 2, 1]); - const observable = k$(source)(shareLast()); + const observable = source.pipe(shareLast()); const res = collect(observable); expect(await res).toEqual([4, 3, 2, 1, 'C']); @@ -19,7 +18,7 @@ test('should do nothing if result is not subscribed', () => { subscribed = true; }); - k$(source)(shareLast()); + source.pipe(shareLast()); expect(subscribed).toBe(false); }); @@ -34,7 +33,7 @@ test('should multicast the same values to multiple observers', () => { }); const results: any[] = []; - const source = k$(subject)(shareLast()); + const source = subject.pipe(shareLast()); source.subscribe(x => { results.push(`1/${x}`); @@ -78,7 +77,7 @@ test('should multicast an error from the source to multiple observers', () => { const subject = new BehaviorSubject('a'); const results: any[] = []; - const source = k$(subject)(shareLast()); + const source = subject.pipe(shareLast()); source.subscribe({ error(err) { @@ -116,7 +115,7 @@ test('should replay results to subsequent subscriptions if source completes', () const results: any[] = []; - const source = k$(observable)(shareLast()); + const source = observable.pipe(shareLast()); source.subscribe(x => { results.push(`1/${x}`); @@ -150,7 +149,7 @@ test('should completely restart for subsequent subscriptions if source errors', const results: any[] = []; - const source = k$(observable)(shareLast()); + const source = observable.pipe(shareLast()); source.subscribe(x => { results.push(`1/${x}`); @@ -184,7 +183,7 @@ test('restarts if refCount hits 0 due to unsubscriptions', () => { const results: any[] = []; - const source = k$(observable)(shareLast()); + const source = observable.pipe(shareLast()); const sub1 = source.subscribe(x => { results.push(`1/${x}`); diff --git a/packages/kbn-observable/src/operators/__tests__/skipRepeats.test.ts b/packages/kbn-observable/src/operators/__tests__/skipRepeats.test.ts index 732f735c62960..10740f510908b 100644 --- a/packages/kbn-observable/src/operators/__tests__/skipRepeats.test.ts +++ b/packages/kbn-observable/src/operators/__tests__/skipRepeats.test.ts @@ -1,6 +1,5 @@ import { Observable } from '../../Observable'; import { Subject } from '../../Subject'; -import { k$ } from '../../k$'; import { $of } from '../../factories'; import { skipRepeats } from '../'; import { collect } from '../../lib/collect'; @@ -8,7 +7,7 @@ import { collect } from '../../lib/collect'; test('should distinguish between values', async () => { const values$ = new Subject(); - const observable = k$(values$)(skipRepeats()); + const observable = values$.pipe(skipRepeats()); const res = collect(observable); values$.next('a'); @@ -27,7 +26,7 @@ test('should distinguish between values and does not complete', () => { const values$ = new Subject(); const actual: any[] = []; - k$(values$)(skipRepeats()).subscribe({ + values$.pipe(skipRepeats()).subscribe({ next(v) { actual.push(v); } @@ -47,7 +46,7 @@ test('should distinguish between values and does not complete', () => { test('should complete if source is empty', done => { const values$ = $of(); - k$(values$)(skipRepeats()).subscribe({ + values$.pipe(skipRepeats()).subscribe({ complete: done }); }); @@ -56,7 +55,7 @@ test('should emit if source emits single element only', () => { const values$ = new Subject(); const actual: any[] = []; - k$(values$)(skipRepeats()).subscribe({ + values$.pipe(skipRepeats()).subscribe({ next(x) { actual.push(x); } @@ -71,7 +70,7 @@ test('should emit if source is scalar', () => { const values$ = $of('a'); const actual: any[] = []; - k$(values$)(skipRepeats()).subscribe({ + values$.pipe(skipRepeats()).subscribe({ next(v) { actual.push(v); } @@ -83,7 +82,7 @@ test('should emit if source is scalar', () => { test('should raise error if source raises error', async () => { const values$ = new Subject(); - const observable = k$(values$)(skipRepeats()); + const observable = values$.pipe(skipRepeats()); const res = collect(observable); values$.next('a'); @@ -103,7 +102,7 @@ test('should raise error if source throws', () => { }); const error = jest.fn(); - k$(obs)(skipRepeats()).subscribe({ + obs.pipe(skipRepeats()).subscribe({ error }); @@ -114,7 +113,7 @@ test('should allow unsubscribing early and explicitly', () => { const values$ = new Subject(); const actual: any[] = []; - const sub = k$(values$)(skipRepeats()).subscribe({ + const sub = values$.pipe(skipRepeats()).subscribe({ next(v) { actual.push(v); } @@ -136,7 +135,7 @@ test('should emit once if comparator returns true always regardless of source em const values$ = new Subject(); const actual: any[] = []; - k$(values$)(skipRepeats(() => true)).subscribe({ + values$.pipe(skipRepeats(() => true)).subscribe({ next(v) { actual.push(v); } @@ -154,7 +153,7 @@ test('should emit all if comparator returns false always regardless of source em const values$ = new Subject(); const actual: any[] = []; - k$(values$)(skipRepeats(() => false)).subscribe({ + values$.pipe(skipRepeats(() => false)).subscribe({ next(v) { actual.push(v); } @@ -174,7 +173,7 @@ test('should distinguish values by comparator', () => { const comparator = (x: number, y: number) => y % 2 === 0; const actual: any[] = []; - k$(values$)(skipRepeats(comparator)).subscribe({ + values$.pipe(skipRepeats(comparator)).subscribe({ next(v) { actual.push(v); } diff --git a/packages/kbn-observable/src/operators/__tests__/switchMap.test.ts b/packages/kbn-observable/src/operators/__tests__/switchMap.test.ts index 9062069543f12..3c9c3c7d31d5a 100644 --- a/packages/kbn-observable/src/operators/__tests__/switchMap.test.ts +++ b/packages/kbn-observable/src/operators/__tests__/switchMap.test.ts @@ -1,5 +1,4 @@ import { Observable } from '../../Observable'; -import { k$ } from '../../k$'; import { switchMap } from '../'; import { collect } from '../../lib/collect'; import { $of } from '../../factories'; @@ -10,7 +9,7 @@ const number$ = $of(1, 2, 3); test('returns the modified value', async () => { const expected = ['a1', 'b1', 'c1', 'a2', 'b2', 'c2', 'a3', 'b3', 'c3', 'C']; - const observable = k$(number$)( + const observable = number$.pipe( switchMap(x => $of('a' + x, 'b' + x, 'c' + x)) ); const res = collect(observable); @@ -19,7 +18,7 @@ test('returns the modified value', async () => { }); test('injects index to map', async () => { - const observable = k$(number$)(switchMap((x, i) => $of(i))); + const observable = number$.pipe(switchMap((x, i) => $of(i))); const res = collect(observable); expect(await res).toEqual([0, 1, 2, 'C']); @@ -29,16 +28,18 @@ test('should unsubscribe inner observable when source observable emits new value const unsubbed: string[] = []; const subject = new Subject(); - k$(subject)( - switchMap( - x => - new Observable(observer => { - return () => { - unsubbed.push(x); - }; - }) + subject + .pipe( + switchMap( + x => + new Observable(observer => { + return () => { + unsubbed.push(x); + }; + }) + ) ) - ).subscribe(); + .subscribe(); subject.next('a'); expect(unsubbed).toEqual([]); @@ -57,16 +58,18 @@ test('should unsubscribe inner observable when source observable errors', async const unsubbed: string[] = []; const subject = new Subject(); - k$(subject)( - switchMap( - x => - new Observable(observer => { - return () => { - unsubbed.push(x); - }; - }) + subject + .pipe( + switchMap( + x => + new Observable(observer => { + return () => { + unsubbed.push(x); + }; + }) + ) ) - ).subscribe(); + .subscribe(); subject.next('a'); subject.error(new Error('fail')); @@ -78,17 +81,19 @@ test('should unsubscribe inner observables if inner observer completes', async ( const unsubbed: string[] = []; const subject = new Subject(); - k$(subject)( - switchMap( - x => - new Observable(observer => { - observer.complete(); - return () => { - unsubbed.push(x); - }; - }) + subject + .pipe( + switchMap( + x => + new Observable(observer => { + observer.complete(); + return () => { + unsubbed.push(x); + }; + }) + ) ) - ).subscribe(); + .subscribe(); subject.next('a'); expect(unsubbed).toEqual(['a']); @@ -107,19 +112,21 @@ test('should unsubscribe inner observables if inner observer errors', async () = const error = jest.fn(); const thrownError = new Error('fail'); - k$(subject)( - switchMap( - x => - new Observable(observer => { - observer.error(thrownError); - return () => { - unsubbed.push(x); - }; - }) + subject + .pipe( + switchMap( + x => + new Observable(observer => { + observer.error(thrownError); + return () => { + unsubbed.push(x); + }; + }) + ) ) - ).subscribe({ - error - }); + .subscribe({ + error + }); subject.next('a'); expect(unsubbed).toEqual(['a']); @@ -137,7 +144,7 @@ test('should switch inner observables', () => { const actual: any[] = []; - k$(outer$)(switchMap(x => inner$[x])).subscribe({ + outer$.pipe(switchMap(x => inner$[x])).subscribe({ next(val) { actual.push(val); } @@ -165,7 +172,7 @@ test('should switch inner empty and empty', () => { const next = jest.fn(); - k$(outer$)(switchMap(x => inner$[x])).subscribe(next); + outer$.pipe(switchMap(x => inner$[x])).subscribe(next); outer$.next('x'); inner$.x.complete(); @@ -189,7 +196,7 @@ test('should switch inner never and throw', async () => { inner$.y.error(error); - const observable = k$(outer$)(switchMap(x => inner$[x])); + const observable = outer$.pipe(switchMap(x => inner$[x])); const res = collect(observable); outer$.next('x'); @@ -205,7 +212,7 @@ test('should handle outer throw', async () => { throw error; }); - const observable = k$(outer$)(switchMap(x => $of(x))); + const observable = outer$.pipe(switchMap(x => $of(x))); const res = collect(observable); expect(await res).toEqual([error]); @@ -217,7 +224,7 @@ test('should handle outer error', async () => { x: new Subject() }; - const observable = k$(outer$)(switchMap(x => inner$[x])); + const observable = outer$.pipe(switchMap(x => inner$[x])); const res = collect(observable); outer$.next('x'); @@ -239,7 +246,7 @@ test('should raise error when projection throws', async () => { const outer$ = new Subject(); const error = new Error('foo'); - const observable = k$(outer$)( + const observable = outer$.pipe( switchMap(x => { throw error; }) @@ -259,7 +266,7 @@ test('should switch inner cold observables, outer is unsubscribed early', () => }; const actual: any[] = []; - const sub = k$(outer$)(switchMap(x => inner$[x])).subscribe({ + const sub = outer$.pipe(switchMap(x => inner$[x])).subscribe({ next(val) { actual.push(val); } diff --git a/packages/kbn-observable/src/operators/__tests__/toPromise.test.ts b/packages/kbn-observable/src/operators/__tests__/toPromise.test.ts index d83b4d5736099..4508732d47947 100644 --- a/packages/kbn-observable/src/operators/__tests__/toPromise.test.ts +++ b/packages/kbn-observable/src/operators/__tests__/toPromise.test.ts @@ -1,4 +1,3 @@ -import { k$ } from '../../k$'; import { Subject } from '../../Subject'; import { toPromise } from '../'; @@ -12,7 +11,7 @@ test('returns the last value', async () => { const resolved = jest.fn(); const rejected = jest.fn(); - k$(values$)(toPromise()).then(resolved, rejected); + values$.pipe(toPromise()).then(resolved, rejected); values$.next('foo'); await tick(); @@ -40,7 +39,7 @@ test('resolves even if no values received', async () => { const resolved = jest.fn(); const rejected = jest.fn(); - k$(values$)(toPromise()).then(resolved, rejected); + values$.pipe(toPromise()).then(resolved, rejected); values$.complete(); await tick(); @@ -55,7 +54,7 @@ test('rejects if error received', async () => { const resolved = jest.fn(); const rejected = jest.fn(); - k$(values$)(toPromise()).then(resolved, rejected); + values$.pipe(toPromise()).then(resolved, rejected); values$.error(new Error('fail')); await tick(); diff --git a/platform/cli/cli.ts b/platform/cli/cli.ts index 552853bfa7210..27b0fa7a4d0d2 100644 --- a/platform/cli/cli.ts +++ b/platform/cli/cli.ts @@ -1,6 +1,6 @@ // TODO Fix build system so we can switch these to `import`s const yargs = require('yargs'); -import { k$, map } from 'kbn-observable'; +import { map } from 'kbn-observable'; import * as args from './args'; import { version } from './version'; @@ -34,9 +34,9 @@ const run = (argv: { [key: string]: any }) => { process.exit(reason === undefined ? 0 : 1); }; - const rawConfig$ = k$(rawConfigService.getConfig$())( - map(rawConfig => overrideConfigWithArgv(rawConfig, argv)) - ); + const rawConfig$ = rawConfigService + .getConfig$() + .pipe(map(rawConfig => overrideConfigWithArgv(rawConfig, argv))); const root = new Root(rawConfig$, env, onShutdown); root.start(); diff --git a/platform/config/ConfigService.ts b/platform/config/ConfigService.ts index f9702eac27246..61d7ea650c560 100644 --- a/platform/config/ConfigService.ts +++ b/platform/config/ConfigService.ts @@ -1,11 +1,4 @@ -import { - Observable, - k$, - map, - first, - skipRepeats, - toPromise -} from 'kbn-observable'; +import { Observable, map, first, skipRepeats, toPromise } from 'kbn-observable'; import { isEqual } from 'lodash'; import { Env } from './Env'; @@ -56,7 +49,7 @@ export class ConfigService { path: ConfigPath, ConfigClass: ConfigWithSchema ) { - return k$(this.getDistinctRawConfig(path))( + return this.getDistinctRawConfig(path).pipe( map(rawConfig => this.createConfig(path, rawConfig, ConfigClass)) ); } @@ -71,7 +64,7 @@ export class ConfigService { path: ConfigPath, ConfigClass: ConfigWithSchema ) { - return k$(this.getDistinctRawConfig(path))( + return this.getDistinctRawConfig(path).pipe( map( rawConfig => rawConfig === undefined @@ -84,7 +77,7 @@ export class ConfigService { async isEnabledAtPath(path: ConfigPath) { const enabledPath = createPluginEnabledPath(path); - const config = await k$(this.config$)(first(), toPromise()); + const config = await this.config$.pipe(first(), toPromise()); if (!config.has(enabledPath)) { return true; @@ -123,7 +116,7 @@ export class ConfigService { private getDistinctRawConfig(path: ConfigPath) { this.markAsHandled(path); - return k$(this.config$)( + return this.config$.pipe( map(config => config.get(path)), skipRepeats(isEqual) ); @@ -134,7 +127,7 @@ export class ConfigService { } async getUnusedPaths(): Promise { - const config = await k$(this.config$)(first(), toPromise()); + const config = await this.config$.pipe(first(), toPromise()); const handledPaths = this.handledPaths.map(pathToString); return config diff --git a/platform/config/RawConfigService.ts b/platform/config/RawConfigService.ts index f66c72298ef44..77533615c4a64 100644 --- a/platform/config/RawConfigService.ts +++ b/platform/config/RawConfigService.ts @@ -1,5 +1,4 @@ import { - k$, BehaviorSubject, Observable, map, @@ -65,7 +64,7 @@ export class RawConfigService { private readonly config$: Observable; constructor(readonly configFile: string) { - this.config$ = k$(this.rawConfigFromFile$)( + this.config$ = this.rawConfigFromFile$.pipe( filter(rawConfig => rawConfig !== notRead), map(rawConfig => { // If the raw config is null, e.g. if empty config file, we default to diff --git a/platform/config/__tests__/ConfigService.test.ts b/platform/config/__tests__/ConfigService.test.ts index 08cba543b6cd1..a90020254d025 100644 --- a/platform/config/__tests__/ConfigService.test.ts +++ b/platform/config/__tests__/ConfigService.test.ts @@ -1,4 +1,4 @@ -import { BehaviorSubject, k$, first, toPromise } from 'kbn-observable'; +import { BehaviorSubject, first, toPromise } from 'kbn-observable'; import { ConfigService, ObjectToRawConfigAdapter } from '..'; import { Env } from '../Env'; @@ -16,7 +16,7 @@ test('returns config at path as observable', async () => { const configService = new ConfigService(config$, defaultEnv, logger); const configs = configService.atPath('key', ExampleClassWithStringSchema); - const exampleConfig = await k$(configs)(first(), toPromise()); + const exampleConfig = await configs.pipe(first(), toPromise()); expect(exampleConfig.value).toBe('foo'); }); @@ -32,7 +32,7 @@ test('throws if config at path does not match schema', async () => { const configs = configService.atPath('key', ExampleClassWithStringSchema); try { - await k$(configs)(first(), toPromise()); + await configs.pipe(first(), toPromise()); } catch (e) { expect(e.message).toMatchSnapshot(); } @@ -48,7 +48,7 @@ test("returns undefined if fetching optional config at a path that doesn't exist 'unique-name', ExampleClassWithStringSchema ); - const exampleConfig = await k$(configs)(first(), toPromise()); + const exampleConfig = await configs.pipe(first(), toPromise()); expect(exampleConfig).toBeUndefined(); }); @@ -63,7 +63,7 @@ test('returns observable config at optional path if it exists', async () => { 'value', ExampleClassWithStringSchema ); - const exampleConfig: any = await k$(configs)(first(), toPromise()); + const exampleConfig: any = await configs.pipe(first(), toPromise()); expect(exampleConfig).toBeDefined(); expect(exampleConfig.value).toBe('bar'); @@ -118,7 +118,7 @@ test("throws error if config class does not implement 'createSchema'", async () const configs = configService.atPath('key', ExampleClass as any); try { - await k$(configs)(first(), toPromise()); + await configs.pipe(first(), toPromise()); } catch (e) { expect(e).toMatchSnapshot(); } diff --git a/platform/config/__tests__/RawConfigService.test.ts b/platform/config/__tests__/RawConfigService.test.ts index 65a1340bea369..a611a4d7044e3 100644 --- a/platform/config/__tests__/RawConfigService.test.ts +++ b/platform/config/__tests__/RawConfigService.test.ts @@ -4,7 +4,7 @@ jest.mock('../readConfig', () => ({ getConfigFromFile: mockGetConfigFromFile })); -import { k$, first, toPromise } from 'kbn-observable'; +import { first, toPromise } from 'kbn-observable'; import { RawConfigService } from '../RawConfigService'; const configFile = '/config/kibana.yml'; @@ -44,10 +44,9 @@ test('returns config at path as observable', async () => { configService.loadConfig(); - const exampleConfig = await k$(configService.getConfig$())( - first(), - toPromise() - ); + const exampleConfig = await configService + .getConfig$() + .pipe(first(), toPromise()); expect(exampleConfig.get('key')).toEqual('value'); expect(exampleConfig.getFlattenedPaths()).toEqual(['key']); diff --git a/platform/legacy/index.ts b/platform/legacy/index.ts index 0c6ac880017fd..f77dbdadc7b00 100644 --- a/platform/legacy/index.ts +++ b/platform/legacy/index.ts @@ -8,7 +8,7 @@ export { /**@internal**/ export { LegacyKbnServer } from './LegacyKbnServer'; -import { k$, map, BehaviorSubject } from 'kbn-observable'; +import { map, BehaviorSubject } from 'kbn-observable'; import { Root } from '../root'; import { Env } from '../config'; import { @@ -23,7 +23,7 @@ import { */ export const injectIntoKbnServer = (rawKbnServer: any) => { const legacyConfig$ = new BehaviorSubject(rawKbnServer.config); - const config$ = k$(legacyConfig$)( + const config$ = legacyConfig$.pipe( map(legacyConfig => new LegacyConfigToRawConfigAdapter(legacyConfig)) ); diff --git a/platform/server/elasticsearch/ElasticsearchFacade.ts b/platform/server/elasticsearch/ElasticsearchFacade.ts index 5ef117ba09cae..1c03b97b69109 100644 --- a/platform/server/elasticsearch/ElasticsearchFacade.ts +++ b/platform/server/elasticsearch/ElasticsearchFacade.ts @@ -1,4 +1,4 @@ -import { k$, first, toPromise } from 'kbn-observable'; +import { first, toPromise } from 'kbn-observable'; import { ElasticsearchService } from './ElasticsearchService'; import { ElasticsearchClusterType } from './ElasticsearchConfig'; @@ -8,9 +8,8 @@ export class ElasticsearchRequestHelpers { constructor(private readonly elasticsearchService: ElasticsearchService) {} getClusterOfType(type: ElasticsearchClusterType): Promise { - return k$(this.elasticsearchService.getClusterOfType$(type))( - first(), - toPromise() - ); + return this.elasticsearchService + .getClusterOfType$(type) + .pipe(first(), toPromise()); } } diff --git a/platform/server/elasticsearch/ElasticsearchService.ts b/platform/server/elasticsearch/ElasticsearchService.ts index ebeb2ef2b499c..a835306f4669c 100644 --- a/platform/server/elasticsearch/ElasticsearchService.ts +++ b/platform/server/elasticsearch/ElasticsearchService.ts @@ -1,7 +1,6 @@ import { Observable, Subscription, - k$, map, filter, switchMap, @@ -26,7 +25,7 @@ export class ElasticsearchService implements CoreService { ) { const log = logger.get('elasticsearch'); - this.clusters$ = k$(config$)( + this.clusters$ = config$.pipe( filter(() => { if (this.subscription !== undefined) { log.error('clusters cannot be changed after they are created'); @@ -75,6 +74,6 @@ export class ElasticsearchService implements CoreService { } getClusterOfType$(type: ElasticsearchClusterType) { - return k$(this.clusters$)(map(clusters => clusters[type])); + return this.clusters$.pipe(map(clusters => clusters[type])); } } diff --git a/platform/server/http/HttpService.ts b/platform/server/http/HttpService.ts index 21bd93cb72b8b..a9a9560691fbb 100644 --- a/platform/server/http/HttpService.ts +++ b/platform/server/http/HttpService.ts @@ -1,4 +1,4 @@ -import { Observable, Subscription, k$, first, toPromise } from 'kbn-observable'; +import { Observable, Subscription, first, toPromise } from 'kbn-observable'; import { Env } from '../../config'; import { HttpServer } from './HttpServer'; @@ -34,7 +34,7 @@ export class HttpService implements CoreService { } }); - const config = await k$(this.config$)(first(), toPromise()); + const config = await this.config$.pipe(first(), toPromise()); await this.httpServer.start(config); } diff --git a/platform/server/plugins/PluginsService.ts b/platform/server/plugins/PluginsService.ts index f93cf1a75f3eb..d9936f4c873ad 100644 --- a/platform/server/plugins/PluginsService.ts +++ b/platform/server/plugins/PluginsService.ts @@ -2,7 +2,6 @@ import { readdir, stat } from 'fs'; import { resolve } from 'path'; import { Observable, - k$, first, map, mergeMap, @@ -37,7 +36,7 @@ export class PluginsService implements CoreService { } async start() { - const plugins = await k$(this.getAllPlugins())( + const plugins = await this.getAllPlugins().pipe( mergeMap( plugin => $fromPromise(this.isPluginEnabled(plugin)), (plugin, isEnabled) => ({ plugin, isEnabled }) @@ -69,7 +68,7 @@ export class PluginsService implements CoreService { } private getAllPlugins() { - return k$(this.pluginsConfig$)( + return this.pluginsConfig$.pipe( first(), mergeMap(config => config.scanDirs), mergeMap(