Report date data as a standardized format (opendistro-for-elasticsearch#367)

* expose date fields using a standardized date format

* move date format logic to new class

* add tests for DateTimeFormatter

* added some negative tests

* style fixes

* testing locale fixes

* addressed code review comments and build failures

* post-merge fix

* additional fixes

* get CAST alias info from result set class, rather than cluster state

* remove unused import

* remove unused import

* reduce duplication & reference enum value

* setting default timezone to UTC while parsing date values

* add support for custom & multiple formats

* add case for Kibana flights date data with T but no time field
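
A rough sketch of the standardized output this change targets (the epoch value, class name, and printed result below are illustrative, not part of the change; it uses the DateFormat helper added in this commit with the JDBC pattern yyyy-MM-dd HH:mm:ss.SSS):

import com.amazon.opendistroforelasticsearch.sql.executor.format.DateFormat;

import java.util.Date;

public class JdbcDateFormatExample {
    public static void main(String[] args) {
        // A hypothetical value stored in Elasticsearch as epoch_millis (always UTC)
        Date date = new Date(1582893296789L);
        // Prints "2020-02-28 12:34:56.789" -- the standardized JDBC format
        System.out.println(DateFormat.getFormattedDate(date, "yyyy-MM-dd HH:mm:ss.SSS"));
    }
}

DateFieldFormatter.applyJDBCDateFormat (added below) applies the same conversion to the date columns of each row returned to the client.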
jordanw-bq authored Feb 28, 2020
1 parent 9769c30 commit 7c57d72
Showing 8 changed files with 1,061 additions and 9 deletions.
1 change: 1 addition & 0 deletions build.gradle
@@ -223,6 +223,7 @@ dependencies {
compile group: "org.elasticsearch.plugin", name: 'reindex-client', version: "${es_version}"
compile group: 'com.google.guava', name: 'guava', version:'15.0'
compile group: 'org.json', name: 'json', version:'20180813'
compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.9'

// ANTLR gradle plugin and runtime dependency
antlr "org.antlr:antlr4:4.7.1"
LocalClusterState.java
@@ -91,7 +91,6 @@ public class LocalClusterState {
     */
    private final Map<String, Object> latestSettings = new ConcurrentHashMap<>();


    public static synchronized LocalClusterState state() {
        if (INSTANCE == null) {
            INSTANCE = new LocalClusterState();
DateFieldFormatter.java
@@ -0,0 +1,195 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.sql.executor.format;

import com.amazon.opendistroforelasticsearch.sql.esdomain.LocalClusterState;
import com.amazon.opendistroforelasticsearch.sql.esdomain.mapping.FieldMappings;
import com.amazon.opendistroforelasticsearch.sql.esdomain.mapping.TypeMappings;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.text.ParseException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.stream.Collectors;

/**
 * Formatter to transform date fields into a consistent format for consumption by clients.
 */
public class DateFieldFormatter {
    private static final Logger LOG = LogManager.getLogger(DateFieldFormatter.class);
    private static final String FORMAT_JDBC = "yyyy-MM-dd HH:mm:ss.SSS";
    private static final String FORMAT_DELIMITER = "\\|\\|";

    // Formats used for fields mapped with "date_optional_time" (DOT). The KIBANA_* variants
    // handle date strings found in the Kibana sample data sets.
    private static final String FORMAT_DOT_DATE_AND_TIME = "yyyy-MM-dd'T'HH:mm:ss.SSSZ";
    private static final String FORMAT_DOT_KIBANA_SAMPLE_DATA_LOGS_EXCEPTION = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
    private static final String FORMAT_DOT_KIBANA_SAMPLE_DATA_FLIGHTS_EXCEPTION = "yyyy-MM-dd'T'HH:mm:ss";
    private static final String FORMAT_DOT_KIBANA_SAMPLE_DATA_FLIGHTS_EXCEPTION_NO_TIME = "yyyy-MM-dd'T'";
    private static final String FORMAT_DOT_KIBANA_SAMPLE_DATA_ECOMMERCE_EXCEPTION = "yyyy-MM-dd'T'HH:mm:ssXXX";
    private static final String FORMAT_DOT_DATE = DateFormat.getFormatString("date");

    private final Map<String, List<String>> dateFieldFormatMap;
    private final Map<String, String> fieldAliasMap;
    private Set<String> dateColumns;

    public DateFieldFormatter(String indexName, List<Schema.Column> columns, Map<String, String> fieldAliasMap) {
        this.dateFieldFormatMap = getDateFieldFormatMap(indexName);
        this.dateColumns = getDateColumns(columns);
        this.fieldAliasMap = fieldAliasMap;
    }

    @VisibleForTesting
    protected DateFieldFormatter(Map<String, List<String>> dateFieldFormatMap,
                                 List<Schema.Column> columns,
                                 Map<String, String> fieldAliasMap) {
        this.dateFieldFormatMap = dateFieldFormatMap;
        this.dateColumns = getDateColumns(columns);
        this.fieldAliasMap = fieldAliasMap;
    }

    /**
     * Apply the JDBC date format ({@code yyyy-MM-dd HH:mm:ss.SSS}) to date values in the current row.
     *
     * @param rowSource The row in which to format the date values.
     */
    public void applyJDBCDateFormat(Map<String, Object> rowSource) {
        for (String columnName : dateColumns) {
            Object columnOriginalDate = rowSource.get(columnName);
            if (columnOriginalDate == null) {
                // Don't try to parse null date values
                continue;
            }

            List<String> formats = getFormatsForColumn(columnName);
            if (formats == null) {
                LOG.warn("Could not determine date formats for column {}; returning original value", columnName);
                continue;
            }

            Date date = parseDateString(formats, columnOriginalDate.toString());
            if (date != null) {
                rowSource.put(columnName, DateFormat.getFormattedDate(date, FORMAT_JDBC));
            } else {
                LOG.warn("Could not parse date value; returning original value");
            }
        }
    }

    private List<String> getFormatsForColumn(String columnName) {
        // Handle special cases for column names
        if (fieldAliasMap.get(columnName) != null) {
            // Column was aliased, and we need to find the base name for the column
            columnName = fieldAliasMap.get(columnName);
        } else if (columnName.split("\\.").length == 2) {
            // Column is part of a join, and is qualified by the table alias
            columnName = columnName.split("\\.")[1];
        }
        return dateFieldFormatMap.get(columnName);
    }

    private Set<String> getDateColumns(List<Schema.Column> columns) {
        return columns.stream()
                .filter(column -> column.getType().equals(Schema.Type.DATE.nameLowerCase()))
                .map(Schema.Column::getName)
                .collect(Collectors.toSet());
    }

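    // Builds a map from field name to the date formats declared for it in the index mapping,
    // read from LocalClusterState. Fields without an explicit "format" property fall back to
    // Elasticsearch's default "date_optional_time", since casts can turn other field types
    // into dates in the result set.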
    private Map<String, List<String>> getDateFieldFormatMap(String indexName) {
        LocalClusterState state = LocalClusterState.state();
        Map<String, List<String>> formatMap = new HashMap<>();

        String[] indices = indexName.split("\\|");
        Collection<TypeMappings> typeProperties = state.getFieldMappings(indices)
                .allMappings();

        for (TypeMappings mappings : typeProperties) {
            FieldMappings fieldMappings = mappings.firstMapping();
            for (Map.Entry<String, Map<String, Object>> field : fieldMappings.data().entrySet()) {
                String fieldName = field.getKey();
                Map<String, Object> properties = field.getValue();

                if (properties.containsKey("format")) {
                    formatMap.put(fieldName, getFormatsFromProperties(properties.get("format").toString()));
                } else {
                    // Give all field types a format, since operations such as casts
                    // can change the output type for a field to `date`.
                    formatMap.put(fieldName, getFormatsFromProperties("date_optional_time"));
                }
            }
        }

        return formatMap;
    }

    private List<String> getFormatsFromProperties(String formatProperty) {
        String[] formats = formatProperty.split(FORMAT_DELIMITER);
        return Arrays.asList(formats);
    }

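    // Tries every format configured for the column; if more than one format matches, the last
    // successful parse wins. The JVM default timezone is temporarily forced to UTC because
    // Elasticsearch stores all dates as UTC.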
    private Date parseDateString(List<String> formats, String columnOriginalDate) {
        TimeZone originalDefaultTimeZone = TimeZone.getDefault();
        Date parsedDate = null;

        // Apache Commons DateUtils uses the default TimeZone for the JVM when parsing.
        // However, since all dates on Elasticsearch are stored as UTC, we need to
        // parse these values using the UTC timezone.
        TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
        for (String columnFormat : formats) {
            try {
                switch (columnFormat) {
                    case "date_optional_time":
                        parsedDate = DateUtils.parseDate(
                                columnOriginalDate,
                                FORMAT_DOT_KIBANA_SAMPLE_DATA_LOGS_EXCEPTION,
                                FORMAT_DOT_KIBANA_SAMPLE_DATA_FLIGHTS_EXCEPTION,
                                FORMAT_DOT_KIBANA_SAMPLE_DATA_FLIGHTS_EXCEPTION_NO_TIME,
                                FORMAT_DOT_KIBANA_SAMPLE_DATA_ECOMMERCE_EXCEPTION,
                                FORMAT_DOT_DATE_AND_TIME,
                                FORMAT_DOT_DATE);
                        break;
                    case "epoch_millis":
                        parsedDate = new Date(Long.parseLong(columnOriginalDate));
                        break;
                    case "epoch_second":
                        parsedDate = new Date(Long.parseLong(columnOriginalDate) * 1000);
                        break;
                    default:
                        String formatString = DateFormat.getFormatString(columnFormat);
                        if (formatString == null) {
                            // Custom format; take as-is
                            formatString = columnFormat;
                        }
                        parsedDate = DateUtils.parseDate(columnOriginalDate, formatString);
                }
            } catch (ParseException | NumberFormatException e) {
                LOG.warn(String.format("Could not parse date string %s as %s", columnOriginalDate, columnFormat));
            }
        }
        // Reset default timezone after parsing
        TimeZone.setDefault(originalDefaultTimeZone);

        return parsedDate;
    }
}
DateFormat.java
@@ -0,0 +1,134 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.sql.executor.format;

import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

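/**
 * Maps the names of Elasticsearch's built-in date formats (e.g. "basic_date_time") to
 * equivalent Java date pattern strings, and formats java.util.Date values for output.
 */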
public class DateFormat {

    private static Map<String, String> formatMap = new HashMap<>();

    static {
        // Special cases that are parsed separately
        formatMap.put("date_optional_time", "");
        formatMap.put("epoch_millis", "");
        formatMap.put("epoch_second", "");

        formatMap.put("basic_date", Date.BASIC_DATE);
        formatMap.put("basic_date_time", Date.BASIC_DATE + Time.T + Time.BASIC_TIME + Time.MILLIS + Time.TZ);
        formatMap.put("basic_date_time_no_millis", Date.BASIC_DATE + Time.T + Time.BASIC_TIME + Time.TZ);

        formatMap.put("basic_ordinal_date", Date.BASIC_ORDINAL_DATE);
        formatMap.put("basic_ordinal_date_time",
                Date.BASIC_ORDINAL_DATE + Time.T + Time.BASIC_TIME + Time.MILLIS + Time.TZ);
        formatMap.put("basic_ordinal_date_time_no_millis", Date.BASIC_ORDINAL_DATE + Time.T + Time.BASIC_TIME + Time.TZ);

        formatMap.put("basic_time", Time.BASIC_TIME + Time.MILLIS + Time.TZ);
        formatMap.put("basic_time_no_millis", Time.BASIC_TIME + Time.TZ);

        formatMap.put("basic_t_time", Time.T + Time.BASIC_TIME + Time.MILLIS + Time.TZ);
        formatMap.put("basic_t_time_no_millis", Time.T + Time.BASIC_TIME + Time.TZ);

        formatMap.put("basic_week_date", Date.BASIC_WEEK_DATE);
        formatMap.put("basic_week_date_time", Date.BASIC_WEEK_DATE + Time.T + Time.BASIC_TIME + Time.MILLIS + Time.TZ);
        formatMap.put("basic_week_date_time_no_millis", Date.BASIC_WEEK_DATE + Time.T + Time.BASIC_TIME + Time.TZ);

        formatMap.put("date", Date.DATE);
        formatMap.put("date_hour", Date.DATE + Time.T + Time.HOUR);
        formatMap.put("date_hour_minute", Date.DATE + Time.T + Time.HOUR_MINUTE);
        formatMap.put("date_hour_minute_second", Date.DATE + Time.T + Time.TIME);
        formatMap.put("date_hour_minute_second_fraction", Date.DATE + Time.T + Time.TIME + Time.MILLIS);
        formatMap.put("date_hour_minute_second_millis", Date.DATE + Time.T + Time.TIME + Time.MILLIS);
        formatMap.put("date_time", Date.DATE + Time.T + Time.TIME + Time.MILLIS + Time.TZZ);
        formatMap.put("date_time_no_millis", Date.DATE + Time.T + Time.TIME + Time.TZZ);

        formatMap.put("hour", Time.HOUR);
        formatMap.put("hour_minute", Time.HOUR_MINUTE);
        formatMap.put("hour_minute_second", Time.TIME);
        formatMap.put("hour_minute_second_fraction", Time.TIME + Time.MILLIS);
        formatMap.put("hour_minute_second_millis", Time.TIME + Time.MILLIS);

        formatMap.put("ordinal_date", Date.ORDINAL_DATE);
        formatMap.put("ordinal_date_time", Date.ORDINAL_DATE + Time.T + Time.TIME + Time.MILLIS + Time.TZZ);
        formatMap.put("ordinal_date_time_no_millis", Date.ORDINAL_DATE + Time.T + Time.TIME + Time.TZZ);

        formatMap.put("time", Time.TIME + Time.MILLIS + Time.TZZ);
        formatMap.put("time_no_millis", Time.TIME + Time.TZZ);

        formatMap.put("t_time", Time.T + Time.TIME + Time.MILLIS + Time.TZZ);
        formatMap.put("t_time_no_millis", Time.T + Time.TIME + Time.TZZ);

        formatMap.put("week_date", Date.WEEK_DATE);
        formatMap.put("week_date_time", Date.WEEK_DATE + Time.T + Time.TIME + Time.MILLIS + Time.TZZ);
        formatMap.put("week_date_time_no_millis", Date.WEEK_DATE + Time.T + Time.TIME + Time.TZZ);

        // Note: input mapping is "weekyear", but output value is "week_year"
        formatMap.put("week_year", Date.WEEKYEAR);
        formatMap.put("weekyear_week", Date.WEEKYEAR_WEEK);
        formatMap.put("weekyear_week_day", Date.WEEK_DATE);

        formatMap.put("year", Date.YEAR);
        formatMap.put("year_month", Date.YEAR_MONTH);
        formatMap.put("year_month_day", Date.DATE);
    }

    private DateFormat() {
    }

    public static String getFormatString(String formatName) {
        return formatMap.get(formatName);
    }

    public static String getFormattedDate(java.util.Date date, String dateFormat) {
        Instant instant = date.toInstant();
        ZonedDateTime zdt = ZonedDateTime.ofInstant(instant, ZoneId.of("Etc/UTC"));
        return zdt.format(DateTimeFormatter.ofPattern(dateFormat));
    }

    private static class Date {
        static String BASIC_DATE = "yyyyMMdd";
        static String BASIC_ORDINAL_DATE = "yyyyDDD";
        static String BASIC_WEEK_DATE = "YYYY'W'wwu";

        static String DATE = "yyyy-MM-dd";
        static String ORDINAL_DATE = "yyyy-DDD";

        static String YEAR = "yyyy";
        static String YEAR_MONTH = "yyyy-MM";

        static String WEEK_DATE = "YYYY-'W'ww-u";
        static String WEEKYEAR = "YYYY";
        static String WEEKYEAR_WEEK = "YYYY-'W'ww";
    }

    private static class Time {
        static String T = "'T'";
        static String BASIC_TIME = "HHmmss";
        static String TIME = "HH:mm:ss";

        static String HOUR = "HH";
        static String HOUR_MINUTE = "HH:mm";

        static String MILLIS = ".SSS";
        // RFC 822 time zone offset, e.g. +0000
        static String TZ = "Z";
        // ISO 8601 time zone offset, e.g. +0000 (or Z for UTC)
        static String TZZ = "XX";
    }
}
@@ -80,7 +80,7 @@ private ResultSet loadResultSet(Client client, QueryStatement queryStatement, Ob
        if (queryStatement instanceof Delete) {
            return new DeleteResultSet(client, (Delete) queryStatement, queryResult);
        } else if (queryStatement instanceof Query) {
-           return new SelectResultSet(client, (Query) queryStatement, queryResult, scriptColumnType);
+           return new SelectResultSet(client, (Query) queryStatement, queryResult, scriptColumnType, formatType);
        } else if (queryStatement instanceof IndexStatement) {
            IndexStatement statement = (IndexStatement) queryStatement;
            StatementType statementType = statement.getStatementType();