From 22f38d83a222ba436d1cd8dc84aa96dc2a5ca805 Mon Sep 17 00:00:00 2001 From: Rithvik Panchapakesan <141077331+sfc-gh-rpanchapakesan@users.noreply.github.com> Date: Mon, 20 Nov 2023 09:54:27 -0800 Subject: [PATCH] to-string and more tests --- src/snowflake/connector/cursor.py | 11 ++- test/integ/test_cursor.py | 137 ++++++++++++++++-------------- 2 files changed, 83 insertions(+), 65 deletions(-) diff --git a/src/snowflake/connector/cursor.py b/src/snowflake/connector/cursor.py index f45ac4347..ee05d1532 100644 --- a/src/snowflake/connector/cursor.py +++ b/src/snowflake/connector/cursor.py @@ -298,7 +298,16 @@ def _to_result_metadata_v1(self): self._is_nullable, ) - def __eq__(self, other): + def __str__(self) -> str: + return ( + f"ResultMetadataV2(name={self._name},type_code={self._type_code}," + + f"is_nullable={self._is_nullable},display_size={self._display_size}," + + "internal_size={self._internal_size},precision={self._precision}," + + "scale={self._scale},vector_dimension={self._vector_dimension}," + + "fields={self.fields})" + ) + + def __eq__(self, other) -> bool: if not isinstance(other, self.__class__): return False diff --git a/test/integ/test_cursor.py b/test/integ/test_cursor.py index a42c7eb99..a136b9bd7 100644 --- a/test/integ/test_cursor.py +++ b/test/integ/test_cursor.py @@ -330,28 +330,32 @@ def test_insert_timestamp_select(conn, db_parameters): assert current_time == result_time_value[0], "the time result was wrong" - desc = c.description - assert len(desc) == 6, "invalid number of column meta data" - assert desc[0][0].upper() == "AA", "invalid column name" - assert desc[1][0].upper() == "TSLTZ", "invalid column name" - assert desc[2][0].upper() == "TSTZ", "invalid column name" - assert desc[3][0].upper() == "TSNTZ", "invalid column name" - assert desc[4][0].upper() == "DT", "invalid column name" - assert desc[5][0].upper() == "TM", "invalid column name" - assert ( - constants.FIELD_ID_TO_NAME[desc[0][1]] == "FIXED" - ), f"invalid column name: {constants.FIELD_ID_TO_NAME[desc[0][1]]}" - assert ( - constants.FIELD_ID_TO_NAME[desc[1][1]] == "TIMESTAMP_LTZ" - ), "invalid column name" - assert ( - constants.FIELD_ID_TO_NAME[desc[2][1]] == "TIMESTAMP_TZ" - ), "invalid column name" - assert ( - constants.FIELD_ID_TO_NAME[desc[3][1]] == "TIMESTAMP_NTZ" - ), "invalid column name" - assert constants.FIELD_ID_TO_NAME[desc[4][1]] == "DATE", "invalid column name" - assert constants.FIELD_ID_TO_NAME[desc[5][1]] == "TIME", "invalid column name" + for desc in [c.description, c._description_internal]: + assert len(desc) == 6, "invalid number of column meta data" + assert desc[0].name.upper() == "AA", "invalid column name" + assert desc[1].name.upper() == "TSLTZ", "invalid column name" + assert desc[2].name.upper() == "TSTZ", "invalid column name" + assert desc[3].name.upper() == "TSNTZ", "invalid column name" + assert desc[4].name.upper() == "DT", "invalid column name" + assert desc[5].name.upper() == "TM", "invalid column name" + assert ( + constants.FIELD_ID_TO_NAME[desc[0].type_code] == "FIXED" + ), f"invalid column name: {constants.FIELD_ID_TO_NAME[desc[0][1]]}" + assert ( + constants.FIELD_ID_TO_NAME[desc[1].type_code] == "TIMESTAMP_LTZ" + ), "invalid column name" + assert ( + constants.FIELD_ID_TO_NAME[desc[2].type_code] == "TIMESTAMP_TZ" + ), "invalid column name" + assert ( + constants.FIELD_ID_TO_NAME[desc[3].type_code] == "TIMESTAMP_NTZ" + ), "invalid column name" + assert ( + constants.FIELD_ID_TO_NAME[desc[4].type_code] == "DATE" + ), "invalid column name" + assert ( + constants.FIELD_ID_TO_NAME[desc[5].type_code] == "TIME" + ), "invalid column name" finally: cnx2.close() @@ -483,10 +487,12 @@ def test_insert_binary_select(conn, db_parameters): results = [b for (b,) in c] assert value == results[0], "the binary result was wrong" - desc = c.description - assert len(desc) == 1, "invalid number of column meta data" - assert desc[0][0].upper() == "B", "invalid column name" - assert constants.FIELD_ID_TO_NAME[desc[0][1]] == "BINARY", "invalid column name" + for desc in [c.description, c._description_internal]: + assert len(desc) == 1, "invalid number of column meta data" + assert desc[0].name.upper() == "B", "invalid column name" + assert ( + constants.FIELD_ID_TO_NAME[desc[0].type_code] == "BINARY" + ), "invalid column name" finally: cnx2.close() @@ -523,10 +529,12 @@ def test_insert_binary_select_with_bytearray(conn, db_parameters): results = [b for (b,) in c] assert bytes(value) == results[0], "the binary result was wrong" - desc = c.description - assert len(desc) == 1, "invalid number of column meta data" - assert desc[0][0].upper() == "B", "invalid column name" - assert constants.FIELD_ID_TO_NAME[desc[0][1]] == "BINARY", "invalid column name" + for desc in [c.description, c._description_internal]: + assert len(desc) == 1, "invalid number of column meta data" + assert desc[0].name.upper() == "B", "invalid column name" + assert ( + constants.FIELD_ID_TO_NAME[desc[0].type_code] == "BINARY" + ), "invalid column name" finally: cnx2.close() @@ -607,8 +615,8 @@ def test_geography(conn_cnx): with cnx.cursor() as cur: # Test with GEOGRAPHY return type result = cur.execute(f"select * from {name_geo}") - metadata = result.description - assert FIELD_ID_TO_NAME[metadata[0].type_code] == "GEOGRAPHY" + for metadata in [cur.description, cur._description_internal]: + assert FIELD_ID_TO_NAME[metadata[0].type_code] == "GEOGRAPHY" data = result.fetchall() for raw_data in data: row = json.loads(raw_data[0]) @@ -637,8 +645,8 @@ def test_geometry(conn_cnx): with cnx.cursor() as cur: # Test with GEOMETRY return type result = cur.execute(f"select * from {name_geo}") - metadata = result.description - assert FIELD_ID_TO_NAME[metadata[0].type_code] == "GEOMETRY" + for metadata in [cur.description, cur._description_internal]: + assert FIELD_ID_TO_NAME[metadata[0].type_code] == "GEOMETRY" data = result.fetchall() for raw_data in data: row = json.loads(raw_data[0]) @@ -674,9 +682,9 @@ def test_vector(conn_cnx, is_public_test): cur.execute( f"select int_vec, float_vec from {name_vectors} order by float_vec" ) - metadata = cur.description - assert FIELD_ID_TO_NAME[metadata[0].type_code] == "VECTOR" - assert FIELD_ID_TO_NAME[metadata[1].type_code] == "VECTOR" + for metadata in [cur.description, cur._description_internal]: + assert FIELD_ID_TO_NAME[metadata[0].type_code] == "VECTOR" + assert FIELD_ID_TO_NAME[metadata[1].type_code] == "VECTOR" data = cur.fetchall() for i, row in enumerate(data): if expected_data_floats[i] == "NULL": @@ -693,9 +701,9 @@ def test_vector(conn_cnx, is_public_test): cur.execute( f"select int_vec, float_vec from {name_vectors} where int_vec = [1,2,3]::VECTOR(int,3)" ) - metadata = cur.description - assert FIELD_ID_TO_NAME[metadata[0].type_code] == "VECTOR" - assert FIELD_ID_TO_NAME[metadata[1].type_code] == "VECTOR" + for metadata in [cur.description, cur._description_internal]: + assert FIELD_ID_TO_NAME[metadata[0].type_code] == "VECTOR" + assert FIELD_ID_TO_NAME[metadata[1].type_code] == "VECTOR" data = cur.fetchall() assert len(data) == 0 @@ -1618,31 +1626,32 @@ def test_out_of_range_year(conn_cnx, result_format, cursor_type, fetch_method): def test_describe(conn_cnx): with conn_cnx() as con: with con.cursor() as cur: - table_name = random_string(5, "test_describe_") - # test select - description = cur.describe( - "select * from VALUES(1, 3.1415926, 'snow', TO_TIMESTAMP('2021-01-01 00:00:00'))" - ) - assert description is not None - column_types = [column[1] for column in description] - assert constants.FIELD_ID_TO_NAME[column_types[0]] == "FIXED" - assert constants.FIELD_ID_TO_NAME[column_types[1]] == "FIXED" - assert constants.FIELD_ID_TO_NAME[column_types[2]] == "TEXT" - assert "TIMESTAMP" in constants.FIELD_ID_TO_NAME[column_types[3]] - assert len(cur.fetchall()) == 0 - - # test insert - cur.execute(f"create table {table_name} (aa int)") - try: - description = cur.describe( - "insert into {name}(aa) values({value})".format( - name=table_name, value="1234" - ) + for describe in [cur.describe, cur._describe_internal]: + table_name = random_string(5, "test_describe_") + # test select + description = describe( + "select * from VALUES(1, 3.1415926, 'snow', TO_TIMESTAMP('2021-01-01 00:00:00'))" ) - assert description[0][0] == "number of rows inserted" - assert cur.rowcount is None - finally: - cur.execute(f"drop table if exists {table_name}") + assert description is not None + column_types = [column.type_code for column in description] + assert constants.FIELD_ID_TO_NAME[column_types[0]] == "FIXED" + assert constants.FIELD_ID_TO_NAME[column_types[1]] == "FIXED" + assert constants.FIELD_ID_TO_NAME[column_types[2]] == "TEXT" + assert "TIMESTAMP" in constants.FIELD_ID_TO_NAME[column_types[3]] + assert len(cur.fetchall()) == 0 + + # test insert + cur.execute(f"create table {table_name} (aa int)") + try: + description = describe( + "insert into {name}(aa) values({value})".format( + name=table_name, value="1234" + ) + ) + assert description[0].name == "number of rows inserted" + assert cur.rowcount is None + finally: + cur.execute(f"drop table if exists {table_name}") @pytest.mark.skipolddriver