diff --git a/src/fixtures/stubbed_logstash_index_pattern.js b/src/fixtures/stubbed_logstash_index_pattern.js index 2b114919a81e5..b91257e863a04 100644 --- a/src/fixtures/stubbed_logstash_index_pattern.js +++ b/src/fixtures/stubbed_logstash_index_pattern.js @@ -1,6 +1,6 @@ define(function (require) { return function stubbedLogstashIndexPatternService(Private) { - var StubIndexPattern = Private(require('testUtils/stubIndexPattern')); + var StubIndexPattern = Private(require('testUtils/stub_index_pattern')); var fieldTypes = Private(require('ui/index_patterns/_field_types')); var mockLogstashFields = Private(require('fixtures/logstash_fields')); diff --git a/src/fixtures/stubbed_search_source.js b/src/fixtures/stubbed_search_source.js index d80e52c6cf5c3..3a5dc3a465551 100644 --- a/src/fixtures/stubbed_search_source.js +++ b/src/fixtures/stubbed_search_source.js @@ -2,8 +2,9 @@ define(function (require) { var sinon = require('auto-release-sinon'); var searchResponse = require('fixtures/search_response'); - return function stubSearchSource(Private, $q) { + return function stubSearchSource(Private, $q, Promise) { var deferedResult = $q.defer(); + var indexPattern = Private(require('fixtures/stubbed_logstash_index_pattern')); return { sort: sinon.spy(), @@ -13,7 +14,7 @@ define(function (require) { get: function (param) { switch (param) { case 'index': - return Private(require('fixtures/stubbed_logstash_index_pattern')); + return indexPattern; default: throw new Error('Param "' + param + '" is not implemented in the stubbed search source'); } @@ -29,7 +30,9 @@ define(function (require) { return deferedResult.promise; }, onError: function () { return $q.defer().promise; }, - + _flatten: function () { + return Promise.resolve({ index: indexPattern, body: {} }); + } }; }; diff --git a/src/plugins/kibana/public/discover/controllers/discover.js b/src/plugins/kibana/public/discover/controllers/discover.js index 88b40cd678f53..b23223d257ef6 100644 --- a/src/plugins/kibana/public/discover/controllers/discover.js +++ b/src/plugins/kibana/public/discover/controllers/discover.js @@ -337,14 +337,15 @@ define(function (require) { }()); var sortFn = null; - if (sortBy === 'non-time') { + if (sortBy !== 'implicit') { sortFn = new HitSortFn(sort[1]); } $scope.updateTime(); if (sort[0] === '_score') segmented.setMaxSegments(1); segmented.setDirection(sortBy === 'time' ? (sort[1] || 'desc') : 'desc'); - segmented.setSize(sortBy === 'time' ? $scope.opts.sampleSize : false); + segmented.setSortFn(sortFn); + segmented.setSize($scope.opts.sampleSize); // triggered when the status updated segmented.on('status', function (status) { @@ -362,30 +363,30 @@ define(function (require) { return failure.index + failure.shard + failure.reason; }); } + })); - var rows = $scope.rows; - var indexPattern = $scope.searchSource.get('index'); + segmented.on('mergedSegment', function (merged) { + $scope.mergedEsResp = merged; + $scope.hits = merged.hits.total; - // merge the rows and the hits, use a new array to help watchers - rows = $scope.rows = rows.concat(resp.hits.hits); + var indexPattern = $scope.searchSource.get('index'); - if (sortFn) { - notify.event('resort rows', function () { - rows.sort(sortFn); - rows = $scope.rows = rows.slice(0, totalSize); - $scope.fieldCounts = {}; - }); - } + // the merge rows, use a new array to help watchers + $scope.rows = merged.hits.hits.slice(); notify.event('flatten hit and count fields', function () { var counts = $scope.fieldCounts; + + // if we haven't counted yet, or need a fresh count because we are sorting, reset the counts + if (!counts || sortFn) counts = $scope.fieldCounts = {}; + $scope.rows.forEach(function (hit) { - // skip this work if we have already done it and we are NOT sorting. - // --- + // skip this work if we have already done it + if (hit.$$_counted) return; + // when we are sorting results, we need to redo the counts each time because the - // "top 500" may change with each response - if (hit.$$_counted && !sortFn) return; - hit.$$_counted = true; + // "top 500" may change with each response, so don't mark this as counted + if (!sortFn) hit.$$_counted = true; var fields = _.keys(indexPattern.flattenHit(hit)); var n = fields.length; @@ -396,13 +397,6 @@ define(function (require) { } }); }); - - })); - - segmented.on('mergedSegment', function (merged) { - $scope.mergedEsResp = merged; - $scope.hits = merged.hits.total; - }); segmented.on('complete', function () { diff --git a/src/testUtils/stubIndexPattern.js b/src/testUtils/stub_index_pattern.js similarity index 91% rename from src/testUtils/stubIndexPattern.js rename to src/testUtils/stub_index_pattern.js index a4338c9b553ed..d350fa1254ca6 100644 --- a/src/testUtils/stubIndexPattern.js +++ b/src/testUtils/stub_index_pattern.js @@ -23,6 +23,13 @@ define(function (require) { this.routes = IndexPattern.prototype.routes; this.toIndexList = _.constant(Promise.resolve([pattern])); + this.toDetailedIndexList = _.constant(Promise.resolve([ + { + index: pattern, + min: 0, + max: 1 + } + ])); this.getComputedFields = _.bind(getComputedFields, this); this.flattenHit = flattenHit(this); this.formatHit = formatHit(this, fieldFormats.getDefaultInstance('string')); diff --git a/src/ui/public/courier/fetch/request/__tests__/segmented.js b/src/ui/public/courier/fetch/request/__tests__/segmented.js index 8c7745d47bed1..7edfdd69190a1 100644 --- a/src/ui/public/courier/fetch/request/__tests__/segmented.js +++ b/src/ui/public/courier/fetch/request/__tests__/segmented.js @@ -47,9 +47,12 @@ describe('ui/courier/fetch/request/segmented', () => { } function mockIndexPattern() { - const queue = [1, 2, 3]; return { - toIndexList: sinon.stub().returns(Promise.resolve(queue)) + toDetailedIndexList: sinon.stub().returns(Promise.resolve([ + { index: 1, min: 0, max: 1 }, + { index: 2, min: 0, max: 1 }, + { index: 3, min: 0, max: 1 }, + ])) }; } }); diff --git a/src/ui/public/courier/fetch/request/__tests__/segmented_create_queue.js b/src/ui/public/courier/fetch/request/__tests__/segmented_create_queue.js index 11a05589a3cee..65057932a350a 100644 --- a/src/ui/public/courier/fetch/request/__tests__/segmented_create_queue.js +++ b/src/ui/public/courier/fetch/request/__tests__/segmented_create_queue.js @@ -34,11 +34,11 @@ describe('ui/courier/fetch/request/segmented/_createQueue', () => { expect(req._queueCreated).to.be(true); }); - it('relies on indexPattern.toIndexList to generate queue', async function () { + it('relies on indexPattern.toDetailedIndexList to generate queue', async function () { const source = new MockSource(); const ip = source.get('index'); const indices = [1,2,3]; - sinon.stub(ip, 'toIndexList').returns(Promise.resolve(indices)); + sinon.stub(ip, 'toDetailedIndexList').returns(Promise.resolve(indices)); const req = new SegmentedReq(source); const output = await req._createQueue(); @@ -49,14 +49,14 @@ describe('ui/courier/fetch/request/segmented/_createQueue', () => { const source = new MockSource(); const ip = source.get('index'); const req = new SegmentedReq(source); - sinon.stub(ip, 'toIndexList').returns(Promise.resolve([1,2,3])); + sinon.stub(ip, 'toDetailedIndexList').returns(Promise.resolve([1,2,3])); req.setDirection('asc'); await req._createQueue(); - expect(ip.toIndexList.lastCall.args[2]).to.be('asc'); + expect(ip.toDetailedIndexList.lastCall.args[2]).to.be('asc'); req.setDirection('desc'); await req._createQueue(); - expect(ip.toIndexList.lastCall.args[2]).to.be('desc'); + expect(ip.toDetailedIndexList.lastCall.args[2]).to.be('desc'); }); }); diff --git a/src/ui/public/courier/fetch/request/__tests__/segmented_index_selection.js b/src/ui/public/courier/fetch/request/__tests__/segmented_index_selection.js new file mode 100644 index 0000000000000..1e5131fbe248c --- /dev/null +++ b/src/ui/public/courier/fetch/request/__tests__/segmented_index_selection.js @@ -0,0 +1,129 @@ +import ngMock from 'ngMock'; +import expect from 'expect.js'; +import { times } from 'lodash'; +import sinon from 'auto-release-sinon'; + +import HitSortFnProv from 'plugins/kibana/discover/_hit_sort_fn'; +import NoDigestPromises from 'testUtils/noDigestPromises'; + +describe('Segmented Request Index Selection', function () { + let Promise; + let $rootScope; + let SegmentedReq; + let MockSource; + let HitSortFn; + + NoDigestPromises.activateForSuite(); + + beforeEach(ngMock.module('kibana')); + beforeEach(ngMock.inject((Private, $injector) => { + Promise = $injector.get('Promise'); + HitSortFn = Private(HitSortFnProv); + $rootScope = $injector.get('$rootScope'); + SegmentedReq = Private(require('ui/courier/fetch/request/segmented')); + + const StubbedSearchSourceProvider = require('fixtures/stubbed_search_source'); + MockSource = class { + constructor() { + return $injector.invoke(StubbedSearchSourceProvider); + } + }; + })); + + it('queries with size until all 500 docs returned', async function () { + const search = new MockSource(); + const indexPattern = search.get('index'); + sinon.stub(indexPattern, 'toDetailedIndexList').returns(Promise.resolve([ + { index: 'one', min: 0, max: 1 }, + { index: 'two', min: 0, max: 1 }, + { index: 'three', min: 0, max: 1 }, + { index: 'four', min: 0, max: 1 }, + { index: 'five', min: 0, max: 1 }, + ])); + + const req = new SegmentedReq(search); + req._handle.setDirection('desc'); + req._handle.setSortFn(new HitSortFn('desc')); + req._handle.setSize(500); + await req.start(); + + // first 200 + expect((await req.getFetchParams()).body.size).to.be(500); + await req.handleResponse({ + hits: { total: 1000, hits: times(200, (i) => ({ i })) } + }); + + // total = 400 + expect((await req.getFetchParams()).body.size).to.be(500); + await req.handleResponse({ + hits: { total: 1000, hits: times(200, (i) => ({ i })) } + }); + + // total = 600 + expect((await req.getFetchParams()).body.size).to.be(500); + await req.handleResponse({ + hits: { total: 1000, hits: times(200, (i) => ({ i })) } + }); + + expect((await req.getFetchParams()).body.size).to.be(0); + await req.handleResponse({ + hits: { total: 1000, hits: times(200, (i) => ({ i })) } + }); + + expect((await req.getFetchParams()).body.size).to.be(0); + await req.handleResponse({ + hits: { total: 1000, hits: times(200, (i) => ({ i })) } + }); + }); + + it(`sets size 0 for indices that couldn't procude hits`, async function () { + const search = new MockSource(); + const indexPattern = search.get('index'); + + // the segreq is looking for 10 documents, and we will give it ten docs with time:5 in the first response. + // on the second index it should still request 10 documents because it could produce documents with time:5. + // the next two indexes will get size 0, since they couldn't produce documents with the time:5 + // the final index will get size:10, because it too can produce docs with time:5 + sinon.stub(indexPattern, 'toDetailedIndexList').returns(Promise.resolve([ + { index: 'one', min: 0, max: 10 }, + { index: 'two', min: 0, max: 10 }, + { index: 'three', min: 12, max: 20 }, + { index: 'four', min: 15, max: 20 }, + { index: 'five', min: 5, max: 50 }, + ])); + + const req = new SegmentedReq(search); + req._handle.setDirection('desc'); + req._handle.setSortFn(new HitSortFn('desc')); + req._handle.setSize(10); + await req.start(); + + // first 10 + expect((await req.getFetchParams()).body.size).to.be(10); + await req.handleResponse({ + hits: { total: 1000, hits: times(10, () => ({ _source: { time: 5 } })) } + }); + + // total = 400 + expect((await req.getFetchParams()).body.size).to.be(10); + await req.handleResponse({ + hits: { total: 1000, hits: times(10, () => ({ _source: { time: 5 } })) } + }); + + // total = 600 + expect((await req.getFetchParams()).body.size).to.be(0); + await req.handleResponse({ + hits: { total: 1000, hits: [] } + }); + + expect((await req.getFetchParams()).body.size).to.be(0); + await req.handleResponse({ + hits: { total: 1000, hits: [] } + }); + + expect((await req.getFetchParams()).body.size).to.be(10); + await req.handleResponse({ + hits: { total: 1000, hits: times(10, () => ({ _source: { time: 5 } })) } + }); + }); +}); diff --git a/src/ui/public/courier/fetch/request/__tests__/segmented_size_picking.js b/src/ui/public/courier/fetch/request/__tests__/segmented_size_picking.js new file mode 100644 index 0000000000000..68a153bde3292 --- /dev/null +++ b/src/ui/public/courier/fetch/request/__tests__/segmented_size_picking.js @@ -0,0 +1,55 @@ +import ngMock from 'ngMock'; +import expect from 'expect.js'; +import { times } from 'lodash'; +import sinon from 'auto-release-sinon'; + +import HitSortFnProv from 'plugins/kibana/discover/_hit_sort_fn'; +import NoDigestPromises from 'testUtils/noDigestPromises'; + +describe('Segmented Request Size Picking', function () { + let Promise; + let $rootScope; + let SegmentedReq; + let MockSource; + let HitSortFn; + + NoDigestPromises.activateForSuite(); + + beforeEach(ngMock.module('kibana')); + beforeEach(ngMock.inject((Private, $injector) => { + Promise = $injector.get('Promise'); + HitSortFn = Private(HitSortFnProv); + $rootScope = $injector.get('$rootScope'); + SegmentedReq = Private(require('ui/courier/fetch/request/segmented')); + + const StubbedSearchSourceProvider = require('fixtures/stubbed_search_source'); + MockSource = class { + constructor() { + return $injector.invoke(StubbedSearchSourceProvider); + } + }; + })); + + context('without a size', function () { + it('does not set the request size', async function () { + const req = new SegmentedReq(new MockSource()); + req._handle.setDirection('desc'); + req._handle.setSortFn(new HitSortFn('desc')); + await req.start(); + + expect((await req.getFetchParams()).body).to.not.have.property('size'); + }); + }); + + context('with a size', function () { + it('sets the request size to the entire desired size', async function () { + const req = new SegmentedReq(new MockSource()); + req._handle.setDirection('desc'); + req._handle.setSize(555); + req._handle.setSortFn(new HitSortFn('desc')); + await req.start(); + + expect((await req.getFetchParams()).body).to.have.property('size', 555); + }); + }); +}); diff --git a/src/ui/public/courier/fetch/request/_segmented_handle.js b/src/ui/public/courier/fetch/request/_segmented_handle.js index d12b5a10679b3..ad46cd350c4eb 100644 --- a/src/ui/public/courier/fetch/request/_segmented_handle.js +++ b/src/ui/public/courier/fetch/request/_segmented_handle.js @@ -22,6 +22,7 @@ define(function (require) { this.setDirection = _.bindKey(req, 'setDirection'); this.setSize = _.bindKey(req, 'setSize'); this.setMaxSegments = _.bindKey(req, 'setMaxSegments'); + this.setSortFn = _.bindKey(req, 'setSortFn'); } return SegmentedHandle; diff --git a/src/ui/public/courier/fetch/request/segmented.js b/src/ui/public/courier/fetch/request/segmented.js index a2291b3b99c6b..13ace5fcc0238 100644 --- a/src/ui/public/courier/fetch/request/segmented.js +++ b/src/ui/public/courier/fetch/request/segmented.js @@ -1,6 +1,7 @@ define(function (require) { return function CourierSegmentedReqProvider(es, Private, Promise, Notifier, timefilter, config) { var _ = require('lodash'); + var isNumber = require('lodash').isNumber; var SearchReq = Private(require('ui/courier/fetch/request/search')); var SegmentedHandle = Private(require('ui/courier/fetch/request/_segmented_handle')); @@ -16,13 +17,16 @@ define(function (require) { // segmented request specific state this._initFn = initFn; - this._desiredSize = false; + + this._desiredSize = null; this._maxSegments = config.get('courier:maxSegmentCount'); - this._hitsReceived = 0; this._direction = 'desc'; + this._sortFn = null; this._queueCreated = false; this._handle = new SegmentedHandle(this); + this._hitWindow = null; + // prevent the source from changing between requests, // all calls will return the same promise this._getFlattenedSource = _.once(this._getFlattenedSource); @@ -70,8 +74,7 @@ define(function (require) { SegmentedReq.prototype.getFetchParams = function () { var self = this; - return self._getFlattenedSource() - .then(function (flatSource) { + return self._getFlattenedSource().then(function (flatSource) { var params = _.cloneDeep(flatSource); // calculate the number of indices to fetch in this request in order to prevent @@ -81,10 +84,12 @@ define(function (require) { // request, making sure the first request returns faster. var remainingSegments = self._maxSegments - self._segments.length; var indexCount = Math.max(1, Math.floor(self._queue.length / remainingSegments)); - params.index = self._active = self._queue.splice(0, indexCount); - if (self._desiredSize !== false) { - params.body.size = Math.max(self._desiredSize - self._hitsReceived, 0); + var indices = self._active = self._queue.splice(0, indexCount); + params.index = _.pluck(indices, 'index'); + + if (isNumber(self._desiredSize)) { + params.body.size = self._pickSizeForIndices(indices); } if (params.body.size === 0) params.search_type = 'count'; @@ -149,6 +154,15 @@ define(function (require) { } }; + /** + * Set the function that will be used to sort the rows + * + * @param {fn} + */ + SegmentedReq.prototype.setSortFn = function (sortFn) { + this._sortFn = sortFn; + }; + /** * Set the sort total number of documents to * emit @@ -160,7 +174,8 @@ define(function (require) { * @param {number|false} */ SegmentedReq.prototype.setSize = function (totalSize) { - this._desiredSize = _.parseInt(totalSize) || false; + this._desiredSize = _.parseInt(totalSize); + if (isNaN(this._desiredSize)) this._desiredSize = null; }; SegmentedReq.prototype._createQueue = function () { @@ -169,7 +184,7 @@ define(function (require) { var indexPattern = self.source.get('index'); self._queueCreated = false; - return indexPattern.toIndexList(timeBounds.min, timeBounds.max, self._direction) + return indexPattern.toDetailedIndexList(timeBounds.min, timeBounds.max, self._direction) .then(function (queue) { if (!_.isArray(queue)) queue = [queue]; @@ -205,13 +220,30 @@ define(function (require) { this._mergeSegment(seg); this.resp = _.omit(this._mergedResp, '_bucketIndex'); - this._hitsReceived += seg.hits.hits.length; if (firstHits) this._handle.emit('first', seg); if (gotHits) this._handle.emit('segment', seg); if (haveHits) this._handle.emit('mergedSegment', this.resp); }; + SegmentedReq.prototype._mergeHits = function (hits) { + var mergedHits = this._mergedResp.hits.hits; + var desiredSize = this._desiredSize; + var sortFn = this._sortFn; + + _.pushAll(hits, mergedHits); + + if (sortFn) { + notify.event('resort rows', function () { + mergedHits.sort(sortFn); + }); + } + + if (isNumber(desiredSize)) { + mergedHits = this._mergedResp.hits.hits = mergedHits.slice(0, desiredSize); + } + }; + SegmentedReq.prototype._mergeSegment = notify.timed('merge response segment', function (seg) { var merged = this._mergedResp; @@ -220,7 +252,11 @@ define(function (require) { merged.took += seg.took; merged.hits.total += seg.hits.total; merged.hits.max_score = Math.max(merged.hits.max_score, seg.hits.max_score); - [].push.apply(merged.hits.hits, seg.hits.hits); + + if (_.size(seg.hits.hits)) { + this._mergeHits(seg.hits.hits); + this._detectHitsWindow(merged.hits.hits); + } if (!seg.aggregations) return; @@ -251,6 +287,51 @@ define(function (require) { }); }); + SegmentedReq.prototype._detectHitsWindow = function (hits) { + hits = hits || []; + var indexPattern = this.source.get('index'); + var desiredSize = this._desiredSize; + + var size = _.size(hits); + if (!isNumber(desiredSize) || size < desiredSize) { + this._hitWindow = { + size: size, + min: -Infinity, + max: Infinity + }; + return; + } + + let min; + let max; + + hits.forEach(function (deepHit) { + var hit = indexPattern.flattenHit(deepHit); + var time = hit[indexPattern.timeFieldName]; + if (min == null || time < min) min = time; + if (max == null || time > max) max = time; + }); + + this._hitWindow = { size, min, max }; + }; + + SegmentedReq.prototype._pickSizeForIndices = function (indices) { + var hitWindow = this._hitWindow; + var desiredSize = this._desiredSize; + + if (!isNumber(desiredSize)) return null; + // we don't have any hits yet, get us more info! + if (!hitWindow) return desiredSize; + // the order of documents isn't important, just get us more + if (!this._sortFn) return Math.max(desiredSize - hitWindow.size, 0); + // if all of the documents in every index fall outside of our current doc set, we can ignore them. + var someOverlap = indices.some(function (index) { + return index.min <= hitWindow.max && hitWindow.min <= index.max; + }); + + return someOverlap ? desiredSize : 0; + }; + return SegmentedReq; }; }); diff --git a/src/ui/public/index_patterns/_index_pattern.js b/src/ui/public/index_patterns/_index_pattern.js index cd79b721cbb4b..ba2d40e063091 100644 --- a/src/ui/public/index_patterns/_index_pattern.js +++ b/src/ui/public/index_patterns/_index_pattern.js @@ -180,7 +180,7 @@ define(function (require) { self.toIndexList = function (start, stop, sortDirection) { return self .toDetailedIndexList(start, stop, sortDirection) - .then(detailedIndices => { + .then(function (detailedIndices) { if (!_.isArray(detailedIndices)) { return detailedIndices.index; } @@ -190,7 +190,8 @@ define(function (require) { }; self.toDetailedIndexList = Promise.method(function (start, stop, sortDirection) { - const interval = self.getInterval(); + var interval = self.getInterval(); + if (interval) { return intervals.toIndexList(self.id, interval, start, stop, sortDirection); } diff --git a/src/ui/public/index_patterns/_intervals.js b/src/ui/public/index_patterns/_intervals.js index 59374222df136..c449a7d8890a7 100644 --- a/src/ui/public/index_patterns/_intervals.js +++ b/src/ui/public/index_patterns/_intervals.js @@ -79,7 +79,11 @@ define(function (require) { const min = start.valueOf(); const max = bound.valueOf(); - indexList[add]({ index, min, max }); + indexList[add]({ + index: index, + min: min, + max: max + }); start = next; }