Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ignore ES indexes without mappings #4535

Merged
merged 1 commit into from
Jul 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.io.BaseEncoding;
import io.airlift.json.ObjectMapperProvider;
import io.prestosql.elasticsearch.client.ElasticsearchClient;
Expand Down Expand Up @@ -53,6 +54,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.stream.Collectors;

import static com.google.common.collect.ImmutableMap.toImmutableMap;
Expand Down Expand Up @@ -339,13 +341,16 @@ public List<SchemaTableName> listTables(ConnectorSession session, Optional<Strin
}

ImmutableList.Builder<SchemaTableName> result = ImmutableList.builder();
Set<String> indexes = ImmutableSet.copyOf(client.getIndexes());

client.getIndexes().stream()
indexes.stream()
.map(index -> new SchemaTableName(this.schemaName, index))
.forEach(result::add);

client.getAliases().stream()
.map(index -> new SchemaTableName(this.schemaName, index))
client.getAliases().entrySet().stream()
.filter(entry -> indexes.contains(entry.getKey()))
.flatMap(entry -> entry.getValue().stream()
.map(alias -> new SchemaTableName(this.schemaName, alias)))
.forEach(result::add);

return result.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,12 +447,22 @@ private int shardPreference(SearchShardsResponse.Shard left, SearchShardsRespons

public List<String> getIndexes()
{
return doRequest("/_cat/indices?h=index&format=json&s=index:asc", body -> {
return doRequest("/_cat/indices?h=index,docs.count,docs.deleted&format=json&s=index:asc", body -> {
try {
ImmutableList.Builder<String> result = ImmutableList.builder();
JsonNode root = OBJECT_MAPPER.readTree(body);
for (int i = 0; i < root.size(); i++) {
result.add(root.get(i).get("index").asText());
String index = root.get(i).get("index").asText();
// make sure the index has mappings we can use to derive the schema
int docsCount = root.get(i).get("docs.count").asInt();
int deletedDocsCount = root.get(i).get("docs.deleted").asInt();
if (docsCount == 0 && deletedDocsCount == 0) {
// without documents, the index won't have any dynamic mappings, but maybe there are some explicit ones
if (getIndexMetadata(index).getSchema().getFields().isEmpty()) {
continue;
}
}
result.add(index);
}
return result.build();
}
Expand All @@ -462,18 +472,21 @@ public List<String> getIndexes()
});
}

public List<String> getAliases()
public Map<String, List<String>> getAliases()
{
return doRequest("/_aliases", body -> {
try {
ImmutableList.Builder<String> result = ImmutableList.builder();
ImmutableMap.Builder<String, List<String>> result = ImmutableMap.builder();
JsonNode root = OBJECT_MAPPER.readTree(body);

Iterator<JsonNode> elements = root.elements();
Iterator<Map.Entry<String, JsonNode>> elements = root.fields();
while (elements.hasNext()) {
JsonNode element = elements.next();
JsonNode aliases = element.get("aliases");
result.addAll(aliases.fieldNames());
Map.Entry<String, JsonNode> element = elements.next();
JsonNode aliases = element.getValue().get("aliases");
Iterator<String> aliasNames = aliases.fieldNames();
if (aliasNames.hasNext()) {
result.put(element.getKey(), ImmutableList.copyOf(aliasNames));
}
}
return result.build();
}
Expand All @@ -493,6 +506,9 @@ public IndexMetadata getIndexMetadata(String index)
.elements().next()
.get("mappings");

if (!mappings.elements().hasNext()) {
return new IndexMetadata(new IndexMetadata.ObjectType(ImmutableList.of()));
}
if (!mappings.has("properties")) {
// Older versions of ElasticSearch supported multiple "type" mappings
// for a given index. Newer versions support only one and don't
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

public abstract class BaseElasticsearchSmokeTest
extends AbstractTestIntegrationSmokeTest
Expand Down Expand Up @@ -818,6 +820,56 @@ public void testPassthroughQuery()
"Elasticsearch query for 'orders' is not valid JSON");
}

@Test
public void testEmptyIndexWithMappings()
throws IOException
{
String indexName = "test_empty_index_with_mappings";

@Language("JSON")
String mappings = "" +
"{" +
" \"properties\": { " +
" \"dummy_column\": { \"type\": \"long\" }" +
" }" +
"}";

createIndex(indexName, mappings);

assertQuery(format("SELECT column_name FROM information_schema.columns WHERE table_name = '%s'", indexName), "VALUES ('dummy_column')");
assertTrue(computeActual("SHOW TABLES").getOnlyColumnAsSet().contains(indexName));
assertQueryReturnsEmptyResult("SELECT * FROM " + indexName);
}

@Test
public void testEmptyIndexNoMappings()
throws IOException
{
String indexName = "test_empty_index";

createIndex(indexName);
assertTableDoesNotExist(indexName);
}

@Test
public void testEmptyAliasNoMappings()
throws IOException
{
String indexName = "test_empty_index_for_alias";
String aliasName = "test_empty_alias";

createIndex(indexName);
addAlias(indexName, aliasName);
assertTableDoesNotExist(aliasName);
}

private void assertTableDoesNotExist(String name)
{
assertQueryReturnsEmptyResult(format("SELECT * FROM information_schema.columns WHERE table_name = '%s'", name));
assertFalse(computeActual("SHOW TABLES").getOnlyColumnAsSet().contains(name));
assertQueryFails("SELECT * FROM " + name, ".*Table 'elasticsearch.tpch." + name + "' does not exist");
}

protected abstract String indexEndpoint(String index, String docId);

private void index(String index, Map<String, Object> document)
Expand All @@ -840,6 +892,12 @@ private void addAlias(String index, String alias)

protected abstract String indexMapping(@Language("JSON") String properties);

private void createIndex(String indexName)
throws IOException
{
client.getLowLevelClient().performRequest("PUT", "/" + indexName);
}

private void createIndex(String indexName, @Language("JSON") String properties)
throws IOException
{
Expand Down