diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index ad44a1136fb4d..7861816930e97 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -19,17 +19,18 @@ Please make sure you have signed the [Contributor License Agreement](http://www. - Install ruby *1.9.3* (we recommend using [rbenv](https://github.com/sstephenson/rbenv)) - See [rbenv docs](https://github.com/sstephenson/rbenv#installation) for installation assistance - - ```sh - ## install ruby and ruby-build using your local package manager (apt, brew, etc) - ## ex. brew install rbenv ruby-build + + ```sh + ## install ruby and ruby-build using your local package manager (apt, brew, etc) + brew install rbenv ruby-build + ``` + - Run `rbenv init` and add `eval "$(rbenv init -)"` to your shell (ex. .bashrc/.bash_profile) - Run `rbenv install` to install the required version - Run `ruby -v` and make sure you are using 1.9.3 - Check the installation docs if you have issues getting the correct version - Install bundler by running `gem install bundler` - - Install local gems by running `cd src/server; bundle; cd ../..` - ``` + - Install local gems by running `bundle` - Install grunt and bower globally @@ -40,7 +41,7 @@ Please make sure you have signed the [Contributor License Agreement](http://www. - Install node, bower, and ruby dependencies ```sh - npm install && bower install && cd src/server && bundle && cd ../.. + npm install && bower install && bundle ``` - Start the development server. diff --git a/Gruntfile.js b/Gruntfile.js index bf8edf3062701..e3c914fb39290 100644 --- a/Gruntfile.js +++ b/Gruntfile.js @@ -43,7 +43,7 @@ module.exports = function (grunt) { return 'plugins/' + dirname(fileName) + '/index'; }); - config.bundledPluginModuleIds = grunt.bundledPluginModuleIds = moduleIds; + config.bundled_plugin_module_ids = grunt.bundled_plugin_module_ids = moduleIds; // load plugins require('load-grunt-config')(grunt, { diff --git a/README.md b/README.md index 43e7d99ca7e74..f2ed40c6a035b 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ -# Kibana 4.0.0-BETA2 +# Kibana 4.0.0-beta3 [![Build Status](https://travis-ci.org/elasticsearch/kibana.svg?branch=master)](https://travis-ci.org/elasticsearch/kibana?branch=master) @@ -22,6 +22,8 @@ Kibana is an open source (Apache Licensed), browser based analytics and search d - Elasticsearch version 1.4.0 or later - ...and nothing else +*Note:* Groovy scripting must be enabled in Elasticsearch + ## Installation * Download: [http://www.elasticsearch.org/overview/kibana/installation/](http://www.elasticsearch.org/overview/kibana/installation/) @@ -29,7 +31,6 @@ Kibana is an open source (Apache Licensed), browser based analytics and search d * Visit [http://localhost:5601](http://localhost:5601) - ## Quick Start You're up and running! Fantastic! Kibana is now running on port 5601, so point your browser at http://YOURDOMAIN.com:5601. @@ -325,7 +326,4 @@ Clicking on the *View* action loads that item in the associated applications. Re Clicking *Edit* will allow you to change the title, description and other settings of the saved object. You can also edit the schema of the stored object. *Note:* this operation is for advanced users only - making changes here can break large portions of the application. - - -## Building from Source -If you want the latest code or need something that's not in a release package, you'll need to build from source. See [CONTRIBUTING.md](CONTRIBUTING.md) for instructions. + \ No newline at end of file diff --git a/bower.json b/bower.json index 03c2c8db32773..621e43b79c019 100644 --- a/bower.json +++ b/bower.json @@ -30,7 +30,7 @@ "angular-ui-select": "~0.9.3", "async": "~0.2.10", "bluebird": "~2.1.3", - "bootstrap": "~3.1.1", + "bootstrap": "~3.3.1", "d3": "~3.4.8", "elasticsearch": "elasticsearch/bower-elasticsearch-js#prerelease", "Faker": "~1.1.0", @@ -48,7 +48,7 @@ "require-css": "~0.1.2", "requirejs": "~2.1.10", "requirejs-text": "~2.0.10", - "lodash-deep": "spenceralger/lodash-deep#k4" + "lodash-deep": "spenceralger/lodash-deep#a2768a72d7" }, "devDependencies": {} } diff --git a/package.json b/package.json index c00c934bef45e..c4fdd63b14867 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "kibana", "private": true, - "version": "4.0.0-BETA2", + "version": "4.0.0-beta3", "description": "Kibana 4", "main": "Gulpfile.js", "dependencies": {}, diff --git a/src/kibana/components/agg_response/hierarchical/build_hierarchical_data.js b/src/kibana/components/agg_response/hierarchical/build_hierarchical_data.js index d44b2f6d2e90e..03a7a0bfdedb2 100644 --- a/src/kibana/components/agg_response/hierarchical/build_hierarchical_data.js +++ b/src/kibana/components/agg_response/hierarchical/build_hierarchical_data.js @@ -69,7 +69,11 @@ define(function (require) { var agg = firstAgg._next; var split = buildSplit(agg, metric, bucket[agg.id]); // Since splits display labels we need to set it. - split.label = firstAgg.fieldFormatter()(bucket.key) + ': ' + firstAgg.field().displayName; + split.label = firstAgg.fieldFormatter()(bucket.key); + + var displayName = firstAgg.fieldDisplayName(); + if (!_.isEmpty(displayName)) split.label += ': ' + displayName; + split.tooltipFormatter = tooltipFormatter(raw.columns); var aggConfigResult = new AggConfigResult(firstAgg, null, null, bucket.key); split.split = { aggConfig: firstAgg, aggConfigResult: aggConfigResult, key: bucket.key }; diff --git a/src/kibana/components/agg_response/tabify/_response_writer.js b/src/kibana/components/agg_response/tabify/_response_writer.js index 3ff16a0505d3d..bc0ee848d581b 100644 --- a/src/kibana/components/agg_response/tabify/_response_writer.js +++ b/src/kibana/components/agg_response/tabify/_response_writer.js @@ -45,6 +45,7 @@ define(function (require) { this.aggStack = _.pluck(this.columns, 'aggConfig'); this.root = new TableGroup(); + this.acrStack = []; this.splitStack = [this.root]; } @@ -66,9 +67,6 @@ define(function (require) { table.aggConfig = agg; table.key = key; table.title = agg.makeLabel() + ': ' + (table.fieldFormatter()(key)); - if (this.asAggConfigResults) { - table.aggConfigResult = new AggConfigResult(agg, parent.aggConfigResult, key, key); - } } // link the parent and child @@ -99,16 +97,23 @@ define(function (require) { buckets.forEach(function (bucket, key) { // find the existing split that we should extend - var TableGroup = _.find(self.splitStack[0].tables, { aggConfig: agg, key: key }); + var tableGroup = _.find(self.splitStack[0].tables, { aggConfig: agg, key: key }); // create the split if it doesn't exist yet - if (!TableGroup) TableGroup = self._table(true, agg, key); + if (!tableGroup) tableGroup = self._table(true, agg, key); + + var splitAcr = false; + if (self.asAggConfigResults) splitAcr = new AggConfigResult(agg, self.acrStack[0], key, key); // push the split onto the stack so that it will receive written tables - self.splitStack.unshift(TableGroup); + self.splitStack.unshift(tableGroup); + splitAcr && self.acrStack.unshift(splitAcr); + // call the block if (_.isFunction(block)) block.call(self, bucket, key); + // remove the split from the stack self.splitStack.shift(); + splitAcr && self.acrStack.shift(); }); }; @@ -144,17 +149,18 @@ define(function (require) { */ TabbedAggResponseWriter.prototype.cell = function (agg, value, block) { if (this.asAggConfigResults) { - var parent = _.findLast(this.rowBuffer, function (result) { - return result.aggConfig.schema.group === 'buckets'; - }); - if (!parent) parent = this.splitStack[0].aggConfigResult; - - value = new AggConfigResult(agg, parent, value, value); + value = new AggConfigResult(agg, this.acrStack[0], value, value); } + var staskResult = this.asAggConfigResults && value.type === 'bucket'; + this.rowBuffer.push(value); + if (staskResult) this.acrStack.unshift(value); + if (_.isFunction(block)) block.call(this); + this.rowBuffer.pop(value); + if (staskResult) this.acrStack.shift(); return value; }; diff --git a/src/kibana/components/agg_table/agg_table.html b/src/kibana/components/agg_table/agg_table.html index bb2c632248d92..a807370585eca 100644 --- a/src/kibana/components/agg_table/agg_table.html +++ b/src/kibana/components/agg_table/agg_table.html @@ -5,8 +5,12 @@ per-page="perPage">
- - Export + Export:   + + Raw +     + + Formatted
diff --git a/src/kibana/components/agg_table/agg_table.js b/src/kibana/components/agg_table/agg_table.js index 3698a925c70db..24d8af3cc7490 100644 --- a/src/kibana/components/agg_table/agg_table.js +++ b/src/kibana/components/agg_table/agg_table.js @@ -33,15 +33,14 @@ define(function (require) { quoteValues: config.get('csv:quoteValues') }; - self.exportAsCsv = function () { - var csv = new Blob([self.toCsv()], { type: 'text/plain' }); + self.exportAsCsv = function (formatted) { + var csv = new Blob([self.toCsv(formatted)], { type: 'text/plain' }); self._saveAs(csv, self.csv.filename); }; - - self.toCsv = function () { - var rows = $scope.table.rows; - var columns = $scope.table.columns; + self.toCsv = function (formatted) { + var rows = formatted ? $scope.formattedRows : $scope.table.rows; + var columns = formatted ? $scope.formattedColumns : $scope.table.columns; var nonAlphaNumRE = /[^a-zA-Z0-9]/; var allDoubleQuoteRE = /"/g; diff --git a/src/kibana/components/agg_types/buckets/create_filter/filters.js b/src/kibana/components/agg_types/buckets/create_filter/filters.js index 78ff2c529d132..43f3bb8dc10a8 100644 --- a/src/kibana/components/agg_types/buckets/create_filter/filters.js +++ b/src/kibana/components/agg_types/buckets/create_filter/filters.js @@ -2,10 +2,18 @@ define(function (require) { var _ = require('lodash'); return function CreateFilterFiltersProvider(Private) { return function (aggConfig, key) { - return _.find(aggConfig.params.filters, function (filter) { - filter.meta = { index: aggConfig.vis.indexPattern.id }; - return filter.query.query_string.query === key; - }); + // have the aggConfig write agg dsl params + var dslFilters = _.deepGet(aggConfig.toDsl(), 'filters.filters'); + var filter = dslFilters[key]; + + if (filter) { + return { + query: filter.query, + meta: { + index: aggConfig.vis.indexPattern.id + } + }; + } }; }; }); diff --git a/src/kibana/components/agg_types/buckets/filters.js b/src/kibana/components/agg_types/buckets/filters.js index 992fdde3d2e2d..6cbdba5e393e6 100644 --- a/src/kibana/components/agg_types/buckets/filters.js +++ b/src/kibana/components/agg_types/buckets/filters.js @@ -1,16 +1,9 @@ define(function (require) { - return function FiltersAggDefinition(Private) { + return function FiltersAggDefinition(Private, Notifier) { var _ = require('lodash'); var AggType = Private(require('components/agg_types/_agg_type')); var createFilter = Private(require('components/agg_types/buckets/create_filter/filters')); - - function getTickLabel(query) { - - if (query.query_string && query.query_string.query) { - return query.query_string.query; - } - return JSON.stringify(query); - } + var notif = new Notifier({ location: 'Filters Agg' }); return new AggType({ name: 'filters', @@ -22,12 +15,24 @@ define(function (require) { editor: require('text!components/agg_types/controls/filters.html'), default: [ {input: {}} ], write: function (aggConfig, output) { - output.params = { - filters: _.transform(aggConfig.params.filters, function (filters, filter, iterator) { - // We need to check here - filters[getTickLabel(filter.input.query)] = filter.input || {query: {query_string: {query: '*'}}}; - }, {}) - }; + var inFilters = aggConfig.params.filters; + if (!_.size(inFilters)) return; + + var outFilters = _.transform(inFilters, function (filters, filter) { + var input = filter.input; + if (!input) return notif.log('malformed filter agg params, missing "input" query'); + + var query = input.query; + if (!query) return notif.log('malformed filter agg params, missing "query" on input'); + + var label = _.deepGet(query, 'query_string.query') || JSON.stringify(query); + filters[label] = input; + }, {}); + + if (!_.size(outFilters)) return; + + var params = output.params || (output.params = {}); + params.filters = outFilters; } } ] diff --git a/src/kibana/components/agg_types/buckets/geo_hash.js b/src/kibana/components/agg_types/buckets/geo_hash.js index d26919d06b3c4..ba7de3557da70 100644 --- a/src/kibana/components/agg_types/buckets/geo_hash.js +++ b/src/kibana/components/agg_types/buckets/geo_hash.js @@ -1,8 +1,25 @@ define(function (require) { - return function GeoHashAggDefinition(Private) { + return function GeoHashAggDefinition(Private, config) { var _ = require('lodash'); var moment = require('moment'); var AggType = Private(require('components/agg_types/_agg_type')); + var defaultPrecision = 3; + + function getPrecision(precision) { + var maxPrecision = _.parseInt(config.get('visualization:tileMap:maxPrecision')); + + precision = parseInt(precision, 10); + + if (isNaN(precision)) { + precision = defaultPrecision; + } + + if (precision > maxPrecision) { + return maxPrecision; + } + + return precision; + } return new AggType({ name: 'geohash_grid', @@ -15,14 +32,11 @@ define(function (require) { }, { name: 'precision', - default: 3, + default: defaultPrecision, editor: require('text!components/agg_types/controls/precision.html'), + deserialize: getPrecision, write: function (aggConfig, output) { - var precision = parseInt(aggConfig.params.precision, 10); - if (isNaN(precision)) { - precision = 3; - } - output.params.precision = precision; + output.params.precision = getPrecision(aggConfig.params.precision); } } ] diff --git a/src/kibana/components/agg_types/controls/precision.html b/src/kibana/components/agg_types/controls/precision.html index 0cc2cf4e90e08..dd5e108a00a35 100644 --- a/src/kibana/components/agg_types/controls/precision.html +++ b/src/kibana/components/agg_types/controls/precision.html @@ -9,7 +9,7 @@ class="form-control" type="range" min="1" - max="9" + max="{{config.get('visualization:tileMap:maxPrecision')}}" >
{{params.precision}} diff --git a/src/kibana/components/config/config.js b/src/kibana/components/config/config.js index af3d36891f148..41d06e8cfa8d7 100644 --- a/src/kibana/components/config/config.js +++ b/src/kibana/components/config/config.js @@ -4,17 +4,17 @@ define(function (require) { ]); var configFile = JSON.parse(require('text!config')); - configFile.elasticsearch = ( - window.location.protocol + '//' + - window.location.hostname + - (window.location.port ? ':' + window.location.port : '') + - '/elasticsearch'); + configFile.elasticsearch = (function () { + var a = document.createElement('a'); + a.href = 'elasticsearch'; + return a.href; + }()); // allow the rest of the app to get the configFile easily module.constant('configFile', configFile); // service for delivering config variables to everywhere else - module.service('config', function (Private, Notifier, kbnVersion, kbnSetup, $rootScope) { + module.service('config', function (Private, Notifier, kbnVersion, kbnSetup, $rootScope, buildNum) { var config = this; var angular = require('angular'); @@ -63,8 +63,11 @@ define(function (require) { }; return doc.fetch().then(function initDoc(resp) { - if (!resp.found) return doc.doIndex({}).then(getDoc); - else { + if (!resp.found) { + return doc.doIndex({ + buildNum: buildNum + }).then(getDoc); + } else { // apply update, and keep it quiet the first time applyMassUpdate(resp, true); diff --git a/src/kibana/components/config/defaults.js b/src/kibana/components/config/defaults.js index ba3ca50d25041..c21fa068e482c 100644 --- a/src/kibana/components/config/defaults.js +++ b/src/kibana/components/config/defaults.js @@ -30,6 +30,10 @@ define(function (require) { value: 100, description: 'Never show more than this many bar in date histograms, scale values if needed', }, + 'visualization:tileMap:maxPrecision': { + value: 6, + description: 'The maximum geoHash size allowed in a tile map', + }, 'csv:separator': { value: ',', description: 'Separate exported values with this string', diff --git a/src/kibana/components/courier/_pending_requests.js b/src/kibana/components/courier/_pending_requests.js deleted file mode 100644 index 8c7e2c85e3b31..0000000000000 --- a/src/kibana/components/courier/_pending_requests.js +++ /dev/null @@ -1,10 +0,0 @@ -define(function (require) { - return function PendingRequestList() { - /** - * Queue of pending requests, requests are removed as - * they are processed by fetch.[sourceType](). - * @type {Array} - */ - return []; - }; -}); \ No newline at end of file diff --git a/src/kibana/components/courier/_request_queue.js b/src/kibana/components/courier/_request_queue.js new file mode 100644 index 0000000000000..321a49945470e --- /dev/null +++ b/src/kibana/components/courier/_request_queue.js @@ -0,0 +1,35 @@ +define(function (require) { + return function PendingRequestList() { + var _ = require('lodash'); + + /** + * Queue of pending requests, requests are removed as + * they are processed by fetch.[sourceType](). + * @type {Array} + */ + var queue = []; + + queue.getInactive = function (/* strategies */) { + return queue.get.apply(queue, arguments) + .filter(function (req) { + return !req.started; + }); + }; + + queue.get = function (/* strategies.. */) { + var strategies = _.toArray(arguments); + return queue.filter(function (req) { + var strategyMatch = !strategies.length; + if (!strategyMatch) { + strategyMatch = strategies.some(function (strategy) { + return req.strategy === strategy; + }); + } + + return strategyMatch && req.canStart(); + }); + }; + + return queue; + }; +}); \ No newline at end of file diff --git a/src/kibana/components/courier/courier.js b/src/kibana/components/courier/courier.js index f8c32f26e2ad1..d4e4f4ffab437 100644 --- a/src/kibana/components/courier/courier.js +++ b/src/kibana/components/courier/courier.js @@ -1,20 +1,23 @@ define(function (require) { var errors = require('errors'); - + var _ = require('lodash'); + require('services/es'); require('services/promises'); require('components/index_patterns/index_patterns'); require('modules').get('kibana/courier') - .service('courier', function ($rootScope, Private, Promise, indexPatterns) { + .service('courier', function ($rootScope, Private, Promise, indexPatterns, Notifier) { function Courier() { var self = this; var DocSource = Private(require('components/courier/data_source/doc_source')); var SearchSource = Private(require('components/courier/data_source/search_source')); - var pendingRequests = Private(require('components/courier/_pending_requests')); + var requestQueue = Private(require('components/courier/_request_queue')); + var errorHandlers = Private(require('components/courier/_error_handlers')); + var fetch = Private(require('components/courier/fetch/fetch')); var docLooper = self.docLooper = Private(require('components/courier/looper/doc')); var searchLooper = self.searchLooper = Private(require('components/courier/looper/search')); @@ -29,7 +32,6 @@ define(function (require) { self.SearchSource = SearchSource; var HastyRefresh = errors.HastyRefresh; - var Abort = errors.Abort; /** * update the time between automatic search requests @@ -47,6 +49,7 @@ define(function (require) { */ self.start = function () { searchLooper.start(); + docLooper.start(); return this; }; @@ -56,7 +59,9 @@ define(function (require) { * individual errors are routed to their respective requests. */ self.fetch = function () { - return searchLooper.run(); + fetch.searches().then(function () { + searchLooper.restart(); + }); }; @@ -105,15 +110,26 @@ define(function (require) { searchLooper.stop(); docLooper.stop(); - [].concat(pendingRequests.splice(0), this._errorHandlers.splice(0)) - .forEach(function (req) { - req.defer.reject(new Abort()); - }); + _.invoke(requestQueue, 'abort'); - if (pendingRequests.length) { + if (requestQueue.length) { throw new Error('Aborting all pending requests failed.'); } }; + + // Listen for refreshInterval changes + $rootScope.$watch('timefilter.refreshInterval', function () { + var refreshValue = _.deepGet($rootScope, 'timefilter.refreshInterval.value'); + if (_.isNumber(refreshValue)) { + self.fetchInterval(refreshValue); + } else { + self.fetchInterval(0); + } + }); + + var onFatalDefer = Promise.defer(); + onFatalDefer.promise.then(self.close); + Notifier.fatalCallbacks.push(onFatalDefer.resolve); } return new Courier(); diff --git a/src/kibana/components/courier/data_source/_abstract.js b/src/kibana/components/courier/data_source/_abstract.js index a9f6502a6736c..db8d5be4d22ac 100644 --- a/src/kibana/components/courier/data_source/_abstract.js +++ b/src/kibana/components/courier/data_source/_abstract.js @@ -1,10 +1,8 @@ define(function (require) { - var inherits = require('lodash').inherits; var _ = require('lodash'); - var nextTick = require('utils/next_tick'); - return function SourceAbstractFactory(Private, Promise, PromiseEmitter, timefilter) { - var pendingRequests = Private(require('components/courier/_pending_requests')); + return function SourceAbstractFactory(Private, Promise, PromiseEmitter) { + var requestQueue = Private(require('components/courier/_request_queue')); var errorHandlers = Private(require('components/courier/_error_handlers')); var courierFetch = Private(require('components/courier/fetch/fetch')); @@ -123,8 +121,7 @@ define(function (require) { var self = this; return new PromiseEmitter(function (resolve, reject, defer) { - var req = self._createRequest(defer); - pendingRequests.push(req); + self._createRequest(defer); }, handler); }; @@ -154,34 +151,40 @@ define(function (require) { /** * Fetch just this source ASAP - * @param {Function} cb - callback + * + * ONLY USE IF YOU WILL BE USING THE RESULTS + * provided by the returned promise, otherwise + * call #fetchQueued() + * + * @async */ SourceAbstract.prototype.fetch = function () { var self = this; + var req = _.first(self._myQueued()); - var req = self._createRequest(); - pendingRequests.push(req); - - // fetch just the requests for this source - courierFetch.these(self._getType(), pendingRequests.splice(0).filter(function (req) { - if (req.source !== self) { - pendingRequests.push(req); - return false; - } + if (!req) { + req = self._createRequest(); + } - return true; - })); + courierFetch.these([req]); return req.defer.promise; }; + /** + * Fetch all pending requests for this source ASAP + * @async + */ + SourceAbstract.prototype.fetchQueued = function () { + return courierFetch.these(this._myQueued()); + }; + /** * Cancel all pending requests for this dataSource * @return {undefined} */ - SourceAbstract.prototype.cancelPending = function () { - var pending = _.where(pendingRequests, { source: this}); - _.pull.apply(_, [pendingRequests].concat(pending)); + SourceAbstract.prototype.cancelQueued = function () { + _.invoke(this._myQueued(), 'abort'); }; /** @@ -189,29 +192,20 @@ define(function (require) { * @return {undefined} */ SourceAbstract.prototype.destroy = function () { - this.cancelPending(); + this.cancelQueued(); }; /***** * PRIVATE API *****/ - SourceAbstract.prototype._createRequest = function (defer) { - var self = this; - - var req = { - source: self, - defer: defer || Promise.defer() - }; - - if (self.history) { - // latest history at the top - self.history.unshift(req); - // trim all entries beyond 19/20 - self.history.splice(20); - } + SourceAbstract.prototype._myQueued = function () { + var reqs = requestQueue.get(this._fetchStrategy); + return _.where(reqs, { source: this }); + }; - return req; + SourceAbstract.prototype._createRequest = function () { + throw new Error('_createRequest must be implemented by subclass'); }; /** @@ -233,7 +227,7 @@ define(function (require) { var current = this; // call the ittr and return it's promise - return (function ittr(resolve, reject) { + return (function ittr() { // itterate the _state object (not array) and // pass each key:value pair to source._mergeProp. if _mergeProp // returns a promise, then wait for it to complete and call _mergeProp again @@ -259,6 +253,8 @@ define(function (require) { }()) .then(function () { if (type === 'search') { + flatState.body = flatState.body || {}; + // defaults for the query if (!flatState.body.query) { flatState.body.query = { @@ -269,7 +265,10 @@ define(function (require) { var computedFields = flatState.index.getComputedFields(); flatState.body.fields = computedFields.fields; flatState.body.script_fields = flatState.body.script_fields || {}; + flatState.body.fielddata_fields = flatState.body.fielddata_fields || []; + _.extend(flatState.body.script_fields, computedFields.scriptFields); + flatState.body.fielddata_fields = _.union(flatState.body.fielddata_fields, computedFields.fielddataFields); /** diff --git a/src/kibana/components/courier/data_source/_doc_send_to_es.js b/src/kibana/components/courier/data_source/_doc_send_to_es.js index 3c760ec269c5d..353eca50f2102 100644 --- a/src/kibana/components/courier/data_source/_doc_send_to_es.js +++ b/src/kibana/components/courier/data_source/_doc_send_to_es.js @@ -3,7 +3,8 @@ define(function (require) { var errors = require('errors'); return function (Promise, Private, es) { - var pendingRequests = Private(require('components/courier/_pending_requests')); + var requestQueue = Private(require('components/courier/_request_queue')); + var courierFetch = Private(require('components/courier/fetch/fetch')); /** * Backend for doUpdate and doIndex @@ -12,12 +13,12 @@ define(function (require) { * of the the docs current version be sent to es? * @param {String} body - HTTP request body */ - return function (method, validateVersion, body) { + return function (method, validateVersion, body, ignore) { var doc = this; // straight assignment will causes undefined values var params = _.pick(this._state, ['id', 'type', 'index']); params.body = body; - params.ignore = [409]; + params.ignore = ignore || [409]; if (validateVersion && params.id) { params.version = doc._getVersion(); @@ -44,19 +45,22 @@ define(function (require) { // clear the queue and filter out the removed items, pushing the // unmatched ones back in. - pendingRequests.splice(0).filter(function (req) { + var respondTo = requestQueue.splice(0).filter(function (req) { var isDoc = req.source._getType() === 'doc'; var keyMatches = isDoc && req.source._versionKey() === key; - if (keyMatches) { - // resolve the request with a copy of the response - req.defer.resolve(_.cloneDeep(fetchResp)); - return; + // put some request back into the queue + if (!keyMatches) { + requestQueue.push(req); + return false; } - // otherwise, put the request back into the queue - pendingRequests.push(req); + return true; }); + + return courierFetch.fakeFetchThese(respondTo, respondTo.map(function () { + return _.cloneDeep(fetchResp); + })); }); return resp._id; @@ -67,4 +71,4 @@ define(function (require) { }); }; }; -}); \ No newline at end of file +}); diff --git a/src/kibana/components/courier/data_source/_root_search_source.js b/src/kibana/components/courier/data_source/_root_search_source.js index 98bfc49d25012..553359dd7fa7a 100644 --- a/src/kibana/components/courier/data_source/_root_search_source.js +++ b/src/kibana/components/courier/data_source/_root_search_source.js @@ -1,6 +1,5 @@ define(function (require) { return function RootSearchSource(Private, $rootScope, config, Promise, indexPatterns, timefilter, Notifier) { - var _ = require('lodash'); var SearchSource = Private(require('components/courier/data_source/search_source')); var notify = new Notifier({ location: 'Root Search Source' }); diff --git a/src/kibana/components/courier/data_source/doc_source.js b/src/kibana/components/courier/data_source/doc_source.js index e1cad9140398c..cc1c03ad2dbab 100644 --- a/src/kibana/components/courier/data_source/doc_source.js +++ b/src/kibana/components/courier/data_source/doc_source.js @@ -1,15 +1,10 @@ define(function (require) { var _ = require('lodash'); - var errors = require('errors'); - - var inherits = require('lodash').inherits; return function DocSourceFactory(Private, Promise, es, sessionStorage) { var sendToEs = Private(require('components/courier/data_source/_doc_send_to_es')); var SourceAbstract = Private(require('components/courier/data_source/_abstract')); - - var VersionConflict = errors.VersionConflict; - var RequestFailure = errors.RequestFailure; + var DocRequest = Private(require('components/courier/fetch/request/doc')); _(DocSource).inherits(SourceAbstract); function DocSource(initialState) { @@ -23,6 +18,10 @@ define(function (require) { * PUBLIC API *****/ + DocSource.prototype._createRequest = function (defer) { + return new DocRequest(this, defer); + }; + /** * List of methods that is turned into a chainable API in the constructor * @type {Array} @@ -54,6 +53,10 @@ define(function (require) { return sendToEs.call(this, 'index', false, body); }; + DocSource.prototype.doCreate = function (body) { + return sendToEs.call(this, 'create', false, body, []); + }; + /***** * PRIVATE API *****/ @@ -151,4 +154,4 @@ define(function (require) { return DocSource; }; -}); \ No newline at end of file +}); diff --git a/src/kibana/components/courier/data_source/search_source.js b/src/kibana/components/courier/data_source/search_source.js index 2c4257a2d7deb..d89363ce84706 100644 --- a/src/kibana/components/courier/data_source/search_source.js +++ b/src/kibana/components/courier/data_source/search_source.js @@ -2,11 +2,9 @@ define(function (require) { return function SearchSourceFactory(Promise, Private) { var _ = require('lodash'); - var errors = require('errors'); var SourceAbstract = Private(require('components/courier/data_source/_abstract')); - - var FetchFailure = errors.FetchFailure; - var RequestFailure = errors.RequestFailure; + var SearchRequest = Private(require('components/courier/fetch/request/search')); + var SegmentedRequest = Private(require('components/courier/fetch/request/segmented')); _(SearchSource).inherits(SourceAbstract); function SearchSource(initialState) { @@ -125,6 +123,18 @@ define(function (require) { return normal; }; + SearchSource.prototype.onBeginSegmentedFetch = function (initFunction) { + var self = this; + return Promise.try(function addRequest() { + var req = new SegmentedRequest(self, Promise.defer(), initFunction); + + // return promises created by the completion handler so that + // errors will bubble properly + return req.defer.promise.then(addRequest); + }); + }; + + /****** * PRIVATE APIS ******/ @@ -137,6 +147,19 @@ define(function (require) { return 'search'; }; + /** + * Create a common search request object, which should + * be put into the pending request queye, for this search + * source + * + * @param {Deferred} defer - the deferred object that should be resolved + * when the request is complete + * @return {SearchRequest} + */ + SearchSource.prototype._createRequest = function (defer) { + return new SearchRequest(this, defer); + }; + /** * Used to merge properties into the state within ._flatten(). * The state is passed in and modified by the function diff --git a/src/kibana/components/courier/fetch/_call_client.js b/src/kibana/components/courier/fetch/_call_client.js new file mode 100644 index 0000000000000..7df3d7a574d4d --- /dev/null +++ b/src/kibana/components/courier/fetch/_call_client.js @@ -0,0 +1,107 @@ +define(function (require) { + return function CourierFetchCallClient(Private, Promise, es, configFile, sessionId) { + var _ = require('lodash'); + + var isRequest = Private(require('components/courier/fetch/_is_request')); + var mergeDuplicateRequests = Private(require('components/courier/fetch/_merge_duplicate_requests')); + + var ABORTED = Private(require('components/courier/fetch/_req_status')).ABORTED; + var DUPLICATE = Private(require('components/courier/fetch/_req_status')).DUPLICATE; + + function callClient(strategy, requests) { + // merging docs can change status to DUPLICATE, capture new statuses + var statuses = mergeDuplicateRequests(requests); + + // get the actual list of requests that we will be fetching + var executable = statuses.filter(isRequest); + var execCount = executable.length; + + // resolved by respond() + var esPromise; + var defer = Promise.defer(); + + // attach abort handlers, close over request index + statuses.forEach(function (req, i) { + if (!isRequest(req)) return; + req.whenAborted(function () { + requestWasAborted(req, i).catch(defer.reject); + }); + }); + + // handle a request being aborted while being fetched + var requestWasAborted = Promise.method(function (req, i) { + if (statuses[i] === ABORTED) { + defer.reject(new Error('Request was aborted twice?')); + } + + execCount -= 1; + if (execCount > 0) { + // the multi-request still contains other requests + return; + } + + if (esPromise && _.isFunction(esPromise.abort)) { + esPromise.abort(); + } + + esPromise = ABORTED; + + return respond(); + }); + + // for each respond with either the response or ABORTED + var respond = function (responses) { + responses = responses || []; + return Promise.map(requests, function (req, i) { + switch (statuses[i]) { + case ABORTED: + return ABORTED; + case DUPLICATE: + return req._uniq.resp; + default: + return responses[_.findIndex(executable, req)]; + } + }) + .then(defer.resolve, defer.reject); + }; + + // Now that all of THAT^^^ is out of the way, lets actually + // call out to elasticsearch + Promise.resolve(strategy.convertReqsToBody(executable)) + .then(function (body) { + // while the strategy was converting, our request was aborted + if (esPromise === ABORTED) { + throw ABORTED; + } + + return (esPromise = es[strategy.clientMethod]({ + timeout: configFile.shard_timeout, + ignore_unavailable: true, + preference: sessionId, + body: body + })); + }) + .then(function (clientResp) { + return strategy.getResponses(clientResp); + }) + .then(respond) + .catch(function (err) { + if (err === ABORTED) respond(); + else defer.reject(err); + }); + + // return our promise, but catch any errors we create and + // send them to the requests + return defer.promise + .catch(function (err) { + requests.forEach(function (req, i) { + if (statuses[i] !== ABORTED) { + req.handleFailure(err); + } + }); + }); + } + + return callClient; + }; +}); \ No newline at end of file diff --git a/src/kibana/components/courier/fetch/_call_response_handlers.js b/src/kibana/components/courier/fetch/_call_response_handlers.js new file mode 100644 index 0000000000000..e76f7b1fda63f --- /dev/null +++ b/src/kibana/components/courier/fetch/_call_response_handlers.js @@ -0,0 +1,52 @@ +define(function (require) { + return function CourierFetchCallResponseHandlers(Private, Promise) { + var ABORTED = Private(require('components/courier/fetch/_req_status')).ABORTED; + var INCOMPLETE = Private(require('components/courier/fetch/_req_status')).INCOMPLETE; + var notify = Private(require('components/courier/fetch/_notifier')); + + var SearchTimeout = require('errors').SearchTimeout; + var RequestFailure = require('errors').RequestFailure; + + function callResponseHandlers(requests, responses) { + return Promise.map(requests, function (req, i) { + var resp = responses[i]; + + if (req === ABORTED || req.aborted) { + return ABORTED; + } + + if (resp.timed_out) { + notify.warning(new SearchTimeout()); + } + + function progress() { + if (req.isIncomplete()) { + return INCOMPLETE; + } + + req.complete(); + return resp; + } + + if (resp.error) { + if (req.filterError(resp)) { + return progress(); + } else { + return req.handleFailure(new RequestFailure(null, resp)); + } + } + + return Promise.try(function () { + return req.transformResponse(resp); + }) + .then(function () { + resp = arguments[0]; + return req.handleResponse(resp); + }) + .then(progress); + }); + } + + return callResponseHandlers; + }; +}); \ No newline at end of file diff --git a/src/kibana/components/courier/fetch/_continue_incomplete.js b/src/kibana/components/courier/fetch/_continue_incomplete.js new file mode 100644 index 0000000000000..49070040d36f3 --- /dev/null +++ b/src/kibana/components/courier/fetch/_continue_incomplete.js @@ -0,0 +1,27 @@ +define(function (require) { + return function CourierFetchContinueIncompleteRequests(Private) { + var INCOMPLETE = Private(require('components/courier/fetch/_req_status')).INCOMPLETE; + + function continueIncompleteRequests(strategy, requests, responses, fetchWithStrategy) { + var incomplete = []; + + responses.forEach(function (resp, i) { + if (resp === INCOMPLETE) { + incomplete.push(requests[i]); + } + }); + + if (!incomplete.length) return responses; + + return fetchWithStrategy(strategy, incomplete) + .then(function (completedResponses) { + return responses.map(function (prevResponse) { + if (prevResponse !== INCOMPLETE) return prevResponse; + return completedResponses.shift(); + }); + }); + } + + return continueIncompleteRequests; + }; +}); \ No newline at end of file diff --git a/src/kibana/components/courier/fetch/_fetch_these.js b/src/kibana/components/courier/fetch/_fetch_these.js new file mode 100644 index 0000000000000..125d2bef2465d --- /dev/null +++ b/src/kibana/components/courier/fetch/_fetch_these.js @@ -0,0 +1,68 @@ +define(function (require) { + return function FetchTheseProvider(Private, Promise) { + var notify = Private(require('components/courier/fetch/_notifier')); + var forEachStrategy = Private(require('components/courier/fetch/_for_each_strategy')); + + // core tasks + var callClient = Private(require('components/courier/fetch/_call_client')); + var callResponseHandlers = Private(require('components/courier/fetch/_call_response_handlers')); + var continueIncomplete = Private(require('components/courier/fetch/_continue_incomplete')); + + var ABORTED = Private(require('components/courier/fetch/_req_status')).ABORTED; + var DUPLICATE = Private(require('components/courier/fetch/_req_status')).DUPLICATE; + var INCOMPLETE = Private(require('components/courier/fetch/_req_status')).INCOMPLETE; + + function fetchThese(requests) { + return forEachStrategy(requests, function (strategy, requests) { + return fetchWithStrategy(strategy, requests.map(function (req) { + if (!req.started) return req; + return req.retry(); + })); + }) + .catch(notify.fatal); + } + + function fetchWithStrategy(strategy, requests) { + + requests = requests.map(function (req) { + if (req.aborted) { + return ABORTED; + } + + if (req.started) { + req.continue(); + } else { + req.start(); + } + + return req; + }); + + return Promise.resolve() + .then(function () { + return callClient(strategy, requests); + }) + .then(function (responses) { + return callResponseHandlers(requests, responses); + }) + .then(function (responses) { + return continueIncomplete(strategy, requests, responses, fetchWithStrategy); + }) + .then(function (responses) { + return responses.map(function (resp) { + switch (resp) { + case ABORTED: + return null; + case DUPLICATE: + case INCOMPLETE: + throw new Error('Failed to clear incomplete or duplicate request from responses.'); + default: + return resp; + } + }); + }); + } + + return fetchThese; + }; +}); \ No newline at end of file diff --git a/src/kibana/components/courier/fetch/_for_each_strategy.js b/src/kibana/components/courier/fetch/_for_each_strategy.js new file mode 100644 index 0000000000000..15b037e580d3a --- /dev/null +++ b/src/kibana/components/courier/fetch/_for_each_strategy.js @@ -0,0 +1,23 @@ +define(function (require) { + return function FetchForEachRequestStrategy(Private, Promise) { + var _ = require('lodash'); + + function forEachStrategy(requests, block) { + block = Promise.method(block); + var sets = []; + + requests.forEach(function (req) { + var strategy = req.strategy; + var set = _.find(sets, { 0: strategy }); + if (set) set[1].push(req); + else sets.push([strategy, [req]]); + }); + + return Promise.all(sets.map(function (set) { + return block(set[0], set[1]); + })); + } + + return forEachStrategy; + }; +}); \ No newline at end of file diff --git a/src/kibana/components/courier/fetch/_is_request.js b/src/kibana/components/courier/fetch/_is_request.js new file mode 100644 index 0000000000000..51d022e856866 --- /dev/null +++ b/src/kibana/components/courier/fetch/_is_request.js @@ -0,0 +1,9 @@ +define(function (require) { + return function CourierFetchIsRequestProvider(Private) { + var AbstractRequest = Private(require('components/courier/fetch/request/request')); + + return function isRequest(obj) { + return obj instanceof AbstractRequest; + }; + }; +}); \ No newline at end of file diff --git a/src/kibana/components/courier/fetch/_merge_duplicate_requests.js b/src/kibana/components/courier/fetch/_merge_duplicate_requests.js new file mode 100644 index 0000000000000..03127fd582fdb --- /dev/null +++ b/src/kibana/components/courier/fetch/_merge_duplicate_requests.js @@ -0,0 +1,28 @@ +define(function (require) { + return function FetchMergeDuplicateRequests(Private) { + var isRequest = Private(require('components/courier/fetch/_is_request')); + var DUPLICATE = Private(require('components/courier/fetch/_req_status')).DUPLICATE; + + function mergeDuplicateRequests(requests) { + // dedupe requests + var index = {}; + return requests.map(function (req) { + if (!isRequest(req)) return req; + + var iid = req.source._instanceid; + if (!index[iid]) { + // this request is unique so far + index[iid] = req; + // keep the request + return req; + } + + // the source was requested at least twice + req._uniq = index[iid]; + return DUPLICATE; + }); + } + + return mergeDuplicateRequests; + }; +}); \ No newline at end of file diff --git a/src/kibana/components/courier/fetch/_notifier.js b/src/kibana/components/courier/fetch/_notifier.js new file mode 100644 index 0000000000000..3573bcf99ebd2 --- /dev/null +++ b/src/kibana/components/courier/fetch/_notifier.js @@ -0,0 +1,7 @@ +define(function (require) { + return function CourierFetchNotifier(Notifier) { + return new Notifier({ + location: 'Courier Fetch' + }); + }; +}); \ No newline at end of file diff --git a/src/kibana/components/courier/fetch/_req_status.js b/src/kibana/components/courier/fetch/_req_status.js new file mode 100644 index 0000000000000..4c04a2f4de859 --- /dev/null +++ b/src/kibana/components/courier/fetch/_req_status.js @@ -0,0 +1,9 @@ +define(function (require) { + return function CourierFetchRequestStatus() { + return { + ABORTED: {}, + DUPLICATE: {}, + INCOMPLETE: {} + }; + }; +}); \ No newline at end of file diff --git a/src/kibana/components/courier/fetch/_request_error_handler.js b/src/kibana/components/courier/fetch/_request_error_handler.js deleted file mode 100644 index 8d488625b755b..0000000000000 --- a/src/kibana/components/courier/fetch/_request_error_handler.js +++ /dev/null @@ -1,29 +0,0 @@ -define(function (require) { - return function RequestErrorHandlerFactory(Private, Notifier) { - var pendingRequests = Private(require('components/courier/_pending_requests')); - var errorHandlers = Private(require('components/courier/_error_handlers')); - - var notify = new Notifier({ - location: 'Courier Fetch Error' - }); - - function RequestErrorHandler() {} - - RequestErrorHandler.prototype.handle = function (req, error) { - pendingRequests.push(req); - - var handlerCount = 0; - errorHandlers.splice(0).forEach(function (handler) { - if (handler.source !== req.source) return errorHandlers.push(handler); - handler.defer.resolve(error); - handlerCount++; - }); - - if (!handlerCount) { - notify.fatal(new Error('unhandled error ' + (error.stack || error.message))); - } - }; - - return RequestErrorHandler; - }; -}); \ No newline at end of file diff --git a/src/kibana/components/courier/fetch/fetch.js b/src/kibana/components/courier/fetch/fetch.js index 513f670293181..cf2d42834743d 100644 --- a/src/kibana/components/courier/fetch/fetch.js +++ b/src/kibana/components/courier/fetch/fetch.js @@ -1,172 +1,92 @@ define(function (require) { - return function fetchService(Private, es, Promise, Notifier, sessionId, configFile) { + return function fetchService(Private, Promise) { var _ = require('lodash'); - var errors = require('errors'); - var moment = require('moment'); - var docStrategy = Private(require('components/courier/fetch/strategy/doc')); - var searchStrategy = Private(require('components/courier/fetch/strategy/search')); var strategies = this.strategies = { - doc: docStrategy, - search: searchStrategy + doc: Private(require('components/courier/fetch/strategy/doc')), + search: Private(require('components/courier/fetch/strategy/search')) }; - var RequestErrorHandler = Private(require('components/courier/fetch/_request_error_handler')); - var pendingRequests = Private(require('components/courier/_pending_requests')); + var requestQueue = Private(require('components/courier/_request_queue')); + var fetchThese = Private(require('components/courier/fetch/_fetch_these')); - var notify = new Notifier({ - location: 'Courier Fetch' - }); + var callResponseHandlers = Private(require('components/courier/fetch/_call_response_handlers')); + var INCOMPLETE = Private(require('components/courier/fetch/_req_status')).INCOMPLETE; - var fetchThese = function (strategy, requests, reqErrHandler) { - var all, body; - - // dedupe requests - var uniqs = {}; - all = requests.splice(0).filter(function (req) { - if (req.source.activeFetchCount) { - req.source.activeFetchCount += 1; - } else { - req.source.activeFetchCount = 1; - } - - req.moment = moment(); - - var iid = req.source._instanceid; - if (!uniqs[iid]) { - // this request is unique so far - uniqs[iid] = req; - // keep the request - return true; - } - - // the source was requested at least twice - var uniq = uniqs[iid]; - if (uniq._merged) { - // already setup the merged list - uniq._merged.push(req); - } else { - // put all requests into this array and itterate them on response - uniq._merged = [uniq, req]; - } - }); - - return Promise.map(all, _.limit(strategy.getSourceStateFromRequest, 1)) - .then(function (states) { - // all requests must have been disabled - if (!states.length) return Promise.resolve(); - - body = strategy.convertStatesToBody(states); - - return es[strategy.clientMethod]({ - timeout: configFile.shard_timeout, - preference: sessionId, - body: body - }) - .then(function (resp) { - var sendResponse = function (req, resp) { - if (resp.timed_out) { - notify.warning(new errors.SearchTimeout()); - } - req.complete = true; - req.resp = resp; - req.ms = req.moment.diff() * -1; - req.source.activeFetchCount -= 1; - - if (resp.error) return reqErrHandler.handle(req, new errors.FetchFailure(resp)); - else strategy.resolveRequest(req, resp); - }; - - strategy.getResponses(resp).forEach(function (resp) { - var req = all.shift(); - var state = states.shift(); - if (!req._merged) { - req.state = state; - sendResponse(req, resp); - } else { - req._merged.forEach(function (mergedReq) { - mergedReq.state = state; - sendResponse(mergedReq, _.cloneDeep(resp)); - }); - } - }); - - // pass the response along to the next promise - return resp; - }) - .catch(function (err) { - var sendFailure = function (req) { - req.source.activeFetchCount -= 1; - reqErrHandler.handle(req, err); - }; - - all.forEach(function (req) { - if (!req._merged) sendFailure(req); - else req._merged.forEach(sendFailure); - }); - throw err; - }); - }, notify.fatal); - }; - - var fetchPending = function (strategy) { - var requests = strategy.getPendingRequests(pendingRequests); + function fetchQueued(strategy) { + var requests = requestQueue.get(strategy); if (!requests.length) return Promise.resolve(); - else return fetchThese(strategy, requests, new RequestErrorHandler()); - }; - - var fetchASource = function (strategy, source) { - var defer = Promise.defer(); - fetchThese(strategy, [ - { - source: source, - defer: defer - } - ], new RequestErrorHandler()); - return defer.promise; - }; + else return fetchThese(requests); + } /** * Fetch all pending docs that are ready to be fetched - * @param {Courier} courier - The courier to read the pending - * requests from * @async */ - this.docs = _.partial(fetchPending, docStrategy); + this.docs = _.partial(fetchQueued, strategies.doc); /** * Fetch all pending search requests - * @param {Courier} courier - The courier to read the pending - * requests from * @async */ - this.searches = _.partial(fetchPending, searchStrategy); + this.searches = _.partial(fetchQueued, strategies.search); + + + function fetchASource(source, strategy) { + strategy = strategy || strategies[source._getType()]; + var defer = Promise.defer(); + + fetchThese([ + source._createRequest(defer.resolve) + ]); + + return defer.promise; + } /** * Fetch a single doc source * @param {DocSource} source - The DocSource to request * @async */ - this.doc = _.partial(fetchASource, docStrategy); + this.doc = fetchASource; /** * Fetch a single search source * @param {SearchSource} source - The SearchSource to request * @async */ - this.search = _.partial(fetchASource, searchStrategy); + this.search = fetchASource; /** - * Fetch a list of pendingRequests, which is already filtered - * @param {string} type - the type name for the sources in the requests + * Fetch a list of requests * @param {array} reqs - the requests to fetch + * @async + */ + this.these = fetchThese; + + /** + * Send responses to a list of requests, used when requests + * should be skipped (like when a doc is updated with an index). + * + * This logic is a simplified version of what fetch_these does, and + * could have been added elsewhere, but I would rather the logic be + * here than outside the courier/fetch module. + * + * @param {array[Request]} requests - the list of requests to respond to + * @param {array[any]} responses - the list of responses for each request */ - this.these = function (type, reqs) { - return fetchThese( - strategies[type], - reqs, - new RequestErrorHandler() - ); + this.fakeFetchThese = function (requests, responses) { + return Promise.map(requests, function (req) { + return req.start(); + }) + .then(function () { + return callResponseHandlers(requests, responses); + }) + .then(function (requestStates) { + if (_.contains(requestStates, INCOMPLETE)) { + throw new Error('responding to requests did not complete!'); + } + }); }; }; }); \ No newline at end of file diff --git a/src/kibana/components/courier/fetch/request/_error_handler.js b/src/kibana/components/courier/fetch/request/_error_handler.js new file mode 100644 index 0000000000000..665f1f9172ef2 --- /dev/null +++ b/src/kibana/components/courier/fetch/request/_error_handler.js @@ -0,0 +1,27 @@ +define(function (require) { + return function RequestErrorHandlerFactory(Private, Notifier) { + var errHandlers = Private(require('components/courier/_error_handlers')); + + var notify = new Notifier({ + location: 'Courier Fetch Error' + }); + + function handleError(req, error) { + var myHandlers = []; + + errHandlers.splice(0).forEach(function (handler) { + (handler.source !== req.source ? myHandlers : errHandlers).push(handler); + }); + + if (!myHandlers.length) { + notify.fatal(new Error('unhandled error ' + (error.stack || error.message))); + } else { + myHandlers.forEach(function (handler) { + handler.defer.resolve(error); + }); + } + } + + return handleError; + }; +}); \ No newline at end of file diff --git a/src/kibana/components/courier/fetch/request/_segmented_handle.js b/src/kibana/components/courier/fetch/request/_segmented_handle.js new file mode 100644 index 0000000000000..3af37cba8e088 --- /dev/null +++ b/src/kibana/components/courier/fetch/request/_segmented_handle.js @@ -0,0 +1,40 @@ +define(function (require) { + return function CourierSegmentedReqHandle(Private) { + var _ = require('lodash'); + var Events = Private(require('factories/events')); + + + /** + * Simple class for creating an object to send to the + * requester of a SegmentedRequest. Since the SegmentedRequest + * extends AbstractRequest, it wasn't able to be the event + * emitter it was born to be. This provides a channel for + * setting values on the segmented request, and an event + * emitter for the request to speak outwardly + * + * @param {SegmentedRequest} - req - the requst this handle relates to + */ + _(SegmentedHandle).inherits(Events); + function SegmentedHandle(req) { + SegmentedHandle.Super.call(this); + this._req = req; + } + + /** + * Set the sort direction for the request. + * + * @param {string} dir - one of 'asc' or 'desc' + */ + SegmentedHandle.prototype.setDirection = function (dir) { + switch (dir) { + case 'asc': + case 'desc': + return (this._req._direction = dir); + default: + throw new TypeError('unkown sort direction "' + dir + '"'); + } + }; + + return SegmentedHandle; + }; +}); \ No newline at end of file diff --git a/src/kibana/components/courier/fetch/request/doc.js b/src/kibana/components/courier/fetch/request/doc.js new file mode 100644 index 0000000000000..6f224e0be3c21 --- /dev/null +++ b/src/kibana/components/courier/fetch/request/doc.js @@ -0,0 +1,46 @@ +define(function (require) { + return function DocRequestProvider(Private) { + var _ = require('lodash'); + + var docStrategy = Private(require('components/courier/fetch/strategy/doc')); + var AbstractRequest = Private(require('components/courier/fetch/request/request')); + + _(DocRequest).inherits(AbstractRequest); + function DocRequest(source, defer) { + DocRequest.Super.call(this, source, defer); + + this.type = 'doc'; + this.strategy = docStrategy; + } + + DocRequest.prototype.canStart = function () { + var parent = DocRequest.Super.prototype.canStart.call(this); + if (!parent) return false; + + // _getStoredVersion updates the internal + // cache returned by _getVersion, so _getVersion + // must be called first + var version = this.source._getVersion(); + var storedVersion = this.source._getStoredVersion(); + + // conditions that equal "fetch This DOC!" + var unknownVersion = !version && !storedVersion; + var versionMismatch = version !== storedVersion; + var localVersionCleared = version && !storedVersion; + + if (unknownVersion || versionMismatch || localVersionCleared) return true; + }; + + DocRequest.prototype.handleResponse = function (resp) { + if (resp.found) { + this.source._storeVersion(resp._version); + } else { + this.source._clearVersion(); + } + + return DocRequest.Super.prototype.handleResponse.call(this, resp); + }; + + return DocRequest; + }; +}); \ No newline at end of file diff --git a/src/kibana/components/courier/fetch/request/request.js b/src/kibana/components/courier/fetch/request/request.js new file mode 100644 index 0000000000000..61446941e6dc4 --- /dev/null +++ b/src/kibana/components/courier/fetch/request/request.js @@ -0,0 +1,117 @@ +define(function (require) { + return function AbstractReqProvider(Private, Promise) { + var _ = require('lodash'); + var moment = require('moment'); + var errors = require('errors'); + var requestQueue = Private(require('components/courier/_request_queue')); + var requestErrorHandler = Private(require('components/courier/fetch/request/_error_handler')); + + function AbstractReq(source, defer) { + if (!(this instanceof AbstractReq) || !this.constructor || this.constructor === AbstractReq) { + throw new Error('The AbstractReq class should not be called directly'); + } + + this.source = source; + this.defer = defer || Promise.defer(); + + requestQueue.push(this); + } + + AbstractReq.prototype.canStart = function () { + return !this.stopped && !this.source._fetchDisabled; + }; + + AbstractReq.prototype.start = function () { + if (this.started) { + throw new TypeError('Unable to start request because it has already started'); + } + + this.started = true; + this.moment = moment(); + + var source = this.source; + if (source.activeFetchCount) { + source.activeFetchCount += 1; + } else { + source.activeFetchCount = 1; + } + + if (source.history) { + source.history = _.first(source.history.concat(this), 20); + } + }; + + AbstractReq.prototype.getFetchParams = function () { + return this.source._flatten(); + }; + + AbstractReq.prototype.transformResponse = function (resp) { + return resp; + }; + + AbstractReq.prototype.filterError = function (resp) { + return false; + }; + + AbstractReq.prototype.handleResponse = function (resp) { + this.success = true; + this.resp = resp; + }; + + AbstractReq.prototype.handleFailure = function (error) { + this.success = false; + this.resp = error && error.resp; + this.retry(); + return requestErrorHandler(this, error); + }; + + AbstractReq.prototype.isIncomplete = function () { + return false; + }; + + AbstractReq.prototype.continue = function () { + throw new Error('Unable to continue ' + this.type + ' request'); + }; + + AbstractReq.prototype.retry = function () { + var clone = this.clone(); + this.abort(); + return clone; + }; + + // don't want people overriding this, so it becomes a natural + // part of .abort() and .complete() + function stop(then) { + return function () { + if (this.stopped) return; + + this.stopped = true; + this.source.activeFetchCount -= 1; + _.pull(requestQueue, this); + then.call(this); + }; + } + + AbstractReq.prototype.abort = stop(function () { + this.defer = null; + this.aborted = true; + if (this._whenAborted) _.callEach(this._whenAborted); + }); + + AbstractReq.prototype.whenAborted = function (cb) { + this._whenAborted = (this._whenAborted || []); + this._whenAborted.push(cb); + }; + + AbstractReq.prototype.complete = stop(function () { + this.ms = this.moment.diff() * -1; + this.defer.resolve(this.resp); + }); + + AbstractReq.prototype.clone = function () { + return new this.constructor(this.source, this.defer); + }; + + return AbstractReq; + }; +}); \ No newline at end of file diff --git a/src/kibana/components/courier/fetch/request/search.js b/src/kibana/components/courier/fetch/request/search.js new file mode 100644 index 0000000000000..7387a7001f1fd --- /dev/null +++ b/src/kibana/components/courier/fetch/request/search.js @@ -0,0 +1,19 @@ +define(function (require) { + return function SearchReqProvider(Private) { + var _ = require('lodash'); + + var searchStrategy = Private(require('components/courier/fetch/strategy/search')); + var AbstractRequest = Private(require('components/courier/fetch/request/request')); + + _(SearchReq).inherits(AbstractRequest); + var Super = SearchReq.Super; + function SearchReq(source, defer) { + Super.call(this, source, defer); + + this.type = 'search'; + this.strategy = searchStrategy; + } + + return SearchReq; + }; +}); \ No newline at end of file diff --git a/src/kibana/components/courier/fetch/request/segmented.js b/src/kibana/components/courier/fetch/request/segmented.js new file mode 100644 index 0000000000000..8f1d9546b243a --- /dev/null +++ b/src/kibana/components/courier/fetch/request/segmented.js @@ -0,0 +1,209 @@ +define(function (require) { + return function CourierSegmentedReqProvider(es, Private, Promise, Notifier, timefilter) { + var _ = require('lodash'); + var SearchReq = Private(require('components/courier/fetch/request/search')); + var requestQueue = Private(require('components/courier/_request_queue')); + var SegmentedHandle = Private(require('components/courier/fetch/request/_segmented_handle')); + + var notify = new Notifier({ + location: 'Segmented Fetch' + }); + + _(SegmentedReq).inherits(SearchReq); + function SegmentedReq(source, defer, initFn) { + SearchReq.call(this, source, defer); + + this.type = 'segmented'; + + // segmented request specific state + this._initFn = initFn; + this._totalSize = false; + this._remainingSize = false; + this._direction = 'desc'; + this._handle = new SegmentedHandle(this); + + // prevent the source from changing between requests, + // all calls will return the same promise + this._getFlattenedSource = _.once(this._getFlattenedSource); + } + + /********* + ** SearchReq overrides + *********/ + + SegmentedReq.prototype.start = function () { + this._createQueue(); + this._all = this._queue.slice(0); + this._complete = []; + this._active = null; + this._segments = []; + + this._mergedResp = { + took: 0, + hits: { + hits: [], + total: 0, + max_score: 0 + } + }; + + // give the request consumer a chance to receive each segment and set + // parameters via the handle + if (_.isFunction(this._initFn)) this._initFn(this._handle); + + // Send the initial fetch status + this._reportStatus(); + + return SearchReq.prototype.start.call(this); + }; + + SegmentedReq.prototype.continue = function () { + return this._reportStatus(); + }; + + SegmentedReq.prototype.getFetchParams = function () { + var self = this; + + return self._getFlattenedSource() + .then(function (flatSource) { + var params = _.cloneDeep(flatSource); + var index = self._active = self._queue.shift(); + + params.index = index; + + if (self._remainingSize !== false) { + params.body.size = self._remainingSize; + } + + return params; + }); + }; + + SegmentedReq.prototype.handleResponse = function (resp) { + return this._consumeSegment(resp); + }; + + SegmentedReq.prototype.filterError = function (resp) { + if (/ClusterBlockException.*index\sclosed/.test(resp.error)) { + this._consumeSegment(false); + return true; + } + }; + + SegmentedReq.prototype.isIncomplete = function () { + return this._queue.length > 0; + }; + + SegmentedReq.prototype.clone = function () { + return new SegmentedReq(this.source, this.defer, this._initFn); + }; + + SegmentedReq.prototype.complete = function () { + this._reportStatus(); + this._handle.emit('complete'); + return SearchReq.prototype.complete.call(this); + }; + + + /********* + ** SegmentedReq specific methods + *********/ + + SegmentedReq.prototype._createQueue = function () { + var timeBounds = timefilter.getBounds(); + var indexPattern = this.source.get('index'); + var queue = indexPattern.toIndexList(timeBounds.min, timeBounds.max); + + if (!_.isArray(queue)) queue = [queue]; + if (this._direction === 'desc') queue = queue.reverse(); + + return (this._queue = queue); + }; + + SegmentedReq.prototype._reportStatus = function () { + return this._handle.emit('status', { + total: this._all.length, + complete: this._complete.length, + remaining: this._queue.length, + active: this._active, + hitCount: this._mergedResp.hits.hits.length + }); + }; + SegmentedReq.prototype._getFlattenedSource = function () { + var self = this; + + return self.source._flatten() + .then(function (flatSource) { + var size = _.parseInt(_.deepGet(flatSource, 'body.size')); + if (_.isNumber(size)) { + self._totalSize = self._remainingSize = size; + } + + return flatSource; + }); + }; + + SegmentedReq.prototype._consumeSegment = function (seg) { + var index = this._active; + this._complete.push(index); + if (!seg) return; // segment was ignored/filtered, don't store it + + var hadHits = _.deepGet(this._mergedResp, 'hits.hits.length') > 0; + var gotHits = _.deepGet(seg, 'hits.hits.length') > 0; + var firstHits = !hadHits && gotHits; + var haveHits = hadHits || gotHits; + + this._mergeSegment(seg); + this.resp = _.omit(this._mergedResp, '_bucketIndex'); + + if (this._remainingSize !== false) { + this._remainingSize -= seg.hits.hits.length; + } + + if (firstHits) this._handle.emit('first', seg); + if (gotHits) this._handle.emit('segment', seg); + if (haveHits) this._handle.emit('mergedSegment', this.resp); + }; + + SegmentedReq.prototype._mergeSegment = notify.timed('merge response segment', function (seg) { + var merged = this._mergedResp; + + this._segments.push(seg); + + merged.took += seg.took; + merged.hits.total = Math.max(merged.hits.total, seg.hits.total); + merged.hits.max_score = Math.max(merged.hits.max_score, seg.hits.max_score); + [].push.apply(merged.hits.hits, seg.hits.hits); + + if (!seg.aggregations) return; + + Object.keys(seg.aggregations).forEach(function (aggKey) { + + if (!merged.aggregations) { + // start merging aggregations + merged.aggregations = {}; + merged._bucketIndex = {}; + } + + if (!merged.aggregations[aggKey]) { + merged.aggregations[aggKey] = { + buckets: [] + }; + } + + seg.aggregations[aggKey].buckets.forEach(function (bucket) { + var mbucket = merged._bucketIndex[bucket.key]; + if (mbucket) { + mbucket.doc_count += bucket.doc_count; + return; + } + + mbucket = merged._bucketIndex[bucket.key] = bucket; + merged.aggregations[aggKey].buckets.push(mbucket); + }); + }); + }); + + return SegmentedReq; + }; +}); \ No newline at end of file diff --git a/src/kibana/components/courier/fetch/strategy/doc.js b/src/kibana/components/courier/fetch/strategy/doc.js index 6e26a44596df3..e222e6233238b 100644 --- a/src/kibana/components/courier/fetch/strategy/doc.js +++ b/src/kibana/components/courier/fetch/strategy/doc.js @@ -1,26 +1,22 @@ define(function (require) { - return function FetchStrategyForDoc() { + return function FetchStrategyForDoc(Promise) { return { clientMethod: 'mget', - /** - * Turn a request into a flat "state" - * @param {[type]} req [description] - * @return {[type]} [description] - */ - getSourceStateFromRequest: function (req) { - return req.source._flatten(); - }, - /** * Flatten a series of requests into as ES request body * @param {array} requests - an array of flattened requests * @return {string} - the request body */ - convertStatesToBody: function (states) { - return { - docs: states - }; + convertReqsToBody: function (reqs) { + return Promise.map(reqs, function (req) { + return req.getFetchParams(); + }) + .then(function (reqsParams) { + return { + docs: reqsParams + }; + }); }, /** @@ -30,50 +26,6 @@ define(function (require) { */ getResponses: function (resp) { return resp.docs; - }, - - /** - * Resolve a single request using a single response from an msearch - * @param {object} req - The request object, with a defer and source property - * @param {object} resp - An object from the mget response's "docs" array - * @return {Promise} - the promise created by responding to the request - */ - resolveRequest: function (req, resp) { - if (resp.found) { - req.source._storeVersion(resp._version); - } else { - req.source._clearVersion(); - } - return req.defer.resolve(resp); - }, - - /** - * Get the doc requests from the courier that are ready to be fetched - * @return {array} - The filtered request list, pulled from - * the courier's _pendingRequests queue - */ - getPendingRequests: function (pendingRequests) { - return pendingRequests.splice(0).filter(function (req) { - // filter by type first - if (req.source._getType() !== 'doc') { - pendingRequests.push(req); - return; - } - - // _getStoredVersion updates the internal - // cache returned by _getVersion, so _getVersion - // must be called first - var version = req.source._getVersion(); - var storedVersion = req.source._getStoredVersion(); - - // conditions that equal "fetch This DOC!" - var unknownVersion = !version && !storedVersion; - var versionMismatch = version !== storedVersion; - var localVersionCleared = version && !storedVersion; - - if (unknownVersion || versionMismatch || localVersionCleared) return true; - else pendingRequests.push(req); - }); } }; }; diff --git a/src/kibana/components/courier/fetch/strategy/search.js b/src/kibana/components/courier/fetch/strategy/search.js index 018316f629939..0e7df87a138bc 100644 --- a/src/kibana/components/courier/fetch/strategy/search.js +++ b/src/kibana/components/courier/fetch/strategy/search.js @@ -1,35 +1,39 @@ define(function (require) { - return function FetchStrategyForSearch(Private, Promise, Notifier, timefilter) { + return function FetchStrategyForSearch(Private, Promise, timefilter, configFile) { var _ = require('lodash'); - var notify = new Notifier(); - return { clientMethod: 'msearch', - getSourceStateFromRequest: function (req) { - return req.source._flatten(); - }, - /** * Flatten a series of requests into as ES request body + * * @param {array} requests - the requests to serialize * @return {string} - the request body */ - convertStatesToBody: function (states) { - return states.map(function (state) { - var timeBounds = timefilter.getBounds(); - var indexList = state.index.toIndexList(timeBounds.min, timeBounds.max); - - return JSON.stringify({ + convertReqsToBody: function (reqs) { + return Promise.map(reqs, function (req) { + return req.getFetchParams(); + }) + .then(function (reqsParams) { + return reqsParams.map(function (reqParams) { + var indexList = reqParams.index; + + if (_.isFunction(_.deepGet(indexList, 'toIndexList'))) { + var timeBounds = timefilter.getBounds(); + indexList = indexList.toIndexList(timeBounds.min, timeBounds.max); + } + + return JSON.stringify({ index: indexList, - type: state.type, + type: reqParams.type, ignore_unavailable: true }) + '\n' - + JSON.stringify(state.body); + + JSON.stringify(reqParams.body || {}); - }).join('\n') + '\n'; + }).join('\n') + '\n'; + }); }, /** @@ -39,37 +43,6 @@ define(function (require) { */ getResponses: function (resp) { return resp.responses; - }, - - /** - * Resolve a single request using a single response from an msearch - * @param {object} req - The request object, with a defer and source property - * @param {object} resp - An object from the mget response's "docs" array - * @return {Promise} - the promise created by responding to the request - */ - resolveRequest: function (req, resp) { - if (resp && resp.hits) { - resp.hits.hits.forEach(function (hit) { - hit._source = _.flattenWith('.', hit._source); - }); - } - req.defer.resolve(resp); - }, - - /** - * Get the doc requests from the courier that are ready to be fetched - * @param {array} pendingRequests - The list of pending requests, from - * which the requests to make should be - * removed - * @return {array} - The filtered request list, pulled from - * the courier's _pendingRequests queue - */ - getPendingRequests: function (pendingRequests) { - return pendingRequests.splice(0).filter(function (req) { - // filter by type first - if (req.source._getType() === 'search' && !req.source._fetchDisabled) return true; - else pendingRequests.push(req); - }); } }; }; diff --git a/src/kibana/components/courier/looper/_looper.js b/src/kibana/components/courier/looper/_looper.js index ee5166509bb96..a7e7e7598c512 100644 --- a/src/kibana/components/courier/looper/_looper.js +++ b/src/kibana/components/courier/looper/_looper.js @@ -1,133 +1,185 @@ define(function (require) { - return function LooperFactory($timeout, Notifier) { + return function LooperFactory($timeout, Notifier, Promise) { var _ = require('lodash'); var notify = new Notifier(); function Looper(ms, fn) { - var _ms = ms === void 0 ? 1500 : ms; - var _fns = [fn || _.noop]; - var _timerId; - var _started = false; - var looper = this; - - /** - * Set the number of milliseconds between - * each loop - * - * @param {integer} ms - * @chainable - */ - looper.ms = function (ms) { - _ms = ms; - looper.restart(); - return this; - }; - - /** - * Set the function that will be executed at the - * end of each looper. - * - * @param {function} fn - * @chainable - */ - looper.add = function (fn) { - _fns.push(fn); - return this; - }; - - /** - * Set the function that will be executed at the - * end of each looper. - * - * @param {function} fn - * @chainable - */ - looper.remove = function (fn) { - var i = _fns.indexOf(fn); - if (i > -1) _fns.splice(i, 1); - return this; - }; - - /** - * Start the looping madness - * - * @chainable - */ - looper.start = function () { - looper.stop(); - _started = true; - - // start with a run of the loop, which sets the next run - looper._looperOver(); - - return this; - }; - - /** - * ... - * - * @chainable - */ - looper.stop = function () { - if (_timerId) _timerId = $timeout.cancel(_timerId); - _started = false; - return this; - }; - - /** - * Restart the looper only if it is already started. - * Called automatically when ms is changed - * - * @chainable - */ - looper.restart = function () { - if (looper.started()) { - looper.stop(); - looper.start(); - } - return this; - }; - - /** - * Is the looper currently started/running/scheduled/going to execute - * - * @return {boolean} - */ - looper.started = function () { - return !!_started; - }; - - /** - * Wraps _fn so that _fn can be changed - * without rescheduling and schedules - * the next itteration - * - * @private - * @return {undefined} - */ - looper._looperOver = function () { - try { - _.callEach(_fns); - } catch (e) { - looper.stop(); - if (typeof console === 'undefined' || !console.error) { - throw e; - } else { - console.error(e.stack || e.message || e); - } - } - - _timerId = _ms ? $timeout(looper._looperOver, _ms) : null; - }; - - /** - * execute the _fn, and restart the timer - */ - looper.run = function () { - looper.start(); - }; + this._fn = fn; + this._ms = ms === void 0 ? 1500 : ms; + this._timer; + this._started = false; + + this._looperOver = _.bind(this._looperOver, this); } + /** + * Set the number of milliseconds between + * each loop + * + * @param {integer} ms + * @chainable + */ + Looper.prototype.ms = function (ms) { + this._ms = _.parseInt(ms) || 0; + + if (!this._started) return; + + if (this._ms) { + this.start(false); + } else { + this._unScheduleLoop(); + } + + return this; + }; + + /** + * Cancels the current looper while keeping internal + * state as started + * + * @chainable + */ + Looper.prototype.pause = function () { + this._unScheduleLoop(); + return this; + }; + + /** + * Start the looping madness + * + * @chainable + */ + Looper.prototype.start = function (loopOver) { + if (loopOver == null) loopOver = true; + + if (!this._started) { + this._started = true; + } else { + this._unScheduleLoop(); + } + + if (loopOver) { + this._looperOver(); + } else { + this._scheduleLoop(); + } + + return this; + }; + + /** + * ... + * + * @chainable + */ + Looper.prototype.stop = function () { + this._unScheduleLoop(); + this._started = false; + return this; + }; + + /** + * Restart the looper only if it is already started. + * Called automatically when ms is changed + * + * @chainable + */ + Looper.prototype.restart = function () { + this.start(false); + return this; + }; + + /** + * Is the looper currently started/running/scheduled/going to execute + * + * @return {boolean} + */ + Looper.prototype.started = function () { + return !!this._started; + }; + + /** + * Returns the current loop interval + * + * @return {number} + */ + Looper.prototype.loopInterval = function () { + return this._ms; + }; + + /** + * Called when the loop is executed before the previous + * run has completed. + * + * @override + * @return {undefined} + */ + Looper.prototype.onHastyLoop = function () { + // override this in subclasses + }; + + /** + * Wraps this._fn so that this._fn can be changed + * without rescheduling and schedules + * the next itteration + * + * @private + * @return {undefined} + */ + Looper.prototype._looperOver = function () { + var self = this; + + if (self.active) { + self.onHastyLoop(); + return; + } + + self.active = Promise + .try(this._fn) + .then(function () { + self._scheduleLoop(); + }) + .catch(function (err) { + self.stop(); + notify.fatal(err); + }) + .finally(function () { + self.active = null; + }); + }; + + /** + * Schedule the next itteration of the loop + * + * @private + * @return {number} - the timer promise + */ + Looper.prototype._scheduleLoop = function () { + this._unScheduleLoop(); + this._timer = this._ms ? $timeout(this._looperOver, this._ms) : null; + return this._timer; + }; + + /** + * Cancel the next itteration of the loop + * + * @private + * @return {number} - the timer promise + */ + Looper.prototype._unScheduleLoop = function () { + if (this._timer) { + $timeout.cancel(this._timer); + this._timer = null; + } + }; + + /** + * execute the this._fn, and restart the timer + */ + Looper.prototype.run = function () { + this.start(); + }; + return Looper; }; }); \ No newline at end of file diff --git a/src/kibana/components/courier/looper/doc.js b/src/kibana/components/courier/looper/doc.js index 2498fc7fa2830..5021c36c5a074 100644 --- a/src/kibana/components/courier/looper/doc.js +++ b/src/kibana/components/courier/looper/doc.js @@ -7,7 +7,9 @@ define(function (require) { * The Looper which will manage the doc fetch interval * @type {Looper} */ - var docLooper = new Looper(1500, fetch.docs).start(); + var docLooper = new Looper(1500, function () { + fetch.docs(); + }); return docLooper; }; diff --git a/src/kibana/components/courier/looper/search.js b/src/kibana/components/courier/looper/search.js index ec4bcf51157b2..d528da7d96593 100644 --- a/src/kibana/components/courier/looper/search.js +++ b/src/kibana/components/courier/looper/search.js @@ -1,23 +1,25 @@ define(function (require) { - return function SearchLooperService(Private, Promise) { - var errors = require('errors'); + return function SearchLooperService(Private, Promise, Notifier) { var fetch = Private(require('components/courier/fetch/fetch')); - var Looper = Private(require('components/courier/looper/_looper')); + var searchStrategy = Private(require('components/courier/fetch/strategy/search')); + var requestQueue = Private(require('components/courier/_request_queue')); - // track the currently executing search resquest - var _activeAutoSearch = null; + var Looper = Private(require('components/courier/looper/_looper')); + var notif = new Notifier({ location: 'Search Looper' }); /** * The Looper which will manage the doc fetch interval * @type {Looper} */ var searchLooper = new Looper(null, function () { - // fatal if refreshes take longer then the refresh interval - if (_activeAutoSearch) Promise.reject(new errors.HastyRefresh()); - return _activeAutoSearch = fetch.searches().finally(function (res) { - _activeAutoSearch = null; - }); - }).start(); + return fetch.these( + requestQueue.getInactive(searchStrategy) + ); + }); + + searchLooper.onHastyLoop = function () { + notif.warning('Skipping search attempt because previous search request has not completed'); + }; return searchLooper; }; diff --git a/src/kibana/components/courier/saved_object/saved_object.js b/src/kibana/components/courier/saved_object/saved_object.js index e3de5d6fa7178..2899f1f17b37a 100644 --- a/src/kibana/components/courier/saved_object/saved_object.js +++ b/src/kibana/components/courier/saved_object/saved_object.js @@ -1,7 +1,6 @@ define(function (require) { return function SavedObjectFactory(es, configFile, Promise, Private, Notifier, indexPatterns) { var errors = require('errors'); - var angular = require('angular'); var _ = require('lodash'); var slugifyId = require('utils/slugify_id'); @@ -211,18 +210,23 @@ define(function (require) { }; self.saveSource = function (source) { - return docSource.doIndex(source) - .then(function (id) { + var finish = function (id) { self.id = id; - }) - .then(function () { return es.indices.refresh({ index: configFile.kibana_index + }) + .then(function () { + return self.id; }); - }) - .then(function () { - // ensure that the object has the potentially new id - return self.id; + }; + return docSource.doCreate(source) + .then(finish) + .catch(function (err) { + var confirmMessage = 'Are you sure you want to overwrite this?'; + if (_.deepGet(err, 'origError.status') === 409 && window.confirm(confirmMessage)) { + return docSource.doIndex(source).then(finish); + } + return Promise.resolve(false); }); }; @@ -232,8 +236,10 @@ define(function (require) { * @return {undefined} */ self.destroy = function () { - docSource.cancelPending(); - if (self.searchSource) self.searchSource.cancelPending(); + docSource.cancelQueued(); + if (self.searchSource) { + self.searchSource.cancelQueued(); + } }; /** diff --git a/src/kibana/components/errors.js b/src/kibana/components/errors.js index aafa2d64e1e40..0cdafb3e78f09 100644 --- a/src/kibana/components/errors.js +++ b/src/kibana/components/errors.js @@ -52,6 +52,8 @@ define(function (require) { * @param {Object} resp - optional HTTP response */ errors.RequestFailure = function RequestFailure(err, resp) { + err = err || false; + KbnError.call(this, 'Request to Elasticsearch failed: ' + JSON.stringify(resp || err.message), errors.RequestFailure); diff --git a/src/kibana/components/filter_bar/filter_bar.html b/src/kibana/components/filter_bar/filter_bar.html index 9f2f49ea4c90c..b5972d81e5aee 100644 --- a/src/kibana/components/filter_bar/filter_bar.html +++ b/src/kibana/components/filter_bar/filter_bar.html @@ -2,12 +2,13 @@
+
@@ -16,12 +17,16 @@ "{{ filter.meta.value }}"
@@ -35,6 +40,7 @@
+