Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Add PPL stats endpoint (#706)
Browse files Browse the repository at this point in the history
* Add PPL stats endpoint

* update
  • Loading branch information
penghuo authored Aug 25, 2020
1 parent 126eb4e commit 684690a
Show file tree
Hide file tree
Showing 6 changed files with 207 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*/

package com.amazon.opendistroforelasticsearch.sql.ppl;

import static com.amazon.opendistroforelasticsearch.sql.legacy.TestsConstants.TEST_INDEX_BANK;
import static org.hamcrest.Matchers.equalTo;

import com.amazon.opendistroforelasticsearch.sql.legacy.metrics.MetricName;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.concurrent.TimeUnit;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.json.JSONObject;
import org.junit.Assert;
import org.junit.Test;

public class MetricsIT extends PPLIntegTestCase {

@Override
protected void init() throws Exception {
loadIndex(Index.BANK);
}

@Test
public void requestCount() throws IOException, InterruptedException {
int beforeQueries = pplRequestTotal();
multiQueries(3);
TimeUnit.SECONDS.sleep(2L);

assertThat(pplRequestTotal(), equalTo(beforeQueries + 3));
}

private void multiQueries(int n) throws IOException {
for (int i = 0; i < n; ++i) {
executeQuery(String.format("source=%s | where age = 31 + 1 | fields age", TEST_INDEX_BANK));
}
}

private Request makeStatRequest() {
return new Request(
"GET", "/_opendistro/_ppl/stats"
);
}

private int pplRequestTotal() throws IOException {
JSONObject jsonObject = new JSONObject(executeStatRequest(makeStatRequest()));
return jsonObject.getInt(MetricName.PPL_REQ_TOTAL.getName());
}

private String executeStatRequest(final Request request) throws IOException {
Response sqlResponse = client().performRequest(request);

Assert.assertTrue(sqlResponse.getStatusLine().getStatusCode() == 200);

InputStream is = sqlResponse.getEntity().getContent();
StringBuilder sb = new StringBuilder();
try (BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
String line = null;
while ((line = br.readLine()) != null) {
sb.append(line);
}
}

return sb.toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public static Metric createMetric(MetricName name) {
case REQ_TOTAL:
case DEFAULT_CURSOR_REQUEST_TOTAL:
case DEFAULT:
case PPL_REQ_TOTAL:
return new NumericMetric<>(name.getName(), new BasicCounter());
case CIRCUIT_BREAKER:
return new GaugeMetric<>(name.getName(), BackOffRetryStrategy.GET_CB_STATE);
Expand All @@ -33,6 +34,9 @@ public static Metric createMetric(MetricName name) {
case FAILED_REQ_COUNT_CUS:
case FAILED_REQ_COUNT_SYS:
case FAILED_REQ_COUNT_CB:
case PPL_REQ_COUNT_TOTAL:
case PPL_FAILED_REQ_COUNT_CUS:
case PPL_FAILED_REQ_COUNT_SYS:
return new NumericMetric<>(name.getName(), new RollingCounter());
default:
return new NumericMetric<>(name.getName(), new BasicCounter());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@

package com.amazon.opendistroforelasticsearch.sql.legacy.metrics;

import com.google.common.collect.ImmutableSet;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public enum MetricName {
Expand All @@ -29,7 +31,12 @@ public enum MetricName {
DEFAULT_CURSOR_REQUEST_TOTAL("default_cursor_request_total"),
DEFAULT_CURSOR_REQUEST_COUNT_TOTAL("default_cursor_request_count"),
CIRCUIT_BREAKER("circuit_breaker"),
DEFAULT("default");
DEFAULT("default"),

PPL_REQ_TOTAL("ppl_request_total"),
PPL_REQ_COUNT_TOTAL("ppl_request_count"),
PPL_FAILED_REQ_COUNT_SYS("ppl_failed_request_count_syserr"),
PPL_FAILED_REQ_COUNT_CUS("ppl_failed_request_count_cuserr");

private String name;

Expand All @@ -45,10 +52,18 @@ public static List<String> getNames() {
return Arrays.stream(MetricName.values()).map(v -> v.name).collect(Collectors.toList());
}


private static Set<MetricName> NUMERICAL_METRIC = new ImmutableSet.Builder<MetricName>()
.add(PPL_REQ_TOTAL)
.add(PPL_REQ_COUNT_TOTAL)
.add(PPL_FAILED_REQ_COUNT_SYS)
.add(PPL_FAILED_REQ_COUNT_CUS)
.build();

public boolean isNumerical() {
return this == REQ_TOTAL || this == REQ_COUNT_TOTAL || this == FAILED_REQ_COUNT_SYS
|| this == FAILED_REQ_COUNT_CUS || this == FAILED_REQ_COUNT_CB || this == DEFAULT
|| this == DEFAULT_CURSOR_REQUEST_TOTAL || this == DEFAULT_CURSOR_REQUEST_COUNT_TOTAL;
|| this == FAILED_REQ_COUNT_CUS || this == FAILED_REQ_COUNT_CB || this == DEFAULT
|| this == DEFAULT_CURSOR_REQUEST_TOTAL || this == DEFAULT_CURSOR_REQUEST_COUNT_TOTAL
|| NUMERICAL_METRIC.contains(this);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.amazon.opendistroforelasticsearch.sql.legacy.plugin.RestSqlStatsAction;
import com.amazon.opendistroforelasticsearch.sql.legacy.plugin.SqlSettings;
import com.amazon.opendistroforelasticsearch.sql.plugin.rest.RestPPLQueryAction;
import com.amazon.opendistroforelasticsearch.sql.plugin.rest.RestPPLStatsAction;
import com.google.common.collect.ImmutableList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -100,7 +101,8 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
new RestPPLQueryAction(restController, clusterService, pluginSettings, settings),
new RestSqlAction(settings, clusterService, pluginSettings),
new RestSqlStatsAction(settings, restController),
new RestSqlSettingsAction(settings, restController)
new RestSqlSettingsAction(settings, restController),
new RestPPLStatsAction(settings, restController)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import com.amazon.opendistroforelasticsearch.sql.exception.QueryEngineException;
import com.amazon.opendistroforelasticsearch.sql.exception.SemanticCheckException;
import com.amazon.opendistroforelasticsearch.sql.executor.ExecutionEngine.QueryResponse;
import com.amazon.opendistroforelasticsearch.sql.legacy.metrics.MetricName;
import com.amazon.opendistroforelasticsearch.sql.legacy.metrics.Metrics;
import com.amazon.opendistroforelasticsearch.sql.legacy.utils.LogUtils;
import com.amazon.opendistroforelasticsearch.sql.plugin.request.PPLQueryRequestFactory;
import com.amazon.opendistroforelasticsearch.sql.ppl.PPLService;
import com.amazon.opendistroforelasticsearch.sql.ppl.config.PPLServiceConfig;
Expand Down Expand Up @@ -97,6 +100,11 @@ public String getName() {

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient nodeClient) {
Metrics.getInstance().getNumericalMetric(MetricName.PPL_REQ_TOTAL).increment();
Metrics.getInstance().getNumericalMetric(MetricName.PPL_REQ_COUNT_TOTAL).increment();

LogUtils.addRequestId();

if (!pplEnabled.get()) {
return channel -> reportError(channel, new IllegalAccessException(
"Either opendistro.ppl.enabled or rest.action.multi.allow_explicit_index setting is false"
Expand Down Expand Up @@ -145,7 +153,13 @@ public void onResponse(QueryResponse response) {
@Override
public void onFailure(Exception e) {
LOG.error("Error happened during query handling", e);
reportError(channel, e, isClientError(e) ? BAD_REQUEST : SERVICE_UNAVAILABLE);
if (isClientError(e)) {
Metrics.getInstance().getNumericalMetric(MetricName.PPL_FAILED_REQ_COUNT_CUS).increment();
reportError(channel, e, BAD_REQUEST);
} else {
Metrics.getInstance().getNumericalMetric(MetricName.PPL_FAILED_REQ_COUNT_SYS).increment();
reportError(channel, e, SERVICE_UNAVAILABLE);
}
}

private void sendResponse(RestStatus status, String content) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*/

package com.amazon.opendistroforelasticsearch.sql.plugin.rest;

import static org.elasticsearch.rest.RestStatus.SERVICE_UNAVAILABLE;

import com.amazon.opendistroforelasticsearch.sql.legacy.executor.format.ErrorMessageFactory;
import com.amazon.opendistroforelasticsearch.sql.legacy.metrics.Metrics;
import com.amazon.opendistroforelasticsearch.sql.legacy.utils.LogUtils;
import com.google.common.collect.ImmutableList;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestStatus;

/**
* PPL Node level status.
*/
public class RestPPLStatsAction extends BaseRestHandler {

private static final Logger LOG = LogManager.getLogger(RestPPLStatsAction.class);

/**
* API endpoint path.
*/
public static final String PPL_STATS_API_ENDPOINT = "/_opendistro/_ppl/stats";

public RestPPLStatsAction(Settings settings, RestController restController) {
super();
}

@Override
public String getName() {
return "ppl_stats_action";
}

@Override
public List<Route> routes() {
return ImmutableList.of(
new Route(RestRequest.Method.POST, PPL_STATS_API_ENDPOINT),
new Route(RestRequest.Method.GET, PPL_STATS_API_ENDPOINT)
);
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {

LogUtils.addRequestId();

try {
return channel -> channel.sendResponse(new BytesRestResponse(RestStatus.OK,
Metrics.getInstance().collectToJSON()));
} catch (Exception e) {
LOG.error("Failed during Query PPL STATS Action.", e);

return channel -> channel.sendResponse(new BytesRestResponse(SERVICE_UNAVAILABLE,
ErrorMessageFactory.createErrorMessage(e, SERVICE_UNAVAILABLE.getStatus()).toString()));
}
}
}

0 comments on commit 684690a

Please sign in to comment.