diff --git a/redash/query_runner/elasticsearch.py b/redash/query_runner/elasticsearch.py index 73a286c299..825cec0eb7 100644 --- a/redash/query_runner/elasticsearch.py +++ b/redash/query_runner/elasticsearch.py @@ -102,26 +102,11 @@ def __init__(self, configuration): def _get_mappings(self, url): mappings = {} error = None - try: r = requests.get(url, auth=self.auth) r.raise_for_status() - mappings_data = r.json() - - for index_name in mappings_data: - index_mappings = mappings_data[index_name] - for m in index_mappings.get("mappings", {}): - for property_name in index_mappings["mappings"][m]["properties"]: - property_data = index_mappings["mappings"][m]["properties"][property_name] - if property_name not in mappings: - property_type = property_data.get("type", None) - if property_type: - if property_type in ELASTICSEARCH_TYPES_MAPPING: - mappings[property_name] = ELASTICSEARCH_TYPES_MAPPING[property_type] - else: - mappings[property_name] = TYPE_STRING - #raise Exception("Unknown property type: {0}".format(property_type)) + mappings = r.json() except requests.HTTPError as e: logger.exception(e) error = "Failed to execute query. Return Code: {0} Reason: {1}".format(r.status_code, r.text) @@ -133,6 +118,59 @@ def _get_mappings(self, url): return mappings, error + def _get_query_mappings(self, url): + mappings, error = self._get_mappings(url) + if error: + return mappings, error + + for index_name in mappings_data: + index_mappings = mappings_data[index_name] + for m in index_mappings.get("mappings", {}): + for property_name in index_mappings["mappings"][m]["properties"]: + property_data = index_mappings["mappings"][m]["properties"][property_name] + if property_name not in mappings: + property_type = property_data.get("type", None) + if property_type: + if property_type in ELASTICSEARCH_TYPES_MAPPING: + mappings[property_name] = ELASTICSEARCH_TYPES_MAPPING[property_type] + else: + mappings[property_name] = TYPE_STRING + #raise Exception("Unknown property type: {0}".format(property_type)) + + return mappings, error + + def get_schema(self, *args, **kwargs): + def parse_doc(doc, path=None): + '''Recursively parse a doc type dictionary + ''' + path = path or [] + result = [] + for field, description in doc['properties'].items(): + if 'properties' in description: + result.extend(parse_doc(description, path + [field])) + else: + result.append('.'.join(path + [field])) + return result + + schema = {} + url = "{0}/_mappings".format(self.server_url) + mappings, error = self._get_mappings(url) + + if mappings: + # make a schema for each index + # the index contains a mappings dict with documents + # in a hierarchical format + for name, index in mappings.items(): + columns = [] + schema[name] = {'name': name} + for doc, items in index['mappings'].items(): + columns.extend(parse_doc(items)) + + # remove duplicates + # sort alphabetically + schema[name]['columns'] = sorted(set(columns)) + return schema.values() + def _parse_results(self, mappings, result_fields, raw_result, result_columns, result_rows): def add_column_if_needed(mappings, column_name, friendly_name, result_columns, result_columns_index): if friendly_name not in result_columns_index: @@ -290,7 +328,7 @@ def run_query(self, query, user): url = "{0}/{1}/_search?".format(self.server_url, index_name) mapping_url = "{0}/{1}/_mapping".format(self.server_url, index_name) - mappings, error = self._get_mappings(mapping_url) + mappings, error = self._get_query_mappings(mapping_url) if error: return None, error #logger.debug(json.dumps(mappings, indent=4)) @@ -369,7 +407,7 @@ def run_query(self, query, user): url = "{0}/{1}/_search".format(self.server_url, index_name) mapping_url = "{0}/{1}/_mapping".format(self.server_url, index_name) - mappings, error = self._get_mappings(mapping_url) + mappings, error = self._get_query_mappings(mapping_url) if error: return None, error