Skip to content

Commit

Permalink
Bug/timestamps cause query exceptions (#15)
Browse files Browse the repository at this point in the history
* Broken test due to timestamp column

An interrupted Full Table sync using a timestamp column
causes a SQLServerException because the value is transformed.

Motivation
----------

API changes
-----------

Implementation Notes
--------------------

Functional Tests
----------------

Regression Tests
----------------

Screenshots
-----------

* wip hacked a fix for the bug but tests behaving oddly on the vm

* wip fixing an actual test failure

* Move transform into write-record in full table syncs

* fix transform refactor

* Add get-last-pk-fetched
  • Loading branch information
KAllan357 authored Dec 11, 2019
1 parent 3ecf20d commit bebc27b
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 47 deletions.
6 changes: 4 additions & 2 deletions src/tap_mssql/singer/messages.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
(ns tap-mssql.singer.messages
(:require [tap-mssql.singer.schema :as singer-schema]
[tap-mssql.singer.transform :as singer-transform]
[clojure.data.json :as json]))

(defn now []
Expand Down Expand Up @@ -78,9 +79,10 @@

(defn write-record!
[stream-name state record catalog]
(let [record-message {"type" "RECORD"
(let [transformed-record (singer-transform/transform catalog stream-name record)
record-message {"type" "RECORD"
"stream" (calculate-destination-stream-name stream-name catalog)
"record" record}
"record" transformed-record}
version (get-in state ["bookmarks" stream-name "version"])]
(if (nil? version)
(write! record-message)
Expand Down
22 changes: 17 additions & 5 deletions src/tap_mssql/sync_strategies/full.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
[tap-mssql.singer.fields :as singer-fields]
[tap-mssql.singer.bookmarks :as singer-bookmarks]
[tap-mssql.singer.messages :as singer-messages]
[tap-mssql.singer.transform :as singer-transform]
[tap-mssql.sync-strategies.common :as common]
[clojure.tools.logging :as log]
[clojure.string :as string]
Expand Down Expand Up @@ -55,11 +54,22 @@
clause-list (map generate-bookmark-clause-inner pk-lists)]
(string/join " OR " clause-list)))

;; TODO: Consider tracking more information to not rely on brittle type checking
(defn get-last-pk-fetched
  "Returns the `last_pk_fetched` bookmark stored for `stream-name` in `state`
  as a map. Any value that is a PersistentVector is rebuilt as a Java byte
  array; every other value is passed through untouched. Yields an empty map
  when the bookmark is absent."
  [stream-name state]
  (let [restore-value (fn [bookmark-value]
                        ;; When state has a PersistentVector, assume it's a
                        ;; timestamp byte-array
                        (if (instance? clojure.lang.PersistentVector bookmark-value)
                          (bytes (byte-array (map byte bookmark-value)))
                          bookmark-value))]
    (into {}
          (map (fn [[pk-name pk-value]]
                 [pk-name (restore-value pk-value)]))
          (get-in state ["bookmarks" stream-name "last_pk_fetched"]))))

(defn build-sync-query [stream-name schema-name table-name record-keys state]
{:pre [(not (empty? record-keys))
(valid-full-table-state? state stream-name)]}
;; TODO: Fully qualify and quote all database structures, maybe just schema
(let [last-pk-fetched (get-in state ["bookmarks" stream-name "last_pk_fetched"])
(let [last-pk-fetched (get-last-pk-fetched stream-name state)
bookmark-query-text (generate-bookmark-clause last-pk-fetched)
max-pk-values (get-in state ["bookmarks" stream-name "max_pk_values"])
limiting-keys (map common/sanitize-names (keys max-pk-values))
Expand Down Expand Up @@ -105,9 +115,11 @@
sql-params (build-sync-query stream-name schema-name table-name record-keys state)]
(log/infof "Executing query: %s" (pr-str sql-params))
(-> (reduce (fn [acc result]
(let [record (->> (select-keys result record-keys)
(singer-transform/transform catalog stream-name))]
(singer-messages/write-record! stream-name state record catalog)
(let [record (select-keys result record-keys)]
(singer-messages/write-record! stream-name
state
record
catalog)
(->> (singer-bookmarks/update-last-pk-fetched stream-name bookmark-keys acc record)
(singer-messages/write-state-buffered! stream-name))))
state
Expand Down
4 changes: 1 addition & 3 deletions src/tap_mssql/sync_strategies/incremental.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
[tap-mssql.singer.fields :as singer-fields]
[tap-mssql.singer.bookmarks :as singer-bookmarks]
[tap-mssql.singer.messages :as singer-messages]
[tap-mssql.singer.transform :as singer-transform]
[tap-mssql.sync-strategies.common :as common]
[clojure.tools.logging :as log]
[clojure.string :as string]
Expand Down Expand Up @@ -49,8 +48,7 @@
state)]
(log/infof "Executing query: %s" (pr-str sql-params))
(reduce (fn [acc result]
(let [record (->> (select-keys result record-keys)
(singer-transform/transform catalog stream-name))]
(let [record (select-keys result record-keys)]
(singer-messages/write-record! stream-name acc record catalog)
(->> (singer-bookmarks/update-state stream-name replication-key record acc)
(singer-messages/write-state-buffered! stream-name))))
Expand Down
4 changes: 1 addition & 3 deletions src/tap_mssql/sync_strategies/logical.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
[tap-mssql.singer.fields :as singer-fields]
[tap-mssql.singer.bookmarks :as singer-bookmarks]
[tap-mssql.singer.messages :as singer-messages]
[tap-mssql.singer.transform :as singer-transform]
[tap-mssql.sync-strategies.full :as full]
[tap-mssql.sync-strategies.common :as common]
[clojure.tools.logging :as log]
Expand Down Expand Up @@ -196,8 +195,7 @@
(let [record (as-> (select-keys result record-keys) rec
(if (= "D" (get result "sys_change_operation"))
(assoc rec "_sdc_deleted_at" (get result "commit_time"))
rec)
(singer-transform/transform catalog stream-name rec))]
rec))]
(singer-messages/write-record! stream-name st record catalog)
(->> (singer-bookmarks/update-last-pk-fetched stream-name bookmark-keys st record)
(update-current-log-version stream-name
Expand Down
137 changes: 103 additions & 34 deletions test/tap_mssql/sync_interruptible_full_table_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
[clojure.string :as string]
[tap-mssql.core :refer :all]
[tap-mssql.sync-strategies.full :as sync]
[tap-mssql.singer.transform :as transform]
[tap-mssql.singer.messages :as singer-messages]
[tap-mssql.test-utils :refer [with-out-and-err-to-dev-null
test-db-config
Expand Down Expand Up @@ -61,7 +62,14 @@
[:number "int NOT NULL"]
[:datetime "datetime2 NOT NULL"]
[:value "varchar(5000)"]
["PRIMARY KEY (id, number, datetime)"]])])))
["PRIMARY KEY (id, number, datetime)"]])])
(jdbc/db-do-commands (assoc db-spec :dbname "full_table_interruptible_sync_test")
[(jdbc/create-table-ddl
"table_with_timestamp_bookmark_key"
[[:id "uniqueidentifier NOT NULL PRIMARY KEY DEFAULT NEWID()"]
[:number "int NOT NULL"]
[:timestamp "timestamp NOT NULL"]
[:value "varchar(5000)"]])])))

(defn populate-data
[config]
Expand Down Expand Up @@ -89,7 +97,12 @@
:datetime (-> (generators/date)
.toInstant
.toString)
:value (str %)) (range)))))
:value (str %)) (range))))
(jdbc/insert-multi! (-> (config/->conn-map config)
(assoc :dbname "full_table_interruptible_sync_test"))
"table_with_timestamp_bookmark_key"
(take 2000 (map #(hash-map :number (rand-int 1000000)
:value (str %)) (range)))))

