Skip to content

Commit

Permalink
Throw an error if min_valid_version is null (#24)
Browse files Browse the repository at this point in the history
* Throw an error if min_valid_version is null

* Make more functional

* Check for nil min-valid-version in assert-log-based-is-enabled

* Failing tests for table name conflict

* Add schema to changetracking queries

* Add schema-name to another reference of get-min-valid-version

* Use SQL parameters to protect against invalid characters in the string

* Refactor for readability

Co-authored-by: Dan Mosora <[email protected]>
  • Loading branch information
luandy64 and dmosorast authored Mar 3, 2020
1 parent 1bfc58d commit b5b1fa3
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 26 deletions.
68 changes: 42 additions & 26 deletions src/tap_mssql/sync_strategies/logical.clj
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,20 @@
[clojure.string :as string]
[clojure.java.jdbc :as jdbc]))

(defn get-change-tracking-tables*
  "Returns the set of :table_name values read from sys.change_tracking_tables
  in `db-name`. The names are not schema-qualified."
  [config db-name]
  ;; TODO: What if it's the same named table in different schemas?
  (let [db-spec (assoc (config/->conn-map config) :dbname db-name)
        sql     (str "SELECT OBJECT_NAME(object_id) AS table_name "
                     "FROM sys.change_tracking_tables")]
    (->> (jdbc/query db-spec [sql])
         (map :table_name)
         set)))
