Skip to content

Commit

Permalink
[ML] Fixing job progress subscriber leak (#43767)
Browse files Browse the repository at this point in the history
  • Loading branch information
jgowdyelastic authored Aug 23, 2019
1 parent 6d659e5 commit efb27e4
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ export class JobCreator {
protected _useDedicatedIndex: boolean = false;
protected _start: number = 0;
protected _end: number = 0;
protected _subscribers: ProgressSubscriber[];
protected _subscribers: ProgressSubscriber[] = [];
protected _aggs: Aggregation[] = [];
protected _fields: Field[] = [];
protected _sparseData: boolean = false;
private _stopAllRefreshPolls: {
stop: boolean;
};
} = { stop: false };

constructor(indexPattern: IndexPattern, savedSearch: SavedSearch, query: object) {
this._indexPattern = indexPattern;
Expand All @@ -51,8 +51,6 @@ export class JobCreator {
}

this._datafeed_config.query = query;
this._subscribers = [];
this._stopAllRefreshPolls = { stop: false };
}

public get type(): JOB_TYPE {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export class JobRunner {
private _stopRefreshPoll: {
stop: boolean;
};
private _subscribers: ProgressSubscriber[];

constructor(jobCreator: JobCreator) {
this._jobId = jobCreator.jobId;
Expand All @@ -38,9 +39,7 @@ export class JobRunner {
this._stopRefreshPoll = jobCreator.stopAllRefreshPolls;

this._progress$ = new BehaviorSubject(this._percentageComplete);
// link the _subscribers list from the JobCreator
// to the progress BehaviorSubject.
jobCreator.subscribers.forEach(s => this._progress$.subscribe(s));
this._subscribers = jobCreator.subscribers;
}

public get datafeedState(): DATAFEED_STATE {
Expand Down Expand Up @@ -72,6 +71,11 @@ export class JobRunner {
pollProgress: boolean
): Promise<boolean> {
try {
// link the _subscribers list from the JobCreator
// to the progress BehaviorSubject.
const subscriptions =
pollProgress === true ? this._subscribers.map(s => this._progress$.subscribe(s)) : [];

await this.openJob();
const { started } = await mlJobService.startDatafeed(
this._datafeedId,
Expand All @@ -93,6 +97,9 @@ export class JobRunner {
setTimeout(async () => {
await check();
}, this._refreshInterval);
} else {
// job has finished running, unsubscribe everyone
subscriptions.forEach(s => s.unsubscribe());
}
};
// wait for the first check to run and then return success.
Expand Down

0 comments on commit efb27e4

Please sign in to comment.