(defn test-db-fixture [f config]
(with-out-and-err-to-dev-null
Expand Down Expand Up @@ -138,44 +151,50 @@

(deftest ^:integration verify-unsupported-column-has-empty-schema
(with-matrix-assertions test-db-configs test-db-fixture
(is (= {}
(get-in (first (filter #(= "SCHEMA" (% "type"))
(->> (catalog/discover test-db-config)
(select-stream "full_table_interruptible_sync_test_dbo_table_with_unsupported_column")
(get-messages-from-output test-db-config
"full_table_interruptible_sync_test_dbo_table_with_unsupported_column"))))
["schema" "properties" "value"])))))
(let [test-db-config (assoc test-db-config "include_schemas_in_destination_stream_name" "true")
_ (set-include-db-and-schema-names-in-messages! test-db-config)]
(is (= {}
(get-in (first (filter #(= "SCHEMA" (% "type"))
(->> (catalog/discover test-db-config)
(select-stream "full_table_interruptible_sync_test_dbo_table_with_unsupported_column")
(get-messages-from-output test-db-config
"full_table_interruptible_sync_test_dbo_table_with_unsupported_column"))))
["schema" "properties" "value"]))))))

(deftest ^:integration verify-unsupported-primary-key-throws
(with-matrix-assertions test-db-configs test-db-fixture
(is (thrown-with-msg? java.lang.Exception
#"has unsupported primary key"
(->> test-db-config
(catalog/discover)
((fn [catalog] ;; Fake the pk being unsupported
(-> catalog (assoc-in ["streams"
"full_table_interruptible_sync_test_dbo_table_with_unsupported_pk"
"metadata"
"properties"
"id"
"inclusion"]
"unsupported")
(assoc-in ["streams"
"full_table_interruptible_sync_test_dbo_table_with_unsupported_pk"
"metadata"
"properties"
"id"
"selected-by-default"]
false))))
(select-stream "full_table_interruptible_sync_test_dbo_table_with_unsupported_pk")
(get-messages-from-output test-db-config "full_table_interruptible_sync_test_dbo_table_with_unsupported_pk"))))))
(let [test-db-config (assoc test-db-config "include_schemas_in_destination_stream_name" "true")
_ (set-include-db-and-schema-names-in-messages! test-db-config)]
(is (thrown-with-msg? java.lang.Exception
#"has unsupported primary key"
(->> test-db-config
(catalog/discover)
((fn [catalog] ;; Fake the pk being unsupported
(-> catalog (assoc-in ["streams"
"full_table_interruptible_sync_test_dbo_table_with_unsupported_pk"
"metadata"
"properties"
"id"
"inclusion"]
"unsupported")
(assoc-in ["streams"
"full_table_interruptible_sync_test_dbo_table_with_unsupported_pk"
"metadata"
"properties"
"id"
"selected-by-default"]
false))))
(select-stream "full_table_interruptible_sync_test_dbo_table_with_unsupported_pk")
(get-messages-from-output test-db-config "full_table_interruptible_sync_test_dbo_table_with_unsupported_pk")))))))

