Skip to content

Commit

Permalink
Add support for passthrough Elasticsearch queries
Browse files Browse the repository at this point in the history
Cherry-pick of trinodb/trino#3735
This allows running queries over the results of a raw Elasticsearch query.
It extends the syntax of the enhanced ES table names with the following:

SELECT * FROM es.default."<index>$query:<base32-encoded ES query>"

The query is base32-encoded to avoid having to deal with escaping quotes and case
sensitivity issues in table identifiers.

The result of these query tables is a table with a single row and a single column
named "result" of type JSON.

Co-authored-by: Martin Traverso <[email protected]>
Co-authored-by: Manfred Moser <[email protected]>
  • Loading branch information
3 people authored and zhenxiao committed Apr 16, 2021
1 parent 3513d07 commit 8c0b1ff
Show file tree
Hide file tree
Showing 11 changed files with 419 additions and 12 deletions.
22 changes: 22 additions & 0 deletions presto-docs/src/main/sphinx/connector/elasticsearch.rst
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,28 @@ as part of the table name, separated by a colon. For example:
.. _full text query: https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html#query-string-syntax


Pass-through Queries
--------------------

The Elasticsearch connector allows you to embed any valid Elasticsearch query,
that uses the `Elasticsearch Query DSL
<https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html>`_
in your SQL query.

The results can then be used in any SQL statement, wrapping the Elasticsearch
query. The syntax extends the syntax of the enhanced Elasticsearch table names
with the following::

SELECT * FROM es.default."<index>$query:<es-query>"

The Elasticsearch query string ``es-query`` is base32-encoded to avoid having to
deal with escaping quotes and case sensitivity issues in table identifiers.

The result of these query tables is a table with a single row and a single
column named ``result`` of type VARCHAR. It contains the JSON payload returned
by Elasticsearch, and can be processed with the :doc:`built-in JSON functions
</functions/json>`.

AWS Authorization
-----------------

Expand Down
12 changes: 12 additions & 0 deletions presto-elasticsearch/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,19 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>

<!-- for testing -->
<dependency>
<groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId>
<version>19.0.0</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-client</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.elasticsearch;

import com.facebook.airlift.json.JsonObjectMapperProvider;
import com.facebook.presto.common.type.ArrayType;
import com.facebook.presto.common.type.RowType;
import com.facebook.presto.common.type.StandardTypes;
Expand All @@ -33,11 +34,15 @@
import com.facebook.presto.spi.ConnectorTableLayoutResult;
import com.facebook.presto.spi.ConnectorTableMetadata;
import com.facebook.presto.spi.Constraint;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.SchemaTablePrefix;
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.BaseEncoding;

import javax.inject.Inject;

Expand All @@ -58,14 +63,25 @@
import static com.facebook.presto.common.type.TinyintType.TINYINT;
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.elasticsearch.ElasticsearchTableHandle.Type.QUERY;
import static com.facebook.presto.elasticsearch.ElasticsearchTableHandle.Type.SCAN;
import static com.facebook.presto.spi.StandardErrorCode.INVALID_ARGUMENTS;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;

