Skip to content

Commit

Permalink
Test and improve handling of Date_Time with_timezone=False in Postgres (#8114)
Browse files Browse the repository at this point in the history

- Fixes #8049
- Adds tests for handling of Date_Time upload/download in Postgres.
- Adds tests for edge cases of handling of Decimal and Binary types in Postgres.
  • Loading branch information
radeusgd authored Oct 21, 2023
1 parent b1df8b1 commit 0c27839
Show file tree
Hide file tree
Showing 17 changed files with 294 additions and 37 deletions.
17 changes: 12 additions & 5 deletions distribution/lib/Standard/Database/0.0.0-dev/src/Data/Table.enso
Original file line number Diff line number Diff line change
Expand Up @@ -2158,11 +2158,18 @@ type Table

expected_types = self.columns.map .value_type
actual_types = materialized_table.columns.map .value_type
expected_types.zip actual_types . fold materialized_table acc-> types_pair->
expected_type = types_pair.first
actual_type = types_pair.second
if expected_type == actual_type then acc else
Warning.attach (Inexact_Type_Coercion.Warning expected_type actual_type) acc
warnings_builder = Vector.new_builder
expected_types.zip actual_types expected_type-> actual_type->
if expected_type == actual_type then Nothing else
expected_type_kind = Meta.meta expected_type . constructor
actual_type_kind = Meta.meta actual_type . constructor
## Simple approximations made by our in-memory backend - such as adding a default
timezone (because we do not have Date_Time without timezone in-memory),
or changing Float32 to Float64 - are silently ignored.
However, bigger changes, like a Binary type column getting coerced to Mixed - _will_ still be reported.
if expected_type_kind == actual_type_kind then Nothing else
warnings_builder.append (Inexact_Type_Coercion.Warning expected_type actual_type)
Problem_Behavior.Report_Warning.attach_problems_before warnings_builder.to_vector materialized_table

## PRIVATE
Creates a query corresponding to this table.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import Standard.Table.Data.Type.Value_Type.Bits
import Standard.Table.Data.Type.Value_Type.Value_Type
import Standard.Table.Internal.Java_Exports

from project.Errors import Unsupported_Database_Operation

polyglot java import java.sql.ResultSet
polyglot java import java.time.LocalTime as Java_Local_Time

Expand Down Expand Up @@ -145,6 +147,17 @@ date_time_fetcher =
make_builder_from_java_object_builder java_builder
Column_Fetcher.Value fetch_value make_builder

## PRIVATE
A column fetcher that fetches the database column without timezone,
interpreting it as LocalDateTime and then converting to Enso Date_Time by
adding the default system timezone.

The fetch delegates to `JDBCUtils.getLocalDateTimeAsZoned`; values are
accumulated with the date-time builder obtained from `Java_Exports`.
local_date_time_fetcher =
fetch_value rs i = JDBCUtils.getLocalDateTimeAsZoned rs i
# The second argument (the value type) is ignored - the builder is always a date-time builder.
make_builder initial_size _ =
java_builder = Java_Exports.make_date_time_builder initial_size
make_builder_from_java_object_builder java_builder
Column_Fetcher.Value fetch_value make_builder

## PRIVATE
A default implementation that will assign specialized fetchers for the
Integer, Float, Char and Boolean value types and a fallback for any other
Expand All @@ -160,14 +173,15 @@ default_fetcher_for_value_type value_type =
Value_Type.Boolean -> boolean_fetcher
Value_Type.Time -> time_fetcher
# We currently don't distinguish timestamps without a timezone on the Enso value side.
Value_Type.Date_Time _ -> date_time_fetcher
Value_Type.Date_Time has_timezone ->
if has_timezone then date_time_fetcher else local_date_time_fetcher
# If we can determine that scale = 0
Value_Type.Decimal _ scale ->
is_guaranteed_integer = scale.is_nothing.not && scale <= 0
case is_guaranteed_integer of
True -> big_integer_fetcher
# If we cannot guarantee that the column is integer, we will fall back to Float values, since there is no BigDecimal implementation yet.
# TODO I think we should add a warning somewhere
# In another place this will trigger an Inexact_Type_Coercion warning.
False -> double_fetcher
_ -> fallback_fetcher

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,8 @@ type JDBC_Connection

It is the caller's responsibility to call this method from within a
transaction to ensure consistency.
batch_insert : Text -> Statement_Setter -> Materialized_Table -> Integer -> Integer | Nothing -> Nothing
batch_insert self insert_template statement_setter table batch_size row_limit=Nothing =
batch_insert : Text -> Statement_Setter -> Materialized_Table -> Integer -> Vector Value_Type | Nothing -> Integer | Nothing -> Nothing
batch_insert self insert_template statement_setter table batch_size expected_type_hints=Nothing row_limit=Nothing =
In_Transaction.ensure_in_transaction <| self.with_connection java_connection-> handle_sql_errors related_query=insert_template <|
Managed_Resource.bracket (java_connection.prepareStatement insert_template) .close stmt->
log_sql_if_enabled self insert_template
Expand All @@ -244,7 +244,7 @@ type JDBC_Connection
Panic.throw <| Illegal_State.Error "A single update within the batch unexpectedly affected "+affected_rows.to_text+" rows."
0.up_to num_rows . each row_id->
values = columns.map col-> col.at row_id
set_statement_values stmt statement_setter values
set_statement_values stmt statement_setter values expected_type_hints=expected_type_hints
stmt.addBatch
if (row_id+1 % batch_size) == 0 then check_rows stmt.executeBatch batch_size
if (num_rows % batch_size) != 0 then check_rows stmt.executeBatch (num_rows % batch_size)
Expand Down Expand Up @@ -298,9 +298,17 @@ handle_sql_errors ~action related_query=Nothing =
## PRIVATE
Uses the provided `Statement_Setter` strategy to fill holes in a
provided `PreparedStatement`.
set_statement_values stmt statement_setter values =

A list of expected value types can be passed as `expected_type_hints` to add
these hints to the `Statement_Setter` to customize the behaviour for some
specific target value types.
set_statement_values : PreparedStatement -> Statement_Setter -> Vector -> Vector Value_Type | Nothing -> Nothing
set_statement_values stmt statement_setter values expected_type_hints=Nothing =
values.each_with_index ix-> value->
statement_setter.fill_hole stmt (ix + 1) value
type_hint = case expected_type_hints of
Nothing -> Nothing
hints : Vector -> hints.at ix
statement_setter.fill_hole stmt (ix + 1) type_hint value

## PRIVATE
A helper that logs performed SQL queries/statements to a file, if an
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ type Postgres_Dialect
True -> Value_Type.Decimal 1000 scale
# If the needed precision is too big, we cannot set it, so we set the precision to unlimited. This loses scale.
False -> Value_Type.Decimal Nothing Nothing
Warning.attach (Inexact_Type_Coercion.Warning base_type new_type) new_type
Warning.attach (Inexact_Type_Coercion.Warning base_type new_type unavailable=False) new_type
False -> base_type
_ -> base_type

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ type Postgres_Type_Mapping
type_name = if with_timezone then "timestamptz" else "timestamp"
SQL_Type.Value Types.TIMESTAMP type_name
Value_Type.Binary _ _ ->
## `bytea` is the compact storage for binary data, but it does not allow setting fixed size.
We currently do not use Binary datatype much, so it is hard to decide which one is more appropriate,
but we may also consider using the standard SQL `bit(n)` and `bit varying(n)` types.
See: https://www.postgresql.org/docs/current/datatype-bit.html
SQL_Type.Value Types.BINARY "bytea" precision=max_precision
Value_Type.Mixed ->
Error.throw (Unsupported_Database_Operation.Error "Postgres tables do not support Mixed types.")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from Standard.Base import all
import Standard.Base.Errors.Illegal_State.Illegal_State

import Standard.Table.Data.Type.Value_Type.Value_Type

polyglot java import java.math.BigDecimal as Java_Big_Decimal
polyglot java import java.sql.PreparedStatement
polyglot java import java.sql.Types as Java_Types
Expand All @@ -11,7 +13,7 @@ polyglot java import org.enso.database.JDBCUtils
type Statement_Setter
## PRIVATE
Encapsulates the logic for filling a hole in a prepared statement.
Value (fill_hole : PreparedStatement -> Integer -> Any -> Nothing)
Value (fill_hole : PreparedStatement -> Integer -> Value_Type|Nothing -> Any -> Nothing)

## PRIVATE
The default setter that is handling simple commonly supported types.
Expand All @@ -25,12 +27,13 @@ type Statement_Setter
It will panic if called.
null : Statement_Setter
null =
fill_hole_unexpected _ _ _ =
fill_hole_unexpected _ _ _ _ =
Panic.throw (Illegal_State.Error "The associated statement does not expect any values to be set. This is a bug in the Database library.")
Statement_Setter.Value fill_hole_unexpected

## PRIVATE
fill_hole_default stmt i value = case value of
fill_hole_default : PreparedStatement -> Integer -> Value_Type|Nothing -> Any -> Nothing
fill_hole_default stmt i type_hint value = case value of
Nothing -> stmt.setNull i Java_Types.NULL
_ : Boolean -> stmt.setBoolean i value
_ : Integer -> case NumericConverter.isBigInteger value of
Expand All @@ -40,7 +43,16 @@ fill_hole_default stmt i value = case value of
False -> stmt.setLong i value
_ : Float -> stmt.setDouble i value
_ : Text -> stmt.setString i value
_ : Date_Time -> JDBCUtils.setZonedDateTime stmt i value
_ : Date_Time ->
has_timezone = case type_hint of
Value_Type.Date_Time with_timezone -> with_timezone
# We include the timezone by default
_ -> True
case has_timezone of
True ->
JDBCUtils.setZonedDateTime stmt i value
False ->
JDBCUtils.setLocalDateTime stmt i value
## Time_Of_Day and Date sometimes work ok, but sometimes are passed as
`org.graalvm.polyglot.Value` to the JDBC driver which is then unable to
infer the correct type for them. Instead, we use these helper functions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,25 @@ internal_create_table_structure connection table_name structure primary_key temp
## PRIVATE
A helper that can upload a table from any backend to a database.
It should be run within a transaction and wrapped in `handle_upload_errors`.
internal_upload_table source_table connection table_name primary_key temporary on_problems=Problem_Behavior.Report_Error row_limit=Nothing =

Arguments:
- source_table: the table to be uploaded.
If it's a `Database_Table`, the query will be materialized as a new table.
If it's an `In_Memory_Table`, the data will be uploaded to the newly created table.
- connection: the connection to the database.
- table_name: the name of the table to be created.
- primary_key: the primary key of the table to be created. Can be `Nothing` to set no key.
- temporary: if `True`, the table will be created as temporary.
- structure_hint: If set, it can be used to hint what types should be used for the columns of the table. Useful if the types slightly differ from the in-memory source types.
- on_problems: the behavior to be used when reporting problems.
- row_limit: if set, only the first `row_limit` rows will be uploaded.
internal_upload_table : Database_Table | In_Memory_Table -> Connection -> Text -> Nothing | Vector Text -> Boolean -> Vector Column_Description -> Problem_Behavior -> Integer|Nothing -> Database_Table
internal_upload_table source_table connection table_name primary_key temporary structure_hint=Nothing on_problems=Problem_Behavior.Report_Error row_limit=Nothing =
case source_table of
_ : In_Memory_Table ->
internal_upload_in_memory_table source_table connection table_name primary_key temporary on_problems row_limit
internal_upload_in_memory_table source_table connection table_name primary_key temporary structure_hint on_problems row_limit
_ : Database_Table ->
internal_upload_database_table source_table connection table_name primary_key temporary on_problems row_limit
internal_upload_database_table source_table connection table_name primary_key temporary structure_hint on_problems row_limit
_ ->
Panic.throw <| Illegal_Argument.Error ("Unsupported table type: " + Meta.get_qualified_type_name source_table)

Expand Down Expand Up @@ -110,9 +123,11 @@ select_into_table_implementation source_table connection table_name primary_key

## PRIVATE
It should be run within a transaction and wrapped in `handle_upload_errors`.
internal_upload_in_memory_table source_table connection table_name primary_key temporary on_problems row_limit =
internal_upload_in_memory_table source_table connection table_name primary_key temporary structure_hint on_problems row_limit =
In_Transaction.ensure_in_transaction <|
created_table_name = internal_create_table_structure connection table_name structure=source_table primary_key=primary_key temporary=temporary on_problems=on_problems
verify_structure_hint structure_hint source_table.column_names
structure = structure_hint.if_nothing source_table
created_table_name = internal_create_table_structure connection table_name structure=structure primary_key=primary_key temporary=temporary on_problems=on_problems
column_names = source_table.column_names

## `created_table_name.if_not_error` is used to ensure that if there are
Expand All @@ -124,21 +139,24 @@ internal_upload_in_memory_table source_table connection table_name primary_key t
insert_template = make_batched_insert_template connection created_table_name column_names
statement_setter = connection.dialect.get_statement_setter
Panic.rethrow <|
connection.jdbc_connection.batch_insert insert_template statement_setter source_table batch_size=default_batch_size row_limit=row_limit
expected_type_hints = align_structure connection structure . map .value_type
connection.jdbc_connection.batch_insert insert_template statement_setter source_table batch_size=default_batch_size expected_type_hints=expected_type_hints row_limit=row_limit

upload_status.if_not_error <|
connection.query (SQL_Query.Table_Name created_table_name)

## PRIVATE
It should be run within a transaction and wrapped in `handle_upload_errors`.
internal_upload_database_table source_table connection table_name primary_key temporary on_problems row_limit =
internal_upload_database_table source_table connection table_name primary_key temporary structure_hint on_problems row_limit =
In_Transaction.ensure_in_transaction <|
connection_check = if source_table.connection.jdbc_connection == connection.jdbc_connection then True else
Error.throw (Unsupported_Database_Operation.Error "The Database table to be uploaded must be coming from the same connection as the connection on which the new table is being created. Cross-connection uploads are currently not supported. To work around this, you can first `.read` the table into memory and then upload it from memory to a different connection.")

verify_structure_hint structure_hint source_table.column_names
structure = structure_hint.if_nothing source_table
connection_check.if_not_error <|
# Warning: in some DBs, calling a DDL query in a transaction may commit it. We may have to have some special handling for this.
created_table_name = internal_create_table_structure connection table_name structure=source_table primary_key=primary_key temporary=temporary on_problems=on_problems
created_table_name = internal_create_table_structure connection table_name structure=structure primary_key=primary_key temporary=temporary on_problems=on_problems
upload_status = created_table_name.if_not_error <|
internal_translate_known_upload_errors source_table connection primary_key <|
effective_source_table = case row_limit of
Expand All @@ -155,6 +173,13 @@ internal_upload_database_table source_table connection table_name primary_key te
upload_status.if_not_error <|
connection.query (SQL_Query.Table_Name created_table_name)

## PRIVATE
Verifies that the columns described by `structure_hint` (a vector of
column descriptions, or `Nothing`) match `column_names` positionally.
A `Nothing` hint is accepted and skipped. On a name mismatch this panics
with `Illegal_State`, because the hint is constructed internally by the
library - a mismatch indicates a bug, not a user error.
verify_structure_hint structure_hint column_names =
if structure_hint.is_nothing.not then
column_names.zip structure_hint expected_name-> column_description->
if column_description.name != expected_name then
Panic.throw (Illegal_State.Error ("The provided structure hint does not match the column names of the source table. Expected: "+column_names.to_display_text+", got: "+(structure_hint.map .name . to_display_text)+". This is a bug in the Database library."))

## PRIVATE
Ensures that provided primary key columns are present in the table and that
there are no duplicates.
Expand Down Expand Up @@ -309,7 +334,9 @@ common_update_table (source_table : Database_Table | In_Memory_Table) (target_ta
dry_run = Context.Output.is_enabled.not
row_limit = if dry_run then dry_run_row_limit else Nothing
check_for_null_keys_if_any_keys_set source_table effective_key_columns <| Context.Output.with_enabled <|
tmp_table = internal_upload_table source_table connection tmp_table_name primary_key=effective_key_columns temporary=True on_problems=Problem_Behavior.Report_Error row_limit=row_limit
structure_hint = target_table.select_columns source_table.column_names reorder=True . columns . map c->
Column_Description.Value c.name c.value_type
tmp_table = internal_upload_table source_table connection tmp_table_name primary_key=effective_key_columns structure_hint=structure_hint temporary=True on_problems=Problem_Behavior.Report_Error row_limit=row_limit
tmp_table.if_not_error <|
resulting_table = append_to_existing_table tmp_table target_table update_action effective_key_columns dry_run=dry_run
## We don't need to drop the table if append panics, because
Expand Down Expand Up @@ -440,7 +467,7 @@ check_update_arguments_structure_match source_table target_table key_columns upd
source_type = source_column.value_type
target_type = target_column.value_type
if source_type == target_type then [] else
if source_type.can_be_widened_to target_type then [Inexact_Type_Coercion.Warning source_type target_type] else
if source_type.can_be_widened_to target_type then [Inexact_Type_Coercion.Warning source_type target_type unavailable=False] else
Error.throw (Column_Type_Mismatch.Error source_column.name target_type source_type)

source_columns = Set.from_vector source_table.column_names
Expand Down Expand Up @@ -544,7 +571,7 @@ prepare_source_table_for_delete_matching (key_values_to_delete : Database_Table
Delete_Rows_Source.Value prepared_table Nothing
_ : In_Memory_Table ->
tmp_table_name = connection.base_connection.table_naming_helper.generate_random_table_name "enso-temp-keys-table-"
uploaded_table = internal_upload_in_memory_table prepared_table connection tmp_table_name primary_key=key_columns temporary=True on_problems=Problem_Behavior.Report_Error row_limit=dry_run_row_limit
uploaded_table = internal_upload_in_memory_table prepared_table connection tmp_table_name primary_key=key_columns temporary=True structure_hint=Nothing on_problems=Problem_Behavior.Report_Error row_limit=dry_run_row_limit

row_limit_exceeded = prepared_table.row_count > dry_run_row_limit
dry_run_message_suffix = case row_limit_exceeded of
Expand Down
Loading

0 comments on commit 0c27839

Please sign in to comment.