(deftest ^:integration verify-full-table-sync-with-rowversion-resumes-on-interruption
(with-matrix-assertions test-db-configs test-db-fixture
;; Steps:
;; Sync partially, a table with row version, interrupted at some point
;; -- e.g., (with-redefs [valid-message? (fn [msg] (if (some-atom-thing-changes-after x calls) (throw...) (valid-message? msg)))] ... )
(let [old-write-record singer-messages/write-record!]
(let [test-db-config (assoc test-db-config "include_schemas_in_destination_stream_name" "true")
_ (set-include-db-and-schema-names-in-messages! test-db-config)
old-write-record singer-messages/write-record!]
(with-redefs [singer-messages/write-record! (fn [stream-name state record catalog]
(swap! record-count inc)
(if (> @record-count 120)
Expand Down Expand Up @@ -211,7 +230,8 @@
(partition 2 1)
(drop-while (fn [[a b]] (not= "STATE" (b "type"))))
first)]
(= (get-in last-record ["record" "rowversion"]) (get-in last-state ["value" "bookmarks" "full_table_interruptible_sync_test_dbo_data_table_rowversion" "last_pk_fetched" "rowversion"])))
(= (get-in last-record ["record" "rowversion"])
(transform/transform-binary (get-in last-state ["value" "bookmarks" "full_table_interruptible_sync_test_dbo_data_table_rowversion" "last_pk_fetched" "rowversion"]))))
"Either no state emitted, or state does not match previous record")
;; Next state emitted has the version of the last record emitted before that state
(is (let [[last-record last-state] (->> first-messages
Expand All @@ -232,7 +252,9 @@

(deftest ^:integration verify-full-table-interruptible-bookmark-clause
(with-matrix-assertions test-db-configs test-db-fixture
(let [stream-name "schema_name_table_name"
(let [test-db-config (assoc test-db-config "include_schemas_in_destination_stream_name" "true")
_ (set-include-db-and-schema-names-in-messages! test-db-config)
stream-name "schema_name_table_name"
schema-name "schema_name"
table-name "table_name"
record-keys ["id" "number" "datetime" "value"]]
Expand Down Expand Up @@ -295,7 +317,9 @@
(with-matrix-assertions test-db-configs test-db-fixture
;; Steps:
;; 1. Sync the full table and make sure it returns all the records.
(let [first-messages (->> (catalog/discover test-db-config)
(let [test-db-config (assoc test-db-config "include_schemas_in_destination_stream_name" "true")
_ (set-include-db-and-schema-names-in-messages! test-db-config)
first-messages (->> (catalog/discover test-db-config)
(select-stream "full_table_interruptible_sync_test_dbo_table_with_composite_pks")
(get-messages-from-output test-db-config
"full_table_interruptible_sync_test_dbo_table_with_composite_pks"
Expand Down Expand Up @@ -346,3 +370,48 @@
(get-in (last second-messages) ["value" "bookmarks" "full_table_interruptible_sync_test_dbo_table_with_composite_pks" "last_pk_fetched"])))
(is (nil?
(get-in (last second-messages) ["value" "bookmarks" "full_table_interruptible_sync_test_dbo_table_with_composite_pks" "max_pk_value"]))))))

;; Integration test: a full-table sync of a table that has a `timestamp`
;; column is interrupted part-way through, then resumed from the last STATE
;; message that was emitted. Between the two runs every row must be emitted,
;; and the final STATE must carry no interruptible-sync bookmarks.
(deftest ^:integration verify-interrupted-full-table-sync-with-timestamp-pk-succeeds
  (with-matrix-assertions test-db-configs test-db-fixture
    ;; Steps:
    ;; 1. Sync 1000 rows, capture state, and sync the remaining. A timestamp should be used in the WHERE clause of the query
    (let [test-db-config (assoc test-db-config "include_schemas_in_destination_stream_name" "true")
          _ (set-include-db-and-schema-names-in-messages! test-db-config)
          ;; Keep a handle on the real implementation so the redef below can delegate to it.
          old-write-record singer-messages/write-record!
          ;; First run: write-record! is redefined to throw once more than
          ;; 1000 records have been counted, simulating an interruption.
          first-messages (with-redefs [singer-messages/write-record! (fn [stream-name state record catalog]
                                                                       (swap! record-count inc)
                                                                       (if (> @record-count 1000)
                                                                         (do
                                                                           ;; Reset the counter before throwing.
                                                                           (reset! record-count 0)
                                                                           (throw (ex-info "Interrupting!" {:ignore true})))
                                                                         (old-write-record stream-name state record catalog)))]
                           (->> (catalog/discover test-db-config)
                                (select-stream "full_table_interruptible_sync_test_dbo_table_with_timestamp_bookmark_key")
                                (get-messages-from-output test-db-config
                                                          "full_table_interruptible_sync_test_dbo_table_with_timestamp_bookmark_key")))
          ;; The value of the last STATE message from the interrupted run is
          ;; what the second run resumes from.
          first-state (get (->> first-messages
                                (filter #(= "STATE" (% "type")))
                                last)
                           "value")
          ;; Second run: no interruption, started from the captured state.
          second-messages (->> (catalog/discover test-db-config)
                               (select-stream "full_table_interruptible_sync_test_dbo_table_with_timestamp_bookmark_key")
                               (get-messages-from-output test-db-config
                                                         "full_table_interruptible_sync_test_dbo_table_with_timestamp_bookmark_key"
                                                         first-state))]

      ;; Collect the distinct ids seen across both runs into a set and expect
      ;; all 2000 of them.
      (is (= 2000 (count (reduce
                          (fn [acc rec]
                            (conj acc (str (get-in rec ["record" "id"]))))
                          #{}
                          (concat
                           (->> second-messages
                                (filter #(= "RECORD" (% "type"))))
                           (->> first-messages
                                (filter #(= "RECORD" (% "type")))))))))

      ;; Make sure last state has no last_pk_fetched or max_pk_values bookmarks, indicating complete full table
      (is (= "STATE" (get (last second-messages) "type")))
      (is (nil?
           (get-in (last second-messages) ["value" "bookmarks" "full_table_interruptible_sync_test_dbo_table_with_timestamp_bookmark_key" "last_pk_fetched"])))
      ;; The sync query reads this bookmark under the key "max_pk_values"
      ;; (plural); checking the singular spelling would always yield nil and
      ;; make this assertion vacuous.
      (is (nil?
           (get-in (last second-messages) ["value" "bookmarks" "full_table_interruptible_sync_test_dbo_table_with_timestamp_bookmark_key" "max_pk_values"]))))))

0 comments on commit bebc27b

Please sign in to comment.