Skip to content
This repository has been archived by the owner on Mar 31, 2024. It is now read-only.

Commit

Permalink
Merge pull request #3 from spenceralger/TimefilterAutoRefresh
Browse files Browse the repository at this point in the history
Segmented fetch update
  • Loading branch information
grouma committed Dec 18, 2014
2 parents 1a15a78 + 2f3ef3c commit f137a65
Show file tree
Hide file tree
Showing 40 changed files with 1,022 additions and 743 deletions.
8 changes: 2 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<!-- render {"template":"# Kibana <%= pkg.version %>"} -->
# Kibana 4.0.0-BETA2
# Kibana 4.0.0-beta3
<!-- /render -->

[![Build Status](https://travis-ci.org/elasticsearch/kibana.svg?branch=master)](https://travis-ci.org/elasticsearch/kibana?branch=master)
Expand Down Expand Up @@ -29,7 +29,6 @@ Kibana is an open source (Apache Licensed), browser based analytics and search d
* Visit [http://localhost:5601](http://localhost:5601)

<!-- include {"path":"docs/quick_start.md"} -->

## Quick Start

You're up and running! Fantastic! Kibana is now running on port 5601, so point your browser at http://YOURDOMAIN.com:5601.
Expand Down Expand Up @@ -325,7 +324,4 @@ Clicking on the *View* action loads that item in the associated applications. Re
Clicking *Edit* will allow you to change the title, description and other settings of the saved object. You can also edit the schema of the stored object.

*Note:* this operation is for advanced users only - making changes here can break large portions of the application.
<!-- /include -->

## Building from Source
If you want the latest code or need something that's not in a release package, you'll need to build from source. See [CONTRIBUTING.md](CONTRIBUTING.md) for instructions.
<!-- /include -->
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "kibana",
"private": true,
"version": "4.0.0-BETA2",
"version": "4.0.0-beta3",
"description": "Kibana 4",
"main": "Gulpfile.js",
"dependencies": {},
Expand Down
30 changes: 18 additions & 12 deletions src/kibana/components/agg_response/tabify/_response_writer.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ define(function (require) {
this.aggStack = _.pluck(this.columns, 'aggConfig');

this.root = new TableGroup();
this.acrStack = [];
this.splitStack = [this.root];
}

Expand All @@ -66,9 +67,6 @@ define(function (require) {
table.aggConfig = agg;
table.key = key;
table.title = agg.makeLabel() + ': ' + (table.fieldFormatter()(key));
if (this.asAggConfigResults) {
table.aggConfigResult = new AggConfigResult(agg, parent.aggConfigResult, key, key);
}
}

// link the parent and child
Expand Down Expand Up @@ -99,16 +97,23 @@ define(function (require) {

buckets.forEach(function (bucket, key) {
// find the existing split that we should extend
var TableGroup = _.find(self.splitStack[0].tables, { aggConfig: agg, key: key });
var tableGroup = _.find(self.splitStack[0].tables, { aggConfig: agg, key: key });
// create the split if it doesn't exist yet
if (!TableGroup) TableGroup = self._table(true, agg, key);
if (!tableGroup) tableGroup = self._table(true, agg, key);

var splitAcr = false;
if (self.asAggConfigResults) splitAcr = new AggConfigResult(agg, self.acrStack[0], key, key);

// push the split onto the stack so that it will receive written tables
self.splitStack.unshift(TableGroup);
self.splitStack.unshift(tableGroup);
splitAcr && self.acrStack.unshift(splitAcr);

// call the block
if (_.isFunction(block)) block.call(self, bucket, key);

// remove the split from the stack
self.splitStack.shift();
splitAcr && self.acrStack.shift();
});
};

Expand Down Expand Up @@ -144,17 +149,18 @@ define(function (require) {
*/
TabbedAggResponseWriter.prototype.cell = function (agg, value, block) {
if (this.asAggConfigResults) {
var parent = _.findLast(this.rowBuffer, function (result) {
return result.aggConfig.schema.group === 'buckets';
});
if (!parent) parent = this.splitStack[0].aggConfigResult;

value = new AggConfigResult(agg, parent, value, value);
value = new AggConfigResult(agg, this.acrStack[0], value, value);
}

var staskResult = this.asAggConfigResults && value.type === 'bucket';

this.rowBuffer.push(value);
if (staskResult) this.acrStack.unshift(value);

if (_.isFunction(block)) block.call(this);

this.rowBuffer.pop(value);
if (staskResult) this.acrStack.shift();

return value;
};
Expand Down
115 changes: 19 additions & 96 deletions src/kibana/components/courier/fetch/_fetch_these.js
Original file line number Diff line number Diff line change
@@ -1,99 +1,21 @@
define(function (require) {
return function FetchTheseProvider(Private, Promise, es, Notifier, sessionId, configFile) {
var _ = require('lodash');
var moment = require('moment');
var errors = require('errors');
var pendingRequests = Private(require('components/courier/_pending_requests'));

var notify = new Notifier({
location: 'Courier Fetch'
});
var initRequest = Private(require('components/courier/fetch/_init_request'));
var reqComplete = Private(require('components/courier/fetch/_request_complete'));
var forEachStrategy = Private(require('components/courier/fetch/_for_each_strategy'));
var requestErrorHandler = Private(require('components/courier/fetch/_request_error_handler'));
var mergeDuplicateRequests = Private(require('components/courier/fetch/_merge_duplicate_requests'));

function eachStrategy(requests, block) {
block = Promise.method(block);
var sets = [];

requests.forEach(function (req) {
var strategy = req.strategy;
var set = _.find(sets, { 0: strategy });
if (set) set[1].push(req);
else sets.push([strategy, [req]]);
});

return Promise.all(sets.map(function (set) {
return (function fetch(requests, strategy) {
return block(requests, strategy)
.then(function checkForIncompleteRequests(result) {
if (_.isFunction(strategy.getIncompleteRequests)) {
var incomplete = strategy.getIncompleteRequests(pendingRequests);
if (incomplete.length) {
return fetch(incomplete, strategy);
}
}
return result;
});
}(set[1], set[0]));
}))
.catch(notify.fatal);
}

function initRequest(req) {
if (req.source.activeFetchCount) {
req.source.activeFetchCount += 1;
} else {
req.source.activeFetchCount = 1;
}

req.moment = moment();
}

function mergeDuplicateRequests(requests) {
// dedupe requests
var index = {};
return requests.splice(0).filter(function (req) {
var iid = req.source._instanceid;
if (!index[iid]) {
// this request is unique so far
index[iid] = req;
// keep the request
return true;
}

// the source was requested at least twice
var uniq = index[iid];
if (uniq._merged) {
// already setup the merged list
uniq._merged.push(req);
} else {
// put all requests into this array and itterate them on response
uniq._merged = [uniq, req];
}
});
}

function reqComplete(req, resp, errorHandler) {
if (resp.timed_out) {
notify.warning(new errors.SearchTimeout());
}

req.complete = true;
req.resp = resp;
req.ms = req.moment.diff() * -1;
req.source.activeFetchCount -= 1;

if (resp.error) {
return errorHandler.handle(req, new errors.FetchFailure(resp));
}

req.strategy.resolveRequest(req, resp);
}

function fetchThese(requests, errorHandler) {
return eachStrategy(requests, function (requests, strategy) {
function fetchThese(requests) {
return forEachStrategy(requests, function (requests, strategy) {
requests.forEach(initRequest);

var uniq = mergeDuplicateRequests(requests);
var states;
var responses;

return Promise.map(uniq, function (req) {
return strategy.getSourceStateFromRequest(req);
Expand All @@ -111,30 +33,31 @@ define(function (require) {
});
})
.then(strategy.getResponses)
.then(function (responses) {
.then(function (_responses_) {
responses = _responses_;

responses.forEach(function (resp) {
return Promise.all(responses.map(function (resp) {
var req = uniq.shift();
var state = states.shift();
if (!req._merged) {
req.state = state;
reqComplete(req, resp, errorHandler);
return reqComplete(req, resp);
} else {
req._merged.forEach(function (mergedReq) {
return Promise.all(req._merged.map(function (mergedReq) {
mergedReq.state = state;
var respClone = _.cloneDeep(resp);
reqComplete(mergedReq, respClone, errorHandler);
});
return reqComplete(mergedReq, respClone);
}));
}
});

}));
})
.then(function () {
return responses;
})
.catch(function (err) {

function sendFailure(req) {
req.source.activeFetchCount -= 1;
errorHandler.handle(req, err);
requestErrorHandler(req, err);
}

uniq.forEach(function (req) {
Expand Down
42 changes: 42 additions & 0 deletions src/kibana/components/courier/fetch/_for_each_strategy.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
define(function (require) {
return function FetchForEachRequestStrategy(Private, Promise, Notifier) {
var _ = require('lodash');
var pendingRequests = Private(require('components/courier/_pending_requests'));

var notify = new Notifier({
location: 'Courier Fetch'
});

function forEachStrategy(requests, block) {
block = Promise.method(block);
var sets = [];

requests.forEach(function (req) {
var strategy = req.strategy;
var set = _.find(sets, { 0: strategy });
if (set) set[1].push(req);
else sets.push([strategy, [req]]);
});

return Promise.all(sets.map(function (set) {
return (function fetch(requests, strategy) {

return block(requests, strategy)
.then(function checkForIncompleteRequests(result) {
if (_.isFunction(strategy.getIncompleteRequests)) {
var incomplete = strategy.getIncompleteRequests(pendingRequests);
if (incomplete.length) {
return fetch(incomplete, strategy);
}
}
return result;
});

}(set[1], set[0]));
}))
.catch(notify.fatal);
}

return forEachStrategy;
};
});
17 changes: 17 additions & 0 deletions src/kibana/components/courier/fetch/_init_request.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
define(function (require) {
return function FetchInitRequestProvider() {
var moment = require('moment');

function initRequest(req) {
if (req.source.activeFetchCount) {
req.source.activeFetchCount += 1;
} else {
req.source.activeFetchCount = 1;
}

req.moment = moment();
}

return initRequest;
};
});
30 changes: 30 additions & 0 deletions src/kibana/components/courier/fetch/_merge_duplicate_requests.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
define(function (require) {
return function FetchMergeDuplicateRequests() {

function mergeDuplicateRequests(requests) {
// dedupe requests
var index = {};
return requests.splice(0).filter(function (req) {
var iid = req.source._instanceid;
if (!index[iid]) {
// this request is unique so far
index[iid] = req;
// keep the request
return true;
}

// the source was requested at least twice
var uniq = index[iid];
if (uniq._merged) {
// already setup the merged list
uniq._merged.push(req);
} else {
// put all requests into this array and itterate them on response
uniq._merged = [uniq, req];
}
});
}

return mergeDuplicateRequests;
};
});
29 changes: 29 additions & 0 deletions src/kibana/components/courier/fetch/_request_complete.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
define(function (require) {
return function FetchRequestCompleteProvider(Private, Notifier) {
var errors = require('errors');
var requestErrorHandler = Private(require('components/courier/fetch/_request_error_handler'));

var notify = new Notifier({
location: 'Courier Fetch'
});

function reqComplete(req, resp) {
if (resp.timed_out) {
notify.warning(new errors.SearchTimeout());
}

req.complete = true;
req.resp = resp;
req.ms = req.moment.diff() * -1;
req.source.activeFetchCount -= 1;

if (resp.error) {
return requestErrorHandler(req, new errors.FetchFailure(resp));
}

return req.strategy.resolveRequest(req, resp);
}

return reqComplete;
};
});
Loading

0 comments on commit f137a65

Please sign in to comment.