Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify and optimize worker task scheduling #10417

Merged
merged 9 commits into from
Mar 11, 2021
99 changes: 39 additions & 60 deletions src/source/geojson_source.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import type Actor from '../util/actor.js';
import type {Callback} from '../types/callback.js';
import type {GeoJSON, GeoJSONFeature} from '@mapbox/geojson-types';
import type {GeoJSONSourceSpecification, PromoteIdSpecification} from '../style-spec/types.js';
import type {Cancelable} from '../types/cancelable.js';

/**
* A source containing GeoJSON.
Expand Down Expand Up @@ -79,9 +80,10 @@ class GeoJSONSource extends Evented implements Source {
map: Map;
actor: Actor;
_loaded: boolean;
_coalesce: ?boolean;
_metadataFired: ?boolean;
_collectResourceTiming: boolean;
_resourceTiming: Array<PerformanceResourceTiming>;
_removed: boolean;
_pendingLoad: ?Cancelable;

/**
* @private
Expand All @@ -100,7 +102,6 @@ class GeoJSONSource extends Evented implements Source {
this.tileSize = 512;
this.isTileClipped = true;
this.reparseOverscaled = true;
this._removed = false;
this._loaded = false;

this.actor = dispatcher.getActor();
Expand All @@ -110,7 +111,6 @@ class GeoJSONSource extends Evented implements Source {
this._options = extend({}, options);

this._collectResourceTiming = options.collectResourceTiming;
this._resourceTiming = [];

if (options.maxzoom !== undefined) this.maxzoom = options.maxzoom;
if (options.type) this.type = options.type;
Expand Down Expand Up @@ -147,29 +147,9 @@ class GeoJSONSource extends Evented implements Source {
}, options.workerOptions);
}

load() {
this.fire(new Event('dataloading', {dataType: 'source'}));
this._updateWorkerData((err) => {
if (err) {
this.fire(new ErrorEvent(err));
return;
}

const data: Object = {dataType: 'source', sourceDataType: 'metadata'};
if (this._collectResourceTiming && this._resourceTiming && (this._resourceTiming.length > 0)) {
data.resourceTiming = this._resourceTiming;
this._resourceTiming = [];
}

// although GeoJSON sources contain no metadata, we fire this event to let the SourceCache
// know its ok to start requesting tiles.
this.fire(new Event('data', data));
});
}

onAdd(map: Map) {
this.map = map;
this.load();
this.setData(this._data);
}

/**
Expand All @@ -180,21 +160,7 @@ class GeoJSONSource extends Evented implements Source {
*/
setData(data: GeoJSON | string) {
this._data = data;
this.fire(new Event('dataloading', {dataType: 'source'}));
this._updateWorkerData((err) => {
if (err) {
this.fire(new ErrorEvent(err));
return;
}

const data: Object = {dataType: 'source', sourceDataType: 'content'};
if (this._collectResourceTiming && this._resourceTiming && (this._resourceTiming.length > 0)) {
data.resourceTiming = this._resourceTiming;
this._resourceTiming = [];
}
this.fire(new Event('data', data));
});

this._updateWorkerData();
return this;
}