public class ElasticsearchMetadata
implements ConnectorMetadata
{
private static final ObjectMapper JSON_PARSER = new JsonObjectMapperProvider().get();

private static final String PASSTHROUGH_QUERY_SUFFIX = "$query";
private final Map<String, ColumnHandle> queryTableColumns;
private final ColumnMetadata queryResultColumnMetadata;

private final ElasticsearchClient client;
private final String schemaName;
private final Type ipAddressType;
Expand All @@ -76,7 +92,18 @@ public ElasticsearchMetadata(TypeManager typeManager, ElasticsearchClient client
requireNonNull(config, "config is null");
this.ipAddressType = typeManager.getType(new TypeSignature(StandardTypes.IPADDRESS));
this.client = requireNonNull(client, "client is null");
requireNonNull(config, "config is null");
this.schemaName = config.getDefaultSchema();

Type jsonType = typeManager.getType(new TypeSignature(StandardTypes.JSON));
queryResultColumnMetadata = ColumnMetadata.builder()
.setName("result")
.setType(jsonType)
.setNullable(true)
.setHidden(false)
.build();

queryTableColumns = ImmutableMap.of("result", new ElasticsearchColumnHandle("result", jsonType, false));
}

@Override
Expand All @@ -93,12 +120,37 @@ public ElasticsearchTableHandle getTableHandle(ConnectorSession session, SchemaT
String[] parts = tableName.getTableName().split(":", 2);
String table = parts[0];
Optional<String> query = Optional.empty();
ElasticsearchTableHandle.Type type = SCAN;
if (parts.length == 2) {
query = Optional.of(parts[1]);
if (table.endsWith(PASSTHROUGH_QUERY_SUFFIX)) {
table = table.substring(0, table.length() - PASSTHROUGH_QUERY_SUFFIX.length());
byte[] decoded;
try {
decoded = BaseEncoding.base32().decode(parts[1].toUpperCase(ENGLISH));
}
catch (IllegalArgumentException e) {
throw new PrestoException(INVALID_ARGUMENTS, format("Elasticsearch query for '%s' is not base32-encoded correctly", table), e);
}

String queryJson = new String(decoded, UTF_8);
try {
// Ensure this is valid json
JSON_PARSER.readTree(queryJson);
}
catch (JsonProcessingException e) {
throw new PrestoException(INVALID_ARGUMENTS, format("Elasticsearch query for '%s' is not valid JSON", table), e);
}

query = Optional.of(queryJson);
type = QUERY;
}
else {
query = Optional.of(parts[1]);
}
}

if (listTables(session, Optional.of(schemaName)).contains(new SchemaTableName(schemaName, table))) {
return new ElasticsearchTableHandle(schemaName, table, query);
return new ElasticsearchTableHandle(type, schemaName, table, query);
}
}
return null;
Expand All @@ -122,6 +174,12 @@ public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTa
public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table)
{
ElasticsearchTableHandle handle = (ElasticsearchTableHandle) table;

if (isPassthroughQuery(handle)) {
return new ConnectorTableMetadata(
new SchemaTableName(handle.getSchema(), handle.getIndex()),
ImmutableList.of(queryResultColumnMetadata));
}
return getTableMetadata(handle.getSchema(), handle.getIndex());
}

Expand Down Expand Up @@ -295,15 +353,39 @@ public List<SchemaTableName> listTables(ConnectorSession session, Optional<Strin
@Override
public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
{
ElasticsearchTableHandle table = (ElasticsearchTableHandle) tableHandle;

if (isPassthroughQuery(table)) {
return queryTableColumns;
}

InternalTableMetadata tableMetadata = makeInternalTableMetadata(tableHandle);
return tableMetadata.getColumnHandles();
}

@Override
public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle)
{
ElasticsearchColumnHandle handle = (ElasticsearchColumnHandle) columnHandle;
return new ColumnMetadata(handle.getName(), handle.getType());
ElasticsearchTableHandle table = (ElasticsearchTableHandle) tableHandle;
ElasticsearchColumnHandle column = (ElasticsearchColumnHandle) columnHandle;

if (isPassthroughQuery(table)) {
if (column.getName().equals(queryResultColumnMetadata.getName())) {
return queryResultColumnMetadata;
}

throw new IllegalArgumentException(format("Unexpected column for table '%s$query': %s", table.getIndex(), column.getName()));
}

return ColumnMetadata.builder()
.setName(column.getName())
.setType(column.getType())
.build();
}

