Skip to content

Commit

Permalink
fix(elasticsearch): time_zone setting does not work for cast datetime…
Browse files Browse the repository at this point in the history
… expressions (#17048)

* fix(elasticsearch): cast does not take effect for time zone settings

* test(elasticsearch): add test

* fix(test): fix typo

* docs(elasticsearch): add annotation

* docs(elasticsearch): add time_zone desc

* docs(elasticsearch): fix typo

* refactor(db_engine): change convert_dttm signature

* fix(test): fix test

* fix(es): add try catch

* fix(test): fix caplog

* fix(test): fix typo
  • Loading branch information
aniaan authored Nov 25, 2021
1 parent cf3f0e5 commit 5a1c681
Show file tree
Hide file tree
Showing 33 changed files with 219 additions and 61 deletions.
20 changes: 20 additions & 0 deletions docs/src/pages/docs/Connecting to Databases/elasticsearch.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,23 @@ POST /_aliases
```

Then register your table with the alias name logstasg_all

**Time zone**

By default, Superset uses UTC time zone for elasticsearch query. If you need to specify a time zone,
please edit your Database and enter the settings of your specified time zone in the Other > ENGINE PARAMETERS:


```
{
"connect_args": {
"time_zone": "Asia/Shanghai"
}
}
```

Another issue to note about the time zone problem is that before elasticsearch7.8, if you want to convert a string into a `DATETIME` object,
you need to use the `CAST` function,but this function does not support our `time_zone` setting. So it is recommended to upgrade to the version after elasticsearch7.8.
After elasticsearch7.8, you can use the `DATETIME_PARSE` function to solve this problem.
The DATETIME_PARSE function is to support our `time_zone` setting, and here you need to fill in your elasticsearch version number in the Other > VERSION setting.
the superset will use the `DATETIME_PARSE` function for conversion.
29 changes: 20 additions & 9 deletions superset/connectors/sqla/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,17 +264,23 @@ def is_temporal(self) -> bool:
def db_engine_spec(self) -> Type[BaseEngineSpec]:
return self.table.db_engine_spec

@property
def db_extra(self) -> Dict[str, Any]:
return self.table.database.get_extra()

@property
def type_generic(self) -> Optional[utils.GenericDataType]:
if self.is_dttm:
return GenericDataType.TEMPORAL
column_spec = self.db_engine_spec.get_column_spec(self.type)
column_spec = self.db_engine_spec.get_column_spec(
self.type, db_extra=self.db_extra
)
return column_spec.generic_type if column_spec else None

def get_sqla_col(self, label: Optional[str] = None) -> Column:
label = label or self.column_name
db_engine_spec = self.db_engine_spec
column_spec = db_engine_spec.get_column_spec(self.type)
column_spec = db_engine_spec.get_column_spec(self.type, db_extra=self.db_extra)
type_ = column_spec.sqla_type if column_spec else None
if self.expression:
tp = self.table.get_template_processor()
Expand Down Expand Up @@ -332,7 +338,9 @@ def get_timestamp_expression(

pdf = self.python_date_format
is_epoch = pdf in ("epoch_s", "epoch_ms")
column_spec = self.db_engine_spec.get_column_spec(self.type)
column_spec = self.db_engine_spec.get_column_spec(
self.type, db_extra=self.db_extra
)
type_ = column_spec.sqla_type if column_spec else DateTime
if not self.expression and not time_grain and not is_epoch:
sqla_col = column(self.column_name, type_=type_)
Expand All @@ -357,7 +365,11 @@ def dttm_sql_literal(
],
) -> str:
"""Convert datetime object to a SQL expression string"""
sql = self.db_engine_spec.convert_dttm(self.type, dttm) if self.type else None
sql = (
self.db_engine_spec.convert_dttm(self.type, dttm, db_extra=self.db_extra)
if self.type
else None
)

if sql:
return sql
Expand All @@ -370,10 +382,8 @@ def dttm_sql_literal(
utils.TimeRangeEndpoint.INCLUSIVE,
utils.TimeRangeEndpoint.EXCLUSIVE,
):
tf = (
self.table.database.get_extra()
.get("python_date_format_by_column_name", {})
.get(self.column_name)
tf = self.db_extra.get("python_date_format_by_column_name", {}).get(
self.column_name
)

if tf:
Expand Down Expand Up @@ -1523,10 +1533,11 @@ def _normalize_prequery_result_type(
value = value.item()

column_ = columns_by_name[dimension]
db_extra: Dict[str, Any] = self.database.get_extra()

if column_.type and column_.is_temporal and isinstance(value, str):
sql = self.db_engine_spec.convert_dttm(
column_.type, dateutil.parser.parse(value),
column_.type, dateutil.parser.parse(value), db_extra=db_extra
)

if sql:
Expand Down
4 changes: 3 additions & 1 deletion superset/connectors/sqla/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ def get_physical_table_metadata(
db_type = db_engine_spec.column_datatype_to_string(
col["type"], db_dialect
)
type_spec = db_engine_spec.get_column_spec(db_type)
type_spec = db_engine_spec.get_column_spec(
db_type, db_extra=database.get_extra()
)
col.update(
{
"type": db_type,
Expand Down
4 changes: 3 additions & 1 deletion superset/db_engine_specs/athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ class AthenaEngineSpec(BaseEngineSpec):
}

@classmethod
def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
def convert_dttm(
cls, target_type: str, dttm: datetime, db_extra: Optional[Dict[str, Any]] = None
) -> Optional[str]:
tt = target_type.upper()
if tt == utils.TemporalType.DATE:
return f"from_iso8601_date('{dttm.date().isoformat()}')"
Expand Down
9 changes: 5 additions & 4 deletions superset/db_engine_specs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
from superset.utils import core as utils
from superset.utils.core import ColumnSpec, GenericDataType
from superset.utils.hashing import md5_sha_from_str
from superset.utils.memoized import memoized
from superset.utils.network import is_hostname_valid, is_port_open

if TYPE_CHECKING:
Expand Down Expand Up @@ -692,13 +691,14 @@ def df_to_sql(

@classmethod
def convert_dttm( # pylint: disable=unused-argument
cls, target_type: str, dttm: datetime,
cls, target_type: str, dttm: datetime, db_extra: Optional[Dict[str, Any]] = None
) -> Optional[str]:
"""
Convert Python datetime object to a SQL expression
:param target_type: The target type of expression
:param dttm: The datetime object
:param db_extra: The database extra object
:return: The SQL expression
"""
return None
Expand Down Expand Up @@ -1286,10 +1286,10 @@ def is_select_query(cls, parsed_query: ParsedQuery) -> bool:
return parsed_query.is_select()

@classmethod
@memoized
def get_column_spec( # pylint: disable=unused-argument
cls,
native_type: Optional[str],
db_extra: Optional[Dict[str, Any]] = None,
source: utils.ColumnTypeSource = utils.ColumnTypeSource.GET_TABLE,
column_type_mappings: Tuple[
Tuple[
Expand All @@ -1304,6 +1304,7 @@ def get_column_spec( # pylint: disable=unused-argument
Converts native database type to sqlalchemy column type.
:param native_type: Native database typee
:param source: Type coming from the database table or cursor description
:param db_extra: The database extra object
:return: ColumnSpec object
"""
col_types = cls.get_sqla_column_type(
Expand All @@ -1315,7 +1316,7 @@ def get_column_spec( # pylint: disable=unused-argument
# using datetimes
if generic_type == GenericDataType.TEMPORAL:
column_type = literal_dttm_type_factory(
column_type, cls, native_type or ""
column_type, cls, native_type or "", db_extra=db_extra or {}
)
is_dttm = generic_type == GenericDataType.TEMPORAL
return ColumnSpec(
Expand Down
4 changes: 3 additions & 1 deletion superset/db_engine_specs/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,9 @@ class BigQueryEngineSpec(BaseEngineSpec):
}

@classmethod
def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
def convert_dttm(
cls, target_type: str, dttm: datetime, db_extra: Optional[Dict[str, Any]] = None
) -> Optional[str]:
tt = target_type.upper()
if tt == utils.TemporalType.DATE:
return f"CAST('{dttm.date().isoformat()}' AS DATE)"
Expand Down
6 changes: 4 additions & 2 deletions superset/db_engine_specs/clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.
import logging
from datetime import datetime
from typing import Dict, List, Optional, Type, TYPE_CHECKING
from typing import Any, Dict, List, Optional, Type, TYPE_CHECKING

from urllib3.exceptions import NewConnectionError

Expand Down Expand Up @@ -72,7 +72,9 @@ def get_dbapi_mapped_exception(cls, exception: Exception) -> Exception:
return new_exception(str(exception))

@classmethod
def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
def convert_dttm(
cls, target_type: str, dttm: datetime, db_extra: Optional[Dict[str, Any]] = None
) -> Optional[str]:
tt = target_type.upper()
if tt == utils.TemporalType.DATE:
return f"toDate('{dttm.date().isoformat()}')"
Expand Down
6 changes: 4 additions & 2 deletions superset/db_engine_specs/crate.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.
from datetime import datetime
from typing import Optional, TYPE_CHECKING
from typing import Any, Dict, Optional, TYPE_CHECKING

from superset.db_engine_specs.base import BaseEngineSpec
from superset.utils import core as utils
Expand Down Expand Up @@ -50,7 +50,9 @@ def epoch_ms_to_dttm(cls) -> str:
return "{col}"

@classmethod
def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
def convert_dttm(
cls, target_type: str, dttm: datetime, db_extra: Optional[Dict[str, Any]] = None
) -> Optional[str]:
tt = target_type.upper()
if tt == utils.TemporalType.TIMESTAMP:
return f"{dttm.timestamp() * 1000}"
Expand Down
8 changes: 5 additions & 3 deletions superset/db_engine_specs/databricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.o

from datetime import datetime
from typing import Optional
from typing import Any, Dict, Optional

from superset.db_engine_specs.base import BaseEngineSpec
from superset.db_engine_specs.hive import HiveEngineSpec
Expand All @@ -40,8 +40,10 @@ class DatabricksODBCEngineSpec(BaseEngineSpec):
_time_grain_expressions = HiveEngineSpec._time_grain_expressions

@classmethod
def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
return HiveEngineSpec.convert_dttm(target_type, dttm)
def convert_dttm(
cls, target_type: str, dttm: datetime, db_extra: Optional[Dict[str, Any]] = None
) -> Optional[str]:
return HiveEngineSpec.convert_dttm(target_type, dttm, db_extra=db_extra)

@classmethod
def epoch_to_dttm(cls) -> str:
Expand Down
6 changes: 4 additions & 2 deletions superset/db_engine_specs/dremio.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.
from datetime import datetime
from typing import Optional
from typing import Any, Dict, Optional

from superset.db_engine_specs.base import BaseEngineSpec
from superset.utils import core as utils
Expand Down Expand Up @@ -43,7 +43,9 @@ def epoch_to_dttm(cls) -> str:
return "TO_DATE({col})"

@classmethod
def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
def convert_dttm(
cls, target_type: str, dttm: datetime, db_extra: Optional[Dict[str, Any]] = None
) -> Optional[str]:
tt = target_type.upper()
if tt == utils.TemporalType.DATE:
return f"TO_DATE('{dttm.date().isoformat()}', 'YYYY-MM-DD')"
Expand Down
6 changes: 4 additions & 2 deletions superset/db_engine_specs/drill.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.
from datetime import datetime
from typing import Optional
from typing import Any, Dict, Optional
from urllib import parse

from sqlalchemy.engine.url import URL
Expand Down Expand Up @@ -55,7 +55,9 @@ def epoch_ms_to_dttm(cls) -> str:
return "TO_DATE({col})"

@classmethod
def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
def convert_dttm(
cls, target_type: str, dttm: datetime, db_extra: Optional[Dict[str, Any]] = None
) -> Optional[str]:
tt = target_type.upper()
if tt == utils.TemporalType.DATE:
return f"TO_DATE('{dttm.date().isoformat()}', 'yyyy-MM-dd')"
Expand Down
4 changes: 3 additions & 1 deletion superset/db_engine_specs/druid.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ def get_extra_params(database: "Database") -> Dict[str, Any]:
return extra

@classmethod
def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
def convert_dttm(
cls, target_type: str, dttm: datetime, db_extra: Optional[Dict[str, Any]] = None
) -> Optional[str]:
tt = target_type.upper()
if tt == utils.TemporalType.DATE:
return f"CAST(TIME_PARSE('{dttm.date().isoformat()}') AS DATE)"
Expand Down
37 changes: 34 additions & 3 deletions superset/db_engine_specs/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
from datetime import datetime
from typing import Dict, Optional, Type
from distutils.version import StrictVersion
from typing import Any, Dict, Optional, Type

from superset.db_engine_specs.base import BaseEngineSpec
from superset.db_engine_specs.exceptions import (
Expand All @@ -25,6 +27,8 @@
)
from superset.utils import core as utils

logger = logging.getLogger()


class ElasticSearchEngineSpec(BaseEngineSpec): # pylint: disable=abstract-method
engine = "elasticsearch"
Expand Down Expand Up @@ -59,9 +63,34 @@ def get_dbapi_exception_mapping(cls) -> Dict[Type[Exception], Type[Exception]]:
}

@classmethod
def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
def convert_dttm(
cls, target_type: str, dttm: datetime, db_extra: Optional[Dict[str, Any]] = None
) -> Optional[str]:

db_extra = db_extra or {}
if target_type.upper() == utils.TemporalType.DATETIME:
es_version = db_extra.get("version")
# The elasticsearch CAST function does not take effect for the time zone
# setting. In elasticsearch7.8 and above, we can use the DATETIME_PARSE
# function to solve this problem.
supports_dttm_parse = False
try:
if es_version:
supports_dttm_parse = StrictVersion(es_version) >= StrictVersion(
"7.8"
)
except Exception as ex: # pylint: disable=broad-except
logger.error("Unexpected error while convert es_version", exc_info=True)
logger.exception(ex)

if supports_dttm_parse:
datetime_formatted = dttm.isoformat(sep=" ", timespec="seconds")
return (
f"""DATETIME_PARSE('{datetime_formatted}', 'yyyy-MM-dd HH:mm:ss')"""
)

return f"""CAST('{dttm.isoformat(timespec="seconds")}' AS DATETIME)"""

return None


Expand All @@ -87,7 +116,9 @@ class OpenDistroEngineSpec(BaseEngineSpec): # pylint: disable=abstract-method
engine_name = "ElasticSearch (OpenDistro SQL)"

@classmethod
def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
def convert_dttm(
cls, target_type: str, dttm: datetime, db_extra: Optional[Dict[str, Any]] = None
) -> Optional[str]:
if target_type.upper() == utils.TemporalType.DATETIME:
return f"""'{dttm.isoformat(timespec="seconds")}'"""
return None
Expand Down
6 changes: 4 additions & 2 deletions superset/db_engine_specs/firebird.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.
from datetime import datetime
from typing import Optional
from typing import Any, Dict, Optional

from superset.db_engine_specs.base import BaseEngineSpec, LimitMethod
from superset.utils import core as utils
Expand Down Expand Up @@ -70,7 +70,9 @@ def epoch_to_dttm(cls) -> str:
return "DATEADD(second, {col}, CAST('00:00:00' AS TIMESTAMP))"

@classmethod
def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
def convert_dttm(
cls, target_type: str, dttm: datetime, db_extra: Optional[Dict[str, Any]] = None
) -> Optional[str]:
tt = target_type.upper()
if tt == utils.TemporalType.TIMESTAMP:
dttm_formatted = dttm.isoformat(sep=" ")
Expand Down
Loading

0 comments on commit 5a1c681

Please sign in to comment.