Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Add useful logging #33

Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package com.amazon.opendistroforelasticsearch.sql.esdomain;

import com.amazon.opendistroforelasticsearch.sql.plugin.SqlSettings;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
Expand All @@ -29,6 +30,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.index.IndexNotFoundException;
import org.json.JSONObject;

Expand All @@ -38,11 +40,13 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.function.Predicate;

import static java.util.Collections.emptyMap;
import static org.elasticsearch.common.settings.Settings.EMPTY;

/**
* Local cluster state information which may be stale but help avoid blocking operation in NIO thread.
Expand All @@ -67,6 +71,9 @@ public class LocalClusterState {
/** Current cluster state on local node */
private ClusterService clusterService;

/** Sql specific settings in ES cluster settings */
private SqlSettings sqlSettings;

/** Index name expression resolver to get concrete index name */
private IndexNameExpressionResolver resolver;

Expand All @@ -76,6 +83,9 @@ public class LocalClusterState {
*/
private final Cache<Tuple<List<String>, List<String>>, IndexMappings> cache;

/** Latest setting value for each registered key. Thread-safe is required. */
private final Map<String, Object> latestSettings = new ConcurrentHashMap<>();


public static synchronized LocalClusterState state() {
if (INSTANCE == null) {
Expand Down Expand Up @@ -103,6 +113,20 @@ public void setClusterService(ClusterService clusterService) {
});
}

public void setSqlSettings(SqlSettings sqlSettings) {
this.sqlSettings = sqlSettings;
for (Setting<?> setting : sqlSettings.getSettings()) {
clusterService.getClusterSettings().addSettingsUpdateConsumer(
setting,
newVal -> {
if (LOG.isDebugEnabled()) {
LOG.debug("The value of setting [{}] changed to [{}]", setting.getKey(), newVal);
}
latestSettings.put(setting.getKey(), newVal);
});
}
}

public void setResolver(IndexNameExpressionResolver resolver) {
this.resolver = resolver;
}
Expand All @@ -111,6 +135,18 @@ private LocalClusterState() {
cache = CacheBuilder.newBuilder().maximumSize(100).build();
}

/**
* Get setting value by key. Return default value if not configured explicitly.
*
* @param key setting key registered during plugin launch.
* @return setting value or default
*/
@SuppressWarnings("unchecked")
public <T> T getSettingValue(String key) {
Objects.requireNonNull(sqlSettings, "SQL setting is null");
return (T) latestSettings.getOrDefault(key, sqlSettings.getSetting(key).getDefault(EMPTY));
}

/** Get field mappings by index expressions. All types and fields are included in response. */
public IndexMappings getFieldMappings(String[] indices) {
return getFieldMappings(indices, ALL_TYPES, ALL_FIELDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package com.amazon.opendistroforelasticsearch.sql.executor;

import com.amazon.opendistroforelasticsearch.sql.esdomain.LocalClusterState;
import com.amazon.opendistroforelasticsearch.sql.exception.SqlParseException;
import com.amazon.opendistroforelasticsearch.sql.query.QueryAction;
import com.amazon.opendistroforelasticsearch.sql.query.join.BackOffRetryStrategy;
Expand All @@ -28,10 +29,11 @@
import org.elasticsearch.transport.Transports;

import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.function.Predicate;

import static org.elasticsearch.rest.RestStatus.INTERNAL_SERVER_ERROR;
import static com.amazon.opendistroforelasticsearch.sql.plugin.SqlSettings.QUERY_SLOWLOG;

/**
* A RestExecutor wrapper to execute request asynchronously to avoid blocking transport thread.
Expand Down Expand Up @@ -66,17 +68,17 @@ public class AsyncRestExecutor implements RestExecutor {
public void execute(Client client, Map<String, String> params, QueryAction queryAction, RestChannel channel) throws Exception {
if (isBlockingAction(queryAction) && isRunningInTransportThread()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Async blocking query action [{}] for executor [{}] in current thread [{}]",
name(executor), name(queryAction), Thread.currentThread().getName());
LOG.debug("[{}] Async blocking query action [{}] for executor [{}] in current thread [{}]",
dai-chen marked this conversation as resolved.
Show resolved Hide resolved
requestId(queryAction), name(executor), name(queryAction), Thread.currentThread().getName());
}
async(client, params, queryAction, channel);
}
else {
if (LOG.isDebugEnabled()) {
LOG.debug("Continue running query action [{}] for executor [{}] in current thread [{}]",
name(executor), name(queryAction), Thread.currentThread().getName());
LOG.debug("[{}] Continue running query action [{}] for executor [{}] in current thread [{}]",
requestId(queryAction), name(executor), name(queryAction), Thread.currentThread().getName());
}
executor.execute(client, params, queryAction, channel);
doExecuteWithTimeMeasured(client, params, queryAction, channel);
}
}

Expand All @@ -97,24 +99,46 @@ private boolean isRunningInTransportThread() {
private void async(Client client, Map<String, String> params, QueryAction queryAction, RestChannel channel) {
// Run given task in thread pool asynchronously
client.threadPool().schedule(
new TimeValue(0L),
SQL_WORKER_THREAD_POOL_NAME,
() -> {
try {
executor.execute(client, params, queryAction, channel);
doExecuteWithTimeMeasured(client, params, queryAction, channel);
} catch (IOException | SqlParseException e) {
LOG.warn("[MCB] async task got an IO/SQL exception: {}", e.getMessage());
LOG.warn("[{}] [MCB] async task got an IO/SQL exception: {}", requestId(queryAction), e.getMessage());
channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()));
} catch (IllegalStateException e) {
LOG.warn("[MCB] async task got a runtime exception: {}", e.getMessage());
LOG.warn("[{}] [MCB] async task got a runtime exception: {}", requestId(queryAction), e.getMessage());
channel.sendResponse(new BytesRestResponse(RestStatus.INSUFFICIENT_STORAGE, "Memory circuit is broken."));
} catch (Throwable t) {
LOG.warn("[MCB] async task got an unknown throwable: {}", t.getMessage());
LOG.warn("[{}] [MCB] async task got an unknown throwable: {}", requestId(queryAction), t.getMessage());
channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, String.valueOf(t.getMessage())));
} finally {
BackOffRetryStrategy.releaseMem(executor);
}
});
},
new TimeValue(0L),
SQL_WORKER_THREAD_POOL_NAME);
}

/** Time the real execution of Executor and log slow query for troubleshooting */
private void doExecuteWithTimeMeasured(Client client,
Map<String, String> params,
QueryAction action,
RestChannel channel) throws Exception {
long startTime = System.nanoTime();
try {
executor.execute(client, params, action, channel);
}
finally {
Duration elapsed = Duration.ofNanos(System.nanoTime() - startTime);
int slowLogThreshold = LocalClusterState.state().getSettingValue(QUERY_SLOWLOG);
if (elapsed.getSeconds() >= slowLogThreshold) {
LOG.warn("[{}] Slow query: elapsed={} (ms)", requestId(action), elapsed.toMillis());
}
}
}

private String requestId(QueryAction action) {
return action.getSqlRequest().getId();
}

private String name(Object object) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package com.amazon.opendistroforelasticsearch.sql.executor.format;

import org.elasticsearch.rest.RestStatus;
import org.json.JSONObject;

public class ErrorMessage {
Expand All @@ -37,10 +38,16 @@ public ErrorMessage(Exception exception, int status) {

private String fetchType() { return exception.getClass().getSimpleName(); }

private String fetchReason() { return emptyStringIfNull(exception.getLocalizedMessage()); }
private String fetchReason() {
return status == RestStatus.BAD_REQUEST.getStatus()
? "Invalid SQL query"
: "There was internal problem at backend";
}

private String fetchDetails() {
return exception.toString();
// Some exception prints internal information (full class name) which is security concern
//return exception.toString();
return emptyStringIfNull(exception.getLocalizedMessage());
}

private String emptyStringIfNull(String str) { return str != null ? str : ""; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,11 @@ public String getName() {

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
SqlRequest sqlRequest = SqlRequest.NULL;
dai-chen marked this conversation as resolved.
Show resolved Hide resolved
try {
final SqlRequest sqlRequest = SqlRequestFactory.getSqlRequest(request);
sqlRequest = SqlRequestFactory.getSqlRequest(request);
dai-chen marked this conversation as resolved.
Show resolved Hide resolved
LOG.info("[{}] Incoming request {}: {}", sqlRequest.getId(), request.uri(), sqlRequest.getSql());

final QueryAction queryAction = new SearchDao(client).explain(sqlRequest.getSql());
queryAction.setSqlRequest(sqlRequest);

Expand All @@ -88,7 +91,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
return channel -> restExecutor.execute(client, additionalParams, queryAction, channel);
}
} catch (Exception e) {
LOG.error("Failed during Query Action.", e);
LOG.error(String.format("[%s] Failed during query execution", sqlRequest.getId()), e);
return reportError(e, isClientError(e) ? BAD_REQUEST : SERVICE_UNAVAILABLE);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.util.concurrent.EsExecutors;
Expand All @@ -47,6 +48,9 @@

public class SqlPlug extends Plugin implements ActionPlugin {
dai-chen marked this conversation as resolved.
Show resolved Hide resolved

/** Sql plugin specific settings in ES cluster settings */
private final SqlSettings sqlSettings = new SqlSettings();

public SqlPlug() {
}

Expand All @@ -69,6 +73,7 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
@Override
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool, ResourceWatcherService resourceWatcherService, ScriptService scriptService, NamedXContentRegistry xContentRegistry, Environment environment, NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
LocalClusterState.state().setClusterService(clusterService);
LocalClusterState.state().setSqlSettings(sqlSettings);
return super.createComponents(client, clusterService, threadPool, resourceWatcherService, scriptService, xContentRegistry, environment, nodeEnvironment, namedWriteableRegistry);
}

Expand All @@ -85,4 +90,8 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
);
}

@Override
public List<Setting<?>> getSettings() {
return sqlSettings.getSettings();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.sql.plugin;

import org.elasticsearch.common.settings.Setting;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.common.settings.Setting.Property.Dynamic;
import static org.elasticsearch.common.settings.Setting.Property.NodeScope;

/**
* SQL plugin settings
*/
public class SqlSettings {

/**
* Get plugin settings stored in cluster setting. Why not use ES slow log settings consistently?
* 1) It's per-index setting.
* 2) It has separate setting for Query and Fetch phase which are all ES internal concepts.
*/
public static final String QUERY_SLOWLOG = "opendistro.sql.query.slowlog";

private final Map<String, Setting<?>> settings;

public SqlSettings() {
Map<String, Setting<?>> settings = new HashMap<>();
settings.put(QUERY_SLOWLOG, Setting.intSetting(QUERY_SLOWLOG, 2, NodeScope, Dynamic));

this.settings = unmodifiableMap(settings);
}

public SqlSettings(Map<String, Setting<?>> settings) {
this.settings = unmodifiableMap(settings);
}

public Setting<?> getSetting(String key) {
if (settings.containsKey(key)) {
return settings.get(key);
}
throw new IllegalArgumentException("Cannot find setting by key [" + key + "]");
}

public List<Setting<?>> getSettings() {
return new ArrayList<>(settings.values());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public abstract class QueryAction {

protected Query query;
protected Client client;
protected SqlRequest sqlRequest;
protected SqlRequest sqlRequest = SqlRequest.NULL;

public QueryAction(Client client, Query query) {
this.client = client;
Expand All @@ -58,6 +58,8 @@ public QueryAction(Client client, Query query) {

public void setSqlRequest(SqlRequest sqlRequest) { this.sqlRequest = sqlRequest; }

public SqlRequest getSqlRequest() { return sqlRequest; }

protected void updateRequestWithCollapse(Select select, SearchRequestBuilder request) throws SqlParseException {
JsonFactory jsonFactory = new JsonFactory();
for (Hint hint : select.getHints()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,29 @@

import java.io.IOException;
import java.util.Collections;
import java.util.UUID;

public class SqlRequest {

public static final SqlRequest NULL = new SqlRequest("Unassigned", "", null);

/** Unique request ID for tracking */
private final String id;
dai-chen marked this conversation as resolved.
Show resolved Hide resolved

String sql;
JSONObject jsonContent;

public SqlRequest(String sql, JSONObject jsonContent) {

public SqlRequest(String id, String sql, JSONObject jsonContent) {
this.id = id;
this.sql = sql;
this.jsonContent = jsonContent;
}

public SqlRequest(String sql, JSONObject jsonContent) {
this(UUID.randomUUID().toString(), sql, jsonContent);
}

private static boolean isValidJson(String json) {
try {
new JSONObject(json);
Expand All @@ -54,6 +66,8 @@ public JSONObject getJsonContent() {
return this.jsonContent;
}

public String getId() { return id; }

/**
* JSONObject's getJSONObject method will return just the value, this helper method is to extract the key and
* value of 'filter' and return the JSON as a string.
Expand Down
Loading