# conftest.py
import contextlib
import datetime
import functools
import itertools
import os
import random
import signal as stdlib_signal
import string
import uuid

import aiopg
import psycopg2
import pytest
from psycopg2 import sql
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

from procrastinate import aiopg_connector as aiopg_connector_module
from procrastinate import app as app_module
from procrastinate import builtin_tasks, jobs
from procrastinate import psycopg2_connector as psycopg2_connector_module
from procrastinate import schema, testing
from procrastinate.contrib.sqlalchemy import (
    psycopg2_connector as sqlalchemy_psycopg2_connector_module,
)

# Just ensuring the tests are not polluted by the environment
for key in os.environ:
    if key.startswith("PROCRASTINATE_"):
        os.environ.pop(key)
def cursor_execute(cursor, query, *identifiers, format=True):
    if identifiers:
        query = sql.SQL(query).format(
            *(sql.Identifier(identifier) for identifier in identifiers)
        )
    cursor.execute(query)
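# Usage sketch: the {} placeholders above are psycopg2.sql format slots, so
# identifiers are quoted safely rather than interpolated as raw strings.
# For example, with a plain psycopg2 cursor:
#
#     cursor_execute(cursor, "DROP DATABASE IF EXISTS {}", "my_db")
#     # executes: DROP DATABASE IF EXISTS "my_db"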
@contextlib.contextmanager
def db_executor(dbname):
    with contextlib.closing(psycopg2.connect("", dbname=dbname)) as connection:
        connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
        with connection.cursor() as cursor:
            yield functools.partial(cursor_execute, cursor)


@pytest.fixture
def db_execute():
    return db_executor


def db_create(dbname, template=None):
    with db_executor("postgres") as execute:
        execute("DROP DATABASE IF EXISTS {}", dbname)
        if template:
            execute("CREATE DATABASE {} TEMPLATE {}", dbname, template)
        else:
            execute("CREATE DATABASE {}", dbname)


def db_drop(dbname):
    with db_executor("postgres") as execute:
        execute("DROP DATABASE IF EXISTS {}", dbname)
@pytest.fixture
def db_factory():
    dbs_to_drop = []

    def _(dbname, template=None):
        db_create(dbname=dbname, template=template)
        dbs_to_drop.append(dbname)

    yield _

    for dbname in dbs_to_drop:
        db_drop(dbname=dbname)


@pytest.fixture(scope="session")
def setup_db():
    dbname = "procrastinate_test_template"
    db_create(dbname=dbname)
    connector = aiopg_connector_module.AiopgConnector(dbname=dbname)
    connector.open()
    schema_manager = schema.SchemaManager(connector=connector)
    schema_manager.apply_schema()
    # We need to close the underlying psycopg2 connection synchronously
    connector.close()
    yield dbname
    db_drop(dbname=dbname)
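# Note: setup_db builds the "procrastinate_test_template" database once per test
# session with the schema already applied; per-test databases are then cloned
# from it via CREATE DATABASE ... TEMPLATE ..., which is much cheaper than
# reapplying the schema for every test.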
@pytest.fixture
def connection_params(setup_db, db_factory):
    db_factory(dbname="procrastinate_test", template=setup_db)
    yield {"dsn": "", "dbname": "procrastinate_test"}


@pytest.fixture
def sqlalchemy_engine_dsn(setup_db, db_factory):
    db_factory(dbname="procrastinate_test", template=setup_db)
    yield "postgresql+psycopg2:///procrastinate_test"


@pytest.fixture
async def connection(connection_params):
    async with aiopg.connect(**connection_params) as connection:
        yield connection


@pytest.fixture
async def not_opened_aiopg_connector(connection_params):
    yield aiopg_connector_module.AiopgConnector(**connection_params)


@pytest.fixture
def not_opened_psycopg2_connector(connection_params):
    yield psycopg2_connector_module.Psycopg2Connector(**connection_params)


@pytest.fixture
def not_opened_sqlalchemy_psycopg2_connector(sqlalchemy_engine_dsn):
    yield sqlalchemy_psycopg2_connector_module.SQLAlchemyPsycopg2Connector(
        dsn=sqlalchemy_engine_dsn, echo=True
    )
@pytest.fixture
async def aiopg_connector(not_opened_aiopg_connector):
    await not_opened_aiopg_connector.open_async()
    yield not_opened_aiopg_connector
    await not_opened_aiopg_connector.close_async()


@pytest.fixture
def psycopg2_connector(not_opened_psycopg2_connector):
    not_opened_psycopg2_connector.open()
    yield not_opened_psycopg2_connector
    not_opened_psycopg2_connector.close()


@pytest.fixture
def sqlalchemy_psycopg2_connector(not_opened_sqlalchemy_psycopg2_connector):
    not_opened_sqlalchemy_psycopg2_connector.open()
    yield not_opened_sqlalchemy_psycopg2_connector
    not_opened_sqlalchemy_psycopg2_connector.close()
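# Usage sketch (hypothetical test, assuming the connector exposes the usual
# procrastinate query API such as execute_query_async): tests can depend on the
# already-opened fixtures and query the freshly cloned test database directly:
#
#     async def test_connector_roundtrip(aiopg_connector):
#         await aiopg_connector.execute_query_async("SELECT 1")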
@pytest.fixture
def kill_own_pid():
    def f(signal=stdlib_signal.SIGTERM):
        os.kill(os.getpid(), signal)

    return f


@pytest.fixture
def connector():
    return testing.InMemoryConnector()


@pytest.fixture
def reset_builtin_task_names():
    builtin_tasks.remove_old_jobs.name = "procrastinate.builtin_tasks.remove_old_jobs"
    builtin_tasks.builtin.tasks = {
        task.name: task for task in builtin_tasks.builtin.tasks.values()
    }


@pytest.fixture
def not_opened_app(connector, reset_builtin_task_names):
    return app_module.App(connector=connector)


@pytest.fixture
def app(not_opened_app):
    with not_opened_app.open() as app:
        yield app


@pytest.fixture
def job_manager(app):
    return app.job_manager
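# Usage sketch (hypothetical test): the `app` fixture is backed by the in-memory
# connector, so tests can register a task and defer it without touching Postgres.
# Assuming InMemoryConnector keeps deferred jobs in its `jobs` mapping:
#
#     def test_defer(app):
#         @app.task(name="my_task")
#         def my_task(a, b):
#             return a + b
#
#         my_task.defer(a=1, b=2)
#         assert len(app.connector.jobs) == 1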
@pytest.fixture
def serial():
    return itertools.count(1)


@pytest.fixture
def random_str():
    def _(length=8):
        return "".join(random.choice(string.ascii_lowercase) for _ in range(length))

    return _


@pytest.fixture
def job_factory(serial, random_str):
    def factory(**kwargs):
        defaults = {
            "id": next(serial),
            "task_name": f"task_{random_str()}",
            "task_kwargs": {},
            "lock": str(uuid.uuid4()),
            "queueing_lock": None,
            "queue": f"queue_{random_str()}",
        }
        final_kwargs = defaults.copy()
        final_kwargs.update(kwargs)
        return jobs.Job(**final_kwargs)

    return factory
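# Usage sketch (hypothetical test): job_factory fills every Job field with a
# sequential id and randomized names; any keyword argument overrides the default:
#
#     def test_job(job_factory):
#         job = job_factory(queue="high_priority", task_kwargs={"x": 1})
#         assert job.queue == "high_priority"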
@pytest.fixture
def deferred_job_factory(job_factory, job_manager):
    async def factory(*, job_manager=job_manager, **kwargs):
        job = job_factory(id=None, **kwargs)
        return await job_manager.defer_job_async(job)

    return factory


def aware_datetime(
    year, month, day, hour=0, minute=0, second=0, microsecond=0, tz_offset=None
):
    tzinfo = (
        datetime.timezone(datetime.timedelta(hours=tz_offset))
        if tz_offset
        else datetime.timezone.utc
    )
    return datetime.datetime(
        year, month, day, hour, minute, second, microsecond, tzinfo=tzinfo
    )
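# Usage sketch: aware_datetime always returns a timezone-aware datetime, in UTC
# by default or at a fixed offset when tz_offset is given:
#
#     aware_datetime(2023, 1, 1)               # 2023-01-01 00:00:00+00:00
#     aware_datetime(2023, 1, 1, tz_offset=2)  # 2023-01-01 00:00:00+02:00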