Skip to content

Commit

Permalink
Resolved conflicts (elastic#59467)
Browse files Browse the repository at this point in the history
  • Loading branch information
igoristic authored Mar 5, 2020
1 parent 47094cd commit 542a0d6
Show file tree
Hide file tree
Showing 9 changed files with 168 additions and 222 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ export class PipelineListing extends Component {
className,
} = this.props;

const sortingOptions = sorting || { field: 'id', direction: 'asc' };
if (sortingOptions.field === 'name') {
sortingOptions.field = 'id';
}
const columns = this.getColumns();

return (
Expand All @@ -155,7 +159,7 @@ export class PipelineListing extends Component {
className={className || 'logstashNodesTable'}
rows={data}
columns={columns}
sorting={sorting}
sorting={sortingOptions}
message={upgradeMessage}
pagination={pagination}
fetchMoreData={fetchMoreData}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { flagSupportedClusters } from './flag_supported_clusters';
import { getMlJobsForCluster } from '../elasticsearch';
import { getKibanasForClusters } from '../kibana';
import { getLogstashForClusters } from '../logstash';
import { getPipelines } from '../logstash/get_pipelines';
import { getLogstashPipelineIds } from '../logstash/get_pipeline_ids';
import { getBeatsForClusters } from '../beats';
import { alertsClustersAggregation } from '../../cluster_alerts/alerts_clusters_aggregation';
import { alertsClusterSearch } from '../../cluster_alerts/alerts_cluster_search';
Expand All @@ -34,7 +34,6 @@ import { checkCcrEnabled } from '../elasticsearch/ccr';
import { getStandaloneClusterDefinition, hasStandaloneClusters } from '../standalone_clusters';
import { getLogTypes } from '../logs';
import { isInCodePath } from './is_in_code_path';
import { getLogstashPipelineIds } from '../logstash/get_pipeline_ids';

/**
* Get all clusters or the cluster associated with {@code clusterUuid} when it is defined.
Expand All @@ -55,7 +54,6 @@ export async function getClustersFromRequest(
} = indexPatterns;

const config = req.server.config();
const size = config.get('xpack.monitoring.max_bucket_size');
const isStandaloneCluster = clusterUuid === STANDALONE_CLUSTER_CLUSTER_UUID;

let clusters = [];
Expand Down Expand Up @@ -163,22 +161,14 @@ export async function getClustersFromRequest(
// add logstash data
if (isInCodePath(codePaths, [CODE_PATH_LOGSTASH])) {
const logstashes = await getLogstashForClusters(req, lsIndexPattern, clusters);
const pipelines = await getLogstashPipelineIds(req, lsIndexPattern, { clusterUuid }, size);
const clusterPipelineNodesCount = await getPipelines(req, lsIndexPattern, pipelines, [
'logstash_cluster_pipeline_nodes_count',
]);
// add the logstash data to each cluster
const pipelines = await getLogstashPipelineIds(req, lsIndexPattern, { clusterUuid }, 1);
logstashes.forEach(logstash => {
const clusterIndex = findIndex(clusters, { cluster_uuid: logstash.clusterUuid });

// withhold LS overview stats until pipeline metrics have at least one full bucket
if (
logstash.clusterUuid === req.params.clusterUuid &&
clusterPipelineNodesCount.length === 0
) {
// withhold LS overview stats until there is at least 1 pipeline
if (logstash.clusterUuid === clusterUuid && !pipelines.length) {
logstash.stats = {};
}

set(clusters[clusterIndex], 'logstash', logstash.stats);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ function createMetricAggs(metric) {
return metric.aggs;
}

function fetchSeries(
async function fetchSeries(
req,
indexPattern,
metric,
Expand Down Expand Up @@ -142,7 +142,7 @@ function fetchSeries(
}

const { callWithRequest } = req.server.plugins.elasticsearch.getCluster('monitoring');
return callWithRequest(req, 'search', params);
return await callWithRequest(req, 'search', params);
}

/**
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { get } from 'lodash';
import { get, cloneDeep, last } from 'lodash';
import { filter } from '../pagination/filter';
import { getLogstashPipelineIds } from './get_pipeline_ids';
import { sortPipelines } from './sort_pipelines';
/**
 * Fetch, filter, sort and paginate the cluster's logstash pipelines.
 *
 * Only the metric the user is sorting on is fetched up front (for every
 * pipeline); the full metric series are fetched afterwards for just the one
 * page of pipelines actually returned.
 *
 * NOTE(review): reconstructed from a garbled diff extract (stray removed
 * lines were interleaved with the added ones) — verify against the
 * committed file.
 *
 * @param {Object} req - request object (provides server config)
 * @param {string} lsIndexPattern - logstash index pattern to search
 * @param {Object} uuids - { clusterUuid, logstashUuid } scoping the query
 * @param {Object} metrics - { throughputMetric, nodesCountMetric } sortable metric names
 * @param {Object} pagination - { index, size } page descriptor
 * @param {Object} sort - { field, direction } sort descriptor
 * @param {string} queryText - free-text filter (matched against pipeline id)
 * @returns {Promise<Object>} { pipelines, totalPipelineCount } with normalized metrics
 */
export async function getPaginatedPipelines(
  req,
  lsIndexPattern,
  { clusterUuid, logstashUuid },
  { throughputMetric, nodesCountMetric },
  pagination,
  sort,
  queryText
) {
  const sortField = sort.field;
  const config = req.server.config();
  const size = config.get('xpack.monitoring.max_bucket_size');
  const pipelines = await getLogstashPipelineIds(
    req,
    lsIndexPattern,
    { clusterUuid, logstashUuid },
    size
  );

  // The sortable metrics only need their most recent value to order the
  // table, so the helpers below fetch just the last two buckets of data (to
  // ensure we have a single full bucket) and use the value from that last
  // bucket.
  if (sortField === throughputMetric) {
    await getPaginatedThroughputData(pipelines, req, lsIndexPattern, throughputMetric);
  } else if (sortField === nodesCountMetric) {
    await getPaginatedNodesData(pipelines, req, lsIndexPattern, nodesCountMetric);
  }

  // Filtering
  const filteredPipelines = filter(pipelines, queryText, ['id']); // We only support filtering by id right now

  // Sorting
  const sortedPipelines = sortPipelines(filteredPipelines, sort);

  // Pagination
  const pageOfPipelines = paginate(pagination, sortedPipelines);

  const response = {
    pipelines: await getPipelines(
      req,
      lsIndexPattern,
      pageOfPipelines,
      throughputMetric,
      nodesCountMetric
    ),
    totalPipelineCount: filteredPipelines.length,
  };

  return processPipelinesAPIResponse(response, throughputMetric, nodesCountMetric);
}

/**
 * Normalize a pipelines API response for the shared listing component.
 *
 * Renames the cluster-specific metric keys to the generic `throughput` /
 * `nodesCount` names the UI expects, and derives `latestThroughput` /
 * `latestNodesCount` from the final [timestamp, value] bucket of each series.
 *
 * @param {Object} response - { pipelines, totalPipelineCount }
 * @param {string} throughputMetricKey - source key of the throughput series
 * @param {string} nodesCountMetricKey - source key of the nodes-count series
 * @returns {Object} a deep copy of `response` with normalized pipelines
 */
function processPipelinesAPIResponse(response, throughputMetricKey, nodesCountMetricKey) {
  // Deep-copy first so the caller's response object is never mutated.
  const processed = cloneDeep(response);

  for (const pipeline of processed.pipelines) {
    const throughput = pipeline.metrics[throughputMetricKey];
    const nodesCount = pipeline.metrics[nodesCountMetricKey];

    // Expose the series under the normalized names used by shared code.
    pipeline.metrics = { throughput, nodesCount };

    // The most recent bucket supplies the "latest" figures; an empty series
    // yields undefined.
    pipeline.latestThroughput = (last(throughput.data) || [])[1];
    pipeline.latestNodesCount = (last(nodesCount.data) || [])[1];
  }

  return processed;
}

/**
 * Fetch just the throughput metric for every pipeline so the full result set
 * can be sorted by throughput before pagination. Only the last two buckets
 * are requested (to guarantee at least one complete bucket); the value from
 * the final bucket is written onto each pipeline object in place.
 *
 * NOTE(review): reconstructed from a garbled diff extract (removed lines
 * were interleaved with added ones) — verify against the committed file.
 *
 * @param {Array<Object>} pipelines - pipeline descriptors; mutated in place
 * @param {Object} req - request object
 * @param {string} lsIndexPattern - logstash index pattern to search
 * @param {string} throughputMetric - metric name to fetch and attach
 */
async function getPaginatedThroughputData(pipelines, req, lsIndexPattern, throughputMetric) {
  // One search per pipeline, issued in parallel. An async mapper replaces the
  // original `new Promise(async resolve => ...)` construction anti-pattern.
  const metricSeriesData = await Promise.all(
    pipelines.map(async pipeline => {
      const data = await getMetrics(
        req,
        lsIndexPattern,
        [throughputMetric],
        [],
        {
          pipeline,
        },
        2 // last two buckets: ensures a single full bucket is available
      );
      return reduceData(pipeline, data);
    })
  );

  // Copy the final (complete) bucket's value onto each pipeline so callers
  // can sort on `pipeline[throughputMetric]`.
  for (const pipelineAggregationData of metricSeriesData) {
    for (const pipeline of pipelines) {
      if (pipelineAggregationData.id === pipeline.id) {
        const dataSeries = get(pipelineAggregationData, `metrics.${throughputMetric}.data`, [[]]);
        pipeline[throughputMetric] = dataSeries[dataSeries.length - 1][1];
      }
    }
  }
}

// Manually apply pagination/sorting/filtering concerns
/**
 * Fetch the nodes-count metric for all pipelines in one search and copy the
 * latest bucket's per-pipeline value onto each pipeline object in place, so
 * the full result set can be sorted by node count before pagination.
 *
 * @param {Array<Object>} pipelines - pipeline descriptors; mutated in place
 * @param {Object} req - request object
 * @param {string} lsIndexPattern - logstash index pattern to search
 * @param {string} nodesCountMetric - metric name to fetch and attach
 */
async function getPaginatedNodesData(pipelines, req, lsIndexPattern, nodesCountMetric) {
  const metricSeriesData = await getMetrics(
    req,
    lsIndexPattern,
    [nodesCountMetric],
    [],
    { pageOfPipelines: pipelines },
    2 // last two buckets: ensures a single full bucket is available
  );
  // BUG FIX: the previous fallback (`|| [[]]`) had no `data` property, so an
  // empty metric response crashed on `data.pop()`. Default `data` to [].
  const { data = [] } = metricSeriesData[nodesCountMetric][0] || {};
  // The latest bucket has the shape [timestamp, { pipelineId: nodeCount }].
  const pipelinesMap = (data[data.length - 1] || [])[1] || {};
  if (!Object.keys(pipelinesMap).length) {
    return;
  }
  pipelines.forEach(pipeline => {
    pipeline[nodesCountMetric] = pipelinesMap[pipeline.id];
  });
}

// Filtering
const filteredPipelines = filter(pipelines, queryText, ['id']); // We only support filtering by id right now
/**
 * Fetch the full throughput and nodes-count series for the single page of
 * pipelines being returned, and merge them per pipeline id.
 *
 * @param {Object} req - request object
 * @param {string} lsIndexPattern - logstash index pattern to search
 * @param {Array<Object>} pipelines - the page of pipeline descriptors
 * @param {string} throughputMetric - throughput metric name
 * @param {string} nodesCountMetric - nodes-count metric name
 * @returns {Promise<Array<Object>>} [{ id, metrics: { <metric>: series } }]
 */
async function getPipelines(req, lsIndexPattern, pipelines, throughputMetric, nodesCountMetric) {
  // The two searches are independent — run them in parallel instead of
  // awaiting them one after the other.
  const [throughputPipelines, nodePipelines] = await Promise.all([
    getThroughputPipelines(req, lsIndexPattern, pipelines, throughputMetric),
    getNodePipelines(req, lsIndexPattern, pipelines, nodesCountMetric),
  ]);

  // Index both result sets by id once so the merge is O(n) instead of the
  // previous O(n²) `.find` inside `.map`.
  const throughputById = new Map(throughputPipelines.map(p => [p.id, p]));
  const nodesById = new Map(nodePipelines.map(p => [p.id, p]));

  return pipelines.map(({ id }) => ({
    id,
    metrics: {
      [throughputMetric]: throughputById.get(id).metrics[throughputMetric],
      [nodesCountMetric]: nodesById.get(id).metrics[nodesCountMetric],
    },
  }));
}

// Sorting
const sortedPipelines = sortPipelines(filteredPipelines, sort);
/**
 * Fetch the throughput series for each pipeline on the current page. One
 * search per pipeline, issued in parallel.
 *
 * NOTE(review): reconstructed from a garbled diff extract (stray removed
 * lines were interleaved into the hunk) — verify against the committed file.
 *
 * @param {Object} req - request object
 * @param {string} lsIndexPattern - logstash index pattern to search
 * @param {Array<Object>} pipelines - the page of pipeline descriptors
 * @param {string} throughputMetric - throughput metric name
 * @returns {Promise<Array<Object>>} one reduced { id, metrics } entry per pipeline
 */
async function getThroughputPipelines(req, lsIndexPattern, pipelines, throughputMetric) {
  // An async mapper replaces the original `new Promise(async resolve => ...)`
  // construction anti-pattern; the redundant Object.values() on an array is
  // dropped as well.
  const metricsResponse = await Promise.all(
    pipelines.map(async pipeline => {
      const data = await getMetrics(req, lsIndexPattern, [throughputMetric], [], {
        pipeline,
      });
      return reduceData(pipeline, data);
    })
  );

  return metricsResponse;
}

/**
 * Fetch the nodes-count series for the current page of pipelines with a
 * single search, then split the shared per-bucket { pipelineId: count } maps
 * into one series per pipeline.
 *
 * @param {Object} req - request object
 * @param {string} lsIndexPattern - logstash index pattern to search
 * @param {Array<Object>} pipelines - the page of pipeline descriptors
 * @param {string} nodesCountMetric - nodes-count metric name
 * @returns {Promise<Array<Object>>} [{ id, metrics: { <metric>: series } }]
 */
async function getNodePipelines(req, lsIndexPattern, pipelines, nodesCountMetric) {
  const filters = { pageOfPipelines: pipelines };
  const seriesByMetric = await getMetrics(req, lsIndexPattern, [nodesCountMetric], [], filters);
  const sharedSeries = seriesByMetric[nodesCountMetric][0];

  return pipelines.map(({ id }) => {
    // Project each [timestamp, valueMap] bucket down to this pipeline's count.
    const ownData = sharedSeries.data.map(([bucketTime, countsById]) => [
      bucketTime,
      countsById[id],
    ]);
    return {
      id,
      metrics: {
        [nodesCountMetric]: {
          ...sharedSeries,
          data: ownData,
        },
      },
    };
  });
}

/**
 * Collapse a getMetrics result down to the shape the pagination code uses:
 * { id, metrics: { <metricName>: <firstSeries> } }. Each metric's value in
 * `data` is an array of series; only the first series is kept.
 *
 * Two stray lines of diff residue (`pageOfPipelines,` /
 * `totalPipelineCount: ...`) that broke the return statement were removed.
 *
 * @param {Object} pipeline - only `id` is read
 * @param {Object} data - metric name → array of series objects
 * @returns {Object} { id, metrics }
 */
function reduceData({ id }, data) {
  return {
    id,
    metrics: Object.keys(data).reduce((accum, metricName) => {
      accum[metricName] = data[metricName][0];
      return accum;
    }, {}),
  };
}
Loading

0 comments on commit 542a0d6

Please sign in to comment.