Skip to content

Commit

Permalink
to-string and more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-rpanchapakesan committed Nov 20, 2023
1 parent c863376 commit 22f38d8
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 65 deletions.
11 changes: 10 additions & 1 deletion src/snowflake/connector/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,16 @@ def _to_result_metadata_v1(self):
self._is_nullable,
)

def __eq__(self, other):
def __str__(self) -> str:
return (
f"ResultMetadataV2(name={self._name},type_code={self._type_code},"
+ f"is_nullable={self._is_nullable},display_size={self._display_size},"
+ "internal_size={self._internal_size},precision={self._precision},"
+ "scale={self._scale},vector_dimension={self._vector_dimension},"
+ "fields={self.fields})"
)

def __eq__(self, other) -> bool:
if not isinstance(other, self.__class__):
return False

Expand Down
137 changes: 73 additions & 64 deletions test/integ/test_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,28 +330,32 @@ def test_insert_timestamp_select(conn, db_parameters):

assert current_time == result_time_value[0], "the time result was wrong"

desc = c.description
assert len(desc) == 6, "invalid number of column meta data"
assert desc[0][0].upper() == "AA", "invalid column name"
assert desc[1][0].upper() == "TSLTZ", "invalid column name"
assert desc[2][0].upper() == "TSTZ", "invalid column name"
assert desc[3][0].upper() == "TSNTZ", "invalid column name"
assert desc[4][0].upper() == "DT", "invalid column name"
assert desc[5][0].upper() == "TM", "invalid column name"
assert (
constants.FIELD_ID_TO_NAME[desc[0][1]] == "FIXED"
), f"invalid column name: {constants.FIELD_ID_TO_NAME[desc[0][1]]}"
assert (
constants.FIELD_ID_TO_NAME[desc[1][1]] == "TIMESTAMP_LTZ"
), "invalid column name"
assert (
constants.FIELD_ID_TO_NAME[desc[2][1]] == "TIMESTAMP_TZ"
), "invalid column name"
assert (
constants.FIELD_ID_TO_NAME[desc[3][1]] == "TIMESTAMP_NTZ"
), "invalid column name"
assert constants.FIELD_ID_TO_NAME[desc[4][1]] == "DATE", "invalid column name"
assert constants.FIELD_ID_TO_NAME[desc[5][1]] == "TIME", "invalid column name"
for desc in [c.description, c._description_internal]:
assert len(desc) == 6, "invalid number of column meta data"
assert desc[0].name.upper() == "AA", "invalid column name"
assert desc[1].name.upper() == "TSLTZ", "invalid column name"
assert desc[2].name.upper() == "TSTZ", "invalid column name"
assert desc[3].name.upper() == "TSNTZ", "invalid column name"
assert desc[4].name.upper() == "DT", "invalid column name"
assert desc[5].name.upper() == "TM", "invalid column name"
assert (
constants.FIELD_ID_TO_NAME[desc[0].type_code] == "FIXED"
), f"invalid column name: {constants.FIELD_ID_TO_NAME[desc[0][1]]}"
assert (
constants.FIELD_ID_TO_NAME[desc[1].type_code] == "TIMESTAMP_LTZ"
), "invalid column name"
assert (
constants.FIELD_ID_TO_NAME[desc[2].type_code] == "TIMESTAMP_TZ"
), "invalid column name"
assert (
constants.FIELD_ID_TO_NAME[desc[3].type_code] == "TIMESTAMP_NTZ"
), "invalid column name"
assert (
constants.FIELD_ID_TO_NAME[desc[4].type_code] == "DATE"
), "invalid column name"
assert (
constants.FIELD_ID_TO_NAME[desc[5].type_code] == "TIME"
), "invalid column name"
finally:
cnx2.close()

Expand Down Expand Up @@ -483,10 +487,12 @@ def test_insert_binary_select(conn, db_parameters):
results = [b for (b,) in c]
assert value == results[0], "the binary result was wrong"

desc = c.description
assert len(desc) == 1, "invalid number of column meta data"
assert desc[0][0].upper() == "B", "invalid column name"
assert constants.FIELD_ID_TO_NAME[desc[0][1]] == "BINARY", "invalid column name"
for desc in [c.description, c._description_internal]:
assert len(desc) == 1, "invalid number of column meta data"
assert desc[0].name.upper() == "B", "invalid column name"
assert (
constants.FIELD_ID_TO_NAME[desc[0].type_code] == "BINARY"
), "invalid column name"
finally:
cnx2.close()

Expand Down Expand Up @@ -523,10 +529,12 @@ def test_insert_binary_select_with_bytearray(conn, db_parameters):
results = [b for (b,) in c]
assert bytes(value) == results[0], "the binary result was wrong"

desc = c.description
assert len(desc) == 1, "invalid number of column meta data"
assert desc[0][0].upper() == "B", "invalid column name"
assert constants.FIELD_ID_TO_NAME[desc[0][1]] == "BINARY", "invalid column name"
for desc in [c.description, c._description_internal]:
assert len(desc) == 1, "invalid number of column meta data"
assert desc[0].name.upper() == "B", "invalid column name"
assert (
constants.FIELD_ID_TO_NAME[desc[0].type_code] == "BINARY"
), "invalid column name"
finally:
cnx2.close()