(defn get-change-tracking-tables*
  "Queries sys.change_tracking_tables in `dbname` and groups the returned
  table names by schema name.

  Returns a map from schema name to the set of table names in that schema:
  {\"schema_name\" #{\"table1\" \"table2\" ...} ...}"
  [config dbname]
  (let [db-spec (assoc (config/->conn-map config) :dbname dbname)
        sql     (str "SELECT OBJECT_SCHEMA_NAME(object_id) AS schema_name, "
                     " OBJECT_NAME(object_id) AS table_name "
                     "FROM sys.change_tracking_tables")]
    (reduce (fn [tables-by-schema {:keys [schema_name table_name]}]
              ;; fnil seeds a schema's first entry with an empty set, so
              ;; every value in the result is a set and conj stays cheap.
              (update tables-by-schema schema_name (fnil conj #{}) table_name))
            {}
            (jdbc/query db-spec [sql]))))

;; Memoized wrapper around get-change-tracking-tables*: the result is cached
;; per distinct argument list, so repeated calls with the same arguments do
;; not call the underlying function again.
(def get-change-tracking-tables (memoize get-change-tracking-tables*))

Expand All @@ -28,21 +35,45 @@

;; Memoized wrapper around get-change-tracking-databases*: the result is
;; cached per distinct argument list, so repeated calls with the same
;; arguments do not call the underlying function again.
(def get-change-tracking-databases (memoize get-change-tracking-databases*))

(defn get-object-id-by-table-name
  "Looks up an object id with SQL Server's OBJECT_ID function.

  `dbname`, `schema-name` and `table-name` are each passed through
  common/sanitize-names, joined as \"db.schema.table\" and handed to the query
  as a SQL parameter. Returns the :object_id value of the first result row."
  [config dbname schema-name table-name]
  (let [name-parts     (map common/sanitize-names [dbname schema-name table-name])
        qualified-name (apply format "%s.%s.%s" name-parts)
        sql-query      ["SELECT OBJECT_ID(?) AS object_id" qualified-name]]
    (log/infof "Executing query: %s" sql-query)
    (let [db-spec (assoc (config/->conn-map config) :dbname dbname)
          rows    (jdbc/query db-spec sql-query)]
      (:object_id (first rows)))))

(defn get-min-valid-version
  "Runs CHANGE_TRACKING_MIN_VALID_VERSION for the given table in `dbname`.

  The table's object id is resolved with get-object-id-by-table-name and
  formatted into the query text. Returns the :min_valid_version value of the
  first result row."
  [config dbname schema-name table-name]
  (let [sql-query (->> (get-object-id-by-table-name config dbname schema-name table-name)
                       (format "SELECT CHANGE_TRACKING_MIN_VALID_VERSION(%d) as min_valid_version"))]
    (log/infof "Executing query: %s" sql-query)
    (let [db-spec (assoc (config/->conn-map config) :dbname dbname)
          rows    (jdbc/query db-spec [sql-query])]
      (:min_valid_version (first rows)))))

;; Checks that log-based (Change Tracking) replication can be used for `stream-name`.
;; Reads the stream's table name, schema name and database name from `catalog`.
;; Throws UnsupportedOperationException when the database is not in
;; (get-change-tracking-databases config), or when the table is not listed
;; under the stream's schema in (get-change-tracking-tables config dbname).
;; Throws IllegalArgumentException when get-min-valid-version returns nil.
;; Returns `state` unchanged when no check throws.
;; NOTE(review): this span comes from a diff view, so the superseded lines and
;; their replacements appear back to back (two `let` openings, two table
;; membership `when` forms).
(defn assert-log-based-is-enabled [config catalog stream-name state]
(let [table-name (get-in catalog ["streams" stream-name "table_name"])
dbname (get-in catalog ["streams" stream-name "metadata" "database-name"])]
(let [table-name (get-in catalog ["streams" stream-name "table_name"])
schema-name (get-in catalog ["streams" stream-name "metadata" "schema-name"])
dbname (get-in catalog ["streams" stream-name "metadata" "database-name"])
min-valid-version (get-min-valid-version config dbname schema-name table-name)]
(when (not (contains? (get-change-tracking-databases config) dbname))
(throw (UnsupportedOperationException.
(format (str "Cannot sync stream: %s using log-based replication. "
"Change Tracking is not enabled for database: %s")
stream-name
dbname))))
(when (not (contains? (get-change-tracking-tables config dbname) table-name))
(when (not (contains? (-> (get-change-tracking-tables config dbname)
(get schema-name)) table-name))
(throw (UnsupportedOperationException.
(format (str "Cannot sync stream: %s using log-based replication. "
"Change Tracking is not enabled for table: %s")
stream-name
table-name))))
(when (nil? min-valid-version)
(throw (IllegalArgumentException.
(format "The min_valid_version for table name %s was NULL."
table-name))))
state))

(defn get-current-log-version [config catalog stream-name]
Expand Down Expand Up @@ -74,15 +105,6 @@
((partial singer-messages/write-state! stream-name)))
state))


(defn get-object-id-by-table-name
  "Reads name and object_id from sys.tables in `dbname` and returns the
  :object_id of the first row whose :name equals `table-name`, or nil when no
  row matches. The lookup does not consider the table's schema."
  [config dbname table-name]
  (let [sql-query "SELECT name, object_id FROM sys.tables"]
    (log/infof "Executing query: %s" sql-query)
    (let [db-spec       (assoc (config/->conn-map config) :dbname dbname)
          matches-name? (fn [row] (= table-name (:name row)))]
      (-> (filter matches-name? (jdbc/query db-spec [sql-query]))
          first
          :object_id))))

(defn min-valid-version-out-of-date?
"Uses the CHANGE_TRACKING_MIN_VALID_VERSION function to check if our current log version is out of date and lost.
Returns true if we have no current log version."
Expand All @@ -91,13 +113,7 @@
table-name (get-in catalog ["streams" stream-name "table_name"])
dbname (get-in catalog ["streams" stream-name "metadata" "database-name"])
current-log-version (get-in state ["bookmarks" stream-name "current_log_version"])
object-id (get-object-id-by-table-name config dbname table-name)
sql-query (format "SELECT CHANGE_TRACKING_MIN_VALID_VERSION(%d) as min_valid_version" object-id)
_ (log/infof "Executing query: %s" sql-query)
min-valid-version (-> (jdbc/query (assoc (config/->conn-map config) :dbname dbname) [sql-query])
first
:min_valid_version)]

min-valid-version (get-min-valid-version config dbname schema-name table-name)]
(if (nil? current-log-version)
true
(let [out-of-date? (> min-valid-version current-log-version)]
Expand Down
5 changes: 5 additions & 0 deletions test/tap_mssql/sync_log_based_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
[config]
(let [db-spec (config/->conn-map config)]
(jdbc/db-do-commands db-spec ["CREATE DATABASE log_based_sync_test"])
(jdbc/db-do-commands (assoc db-spec :dbname "log_based_sync_test")
["CREATE SCHEMA schema_with_conflict"])
(jdbc/db-do-commands (assoc db-spec :dbname "log_based_sync_test")
["CREATE SCHEMA schema_with_table"])
(jdbc/db-do-commands (assoc db-spec :dbname "log_based_sync_test")
Expand All @@ -45,6 +47,9 @@
"data_table_2"
[[:id "uniqueidentifier NOT NULL PRIMARY KEY DEFAULT NEWID()"]
[:value "int"]])])
;; Same table as below, created first to check for unexpected failures with ignoring schema
(jdbc/db-do-commands (assoc db-spec :dbname "log_based_sync_test")
["CREATE TABLE schema_with_conflict.data_table (id uniqueidentifier NOT NULL PRIMARY KEY DEFAULT NEWID(), value int)"])
(jdbc/db-do-commands (assoc db-spec :dbname "log_based_sync_test")
["CREATE TABLE schema_with_table.data_table (id uniqueidentifier NOT NULL PRIMARY KEY DEFAULT NEWID(), value int)"])))

Expand Down

0 comments on commit b5b1fa3

Please sign in to comment.