Skip to content

Commit

Permalink
Ignore ES indexes without mappings
Browse files Browse the repository at this point in the history
  • Loading branch information
aalbu committed Jul 24, 2020
1 parent 7d613d0 commit 6af3256
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.io.BaseEncoding;
import io.airlift.json.ObjectMapperProvider;
import io.prestosql.elasticsearch.client.ElasticsearchClient;
Expand Down Expand Up @@ -53,6 +54,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.stream.Collectors;

import static com.google.common.collect.ImmutableMap.toImmutableMap;
Expand Down Expand Up @@ -339,13 +341,16 @@ public List<SchemaTableName> listTables(ConnectorSession session, Optional<Strin
}

ImmutableList.Builder<SchemaTableName> result = ImmutableList.builder();
Set<String> indexes = ImmutableSet.copyOf(client.getIndexes());

client.getIndexes().stream()
indexes.stream()
.map(index -> new SchemaTableName(this.schemaName, index))
.forEach(result::add);

client.getAliases().stream()
.map(index -> new SchemaTableName(this.schemaName, index))
client.getAliases().entrySet().stream()
.filter(entry -> indexes.contains(entry.getKey()))
.flatMap(entry -> entry.getValue().stream()
.map(alias -> new SchemaTableName(this.schemaName, alias)))
.forEach(result::add);

return result.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,12 +447,22 @@ private int shardPreference(SearchShardsResponse.Shard left, SearchShardsRespons

public List<String> getIndexes()
{
return doRequest("/_cat/indices?h=index&format=json&s=index:asc", body -> {
return doRequest("/_cat/indices?h=index,docs.count,docs.deleted&format=json&s=index:asc", body -> {
try {
ImmutableList.Builder<String> result = ImmutableList.builder();
JsonNode root = OBJECT_MAPPER.readTree(body);
for (int i = 0; i < root.size(); i++) {
result.add(root.get(i).get("index").asText());
String index = root.get(i).get("index").asText();
// make sure the index has mappings we can use to derive the schema
int docsCount = root.get(i).get("docs.count").asInt();
int deletedDocsCount = root.get(i).get("docs.deleted").asInt();
if (docsCount == 0 && deletedDocsCount == 0) {
// without documents, the index won't have any dynamic mappings, but maybe there are some explicit ones
if (getIndexMetadata(index).getSchema().getFields().isEmpty()) {
continue;
}
}
result.add(index);
}
return result.build();
}
Expand All @@ -462,18 +472,21 @@ public List<String> getIndexes()
});
}

public List<String> getAliases()
public Map<String, List<String>> getAliases()
{
return doRequest("/_aliases", body -> {
try {
ImmutableList.Builder<String> result = ImmutableList.builder();
ImmutableMap.Builder<String, List<String>> result = ImmutableMap.builder();
JsonNode root = OBJECT_MAPPER.readTree(body);

Iterator<JsonNode> elements = root.elements();
Iterator<Map.Entry<String, JsonNode>> elements = root.fields();
while (elements.hasNext()) {
JsonNode element = elements.next();
JsonNode aliases = element.get("aliases");
result.addAll(aliases.fieldNames());
Map.Entry<String, JsonNode> element = elements.next();
JsonNode aliases = element.getValue().get("aliases");
Iterator<String> aliasNames = aliases.fieldNames();
if (aliasNames.hasNext()) {
result.put(element.getKey(), ImmutableList.copyOf(aliasNames));
}
}
return result.build();
}
Expand All @@ -493,6 +506,9 @@ public IndexMetadata getIndexMetadata(String index)
.elements().next()
.get("mappings");

if (!mappings.elements().hasNext()) {
return new IndexMetadata(new IndexMetadata.ObjectType(ImmutableList.of()));
}
if (!mappings.has("properties")) {
// Older versions of ElasticSearch supported multiple "type" mappings
// for a given index. Newer versions support only one and don't
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

public abstract class BaseElasticsearchSmokeTest
extends AbstractTestIntegrationSmokeTest
Expand Down Expand Up @@ -818,6 +820,56 @@ public void testPassthroughQuery()
"Elasticsearch query for 'orders' is not valid JSON");
}

@Test
public void testEmptyIndexWithMappings()
throws IOException
{
String indexName = "test_empty_index_with_mappings";

@Language("JSON")
String mappings = "" +
"{" +
" \"properties\": { " +
" \"dummy_column\": { \"type\": \"long\" }" +
" }" +
"}";

createIndex(indexName, mappings);

assertQuery(format("SELECT column_name FROM information_schema.columns WHERE table_name = '%s'", indexName), "VALUES ('dummy_column')");
assertTrue(computeActual("SHOW TABLES").getOnlyColumnAsSet().contains(indexName));
assertQueryReturnsEmptyResult("SELECT * FROM " + indexName);
}

@Test
public void testEmptyIndexNoMappings()
throws IOException
{
String indexName = "test_empty_index";

createIndex(indexName);
assertTableDoesNotExist(indexName);
}

@Test
public void testEmptyAliasNoMappings()
throws IOException
{
String indexName = "test_empty_index_for_alias";
String aliasName = "test_empty_alias";

createIndex(indexName);
addAlias(indexName, aliasName);
assertTableDoesNotExist(aliasName);
}

private void assertTableDoesNotExist(String name)
{
assertQueryReturnsEmptyResult(format("SELECT * FROM information_schema.columns WHERE table_name = '%s'", name));
assertFalse(computeActual("SHOW TABLES").getOnlyColumnAsSet().contains(name));
assertQueryFails("SELECT * FROM " + name, ".*Table 'elasticsearch.tpch." + name + "' does not exist");
}

protected abstract String indexEndpoint(String index, String docId);

private void index(String index, Map<String, Object> document)
Expand All @@ -840,6 +892,12 @@ private void addAlias(String index, String alias)

protected abstract String indexMapping(@Language("JSON") String properties);

private void createIndex(String indexName)
throws IOException
{
client.getLowLevelClient().performRequest("PUT", "/" + indexName);
}

private void createIndex(String indexName, @Language("JSON") String properties)
throws IOException
{
Expand Down

0 comments on commit 6af3256

Please sign in to comment.