Expand Down Expand Up @@ -262,7 +228,15 @@ class GeoJSONSource extends Evented implements Source {
* handles loading the geojson data and preparing to serve it up as tiles,
* using geojson-vt or supercluster as appropriate.
*/
_updateWorkerData(callback: Callback<void>) {
_updateWorkerData() {
// if there's an earlier loadData to finish, wait until it finishes and then do another update
if (this._pendingLoad) {
this._coalesce = true;
return;
}

this.fire(new Event('dataloading', {dataType: 'source'}));

this._loaded = false;
const options = extend({}, this.workerOptions);
const data = this._data;
Expand All @@ -276,24 +250,28 @@ class GeoJSONSource extends Evented implements Source {
// target {this.type}.loadData rather than literally geojson.loadData,
// so that other geojson-like source types can easily reuse this
// implementation
this.actor.send(`${this.type}.loadData`, options, (err, result) => {
if (this._removed || (result && result.abandoned)) {
return;
}

this._pendingLoad = this.actor.send(`${this.type}.loadData`, options, (err, result) => {
this._loaded = true;
this._pendingLoad = null;

if (result && result.resourceTiming && result.resourceTiming[this.id])
this._resourceTiming = result.resourceTiming[this.id].slice(0);
// Any `loadData` calls that piled up while we were processing
// this one will get coalesced into a single call when this
// 'coalesce' message is processed.
// We would self-send from the worker if we had access to its
// message queue. Waiting instead for the 'coalesce' to round-trip
// through the foreground just means we're throttling the worker
// to run at a little less than full-throttle.
this.actor.send(`${this.type}.coalesce`, {source: options.source}, null);
callback(err);
if (err) {
this.fire(new ErrorEvent(err));

} else {
// although GeoJSON sources contain no metadata, we fire this event at first
// to let the SourceCache know its ok to start requesting tiles.
const data: Object = {dataType: 'source', sourceDataType: this._metadataFired ? 'content' : 'metadata'};
if (this._collectResourceTiming && result && result.resourceTiming && result.resourceTiming[this.id]) {
data.resourceTiming = result.resourceTiming[this.id];
}
this.fire(new Event('data', data));
this._metadataFired = true;
}

if (this._coalesce) {
this._updateWorkerData();
this._coalesce = false;
}
});
}

Expand Down Expand Up @@ -333,7 +311,7 @@ class GeoJSONSource extends Evented implements Source {
tile.loadVectorData(data, this.map.painter, message === 'reloadTile');

return callback(null);
});
}, undefined, message === 'loadTile');
}

abortTile(tile: Tile) {
Expand All @@ -350,8 +328,9 @@ class GeoJSONSource extends Evented implements Source {
}

onRemove() {
this._removed = true;
this.actor.send('removeSource', {type: this.type, source: this.id});
if (this._pendingLoad) {
this._pendingLoad.cancel();
}
}

serialize() {
Expand Down
81 changes: 1 addition & 80 deletions src/source/geojson_worker_source.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,6 @@ function loadGeoJSONTile(params: RequestedTileParameters, callback: LoadVectorDa
});
}

export type SourceState =
| 'Idle' // Source empty or data loaded
| 'Coalescing' // Data finished loading, but discard 'loadData' messages until receiving 'coalesced'
| 'NeedsLoadData'; // 'loadData' received while coalescing, trigger one more 'loadData' on receiving 'coalesced'

/**
* The {@link WorkerSource} implementation that supports {@link GeoJSONSource}.
* This class is designed to be easily reused to support custom source types
Expand All @@ -94,11 +89,6 @@ export type SourceState =
*/
class GeoJSONWorkerSource extends VectorTileWorkerSource {
loadGeoJSON: LoadGeoJSON;
_state: SourceState;
_pendingCallback: Callback<{
resourceTiming?: {[_: string]: Array<PerformanceResourceTiming>},
abandoned?: boolean }>;
_pendingLoadDataParams: LoadGeoJSONParameters;
_geoJSONIndex: GeoJSONIndex

/**
Expand Down Expand Up @@ -131,39 +121,7 @@ class GeoJSONWorkerSource extends VectorTileWorkerSource {
* @param callback
* @private
*/
loadData(params: LoadGeoJSONParameters, callback: Callback<{
resourceTiming?: {[_: string]: Array<PerformanceResourceTiming>},
abandoned?: boolean }>) {
if (this._pendingCallback) {
// Tell the foreground the previous call has been abandoned
this._pendingCallback(null, {abandoned: true});
}
this._pendingCallback = callback;
this._pendingLoadDataParams = params;

if (this._state &&
this._state !== 'Idle') {
this._state = 'NeedsLoadData';
} else {
this._state = 'Coalescing';
this._loadData();
}
}

/**
* Internal implementation: called directly by `loadData`
* or by `coalesce` using stored parameters.
*/
_loadData() {
if (!this._pendingCallback || !this._pendingLoadDataParams) {
assert(false);
return;
}
const callback = this._pendingCallback;
const params = this._pendingLoadDataParams;
delete this._pendingCallback;
delete this._pendingLoadDataParams;

loadData(params: LoadGeoJSONParameters, callback: Callback<{resourceTiming?: {[_: string]: Array<PerformanceResourceTiming>}}>) {
const requestParam = params && params.request;
const perf = requestParam && requestParam.collectResourceTiming;

Expand Down Expand Up @@ -209,35 +167,6 @@ class GeoJSONWorkerSource extends VectorTileWorkerSource {
});
}

/**
* While processing `loadData`, we coalesce all further
* `loadData` messages into a single call to _loadData
* that will happen once we've finished processing the
* first message. {@link GeoJSONSource#_updateWorkerData}
* is responsible for sending us the `coalesce` message
* at the time it receives a response from `loadData`
*
* State: Idle
* ↑ |
* 'coalesce' 'loadData'
* | (triggers load)
* | ↓
* State: Coalescing
* ↑ |
* (triggers load) |
* 'coalesce' 'loadData'
* | ↓
* State: NeedsLoadData
*/
coalesce() {
if (this._state === 'Coalescing') {
this._state = 'Idle';
} else if (this._state === 'NeedsLoadData') {
this._state = 'Coalescing';
this._loadData();
}
}

/**
* Implements {@link WorkerSource#reloadTile}.
*
Expand Down Expand Up @@ -289,14 +218,6 @@ class GeoJSONWorkerSource extends VectorTileWorkerSource {
}
}

removeSource(params: {source: string}, callback: Callback<mixed>) {
if (this._pendingCallback) {
// Don't leak callbacks
this._pendingCallback(null, {abandoned: true});
}
callback();
}

getClusterExpansionZoom(params: {clusterId: number}, callback: Callback<number>) {
try {
callback(null, this._geoJSONIndex.getClusterExpansionZoom(params.clusterId));
Expand Down
2 changes: 1 addition & 1 deletion src/source/raster_dem_tile_source.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class RasterDEMTileSource extends RasterTileSource implements Source {

if (!tile.actor || tile.state === 'expired') {
tile.actor = this.dispatcher.getActor();
tile.actor.send('loadDEMTile', params, done.bind(this));
tile.actor.send('loadDEMTile', params, done.bind(this), undefined, true);
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/source/vector_tile_source.js
Original file line number Diff line number Diff line change
Expand Up @@ -227,13 +227,13 @@ class VectorTileSource extends Evented implements Source {
expires: data.expires,
rawData: data.rawData.slice(0)
};
if (tile.actor) tile.actor.send('loadTile', params, done.bind(this));
if (tile.actor) tile.actor.send('loadTile', params, done.bind(this), undefined, true);
}
}, true);
tile.request = {cancel};

} else {
tile.request = tile.actor.send('loadTile', params, done.bind(this));
tile.request = tile.actor.send('loadTile', params, done.bind(this), undefined, true);
}

} else if (tile.state === 'loading') {
Expand Down Expand Up @@ -277,14 +277,14 @@ class VectorTileSource extends Evented implements Source {
delete tile.request;
}
if (tile.actor) {
tile.actor.send('abortTile', {uid: tile.uid, type: this.type, source: this.id}, undefined);
tile.actor.send('abortTile', {uid: tile.uid, type: this.type, source: this.id});
}
}

unloadTile(tile: Tile) {
tile.unloadVectorData();
if (tile.actor) {
tile.actor.send('removeTile', {uid: tile.uid, type: this.type, source: this.id}, undefined);
tile.actor.send('removeTile', {uid: tile.uid, type: this.type, source: this.id});
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/source/worker_tile.js
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ class WorkerTile {
glyphMap = result;
maybePrepare.call(this);
}
}, undefined, undefined, taskMetadata);
}, undefined, true, taskMetadata);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to queue this on the main thread. We need to queue the response to this

Copy link
Member Author

@mourner mourner Mar 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, the intent was to make mustQueue force queing only on the worker thread, whether it's the task (if you're sending from the main) or the response (if you're sending from the worker).

} else {
glyphMap = {};
}
Expand All @@ -182,7 +182,7 @@ class WorkerTile {
iconMap = result;
maybePrepare.call(this);
}
}, undefined, undefined, taskMetadata);
}, undefined, true, taskMetadata);
} else {
iconMap = {};
}
Expand All @@ -195,7 +195,7 @@ class WorkerTile {
patternMap = result;
maybePrepare.call(this);
}
}, undefined, undefined, taskMetadata);
}, undefined, true, taskMetadata);
} else {
patternMap = {};
}
Expand Down
17 changes: 10 additions & 7 deletions src/util/actor.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {bindAll, isWorker, isSafari} from './util.js';
import window from './window.js';
import {serialize, deserialize} from './web_worker_transfer.js';
import Scheduler from './scheduler.js';
import {PerformanceUtils} from './performance.js';

import type {Transferable} from '../types/transferable.js';
import type {Cancelable} from '../types/cancelable.js';
Expand Down Expand Up @@ -107,20 +108,22 @@ class Actor {
cancel.cancel();
}
} else {
if (isWorker() || data.mustQueue) {
// In workers, store the tasks that we need to process before actually processing them. This
// is necessary because we want to keep receiving messages, and in particular,
// <cancel> messages. Some tasks may take a while in the worker thread, so before
// executing the next task in our queue, postMessage preempts this and <cancel>
// messages can be processed. We're using a MessageChannel object to get throttle the
// process() flow to one at a time.
if (isWorker() && data.mustQueue) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to pass the responses from getImages and getGlyphs to the scheduler. Checking for the presence of callback.metadata in actor.js might be enough to decide whether to do that but I think letting the scheduler decide that might be slightly cleaner

The queuing on the main thread was intentional but it looks like it might not be needed since we dropped IE. I think this is the only case where we did queuing on the main thread. It was also applied to iOS Safari < 12.1 but I don't think it was actually needed there... not sure though. @arindam1993 do you remember if the queuing was only needed for IE?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also old Safari verions wherein AbortController doesn't actually abort fetches.

// for worker tasks that are often cancelled, such as loadTile, store them before actually
// processing them. This is necessary because we want to keep receiving <cancel> messages.
// Some tasks may take a while in the worker thread, so before executing the next task
// in our queue, postMessage preempts this and <cancel> messages can be processed.
// We're using a MessageChannel object to get throttle the process() flow to one at a time.
const callback = this.callbacks[id];
const metadata = (callback && callback.metadata) || {type: "message"};
this.cancelCallbacks[id] = this.scheduler.add(() => this.processTask(id, data), metadata);
} else {
// In the main thread, process messages immediately so that other work does not slip in
// between getting partial data back from workers.
// Do the same for worker tasks that need processing as soon as possible.
const m = isWorker() ? PerformanceUtils.beginMeasure('workerTask') : undefined;
this.processTask(id, data);
if (m) PerformanceUtils.endMeasure(m);
}
}
}
Expand Down
Loading