private static boolean isPassthroughQuery(ElasticsearchTableHandle table)
{
return table.getType().equals(QUERY);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
*/
package com.facebook.presto.elasticsearch;

import com.facebook.presto.common.type.StandardTypes;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.common.type.TypeSignature;
import com.facebook.presto.elasticsearch.client.ElasticsearchClient;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorPageSource;
Expand All @@ -27,18 +31,21 @@

import java.util.List;

import static com.facebook.presto.elasticsearch.ElasticsearchTableHandle.Type.QUERY;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;

public class ElasticsearchPageSourceProvider
implements ConnectorPageSourceProvider
{
private final ElasticsearchClient client;
private final Type jsonType;

@Inject
public ElasticsearchPageSourceProvider(ElasticsearchClient client)
public ElasticsearchPageSourceProvider(ElasticsearchClient client, TypeManager typeManager)
{
this.client = requireNonNull(client, "client is null");
this.jsonType = typeManager.getType(new TypeSignature(StandardTypes.JSON));
}

@Override
Expand All @@ -55,6 +62,10 @@ public ConnectorPageSource createPageSource(
ElasticsearchTableLayoutHandle layoutHandle = (ElasticsearchTableLayoutHandle) layout;
ElasticsearchSplit elasticsearchSplit = (ElasticsearchSplit) split;

if (layoutHandle.getTable().getType().equals(QUERY)) {
return new PassthroughQueryPageSource(client, layoutHandle.getTable(), jsonType);
}

if (columns.isEmpty()) {
return new CountQueryPageSource(client, session, layoutHandle.getTable(), elasticsearchSplit);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
import com.facebook.presto.spi.FixedSplitSource;
import com.facebook.presto.spi.connector.ConnectorSplitManager;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.google.common.collect.ImmutableList;

import javax.inject.Inject;

import java.util.List;
import java.util.Optional;

import static com.facebook.presto.elasticsearch.ElasticsearchTableHandle.Type.QUERY;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;

Expand All @@ -49,10 +52,14 @@ public ConnectorSplitSource getSplits(
ElasticsearchTableLayoutHandle layoutHandle = (ElasticsearchTableLayoutHandle) layout;
ElasticsearchTableHandle tableHandle = layoutHandle.getTable();

List<ElasticsearchSplit> splits = client.getSearchShards(tableHandle.getIndex()).stream()
.map(shard -> new ElasticsearchSplit(shard.getIndex(), shard.getId(), layoutHandle.getTupleDomain(), shard.getAddress()))
.collect(toImmutableList());

return new FixedSplitSource(splits);
if (tableHandle.getType().equals(QUERY)) {
return new FixedSplitSource(ImmutableList.of(new ElasticsearchSplit(tableHandle.getIndex(), 0, layoutHandle.getTupleDomain(), Optional.empty())));
}
else {
List<ElasticsearchSplit> splits = client.getSearchShards(tableHandle.getIndex()).stream()
.map(shard -> new ElasticsearchSplit(shard.getIndex(), shard.getId(), layoutHandle.getTupleDomain(), shard.getAddress()))
.collect(toImmutableList());
return new FixedSplitSource(splits);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,36 @@
public final class ElasticsearchTableHandle
implements ConnectorTableHandle
{
public enum Type
{
SCAN,
QUERY
}

private final Type type;
private final String schema;
private final String index;
private final Optional<String> query;

@JsonCreator
public ElasticsearchTableHandle(
@JsonProperty("type") Type type,
@JsonProperty("schema") String schema,
@JsonProperty("index") String index,
@JsonProperty("query") Optional<String> query)
{
this.type = requireNonNull(type, "type is null");
this.schema = requireNonNull(schema, "schema is null");
this.index = requireNonNull(index, "index is null");
this.query = requireNonNull(query, "query is null");
}

@JsonProperty
public Type getType()
{
return type;
}

@JsonProperty
public String getIndex()
{
Expand All @@ -62,7 +77,7 @@ public Optional<String> getQuery()
@Override
public int hashCode()
{
return Objects.hash(schema, index, query);
return Objects.hash(type, schema, index, query);
}

@Override
Expand All @@ -76,7 +91,8 @@ public boolean equals(Object obj)
}

ElasticsearchTableHandle other = (ElasticsearchTableHandle) obj;
return Objects.equals(this.getSchema(), other.getSchema()) &&
return Objects.equals(this.type, other.getType()) &&
Objects.equals(this.getSchema(), other.getSchema()) &&
Objects.equals(this.getIndex(), other.getIndex()) &&
Objects.equals(this.getQuery(), other.getQuery());
}
Expand All @@ -85,6 +101,7 @@ public boolean equals(Object obj)
public String toString()
{
return toStringHelper(this)
.add("type", getType())
.add("schema", getSchema())
.add("index", getIndex())
.add("query", getQuery())
Expand Down
Loading

0 comments on commit 8c0b1ff

Please sign in to comment.