diff --git a/.bumpversion.cfg b/.bumpversion.cfg index a9639706..66d755a5 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 2.3.0 +current_version = 2.3.1 parse = (?P\d+) \.(?P\d+) \.(?P\d+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f058423f..4895ed53 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +# tap-mssql 2.3.1 2024-07-22 + +* Bug Fix. Issue #62 - change to column selection using lists instead of sets to preserve column ordering + # tap-mssql 2.3.0 2024-04-18 * Bug Fix. Change pendulum DateTime type to datetime.datetime as pymssql 2.3.0 is no longer compatible with query parameters as pendulum DateTime (https://github.com/pymssql/pymssql/issues/889) diff --git a/pyproject.toml b/pyproject.toml index c0bbfb55..60010a7e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "tap-mssql" -version = "2.3.0" +version = "2.3.1" description = "A pipelinewise compatible tap for connecting Microsoft SQL Server" authors = ["Rob Winters "] license = "GNU Affero" diff --git a/tap_mssql/__init__.py b/tap_mssql/__init__.py index 2bbae16d..a98cc34b 100644 --- a/tap_mssql/__init__.py +++ b/tap_mssql/__init__.py @@ -314,48 +314,63 @@ def do_discover(mssql_conn, config): discover_catalog(mssql_conn, config).dump() -def desired_columns(selected, table_schema): +def desired_columns(selected : list, table_schema): """Return the set of column names we need to include in the SELECT. selected - set of column names marked as selected in the input catalog table_schema - the most recently discovered Schema for the table """ - all_columns = set() - available = set() - automatic = set() - unsupported = set() - - for column, column_schema in table_schema.properties.items(): - all_columns.add(column) - inclusion = column_schema.inclusion - if inclusion == "automatic": - automatic.add(column) - elif inclusion == "available": - available.add(column) - elif inclusion == "unsupported": - unsupported.add(column) - else: - raise Exception("Unknown inclusion " + inclusion) + all_columns = [column for column in table_schema.properties.keys()] + + available = [ + column for column, column_schema + in table_schema.properties.items() + if column_schema.inclusion == 'available' + ] + + automatic = [ + column for column, column_schema + in table_schema.properties.items() + if column_schema.inclusion == 'automatic' + ] + + unsupported = [ + column for column, column_schema + in table_schema.properties.items() + if column_schema.inclusion == 'unsupported' + ] + + unknown = [ + (column,column_schema.inclusion) + for column, column_schema + in table_schema.properties.items() + if column_schema.inclusion not in ['available', 'automatic', 'unsupported'] + ] + + if unknown: + raise Exception(f"Unknown inclusions: {unknown}") - selected_but_unsupported = selected.intersection(unsupported) + selected_but_unsupported = [c for c in selected if c in unsupported] if selected_but_unsupported: LOGGER.warning( "Columns %s were selected but are not supported. Skipping them.", selected_but_unsupported, ) - selected_but_nonexistent = selected.difference(all_columns) + selected_but_nonexistent = [c for c in selected if c not in all_columns] if selected_but_nonexistent: LOGGER.warning("Columns %s were selected but do not exist.", selected_but_nonexistent) - not_selected_but_automatic = automatic.difference(selected) + not_selected_but_automatic = [c for c in automatic if c not in selected] if not_selected_but_automatic: LOGGER.warning( "Columns %s are primary keys but were not selected. Adding them.", not_selected_but_automatic, ) - return selected.intersection(available).union(automatic) + desired = [c for c in all_columns if (c in available and c in selected) or c in automatic] + + return list(dict.fromkeys(desired)) def is_valid_currently_syncing_stream(selected_stream, state): @@ -405,11 +420,11 @@ def resolve_catalog(discovered_catalog, streams_to_sync): ) continue - selected = { + selected = [ k - for k, v in discovered_table.schema.properties.items() + for k in discovered_table.schema.properties.keys() if common.property_is_selected(catalog_entry, k) or k == replication_key - } + ] # These are the columns we need to select columns = desired_columns(selected, discovered_table.schema) diff --git a/tests/test_tap_mssql.py b/tests/test_tap_mssql.py index 46afc0a1..e8d11549 100755 --- a/tests/test_tap_mssql.py +++ b/tests/test_tap_mssql.py @@ -264,11 +264,14 @@ def test_pk(self): class TestSelectsAppropriateColumns(unittest.TestCase): def runTest(self): - selected_cols = set(["a", "b", "d"]) + selected_cols = ["a", "a", "a1", "a2", "b", "d"] table_schema = Schema( type="object", properties={ "a": Schema(None, inclusion="available"), + "a1": Schema(None, inclusion="available"), + "a2": Schema(None, inclusion="available"), + "a3": Schema(None, inclusion="available"), "b": Schema(None, inclusion="unsupported"), "c": Schema(None, inclusion="automatic"), }, @@ -277,9 +280,21 @@ def runTest(self): got_cols = tap_mssql.desired_columns(selected_cols, table_schema) self.assertEqual( - got_cols, set(["a", "c"]), "Keep automatic as well as selected, available columns." + got_cols, ["a", "a1", "a2", "c"], "Keep automatic as well as selected, available columns. Ordered correctly." ) +class TestInvalidInclusion(unittest.TestCase): + def runTest(self): + selected_cols = ["a", "e"] + table_schema = Schema( + type="object", + properties={ + "a": Schema(None, inclusion="available"), + "e": Schema(None, inclusion="invalid"), + }, + ) + + self.assertRaises(Exception, tap_mssql.desired_columns, selected_cols, table_schema) class TestSchemaMessages(unittest.TestCase): def runTest(self):