From 16f01f102a95a10fcb05b5e4321866b54b23efab Mon Sep 17 00:00:00 2001
From: V1A0 <54343363+V1A0@users.noreply.github.com>
Date: Sat, 2 Apr 2022 12:16:39 +0700
Subject: [PATCH] v0.3.0
- finished tests for postgresqlx
- tests structure changed
- minor bugfix
---
README.md | 2 +-
sqllex/__init__.py | 2 +-
sqllex/core/entities/abc/script_gens.py | 4 +-
tests/__init__.py | 0
tests/postgresqlx/new_test_all.py | 449 ------
.../test_sqllex.py => test_postgresqlx.py} | 343 ++---
tests/test_sqlite3x.py | 1260 +++++++++++++++
tests/test_sqllex.py | 1358 +----------------
tests/timetests/__init__.py | 0
tests/timetests/test_postgresqlx.py | 144 ++
tests/timetests/test_sqlite3x.py | 109 ++
11 files changed, 1629 insertions(+), 2042 deletions(-)
create mode 100644 tests/__init__.py
delete mode 100644 tests/postgresqlx/new_test_all.py
rename tests/{postgresqlx/test_sqllex.py => test_postgresqlx.py} (77%)
create mode 100644 tests/test_sqlite3x.py
create mode 100644 tests/timetests/__init__.py
create mode 100644 tests/timetests/test_postgresqlx.py
create mode 100644 tests/timetests/test_sqlite3x.py
diff --git a/README.md b/README.md
index b0de471..d3b2739 100644
--- a/README.md
+++ b/README.md
@@ -3,7 +3,7 @@
-# SQLLEX ORM v0.2.3
+# SQLLEX ORM v0.3.0
![python-auto-ver]
[![lgtm-quality-img]][lgtm-quality-src]
diff --git a/sqllex/__init__.py b/sqllex/__init__.py
index 8713e82..084b4bd 100644
--- a/sqllex/__init__.py
+++ b/sqllex/__init__.py
@@ -17,7 +17,7 @@
# "\033[0m"
# "\n")
-__version__ = '0.3.0b3'
+__version__ = '0.3.0'
__all__ = [
# classes
diff --git a/sqllex/core/entities/abc/script_gens.py b/sqllex/core/entities/abc/script_gens.py
index 56ffad7..9866ea4 100644
--- a/sqllex/core/entities/abc/script_gens.py
+++ b/sqllex/core/entities/abc/script_gens.py
@@ -27,7 +27,7 @@ def insert(script: str, table: str, columns: tuple, placeholder='?'):
return f"" \
f"{script} " \
f'INTO "{str(table)}" (' \
- f"{', '.join(col for col in columns)}) " \
+ f""""{'", "'.join(col for col in columns)}") """ \
f"VALUES (" \
f"{', '.join((placeholder for _ in range(len(columns))))})"
@@ -41,7 +41,7 @@ def update_script(table: str, script: str):
def select(method: str, columns: tuple, table: str = None):
return f"" \
f"{method} "\
- f"{', '.join(str(col) for col in columns)} "\
+ f"""{', '.join(str(col) for col in columns)} """\
f'FROM "{str(table)}" '
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/postgresqlx/new_test_all.py b/tests/postgresqlx/new_test_all.py
deleted file mode 100644
index 76bc413..0000000
--- a/tests/postgresqlx/new_test_all.py
+++ /dev/null
@@ -1,449 +0,0 @@
-from sqllex import *
-from sqllex.debug import debug_mode
-from sqllex.types import *
-from os import remove
-from time import sleep, time
-
-DB_TEMPLATE: DBTemplateType = {
- "t1": {
- "text_t": TEXT,
- "num_t": NUMERIC,
- "int_t": INTEGER,
- "real_t": REAL,
- "none_t": INTEGER,
- "blob_t": 'FLOAT',
- }
-}
-
-db = PostgreSQLx(
- dbname="test1",
- user="postgres",
- password="admin",
- host="127.0.0.1",
- port="5432",
- template=DB_TEMPLATE
-)
-
-
-debug_mode(True, log_file='sqllex-test.log')
-
-
-def remove_db():
- for tab in ['t1', 't2', 't3', 't4', 't5', 't6', 't7', 'users', 'groups']:
- db.drop(tab, IF_EXIST=True)
-
- logger.stop()
- print("Logger stopped")
-
- print(f"{db} removed")
-
- remove("sqllex-test.log")
- print("Log removed")
-
-
-def tables_test():
- db.markup(
- {
- "groups": {
- "group_id": [PRIMARY_KEY, UNIQUE, INTEGER],
- "group_name": [TEXT, NOT_NULL, DEFAULT, "'GroupName'"],
- },
-
- "users": {
- "user_id": [INTEGER, PRIMARY_KEY, UNIQUE],
- "user_name": TEXT,
- "group_id": INTEGER,
-
- FOREIGN_KEY: {
- "group_id": ["groups", "group_id"]
- },
- }
- }
- )
-
- db.create_table(
- "remove_me",
- {
- "xxx": [INTEGER, PRIMARY_KEY],
- "yyy": [INTEGER],
- },
- IF_NOT_EXIST=True
- )
-
- for x in db.tables_names:
- if not (x in ('table1', 'table3', 'remove_me', 'table2', 't1', 'groups', 'users')):
- print(db.tables_names, x)
- raise MemoryError
-
- db.drop('remove_me')
-
- for x in db.tables_names:
- if not (x in ('table1', 'table3', 'table2', 't1', 'groups', 'users')):
- print(db.tables_names)
- raise MemoryError
-
-
-def insert_test():
- # just arg values
- db.insert("t1", 'asdf', 10.0, 1, 3.14, None, 2)
- db["t1"].insert('asdf', 10.0, 1, 3.14, None, 2)
-
- # arg list
- db.insert("t1", ['asdf', 10.0, 1, 3.14, None, 2])
- db["t1"].insert(['asdf', 10.0, 1, 3.14, None, 2])
-
- # arg tuple
- db.insert("t1", ('asdf', 10.0, 1, 3.14, None, 2))
- db["t1"].insert(('asdf', 10.0, 1, 3.14, None, 2))
-
- # arg tuple
- db.insert("t1", {"text_t": 'asdf', "num_t": 10.0, "int_t": 1, "real_t": 3.14, "none_t": None, "blob_t": 2})
- db["t1"].insert({"text_t": 'asdf', "num_t": 10.0, "int_t": 1, "real_t": 3.14, "none_t": None, "blob_t": 2})
-
- # kwargs
- db.insert("t1", text_t='asdf', num_t=10.0, int_t=1, real_t=3.14, none_t=None, blob_t=2)
- db["t1"].insert(text_t='asdf', num_t=10.0, int_t=1, real_t=3.14, none_t=None, blob_t=2)
-
- if db.select_all('t1') == \
- db.select('t1') == \
- db.select('t1', ALL) == \
- db.select('t1', '*') == \
- db['t1'].select_all() == \
- db['t1'].select() == \
- db['t1'].select(ALL):
- sel_all = db.select_all('t1')
- else:
- raise MemoryError
-
- if not sel_all == [('asdf', 10.0, 1, 3.14, None, 2)] * 10:
- print(sel_all)
- print([('asdf', 10.0, 1, 3.14, None, 2)] * 10)
- raise MemoryError
-
-
-def select_test():
- if not db.select('t1', 'text_t') == [('asdf',)] * 10: # <------------- HERE [] to () have to be changes!!!!!!!
- print(db.select('t1', 'text_t'))
- raise MemoryError
-
- if not db.select('t1', ['text_t', 'num_t']) == [('asdf', 10.0)] * 10:
- print(db.select('t1', ['text_t', 'num_t']))
- raise MemoryError
-
- db.insert('t1', ['qwerty1', 11.1, 2, 4.14, None, 5])
- db.insert('t1', ['qwerty2', 11.1, 2, 4.14, None, 6])
-
- # WHERE as dict
- if not db.select('t1', ['text_t', 'num_t'], WHERE={'num_t': 11.1}) == [('qwerty1', 11.1), ('qwerty2', 11.1)]:
- print(db.select('t1', ['text_t', 'num_t'], WHERE={'num_t': 11.1}))
- raise MemoryError
-
- # WHERE as dict
- if not db.select('t1', ['text_t', 'num_t'], WHERE={'num_t': ['=', 11.1], 'blob_t': ['<=', 5]}) == [
- ('qwerty1', 11.1)]:
- print(db.select('t1', ['text_t', 'num_t'], WHERE={'num_t': 11.1, 'blob_t': 5}))
- raise MemoryError
-
- # WHERE as kwarg
- if not db.select('t1', ['text_t', 'num_t'], num_t=11.1) == [('qwerty1', 11.1), ('qwerty2', 11.1)]:
- print(db.select('t1', ['text_t', 'num_t'], num_t=11.1))
- raise MemoryError
-
- # WHERE as kwargs
- if not db.select('t1', ['text_t', 'num_t'], num_t=11.1, blob_t=6) == [('qwerty2', 11.1)]:
- print(db.select('t1', ['text_t', 'num_t'], num_t=11.1, blob_t=6))
- raise MemoryError
-
- # LIMIT test
- if not db.select('t1', text_t='asdf', LIMIT=5) == [('asdf', 10.0, 1, 3.14, None, 2)] * 5:
- print(db.select('t1', text_t='asdf', LIMIT=5))
- raise MemoryError
-
- # OFFSET
- if not db.select('t1', text_t='asdf', LIMIT=5, OFFSET=6) == [('asdf', 10.0, 1, 3.14, None, 2)] * 4:
- print(db.select('t1', text_t='asdf', LIMIT=5, OFFSET=6))
- raise MemoryError
-
- if not db.select('t1', ['text_t', 'num_t'], WHERE={'num_t': ['>=', 11.1, 10], 'text_t': 'qwerty1'}) == \
- [('qwerty1', 11.1)]:
- print(db.select('t1', ['text_t', 'num_t'], WHERE={'num_t': ['>=', 11.1, 10], 'text_t': 'qwerty1'}))
- raise MemoryError
-
- db.create_table(
- "t2",
- {
- "id": 'SERIAL',
- "value": [INTEGER, DEFAULT, 8],
- },
- )
-
- db.insertmany('t2', [[1], [2], [3], [4]])
-
- # ORDER_BY ASC
- if not db.select('t2', 'id', ORDER_BY='id ASC') == [(1,), (2,), (3,), (4,)]:
- print(db.select('t2', 'id', ORDER_BY='id ASC'))
- raise MemoryError
-
- # ORDER_BY DESC
- if not db.select('t2', ['id'], ORDER_BY='id DESC') == [(4,), (3,), (2,), (1,)]:
- print(db.select('t2', 'id', ORDER_BY='id DESC'))
- raise MemoryError
-
- if not db.select(
- 't2',
- WHERE={
- 'id': ['<', 3]
- },
- ) == [(1, 8), (2, 8)]:
- print(db.select('t2', WHERE={'id': ['<', 3]}))
- raise MemoryError
-
- # JOIN
- # if not db.select(
- # 't2',
- # 'id',
- # JOIN=[
- # [CROSS_JOIN, 't1', AS, 't', ON, 't.num_t > t2.value']
- # ]
- # ) == [
- # (1,), (1,), (1,), (1,), (1,), (1,), (1,), (1,), (1,), (1,), (1,), (1,),
- # (2,), (2,), (2,), (2,), (2,), (2,), (2,), (2,), (2,), (2,), (2,), (2,),
- # (3,), (3,), (3,), (3,), (3,), (3,), (3,), (3,), (3,), (3,), (3,), (3,),
- # (4,), (4,), (4,), (4,), (4,), (4,), (4,), (4,), (4,), (4,), (4,), (4,)
- #
- # ]:
- # print(db.select('t2', 'id', JOIN=[[CROSS_JOIN, 't1', AS, 't', ON, 't.num_t > t2.value']]))
- # raise MemoryError
-
-
-def insertmany_test():
- db.create_table(
- 't3',
- {
- 'id': [INTEGER, NOT_NULL],
- 'val': 'VARCHAR'
- }
- )
-
- db.insertmany('t3', [[1, 'hi']] * 10)
- db.insertmany('t3', [[1, 'NO']] * 10)
- db.insertmany('t3', ((1, 'hi'),) * 10)
- db.insertmany('t3', ((1, 'NO'),) * 10)
- db.insertmany('t3', id=[2] * 10, val=['NO'] * 10)
- db.insertmany('t3', id=(2,) * 10, val=('NO',) * 10)
-
- if not db.select_all('t3') == [
- (1, 'hi',), (1, 'hi',), (1, 'hi',), (1, 'hi',), (1, 'hi',), (1, 'hi',), (1, 'hi',),
- (1, 'hi',), (1, 'hi',), (1, 'hi',), (1, 'NO',), (1, 'NO',), (1, 'NO',), (1, 'NO',),
- (1, 'NO',), (1, 'NO',), (1, 'NO',), (1, 'NO',), (1, 'NO',), (1, 'NO',), (1, 'hi',),
- (1, 'hi',), (1, 'hi',), (1, 'hi',), (1, 'hi',), (1, 'hi',), (1, 'hi',), (1, 'hi',),
- (1, 'hi',), (1, 'hi',), (1, 'NO',), (1, 'NO',), (1, 'NO',), (1, 'NO',), (1, 'NO',),
- (1, 'NO',), (1, 'NO',), (1, 'NO',), (1, 'NO',), (1, 'NO',), (2, 'NO',), (2, 'NO',),
- (2, 'NO',), (2, 'NO',), (2, 'NO',), (2, 'NO',), (2, 'NO',), (2, 'NO',), (2, 'NO',),
- (2, 'NO',), (2, 'NO',), (2, 'NO',), (2, 'NO',), (2, 'NO',), (2, 'NO',), (2, 'NO',),
- (2, 'NO',), (2, 'NO',), (2, 'NO',), (2, 'NO',)
- ]:
- print(db.select_all('t3'))
- raise MemoryError
-
- db.create_table(
- 't6',
- {
- 'id': [INTEGER, UNIQUE, NOT_NULL],
- 'val': [TEXT]
- }
- )
-"""
- db.insertmany('t6', [[x, 'hi'] for x in range(100)])
-
- if not db.select_all('t6') == [(x, 'hi') for x in range(100)]:
- raise MemoryError
-
- db.insertmany('t6', [(x, 'bye') for x in range(100)], OR=REPLACE)
-
- if not db.select_all('t6') == [(x, 'bye') for x in range(100)]:
- raise MemoryError
-
- db.updatemany('t6', [[], [], []])
-
- if not db.select_all('t6') == [(x, 'bye') for x in range(100)]:
- raise MemoryError
-"""
-
-
-def update_test():
- db.create_table(
- 't4',
- {
- 'id': [INTEGER, NOT_NULL, UNIQUE],
- 'val': [TEXT]
- }
- )
-
- db.insertmany('t4', [[x, bin(x)] for x in range(100)])
-
- db.update(
- 't4',
- {'val': 'NEW_VAL'},
- WHERE={
- 'id': ['<', 50]
- }
- )
-
- if not db.select('t4', 'id', WHERE={"val": 'NEW_VAL'}) == [(x,) for x in range(50)]:
- print(db.select('t4', 'id', WHERE={"val": 'NEW_VAL'}))
- raise MemoryError
-
-
-def delete_test():
- db.delete('t4', id=['<', 50])
-
- if not db.select('t4', 'id', WHERE={"val": 'NEW_VAL'}) == []:
- print(db.select_all('t4'))
- raise MemoryError
-
-
-def replace_test():
- """
- db.create_table(
- 't5',
- {
- 'id': [INTEGER, UNIQUE, NOT_NULL],
- 'val': [TEXT]
- }
- )
-
- db.insertmany('t5', [[x, ] for x in range(100)])
-
- db.replace('t5', [99, 'O_O'])
-
- if not db.select('t5', val='O_O') == [(99, 'O_O')]:
- print(db.select('t5', val='O_O'))
- raise MemoryError
- """
-
-
-def get_tables_test():
- if " None:
@classmethod
def tearDownClass(cls) -> None:
- cls.db.disconnect()
- cls.admin_cur.execute("drop database test_sqllex")
- cls.admin_cur.execute("drop user test_sqllex")
+ pass
def setUp(self) -> None:
self.tests_counter += 1
@@ -78,40 +71,11 @@ def raw_sql_get_tables_names(self):
"""
)))
- # def test_sqlite_db_crating_db(self):
- # """
- # Testing SQLite database init
- # """
- #
- # db_name = f"{self.db_name}_{1}"
- # SQLite3x(db_name)
- # self.assertTrue(os.path.isfile(db_name))
- # os.remove(db_name)
- #
- # db_name = f"{self.db_name}_{2}"
- # SQLite3x(path=db_name)
- # self.assertTrue(os.path.isfile(db_name))
- # os.remove(db_name)
- #
- # db_name = f"{self.db_name}_{3}"
- # SQLite3x(db_name, init_connection=False)
- # self.assertFalse(os.path.isfile(db_name))
- #
- # db_name = f""
- # self.assertRaises(ValueError, SQLite3x, db_name)
- #
- # db_name = f""
- # self.assertRaises(ValueError, SQLite3x, db_name)
- #
- # db_name = f""
- # self.assertRaises(ValueError, SQLite3x, path=db_name)
-
# def test_connection(self):
# """
# Testing connection with class object init
# """
#
- # db_name = f"{self.db_name}_{1}"
# self.assertIsInstance(SQLite3x(db_name).connection, sqlite3.Connection)
# os.remove(db_name)
#
@@ -127,10 +91,10 @@ def test_transaction(self):
Testing transactions
"""
def get_by_id(table: str, val: int):
- return self.db.execute(f"SELECT * FROM {table} WHERE id={val}")
+ return self.db.execute(f'SELECT * FROM {table} WHERE id={val}')
def get_user_by_id(val: int):
- return get_by_id(table='user', val=val)
+ return get_by_id(table='"user"', val=val)
def get_car_by_id(val: int):
return get_by_id(table='car', val=val)
@@ -139,7 +103,7 @@ def get_car_by_id(val: int):
self.db.execute(
"""
CREATE TABLE "user" (
- "id" INTEGER PRIMARY KEY,
+ "id" SERIAL PRIMARY KEY,
"name" TEXT UNIQUE
);
"""
@@ -148,7 +112,7 @@ def get_car_by_id(val: int):
self.db.execute(
"""
CREATE TABLE "car" (
- "id" SERIAL,
+ "id" SERIAL PRIMARY KEY,
"owner_id" INTEGER REFERENCES "user" (id),
"brand" TEXT
);
@@ -166,13 +130,13 @@ def get_car_by_id(val: int):
# Transaction with auto commit for many executes
with self.db.transaction as tran:
self.db.execute(
- "INSERT INTO user VALUES (22, 'Alex22')"
+ """INSERT INTO "user" (id, name) VALUES (22, 'Alex22')"""
)
self.db.execute(
- "INSERT INTO user VALUES (23, 'Alex23')"
+ """INSERT INTO "user" (id, name) VALUES (23, 'Alex23')"""
)
self.db.execute(
- "INSERT INTO user VALUES (24, 'Alex24')"
+ """INSERT INTO "user" (id, name) VALUES (24, 'Alex24')"""
)
self.assertEqual(get_user_by_id(22), [(22, 'Alex22')])
@@ -182,7 +146,7 @@ def get_car_by_id(val: int):
# Transaction with manual commit
with self.db.transaction as tran:
self.db.execute(
- "INSERT INTO user VALUES (2, 'Bob')"
+ """INSERT INTO "user" (id, name) VALUES (2, 'Bob')"""
)
tran.commit()
@@ -191,7 +155,7 @@ def get_car_by_id(val: int):
# Transaction with rollback
with self.db.transaction as tran:
self.db.execute(
- "INSERT INTO user VALUES (3, 'Cara')"
+ """INSERT INTO "user" (id, name) VALUES (3, 'Cara')"""
)
self.assertEqual(get_user_by_id(3), [(3, 'Cara')])
tran.rollback()
@@ -199,15 +163,19 @@ def get_car_by_id(val: int):
self.assertEqual(get_user_by_id(3), [])
# Prep
- self.assertRaises(sqlite3.IntegrityError, self.db.execute, "INSERT INTO user VALUES (2, 'Sam')")
+ self.assertRaises(
+ psycopg2.errors.UniqueViolation,
+ self.db.execute,
+ """INSERT INTO "user" (id, name) VALUES (2, 'Sam')"""
+ )
# Transaction with rollback
with self.db.transaction as tran:
try:
self.db.execute(
- "INSERT INTO user VALUES (2, 'Sam')"
+ """INSERT INTO "user" (id, name) VALUES (2, 'Sam')"""
)
- except sqlite3.IntegrityError:
+ except psycopg2.errors.UniqueViolation:
tran.rollback()
self.assertEqual(get_user_by_id(2), [(2, 'Bob')])
@@ -215,17 +183,17 @@ def get_car_by_id(val: int):
# Normal Transaction
with self.db.transaction as tran:
self.db.execute(
- "INSERT INTO user VALUES (55, 'Master')"
+ """INSERT INTO "user" (id, name) VALUES (55, 'Master')"""
)
self.db.execute(
- "INSERT INTO car VALUES (55, 55, 'BMW')"
+ """INSERT INTO car VALUES (55, 55, 'BMW')"""
)
self.assertEqual(get_user_by_id(55), [(55, 'Master')])
self.assertEqual(get_car_by_id(55), [(55, 55, 'BMW')])
# Prep
- self.assertRaises(sqlite3.IntegrityError, self.db.execute, "INSERT INTO car VALUES (9999, 9999, 'BMW')")
+ self.assertRaises(psycopg2.errors.ForeignKeyViolation, self.db.execute, "INSERT INTO car VALUES (9999, 9999, 'BMW')")
# Transaction with rollback
with self.db.transaction as tran:
@@ -233,7 +201,7 @@ def get_car_by_id(val: int):
self.db.execute(
"INSERT INTO car VALUES (9999, 9999, 'BMW')"
)
- except sqlite3.IntegrityError:
+ except psycopg2.errors.ForeignKeyViolation:
tran.rollback()
self.assertEqual(get_car_by_id(9999), [])
@@ -280,8 +248,6 @@ def test_create_table_all_columns(self):
}
)
- print(self.raw_sql_get_tables_names())
-
self.assertEqual(self.raw_sql_get_tables_names(), ('test_table',))
self.db.create_table(
@@ -735,7 +701,7 @@ def test_select(self):
self.db.execute(
"""
CREATE TABLE IF NOT EXISTS "position" (
- "id" SERIAL,
+ "id" SERIAL PRIMARY KEY,
"name" TEXT,
"description" TEXT DEFAULT NULL
);
@@ -788,7 +754,7 @@ def test_select(self):
# )
self.db.executemany(
- 'INSERT INTO "position" (id, name, description) VALUES (?, ?, ?)',
+ 'INSERT INTO "position" (id, name, description) VALUES (%s, %s, %s)',
(
(0, 'Assistant', 'Novice developer'),
(1, 'Junior', 'Junior developer'),
@@ -799,23 +765,23 @@ def test_select(self):
)
self.db.executemany(
- 'INSERT INTO "employee" (id, firstName, surname, age, positionID) VALUES (?, ?, ?, ?, ?)',
+ 'INSERT INTO "employee" ("firstName", surname, age, "positionID") VALUES (%s, %s, %s, %s)',
(
- (None, 'Alis', 'A', 11, 1),
- (None, 'Bob', 'B', 22, 1),
- (None, 'Carl', 'C', 33, 2),
- (None, 'Alis', 'B', 44, 3),
- (None, 'Dexter', 'B', 55, 1),
- (None, 'Elis', 'A', 22, 1),
- (None, 'Frank', 'B', 33, 1),
- (None, 'Georgy', 'D', 22, 2),
- (None, 'FoxCpp', 'M', 22, 1),
- (None, 'Ira', 'D', 22, 2)
+ ('Alis', 'A', 11, 1),
+ ('Bob', 'B', 22, 1),
+ ('Carl', 'C', 33, 2),
+ ('Alis', 'B', 44, 3),
+ ('Dexter', 'B', 55, 1),
+ ('Elis', 'A', 22, 1),
+ ('Frank', 'B', 33, 1),
+ ('Georgy', 'D', 22, 2),
+ ('FoxCpp', 'M', 22, 1),
+ ('Ira', 'D', 22, 2)
)
)
self.db.executemany(
- 'INSERT INTO "payments" (date, employeeID, amount) VALUES (?, ?, ?)',
+ 'INSERT INTO "payments" (date, "employeeID", amount) VALUES (%s, %s, %s)',
(
('01.01.2022', 2, 2000),
('01.01.2022', 3, 3000),
@@ -838,9 +804,9 @@ def test_select(self):
self.assertEqual(
expected, self.db['employee'].select_all()
)
- self.assertEqual(
- expected, self.db['employee'].select_all(GROUP_BY=1)
- )
+ # self.assertEqual(
+ # expected, self.db['employee'].select_all(GROUP_BY=1)
+ # )
# SELECT one column
expected = self.db.execute('SELECT id FROM "employee"')
@@ -865,16 +831,16 @@ def test_select(self):
)
# SELECT 2 columns
- expected = self.db.execute('SELECT id, firstName FROM "employee"')
+ expected = self.db.execute('SELECT id, "firstName" FROM "employee"')
self.assertEqual(
- expected, self.db['employee'].select('id, firstName')
+ expected, self.db['employee'].select('id, "firstName"')
)
self.assertEqual(
- expected, self.db['employee'].select(['id', 'firstName'])
+ expected, self.db['employee'].select(['id', '"firstName"'])
)
self.assertEqual(
- expected, self.db['employee'].select(('id', 'firstName'))
+ expected, self.db['employee'].select(('id', '"firstName"'))
)
self.assertEqual(
expected, self.db['employee'].select(
@@ -886,26 +852,26 @@ def test_select(self):
)
# SELECT 2 columns WHERE (condition)
- expected = self.db.execute('SELECT id, firstName FROM "employee" WHERE id > 2')
+ expected = self.db.execute('SELECT id, "firstName" FROM "employee" WHERE id > 2')
self.assertEqual(
expected,
self.db['employee'].select(
- SELECT=['id', 'firstName'],
+ SELECT=['id', '"firstName"'],
WHERE='id > 2',
)
)
self.assertEqual(
expected,
self.db['employee'].select(
- SELECT=['id', 'firstName'],
+ SELECT=['id', '"firstName"'],
WHERE='id > 2'
)
)
self.assertEqual(
expected,
self.db['employee'].select(
- SELECT=['id', 'firstName'],
+ SELECT=['id', '"firstName"'],
WHERE=(self.db['employee']['id'] > 2)
)
)
@@ -918,107 +884,107 @@ def test_select(self):
)
# SELECT 2 columns WHERE (condition) AND (condition)
- expected = self.db.execute('SELECT id, firstName FROM "employee" WHERE (age > 11) AND (positionID <> 2)')
+ expected = self.db.execute('SELECT id, "firstName" FROM "employee" WHERE (age > 11) AND ("positionID" <> 2)')
self.assertEqual(
expected,
self.db['employee'].select(
- SELECT=['id', 'firstName'],
+ SELECT=['id', '"firstName"'],
WHERE=(self.db['employee']['age'] > 11) & (self.db['employee']['positionID'] != 2)
)
)
# SELECT 2 columns WHERE (condition) AND (condition)
- expected = self.db.execute('SELECT id, firstName FROM "employee" WHERE (age == 11) AND (positionID == 2)')
+ expected = self.db.execute('SELECT id, "firstName" FROM "employee" WHERE (age = 11) AND ("positionID" = 2)')
self.assertEqual(
expected,
self.db['employee'].select(
- SELECT=['id', 'firstName'],
+ SELECT=['id', '"firstName"'],
WHERE=(self.db['employee']['age'] == 11) & (self.db['employee']['positionID'] == 2)
)
)
# SELECT 2 columns WHERE (condition) OR (condition)
- expected = self.db.execute('SELECT id, firstName FROM "employee" WHERE (age == 11) OR (positionID == 2)')
+ expected = self.db.execute('SELECT id, "firstName" FROM "employee" WHERE (age = 11) OR ("positionID" = 2)')
self.assertEqual(
expected,
self.db['employee'].select(
- SELECT=['id', 'firstName'],
+ SELECT=['id', '"firstName"'],
WHERE=(self.db['employee']['age'] == 11) | (self.db['employee']['positionID'] == 2)
)
)
# SELECT 3 columns ORDERED BY column
- expected = self.db.execute('SELECT id, firstName, positionID FROM "employee" ORDER BY positionID')
+ expected = self.db.execute('SELECT id, "firstName", "positionID" FROM "employee" ORDER BY "positionID"')
self.assertEqual(
expected,
self.db['employee'].select(
- SELECT=['id', 'firstName', 'positionID'],
- ORDER_BY='positionID'
+ SELECT=['id', '"firstName"', '"positionID"'],
+ ORDER_BY='"positionID"'
)
)
self.assertEqual(
expected,
self.db['employee'].select(
- SELECT=['id', 'firstName', 'positionID'],
+ SELECT=['id', '"firstName"', '"positionID"'],
ORDER_BY=3
)
)
self.assertEqual(
expected,
self.db['employee'].select(
- SELECT=['id', 'firstName', 'positionID'],
- ORDER_BY=['positionID']
+ SELECT=['id', '"firstName"', '"positionID"'],
+ ORDER_BY=['"positionID"']
)
)
self.assertEqual(
expected,
self.db['employee'].select(
- SELECT=['id', 'firstName', 'positionID'],
- ORDER_BY=('positionID',)
+ SELECT=['id', '"firstName"', '"positionID"'],
+ ORDER_BY=('"positionID"',)
)
)
# SELECT 2 columns ORDERED BY column1, column2
- expected = self.db.execute('SELECT id, firstName FROM "employee" ORDER BY firstName, surname')
+ expected = self.db.execute('SELECT id, "firstName" FROM "employee" ORDER BY "firstName", surname')
self.assertEqual(
expected,
self.db['employee'].select(
- SELECT=['id', 'firstName'],
- ORDER_BY=['firstName', 'surname']
+ SELECT=['id', '"firstName"'],
+ ORDER_BY=['"firstName"', 'surname']
)
)
self.assertEqual(
expected,
self.db['employee'].select(
- SELECT=['id', 'firstName'],
- ORDER_BY=('firstName', 'surname')
+ SELECT=['id', '"firstName"'],
+ ORDER_BY=('"firstName"', 'surname')
)
)
# SELECT 2 columns ORDERED BY column ASC
- expected = self.db.execute('SELECT id, firstName FROM "employee" ORDER BY firstName ASC')
+ expected = self.db.execute('SELECT id, "firstName" FROM "employee" ORDER BY "firstName" ASC')
self.assertEqual(
expected,
self.db['employee'].select(
- SELECT=['id', 'firstName'],
- ORDER_BY='firstName ASC'
+ SELECT=['id', '"firstName"'],
+ ORDER_BY='"firstName" ASC'
)
)
# SELECT 2 columns ORDERED BY column DESC
- expected = self.db.execute('SELECT id, firstName FROM "employee" ORDER BY firstName DESC')
+ expected = self.db.execute('SELECT id, "firstName" FROM "employee" ORDER BY "firstName" DESC')
self.assertEqual(
expected,
self.db['employee'].select(
- SELECT=['id', 'firstName'],
- ORDER_BY='firstName DESC'
+ SELECT=['id', '"firstName"'],
+ ORDER_BY='"firstName" DESC'
)
)
@@ -1039,30 +1005,30 @@ def test_select(self):
# When issue #59 will be fixed, this code have to work fine
#
# SELECT 2 columns ORDERED BY column1 ASC, column DESC
- expected = self.db.execute('SELECT id, firstName FROM "employee" ORDER BY firstName ASC, surname DESC')
+ expected = self.db.execute('SELECT id, "firstName" FROM "employee" ORDER BY "firstName" ASC, surname DESC')
self.assertEqual(
expected,
self.db['employee'].select(
- SELECT=['id', 'firstName'],
- ORDER_BY=['firstName', 'ASC', 'surname', 'DESC']
+ SELECT=['id', '"firstName"'],
+ ORDER_BY=['"firstName"', 'ASC', 'surname', 'DESC']
)
)
self.assertEqual(
expected,
self.db['employee'].select(
- SELECT=['id', 'firstName'],
- ORDER_BY=('firstName', 'ASC', 'surname', 'DESC')
+ SELECT=['id', '"firstName"'],
+ ORDER_BY=('"firstName"', 'ASC', 'surname', 'DESC')
)
)
# SELECT with one INNER JOIN
expected = self.db.execute(''
- 'SELECT e.id, e.firstName, p.name '
+ 'SELECT e.id, e."firstName", p.name '
'FROM employee e '
'INNER JOIN position p '
- 'ON e.positionID == p.id '
- 'ORDER BY e.positionID DESC')
+ 'ON e."positionID" = p.id '
+ 'ORDER BY e."positionID" DESC')
self.assertEqual(
expected,
@@ -1085,12 +1051,12 @@ def test_select(self):
# SELECT with two INNER JOINS
expected = self.db.execute(''
- 'SELECT e.id, e.firstName, p.name '
+ 'SELECT e.id, e."firstName", p.name '
'FROM employee e '
'INNER JOIN position p '
- 'ON e.positionID == p.id '
+ 'ON e."positionID" = p.id '
'INNER JOIN payments '
- 'ON e.id == payments.employeeID '
+ 'ON e.id = payments."employeeID" '
'ORDER BY payments.amount DESC')
self.assertEqual(
@@ -1120,12 +1086,12 @@ def test_select(self):
# SELECT with two FULL JOINS
expected = self.db.execute(''
- 'SELECT e.id, e.firstName, p.name '
+ 'SELECT e.id, e."firstName", p.name '
'FROM employee e '
'LEFT JOIN position p '
- 'ON e.positionID == p.id '
+ 'ON e."positionID" = p.id '
'LEFT JOIN payments '
- 'ON e.id == payments.employeeID '
+ 'ON e.id = payments."employeeID" '
'ORDER BY payments.amount DESC')
self.assertEqual(
@@ -1153,61 +1119,61 @@ def test_select(self):
)
)
- # SELECT * FROM employee GROUP BY
- expected = self.db.execute("SELECT * FROM employee GROUP BY surname")
+ # SELECT 'surname, COUNT(id)' FROM employee GROUP BY
+ expected = self.db.execute("SELECT surname, COUNT(id) FROM employee GROUP BY surname")
self.assertEqual(
expected,
- self.db.select(SELECT=ALL, FROM='employee', GROUP_BY='surname')
+ self.db.select(SELECT='surname, COUNT(id)', FROM='employee', GROUP_BY='surname')
)
self.assertEqual(
expected,
- self.db.select(SELECT=ALL, FROM='employee', GROUP_BY=('surname',))
+ self.db.select(SELECT='surname, COUNT(id)', FROM='employee', GROUP_BY=('surname',))
)
self.assertEqual(
expected,
- self.db.select(SELECT=ALL, FROM='employee', GROUP_BY=['surname'])
+ self.db.select(SELECT='surname, COUNT(id)', FROM='employee', GROUP_BY=['surname'])
)
self.assertEqual(
expected,
- self.db.select(SELECT=ALL, FROM='employee', GROUP_BY=self.db['employee']['surname'])
+ self.db.select(SELECT='surname, COUNT(id)', FROM='employee', GROUP_BY=self.db['employee']['surname'])
)
self.assertEqual(
expected,
- self.db.select(SELECT=ALL, FROM='employee', GROUP_BY=(self.db['employee']['surname'],))
+ self.db.select(SELECT='surname, COUNT(id)', FROM='employee', GROUP_BY=(self.db['employee']['surname'],))
)
# SELECT * FROM employee GROUP BY 2 rows
- expected = self.db.execute("SELECT * FROM employee GROUP BY surname, positionID")
+ expected = self.db.execute('SELECT surname, COUNT(id) FROM employee GROUP BY surname, "positionID"')
self.assertEqual(
expected,
- self.db.select(SELECT=ALL, FROM='employee', GROUP_BY='surname, positionID')
+ self.db.select(SELECT='surname, COUNT(id)', FROM='employee', GROUP_BY='surname, "positionID"')
)
self.assertEqual(
expected,
- self.db.select(SELECT=ALL, FROM='employee', GROUP_BY=('surname', 'positionID'))
+ self.db.select(SELECT='surname, COUNT(id)', FROM='employee', GROUP_BY=('surname', '"positionID"'))
)
self.assertEqual(
expected,
- self.db.select(SELECT=ALL, FROM='employee', GROUP_BY=['surname', 'positionID'])
+ self.db.select(SELECT='surname, COUNT(id)', FROM='employee', GROUP_BY=['surname', '"positionID"'])
)
# SELECT with JOINS GROUP BY
expected = self.db.execute(''
- 'SELECT pos.name, pay.amount '
+ 'SELECT pos.name, SUM(pay.amount) '
'FROM payments pay '
'INNER JOIN employee emp '
- 'ON emp.id == pay.employeeID '
+ 'ON emp.id = pay."employeeID" '
'INNER JOIN position pos '
- 'ON emp.positionID == pos.id '
+ 'ON emp."positionID" = pos.id '
'GROUP BY pos.name '
- 'ORDER BY pay.amount DESC')
+ 'ORDER BY 2 DESC')
self.assertEqual(
expected,
self.db['employee'].select(
SELECT=[
self.db['position']['name'],
- self.db['payments']['amount'],
+ 'SUM(amount)',
],
JOIN=(
(
@@ -1221,8 +1187,7 @@ def test_select(self):
),
GROUP_BY=self.db['position']['name'],
ORDER_BY=(
- self.db['payments']['amount'],
- 'DESC'
+ '2 DESC'
)
)
)
@@ -1280,6 +1245,7 @@ def test_extra_features(self):
self.assertEqual(self.db['test_table_1']['id'].table, 'test_table_1')
self.assertEqual(self.db['test_table_2']['id'].table, 'test_table_2')
+ @unittest.skip
@unittest.skipUnless(importlib.util.find_spec('numpy'), "Module numpy not found")
def test_numpy(self):
from numpy import array, nan
@@ -1303,105 +1269,8 @@ def test_numpy(self):
"""
)
- self.db.updatemany("test_table_numpy", array(data))
- self.db.updatemany("test_table_numpy", array([[], []]))
-
-
-def save_prof(func: callable):
- def wrapper(*args, **kwargs):
- import pstats
- import cProfile
-
- with cProfile.Profile() as pr:
- func(*args, **kwargs)
-
- stat = pstats.Stats(pr)
- stat.sort_stats(pstats.SortKey.TIME)
- stat.dump_stats(filename=f'time_{func.__name__}.prof')
-
- return wrapper
-
-
-# @unittest.skip("Turned off manually")
-@unittest.skipUnless(importlib.util.find_spec('cProfile'), "Module cProfile not found")
-@unittest.skipUnless(importlib.util.find_spec('pstats'), "Module pstats not found")
-class TimeTestsSqllexSQLite(unittest.TestCase):
-
- @classmethod
- def setUpClass(cls) -> None:
- cls.complexity = 1000
- cls.db_name = f"test_sqllex_db"
- cls.db = SQLite3x(path=cls.db_name)
-
- @classmethod
- def tearDownClass(cls) -> None:
- cls.db.disconnect()
- os.remove(cls.db_name)
-
- @save_prof
- def test_create_table(self):
- self.db.create_table(
- 'main',
- {
- 'id': [INTEGER, PRIMARY_KEY, UNIQUE],
- 'name': [TEXT],
- 'age': [INTEGER, DEFAULT, 33]
- }
- )
-
- @save_prof
- def test_insert_fast(self):
- for _ in range(self.complexity):
- self.db.insert('main', (None, f'Alex', 33))
-
- @save_prof
- def test_insert_slow(self):
- for _ in range(self.complexity):
- self.db.insert('main', (None, 'Alex'))
-
- @save_prof
- def test_insert_many_fast(self):
- data = [(None, 'Alex', 33) for _ in range(self.complexity)]
-
- self.db.insertmany('main', data)
-
- @save_prof
- def test_insert_many_slow(self):
- data = [(None, 'Alex') for _ in range(self.complexity)]
-
- self.db.insertmany('main', data)
-
- @save_prof
- def test_select_all(self):
- self.db.select_all('main', LIMIT=self.complexity)
-
- @save_prof
- def test_select_where_1(self):
- """
- Select where (something)
- """
- self.db.select(
- 'main', 'id',
- WHERE={
- 'name': 'Alex'
- },
- LIMIT=self.complexity
- )
-
- @save_prof
- def test_select_where_2(self):
- """
- Modern new way for WHERE (SearchConditions)
- """
- main_tab = self.db['main']
- id_col = main_tab['id']
- name_col = main_tab['name']
-
- self.db.select(
- main_tab, id_col,
- WHERE=(name_col == 'Alex'),
- LIMIT=self.complexity
- )
+ self.db.insertmany("test_table_numpy", array(data))
+ self.db.insertmany("test_table_numpy", array([[], []]))
if __name__ == '__main__':
diff --git a/tests/test_sqlite3x.py b/tests/test_sqlite3x.py
new file mode 100644
index 0000000..b1d2aa0
--- /dev/null
+++ b/tests/test_sqlite3x.py
@@ -0,0 +1,1260 @@
+import os
+import sqlite3
+import unittest
+import importlib.util
+from sqllex.constants import *
+from sqllex.classes import SQLite3x
+
+# from sqllex.debug import debug_mode
+# debug_mode(True)
+
+
+class TestSqllexSQLite(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(cls) -> None:
+ cls.tests_counter = 0
+
+ @classmethod
+ def tearDownClass(cls) -> None:
+ pass
+
+ def setUp(self) -> None:
+ self.tests_counter += 1
+ self.db_name = f"test_sqllex_db_{self.tests_counter}"
+
+ self.db = SQLite3x(path=self.db_name)
+
+ def tearDown(self) -> None:
+ self.db.disconnect()
+ os.remove(self.db_name)
+
+ def raw_sql_get_tables_names(self):
+ return tuple(map(lambda ret: ret[0], self.db.execute("SELECT name FROM sqlite_master WHERE type='table'")))
+
+ def test_sqlite_db_crating_db(self):
+ """
+ Testing SQLite database init
+ """
+
+ db_name = f"{self.db_name}_{1}"
+ SQLite3x(db_name)
+ self.assertTrue(os.path.isfile(db_name))
+ os.remove(db_name)
+
+ db_name = f"{self.db_name}_{2}"
+ SQLite3x(path=db_name)
+ self.assertTrue(os.path.isfile(db_name))
+ os.remove(db_name)
+
+ db_name = f"{self.db_name}_{3}"
+ SQLite3x(db_name, init_connection=False)
+ self.assertFalse(os.path.isfile(db_name))
+
+ db_name = f""
+ self.assertRaises(ValueError, SQLite3x, db_name)
+
+ db_name = f""
+ self.assertRaises(ValueError, SQLite3x, db_name)
+
+ db_name = f""
+ self.assertRaises(ValueError, SQLite3x, path=db_name)
+
+ def test_connection(self):
+ """
+ Testing connection with class object init
+ """
+
+ db_name = f"{self.db_name}_{1}"
+ self.assertIsInstance(SQLite3x(db_name).connection, sqlite3.Connection)
+ os.remove(db_name)
+
+ db_name = f"{self.db_name}_{1}"
+ self.assertIsInstance(SQLite3x(db_name, init_connection=True).connection, sqlite3.Connection)
+ os.remove(db_name)
+
+ db_name = f"{self.db_name}_{1}"
+ self.assertIs(SQLite3x(db_name, init_connection=False).connection, None)
+
+ def test_transaction(self):
+ """
+ Testing transactions
+ """
+
+ def get_by_id(table: str, val: int):
+ return self.db.execute(f"SELECT * FROM {table} WHERE id={val}")
+
+ def get_user_by_id(val: int):
+ return get_by_id(table='user', val=val)
+
+ def get_car_by_id(val: int):
+ return get_by_id(table='car', val=val)
+
+ self.db.execute(
+ """
+ CREATE TABLE "user" (
+ "id" INTEGER PRIMARY KEY,
+ "name" TEXT UNIQUE
+ );
+ """
+ )
+
+ self.db.execute(
+ """
+ CREATE TABLE "car" (
+ "id" INTEGER PRIMARY KEY,
+ "owner_id" INTEGER,
+ "brand" TEXT,
+
+ FOREIGN KEY (owner_id) REFERENCES user(id)
+ );
+ """
+ )
+
+ # Transaction with auto commit
+ with self.db.transaction as tran:
+ self.db.execute(
+ "INSERT INTO user VALUES (1, 'Alex')"
+ )
+
+ self.assertEqual(get_user_by_id(1), [(1, 'Alex')])
+
+ # Transaction with auto commit for many executes
+ with self.db.transaction as tran:
+ self.db.execute(
+ "INSERT INTO user VALUES (22, 'Alex22')"
+ )
+ self.db.execute(
+ "INSERT INTO user VALUES (23, 'Alex23')"
+ )
+ self.db.execute(
+ "INSERT INTO user VALUES (24, 'Alex24')"
+ )
+
+ self.assertEqual(get_user_by_id(22), [(22, 'Alex22')])
+ self.assertEqual(get_user_by_id(23), [(23, 'Alex23')])
+ self.assertEqual(get_user_by_id(24), [(24, 'Alex24')])
+
+ # Transaction with manual commit
+ with self.db.transaction as tran:
+ self.db.execute(
+ "INSERT INTO user VALUES (2, 'Bob')"
+ )
+ tran.commit()
+
+ self.assertEqual(get_user_by_id(2), [(2, 'Bob')])
+
+ # Transaction with rollback
+ with self.db.transaction as tran:
+ self.db.execute(
+ "INSERT INTO user VALUES (3, 'Cara')"
+ )
+ self.assertEqual(get_user_by_id(3), [(3, 'Cara')])
+ tran.rollback()
+
+ self.assertEqual(get_user_by_id(3), [])
+
+ # Prep
+ self.assertRaises(sqlite3.IntegrityError, self.db.execute, "INSERT INTO user VALUES (2, 'Sam')")
+
+ # Transaction with rollback
+ with self.db.transaction as tran:
+ try:
+ self.db.execute(
+ "INSERT INTO user VALUES (2, 'Sam')"
+ )
+ except sqlite3.IntegrityError:
+ tran.rollback()
+
+ self.assertEqual(get_user_by_id(2), [(2, 'Bob')])
+
+ # Normal Transaction
+ with self.db.transaction as tran:
+ self.db.execute(
+ "INSERT INTO user VALUES (55, 'Master')"
+ )
+ self.db.execute(
+ "INSERT INTO car VALUES (55, 55, 'BMW')"
+ )
+
+ self.assertEqual(get_user_by_id(55), [(55, 'Master')])
+ self.assertEqual(get_car_by_id(55), [(55, 55, 'BMW')])
+
+ # Prep
+ self.assertRaises(sqlite3.IntegrityError, self.db.execute, "INSERT INTO car VALUES (9999, 9999, 'BMW')")
+
+ # Transaction with rollback
+ with self.db.transaction as tran:
+ try:
+ self.db.execute(
+ "INSERT INTO car VALUES (9999, 9999, 'BMW')"
+ )
+ except sqlite3.IntegrityError:
+ tran.rollback()
+
+ self.assertEqual(get_car_by_id(9999), [])
+
+ def test_create_table_1(self):
+ """
+ Testing table creating
+ """
+
+ self.assertRaises(ValueError, self.db.create_table, 'test_table_1', {})
+ self.assertRaises(ValueError, self.db.create_table, 'test_table_2', '')
+
+ def test_create_table_basic(self):
+ """
+ Testing table creating
+ """
+ columns = {'id': INTEGER}
+
+ self.db.create_table(
+ 'test_table_1',
+ columns
+ )
+ self.assertEqual(self.raw_sql_get_tables_names(), ('test_table_1',))
+
+ self.db.create_table(
+ 'test_table_2',
+ columns
+ )
+ self.assertEqual(self.raw_sql_get_tables_names(), ('test_table_1', 'test_table_2'))
+
+ def test_create_table_all_columns(self):
+ """
+ Testing table creating
+ """
+
+ self.db.create_table(
+ name='test_table',
+ columns={
+ 'id': [INTEGER, AUTOINCREMENT, PRIMARY_KEY],
+ 'user': [TEXT, UNIQUE, NOT_NULL],
+ 'about': [TEXT, DEFAULT, NULL],
+ 'status': [TEXT, DEFAULT, 'offline']
+ }
+ )
+ self.assertEqual(self.raw_sql_get_tables_names(), ('test_table', 'sqlite_sequence'))
+
+ self.db.create_table(
+ name='test_table_1',
+ columns={
+ 'id': [INTEGER, AUTOINCREMENT, PRIMARY_KEY],
+ 'user': [TEXT, UNIQUE, NOT_NULL],
+ 'about': [TEXT, DEFAULT, NULL],
+ 'status': [TEXT, DEFAULT, 'offline']
+ }
+ )
+ self.assertEqual(self.raw_sql_get_tables_names(), ('test_table', 'sqlite_sequence', 'test_table_1'))
+
+ def test_create_table_inx(self):
+ """
+ Testing if not exist kwarg
+ """
+ columns = {'id': INTEGER}
+
+ self.db.create_table('test_table_1', columns, IF_NOT_EXIST=True)
+ self.db.create_table('test_table_2', columns, IF_NOT_EXIST=True)
+ self.db.create_table('test_table_1', columns, IF_NOT_EXIST=True)
+
+ self.assertEqual(self.raw_sql_get_tables_names(), ('test_table_1', 'test_table_2'))
+ self.assertRaises(sqlite3.OperationalError, self.db.create_table, 'test_table_1', columns, IF_NOT_EXIST=False)
+
+ def test_markup(self):
+ """
+ Markup table
+ """
+
+ self.db.markup(
+ {
+ "tt_groups": {
+ "group_id": [PRIMARY_KEY, UNIQUE, INTEGER],
+ "group_name": [TEXT, NOT_NULL, DEFAULT, "GroupName"],
+ },
+
+ "tt_users": {
+ "user_id": [INTEGER, PRIMARY_KEY, UNIQUE],
+ "user_name": TEXT,
+ "group_id": INTEGER,
+
+ FOREIGN_KEY: {
+ "group_id": ["tt_groups", "group_id"]
+ },
+ }
+ }
+ )
+
+ self.assertEqual(
+ self.raw_sql_get_tables_names(),
+ ('tt_groups', 'tt_users')
+ )
+
+ def test_drop_and_create_table(self):
+ """
+ Create and remove table
+ """
+ self.db.execute(
+ """
+ CREATE TABLE "test_table" (
+ "id" INTEGER
+ );
+ """
+ )
+ self.assertEqual(
+ self.raw_sql_get_tables_names(),
+ ('test_table',)
+ )
+
+ self.db.drop('test_table')
+ self.assertEqual(
+ self.raw_sql_get_tables_names(),
+ tuple()
+ )
+
+ def test_insert(self):
+ """
+ All kind of possible inserts without extra arguments (OR, WITH)
+ """
+
+ def get_all_records():
+ return self.db.execute(f'SELECT * FROM "{table_name}"')
+
+ def count_all_records():
+ return len(get_all_records())
+
+ def count_records(step: int = None):
+ """
+ This decorator adding counting
+ with every run of decorated function it increases count_records.counter at value of step
+
+ @count_records(step=1)
+ def func(*args, **kwargs):
+ return None
+
+ func()
+ func()
+
+ count_records.counter == 2
+ """
+
+ def wrapper_(func: callable):
+ def wrapper(*args, **kwargs):
+ # one run == + step records
+ count_records.counter += func.step
+
+ func(*args, **kwargs)
+
+ # checking was it inserted or not
+ self.assertEqual(
+ count_all_records(),
+ count_records.counter,
+ msg=f"Incorrect records amount\n args={args}, kwargs={kwargs}"
+ )
+
+ func.step = step
+
+ return wrapper
+
+ if step is None:
+ step = 1
+
+ count_records.counter = 0
+
+ return wrapper_
+
+ @count_records(step=2) # warning - magic number!
+ def insert_process(*args, **kwargs):
+ self.db.insert(table_name, *args, **kwargs)
+ self.db[table_name].insert(*args, **kwargs)
+
+ @count_records(step=10) # warning - magic number!
+ def insert_many_process(*args, **kwargs):
+ self.db.insertmany(table_name, *args, **kwargs)
+ self.db[table_name].insertmany(*args, **kwargs)
+
+ table_name = 'test_table'
+ columns = ("text_c", "num_c", "int_c", "real_c", 'none_c', 'blob_c')
+ data = ('asdf', 10.0, 1, 3.14, None, 2)
+
+ self.db.execute(
+ """
+ CREATE TABLE IF NOT EXISTS "test_table" (
+ "text_c" TEXT,
+ "num_c" NUMERIC,
+ "int_c" INTEGER,
+ "real_c" REAL,
+ "none_c" NONE,
+ "blob_c" BLOB
+ );
+ """
+ )
+
+ # columns_types = (TEXT, NUMERIC, INTEGER, REAL, NONE, BLOB)
+ #
+ # self.db.markup(
+ # {
+ # table_name: dict(zip(columns, columns_types))
+ # }
+ # )
+
+ # just arg values
+ # 1, 2, 3
+ insert_process(*data)
+
+ # arg list
+ # [1, 2, 3]
+ insert_process(list(data))
+
+ # arg tuple
+ # (1, 2, 3)
+ insert_process(tuple(data))
+
+ # arg tuple
+ # {'col1': 1, 'col2': 2}
+ insert_process(dict(zip(columns, data)))
+
+ # kwargs
+ # col1=1, col2=2
+ insert_process(**dict(zip(columns, data)))
+
+ # not full tuple
+ insert_process(('asdf', 10.0))
+
+ # not full tuple
+ insert_process(['asdf', 10.0])
+
+ # insert_many args
+ # (1, 2), (3, 4) ...
+ insert_many_process(*((data,) * 5))
+
+ # insert_many one arg
+ # ((1, 2), (3, 4) ... )
+ insert_many_process((data,) * 5)
+
+ # Manually SQL script execution
+ all_data = get_all_records()
+
+ self.assertEqual(all_data, self.db.select(table_name))
+ self.assertEqual(all_data, self.db.select(table_name, ALL))
+ self.assertEqual(all_data, self.db.select(table_name, '*'))
+ self.assertEqual(all_data, self.db[table_name].select_all())
+ self.assertEqual(all_data, self.db[table_name].select())
+ self.assertEqual(all_data, self.db[table_name].select(ALL))
+
+ def test_insert_xa_and_update(self):
+ """
+ Insert with extra args and Update
+ """
+
+ def re_init_database():
+ self.db.executescript(
+ """
+ DROP TABLE IF EXISTS "salt";
+ DROP TABLE IF EXISTS "hashes";
+
+ CREATE TABLE IF NOT EXISTS "hashes" (
+ "id" INTEGER PRIMARY KEY AUTOINCREMENT,
+ "value" TEXT
+ );
+
+ CREATE TABLE "salt" (
+ "hashID" INTEGER PRIMARY KEY AUTOINCREMENT,
+ "value" TEXT,
+
+ FOREIGN KEY (hashID) REFERENCES hashes (id)
+ );
+
+ INSERT INTO "hashes" (id, value) VALUES (1, '6432642426695757642'), (2, '3259279587463616469'),
+ (3, '4169263184167314937'), (4, '-8758758971870855856'), (5, '-2558087477551224077');
+
+ INSERT INTO "salt" (hashID, value) VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5');
+ """
+ )
+
+ # sqlite3.IntegrityError: UNIQUE constraint failed: hashes.id
+ re_init_database()
+ self.assertRaises(
+ sqlite3.IntegrityError,
+ self.db.insert, 'hashes', (1, 'newHash'),
+ )
+
+ # INSERT OR FAIL
+ # sqlite3.IntegrityError: UNIQUE constraint failed: hashes.id
+ re_init_database()
+ self.assertRaises(
+ sqlite3.IntegrityError,
+ self.db.insert,
+ 'hashes', (1, 'newHash'), OR=FAIL
+ )
+
+ # INSERT OR ABORT
+ # sqlite3.IntegrityError: UNIQUE constraint failed: hashes.id
+ re_init_database()
+ self.assertRaises(
+ sqlite3.IntegrityError,
+ self.db.insert,
+ 'hashes', (1, 'newHash'), OR=ABORT
+ )
+
+ # INSERT OR ROLLBACK
+ # sqlite3.IntegrityError: UNIQUE constraint failed: hashes.id
+ re_init_database()
+ self.assertRaises(
+ sqlite3.IntegrityError,
+ self.db.insert,
+ 'hashes', (1, 'newHash'), OR=ROLLBACK
+ )
+
+ # INSERT OR REPLACE
+ re_init_database()
+ self.db.insert(
+ 'hashes',
+ (1, 'newHash'),
+ OR=REPLACE
+ )
+ self.assertEqual(
+ [
+ (1, 'newHash'),
+ (2, '3259279587463616469'),
+ (3, '4169263184167314937'),
+ (4, '-8758758971870855856'),
+ (5, '-2558087477551224077')
+ ],
+ self.db.execute("SELECT * FROM hashes"),
+ )
+
+ # INSERT OR IGNORE
+ re_init_database()
+ self.db.insert(
+ 'hashes',
+ (1, 'newHash'),
+ (2, 'anotherNewHash'),
+ OR=IGNORE
+ )
+ self.assertEqual(
+ [
+ (1, '6432642426695757642'),
+ (2, '3259279587463616469'),
+ (3, '4169263184167314937'),
+ (4, '-8758758971870855856'),
+ (5, '-2558087477551224077')
+ ],
+ self.db.execute("SELECT * FROM hashes"),
+ )
+
+ # INSERT many OR REPLACE
+ re_init_database()
+ self.db.insertmany(
+ 'hashes',
+ (
+ (1, 'newHash'),
+ (6, 'anotherNewHash'),
+ ),
+ OR=REPLACE
+ )
+ self.assertEqual(
+ [
+ (1, 'newHash'),
+ (2, '3259279587463616469'),
+ (3, '4169263184167314937'),
+ (4, '-8758758971870855856'),
+ (5, '-2558087477551224077'),
+ (6, 'anotherNewHash'),
+ ],
+ self.db.execute("SELECT * FROM hashes"),
+ )
+
+ # INSERT many OR IGNORE
+ re_init_database()
+ self.db.insertmany(
+ 'hashes',
+ (
+ (1, 'newHash'),
+ (6, 'anotherNewHash'),
+ ),
+ OR=IGNORE
+ )
+ self.assertEqual(
+ [
+ (1, '6432642426695757642'),
+ (2, '3259279587463616469'),
+ (3, '4169263184167314937'),
+ (4, '-8758758971870855856'),
+ (5, '-2558087477551224077'),
+ (6, 'anotherNewHash'),
+ ],
+ self.db.execute("SELECT * FROM hashes")
+ )
+
+ expected = [
+ (1, 'newHash'),
+ (2, '3259279587463616469'),
+ (3, '4169263184167314937'),
+ (4, '-8758758971870855856'),
+ (5, '-2558087477551224077')
+ ]
+
+ # UPDATE
+ re_init_database()
+ self.db.update(
+ 'hashes',
+ SET={'value': 'newHash'},
+ WHERE={'id': 1}
+ )
+ self.assertEqual(
+ expected,
+ self.db.execute("SELECT * FROM hashes"),
+ )
+
+ # UPDATE
+ re_init_database()
+ self.db.update(
+ 'hashes',
+ SET={
+ self.db['hashes']['value']: 'newHash'
+ },
+ WHERE={
+ self.db['hashes']['id']: 1
+ }
+ )
+ self.assertEqual(
+ expected,
+ self.db.execute("SELECT * FROM hashes"),
+ )
+
+ # UPDATE
+ re_init_database()
+ self.db.update(
+ 'hashes',
+ SET={
+ self.db['hashes']['value']: 'newHash'
+ },
+ WHERE=self.db['hashes']['id'] == 1
+ )
+ self.assertEqual(
+ expected,
+ self.db.execute("SELECT * FROM hashes"),
+ )
+
+ # UPDATE
+ re_init_database()
+ self.db.update(
+ 'hashes',
+ SET={
+ self.db['hashes']['value']: 'newHash'
+ },
+ WHERE=(1 < self.db['hashes']['id']) & (self.db['hashes']['id'] < 5)
+ )
+ self.assertEqual(
+ self.db.execute("SELECT * FROM hashes"),
+ [
+ (1, '6432642426695757642'),
+ (2, 'newHash'),
+ (3, 'newHash'),
+ (4, 'newHash'),
+ (5, '-2558087477551224077')
+ ]
+ )
+
+ # UPDATE with impossible WHERE condition
+ re_init_database()
+ self.db.update(
+ 'hashes',
+ SET={
+ self.db['hashes']['value']: 'newHash'
+ },
+ WHERE=(self.db['hashes']['id'] > 9999)
+ )
+ self.assertEqual(
+ self.db.execute("SELECT * FROM hashes"),
+ [
+ (1, '6432642426695757642'),
+ (2, '3259279587463616469'),
+ (3, '4169263184167314937'),
+ (4, '-8758758971870855856'),
+ (5, '-2558087477551224077')
+ ]
+ )
+
+ def test_select(self):
+ """
+ All kind of selects (WHERE, ORDER BY, JOIN, GROUP BY)
+ """
+
+ self.db.executescript(
+ """
+ CREATE TABLE IF NOT EXISTS "position" (
+ "id" INTEGER PRIMARY KEY AUTOINCREMENT,
+ "name" TEXT,
+ "description" TEXT DEFAULT NULL
+ );
+
+ CREATE TABLE IF NOT EXISTS "employee" (
+ "id" INTEGER PRIMARY KEY AUTOINCREMENT,
+ "firstName" TEXT,
+ "surname" TEXT,
+ "age" INTEGER NOT NULL,
+ "positionID" INTEGER,
+ FOREIGN KEY (positionID) REFERENCES position (id)
+ );
+
+ CREATE TABLE IF NOT EXISTS "payments" (
+ "date" TEXT,
+ "employeeID" INTEGER,
+ "amount" INTEGER NOT NULL
+ );
+ """
+ )
+
+ # self.db.markup(
+ # {
+ # 'position': {
+ # 'id': [INTEGER, PRIMARY_KEY, AUTOINCREMENT],
+ # 'name': TEXT,
+ # 'description': [TEXT, DEFAULT, NULL],
+ # },
+ # 'employee': {
+ # 'id': [INTEGER, PRIMARY_KEY, AUTOINCREMENT],
+ # 'firstName': TEXT,
+ # 'surname': TEXT,
+ # 'age': [INTEGER, NOT_NULL],
+ # 'positionID': INTEGER,
+ #
+ # FOREIGN_KEY: {
+ # 'positionID': ['position', 'id']
+ # }
+ # },
+ # 'payments': {
+ # 'date': [TEXT],
+ # 'employeeID': INTEGER,
+ # 'amount': [INTEGER, NOT_NULL],
+ #
+ # FOREIGN_KEY: {
+ # 'positionID': ['employee', 'id']
+ # },
+ # }
+ # }
+ # )
+
+ self.db.executemany(
+ 'INSERT INTO "position" (id, name, description) VALUES (?, ?, ?)',
+ (
+ (0, 'Assistant', 'Novice developer'),
+ (1, 'Junior', 'Junior developer'),
+ (2, 'Middle', 'Middle developer'),
+ (3, 'Senior', 'senior developer'),
+ (4, 'DevOps', 'DevOps engineer')
+ )
+ )
+
+ self.db.executemany(
+ 'INSERT INTO "employee" (id, firstName, surname, age, positionID) VALUES (?, ?, ?, ?, ?)',
+ (
+ (None, 'Alis', 'A', 11, 1),
+ (None, 'Bob', 'B', 22, 1),
+ (None, 'Carl', 'C', 33, 2),
+ (None, 'Alis', 'B', 44, 3),
+ (None, 'Dexter', 'B', 55, 1),
+ (None, 'Elis', 'A', 22, 1),
+ (None, 'Frank', 'B', 33, 1),
+ (None, 'Georgy', 'D', 22, 2),
+ (None, 'FoxCpp', 'M', 22, 1),
+ (None, 'Ira', 'D', 22, 2)
+ )
+ )
+
+ self.db.executemany(
+ 'INSERT INTO "payments" (date, employeeID, amount) VALUES (?, ?, ?)',
+ (
+ ('01.01.2022', 2, 2000),
+ ('01.01.2022', 3, 3000),
+ ('01.01.2022', 7, 2000),
+ ('01.02.2022', 1, 4000),
+ ('01.02.2022', 2, 2000),
+ ('01.02.2022', 3, 4000),
+ ('01.02.2022', 5, 2000),
+ ('01.02.2022', 6, 4000),
+ ('01.02.2022', 7, 2000),
+ )
+ )
+
+ # SELECT all
+ expected = self.db.execute('SELECT * FROM "employee"')
+
+ self.assertEqual(
+ expected, self.db['employee'].select(ALL)
+ )
+ self.assertEqual(
+ expected, self.db['employee'].select_all()
+ )
+ self.assertEqual(
+ expected, self.db['employee'].select_all(GROUP_BY=1)
+ )
+
+ # SELECT one column
+ expected = self.db.execute('SELECT id FROM "employee"')
+
+ self.assertEqual(
+ expected, self.db.select('employee', 'id')
+ )
+ self.assertEqual(
+ expected, self.db['employee']['id']
+ )
+ self.assertEqual(
+ expected, self.db['employee'].select('id')
+ )
+ self.assertEqual(
+ expected, self.db['employee'].select(self.db['employee']['id'])
+ )
+ self.assertEqual(
+ expected, self.db['employee'].select([self.db['employee']['id']])
+ )
+ self.assertEqual(
+ expected, self.db['employee'].select((self.db['employee']['id'],))
+ )
+
+ # SELECT 2 columns
+ expected = self.db.execute('SELECT id, firstName FROM "employee"')
+
+ self.assertEqual(
+ expected, self.db['employee'].select('id, firstName')
+ )
+ self.assertEqual(
+ expected, self.db['employee'].select(['id', 'firstName'])
+ )
+ self.assertEqual(
+ expected, self.db['employee'].select(('id', 'firstName'))
+ )
+ self.assertEqual(
+ expected, self.db['employee'].select(
+ [
+ self.db['employee']['id'],
+ self.db['employee']['firstName']
+ ]
+ )
+ )
+
+ # SELECT 2 columns WHERE (condition)
+ expected = self.db.execute('SELECT id, firstName FROM "employee" WHERE id > 2')
+
+ self.assertEqual(
+ expected,
+ self.db['employee'].select(
+ SELECT=['id', 'firstName'],
+ WHERE='id > 2',
+ )
+ )
+ self.assertEqual(
+ expected,
+ self.db['employee'].select(
+ SELECT=['id', 'firstName'],
+ WHERE='id > 2'
+ )
+ )
+ self.assertEqual(
+ expected,
+ self.db['employee'].select(
+ SELECT=['id', 'firstName'],
+ WHERE=(self.db['employee']['id'] > 2)
+ )
+ )
+ self.assertEqual(
+ expected,
+ self.db['employee'].select(
+ SELECT=[self.db['employee']['id'], self.db['employee']['firstName']],
+ WHERE=(self.db['employee']['id'] > 2)
+ )
+ )
+
+ # SELECT 2 columns WHERE (condition) AND (condition)
+ expected = self.db.execute('SELECT id, firstName FROM "employee" WHERE (age > 11) AND (positionID <> 2)')
+
+ self.assertEqual(
+ expected,
+ self.db['employee'].select(
+ SELECT=['id', 'firstName'],
+ WHERE=(self.db['employee']['age'] > 11) & (self.db['employee']['positionID'] != 2)
+ )
+ )
+
+ # SELECT 2 columns WHERE (condition) AND (condition)
+ expected = self.db.execute('SELECT id, firstName FROM "employee" WHERE (age == 11) AND (positionID == 2)')
+
+ self.assertEqual(
+ expected,
+ self.db['employee'].select(
+ SELECT=['id', 'firstName'],
+ WHERE=(self.db['employee']['age'] == 11) & (self.db['employee']['positionID'] == 2)
+ )
+ )
+
+ # SELECT 2 columns WHERE (condition) OR (condition)
+ expected = self.db.execute('SELECT id, firstName FROM "employee" WHERE (age == 11) OR (positionID == 2)')
+
+ self.assertEqual(
+ expected,
+ self.db['employee'].select(
+ SELECT=['id', 'firstName'],
+ WHERE=(self.db['employee']['age'] == 11) | (self.db['employee']['positionID'] == 2)
+ )
+ )
+
+ # SELECT 3 columns ORDERED BY column
+ expected = self.db.execute('SELECT id, firstName, positionID FROM "employee" ORDER BY positionID')
+
+ self.assertEqual(
+ expected,
+ self.db['employee'].select(
+ SELECT=['id', 'firstName', 'positionID'],
+ ORDER_BY='positionID'
+ )
+ )
+ self.assertEqual(
+ expected,
+ self.db['employee'].select(
+ SELECT=['id', 'firstName', 'positionID'],
+ ORDER_BY=3
+ )
+ )
+ self.assertEqual(
+ expected,
+ self.db['employee'].select(
+ SELECT=['id', 'firstName', 'positionID'],
+ ORDER_BY=['positionID']
+ )
+ )
+ self.assertEqual(
+ expected,
+ self.db['employee'].select(
+ SELECT=['id', 'firstName', 'positionID'],
+ ORDER_BY=('positionID',)
+ )
+ )
+
+ # SELECT 2 columns ORDERED BY column1, column2
+ expected = self.db.execute('SELECT id, firstName FROM "employee" ORDER BY firstName, surname')
+
+ self.assertEqual(
+ expected,
+ self.db['employee'].select(
+ SELECT=['id', 'firstName'],
+ ORDER_BY=['firstName', 'surname']
+ )
+ )
+ self.assertEqual(
+ expected,
+ self.db['employee'].select(
+ SELECT=['id', 'firstName'],
+ ORDER_BY=('firstName', 'surname')
+ )
+ )
+
+ # SELECT 2 columns ORDERED BY column ASC
+ expected = self.db.execute('SELECT id, firstName FROM "employee" ORDER BY firstName ASC')
+
+ self.assertEqual(
+ expected,
+ self.db['employee'].select(
+ SELECT=['id', 'firstName'],
+ ORDER_BY='firstName ASC'
+ )
+ )
+
+ # SELECT 2 columns ORDERED BY column DESC
+ expected = self.db.execute('SELECT id, firstName FROM "employee" ORDER BY firstName DESC')
+
+ self.assertEqual(
+ expected,
+ self.db['employee'].select(
+ SELECT=['id', 'firstName'],
+ ORDER_BY='firstName DESC'
+ )
+ )
+
+ # Issue #59 (fixed)
+ # self.assertRaises(
+ # sqlite3.OperationalError,
+ # self.db['employee'].select,
+ # SELECT=['id', 'firstName'],
+ # ORDER_BY=['firstName', 'ASC', 'surname', 'DESC']
+ # )
+ # self.assertRaises(
+ # sqlite3.OperationalError,
+ # self.db['employee'].select,
+ # SELECT=['id', 'firstName'],
+ # ORDER_BY=('firstName', 'ASC', 'surname', 'DESC')
+ # )
+
+ # When issue #59 will be fixed, this code have to work fine
+ #
+ # SELECT 2 columns ORDERED BY column1 ASC, column DESC
+ expected = self.db.execute('SELECT id, firstName FROM "employee" ORDER BY firstName ASC, surname DESC')
+
+ self.assertEqual(
+ expected,
+ self.db['employee'].select(
+ SELECT=['id', 'firstName'],
+ ORDER_BY=['firstName', 'ASC', 'surname', 'DESC']
+ )
+ )
+ self.assertEqual(
+ expected,
+ self.db['employee'].select(
+ SELECT=['id', 'firstName'],
+ ORDER_BY=('firstName', 'ASC', 'surname', 'DESC')
+ )
+ )
+
+ # SELECT with one INNER JOIN
+ expected = self.db.execute(''
+ 'SELECT e.id, e.firstName, p.name '
+ 'FROM employee e '
+ 'INNER JOIN position p '
+ 'ON e.positionID == p.id '
+ 'ORDER BY e.positionID DESC')
+
+ self.assertEqual(
+ expected,
+ self.db['employee'].select(
+ SELECT=[
+ self.db['employee']['id'],
+ self.db['employee']['firstName'],
+ self.db['position']['name']
+ ],
+ JOIN=(
+ INNER_JOIN, self.db['position'],
+ ON, self.db['position']['id'] == self.db['employee']['positionID']
+ ),
+ ORDER_BY=(
+ self.db['position']['id'],
+ 'DESC'
+ )
+ )
+ )
+
+ # SELECT with two INNER JOINS
+ expected = self.db.execute(''
+ 'SELECT e.id, e.firstName, p.name '
+ 'FROM employee e '
+ 'INNER JOIN position p '
+ 'ON e.positionID == p.id '
+ 'INNER JOIN payments '
+ 'ON e.id == payments.employeeID '
+ 'ORDER BY payments.amount DESC')
+
+ self.assertEqual(
+ expected,
+ self.db['employee'].select(
+ SELECT=[
+ self.db['employee']['id'],
+ self.db['employee']['firstName'],
+ self.db['position']['name']
+ ],
+ JOIN=(
+ (
+ INNER_JOIN, self.db['position'],
+ ON, self.db['position']['id'] == self.db['employee']['positionID']
+ ),
+ (
+ INNER_JOIN, self.db['payments'],
+ ON, self.db['employee']['id'] == self.db['payments']['employeeID']
+ )
+ ),
+ ORDER_BY=(
+ self.db['payments']['amount'],
+ 'DESC'
+ )
+ )
+ )
+
+ # SELECT with two FULL JOINS
+ expected = self.db.execute(''
+ 'SELECT e.id, e.firstName, p.name '
+ 'FROM employee e '
+ 'LEFT JOIN position p '
+ 'ON e.positionID == p.id '
+ 'LEFT JOIN payments '
+ 'ON e.id == payments.employeeID '
+ 'ORDER BY payments.amount DESC')
+
+ self.assertEqual(
+ expected,
+ self.db['employee'].select(
+ SELECT=[
+ self.db['employee']['id'],
+ self.db['employee']['firstName'],
+ self.db['position']['name']
+ ],
+ JOIN=(
+ (
+ LEFT_JOIN, self.db['position'],
+ ON, self.db['position']['id'] == self.db['employee']['positionID']
+ ),
+ (
+ LEFT_JOIN, self.db['payments'],
+ ON, self.db['employee']['id'] == self.db['payments']['employeeID']
+ )
+ ),
+ ORDER_BY=(
+ self.db['payments']['amount'],
+ 'DESC'
+ )
+ )
+ )
+
+ # SELECT * FROM employee GROUP BY
+ expected = self.db.execute("SELECT * FROM employee GROUP BY surname")
+ self.assertEqual(
+ expected,
+ self.db.select(SELECT=ALL, FROM='employee', GROUP_BY='surname')
+ )
+ self.assertEqual(
+ expected,
+ self.db.select(SELECT=ALL, FROM='employee', GROUP_BY=('surname',))
+ )
+ self.assertEqual(
+ expected,
+ self.db.select(SELECT=ALL, FROM='employee', GROUP_BY=['surname'])
+ )
+ self.assertEqual(
+ expected,
+ self.db.select(SELECT=ALL, FROM='employee', GROUP_BY=self.db['employee']['surname'])
+ )
+ self.assertEqual(
+ expected,
+ self.db.select(SELECT=ALL, FROM='employee', GROUP_BY=(self.db['employee']['surname'],))
+ )
+
+ # SELECT * FROM employee GROUP BY 2 rows
+ expected = self.db.execute("SELECT * FROM employee GROUP BY surname, positionID")
+ self.assertEqual(
+ expected,
+ self.db.select(SELECT=ALL, FROM='employee', GROUP_BY='surname, positionID')
+ )
+ self.assertEqual(
+ expected,
+ self.db.select(SELECT=ALL, FROM='employee', GROUP_BY=('surname', 'positionID'))
+ )
+ self.assertEqual(
+ expected,
+ self.db.select(SELECT=ALL, FROM='employee', GROUP_BY=['surname', 'positionID'])
+ )
+
+ # SELECT with JOINS GROUP BY
+ expected = self.db.execute(''
+ 'SELECT pos.name, pay.amount '
+ 'FROM payments pay '
+ 'INNER JOIN employee emp '
+ 'ON emp.id == pay.employeeID '
+ 'INNER JOIN position pos '
+ 'ON emp.positionID == pos.id '
+ 'GROUP BY pos.name '
+ 'ORDER BY pay.amount DESC')
+
+ self.assertEqual(
+ expected,
+ self.db['employee'].select(
+ SELECT=[
+ self.db['position']['name'],
+ self.db['payments']['amount'],
+ ],
+ JOIN=(
+ (
+ INNER_JOIN, self.db['payments'],
+ ON, self.db['payments']['employeeID'] == self.db['employee']['id']
+ ),
+ (
+ INNER_JOIN, self.db['position'],
+ ON, self.db['position']['id'] == self.db['employee']['positionID']
+ )
+ ),
+ GROUP_BY=self.db['position']['name'],
+ ORDER_BY=(
+ self.db['payments']['amount'],
+ 'DESC'
+ )
+ )
+ )
+
+ def test_extra_features(self):
+ """
+ Special extra features with no other options to test
+ """
+
+ self.db.markup(
+ {
+ 'test_table_1': {
+ 'id': INTEGER
+ },
+ 'test_table_2': {
+ 'id': INTEGER,
+ 'col_1': INTEGER,
+ 'col_2': INTEGER,
+ 'col_3': INTEGER,
+ },
+ }
+ )
+
+ # Database
+ #
+ self.assertEqual(self.db.path, self.db_name)
+
+ # Tables
+ #
+ self.assertEqual(self.raw_sql_get_tables_names(), self.db.tables_names)
+
+ # self.db['notExistingTable']
+ self.assertRaises(KeyError, self.db.__getitem__, 'notExistingTable')
+
+ # self.db.get_table('notExistingTable')
+ self.assertRaises(KeyError, self.db.get_table, 'notExistingTable')
+
+ self.assertEqual(self.db['test_table_1'].name, 'test_table_1')
+ self.assertEqual(self.db['test_table_2'].name, 'test_table_2')
+
+ # Columns
+ #
+ self.assertEqual(self.db['test_table_1'].columns_names, ('id',))
+ self.assertEqual(self.db['test_table_2'].columns_names, ('id', 'col_1', 'col_2', 'col_3'))
+
+ # self.db['notExistingTable']
+ self.assertRaises(KeyError, self.db['test_table_1'].__getitem__, 'notExistingColumn')
+
+ self.assertFalse(self.db['test_table_1'].has_column('notExistingColumn'))
+ self.assertFalse(self.db['test_table_2'].has_column('notExistingColumn'))
+
+ self.assertEqual(self.db['test_table_1']['id'].name, 'id')
+ self.assertEqual(self.db['test_table_2']['id'].name, 'id')
+
+ self.assertEqual(self.db['test_table_1']['id'].table, 'test_table_1')
+ self.assertEqual(self.db['test_table_2']['id'].table, 'test_table_2')
+
+ @unittest.skipUnless(importlib.util.find_spec('numpy'), "Module numpy not found")
+ def test_numpy(self):
+ from numpy import array, nan
+
+ data = [
+ ('World', 2415712510, 318.1, '9.7%', nan),
+ ('United States', 310645827, 949.5, '44.3%', 'Johnson&Johnson, Moderna, Pfizer/BioNTech'),
+ ('India', 252760364, 186.9, '3.5%', 'Covaxin, Covishield, Oxford/AstraZeneca'),
+ ('Brazil', 78906225, 376.7, '11.3%', 'Oxford / AstraZeneca, Oxford/AstraZeneca, Pfizer/BioNTech'),
+ ]
+
+ self.db.execute(
+ """
+ CREATE TABLE "test_table_numpy" (
+ "Country" TEXT UNIQUE,
+ "Doses_Administered" INTEGER,
+ "Doses_per_1000" REAL,
+ "Fully_Vaccinated_Population" TEXT,
+ "Vaccine_Type_Used" TEXT
+ );
+ """
+ )
+
+ self.db.updatemany("test_table_numpy", array(data))
+ self.db.updatemany("test_table_numpy", array([[], []]))
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/tests/test_sqllex.py b/tests/test_sqllex.py
index 8f8e82c..7b4ca2d 100644
--- a/tests/test_sqllex.py
+++ b/tests/test_sqllex.py
@@ -1,1358 +1,12 @@
-import os
-import sqlite3
import unittest
-import importlib.util
-from sqllex.constants import *
-from sqllex.classes import SQLite3x
-from sqllex.debug import debug_mode
+from test_sqlite3x import TestSqllexSQLite
+from timetests.test_sqlite3x import TimeTestsSqllexSQLite
-# debug_mode(True)
-
-
-class TestSqllexSQLite(unittest.TestCase):
-
- @classmethod
- def setUpClass(cls) -> None:
- cls.tests_counter = 0
-
- @classmethod
- def tearDownClass(cls) -> None:
- pass
-
- def setUp(self) -> None:
- self.tests_counter += 1
- self.db_name = f"test_sqllex_db_{self.tests_counter}"
-
- self.db = SQLite3x(path=self.db_name)
-
- def tearDown(self) -> None:
- self.db.disconnect()
- os.remove(self.db_name)
-
- def raw_sql_get_tables_names(self):
- return tuple(map(lambda ret: ret[0], self.db.execute("SELECT name FROM sqlite_master WHERE type='table'")))
-
- def test_sqlite_db_crating_db(self):
- """
- Testing SQLite database init
- """
-
- db_name = f"{self.db_name}_{1}"
- SQLite3x(db_name)
- self.assertTrue(os.path.isfile(db_name))
- os.remove(db_name)
-
- db_name = f"{self.db_name}_{2}"
- SQLite3x(path=db_name)
- self.assertTrue(os.path.isfile(db_name))
- os.remove(db_name)
-
- db_name = f"{self.db_name}_{3}"
- SQLite3x(db_name, init_connection=False)
- self.assertFalse(os.path.isfile(db_name))
-
- db_name = f""
- self.assertRaises(ValueError, SQLite3x, db_name)
-
- db_name = f""
- self.assertRaises(ValueError, SQLite3x, db_name)
-
- db_name = f""
- self.assertRaises(ValueError, SQLite3x, path=db_name)
-
- def test_connection(self):
- """
- Testing connection with class object init
- """
-
- db_name = f"{self.db_name}_{1}"
- self.assertIsInstance(SQLite3x(db_name).connection, sqlite3.Connection)
- os.remove(db_name)
-
- db_name = f"{self.db_name}_{1}"
- self.assertIsInstance(SQLite3x(db_name, init_connection=True).connection, sqlite3.Connection)
- os.remove(db_name)
-
- db_name = f"{self.db_name}_{1}"
- self.assertIs(SQLite3x(db_name, init_connection=False).connection, None)
-
- def test_transaction(self):
- """
- Testing transactions
- """
- def get_by_id(table: str, val: int):
- return self.db.execute(f"SELECT * FROM {table} WHERE id={val}")
-
- def get_user_by_id(val: int):
- return get_by_id(table='user', val=val)
-
- def get_car_by_id(val: int):
- return get_by_id(table='car', val=val)
-
-
- self.db.execute(
- """
- CREATE TABLE "user" (
- "id" INTEGER PRIMARY KEY,
- "name" TEXT UNIQUE
- );
- """
- )
-
- self.db.execute(
- """
- CREATE TABLE "car" (
- "id" INTEGER PRIMARY KEY,
- "owner_id" INTEGER,
- "brand" TEXT,
-
- FOREIGN KEY (owner_id) REFERENCES user(id)
- );
- """
- )
-
- # Transaction with auto commit
- with self.db.transaction as tran:
- self.db.execute(
- "INSERT INTO user VALUES (1, 'Alex')"
- )
-
- self.assertEqual(get_user_by_id(1), [(1, 'Alex')])
-
- # Transaction with auto commit for many executes
- with self.db.transaction as tran:
- self.db.execute(
- "INSERT INTO user VALUES (22, 'Alex22')"
- )
- self.db.execute(
- "INSERT INTO user VALUES (23, 'Alex23')"
- )
- self.db.execute(
- "INSERT INTO user VALUES (24, 'Alex24')"
- )
-
- self.assertEqual(get_user_by_id(22), [(22, 'Alex22')])
- self.assertEqual(get_user_by_id(23), [(23, 'Alex23')])
- self.assertEqual(get_user_by_id(24), [(24, 'Alex24')])
-
- # Transaction with manual commit
- with self.db.transaction as tran:
- self.db.execute(
- "INSERT INTO user VALUES (2, 'Bob')"
- )
- tran.commit()
-
- self.assertEqual(get_user_by_id(2), [(2, 'Bob')])
-
- # Transaction with rollback
- with self.db.transaction as tran:
- self.db.execute(
- "INSERT INTO user VALUES (3, 'Cara')"
- )
- self.assertEqual(get_user_by_id(3), [(3, 'Cara')])
- tran.rollback()
-
- self.assertEqual(get_user_by_id(3), [])
-
- # Prep
- self.assertRaises(sqlite3.IntegrityError, self.db.execute, "INSERT INTO user VALUES (2, 'Sam')")
-
- # Transaction with rollback
- with self.db.transaction as tran:
- try:
- self.db.execute(
- "INSERT INTO user VALUES (2, 'Sam')"
- )
- except sqlite3.IntegrityError:
- tran.rollback()
-
- self.assertEqual(get_user_by_id(2), [(2, 'Bob')])
-
- # Normal Transaction
- with self.db.transaction as tran:
- self.db.execute(
- "INSERT INTO user VALUES (55, 'Master')"
- )
- self.db.execute(
- "INSERT INTO car VALUES (55, 55, 'BMW')"
- )
-
- self.assertEqual(get_user_by_id(55), [(55, 'Master')])
- self.assertEqual(get_car_by_id(55), [(55, 55, 'BMW')])
-
- # Prep
- self.assertRaises(sqlite3.IntegrityError, self.db.execute, "INSERT INTO car VALUES (9999, 9999, 'BMW')")
-
- # Transaction with rollback
- with self.db.transaction as tran:
- try:
- self.db.execute(
- "INSERT INTO car VALUES (9999, 9999, 'BMW')"
- )
- except sqlite3.IntegrityError:
- tran.rollback()
-
- self.assertEqual(get_car_by_id(9999), [])
-
-
- def test_create_table_1(self):
- """
- Testing table creating
- """
-
- self.assertRaises(ValueError, self.db.create_table, 'test_table_1', {})
- self.assertRaises(ValueError, self.db.create_table, 'test_table_2', '')
-
- def test_create_table_basic(self):
- """
- Testing table creating
- """
- columns = {'id': INTEGER}
-
- self.db.create_table(
- 'test_table_1',
- columns
- )
- self.assertEqual(self.raw_sql_get_tables_names(), ('test_table_1',))
-
- self.db.create_table(
- 'test_table_2',
- columns
- )
- self.assertEqual(self.raw_sql_get_tables_names(), ('test_table_1', 'test_table_2'))
-
- def test_create_table_all_columns(self):
- """
- Testing table creating
- """
-
- self.db.create_table(
- name='test_table',
- columns={
- 'id': [INTEGER, AUTOINCREMENT, PRIMARY_KEY],
- 'user': [TEXT, UNIQUE, NOT_NULL],
- 'about': [TEXT, DEFAULT, NULL],
- 'status': [TEXT, DEFAULT, 'offline']
- }
- )
- self.assertEqual(self.raw_sql_get_tables_names(), ('test_table', 'sqlite_sequence'))
-
- self.db.create_table(
- name='test_table_1',
- columns={
- 'id': [INTEGER, AUTOINCREMENT, PRIMARY_KEY],
- 'user': [TEXT, UNIQUE, NOT_NULL],
- 'about': [TEXT, DEFAULT, NULL],
- 'status': [TEXT, DEFAULT, 'offline']
- }
- )
- self.assertEqual(self.raw_sql_get_tables_names(), ('test_table', 'sqlite_sequence', 'test_table_1'))
-
- def test_create_table_inx(self):
- """
- Testing if not exist kwarg
- """
- columns = {'id': INTEGER}
-
- self.db.create_table('test_table_1', columns, IF_NOT_EXIST=True)
- self.db.create_table('test_table_2', columns, IF_NOT_EXIST=True)
- self.db.create_table('test_table_1', columns, IF_NOT_EXIST=True)
-
- self.assertEqual(self.raw_sql_get_tables_names(), ('test_table_1', 'test_table_2'))
- self.assertRaises(sqlite3.OperationalError, self.db.create_table, 'test_table_1', columns, IF_NOT_EXIST=False)
-
- def test_markup(self):
- """
- Markup table
- """
-
- self.db.markup(
- {
- "tt_groups": {
- "group_id": [PRIMARY_KEY, UNIQUE, INTEGER],
- "group_name": [TEXT, NOT_NULL, DEFAULT, "GroupName"],
- },
-
- "tt_users": {
- "user_id": [INTEGER, PRIMARY_KEY, UNIQUE],
- "user_name": TEXT,
- "group_id": INTEGER,
-
- FOREIGN_KEY: {
- "group_id": ["tt_groups", "group_id"]
- },
- }
- }
- )
-
- self.assertEqual(
- self.raw_sql_get_tables_names(),
- ('tt_groups', 'tt_users')
- )
-
- def test_drop_and_create_table(self):
- """
- Create and remove table
- """
- self.db.execute(
- """
- CREATE TABLE "test_table" (
- "id" INTEGER
- );
- """
- )
- self.assertEqual(
- self.raw_sql_get_tables_names(),
- ('test_table',)
- )
-
- self.db.drop('test_table')
- self.assertEqual(
- self.raw_sql_get_tables_names(),
- tuple()
- )
-
- def test_insert(self):
- """
- All kind of possible inserts without extra arguments (OR, WITH)
- """
-
- def get_all_records():
- return self.db.execute(f'SELECT * FROM "{table_name}"')
-
- def count_all_records():
- return len(get_all_records())
-
- def count_records(step: int = None):
- """
- This decorator adding counting
- with every run of decorated function it increases count_records.counter at value of step
-
- @count_records(step=1)
- def func(*args, **kwargs):
- return None
-
- func()
- func()
-
- count_records.counter == 2
- """
-
- def wrapper_(func: callable):
- def wrapper(*args, **kwargs):
- # one run == + step records
- count_records.counter += func.step
-
- func(*args, **kwargs)
-
- # checking was it inserted or not
- self.assertEqual(
- count_all_records(),
- count_records.counter,
- msg=f"Incorrect records amount\n args={args}, kwargs={kwargs}"
- )
-
- func.step = step
-
- return wrapper
-
- if step is None:
- step = 1
-
- count_records.counter = 0
-
- return wrapper_
-
- @count_records(step=2) # warning - magic number!
- def insert_process(*args, **kwargs):
- self.db.insert(table_name, *args, **kwargs)
- self.db[table_name].insert(*args, **kwargs)
-
- @count_records(step=10) # warning - magic number!
- def insert_many_process(*args, **kwargs):
- self.db.insertmany(table_name, *args, **kwargs)
- self.db[table_name].insertmany(*args, **kwargs)
-
- table_name = 'test_table'
- columns = ("text_c", "num_c", "int_c", "real_c", 'none_c', 'blob_c')
- data = ('asdf', 10.0, 1, 3.14, None, 2)
-
- self.db.execute(
- """
- CREATE TABLE IF NOT EXISTS "test_table" (
- "text_c" TEXT,
- "num_c" NUMERIC,
- "int_c" INTEGER,
- "real_c" REAL,
- "none_c" NONE,
- "blob_c" BLOB
- );
- """
- )
-
- # columns_types = (TEXT, NUMERIC, INTEGER, REAL, NONE, BLOB)
- #
- # self.db.markup(
- # {
- # table_name: dict(zip(columns, columns_types))
- # }
- # )
-
- # just arg values
- # 1, 2, 3
- insert_process(*data)
-
- # arg list
- # [1, 2, 3]
- insert_process(list(data))
-
- # arg tuple
- # (1, 2, 3)
- insert_process(tuple(data))
-
- # arg tuple
- # {'col1': 1, 'col2': 2}
- insert_process(dict(zip(columns, data)))
-
- # kwargs
- # col1=1, col2=2
- insert_process(**dict(zip(columns, data)))
-
- # not full tuple
- insert_process(('asdf', 10.0))
-
- # not full tuple
- insert_process(['asdf', 10.0])
-
- # insert_many args
- # (1, 2), (3, 4) ...
- insert_many_process(*((data,) * 5))
-
- # insert_many one arg
- # ((1, 2), (3, 4) ... )
- insert_many_process((data,) * 5)
-
- # Manually SQL script execution
- all_data = get_all_records()
-
- self.assertEqual(all_data, self.db.select(table_name))
- self.assertEqual(all_data, self.db.select(table_name, ALL))
- self.assertEqual(all_data, self.db.select(table_name, '*'))
- self.assertEqual(all_data, self.db[table_name].select_all())
- self.assertEqual(all_data, self.db[table_name].select())
- self.assertEqual(all_data, self.db[table_name].select(ALL))
-
- def test_insert_xa_and_update(self):
- """
- Insert with extra args and Update
- """
-
- def re_init_database():
- self.db.executescript(
- """
- DROP TABLE IF EXISTS "salt";
- DROP TABLE IF EXISTS "hashes";
-
- CREATE TABLE IF NOT EXISTS "hashes" (
- "id" INTEGER PRIMARY KEY AUTOINCREMENT,
- "value" TEXT
- );
-
- CREATE TABLE "salt" (
- "hashID" INTEGER PRIMARY KEY AUTOINCREMENT,
- "value" TEXT,
-
- FOREIGN KEY (hashID) REFERENCES hashes (id)
- );
-
- INSERT INTO "hashes" (id, value) VALUES (1, '6432642426695757642'), (2, '3259279587463616469'),
- (3, '4169263184167314937'), (4, '-8758758971870855856'), (5, '-2558087477551224077');
-
- INSERT INTO "salt" (hashID, value) VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5');
- """
- )
-
- # sqlite3.IntegrityError: UNIQUE constraint failed: hashes.id
- re_init_database()
- self.assertRaises(
- sqlite3.IntegrityError,
- self.db.insert, 'hashes', (1, 'newHash'),
- )
-
- # INSERT OR FAIL
- # sqlite3.IntegrityError: UNIQUE constraint failed: hashes.id
- re_init_database()
- self.assertRaises(
- sqlite3.IntegrityError,
- self.db.insert,
- 'hashes', (1, 'newHash'), OR=FAIL
- )
-
- # INSERT OR ABORT
- # sqlite3.IntegrityError: UNIQUE constraint failed: hashes.id
- re_init_database()
- self.assertRaises(
- sqlite3.IntegrityError,
- self.db.insert,
- 'hashes', (1, 'newHash'), OR=ABORT
- )
-
- # INSERT OR ROLLBACK
- # sqlite3.IntegrityError: UNIQUE constraint failed: hashes.id
- re_init_database()
- self.assertRaises(
- sqlite3.IntegrityError,
- self.db.insert,
- 'hashes', (1, 'newHash'), OR=ROLLBACK
- )
-
- # INSERT OR REPLACE
- re_init_database()
- self.db.insert(
- 'hashes',
- (1, 'newHash'),
- OR=REPLACE
- )
- self.assertEqual(
- [
- (1, 'newHash'),
- (2, '3259279587463616469'),
- (3, '4169263184167314937'),
- (4, '-8758758971870855856'),
- (5, '-2558087477551224077')
- ],
- self.db.execute("SELECT * FROM hashes"),
- )
-
- # INSERT OR IGNORE
- re_init_database()
- self.db.insert(
- 'hashes',
- (1, 'newHash'),
- (2, 'anotherNewHash'),
- OR=IGNORE
- )
- self.assertEqual(
- [
- (1, '6432642426695757642'),
- (2, '3259279587463616469'),
- (3, '4169263184167314937'),
- (4, '-8758758971870855856'),
- (5, '-2558087477551224077')
- ],
- self.db.execute("SELECT * FROM hashes"),
- )
-
- # INSERT many OR REPLACE
- re_init_database()
- self.db.insertmany(
- 'hashes',
- (
- (1, 'newHash'),
- (6, 'anotherNewHash'),
- ),
- OR=REPLACE
- )
- self.assertEqual(
- [
- (1, 'newHash'),
- (2, '3259279587463616469'),
- (3, '4169263184167314937'),
- (4, '-8758758971870855856'),
- (5, '-2558087477551224077'),
- (6, 'anotherNewHash'),
- ],
- self.db.execute("SELECT * FROM hashes"),
- )
-
- # INSERT many OR IGNORE
- re_init_database()
- self.db.insertmany(
- 'hashes',
- (
- (1, 'newHash'),
- (6, 'anotherNewHash'),
- ),
- OR=IGNORE
- )
- self.assertEqual(
- [
- (1, '6432642426695757642'),
- (2, '3259279587463616469'),
- (3, '4169263184167314937'),
- (4, '-8758758971870855856'),
- (5, '-2558087477551224077'),
- (6, 'anotherNewHash'),
- ],
- self.db.execute("SELECT * FROM hashes")
- )
-
- expected = [
- (1, 'newHash'),
- (2, '3259279587463616469'),
- (3, '4169263184167314937'),
- (4, '-8758758971870855856'),
- (5, '-2558087477551224077')
- ]
-
- # UPDATE
- re_init_database()
- self.db.update(
- 'hashes',
- SET={'value': 'newHash'},
- WHERE={'id': 1}
- )
- self.assertEqual(
- expected,
- self.db.execute("SELECT * FROM hashes"),
- )
-
- # UPDATE
- re_init_database()
- self.db.update(
- 'hashes',
- SET={
- self.db['hashes']['value']: 'newHash'
- },
- WHERE={
- self.db['hashes']['id']: 1
- }
- )
- self.assertEqual(
- expected,
- self.db.execute("SELECT * FROM hashes"),
- )
-
- # UPDATE
- re_init_database()
- self.db.update(
- 'hashes',
- SET={
- self.db['hashes']['value']: 'newHash'
- },
- WHERE=self.db['hashes']['id'] == 1
- )
- self.assertEqual(
- expected,
- self.db.execute("SELECT * FROM hashes"),
- )
-
- # UPDATE
- re_init_database()
- self.db.update(
- 'hashes',
- SET={
- self.db['hashes']['value']: 'newHash'
- },
- WHERE=(1 < self.db['hashes']['id']) & (self.db['hashes']['id'] < 5)
- )
- self.assertEqual(
- self.db.execute("SELECT * FROM hashes"),
- [
- (1, '6432642426695757642'),
- (2, 'newHash'),
- (3, 'newHash'),
- (4, 'newHash'),
- (5, '-2558087477551224077')
- ]
- )
-
- # UPDATE with impossible WHERE condition
- re_init_database()
- self.db.update(
- 'hashes',
- SET={
- self.db['hashes']['value']: 'newHash'
- },
- WHERE=(self.db['hashes']['id'] > 9999)
- )
- self.assertEqual(
- self.db.execute("SELECT * FROM hashes"),
- [
- (1, '6432642426695757642'),
- (2, '3259279587463616469'),
- (3, '4169263184167314937'),
- (4, '-8758758971870855856'),
- (5, '-2558087477551224077')
- ]
- )
-
- def test_select(self):
- """
- All kind of selects (WHERE, ORDER BY, JOIN, GROUP BY)
- """
-
- self.db.executescript(
- """
- CREATE TABLE IF NOT EXISTS "position" (
- "id" INTEGER PRIMARY KEY AUTOINCREMENT,
- "name" TEXT,
- "description" TEXT DEFAULT NULL
- );
-
- CREATE TABLE IF NOT EXISTS "employee" (
- "id" INTEGER PRIMARY KEY AUTOINCREMENT,
- "firstName" TEXT,
- "surname" TEXT,
- "age" INTEGER NOT NULL,
- "positionID" INTEGER,
- FOREIGN KEY (positionID) REFERENCES position (id)
- );
-
- CREATE TABLE IF NOT EXISTS "payments" (
- "date" TEXT,
- "employeeID" INTEGER,
- "amount" INTEGER NOT NULL
- );
- """
- )
-
- # self.db.markup(
- # {
- # 'position': {
- # 'id': [INTEGER, PRIMARY_KEY, AUTOINCREMENT],
- # 'name': TEXT,
- # 'description': [TEXT, DEFAULT, NULL],
- # },
- # 'employee': {
- # 'id': [INTEGER, PRIMARY_KEY, AUTOINCREMENT],
- # 'firstName': TEXT,
- # 'surname': TEXT,
- # 'age': [INTEGER, NOT_NULL],
- # 'positionID': INTEGER,
- #
- # FOREIGN_KEY: {
- # 'positionID': ['position', 'id']
- # }
- # },
- # 'payments': {
- # 'date': [TEXT],
- # 'employeeID': INTEGER,
- # 'amount': [INTEGER, NOT_NULL],
- #
- # FOREIGN_KEY: {
- # 'positionID': ['employee', 'id']
- # },
- # }
- # }
- # )
-
- self.db.executemany(
- 'INSERT INTO "position" (id, name, description) VALUES (?, ?, ?)',
- (
- (0, 'Assistant', 'Novice developer'),
- (1, 'Junior', 'Junior developer'),
- (2, 'Middle', 'Middle developer'),
- (3, 'Senior', 'senior developer'),
- (4, 'DevOps', 'DevOps engineer')
- )
- )
-
- self.db.executemany(
- 'INSERT INTO "employee" (id, firstName, surname, age, positionID) VALUES (?, ?, ?, ?, ?)',
- (
- (None, 'Alis', 'A', 11, 1),
- (None, 'Bob', 'B', 22, 1),
- (None, 'Carl', 'C', 33, 2),
- (None, 'Alis', 'B', 44, 3),
- (None, 'Dexter', 'B', 55, 1),
- (None, 'Elis', 'A', 22, 1),
- (None, 'Frank', 'B', 33, 1),
- (None, 'Georgy', 'D', 22, 2),
- (None, 'FoxCpp', 'M', 22, 1),
- (None, 'Ira', 'D', 22, 2)
- )
- )
-
- self.db.executemany(
- 'INSERT INTO "payments" (date, employeeID, amount) VALUES (?, ?, ?)',
- (
- ('01.01.2022', 2, 2000),
- ('01.01.2022', 3, 3000),
- ('01.01.2022', 7, 2000),
- ('01.02.2022', 1, 4000),
- ('01.02.2022', 2, 2000),
- ('01.02.2022', 3, 4000),
- ('01.02.2022', 5, 2000),
- ('01.02.2022', 6, 4000),
- ('01.02.2022', 7, 2000),
- )
- )
-
- # SELECT all
- expected = self.db.execute('SELECT * FROM "employee"')
-
- self.assertEqual(
- expected, self.db['employee'].select(ALL)
- )
- self.assertEqual(
- expected, self.db['employee'].select_all()
- )
- self.assertEqual(
- expected, self.db['employee'].select_all(GROUP_BY=1)
- )
-
- # SELECT one column
- expected = self.db.execute('SELECT id FROM "employee"')
-
- self.assertEqual(
- expected, self.db.select('employee', 'id')
- )
- self.assertEqual(
- expected, self.db['employee']['id']
- )
- self.assertEqual(
- expected, self.db['employee'].select('id')
- )
- self.assertEqual(
- expected, self.db['employee'].select(self.db['employee']['id'])
- )
- self.assertEqual(
- expected, self.db['employee'].select([self.db['employee']['id']])
- )
- self.assertEqual(
- expected, self.db['employee'].select((self.db['employee']['id'],))
- )
-
- # SELECT 2 columns
- expected = self.db.execute('SELECT id, firstName FROM "employee"')
-
- self.assertEqual(
- expected, self.db['employee'].select('id, firstName')
- )
- self.assertEqual(
- expected, self.db['employee'].select(['id', 'firstName'])
- )
- self.assertEqual(
- expected, self.db['employee'].select(('id', 'firstName'))
- )
- self.assertEqual(
- expected, self.db['employee'].select(
- [
- self.db['employee']['id'],
- self.db['employee']['firstName']
- ]
- )
- )
-
- # SELECT 2 columns WHERE (condition)
- expected = self.db.execute('SELECT id, firstName FROM "employee" WHERE id > 2')
-
- self.assertEqual(
- expected,
- self.db['employee'].select(
- SELECT=['id', 'firstName'],
- WHERE='id > 2',
- )
- )
- self.assertEqual(
- expected,
- self.db['employee'].select(
- SELECT=['id', 'firstName'],
- WHERE='id > 2'
- )
- )
- self.assertEqual(
- expected,
- self.db['employee'].select(
- SELECT=['id', 'firstName'],
- WHERE=(self.db['employee']['id'] > 2)
- )
- )
- self.assertEqual(
- expected,
- self.db['employee'].select(
- SELECT=[self.db['employee']['id'], self.db['employee']['firstName']],
- WHERE=(self.db['employee']['id'] > 2)
- )
- )
-
- # SELECT 2 columns WHERE (condition) AND (condition)
- expected = self.db.execute('SELECT id, firstName FROM "employee" WHERE (age > 11) AND (positionID <> 2)')
-
- self.assertEqual(
- expected,
- self.db['employee'].select(
- SELECT=['id', 'firstName'],
- WHERE=(self.db['employee']['age'] > 11) & (self.db['employee']['positionID'] != 2)
- )
- )
-
- # SELECT 2 columns WHERE (condition) AND (condition)
- expected = self.db.execute('SELECT id, firstName FROM "employee" WHERE (age == 11) AND (positionID == 2)')
-
- self.assertEqual(
- expected,
- self.db['employee'].select(
- SELECT=['id', 'firstName'],
- WHERE=(self.db['employee']['age'] == 11) & (self.db['employee']['positionID'] == 2)
- )
- )
-
- # SELECT 2 columns WHERE (condition) OR (condition)
- expected = self.db.execute('SELECT id, firstName FROM "employee" WHERE (age == 11) OR (positionID == 2)')
-
- self.assertEqual(
- expected,
- self.db['employee'].select(
- SELECT=['id', 'firstName'],
- WHERE=(self.db['employee']['age'] == 11) | (self.db['employee']['positionID'] == 2)
- )
- )
-
- # SELECT 3 columns ORDERED BY column
- expected = self.db.execute('SELECT id, firstName, positionID FROM "employee" ORDER BY positionID')
-
- self.assertEqual(
- expected,
- self.db['employee'].select(
- SELECT=['id', 'firstName', 'positionID'],
- ORDER_BY='positionID'
- )
- )
- self.assertEqual(
- expected,
- self.db['employee'].select(
- SELECT=['id', 'firstName', 'positionID'],
- ORDER_BY=3
- )
- )
- self.assertEqual(
- expected,
- self.db['employee'].select(
- SELECT=['id', 'firstName', 'positionID'],
- ORDER_BY=['positionID']
- )
- )
- self.assertEqual(
- expected,
- self.db['employee'].select(
- SELECT=['id', 'firstName', 'positionID'],
- ORDER_BY=('positionID',)
- )
- )
-
- # SELECT 2 columns ORDERED BY column1, column2
- expected = self.db.execute('SELECT id, firstName FROM "employee" ORDER BY firstName, surname')
-
- self.assertEqual(
- expected,
- self.db['employee'].select(
- SELECT=['id', 'firstName'],
- ORDER_BY=['firstName', 'surname']
- )
- )
- self.assertEqual(
- expected,
- self.db['employee'].select(
- SELECT=['id', 'firstName'],
- ORDER_BY=('firstName', 'surname')
- )
- )
-
- # SELECT 2 columns ORDERED BY column ASC
- expected = self.db.execute('SELECT id, firstName FROM "employee" ORDER BY firstName ASC')
-
- self.assertEqual(
- expected,
- self.db['employee'].select(
- SELECT=['id', 'firstName'],
- ORDER_BY='firstName ASC'
- )
- )
-
- # SELECT 2 columns ORDERED BY column DESC
- expected = self.db.execute('SELECT id, firstName FROM "employee" ORDER BY firstName DESC')
-
- self.assertEqual(
- expected,
- self.db['employee'].select(
- SELECT=['id', 'firstName'],
- ORDER_BY='firstName DESC'
- )
- )
-
- # Issue #59 (fixed)
- # self.assertRaises(
- # sqlite3.OperationalError,
- # self.db['employee'].select,
- # SELECT=['id', 'firstName'],
- # ORDER_BY=['firstName', 'ASC', 'surname', 'DESC']
- # )
- # self.assertRaises(
- # sqlite3.OperationalError,
- # self.db['employee'].select,
- # SELECT=['id', 'firstName'],
- # ORDER_BY=('firstName', 'ASC', 'surname', 'DESC')
- # )
-
- # When issue #59 will be fixed, this code have to work fine
- #
- # SELECT 2 columns ORDERED BY column1 ASC, column DESC
- expected = self.db.execute('SELECT id, firstName FROM "employee" ORDER BY firstName ASC, surname DESC')
-
- self.assertEqual(
- expected,
- self.db['employee'].select(
- SELECT=['id', 'firstName'],
- ORDER_BY=['firstName', 'ASC', 'surname', 'DESC']
- )
- )
- self.assertEqual(
- expected,
- self.db['employee'].select(
- SELECT=['id', 'firstName'],
- ORDER_BY=('firstName', 'ASC', 'surname', 'DESC')
- )
- )
-
- # SELECT with one INNER JOIN
- expected = self.db.execute(''
- 'SELECT e.id, e.firstName, p.name '
- 'FROM employee e '
- 'INNER JOIN position p '
- 'ON e.positionID == p.id '
- 'ORDER BY e.positionID DESC')
-
- self.assertEqual(
- expected,
- self.db['employee'].select(
- SELECT=[
- self.db['employee']['id'],
- self.db['employee']['firstName'],
- self.db['position']['name']
- ],
- JOIN=(
- INNER_JOIN, self.db['position'],
- ON, self.db['position']['id'] == self.db['employee']['positionID']
- ),
- ORDER_BY=(
- self.db['position']['id'],
- 'DESC'
- )
- )
- )
-
- # SELECT with two INNER JOINS
- expected = self.db.execute(''
- 'SELECT e.id, e.firstName, p.name '
- 'FROM employee e '
- 'INNER JOIN position p '
- 'ON e.positionID == p.id '
- 'INNER JOIN payments '
- 'ON e.id == payments.employeeID '
- 'ORDER BY payments.amount DESC')
-
- self.assertEqual(
- expected,
- self.db['employee'].select(
- SELECT=[
- self.db['employee']['id'],
- self.db['employee']['firstName'],
- self.db['position']['name']
- ],
- JOIN=(
- (
- INNER_JOIN, self.db['position'],
- ON, self.db['position']['id'] == self.db['employee']['positionID']
- ),
- (
- INNER_JOIN, self.db['payments'],
- ON, self.db['employee']['id'] == self.db['payments']['employeeID']
- )
- ),
- ORDER_BY=(
- self.db['payments']['amount'],
- 'DESC'
- )
- )
- )
-
- # SELECT with two FULL JOINS
- expected = self.db.execute(''
- 'SELECT e.id, e.firstName, p.name '
- 'FROM employee e '
- 'LEFT JOIN position p '
- 'ON e.positionID == p.id '
- 'LEFT JOIN payments '
- 'ON e.id == payments.employeeID '
- 'ORDER BY payments.amount DESC')
-
- self.assertEqual(
- expected,
- self.db['employee'].select(
- SELECT=[
- self.db['employee']['id'],
- self.db['employee']['firstName'],
- self.db['position']['name']
- ],
- JOIN=(
- (
- LEFT_JOIN, self.db['position'],
- ON, self.db['position']['id'] == self.db['employee']['positionID']
- ),
- (
- LEFT_JOIN, self.db['payments'],
- ON, self.db['employee']['id'] == self.db['payments']['employeeID']
- )
- ),
- ORDER_BY=(
- self.db['payments']['amount'],
- 'DESC'
- )
- )
- )
-
- # SELECT * FROM employee GROUP BY
- expected = self.db.execute("SELECT * FROM employee GROUP BY surname")
- self.assertEqual(
- expected,
- self.db.select(SELECT=ALL, FROM='employee', GROUP_BY='surname')
- )
- self.assertEqual(
- expected,
- self.db.select(SELECT=ALL, FROM='employee', GROUP_BY=('surname',))
- )
- self.assertEqual(
- expected,
- self.db.select(SELECT=ALL, FROM='employee', GROUP_BY=['surname'])
- )
- self.assertEqual(
- expected,
- self.db.select(SELECT=ALL, FROM='employee', GROUP_BY=self.db['employee']['surname'])
- )
- self.assertEqual(
- expected,
- self.db.select(SELECT=ALL, FROM='employee', GROUP_BY=(self.db['employee']['surname'],))
- )
-
- # SELECT * FROM employee GROUP BY 2 rows
- expected = self.db.execute("SELECT * FROM employee GROUP BY surname, positionID")
- self.assertEqual(
- expected,
- self.db.select(SELECT=ALL, FROM='employee', GROUP_BY='surname, positionID')
- )
- self.assertEqual(
- expected,
- self.db.select(SELECT=ALL, FROM='employee', GROUP_BY=('surname', 'positionID'))
- )
- self.assertEqual(
- expected,
- self.db.select(SELECT=ALL, FROM='employee', GROUP_BY=['surname', 'positionID'])
- )
-
- # SELECT with JOINS GROUP BY
- expected = self.db.execute(''
- 'SELECT pos.name, pay.amount '
- 'FROM payments pay '
- 'INNER JOIN employee emp '
- 'ON emp.id == pay.employeeID '
- 'INNER JOIN position pos '
- 'ON emp.positionID == pos.id '
- 'GROUP BY pos.name '
- 'ORDER BY pay.amount DESC')
-
- self.assertEqual(
- expected,
- self.db['employee'].select(
- SELECT=[
- self.db['position']['name'],
- self.db['payments']['amount'],
- ],
- JOIN=(
- (
- INNER_JOIN, self.db['payments'],
- ON, self.db['payments']['employeeID'] == self.db['employee']['id']
- ),
- (
- INNER_JOIN, self.db['position'],
- ON, self.db['position']['id'] == self.db['employee']['positionID']
- )
- ),
- GROUP_BY=self.db['position']['name'],
- ORDER_BY=(
- self.db['payments']['amount'],
- 'DESC'
- )
- )
- )
-
- def test_extra_features(self):
- """
- Special extra features with no other options to test
- """
-
- self.db.markup(
- {
- 'test_table_1': {
- 'id': INTEGER
- },
- 'test_table_2': {
- 'id': INTEGER,
- 'col_1': INTEGER,
- 'col_2': INTEGER,
- 'col_3': INTEGER,
- },
- }
- )
-
- # Database
- #
- self.assertEqual(self.db.path, self.db_name)
-
- # Tables
- #
- self.assertEqual(self.raw_sql_get_tables_names(), self.db.tables_names)
-
- # self.db['notExistingTable']
- self.assertRaises(KeyError, self.db.__getitem__, 'notExistingTable')
-
- # self.db.get_table('notExistingTable')
- self.assertRaises(KeyError, self.db.get_table, 'notExistingTable')
-
- self.assertEqual(self.db['test_table_1'].name, 'test_table_1')
- self.assertEqual(self.db['test_table_2'].name, 'test_table_2')
-
- # Columns
- #
- self.assertEqual(self.db['test_table_1'].columns_names, ('id',))
- self.assertEqual(self.db['test_table_2'].columns_names, ('id', 'col_1', 'col_2', 'col_3'))
-
- # self.db['notExistingTable']
- self.assertRaises(KeyError, self.db['test_table_1'].__getitem__, 'notExistingColumn')
-
- self.assertFalse(self.db['test_table_1'].has_column('notExistingColumn'))
- self.assertFalse(self.db['test_table_2'].has_column('notExistingColumn'))
-
- self.assertEqual(self.db['test_table_1']['id'].name, 'id')
- self.assertEqual(self.db['test_table_2']['id'].name, 'id')
-
- self.assertEqual(self.db['test_table_1']['id'].table, 'test_table_1')
- self.assertEqual(self.db['test_table_2']['id'].table, 'test_table_2')
-
- @unittest.skipUnless(importlib.util.find_spec('numpy'), "Module numpy not found")
- def test_numpy(self):
- from numpy import array, nan
-
- data = [
- ('World', 2415712510, 318.1, '9.7%', nan),
- ('United States', 310645827, 949.5, '44.3%', 'Johnson&Johnson, Moderna, Pfizer/BioNTech'),
- ('India', 252760364, 186.9, '3.5%', 'Covaxin, Covishield, Oxford/AstraZeneca'),
- ('Brazil', 78906225, 376.7, '11.3%', 'Oxford / AstraZeneca, Oxford/AstraZeneca, Pfizer/BioNTech'),
- ]
-
- self.db.execute(
- """
- CREATE TABLE "test_table_numpy" (
- "Country" TEXT UNIQUE,
- "Doses_Administered" INTEGER,
- "Doses_per_1000" REAL,
- "Fully_Vaccinated_Population" TEXT,
- "Vaccine_Type_Used" TEXT
- );
- """
- )
-
- self.db.updatemany("test_table_numpy", array(data))
- self.db.updatemany("test_table_numpy", array([[], []]))
-
-
-def save_prof(func: callable):
- def wrapper(*args, **kwargs):
- import pstats
- import cProfile
-
- with cProfile.Profile() as pr:
- func(*args, **kwargs)
-
- stat = pstats.Stats(pr)
- stat.sort_stats(pstats.SortKey.TIME)
- stat.dump_stats(filename=f'time_{func.__name__}.prof')
-
- return wrapper
-
-
-# @unittest.skip("Turned off manually")
-@unittest.skipUnless(importlib.util.find_spec('cProfile'), "Module cProfile not found")
-@unittest.skipUnless(importlib.util.find_spec('pstats'), "Module pstats not found")
-class TimeTestsSqllexSQLite(unittest.TestCase):
-
- @classmethod
- def setUpClass(cls) -> None:
- cls.complexity = 1000
- cls.db_name = f"test_sqllex_db"
- cls.db = SQLite3x(path=cls.db_name)
-
- @classmethod
- def tearDownClass(cls) -> None:
- cls.db.disconnect()
- os.remove(cls.db_name)
-
- @save_prof
- def test_create_table(self):
- self.db.create_table(
- 'main',
- {
- 'id': [INTEGER, PRIMARY_KEY, UNIQUE],
- 'name': [TEXT],
- 'age': [INTEGER, DEFAULT, 33]
- }
- )
-
- @save_prof
- def test_insert_fast(self):
- for _ in range(self.complexity):
- self.db.insert('main', (None, f'Alex', 33))
-
- @save_prof
- def test_insert_slow(self):
- for _ in range(self.complexity):
- self.db.insert('main', (None, 'Alex'))
-
- @save_prof
- def test_insert_many_fast(self):
- data = [(None, 'Alex', 33) for _ in range(self.complexity)]
-
- self.db.insertmany('main', data)
-
- @save_prof
- def test_insert_many_slow(self):
- data = [(None, 'Alex') for _ in range(self.complexity)]
-
- self.db.insertmany('main', data)
-
- @save_prof
- def test_select_all(self):
- self.db.select_all('main', LIMIT=self.complexity)
-
- @save_prof
- def test_select_where_1(self):
- """
- Select where (something)
- """
- self.db.select(
- 'main', 'id',
- WHERE={
- 'name': 'Alex'
- },
- LIMIT=self.complexity
- )
-
- @save_prof
- def test_select_where_2(self):
- """
- Modern new way for WHERE (SearchConditions)
- """
- main_tab = self.db['main']
- id_col = main_tab['id']
- name_col = main_tab['name']
-
- self.db.select(
- main_tab, id_col,
- WHERE=(name_col == 'Alex'),
- LIMIT=self.complexity
- )
+# from test_postgresqlx import TestSqllexPostgres
+# from timetests.test_postgresqlx import TimeTestsSqllexPostgres
+# from sqllex.debug import debug_mode
+# debug_mode(True)
if __name__ == '__main__':
unittest.main()
diff --git a/tests/timetests/__init__.py b/tests/timetests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/timetests/test_postgresqlx.py b/tests/timetests/test_postgresqlx.py
new file mode 100644
index 0000000..a4eccc8
--- /dev/null
+++ b/tests/timetests/test_postgresqlx.py
@@ -0,0 +1,144 @@
+import unittest
+import importlib.util
+from sqllex.constants import *
+from sqllex.constants.postgresql import *
+from sqllex.classes import PostgreSQLx
+import psycopg2
+
+# from sqllex.debug import debug_mode
+# debug_mode(True)
+
+
+def save_prof(func: callable):
+ def wrapper(*args, **kwargs):
+ import pstats
+ import cProfile
+
+ with cProfile.Profile() as pr:
+ func(*args, **kwargs)
+
+ stat = pstats.Stats(pr)
+ stat.sort_stats(pstats.SortKey.TIME)
+ stat.dump_stats(filename=f'time_{func.__name__}.prof')
+
+ return wrapper
+
+
+# @unittest.skip("Turned off manually")
+@unittest.skipUnless(importlib.util.find_spec('cProfile'), "Module cProfile not found")
+@unittest.skipUnless(importlib.util.find_spec('pstats'), "Module pstats not found")
+class TimeTestsSqllexPostgres(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(cls) -> None:
+ cls.complexity = 1000
+ cls.db_name = f"test_sqllex_db"
+
+ conn = psycopg2.connect(
+ dbname='postgres',
+ user='postgres',
+ password='admin'
+ )
+ conn.autocommit = True
+ cls.admin_cur = conn.cursor()
+
+ try:
+ cls.admin_cur.execute("drop database test_sqllex")
+
+ except psycopg2.errors.InvalidCatalogName:
+ pass
+
+ try:
+ cls.admin_cur.execute("drop user test_sqllex")
+
+ except psycopg2.errors.UndefinedObject:
+ pass
+
+ cls.admin_cur.execute("create user test_sqllex with password 'test_sqllex'")
+ cls.admin_cur.execute("alter role test_sqllex set client_encoding to 'utf8'")
+ cls.admin_cur.execute("alter role test_sqllex set default_transaction_isolation to 'read committed'")
+ cls.admin_cur.execute("alter role test_sqllex set timezone to 'UTC'")
+
+ cls.admin_cur.execute("create database test_sqllex owner test_sqllex")
+
+ cls.db = PostgreSQLx(
+ engine=psycopg2,
+ dbname='test_sqllex',
+ user='test_sqllex',
+ password='test_sqllex'
+ )
+
+ @classmethod
+ def tearDownClass(cls) -> None:
+ cls.db.disconnect()
+ cls.admin_cur.execute("drop database test_sqllex")
+ cls.admin_cur.execute("drop user test_sqllex")
+
+ @save_prof
+ def test_create_table(self):
+ self.db.create_table(
+ 'main',
+ {
+ 'id': [SERIAL, PRIMARY_KEY, UNIQUE],
+ 'name': [TEXT],
+ 'age': [INTEGER, DEFAULT, 33]
+ }
+ )
+
+ @save_prof
+ def test_insert_fast(self):
+ for _ in range(self.complexity):
+ self.db.insert('main', (_, f'Alex', 33))
+
+ @save_prof
+ def test_insert_slow(self):
+ for _ in range(self.complexity, 2*self.complexity):
+ self.db.insert('main', (_, 'Alex'))
+
+ @save_prof
+ def test_insert_many_fast(self):
+ data = [(_, 'Alex', 33) for _ in range(2*self.complexity, 3*self.complexity)]
+
+ self.db.insertmany('main', data)
+
+ @save_prof
+ def test_insert_many_slow(self):
+ data = [(_, 'Alex') for _ in range(3*self.complexity, 4*self.complexity)]
+
+ self.db.insertmany('main', data)
+
+ @save_prof
+ def test_select_all(self):
+ self.db.select_all('main', LIMIT=self.complexity)
+
+ @save_prof
+ def test_select_where_1(self):
+ """
+ Select where (something)
+ """
+ self.db.select(
+ 'main', 'id',
+ WHERE={
+ 'name': 'Alex'
+ },
+ LIMIT=self.complexity
+ )
+
+ @save_prof
+ def test_select_where_2(self):
+ """
+ Modern new way for WHERE (SearchConditions)
+ """
+ main_tab = self.db['main']
+ id_col = main_tab['id']
+ name_col = main_tab['name']
+
+ self.db.select(
+ main_tab, id_col,
+ WHERE=(name_col == 'Alex'),
+ LIMIT=self.complexity
+ )
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/tests/timetests/test_sqlite3x.py b/tests/timetests/test_sqlite3x.py
new file mode 100644
index 0000000..72578f2
--- /dev/null
+++ b/tests/timetests/test_sqlite3x.py
@@ -0,0 +1,109 @@
+import os
+import unittest
+import importlib.util
+from sqllex.constants import *
+from sqllex.classes import SQLite3x
+
+# from sqllex.debug import debug_mode
+# debug_mode(True)
+
+
+def save_prof(func: callable):
+ def wrapper(*args, **kwargs):
+ import pstats
+ import cProfile
+
+ with cProfile.Profile() as pr:
+ func(*args, **kwargs)
+
+ stat = pstats.Stats(pr)
+ stat.sort_stats(pstats.SortKey.TIME)
+ stat.dump_stats(filename=f'time_{func.__name__}.prof')
+
+ return wrapper
+
+
+# @unittest.skip("Turned off manually")
+@unittest.skipUnless(importlib.util.find_spec('cProfile'), "Module cProfile not found")
+@unittest.skipUnless(importlib.util.find_spec('pstats'), "Module pstats not found")
+class TimeTestsSqllexSQLite(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(cls) -> None:
+ cls.complexity = 1000
+ cls.db_name = f"test_sqllex_db"
+ cls.db = SQLite3x(path=cls.db_name)
+
+ @classmethod
+ def tearDownClass(cls) -> None:
+ cls.db.disconnect()
+ os.remove(cls.db_name)
+
+ @save_prof
+ def test_create_table(self):
+ self.db.create_table(
+ 'main',
+ {
+ 'id': [INTEGER, PRIMARY_KEY, UNIQUE],
+ 'name': [TEXT],
+ 'age': [INTEGER, DEFAULT, 33]
+ }
+ )
+
+ @save_prof
+ def test_insert_fast(self):
+ for _ in range(self.complexity):
+ self.db.insert('main', (None, f'Alex', 33))
+
+ @save_prof
+ def test_insert_slow(self):
+ for _ in range(self.complexity):
+ self.db.insert('main', (None, 'Alex'))
+
+ @save_prof
+ def test_insert_many_fast(self):
+ data = [(None, 'Alex', 33) for _ in range(self.complexity)]
+
+ self.db.insertmany('main', data)
+
+ @save_prof
+ def test_insert_many_slow(self):
+ data = [(None, 'Alex') for _ in range(self.complexity)]
+
+ self.db.insertmany('main', data)
+
+ @save_prof
+ def test_select_all(self):
+ self.db.select_all('main', LIMIT=self.complexity)
+
+ @save_prof
+ def test_select_where_1(self):
+ """
+ Select where (something)
+ """
+ self.db.select(
+ 'main', 'id',
+ WHERE={
+ 'name': 'Alex'
+ },
+ LIMIT=self.complexity
+ )
+
+ @save_prof
+ def test_select_where_2(self):
+ """
+ Modern new way for WHERE (SearchConditions)
+ """
+ main_tab = self.db['main']
+ id_col = main_tab['id']
+ name_col = main_tab['name']
+
+ self.db.select(
+ main_tab, id_col,
+ WHERE=(name_col == 'Alex'),
+ LIMIT=self.complexity
+ )
+
+
+if __name__ == '__main__':
+ unittest.main()