Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Add useful logging #33

Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package com.amazon.opendistroforelasticsearch.sql.esdomain;

import com.amazon.opendistroforelasticsearch.sql.plugin.SqlSettings;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
Expand All @@ -29,6 +30,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.index.IndexNotFoundException;
import org.json.JSONObject;

Expand All @@ -38,11 +40,13 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.function.Predicate;

import static java.util.Collections.emptyMap;
import static org.elasticsearch.common.settings.Settings.EMPTY;

/**
* Local cluster state information, which may be stale but helps avoid blocking operations in the NIO thread.
Expand All @@ -67,6 +71,9 @@ public class LocalClusterState {
/** Current cluster state on local node */
private ClusterService clusterService;

/** Sql specific settings in ES cluster settings */
private SqlSettings sqlSettings;

/** Index name expression resolver to get concrete index name */
private IndexNameExpressionResolver resolver;

Expand All @@ -76,6 +83,9 @@ public class LocalClusterState {
*/
private final Cache<Tuple<List<String>, List<String>>, IndexMappings> cache;

/** Latest setting value for each registered key. Thread safety is required. */
private final Map<String, Object> settingMap = new ConcurrentHashMap<>();
dai-chen marked this conversation as resolved.
Show resolved Hide resolved


public static synchronized LocalClusterState state() {
if (INSTANCE == null) {
Expand Down Expand Up @@ -103,6 +113,20 @@ public void setClusterService(ClusterService clusterService) {
});
}

public void setSqlSettings(SqlSettings sqlSettings) {
this.sqlSettings = sqlSettings;
for (Setting<?> setting : sqlSettings.getSettings()) {
clusterService.getClusterSettings().addSettingsUpdateConsumer(
setting,
newVal -> {
if (LOG.isDebugEnabled()) {
LOG.debug("Setting value of [{}] changed to [{}]", setting.getKey(), newVal);
dai-chen marked this conversation as resolved.
Show resolved Hide resolved
}
settingMap.put(setting.getKey(), newVal);
});
}
}

/**
 * Set the index name expression resolver used by this local cluster state.
 *
 * @param resolver index name expression resolver
 */
public void setResolver(IndexNameExpressionResolver resolver) {
this.resolver = resolver;
}
Expand All @@ -111,6 +135,18 @@ private LocalClusterState() {
cache = CacheBuilder.newBuilder().maximumSize(100).build();
}

/**
 * Look up the current value of a setting. The most recently recorded value wins;
 * when none has been recorded, the setting's default is returned instead.
 *
 * @param key setting key registered during plugin launch.
 * @param <T> expected type of the setting value (unchecked cast, caller's responsibility)
 * @return latest recorded value, or the default if the key has no recorded value
 */
@SuppressWarnings("unchecked")
public <T> T getSettingValue(String key) {
    Objects.requireNonNull(sqlSettings, "SQL setting is null");

    // Resolved up front, so an unregistered key is rejected by sqlSettings.getSetting()
    // even when a value happens to be recorded for it.
    final Object fallback = sqlSettings.getSetting(key).getDefault(EMPTY);
    final Object current = settingMap.getOrDefault(key, fallback);
    return (T) current;
}

/** Get field mappings by index expressions. All types and fields are included in response. */
public IndexMappings getFieldMappings(String[] indices) {
return getFieldMappings(indices, ALL_TYPES, ALL_FIELDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@

package com.amazon.opendistroforelasticsearch.sql.executor;

import com.amazon.opendistroforelasticsearch.sql.esdomain.LocalClusterState;
import com.amazon.opendistroforelasticsearch.sql.exception.SqlParseException;
import com.amazon.opendistroforelasticsearch.sql.executor.format.ErrorMessage;
import com.amazon.opendistroforelasticsearch.sql.query.QueryAction;
import com.amazon.opendistroforelasticsearch.sql.query.join.BackOffRetryStrategy;
import org.apache.logging.log4j.LogManager;
Expand All @@ -28,10 +30,13 @@
import org.elasticsearch.transport.Transports;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.function.Predicate;

import static org.elasticsearch.rest.RestStatus.INTERNAL_SERVER_ERROR;
import static com.amazon.opendistroforelasticsearch.sql.plugin.SqlSettings.QUERY_SLOWLOG;
import static org.elasticsearch.rest.RestStatus.SERVICE_UNAVAILABLE;

/**
* A RestExecutor wrapper to execute request asynchronously to avoid blocking transport thread.
Expand Down Expand Up @@ -66,17 +71,17 @@ public class AsyncRestExecutor implements RestExecutor {
public void execute(Client client, Map<String, String> params, QueryAction queryAction, RestChannel channel) throws Exception {
if (isBlockingAction(queryAction) && isRunningInTransportThread()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Async blocking query action [{}] for executor [{}] in current thread [{}]",
name(executor), name(queryAction), Thread.currentThread().getName());
LOG.debug("[{}] Async blocking query action [{}] for executor [{}] in current thread [{}]",
dai-chen marked this conversation as resolved.
Show resolved Hide resolved
requestId(queryAction), name(executor), name(queryAction), Thread.currentThread().getName());
}
async(client, params, queryAction, channel);
}
else {
if (LOG.isDebugEnabled()) {
LOG.debug("Continue running query action [{}] for executor [{}] in current thread [{}]",
name(executor), name(queryAction), Thread.currentThread().getName());
LOG.debug("[{}] Continue running query action [{}] for executor [{}] in current thread [{}]",
requestId(queryAction), name(executor), name(queryAction), Thread.currentThread().getName());
}
executor.execute(client, params, queryAction, channel);
sync(client, params, queryAction, channel);
dai-chen marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand All @@ -97,24 +102,56 @@ private boolean isRunningInTransportThread() {
private void async(Client client, Map<String, String> params, QueryAction queryAction, RestChannel channel) {
// Run given task in thread pool asynchronously
client.threadPool().schedule(
new TimeValue(0L),
SQL_WORKER_THREAD_POOL_NAME,
() -> {
try {
executor.execute(client, params, queryAction, channel);
doExecuteWithTimeMeasured(client, params, queryAction, channel);
} catch (IOException | SqlParseException e) {
LOG.warn("[MCB] async task got an IO/SQL exception: {}", e.getMessage());
LOG.warn("[{}] [MCB] async task got an IO/SQL exception: {}", requestId(queryAction), e.getMessage());
channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()));
} catch (IllegalStateException e) {
LOG.warn("[MCB] async task got a runtime exception: {}", e.getMessage());
LOG.warn("[{}] [MCB] async task got a runtime exception: {}", requestId(queryAction), e.getMessage());
channel.sendResponse(new BytesRestResponse(RestStatus.INSUFFICIENT_STORAGE, "Memory circuit is broken."));
} catch (Throwable t) {
LOG.warn("[MCB] async task got an unknown throwable: {}", t.getMessage());
LOG.warn("[{}] [MCB] async task got an unknown throwable: {}", requestId(queryAction), t.getMessage());
channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, String.valueOf(t.getMessage())));
} finally {
BackOffRetryStrategy.releaseMem(executor);
}
});
},
new TimeValue(0L),
SQL_WORKER_THREAD_POOL_NAME);
}

private void sync(Client client, Map<String, String> params, QueryAction queryAction, RestChannel channel) {
try {
doExecuteWithTimeMeasured(client, params, queryAction, channel);
}
catch (Exception e) {
dai-chen marked this conversation as resolved.
Show resolved Hide resolved
LOG.error(String.format("[%s] Failed during query execution in executor", requestId(queryAction)), e);
channel.sendResponse(new BytesRestResponse(SERVICE_UNAVAILABLE, new ErrorMessage(e, SERVICE_UNAVAILABLE.getStatus()).toString()));
}
}

/** Time the real execution of Executor and log slow query for troubleshooting */
private void doExecuteWithTimeMeasured(Client client,
Map<String, String> params,
QueryAction action,
RestChannel channel) throws Exception {
Instant startTime = Instant.now();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instant.now() can have pretty big errors depending on time drift/skew/last time of sync of system time/etc. Please use System.nanoTime() for anything where you need to measure a period and not get a specific point in time (https://docs.oracle.com/javase/8/docs/api/java/lang/System.html#nanoTime--).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instant.now() can have pretty big errors depending on time drift/skew/last time of sync of system time/etc. Please use System.nanoTime() for anything where you need to measure a period and not get a specific point in time (https://docs.oracle.com/javase/8/docs/api/java/lang/System.html#nanoTime--).

Thanks for the suggestion! I wasn't aware of this. Could you pass me some reference for the problem you mentioned? Because we only care about the delta value returned from Duration.between, is it possible to be wrong too in some case?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one touches upon the problem a little bit: https://www.baeldung.com/java-measure-elapsed-time. Most of the sources say that resolution of nanoTime is better, but that's not the issue. The real difference is that System.currentTimeMillis() and Instant.now() (which relies on the former) both use wall-clock time from the JVM, which in its turn gets it from OS. Depending on what protocol the machine is using to synchronize the time, it can lag behind or go far ahead and then during sync get a huge change (either negative or positive). The System.nanoTime() is based on software-based counter (not related to wall clock), which only increases, and does not need to be synced with any other machine/server/clock, so the diffs of values from nanoTime will always be pretty accurate (I guess as long as you are using it on the same thread).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one touches upon the problem a little bit: https://www.baeldung.com/java-measure-elapsed-time. Most of the sources say that resolution of nanoTime is better, but that's not the issue. The real difference is that System.currentTimeMillis() and Instant.now() (which relies on the former) both use wall-clock time from the JVM, which in its turn gets it from OS. Depending on what protocol the machine is using to synchronize the time, it can lag behind or go far ahead and then during sync get a huge change (either negative or positive). The System.nanoTime() is based on software-based counter (not related to wall clock), which only increases, and does not need to be synced with any other machine/server/clock, so the diffs of values from nanoTime will always be pretty accurate (I guess as long as you are using it on the same thread).

Thanks! Good to know the subtle difference. I think Instant and Duration is sufficient in our case. Because we are not benchmarking, we just need a rough elapsed time in second(s). nanoTime() may be more accurate in some edge case, though it's hassle to convert it to sec or millisec for logging or comparison. Hopefully this makes sense to you.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's just two lines that need to be changed:
#140 should be final long startNanos = System.nanoTime();
#145 should be `final Duration elapsed = Duration.ofNanos(System.nanoTime() - startNanos);

It's up to you if you want to keep it this way, not a blocker.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's just two lines that need to be changed:
#140 should be final long startNanos = System.nanoTime();
#145 should be `final Duration elapsed = Duration.ofNanos(System.nanoTime() - startNanos);

It's up to you if you want to keep it this way, not a blocker.

Easier than I thought. Testing it. Will push very soon. Thanks!

try {
executor.execute(client, params, action, channel);
}
finally {
Duration elapsed = Duration.between(startTime, Instant.now());
int slowLogThreshold = LocalClusterState.state().getSettingValue(QUERY_SLOWLOG);
if (elapsed.getSeconds() > slowLogThreshold) {
LOG.warn("[{}] Slow query: elapsed={} (ms)", requestId(action), elapsed.toMillis());
}
}
}

/**
 * Get the id of the SQL request attached to the given query action.
 * The id is used as a prefix in this class's log messages.
 */
private String requestId(QueryAction action) {
return action.getSqlRequest().getId();
}

private String name(Object object) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package com.amazon.opendistroforelasticsearch.sql.executor.format;

import org.elasticsearch.rest.RestStatus;
import org.json.JSONObject;

public class ErrorMessage {
Expand All @@ -37,10 +38,16 @@ public ErrorMessage(Exception exception, int status) {

/** Error type: the simple class name of the wrapped exception. */
private String fetchType() { return exception.getClass().getSimpleName(); }

private String fetchReason() { return emptyStringIfNull(exception.getLocalizedMessage()); }
private String fetchReason() {
return status == RestStatus.BAD_REQUEST.getStatus()
? "SQL in the query request is not valid"
dai-chen marked this conversation as resolved.
Show resolved Hide resolved
: "There was internal problem at backend";
}

/**
 * Error details: the localized message of the wrapped exception, or an empty string if it has none.
 */
private String fetchDetails() {
    // Deliberately not exception.toString(): some exceptions print internal information
    // (full class name), which is a security concern.
    return emptyStringIfNull(exception.getLocalizedMessage());
}

/** Return the given string unchanged, or an empty string when it is null. */
private String emptyStringIfNull(String str) { return str != null ? str : ""; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,11 @@ public String getName() {

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
SqlRequest sqlRequest = SqlRequest.NULL;
dai-chen marked this conversation as resolved.
Show resolved Hide resolved
try {
final SqlRequest sqlRequest = SqlRequestFactory.getSqlRequest(request);
sqlRequest = SqlRequestFactory.getSqlRequest(request);
dai-chen marked this conversation as resolved.
Show resolved Hide resolved
LOG.info("[{}] Incoming request {}: {}", sqlRequest.getId(), request.uri(), sqlRequest.getSql());

final QueryAction queryAction = new SearchDao(client).explain(sqlRequest.getSql());
queryAction.setSqlRequest(sqlRequest);

Expand All @@ -88,7 +91,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
return channel -> restExecutor.execute(client, additionalParams, queryAction, channel);
}
} catch (Exception e) {
LOG.error("Failed during Query Action.", e);
LOG.error(String.format("[%s] Failed during query execution", sqlRequest.getId()), e);
return reportError(e, isClientError(e) ? BAD_REQUEST : SERVICE_UNAVAILABLE);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.util.concurrent.EsExecutors;
Expand All @@ -47,6 +48,9 @@

public class SqlPlug extends Plugin implements ActionPlugin {
dai-chen marked this conversation as resolved.
Show resolved Hide resolved

/** Sql plugin specific settings in ES cluster settings */
private final SqlSettings sqlSettings = new SqlSettings();

/** Create the plugin. The constructor body is intentionally empty. */
public SqlPlug() {
}

Expand All @@ -69,6 +73,7 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
/**
 * Hand the cluster service and the SQL settings over to the local cluster state,
 * then delegate component creation to the parent class.
 */
@Override
public Collection<Object> createComponents(Client client,
                                           ClusterService clusterService,
                                           ThreadPool threadPool,
                                           ResourceWatcherService resourceWatcherService,
                                           ScriptService scriptService,
                                           NamedXContentRegistry xContentRegistry,
                                           Environment environment,
                                           NodeEnvironment nodeEnvironment,
                                           NamedWriteableRegistry namedWriteableRegistry) {
    final LocalClusterState localState = LocalClusterState.state();
    // Cluster service goes first; the SQL settings are applied after it
    localState.setClusterService(clusterService);
    localState.setSqlSettings(sqlSettings);

    return super.createComponents(
        client, clusterService, threadPool, resourceWatcherService, scriptService,
        xContentRegistry, environment, nodeEnvironment, namedWriteableRegistry);
}

Expand All @@ -85,4 +90,8 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
);
}

/** Expose the SQL plugin specific settings held by this plugin instance. */
@Override
public List<Setting<?>> getSettings() {
return sqlSettings.getSettings();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.sql.plugin;

import org.elasticsearch.common.settings.Setting;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.common.settings.Setting.Property.Dynamic;
import static org.elasticsearch.common.settings.Setting.Property.NodeScope;

/**
* SQL plugin settings
*/
public class SqlSettings {

/**
* Get plugin settings stored in cluster setting. Why not use ES slow log settings consistently?
* 1) It's per-index setting.
* 2) It has separate setting for Query and Fetch phase which are all ES internal concepts.
*/
public static final String QUERY_SLOWLOG = "opendistro.sql.query.slowlog";

private final Map<String, Setting<?>> settings;

public SqlSettings() {
Map<String, Setting<?>> settings = new HashMap<>();
settings.put(QUERY_SLOWLOG, Setting.intSetting(QUERY_SLOWLOG, 2, NodeScope, Dynamic));

this.settings = unmodifiableMap(settings);
}

public SqlSettings(Map<String, Setting<?>> settings) {
this.settings = unmodifiableMap(settings);
}

public Setting<?> getSetting(String key) {
if (settings.containsKey(key)) {
return settings.get(key);
}
throw new IllegalArgumentException("Cannot find setting by key [" + key + "]");
}

public List<Setting<?>> getSettings() {
return new ArrayList<>(settings.values());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public abstract class QueryAction {

protected Query query;
protected Client client;
protected SqlRequest sqlRequest;
protected SqlRequest sqlRequest = SqlRequest.NULL;

public QueryAction(Client client, Query query) {
this.client = client;
Expand All @@ -58,6 +58,8 @@ public QueryAction(Client client, Query query) {

/** Attach the SQL request that this query action was built from. */
public void setSqlRequest(SqlRequest sqlRequest) { this.sqlRequest = sqlRequest; }

/** Get the SQL request attached to this query action. */
public SqlRequest getSqlRequest() { return sqlRequest; }

protected void updateRequestWithCollapse(Select select, SearchRequestBuilder request) throws SqlParseException {
JsonFactory jsonFactory = new JsonFactory();
for (Hint hint : select.getHints()) {
Expand Down
Loading