Expand Down Expand Up @@ -607,8 +615,8 @@ def test_geography(conn_cnx):
with cnx.cursor() as cur:
# Test with GEOGRAPHY return type
result = cur.execute(f"select * from {name_geo}")
metadata = result.description
assert FIELD_ID_TO_NAME[metadata[0].type_code] == "GEOGRAPHY"
for metadata in [cur.description, cur._description_internal]:
assert FIELD_ID_TO_NAME[metadata[0].type_code] == "GEOGRAPHY"
data = result.fetchall()
for raw_data in data:
row = json.loads(raw_data[0])
Expand Down Expand Up @@ -637,8 +645,8 @@ def test_geometry(conn_cnx):
with cnx.cursor() as cur:
# Test with GEOMETRY return type
result = cur.execute(f"select * from {name_geo}")
metadata = result.description
assert FIELD_ID_TO_NAME[metadata[0].type_code] == "GEOMETRY"
for metadata in [cur.description, cur._description_internal]:
assert FIELD_ID_TO_NAME[metadata[0].type_code] == "GEOMETRY"
data = result.fetchall()
for raw_data in data:
row = json.loads(raw_data[0])
Expand Down Expand Up @@ -674,9 +682,9 @@ def test_vector(conn_cnx, is_public_test):
cur.execute(
f"select int_vec, float_vec from {name_vectors} order by float_vec"
)
metadata = cur.description
assert FIELD_ID_TO_NAME[metadata[0].type_code] == "VECTOR"
assert FIELD_ID_TO_NAME[metadata[1].type_code] == "VECTOR"
for metadata in [cur.description, cur._description_internal]:
assert FIELD_ID_TO_NAME[metadata[0].type_code] == "VECTOR"
assert FIELD_ID_TO_NAME[metadata[1].type_code] == "VECTOR"
data = cur.fetchall()
for i, row in enumerate(data):
if expected_data_floats[i] == "NULL":
Expand All @@ -693,9 +701,9 @@ def test_vector(conn_cnx, is_public_test):
cur.execute(
f"select int_vec, float_vec from {name_vectors} where int_vec = [1,2,3]::VECTOR(int,3)"
)
metadata = cur.description
assert FIELD_ID_TO_NAME[metadata[0].type_code] == "VECTOR"
assert FIELD_ID_TO_NAME[metadata[1].type_code] == "VECTOR"
for metadata in [cur.description, cur._description_internal]:
assert FIELD_ID_TO_NAME[metadata[0].type_code] == "VECTOR"
assert FIELD_ID_TO_NAME[metadata[1].type_code] == "VECTOR"
data = cur.fetchall()
assert len(data) == 0

Expand Down Expand Up @@ -1618,31 +1626,32 @@ def test_out_of_range_year(conn_cnx, result_format, cursor_type, fetch_method):
def test_describe(conn_cnx):
with conn_cnx() as con:
with con.cursor() as cur:
table_name = random_string(5, "test_describe_")
# test select
description = cur.describe(
"select * from VALUES(1, 3.1415926, 'snow', TO_TIMESTAMP('2021-01-01 00:00:00'))"
)
assert description is not None
column_types = [column[1] for column in description]
assert constants.FIELD_ID_TO_NAME[column_types[0]] == "FIXED"
assert constants.FIELD_ID_TO_NAME[column_types[1]] == "FIXED"
assert constants.FIELD_ID_TO_NAME[column_types[2]] == "TEXT"
assert "TIMESTAMP" in constants.FIELD_ID_TO_NAME[column_types[3]]
assert len(cur.fetchall()) == 0

# test insert
cur.execute(f"create table {table_name} (aa int)")
try:
description = cur.describe(
"insert into {name}(aa) values({value})".format(
name=table_name, value="1234"
)
for describe in [cur.describe, cur._describe_internal]:
table_name = random_string(5, "test_describe_")
# test select
description = describe(
"select * from VALUES(1, 3.1415926, 'snow', TO_TIMESTAMP('2021-01-01 00:00:00'))"
)
assert description[0][0] == "number of rows inserted"
assert cur.rowcount is None
finally:
cur.execute(f"drop table if exists {table_name}")
assert description is not None
column_types = [column.type_code for column in description]
assert constants.FIELD_ID_TO_NAME[column_types[0]] == "FIXED"
assert constants.FIELD_ID_TO_NAME[column_types[1]] == "FIXED"
assert constants.FIELD_ID_TO_NAME[column_types[2]] == "TEXT"
assert "TIMESTAMP" in constants.FIELD_ID_TO_NAME[column_types[3]]
assert len(cur.fetchall()) == 0

# test insert
cur.execute(f"create table {table_name} (aa int)")
try:
description = describe(
"insert into {name}(aa) values({value})".format(
name=table_name, value="1234"
)
)
assert description[0].name == "number of rows inserted"
assert cur.rowcount is None
finally:
cur.execute(f"drop table if exists {table_name}")


@pytest.mark.skipolddriver
Expand Down

0 comments on commit 22f38d8

Please sign in to comment.