-
Notifications
You must be signed in to change notification settings - Fork 0
/
upsertSmallBatches.py
73 lines (61 loc) · 2.32 KB
/
upsertSmallBatches.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
#!/usr/bin/env python3
"""
Test psycopg with CockroachDB.
"""
import time
import random
import logging
from argparse import ArgumentParser, RawTextHelpFormatter
import psycopg2
from psycopg2.errors import SerializationFailure
def upsert_ref(conn, k):
    """Upsert rows into ``ref`` for one ``fact`` key and commit.

    Runs a single ``upsert into ref (select ...)`` statement whose source
    rows are ``fact`` rows with ``key = k`` joined against
    ``generate_series(1, 1000)``, then commits the transaction.

    Args:
        conn: An open DB-API connection (psycopg2 in this script).
        k: The ``fact.key`` value to select on.

    Raises:
        Whatever the driver raises from ``execute`` or ``commit``; nothing is
        caught here, so the caller is responsible for rollback and retry.
    """
    with conn.cursor() as cur:
        # Bind the key as a query parameter instead of interpolating it into
        # the SQL text, so the driver handles quoting and type adaptation.
        cur.execute(
            "upsert into ref (select key, random() from fact, "
            "generate_series(1, 1000) where key =%s);",
            (k,),
        )
        logging.info("upserted fact key =%s ", k)
        logging.debug("upsert: status message: %s", cur.statusmessage)
    conn.commit()
def main():
    """Upsert ``ref`` rows for keys 1..1,000,000, retrying serialization failures.

    Parses the command line, configures logging (DEBUG when ``--verbose`` is
    given, INFO otherwise), opens a psycopg2 connection to the given DSN and
    calls ``upsert_ref`` once per key. A ``SerializationFailure`` (SQLSTATE
    40001) rolls the transaction back and retries the same key after an
    exponential backoff; any other error propagates unchanged. The
    connection is closed on every exit path.

    Raises:
        RuntimeError: If a single key still fails after 10 attempts.
    """
    max_attempts = 10

    opt = parse_cmdline()
    print(opt)
    logging.basicConfig(level=logging.DEBUG if opt.verbose else logging.INFO)

    conn = psycopg2.connect(opt.dsn)
    try:
        for key in range(1, 1000001):
            for attempt in range(1, max_attempts + 1):
                try:
                    upsert_ref(conn, key)
                    break
                except SerializationFailure as err:
                    # Only retryable 40001 errors are handled here; every
                    # other database error is left to propagate.
                    print(err, attempt)
                    # The failed transaction must be rolled back before the
                    # connection can run another statement.
                    conn.rollback()
                    logging.debug("run_transaction(conn, ) failed: %s", key)
                    if attempt < max_attempts:
                        # NOTE(review): the original expression
                        # (2**n * 100 plus a small random jitter) reads as
                        # milliseconds, so it is converted to seconds for
                        # time.sleep — confirm the intended scale.
                        backoff_ms = (2 ** attempt) * 100 + random.randint(1, 99)
                        time.sleep(backoff_ms / 1000.0)
            else:
                # The inner loop finished without a successful upsert.
                raise RuntimeError(
                    f"did not succeed within {max_attempts} retries"
                )
    finally:
        # Close communication with the database, even on failure.
        conn.close()
def parse_cmdline(argv=None):
    """Parse this script's command-line options.

    Args:
        argv: Optional list of argument strings to parse. When ``None``
            (the default) argparse reads ``sys.argv[1:]``, which is the
            behavior existing callers get.

    Returns:
        argparse.Namespace with ``dsn`` (the positional connection string)
        and ``verbose`` (True when ``-v``/``--verbose`` was given).
    """
    parser = ArgumentParser(description=__doc__,
                            formatter_class=RawTextHelpFormatter)
    # NOTE(review): the user/host part of the example DSN below
    # ("[email protected]") looks garbled; confirm the intended value.
    parser.add_argument(
        "dsn",
        help="database connection string\n\n"
             "For cockroach insecure cluster, use postgresql://[email protected]:26257/defaultdb?sslmode=disable,\n"
    )
    parser.add_argument("-v", "--verbose",
                        action="store_true", help="print debug info")
    return parser.parse_args(argv)
# Run main() only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()